#!/usr/bin/env python3
# READY TO DEPLOY — requires base Brain Postgres schema + run schemas/bhi_tables.sql
"""
SAMHSA findtreatment.gov behavioral health facility locator.

Source: https://findtreatment.gov/locator/exportsAsJson/v2
Confirmed: 96,009 facilities across 3,201 pages (sType=BH).

Usage:
    script.py         fetch every page and insert the rows into ``bhi_facilities``
    script.py test    fetch page 1 only and print a summary (exit 0 on success)
"""
import logging
import sys

from _common import RateLimitedSession, bulk_insert, job_run

LOG = logging.getLogger("bhi.samhsa_locator")

BASE = "https://findtreatment.gov/locator/exportsAsJson/v2"
ZIP_SEED = "10001"  # any valid zip works; results are national in the 'BH' sType
PAGE_SIZE = 30  # server default; respected

# Column order handed to bulk_insert; every row tuple is built in this order.
_COLUMNS = [
    "ccn", "npi", "name", "address", "city", "state", "zip", "county_fips",
    "lat", "lon", "facility_type", "ownership", "bed_count", "psych_bed_count",
    "pediatric_psych_bed_count", "adolescent_unit", "young_adult_unit",
    "services_offered", "populations_served", "payment_accepted",
    "medicaid_accepted", "accreditation", "opened_date", "closed_date",
    "last_verified", "source", "source_raw_id",
]

# Substrings (lower-case) that mark a service entry as adolescent / young-adult.
_ADOLESCENT_MARKERS = ("adolescent", "youth", "teen")
_YOUNG_ADULT_MARKERS = ("young adult", "transitional age")


def test_endpoint() -> bool:
    """Smoke-test the locator endpoint by requesting page 1.

    Prints the ``recordCount`` / ``totalPages`` reported by the server and,
    when at least one row came back, the first row's name and state.

    Returns:
        True when the first page contained at least one row, False otherwise.
    """
    s = RateLimitedSession()
    r = s.get(BASE, params={"sType": "BH", "sAddr": ZIP_SEED, "page": 1}).json()
    print(f"OK: recordCount={r.get('recordCount')}, totalPages={r.get('totalPages')}")
    rows = r.get("rows", [])
    if rows:
        print("sample:", rows[0].get("name1"), rows[0].get("state"))
    return bool(rows)


def fetch_rows(max_pages: int | None = None) -> list:
    """Download facility rows page by page and return them as one list.

    Args:
        max_pages: Stop after this many pages. ``None`` (or any falsy value,
            including 0) means "fetch every page the server reports".

    Returns:
        The concatenated ``rows`` entries of every fetched page.
    """
    # NOTE(review): min_interval presumably spaces requests 0.3 s apart —
    # confirm against _common.RateLimitedSession.
    s = RateLimitedSession(min_interval=0.3)
    out = []
    page = 1
    total = None
    while True:
        r = s.get(
            BASE,
            params={"sType": "BH", "sAddr": ZIP_SEED, "pageSize": PAGE_SIZE, "page": page},
        ).json()
        if not total:
            # A missing, null or zero page count falls back to a single page;
            # int() also accepts a numeric string, so the `page >= total`
            # comparison below can never hit None or str.
            total = int(r.get("totalPages") or 1)
        # `or []` guards against an explicit JSON null for "rows".
        out.extend(r.get("rows") or [])
        if page % 50 == 0:
            LOG.info("page %d/%d (total rows %d)", page, total, len(out))
        if page >= total or (max_pages and page >= max_pages):
            break
        page += 1
    LOG.info("fetched %d facilities", len(out))
    return out


def _parse_float(v) -> float | None:
    """Return ``float(v)``, or None when ``v`` is None, empty, or not numeric."""
    if v is None or v == "":
        return None
    try:
        return float(v)
    except (TypeError, ValueError):
        return None


def _service_labels(raw) -> list:
    """Flatten a row's ``services`` value into a list of non-empty strings.

    A string is split on commas (the format the original code assumed).
    A list/tuple is walked item by item: string items are kept, dict items
    contribute their string values, anything else is stringified.

    NOTE(review): the exact shape of ``services`` in the live payload should be
    confirmed; the previous implementation called ``.split(",")`` directly and
    raised AttributeError (aborting the whole load) on any non-string value.
    """
    if not raw:
        return []
    if isinstance(raw, str):
        return [part.strip() for part in raw.split(",") if part.strip()]

    items = raw if isinstance(raw, (list, tuple)) else [raw]
    labels = []
    for item in items:
        if isinstance(item, dict):
            candidates = [v for v in item.values() if isinstance(v, str)]
        elif item is None:
            candidates = []
        else:
            candidates = [str(item)]
        labels.extend(c.strip() for c in candidates if c.strip())
    return labels


def _mentions(labels_lc: list, markers: tuple) -> bool | None:
    """Return True if any label contains any marker, else None (unknown)."""
    # None rather than False: absence of a keyword does not prove the facility
    # lacks the unit, so the column is left NULL.
    return any(m in label for label in labels_lc for m in markers) or None


def _facility_row(r: dict) -> tuple:
    """Map one raw locator row onto a tuple ordered like ``_COLUMNS``."""
    name_parts = [(r.get(key) or "").strip() for key in ("name1", "name2")]
    services = _service_labels(r.get("services"))
    # SAMHSA flags adolescent/young-adult services in the services string
    services_lc = [s.lower() for s in services]

    # Start with every column NULL, then fill in what this source provides
    # (ccn/npi and the remaining columns are unknown from this source).
    record = dict.fromkeys(_COLUMNS)
    record.update(
        name=" ".join(p for p in name_parts if p),
        address=r.get("street1"),
        city=r.get("city"),
        state=r.get("state"),
        zip=r.get("zip"),
        lat=_parse_float(r.get("latitude")),
        lon=_parse_float(r.get("longitude")),
        facility_type=r.get("typeFacility") or "bh_facility",
        adolescent_unit=_mentions(services_lc, _ADOLESCENT_MARKERS),
        young_adult_unit=_mentions(services_lc, _YOUNG_ADULT_MARKERS),
        services_offered=services,
        populations_served=[],
        payment_accepted=[],
        source="samhsa_locator",
    )
    return tuple(record[col] for col in _COLUMNS)


def write_rows(conn, raw) -> int:
    """Convert raw locator rows and insert them into ``bhi_facilities``.

    Args:
        conn: Database connection, passed through unchanged to ``bulk_insert``.
        raw: Iterable of row dicts as returned by ``fetch_rows``.

    Returns:
        The number of rows handed to ``bulk_insert``.
    """
    cols = list(_COLUMNS)
    rows = [_facility_row(r) for r in raw]
    bulk_insert(conn, "bhi_facilities", cols, rows)
    return len(rows)


def main() -> None:
    """Fetch every facility and insert it inside a ``bhi_samhsa_locator`` job run."""
    with job_run("bhi_samhsa_locator") as (conn, _):
        n = write_rows(conn, fetch_rows())
        LOG.info("inserted %d", n)


if __name__ == "__main__":
    # `test` runs the endpoint smoke test only and exits without touching the DB.
    if len(sys.argv) > 1 and sys.argv[1] == "test":
        sys.exit(0 if test_endpoint() else 1)
    main()