Files
2026-04-05 20:15:36 +00:00

94 lines
3.0 KiB
Python

#!/usr/bin/env python3
# READY TO DEPLOY — requires base Brain Postgres schema + run schemas/bhi_tables.sql
"""
BLS OES (Occupational Employment and Wage Statistics) — behavioral health
workforce by MSA.
Primary approach: annual bulk download (no auth, simplest):
https://www.bls.gov/oes/special-requests/oesmYYma.zip
Fallback / enrichment: BLS public API (optional free key via vault).
"""
import csv
import io
import logging
import math
import sys
import zipfile

from _common import RateLimitedSession, bulk_insert, job_run, vault_secret
# Module-level logger for this loader.
LOG = logging.getLogger("bhi.bls_oes")

# Annual OES bulk archive. The two-digit year in the file name ("23") has to
# be bumped for each new release, together with the period label that
# write_rows() stores.
BULK_URL = "https://www.bls.gov/oes/special-requests/oesm23ma.zip"  # update year annually

# Occupation codes that fetch_rows() keeps, mapped to the title that
# write_rows() stores in the occupation_title column.
BH_SOC_CODES = {
    "29-1223": "Psychiatrists",
    "29-1229": "Physicians, All Other",
    "21-1014": "Mental Health Counselors",
    "21-1015": "Rehabilitation Counselors",
    "21-1018": "SUD / Behavioral Disorder Counselors",
    "21-1023": "Mental Health & Substance Abuse Social Workers",
    "19-3033": "Clinical & Counseling Psychologists",
}
def test_endpoint():
    """Probe the bulk-download URL with a HEAD request.

    Prints a one-line summary of the response (status code and the
    ``content-length`` header, if any) and reports whether the URL is
    reachable.

    Returns:
        ``True`` only when the final status code, after following
        redirects, is 200.
    """
    s = RateLimitedSession()
    r = s.head(BULK_URL, allow_redirects=True)
    ok = r.status_code == 200
    # Label the line by outcome: it previously read "OK" even for 403/404
    # responses, contradicting the False return value / exit code 1.
    label = "OK" if ok else "FAIL"
    print(f"{label}: status={r.status_code}, content-length={r.headers.get('content-length')}")
    return ok
def fetch_rows():
    """Download the OES bulk archive and return its behavioral-health rows.

    Returns:
        A list of ``csv.DictReader`` row dicts whose ``OCC_CODE`` (or
        ``occ_code``) value is a key of ``BH_SOC_CODES``. The list is
        empty when the archive contains no ``.csv`` member.

    Raises:
        RuntimeError: if the download does not come back with HTTP 200.
        zipfile.BadZipFile: if the response body is not a zip archive.
    """
    s = RateLimitedSession(min_interval=1.0)
    r = s.get(BULK_URL)
    if r.status_code != 200:
        # Report the real cause instead of the BadZipFile that parsing an
        # HTML error page would otherwise produce.
        raise RuntimeError(
            f"BLS OES download failed: HTTP {r.status_code} for {BULK_URL}"
        )

    # Context managers close both the archive and the member stream.
    with zipfile.ZipFile(io.BytesIO(r.content)) as z:
        members = z.namelist()
        # Only a CSV member is parsed here; an archive that ships nothing but
        # XLSX yields no rows, so list the members to make that diagnosable.
        csv_name = next((n for n in members if n.lower().endswith(".csv")), None)
        if not csv_name:
            LOG.error("no CSV in BLS zip (members: %s)", members)
            return []
        with z.open(csv_name) as f:
            # newline="" leaves line-ending handling to the csv module, as its
            # documentation requires for correct parsing of quoted fields.
            text = io.TextIOWrapper(f, encoding="latin-1", newline="")
            rows = [
                row
                for row in csv.DictReader(text)
                if (row.get("OCC_CODE") or row.get("occ_code")) in BH_SOC_CODES
            ]
    LOG.info("BLS OES BH rows: %d", len(rows))
    return rows
def _num(v):
try:
return float(str(v).replace(",", "")) if v not in (None, "", "*", "#") else None
except (TypeError, ValueError):
return None
def _field(row, name):
    """Return ``row[name]``, falling back to the lower-cased header name."""
    return row.get(name) or row.get(name.lower())


def write_rows(conn, raw, period="May2023"):
    """Map raw OES rows onto ``bhi_workforce`` columns and insert them.

    Args:
        conn: database connection handed through to ``bulk_insert``.
        raw: iterable of row dicts keyed by the OES column headers, in
            either upper or lower case.
        period: label stored in the ``period`` column. The default matches
            the 2023 archive named in ``BULK_URL``; pass a new label when
            that URL is moved to another year.

    Returns:
        The number of rows handed to ``bulk_insert`` (0 when ``raw`` is
        empty, in which case no insert is attempted).
    """
    cols = ["msa_code", "msa_name", "occupation_code", "occupation_title",
            "employment", "annual_wage_median", "annual_wage_mean", "period", "source"]
    rows = []
    for r in raw:
        code = _field(r, "OCC_CODE")
        rows.append((
            _field(r, "AREA"),
            _field(r, "AREA_TITLE"),
            code,
            # Prefer the curated title; fall back to the title in the file.
            BH_SOC_CODES.get(code, _field(r, "OCC_TITLE")),
            # Missing or zero employment is stored as NULL rather than 0.
            int(_num(_field(r, "TOT_EMP")) or 0) or None,
            _num(_field(r, "A_MEDIAN")),
            _num(_field(r, "A_MEAN")),
            period,
            "bls_oes",
        ))
    if not rows:
        # Nothing to write (e.g. the archive had no CSV member): skip the
        # insert instead of issuing it with an empty batch.
        return 0
    bulk_insert(conn, "bhi_workforce", cols, rows)
    return len(rows)
def main():
    """Run the BLS OES load as the ``bhi_bls_oes`` job.

    Fetches the filtered rows, writes them through the connection yielded
    by ``job_run``, and logs the count returned by ``write_rows``.
    """
    with job_run("bhi_bls_oes") as (conn, _):
        fetched = fetch_rows()
        inserted = write_rows(conn, fetched)
        LOG.info("inserted %d", inserted)
if __name__ == "__main__":
    # With "test" as the first argument, only probe the endpoint and exit
    # 0 (reachable) or 1; otherwise run the full load.
    if sys.argv[1:2] == ["test"]:
        reachable = test_endpoint()
        sys.exit(0 if reachable else 1)
    main()