103 lines
3.6 KiB
Python
103 lines
3.6 KiB
Python
#!/usr/bin/env python3
|
|
# READY TO DEPLOY — requires base Brain Postgres schema + run schemas/bhi_tables.sql
|
|
"""
SAMHSA N-SSATS + N-MHSS bulk downloads.

The SAMHSA Data Archive hosts annual CSV/SAS files. The landing pages do not
expose a machine-readable listing API, so we maintain a manifest of known
direct URLs and parse whichever of them can be downloaded. Update MANIFEST
when new survey years are released.
"""
|
|
import csv
|
|
import io
|
|
import logging
|
|
import sys
|
|
import zipfile
|
|
from _common import RateLimitedSession, bulk_insert, job_run
|
|
|
|
# Module-level logger; the name is a fixed string, not derived from __name__.
LOG = logging.getLogger("bhi.samhsa_surveys")

# Hand-maintained list of bulk download files, one (year, survey, url) tuple
# per file. Confirmed on samhsa.gov/data as of 2026; add entries as new
# survey years are published.
MANIFEST = [
    (
        "2022",
        "N-MHSS",
        "https://www.samhsa.gov/data/sites/default/files/reports/rpt42936/2022-nmhss-datafile-csv.zip",
    ),
    (
        "2022",
        "N-SSATS",
        "https://www.samhsa.gov/data/sites/default/files/reports/rpt42725/2022-nssats-datafile-csv.zip",
    ),
]
|
|
|
|
|
|
def test_endpoint():
    """Send a HEAD request to every MANIFEST URL and report the status codes.

    One line per file is printed in the form ``"<survey> <year>: <status>"``.
    Every URL is probed even after an earlier one has failed.

    Returns:
        True when every response had status 200 (also when MANIFEST is
        empty), otherwise False.
    """
    session = RateLimitedSession()
    outcomes = []
    for year, survey, url in MANIFEST:
        response = session.head(url, allow_redirects=True)
        print(f"{survey} {year}: {response.status_code}")
        outcomes.append(response.status_code == 200)
    # Collected first and reduced afterwards so no request is skipped.
    return all(outcomes)
|
|
|
|
|
|
def _read_survey_zip(payload, survey, year):
    """Parse the first CSV member of a zipped survey download.

    Args:
        payload: Raw bytes of the downloaded zip archive.
        survey: Survey label stored on each row under ``"_survey"``.
        year: Survey year stored on each row under ``"_year"``.

    Returns:
        A list of row dicts (CSV columns plus ``"_survey"`` and ``"_year"``),
        or None when the archive contains no ``.csv`` member.

    Raises:
        zipfile.BadZipFile: If *payload* is not a valid zip archive.
    """
    with zipfile.ZipFile(io.BytesIO(payload)) as archive:
        csv_name = next(
            (n for n in archive.namelist() if n.lower().endswith(".csv")),
            None,
        )
        if csv_name is None:
            return None
        with archive.open(csv_name) as member:
            # newline="" leaves line-ending handling to the csv module, so
            # quoted fields containing embedded newlines are parsed intact.
            text = io.TextIOWrapper(member, encoding="latin-1", newline="")
            rows = []
            for row in csv.DictReader(text):
                row["_survey"] = survey
                row["_year"] = year
                rows.append(row)
            return rows


def fetch_rows():
    """Download every MANIFEST file and return all parsed CSV rows.

    Each file is handled best-effort: any failure while downloading or
    parsing is logged as a warning and that file is skipped. A file
    contributes either all of its rows or none of them, so a download that
    breaks midway never leaves a partial set behind.

    Returns:
        A list of dicts, one per CSV row, each carrying the CSV columns plus
        ``"_survey"`` and ``"_year"`` taken from the manifest entry.
    """
    s = RateLimitedSession(min_interval=0.5)
    out = []
    for year, survey, url in MANIFEST:
        LOG.info("fetching %s %s", survey, year)
        try:
            r = s.get(url)
            rows = _read_survey_zip(r.content, survey, year)
        except Exception as e:
            # Deliberately broad: one bad file must not abort the others.
            LOG.warning("failed %s %s: %s", survey, year, e)
            continue
        if rows is None:
            LOG.warning("no CSV member in archive for %s %s", survey, year)
            continue
        out.extend(rows)
    LOG.info("total rows: %d", len(out))
    return out
|
|
|
|
|
|
def _first_value(record, *keys):
    """Return the first truthy ``record.get(key)``; else the last value looked up."""
    value = None
    for key in keys:
        value = record.get(key)
        if value:
            break
    return value


def _is_yes(record, field):
    """Return True when *field* holds ``"1"`` or a case-insensitive ``"yes"``.

    The field is looked up as given, then upper-cased, then lower-cased.
    """
    value = (
        record.get(field)
        or record.get(field.upper())
        or record.get(field.lower())
    )
    if value == "1":
        return True
    return str(value).lower() == "yes"


def _facility_tuple(record):
    """Build one value tuple for *record*, ordered like the column list in write_rows."""
    facility_name = _first_value(record, "NAME", "name", "FACNAME") or ""
    street = _first_value(record, "STREET1", "street1")
    city = _first_value(record, "CITY", "city")
    state = _first_value(record, "STATE", "state")
    postal_code = _first_value(record, "ZIP", "zip")
    facility_type = "sud" if record["_survey"] == "N-SSATS" else "mh"
    adolescent_unit = _is_yes(record, "YOUTH") or _is_yes(record, "ADOLESCENT")
    young_adult_unit = _is_yes(record, "YAD") or _is_yes(record, "YOUNGADULT")
    source = f"samhsa_{record['_survey'].lower()}_{record['_year']}"
    return (
        None,  # ccn
        None,  # npi
        facility_name,
        street,
        city,
        state,
        postal_code,
        None,  # county_fips
        None,  # lat
        None,  # lon
        facility_type,
        None,  # ownership
        None,  # bed_count
        None,  # psych_bed_count
        None,  # pediatric_psych_bed_count
        adolescent_unit,
        young_adult_unit,
        [],  # services_offered
        [],  # populations_served
        [],  # payment_accepted
        None,  # medicaid_accepted
        None,  # accreditation
        None,  # opened_date
        None,  # closed_date
        None,  # last_verified
        source,
        None,  # source_raw_id
    )


def write_rows(conn, raw):
    """Map raw survey rows onto the bhi_facilities columns and bulk-insert them.

    Args:
        conn: Connection object passed straight through to ``bulk_insert``.
        raw: Iterable of row dicts that each carry ``"_survey"`` and
            ``"_year"`` in addition to the survey's own CSV columns.

    Returns:
        The number of row tuples handed to ``bulk_insert``.
    """
    columns = [
        "ccn",
        "npi",
        "name",
        "address",
        "city",
        "state",
        "zip",
        "county_fips",
        "lat",
        "lon",
        "facility_type",
        "ownership",
        "bed_count",
        "psych_bed_count",
        "pediatric_psych_bed_count",
        "adolescent_unit",
        "young_adult_unit",
        "services_offered",
        "populations_served",
        "payment_accepted",
        "medicaid_accepted",
        "accreditation",
        "opened_date",
        "closed_date",
        "last_verified",
        "source",
        "source_raw_id",
    ]
    facility_rows = [_facility_tuple(record) for record in raw]
    bulk_insert(conn, "bhi_facilities", columns, facility_rows)
    return len(facility_rows)
|
|
|
|
|
|
def main():
    """Fetch all manifest rows and insert them inside a ``job_run`` context.

    The number of inserted rows is logged at INFO level.
    """
    with job_run("bhi_samhsa_surveys") as (conn, _unused):
        survey_rows = fetch_rows()
        inserted = write_rows(conn, survey_rows)
        LOG.info("inserted %d", inserted)
|
|
|
|
|
|
if __name__ == "__main__":
    # "test" as the first CLI argument only probes the manifest URLs and
    # exits 0 when all are reachable, 1 otherwise; anything else runs the job.
    if sys.argv[1:2] == ["test"]:
        reachable = test_endpoint()
        sys.exit(0 if reachable else 1)
    main()
|