#!/usr/bin/env python3
# READY TO DEPLOY — requires base Brain Postgres schema + run schemas/bhi_tables.sql
"""
CDC WONDER — Underlying Cause of Death by county, age bracket, ICD-10.
Posts XML request body to https://wonder.cdc.gov/controller/datarequest/D76
(Underlying Cause of Death 1999-2020) or D77 (2018+). The public non-restricted
datasets return XML tables; county-level cells with <10 deaths are suppressed.
We request two slices:
1. Suicide (X60-X84) for ages 13-17 and 18-25, by county
2. Drug poisoning (X40-X44, Y10-Y14) for 13-17 and 18-25, by county
"""
import logging
import sys
import xml.etree.ElementTree as ET
from _common import RateLimitedSession, bulk_insert, job_run
# Module-level logger, registered under the "bhi.cdc_wonder" name.
LOG = logging.getLogger("bhi.cdc_wonder")
# WONDER data-request URL for dataset D76, which the module docstring describes
# as Underlying Cause of Death 1999-2020.
ENDPOINT = "https://wonder.cdc.gov/controller/datarequest/D76"
def _build_xml(icd_codes: list[str], age_bracket: str) -> str:
"""Assemble WONDER POST XML. Structure is value-order dependent."""
# Age groups in WONDER: 15-19, 20-24, 25-29 etc. Adolescent and young-adult
# brackets don't align perfectly with 5-year WONDER bins — closest fit:
ages = {
"13-17": ["15-19"], # approximate
"18-25": ["20-24", "25-29"],
}[age_bracket]
icd_vals = "".join(f"{c}" for c in icd_codes)
age_vals = "".join(f"{a}" for a in ages)
return f"""
accept_datause_restrictionstrue
B_1D76.V2-level1
B_2D76.V51
F_D76.V1{age_vals}
F_D76.V2*All*
F_D76.V22{icd_vals}
O_ageD76.V51
O_locationD76.V9
VM_D76.M6_D76.V10
"""
def test_endpoint():
    """Smoke-test the WONDER endpoint with one suicide / 13-17 request.

    Returns:
        True when the POST returns HTTP 200 and the body contains a
        ``<data-table`` marker, otherwise False.
    """
    # NOTE(review): the end of this function and the start of fetch_rows() were
    # lost from the file (text between a "<" and a ">" was stripped, leaving a
    # syntax error). The response-marker check is a reconstruction — confirm.
    s = RateLimitedSession(min_interval=1.0)
    body = _build_xml(["X60-X84"], "13-17")
    r = s.post(ENDPOINT, data={"request_xml": body, "accept_datause_restrictions": "true"})
    ok = r.status_code == 200 and b"<data-table" in r.content
    LOG.info("WONDER test request: status=%s ok=%s", r.status_code, ok)
    return ok


def fetch_rows():
    """Request every (measure, age bracket) slice from WONDER and parse the rows.

    The slices follow the module docstring: suicide (X60-X84) and drug
    poisoning (X40-X44, Y10-Y14), each for the 13-17 and 18-25 brackets.

    Returns:
        A list of row dicts as produced by ``_parse_wonder_xml``. A slice whose
        request does not return HTTP 200 is logged and skipped.
    """
    # NOTE(review): only the trailing log call and ``return out`` of the
    # original survived; the measure keys below are placeholders — confirm
    # against what bhi_demand_indicators consumers expect.
    # NOTE(review): the CDC WONDER API documentation appears to restrict
    # vital-statistics API queries to national data — confirm that
    # county-level results are actually returned.
    slices = (
        ("suicide_rate", ["X60-X84"]),
        ("drug_poisoning_rate", ["X40-X44", "Y10-Y14"]),
    )
    s = RateLimitedSession(min_interval=1.0)
    out = []
    for measure, icd_codes in slices:
        for bracket in ("13-17", "18-25"):
            body = _build_xml(icd_codes, bracket)
            r = s.post(ENDPOINT, data={"request_xml": body, "accept_datause_restrictions": "true"})
            if r.status_code != 200:
                LOG.warning("WONDER %s %s -> HTTP %s", measure, bracket, r.status_code)
                continue
            # presumably a requests-style response exposing ``.text`` — verify
            rows = _parse_wonder_xml(r.text, measure, bracket)
            out.extend(rows)
            LOG.info("%s %s -> %d rows", measure, bracket, len(rows))
    return out
def _parse_wonder_xml(xml_text: str, measure: str, bracket: str):
out = []
try:
root = ET.fromstring(xml_text)
except ET.ParseError:
LOG.error("WONDER XML parse failed")
return out
# WONDER returns with rows containing
for r in root.iter("r"):
cells = [c.get("l") or c.text for c in r.findall("c")]
if len(cells) < 3:
continue
county = cells[0]
try:
rate = float(cells[-1])
except (TypeError, ValueError):
continue
out.append({
"geo_type": "county",
"geo_code": county,
"measure": measure,
"age_bracket": bracket,
"period": "2018-2022", # WONDER typical 5-year window
"value": rate,
"source": "cdc_wonder",
})
return out
def write_rows(conn, raw):
    """Bulk-insert parsed indicator dicts into ``bhi_demand_indicators``.

    Args:
        conn: Connection handed through to ``bulk_insert``.
        raw: Iterable of row dicts holding every column listed below.

    Returns:
        The number of rows passed to ``bulk_insert``.
    """
    columns = ["geo_type","geo_code","measure","age_bracket","period","value","source"]
    # Turn each dict into a tuple ordered exactly like ``columns``.
    records = [tuple(item[col] for col in columns) for item in raw]
    bulk_insert(conn, "bhi_demand_indicators", columns, records)
    return len(records)
def main():
    """Fetch all WONDER rows and insert them inside the ``bhi_cdc_wonder`` job run."""
    with job_run("bhi_cdc_wonder") as (conn, _unused):
        fetched = fetch_rows()
        inserted = write_rows(conn, fetched)
        LOG.info("inserted %d", inserted)
if __name__ == "__main__":
    # A first argument of "test" runs only the endpoint smoke test and exits
    # with status 0 on success, 1 on failure; otherwise run the full job.
    if sys.argv[1:2] == ["test"]:
        sys.exit(0 if test_endpoint() else 1)
    main()