BHI layer v1: docs, schema, Phase A ingestion stubs
This commit is contained in:
143
jobs/ingestion/cms_pos.py
Normal file
143
jobs/ingestion/cms_pos.py
Normal file
@@ -0,0 +1,143 @@
|
||||
#!/usr/bin/env python3
|
||||
# READY TO DEPLOY — requires base Brain Postgres schema + run schemas/bhi_tables.sql
|
||||
"""
|
||||
CMS Provider of Services (POS) file — quarterly bulk CSV with every
|
||||
Medicare-certified facility including provider category (IPFs, PRTFs, etc.),
|
||||
bed counts, certification date, and termination date. Critical for
|
||||
closure/opening tracking used in composite_score.capacity_trend.
|
||||
"""
|
||||
import csv
|
||||
import io
|
||||
import logging
|
||||
import sys
|
||||
import zipfile
|
||||
from datetime import datetime
|
||||
from _common import RateLimitedSession, bulk_insert, job_run
|
||||
|
||||
LOG = logging.getLogger("bhi.cms_pos")
|
||||
CATALOG_URL = "https://data.cms.gov/data.json"
|
||||
|
||||
|
||||
def test_endpoint():
|
||||
s = RateLimitedSession()
|
||||
r = s.get(CATALOG_URL).json()
|
||||
pos = [d for d in r.get("dataset", []) if "provider of services" in d.get("title", "").lower()]
|
||||
print(f"OK: {len(pos)} POS datasets in catalog")
|
||||
for d in pos[:3]:
|
||||
print(" -", d.get("title"))
|
||||
return len(pos) > 0
|
||||
|
||||
|
||||
def _latest_pos_distribution():
|
||||
s = RateLimitedSession(min_interval=0.3)
|
||||
r = s.get(CATALOG_URL).json()
|
||||
pos = [d for d in r.get("dataset", [])
|
||||
if "provider of services" in d.get("title", "").lower()
|
||||
and "hospital" in d.get("title", "").lower()]
|
||||
if not pos:
|
||||
return None
|
||||
latest = max(pos, key=lambda d: d.get("modified", ""))
|
||||
for dist in latest.get("distribution", []):
|
||||
url = dist.get("downloadURL") or dist.get("accessURL", "")
|
||||
if url.endswith((".zip", ".csv")):
|
||||
return url
|
||||
return None
|
||||
|
||||
|
||||
def fetch_rows():
|
||||
url = _latest_pos_distribution()
|
||||
if not url:
|
||||
LOG.error("Could not resolve POS download URL")
|
||||
return []
|
||||
LOG.info("fetching POS: %s", url)
|
||||
s = RateLimitedSession(min_interval=0.5)
|
||||
r = s.get(url)
|
||||
content = r.content
|
||||
if url.endswith(".zip"):
|
||||
z = zipfile.ZipFile(io.BytesIO(content))
|
||||
csvname = next((n for n in z.namelist() if n.lower().endswith(".csv")), None)
|
||||
with z.open(csvname) as f:
|
||||
text = io.TextIOWrapper(f, encoding="latin-1").read()
|
||||
else:
|
||||
text = content.decode("latin-1", errors="replace")
|
||||
reader = csv.DictReader(io.StringIO(text))
|
||||
# Filter to psychiatric + BH provider categories
|
||||
# CMS PRVDR_CTGRY_CD: 04 = psych hospital, sub-category variations
|
||||
keep = []
|
||||
for row in reader:
|
||||
cat = row.get("PRVDR_CTGRY_CD") or row.get("prvdr_ctgry_cd") or ""
|
||||
subcat = row.get("PRVDR_CTGRY_SBTYP_CD") or row.get("prvdr_ctgry_sbtyp_cd") or ""
|
||||
if cat in ("04",) or "psych" in (row.get("FAC_NAME", "") + row.get("fac_name", "")).lower():
|
||||
keep.append(row)
|
||||
LOG.info("filtered POS to %d BH-relevant rows", len(keep))
|
||||
return keep
|
||||
|
||||
|
||||
def _parse_date(s):
|
||||
if not s:
|
||||
return None
|
||||
for fmt in ("%Y-%m-%d", "%m/%d/%Y", "%Y%m%d"):
|
||||
try:
|
||||
return datetime.strptime(s, fmt).date()
|
||||
except ValueError:
|
||||
continue
|
||||
return None
|
||||
|
||||
|
||||
def _num(v):
|
||||
try:
|
||||
return int(float(v)) if v not in (None, "") else None
|
||||
except (TypeError, ValueError):
|
||||
return None
|
||||
|
||||
|
||||
def write_rows(conn, raw):
|
||||
cols = [
|
||||
"ccn","npi","name","address","city","state","zip","county_fips",
|
||||
"lat","lon","facility_type","ownership","bed_count","psych_bed_count",
|
||||
"pediatric_psych_bed_count","adolescent_unit","young_adult_unit",
|
||||
"services_offered","populations_served","payment_accepted",
|
||||
"medicaid_accepted","accreditation","opened_date","closed_date",
|
||||
"last_verified","source","source_raw_id",
|
||||
]
|
||||
rows = []
|
||||
for r in raw:
|
||||
def g(*keys):
|
||||
for k in keys:
|
||||
v = r.get(k) or r.get(k.lower())
|
||||
if v:
|
||||
return v
|
||||
return None
|
||||
rows.append((
|
||||
g("PRVDR_NUM", "prvdr_num"), None,
|
||||
g("FAC_NAME", "fac_name"),
|
||||
g("ST_ADR", "st_adr"),
|
||||
g("CITY_NAME", "city_name"),
|
||||
g("STATE_CD", "state_cd"),
|
||||
g("ZIP_CD", "zip_cd"),
|
||||
None, None, None,
|
||||
"IPF",
|
||||
g("GNRL_CNTL_TYPE_CD", "gnrl_cntl_type_cd"),
|
||||
_num(g("BED_CNT", "bed_cnt")),
|
||||
_num(g("CRTFD_BED_CNT", "crtfd_bed_cnt")),
|
||||
None, None, None,
|
||||
[], [], [], None, None,
|
||||
_parse_date(g("ORGNL_PRTCPTN_DT", "orgnl_prtcptn_dt")),
|
||||
_parse_date(g("TRMNTN_EXPRTN_DT", "trmntn_exprtn_dt")),
|
||||
None,
|
||||
"cms_pos", None,
|
||||
))
|
||||
bulk_insert(conn, "bhi_facilities", cols, rows)
|
||||
return len(rows)
|
||||
|
||||
|
||||
def main():
|
||||
with job_run("bhi_cms_pos") as (conn, _):
|
||||
n = write_rows(conn, fetch_rows())
|
||||
LOG.info("inserted %d", n)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
if len(sys.argv) > 1 and sys.argv[1] == "test":
|
||||
sys.exit(0 if test_endpoint() else 1)
|
||||
main()
|
||||
Reference in New Issue
Block a user