#!/usr/bin/env python3
# READY TO DEPLOY — requires base Brain Postgres schema + run schemas/bhi_tables.sql
"""
CMS Provider of Services (POS) file — quarterly bulk CSV with every
Medicare-certified facility including provider category (IPFs, PRTFs, etc.),
bed counts, certification date, and termination date. Critical for
closure/opening tracking used in composite_score.capacity_trend.
"""
import csv
import io
import logging
import sys
import zipfile
from datetime import datetime

from _common import RateLimitedSession, bulk_insert, job_run

LOG = logging.getLogger("bhi.cms_pos")
CATALOG_URL = "https://data.cms.gov/data.json"


def _matching_datasets(catalog, *terms):
    """Return the catalog datasets whose lower-cased title contains every term.

    ``catalog`` is the decoded JSON of ``CATALOG_URL``; datasets are read from
    its ``"dataset"`` list. A missing or null title is treated as empty.
    """
    return [
        d
        for d in catalog.get("dataset", [])
        if all(term in (d.get("title") or "").lower() for term in terms)
    ]


def test_endpoint():
    """Smoke-test the catalog: print up to three POS dataset titles.

    Returns True when at least one dataset title mentions
    "provider of services", False otherwise.
    """
    s = RateLimitedSession()
    pos = _matching_datasets(s.get(CATALOG_URL).json(), "provider of services")
    print(f"OK: {len(pos)} POS datasets in catalog")
    for d in pos[:3]:
        print(" -", d.get("title"))
    return len(pos) > 0


def _latest_pos_distribution():
    """Resolve the download URL of the most recently modified hospital POS file.

    Returns the first distribution URL of that dataset ending in ``.zip`` or
    ``.csv``, or None when no matching dataset or distribution exists.
    """
    s = RateLimitedSession(min_interval=0.3)
    pos = _matching_datasets(
        s.get(CATALOG_URL).json(), "provider of services", "hospital"
    )
    if not pos:
        return None
    # "modified" is compared as a string; a null/missing value sorts first.
    latest = max(pos, key=lambda d: d.get("modified") or "")
    for dist in latest.get("distribution", []):
        url = dist.get("downloadURL") or dist.get("accessURL") or ""
        if url.endswith((".zip", ".csv")):
            return url
    return None


def _is_bh_row(row):
    """Return True when a POS row is kept by the behavioral-health filter.

    A row is kept when its provider category code is "04" or its facility name
    contains "psych" (case-insensitive). Both upper- and lower-case column
    names are accepted.
    """
    # CMS PRVDR_CTGRY_CD: 04 = psych hospital. The sub-type column
    # (PRVDR_CTGRY_SBTYP_CD) is not consulted by this filter.
    cat = row.get("PRVDR_CTGRY_CD") or row.get("prvdr_ctgry_cd") or ""
    # DictReader fills short rows with None, so guard before concatenating.
    name = (row.get("FAC_NAME") or "") + (row.get("fac_name") or "")
    return cat == "04" or "psych" in name.lower()


def fetch_rows():
    """Download the latest POS file and return its BH-relevant rows.

    The payload is decoded as latin-1; a ``.zip`` URL is unpacked and its first
    ``.csv`` member is read. Returns a list of ``csv.DictReader`` row dicts, or
    an empty list (after logging an error) when the download URL cannot be
    resolved or the archive holds no CSV member.
    """
    url = _latest_pos_distribution()
    if not url:
        LOG.error("Could not resolve POS download URL")
        return []
    LOG.info("fetching POS: %s", url)
    s = RateLimitedSession(min_interval=0.5)
    content = s.get(url).content
    if url.endswith(".zip"):
        # Context manager closes the archive even if reading fails.
        with zipfile.ZipFile(io.BytesIO(content)) as z:
            csvname = next(
                (n for n in z.namelist() if n.lower().endswith(".csv")), None
            )
            if csvname is None:
                LOG.error("POS archive %s contains no CSV member", url)
                return []
            with z.open(csvname) as f:
                text = io.TextIOWrapper(f, encoding="latin-1").read()
    else:
        text = content.decode("latin-1", errors="replace")
    reader = csv.DictReader(io.StringIO(text))
    # Filter to psychiatric + BH provider categories
    keep = [row for row in reader if _is_bh_row(row)]
    LOG.info("filtered POS to %d BH-relevant rows", len(keep))
    return keep


def _parse_date(s):
    """Parse a date string in ISO, US (m/d/Y) or compact (YYYYMMDD) form.

    Returns a ``datetime.date``, or None when the value is empty or matches
    none of the supported formats.
    """
    if not s:
        return None
    text = str(s).strip()
    if not text:
        return None
    for fmt in ("%Y-%m-%d", "%m/%d/%Y", "%Y%m%d"):
        try:
            return datetime.strptime(text, fmt).date()
        except ValueError:
            continue
    return None


def _num(v):
    """Convert a numeric string such as "12" or "12.0" to int.

    Returns None for None, an empty string, or any value that cannot be
    converted (including non-finite values like "inf" or "nan").
    """
    if v in (None, ""):
        return None
    try:
        return int(float(v))
    except (TypeError, ValueError, OverflowError):
        # OverflowError: int(float("inf")); ValueError: garbage or "nan".
        return None


def _first_value(row, *keys):
    """Return the first truthy value found under any key, else None.

    Each key is tried as given and then lower-cased, so callers only need to
    pass the upper-case column name.
    """
    for k in keys:
        v = row.get(k) or row.get(k.lower())
        if v:
            return v
    return None


def write_rows(conn, raw):
    """Map raw POS rows onto the bhi_facilities columns and bulk-insert them.

    ``conn`` is passed straight through to ``bulk_insert``; ``raw`` is an
    iterable of row dicts as produced by ``fetch_rows``. Returns the number of
    rows handed to ``bulk_insert``.
    """
    cols = [
        "ccn", "npi", "name", "address", "city", "state", "zip", "county_fips",
        "lat", "lon", "facility_type", "ownership", "bed_count", "psych_bed_count",
        "pediatric_psych_bed_count", "adolescent_unit", "young_adult_unit",
        "services_offered", "populations_served", "payment_accepted",
        "medicaid_accepted", "accreditation", "opened_date", "closed_date",
        "last_verified", "source", "source_raw_id",
    ]
    rows = []
    for r in raw:
        rows.append((
            _first_value(r, "PRVDR_NUM"),                       # ccn
            None,                                               # npi
            _first_value(r, "FAC_NAME"),                        # name
            _first_value(r, "ST_ADR"),                          # address
            _first_value(r, "CITY_NAME"),                       # city
            _first_value(r, "STATE_CD"),                        # state
            _first_value(r, "ZIP_CD"),                          # zip
            None,                                               # county_fips
            None,                                               # lat
            None,                                               # lon
            # NOTE(review): every kept row is labelled "IPF", including rows
            # matched only by a "psych" facility name — confirm this is intended.
            "IPF",                                              # facility_type
            _first_value(r, "GNRL_CNTL_TYPE_CD"),               # ownership
            _num(_first_value(r, "BED_CNT")),                   # bed_count
            # NOTE(review): psych_bed_count is fed from CRTFD_BED_CNT — confirm
            # that column is the right source for psychiatric beds.
            _num(_first_value(r, "CRTFD_BED_CNT")),             # psych_bed_count
            None,                                               # pediatric_psych_bed_count
            None,                                               # adolescent_unit
            None,                                               # young_adult_unit
            [],                                                 # services_offered
            [],                                                 # populations_served
            [],                                                 # payment_accepted
            None,                                               # medicaid_accepted
            None,                                               # accreditation
            _parse_date(_first_value(r, "ORGNL_PRTCPTN_DT")),   # opened_date
            _parse_date(_first_value(r, "TRMNTN_EXPRTN_DT")),   # closed_date
            None,                                               # last_verified
            "cms_pos",                                          # source
            None,                                               # source_raw_id
        ))
    bulk_insert(conn, "bhi_facilities", cols, rows)
    return len(rows)


def main():
    """Run the job: fetch the POS rows and insert them under a job_run context."""
    with job_run("bhi_cms_pos") as (conn, _):
        n = write_rows(conn, fetch_rows())
        LOG.info("inserted %d", n)


if __name__ == "__main__":
    # `test` argument: only probe the catalog and exit 0/1 accordingly.
    if len(sys.argv) > 1 and sys.argv[1] == "test":
        sys.exit(0 if test_endpoint() else 1)
    main()