#!/usr/bin/env python3
# READY TO DEPLOY — requires base Brain Postgres schema + run schemas/bhi_tables.sql
"""
BLS OES (Occupational Employment and Wage Statistics) — behavioral health workforce by MSA.

Primary approach: annual bulk download (no auth, simplest):
    https://www.bls.gov/oes/special-requests/oesmYYma.zip
Fallback / enrichment: BLS public API (optional free key via vault).

Usage:
    <script>        download, filter and insert rows into ``bhi_workforce``
    <script> test   HEAD the bulk URL; exit 0 on HTTP 200, 1 otherwise
"""
import csv
import io
import logging
import math
import sys
import zipfile

from _common import RateLimitedSession, bulk_insert, job_run, vault_secret

LOG = logging.getLogger("bhi.bls_oes")

# Single source of truth for the survey year: both the download URL and the
# ``period`` value written to the database are derived from it, so the annual
# bump is a one-line change.
OES_YEAR = 2023  # update year annually
BULK_URL = f"https://www.bls.gov/oes/special-requests/oesm{OES_YEAR % 100:02d}ma.zip"
PERIOD = f"May{OES_YEAR}"

# SOC occupation codes kept from the bulk file, mapped to the titles stored
# in the ``occupation_title`` column.
BH_SOC_CODES = {
    "29-1223": "Psychiatrists",
    "29-1229": "Physicians, All Other",
    "21-1014": "Mental Health Counselors",
    "21-1015": "Rehabilitation Counselors",
    "21-1018": "SUD / Behavioral Disorder Counselors",
    "21-1023": "Mental Health & Substance Abuse Social Workers",
    "19-3033": "Clinical & Counseling Psychologists",
}

# Cell values treated as "no number" without attempting a float conversion.
_NON_NUMERIC = (None, "", "*", "#")


def _field(row, name):
    """Return ``row[name]``, falling back to the lower-case column spelling.

    Falsy values (missing key, ``None``, empty string) under the upper-case
    name fall through to the lower-case name.
    """
    return row.get(name) or row.get(name.lower())


def test_endpoint():
    """HEAD the bulk URL, print the status, and return True on HTTP 200."""
    session = RateLimitedSession()
    response = session.head(BULK_URL, allow_redirects=True)
    print(f"OK: status={response.status_code}, content-length={response.headers.get('content-length')}")
    return response.status_code == 200


def fetch_rows():
    """Download the bulk zip and return the CSV rows for behavioral health occupations.

    Returns:
        A list of ``csv.DictReader`` row dicts whose occupation code is a key
        of ``BH_SOC_CODES``. Returns an empty list (after logging an error)
        when the archive contains no ``.csv`` member.
    """
    session = RateLimitedSession(min_interval=1.0)
    response = session.get(BULK_URL)
    # NOTE(review): the HTTP status is not checked here; presumably a failed
    # download surfaces as a bad-zip error below — confirm against
    # RateLimitedSession's behavior.
    with zipfile.ZipFile(io.BytesIO(response.content)) as archive:
        # Bulk zip contains one CSV/XLSX with MSA rows
        csv_name = next(
            (name for name in archive.namelist() if name.lower().endswith(".csv")),
            None,
        )
        if not csv_name:
            LOG.error("no CSV in BLS zip")
            return []
        with archive.open(csv_name) as member:
            # newline="" lets the csv module handle line endings itself, so
            # quoted fields containing newlines are parsed correctly.
            text = io.TextIOWrapper(member, encoding="latin-1", newline="")
            rows = [
                row
                for row in csv.DictReader(text)
                if _field(row, "OCC_CODE") in BH_SOC_CODES
            ]
    LOG.info("BLS OES BH rows: %d", len(rows))
    return rows


def _num(v):
    """Parse a cell into a float, or return None when it holds no usable number.

    Thousands separators (",") are removed before conversion. ``None``, the
    empty string, the "*" and "#" markers, anything ``float()`` rejects, and
    non-finite results (NaN / infinity) all yield ``None``.
    """
    if v in _NON_NUMERIC:
        return None
    try:
        value = float(str(v).replace(",", ""))
    except (TypeError, ValueError):
        return None
    # A NaN/inf here would make int() in write_rows raise and abort the load.
    return value if math.isfinite(value) else None


def write_rows(conn, raw):
    """Map raw CSV rows onto ``bhi_workforce`` columns and bulk-insert them.

    Args:
        conn: database connection handed to ``bulk_insert``.
        raw: iterable of row dicts keyed by the CSV column names, in either
            upper or lower case.

    Returns:
        The number of rows passed to ``bulk_insert``.
    """
    cols = [
        "msa_code",
        "msa_name",
        "occupation_code",
        "occupation_title",
        "employment",
        "annual_wage_median",
        "annual_wage_mean",
        "period",
        "source",
    ]
    rows = []
    for record in raw:
        code = _field(record, "OCC_CODE")
        rows.append((
            _field(record, "AREA"),
            _field(record, "AREA_TITLE"),
            code,
            # Prefer the curated title; fall back to the title in the file.
            BH_SOC_CODES.get(code, _field(record, "OCC_TITLE")),
            # Missing or zero employment is stored as NULL rather than 0.
            int(_num(_field(record, "TOT_EMP")) or 0) or None,
            _num(_field(record, "A_MEDIAN")),
            _num(_field(record, "A_MEAN")),
            PERIOD,
            "bls_oes",
        ))
    bulk_insert(conn, "bhi_workforce", cols, rows)
    return len(rows)


def main():
    """Run the full job: fetch rows, insert them, and log the inserted count."""
    with job_run("bhi_bls_oes") as (conn, _):
        n = write_rows(conn, fetch_rows())
        LOG.info("inserted %d", n)


if __name__ == "__main__":
    if len(sys.argv) > 1 and sys.argv[1] == "test":
        sys.exit(0 if test_endpoint() else 1)
    main()