Files
economic-brain-bhi/jobs/ingestion/samhsa_nssats_nmhss.py
2026-04-05 20:15:36 +00:00

103 lines
3.6 KiB
Python

#!/usr/bin/env python3
# READY TO DEPLOY — requires base Brain Postgres schema + run schemas/bhi_tables.sql
"""
SAMHSA N-SSATS + N-MHSS bulk downloads.
SAMHSA Data Archive hosts annual CSV/SAS files. The landing pages do not
expose a machine-listing API, so we maintain a manifest of known direct URLs
and parse whichever are present. Update the MANIFEST when new years drop.
"""
import csv
import io
import logging
import sys
import zipfile
from _common import RateLimitedSession, bulk_insert, job_run
# Module-level logger used by every function in this job.
LOG = logging.getLogger("bhi.samhsa_surveys")
# Known bulk files. Confirmed on samhsa.gov/data as of 2026. Update as needed.
# Each entry is a (year, survey, url) tuple of strings:
#   year   - survey year; copied onto every parsed row as "_year" and used in
#            the "source" value written to the database.
#   survey - survey label; copied onto every parsed row as "_survey". The
#            exact string "N-SSATS" selects facility_type "sud" in
#            write_rows; any other label is stored as "mh".
#   url    - direct link to a ZIP archive expected to contain a CSV file.
MANIFEST = [
# (year, survey, url)
("2022", "N-MHSS", "https://www.samhsa.gov/data/sites/default/files/reports/rpt42936/2022-nmhss-datafile-csv.zip"),
("2022", "N-SSATS", "https://www.samhsa.gov/data/sites/default/files/reports/rpt42725/2022-nssats-datafile-csv.zip"),
]
def test_endpoint():
    """Probe every manifest URL with a HEAD request and report the results.

    One line per entry is printed in the form ``<survey> <year>: <status>``.
    Redirects are followed before the status code is read.

    Returns:
        bool: True when every URL answered with status 200 (also True for an
        empty manifest), False otherwise.
    """
    session = RateLimitedSession()
    status_codes = []
    for year, survey, url in MANIFEST:
        response = session.head(url, allow_redirects=True)
        print(f"{survey} {year}: {response.status_code}")
        status_codes.append(response.status_code)
    # Every URL is probed and printed before the verdict is computed, so a
    # failing entry never hides the status of the ones after it.
    return all(code == 200 for code in status_codes)
def fetch_rows():
    """Download every archive in MANIFEST and return its CSV rows as dicts.

    For each manifest entry the URL is fetched, the response body is opened
    as an in-memory ZIP archive, and the first member whose name ends in
    ``.csv`` (case-insensitive) is parsed with ``csv.DictReader`` as latin-1
    text. Two extra keys are added to every row: ``_survey`` and ``_year``,
    copied from the manifest entry.

    The job is best-effort: any exception raised while handling one entry is
    logged as a warning and the remaining entries are still processed. Rows
    appended before such a failure are kept. An archive with no CSV member
    is also logged and skipped.

    Returns:
        list[dict]: all parsed rows, in manifest order.
    """
    s = RateLimitedSession(min_interval=0.5)
    out = []
    for year, survey, url in MANIFEST:
        LOG.info("fetching %s %s", survey, year)
        try:
            r = s.get(url)
            # The context managers release the archive and the member handle
            # even when parsing raises part-way through.
            with zipfile.ZipFile(io.BytesIO(r.content)) as z:
                csvname = next(
                    (n for n in z.namelist() if n.lower().endswith(".csv")),
                    None,
                )
                if not csvname:
                    LOG.warning("no CSV member in %s %s archive", survey, year)
                    continue
                with z.open(csvname) as f:
                    # newline="" lets the csv module do its own line handling,
                    # so newlines embedded in quoted fields are not translated.
                    text = io.TextIOWrapper(f, encoding="latin-1", newline="")
                    for row in csv.DictReader(text):
                        row["_survey"] = survey
                        row["_year"] = year
                        out.append(row)
        except Exception as e:
            # Deliberately broad: one bad download or corrupt archive must
            # not abort the other manifest entries.
            LOG.warning("failed %s %s: %s", survey, year, e)
    LOG.info("total rows: %d", len(out))
    return out
def write_rows(conn, raw):
    """Map parsed survey rows onto ``bhi_facilities`` columns and insert them.

    Args:
        conn: database connection, passed through unchanged to ``bulk_insert``.
        raw: iterable of row dicts as produced by ``fetch_rows``; each must
            carry the ``_survey`` and ``_year`` keys.

    Returns:
        int: number of rows built for insertion. When there are none, 0 is
        returned and ``bulk_insert`` is not called.
    """
    cols = [
        "ccn", "npi", "name", "address", "city", "state", "zip", "county_fips",
        "lat", "lon", "facility_type", "ownership", "bed_count", "psych_bed_count",
        "pediatric_psych_bed_count", "adolescent_unit", "young_adult_unit",
        "services_offered", "populations_served", "payment_accepted",
        "medicaid_accepted", "accreditation", "opened_date", "closed_date",
        "last_verified", "source", "source_raw_id",
    ]

    def is_yes(rec, field):
        """Return True when *field* (tried as given, upper, lower) is "1" or "yes"."""
        v = rec.get(field) or rec.get(field.upper()) or rec.get(field.lower())
        return v == "1" or str(v).lower() == "yes"

    # NOTE(review): the source column names used below (NAME/FACNAME, STREET1,
    # CITY, STATE, ZIP, YOUTH, ADOLESCENT, YAD, YOUNGADULT) are presumed to
    # match the SAMHSA CSV headers - confirm against the actual files.
    rows = []
    for r in raw:
        name = r.get("NAME") or r.get("name") or r.get("FACNAME") or ""
        rows.append((
            None, None, name,                     # ccn, npi, name
            r.get("STREET1") or r.get("street1"),  # address
            r.get("CITY") or r.get("city"),
            r.get("STATE") or r.get("state"),
            r.get("ZIP") or r.get("zip"),
            None, None, None,                     # county_fips, lat, lon
            # facility_type: only the exact label "N-SSATS" maps to "sud".
            "sud" if r["_survey"] == "N-SSATS" else "mh",
            None, None, None, None,               # ownership .. pediatric_psych_bed_count
            is_yes(r, "YOUTH") or is_yes(r, "ADOLESCENT"),   # adolescent_unit
            is_yes(r, "YAD") or is_yes(r, "YOUNGADULT"),     # young_adult_unit
            [], [], [],                           # services/populations/payment lists
            None, None, None, None, None,         # medicaid_accepted .. last_verified
            f"samhsa_{r['_survey'].lower()}_{r['_year']}",  # source
            None,                                 # source_raw_id
        ))
    if not rows:
        # Nothing was parsed (e.g. every download failed); skip the DB call.
        return 0
    bulk_insert(conn, "bhi_facilities", cols, rows)
    return len(rows)
def main():
    """Run the ingestion job: fetch all survey rows, insert them, log the count.

    The work happens inside the ``job_run("bhi_samhsa_surveys")`` context,
    which yields a pair whose first element is used as the DB connection.
    """
    with job_run("bhi_samhsa_surveys") as (conn, _):
        fetched = fetch_rows()
        inserted = write_rows(conn, fetched)
        LOG.info("inserted %d", inserted)
if __name__ == "__main__":
    # `script.py test` only probes the manifest URLs and exits 0/1;
    # any other invocation runs the full ingestion job.
    if sys.argv[1:2] == ["test"]:
        endpoints_ok = test_endpoint()
        sys.exit(0 if endpoints_ok else 1)
    main()