147 lines
4.8 KiB
Python
147 lines
4.8 KiB
Python
"""
|
|
Shared helpers for BHI ingestion jobs.
|
|
|
|
READY TO DEPLOY — requires base Brain Postgres schema + run schemas/bhi_tables.sql
|
|
|
|
Base Brain is expected to expose:
|
|
- env DATABASE_URL pointing at the `brain` Postgres
|
|
- a `job_runs` table (the base Brain maintains this)
|
|
- optional Vault at http://localhost:8200 for API keys
|
|
|
|
Every BHI job imports from this module to keep behavior consistent.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import os
|
|
import time
|
|
from contextlib import contextmanager
|
|
from datetime import datetime
|
|
from typing import Any, Callable, Iterable
|
|
|
|
import requests
|
|
|
|
try:
|
|
import psycopg2
|
|
import psycopg2.extras
|
|
except ImportError:
|
|
psycopg2 = None # type: ignore
|
|
|
|
LOG_FMT = "%(asctime)s %(levelname)s %(name)s | %(message)s"
# Configure root logging once for every job importing this module. logging
# level names are case-sensitive, so upper-case the env value: without this,
# BHI_LOG_LEVEL=debug raises "ValueError: Unknown level" at import time.
logging.basicConfig(level=os.environ.get("BHI_LOG_LEVEL", "INFO").upper(), format=LOG_FMT)
|
|
|
|
|
|
# --- HTTP session with retries + rate limiting ------------------------------
|
|
|
|
class RateLimitedSession(requests.Session):
    """A ``requests.Session`` that throttles requests and retries transient failures.

    Each attempt first waits until at least ``min_interval`` seconds have
    passed since the previous attempt made through this session. Responses
    with status 429/500/502/503/504, and transport errors raised by
    ``requests`` (connection errors, timeouts, ...), are retried with
    exponential backoff (1 s, 2 s, 4 s, ...) for at most ``max_retries``
    attempts in total.

    Args:
        min_interval: Minimum spacing between attempts, in seconds.
        max_retries: Maximum number of attempts per ``request`` call.
    """

    # Statuses treated as transient (rate limiting / server-side trouble).
    _RETRY_STATUSES = frozenset({429, 500, 502, 503, 504})

    def __init__(self, min_interval: float = 0.2, max_retries: int = 5):
        super().__init__()
        self.headers.update({"User-Agent": "EconomicBrain-BHI/1.0 (+research)"})
        self.min_interval = min_interval
        self.max_retries = max_retries
        # time.monotonic() value taken at the start of the most recent attempt.
        self._last = 0.0

    def request(self, method, url, **kw):  # type: ignore[override]
        """Send a request, throttled and retried as described on the class.

        A 60 s ``timeout`` is applied unless the caller passes one.

        Returns:
            The first response whose status is neither retryable nor an error.

        Raises:
            RuntimeError: immediately, chained from the ``requests.HTTPError``,
                when the server answers with a non-retryable error status
                (e.g. 400/401/404); or after ``max_retries`` failed attempts,
                chained from the last transport error if the final attempt
                raised one.
        """
        kw.setdefault("timeout", 60)
        backoff = 1.0
        last_error: Exception | None = None
        for attempt in range(1, self.max_retries + 1):
            wait = self.min_interval - (time.monotonic() - self._last)
            if wait > 0:
                time.sleep(wait)
            self._last = time.monotonic()
            try:
                resp = super().request(method, url, **kw)
            except requests.RequestException as e:
                last_error = e
                logging.warning("Request error: %s (attempt %d)", e, attempt)
            else:
                if resp.status_code not in self._RETRY_STATUSES:
                    try:
                        resp.raise_for_status()
                    except requests.HTTPError as err:
                        # Client errors will not fix themselves: fail fast rather
                        # than burning the whole backoff budget. RuntimeError is
                        # what callers previously received for these URLs.
                        raise RuntimeError(f"HTTP {resp.status_code} on {url}") from err
                    return resp
                last_error = None
                logging.warning("HTTP %s on %s, retrying in %.1fs", resp.status_code, url, backoff)
                # Release the pooled connection before trying again.
                resp.close()
            # No point sleeping after the final attempt.
            if attempt < self.max_retries:
                time.sleep(backoff)
                backoff *= 2
        raise RuntimeError(f"Exceeded retries for {url}") from last_error
|
|
|
|
|
|
# --- DB helpers -------------------------------------------------------------
|
|
|
|
def get_conn():
    """Open and return a new psycopg2 connection to the Brain database.

    The DSN is taken from ``DATABASE_URL``, or from ``BRAIN_DATABASE_URL``
    when the former is unset or empty.

    Raises:
        RuntimeError: if psycopg2 could not be imported, or if neither
            environment variable holds a non-empty DSN.
    """
    if psycopg2 is None:
        raise RuntimeError("psycopg2 not installed. pip install psycopg2-binary")
    for env_var in ("DATABASE_URL", "BRAIN_DATABASE_URL"):
        candidate = os.environ.get(env_var)
        if candidate:
            return psycopg2.connect(candidate)
    raise RuntimeError("DATABASE_URL env var not set")
|
|
|
|
|
|
@contextmanager
def job_run(job_name: str):
    """Track one execution of *job_name* as a row in the base Brain's ``job_runs`` table.

    On entry, a row is inserted with ``status='running'`` and ``started_at``,
    and ``(conn, run_id)`` is yielded. *conn* is a new connection from
    ``get_conn()``; *run_id* is the inserted row's ``id``.

    * Normal exit: the row is set to ``status='success'`` with ``finished_at``,
      and the transaction is committed. This also commits any work the body
      left uncommitted on *conn*.
    * The body raises (including ``KeyboardInterrupt`` / ``SystemExit``):
      *conn* is rolled back, which discards the body's uncommitted work and
      clears an aborted transaction. The row is then set to ``status='error'``
      with ``finished_at`` and at most 2000 characters of the exception text,
      and the exception is re-raised. A failure while recording the error is
      logged, never raised in place of the original exception.

    The connection is closed in every case. Timestamps are naive UTC from
    ``datetime.utcnow()``.

    Usage::

        with job_run("my_job") as (conn, run_id):
            ...
    """
    conn = get_conn()
    run_id = None
    try:
        with conn.cursor() as c:
            c.execute(
                """
                INSERT INTO job_runs (job_name, started_at, status)
                VALUES (%s, %s, 'running') RETURNING id
                """,
                (job_name, datetime.utcnow()),
            )
            run_id = c.fetchone()[0]
        conn.commit()
        yield conn, run_id
        with conn.cursor() as c:
            c.execute(
                "UPDATE job_runs SET status='success', finished_at=%s WHERE id=%s",
                (datetime.utcnow(), run_id),
            )
        conn.commit()
    except BaseException as e:
        # BaseException so an interrupted job is not left marked 'running'.
        if run_id is not None:
            try:
                # A failed statement inside the job leaves the transaction
                # aborted; Postgres rejects further statements (including the
                # UPDATE below) until it is rolled back.
                conn.rollback()
                with conn.cursor() as c:
                    c.execute(
                        "UPDATE job_runs SET status='error', finished_at=%s, error=%s WHERE id=%s",
                        # str() of e.g. KeyboardInterrupt() is empty; store the type name then.
                        (datetime.utcnow(), (str(e) or type(e).__name__)[:2000], run_id),
                    )
                conn.commit()
            except Exception:
                # Best effort only: never mask the job's own exception.
                logging.exception("failed to record error for job_runs id=%s", run_id)
        raise
    finally:
        conn.close()
|
|
|
|
|
|
def bulk_insert(
    conn,
    table: str,
    columns: list[str],
    rows: Iterable[tuple],
    *,
    page_size: int = 500,
):
    """Insert *rows* into *table* in batches with ``execute_values``, then commit.

    Args:
        conn: An open psycopg2 connection.
        table: Target table name. It is interpolated into the SQL verbatim,
            so it must be a trusted identifier, never external input.
        columns: Column names, in the same order as the values in each row.
            They are also interpolated verbatim.
        rows: One tuple of values per row; materialized into a list first.
        page_size: Maximum number of rows per generated INSERT statement.
            Defaults to 500.

    The commit is issued even when *rows* is empty, and it also commits any
    other pending work on *conn*.
    """
    with conn.cursor() as c:
        psycopg2.extras.execute_values(
            c,
            f"INSERT INTO {table} ({', '.join(columns)}) VALUES %s",
            list(rows),
            page_size=page_size,
        )
    conn.commit()
|
|
|
|
|
|
# --- Vault (optional) -------------------------------------------------------
|
|
|
|
def vault_secret(path: str, key: str) -> str | None:
|
|
token = os.environ.get("VAULT_TOKEN")
|
|
addr = os.environ.get("VAULT_ADDR", "http://localhost:8200")
|
|
if not token:
|
|
return os.environ.get(key.upper())
|
|
try:
|
|
r = requests.get(
|
|
f"{addr}/v1/{path}",
|
|
headers={"X-Vault-Token": token},
|
|
timeout=5,
|
|
)
|
|
return r.json()["data"]["data"].get(key)
|
|
except Exception as e:
|
|
logging.warning("vault fetch failed: %s", e)
|
|
return os.environ.get(key.upper())
|