Files
economic-brain-bhi/jobs/ingestion/samhsa_locator.py
2026-04-05 20:15:36 +00:00

96 lines
3.5 KiB
Python

#!/usr/bin/env python3
# READY TO DEPLOY — requires base Brain Postgres schema + run schemas/bhi_tables.sql
"""
SAMHSA findtreatment.gov behavioral health facility locator.
Source: https://findtreatment.gov/locator/exportsAsJson/v2
Confirmed: 96,009 facilities across 3,201 pages (sType=BH).
"""
import logging
import sys
from _common import RateLimitedSession, bulk_insert, job_run
# Module-level logger used for progress and summary messages in this job.
LOG = logging.getLogger("bhi.samhsa_locator")
# JSON export endpoint of the findtreatment.gov locator; queried with GET parameters.
BASE = "https://findtreatment.gov/locator/exportsAsJson/v2"
# Sent as the `sAddr` query parameter on every request.
ZIP_SEED = "10001" # any valid zip works; results are national in the 'BH' sType
# Sent as the `pageSize` query parameter by fetch_rows().
PAGE_SIZE = 30 # server default; respected
def test_endpoint():
    """Smoke-test the locator endpoint by requesting page 1 and printing a summary.

    Prints the reported record and page counts, plus the name and state of the
    first row when one is present.

    Returns:
        True if the response contained at least one row, otherwise False.
    """
    session = RateLimitedSession()
    query = {"sType": "BH", "sAddr": ZIP_SEED, "page": 1}
    payload = session.get(BASE, params=query).json()

    record_count = payload.get("recordCount")
    total_pages = payload.get("totalPages")
    print(f"OK: recordCount={record_count}, totalPages={total_pages}")

    rows = payload.get("rows", [])
    if not rows:
        return False

    first = rows[0]
    print("sample:", first.get("name1"), first.get("state"))
    return True
def fetch_rows(max_pages: int | None = None):
    """Download behavioral-health facility rows from the locator, page by page.

    Requests pages starting at 1 with ``sType=BH`` and the module's seed zip,
    appending every entry of each response's ``rows`` list. Paging stops once
    the page number reaches the ``totalPages`` value taken from the first
    response, or reaches ``max_pages`` when that is set.

    Args:
        max_pages: Optional cap on the number of pages requested. ``None``
            (or any other falsy value, e.g. 0) means no cap.

    Returns:
        A list of the raw row objects in the order they were received.
    """
    s = RateLimitedSession(min_interval=0.3)
    out = []
    page = 1
    total = None
    while True:
        r = s.get(
            BASE,
            params={"sType": "BH", "sAddr": ZIP_SEED, "pageSize": PAGE_SIZE, "page": page},
        ).json()
        if total is None:
            # `.get(key, 1)` only defaults when the key is absent; a JSON null
            # would leave None here and make `page >= total` raise TypeError.
            total = r.get("totalPages") or 1
        # `rows` may likewise be null; treat that as an empty page.
        out.extend(r.get("rows") or [])
        if page % 50 == 0:
            LOG.info("page %d/%d (total rows %d)", page, total, len(out))
        if page >= total or (max_pages and page >= max_pages):
            break
        page += 1
    LOG.info("fetched %d facilities", len(out))
    return out
def _parse_float(v):
    """Convert ``v`` to a float, returning None when it is missing or unparseable.

    ``None`` and the empty string map to None, as does any value for which
    ``float()`` raises ``TypeError`` or ``ValueError``.
    """
    try:
        if v in (None, ""):
            return None
        return float(v)
    except (TypeError, ValueError):
        return None
def write_rows(conn, raw):
    """Map raw locator rows onto ``bhi_facilities`` columns and bulk-insert them.

    Args:
        conn: Connection handle, forwarded unchanged to ``bulk_insert``.
        raw: Iterable of dict-like locator rows (each is read via ``.get``).

    Returns:
        The number of rows handed to ``bulk_insert``.
    """
    cols = [
        "ccn", "npi", "name", "address", "city", "state", "zip", "county_fips",
        "lat", "lon", "facility_type", "ownership", "bed_count", "psych_bed_count",
        "pediatric_psych_bed_count", "adolescent_unit", "young_adult_unit",
        "services_offered", "populations_served", "payment_accepted",
        "medicaid_accepted", "accreditation", "opened_date", "closed_date",
        "last_verified", "source", "source_raw_id",
    ]
    # Substrings looked for in the lower-cased services entries.
    adolescent_terms = ("adolescent", "youth", "teen")
    young_adult_terms = ("young adult", "transitional age")

    rows = []
    for item in raw:
        primary = item.get("name1")
        secondary = (item.get("name2") or "").strip()
        name = " ".join(part for part in (primary, secondary) if part).strip()

        services_field = item.get("services")
        services = services_field.split(",") if services_field else []
        lowered = [entry.lower() for entry in services]

        # A match yields True; no match is recorded as None rather than False.
        adolescent = (
            True
            if any(term in entry for entry in lowered for term in adolescent_terms)
            else None
        )
        young_adult = (
            True
            if any(term in entry for entry in lowered for term in young_adult_terms)
            else None
        )

        # Start with every column as None (ccn/npi and the other fields are not
        # filled from this source), then set the ones that are.
        record = dict.fromkeys(cols)
        record.update({
            "name": name,
            "address": item.get("street1"),
            "city": item.get("city"),
            "state": item.get("state"),
            "zip": item.get("zip"),
            "lat": _parse_float(item.get("latitude")),
            "lon": _parse_float(item.get("longitude")),
            "facility_type": item.get("typeFacility") or "bh_facility",
            "adolescent_unit": adolescent,
            "young_adult_unit": young_adult,
            "services_offered": services,
            "populations_served": [],
            "payment_accepted": [],
            "source": "samhsa_locator",
        })
        rows.append(tuple(record[col] for col in cols))

    bulk_insert(conn, "bhi_facilities", cols, rows)
    return len(rows)
def main():
    """Run the ingestion job: fetch every facility row and insert them.

    All work happens inside the ``job_run("bhi_samhsa_locator")`` context; the
    first element it yields is passed to ``write_rows`` as the connection.
    """
    with job_run("bhi_samhsa_locator") as (conn, _unused):
        facilities = fetch_rows()
        inserted = write_rows(conn, facilities)
        LOG.info("inserted %d", inserted)
if __name__ == "__main__":
    # `test` as the first CLI argument runs only the endpoint smoke test and
    # maps its boolean result to the exit status (0 when rows were returned).
    cli_args = sys.argv[1:]
    if cli_args and cli_args[0] == "test":
        endpoint_ok = test_endpoint()
        sys.exit(0 if endpoint_ok else 1)
    main()