"""
Shared helpers for BHI ingestion jobs.

READY TO DEPLOY once the base Brain Postgres schema exists and
``schemas/bhi_tables.sql`` has been applied.

The base Brain is expected to expose:
  - env ``DATABASE_URL`` (``BRAIN_DATABASE_URL`` is also accepted) pointing at
    the ``brain`` Postgres database
  - a ``job_runs`` table (maintained by the base Brain)
  - optionally, Vault at http://localhost:8200 (override with ``VAULT_ADDR``)
    holding API keys

Every BHI job imports from this module to keep behavior consistent.
"""
from __future__ import annotations

import logging
import os
import time
from contextlib import contextmanager
from datetime import datetime, timezone
from typing import Any, Callable, Iterable, Iterator

import requests

try:
    import psycopg2
    import psycopg2.extras
except ImportError:
    psycopg2 = None  # type: ignore

LOG_FMT = "%(asctime)s %(levelname)s %(name)s | %(message)s"
# Configured at import time on purpose: every BHI job imports this module, so
# they all share one log format. Upper-casing lets BHI_LOG_LEVEL=debug work
# (logging only accepts upper-case level names).
logging.basicConfig(
    level=os.environ.get("BHI_LOG_LEVEL", "INFO").upper(), format=LOG_FMT
)

_log = logging.getLogger(__name__)


# --- HTTP session with retries + rate limiting ------------------------------

# HTTP statuses treated as transient: retried with exponential backoff.
RETRYABLE_STATUSES = frozenset({429, 500, 502, 503, 504})


class RateLimitedSession(requests.Session):
    """``requests.Session`` that spaces out requests and retries transient failures.

    Consecutive attempts made through one session are separated by at least
    ``min_interval`` seconds. Responses whose status is in
    ``RETRYABLE_STATUSES``, and connection-level ``requests`` errors, are
    retried with exponential backoff (1s, 2s, 4s, ...) for up to
    ``max_retries`` attempts in total. Any other HTTP error status (e.g. 401,
    404) fails immediately, because retrying it cannot help.
    """

    def __init__(self, min_interval: float = 0.2, max_retries: int = 5):
        super().__init__()
        self.headers.update({"User-Agent": "EconomicBrain-BHI/1.0 (+research)"})
        self.min_interval = min_interval
        self.max_retries = max_retries
        self._last = 0.0  # time.monotonic() of the most recent attempt

    def _throttle(self) -> None:
        """Sleep until ``min_interval`` has elapsed since the previous attempt."""
        elapsed = time.monotonic() - self._last
        if elapsed < self.min_interval:
            time.sleep(self.min_interval - elapsed)
        self._last = time.monotonic()

    def request(self, method, url, **kw):  # type: ignore[override]
        """Send a request, retrying transient failures.

        A 60 second ``timeout`` is applied unless the caller passes one.

        Returns:
            The successful ``requests.Response``.

        Raises:
            RuntimeError: on a non-retryable HTTP error status (chained from
                the ``requests.HTTPError``, which carries the response), or
                when every attempt failed transiently (chained from the last
                request exception, if the final failure was one).
        """
        kw.setdefault("timeout", 60)
        backoff = 1.0
        last_error: BaseException | None = None
        for attempt in range(1, self.max_retries + 1):
            self._throttle()
            try:
                resp = super().request(method, url, **kw)
            except requests.RequestException as e:
                last_error = e
                _log.warning("Request error: %s (attempt %d)", e, attempt)
            else:
                if resp.status_code not in RETRYABLE_STATUSES:
                    try:
                        resp.raise_for_status()
                    except requests.HTTPError as e:
                        # Permanent client/server errors: fail fast instead of
                        # burning the whole backoff budget. RuntimeError keeps
                        # the exception type callers already handle.
                        raise RuntimeError(
                            f"HTTP {resp.status_code} on {url}"
                        ) from e
                    return resp
                last_error = None
                _log.warning(
                    "HTTP %s on %s (attempt %d)", resp.status_code, url, attempt
                )
                # Release the pooled connection of the discarded response.
                resp.close()
            if attempt == self.max_retries:
                break  # no point sleeping before giving up
            _log.warning("Retrying %s in %.1fs", url, backoff)
            time.sleep(backoff)
            backoff *= 2
        raise RuntimeError(f"Exceeded retries for {url}") from last_error


# --- DB helpers -------------------------------------------------------------


def _require_psycopg2() -> None:
    """Raise a helpful RuntimeError when the optional psycopg2 import failed."""
    if psycopg2 is None:
        raise RuntimeError("psycopg2 not installed. pip install psycopg2-binary")


def get_conn():
    """Open a new psycopg2 connection to the Brain database.

    The DSN is read from ``DATABASE_URL``, falling back to
    ``BRAIN_DATABASE_URL``. The caller owns (and must close) the connection.

    Raises:
        RuntimeError: if psycopg2 is not installed or neither env var is set.
    """
    _require_psycopg2()
    dsn = os.environ.get("DATABASE_URL") or os.environ.get("BRAIN_DATABASE_URL")
    if not dsn:
        raise RuntimeError("DATABASE_URL env var not set")
    return psycopg2.connect(dsn)


def _utcnow() -> datetime:
    """Return the current UTC time as a naive datetime.

    Same value as the deprecated ``datetime.utcnow()`` the timestamps were
    originally produced with, so rows written to job_runs keep that form.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


def _mark_run_failed(conn, run_id, exc: BaseException) -> None:
    """Best-effort: set a job_runs row to 'error'. Never raises."""
    message = (str(exc) or type(exc).__name__)[:2000]
    try:
        # After a failed statement psycopg2 rejects every further command
        # until rollback ("current transaction is aborted"); rolling back also
        # keeps the failed job's uncommitted partial work out of the database.
        conn.rollback()
        with conn.cursor() as c:
            c.execute(
                "UPDATE job_runs SET status='error', finished_at=%s, error=%s WHERE id=%s",
                (_utcnow(), message, run_id),
            )
        conn.commit()
    except Exception:
        # Bookkeeping must not mask the job's own exception, but say so loudly.
        _log.exception("Could not mark job run %s as failed", run_id)


@contextmanager
def job_run(job_name: str) -> Iterator[tuple[Any, Any]]:
    """Record one execution of ``job_name`` in the base Brain's job_runs table.

    On entry a ``'running'`` row is inserted and committed, and
    ``(conn, run_id)`` is yielded. When the ``with`` body finishes, the row is
    set to ``'success'`` with ``finished_at``. If the body raises (including
    KeyboardInterrupt), uncommitted work on ``conn`` is rolled back, the row is
    set to ``'error'`` with the (truncated) exception message, and the
    exception is re-raised. The connection is closed in every case.
    """
    conn = get_conn()
    run_id = None
    try:
        with conn.cursor() as c:
            c.execute(
                """
                INSERT INTO job_runs (job_name, started_at, status)
                VALUES (%s, %s, 'running')
                RETURNING id
                """,
                (job_name, _utcnow()),
            )
            run_id = c.fetchone()[0]
        conn.commit()
        yield conn, run_id
        with conn.cursor() as c:
            c.execute(
                "UPDATE job_runs SET status='success', finished_at=%s WHERE id=%s",
                (_utcnow(), run_id),
            )
        conn.commit()
    except BaseException as e:
        # BaseException so Ctrl-C / interpreter shutdown do not leave the row
        # stuck at 'running'; the exception is always re-raised.
        if run_id is not None:
            _mark_run_failed(conn, run_id, e)
        raise
    finally:
        conn.close()


def bulk_insert(
    conn,
    table: str,
    columns: list[str],
    rows: Iterable[tuple],
    *,
    page_size: int = 500,
) -> None:
    """Insert ``rows`` into ``table`` and commit.

    Rows are sent as multi-row INSERT statements of up to ``page_size`` rows
    each via ``psycopg2.extras.execute_values``; each row supplies one value
    per entry of ``columns``. ``rows`` is consumed lazily, so a generator is
    fine. Values are passed as query parameters, but ``table`` and ``columns``
    are pasted into the SQL text unquoted: they must come from trusted code,
    never from external data.
    """
    _require_psycopg2()
    with conn.cursor() as c:
        psycopg2.extras.execute_values(
            c,
            f"INSERT INTO {table} ({', '.join(columns)}) VALUES %s",
            rows,
            page_size=page_size,
        )
    conn.commit()


# --- Vault (optional) -------------------------------------------------------


def vault_secret(path: str, key: str) -> str | None:
    """Fetch ``key`` from the Vault secret at ``path``, falling back to env.

    Without ``VAULT_TOKEN`` set, the environment variable ``key.upper()`` is
    returned directly. Otherwise ``$VAULT_ADDR/v1/<path>`` is read (default
    address http://localhost:8200) and ``key`` is looked up under the
    response's ``data.data`` object, the layout of Vault's KV version 2
    engine (so ``path`` presumably looks like ``secret/data/<name>``).
    If the request fails, the response is not of that shape, or the key is
    absent, a warning is logged where applicable and the ``key.upper()``
    environment variable is returned instead (None if unset).
    """
    env_name = key.upper()
    token = os.environ.get("VAULT_TOKEN")
    if not token:
        return os.environ.get(env_name)
    addr = os.environ.get("VAULT_ADDR", "http://localhost:8200").rstrip("/")
    value = None
    try:
        r = requests.get(
            f"{addr}/v1/{path.lstrip('/')}",
            headers={"X-Vault-Token": token},
            timeout=5,
        )
        # Surface 403/404 as such instead of as a confusing KeyError('data').
        r.raise_for_status()
        value = r.json()["data"]["data"].get(key)
    except Exception as e:
        # Deliberate best-effort boundary: Vault is optional.
        _log.warning("vault fetch failed: %s", e)
    if value is None:
        return os.environ.get(env_name)
    return value