#!/usr/bin/env python3 # READY TO DEPLOY — requires base Brain Postgres schema + run schemas/bhi_tables.sql """ SAMHSA N-SSATS + N-MHSS bulk downloads. SAMHSA Data Archive hosts annual CSV/SAS files. The landing pages do not expose a machine-listing API, so we maintain a manifest of known direct URLs and parse whichever are present. Update the MANIFEST when new years drop. """ import csv import io import logging import sys import zipfile from _common import RateLimitedSession, bulk_insert, job_run LOG = logging.getLogger("bhi.samhsa_surveys") # Known bulk files. Confirmed on samhsa.gov/data as of 2026. Update as needed. MANIFEST = [ # (year, survey, url) ("2022", "N-MHSS", "https://www.samhsa.gov/data/sites/default/files/reports/rpt42936/2022-nmhss-datafile-csv.zip"), ("2022", "N-SSATS", "https://www.samhsa.gov/data/sites/default/files/reports/rpt42725/2022-nssats-datafile-csv.zip"), ] def test_endpoint(): s = RateLimitedSession() ok = True for year, survey, url in MANIFEST: r = s.head(url, allow_redirects=True) print(f"{survey} {year}: {r.status_code}") ok = ok and r.status_code == 200 return ok def fetch_rows(): s = RateLimitedSession(min_interval=0.5) out = [] for year, survey, url in MANIFEST: LOG.info("fetching %s %s", survey, year) try: r = s.get(url) z = zipfile.ZipFile(io.BytesIO(r.content)) csvname = next((n for n in z.namelist() if n.lower().endswith(".csv")), None) if not csvname: continue with z.open(csvname) as f: reader = csv.DictReader(io.TextIOWrapper(f, encoding="latin-1")) for row in reader: row["_survey"] = survey row["_year"] = year out.append(row) except Exception as e: LOG.warning("failed %s %s: %s", survey, year, e) LOG.info("total rows: %d", len(out)) return out def write_rows(conn, raw): cols = [ "ccn","npi","name","address","city","state","zip","county_fips", "lat","lon","facility_type","ownership","bed_count","psych_bed_count", "pediatric_psych_bed_count","adolescent_unit","young_adult_unit", "services_offered","populations_served","payment_accepted", "medicaid_accepted","accreditation","opened_date","closed_date", "last_verified","source","source_raw_id", ] rows = [] for r in raw: def y(field): v = r.get(field) or r.get(field.upper()) or r.get(field.lower()) return v == "1" or str(v).lower() == "yes" name = r.get("NAME") or r.get("name") or r.get("FACNAME") or "" rows.append(( None, None, name, r.get("STREET1") or r.get("street1"), r.get("CITY") or r.get("city"), r.get("STATE") or r.get("state"), r.get("ZIP") or r.get("zip"), None, None, None, "sud" if r["_survey"] == "N-SSATS" else "mh", None, None, None, None, y("YOUTH") or y("ADOLESCENT"), y("YAD") or y("YOUNGADULT"), [], [], [], None, None, None, None, None, f"samhsa_{r['_survey'].lower()}_{r['_year']}", None, )) bulk_insert(conn, "bhi_facilities", cols, rows) return len(rows) def main(): with job_run("bhi_samhsa_surveys") as (conn, _): n = write_rows(conn, fetch_rows()) LOG.info("inserted %d", n) if __name__ == "__main__": if len(sys.argv) > 1 and sys.argv[1] == "test": sys.exit(0 if test_endpoint() else 1) main()