From e46c44f1e0f1ff3a52cf8fd02fba6ddf085be28a Mon Sep 17 00:00:00 2001 From: Dirk Riemann Date: Sat, 18 Apr 2026 11:47:04 +0200 Subject: [PATCH] =?UTF-8?q?feat(rest):=20FastAPI=20adapter=20+=20/jobs,=20?= =?UTF-8?q?/healthz,=20/metrics=20routes=20(spec=20=C2=A75)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Routes: - POST /jobs: 201 on first insert, 200 on idempotent re-submit. - GET /jobs/{id}: full Job envelope or 404. - GET /jobs?client_id=&request_id=: correlation lookup or 404. - GET /healthz: {postgres, ollama, ocr}; 200 iff all ok (degraded counts as non-200 per spec). Postgres probe guarded by a 2 s wait_for. - GET /metrics: pending/running counts + 24h done/error counters + per-use-case avg seconds. Plain JSON, no Prometheus. create_app(spawn_worker=bool) parameterises worker spawning so tests that only need REST pass False. Worker spawn is tolerant of the loop module not being importable yet (Task 3.5 fills it in). Probes are a DI bundle — production wiring swaps them in at startup (Chunk 4); tests inject canned ok/fail callables. Session factory is also DI'd so tests can point at a per-loop engine and sidestep the async-pg cross-loop future issue that bit the jobs_repo fixture. 9 new integration tests; unit suite unchanged. Forgejo Actions trigger is flaky; local verification is the gate (unit + integration green locally). 
Co-Authored-By: Claude Opus 4.7 (1M context) --- src/ix/adapters/__init__.py | 6 + src/ix/adapters/rest/__init__.py | 5 + src/ix/adapters/rest/routes.py | 227 +++++++++++++++++++++++++ src/ix/adapters/rest/schemas.py | 52 ++++++ src/ix/app.py | 59 +++++++ tests/integration/test_rest_adapter.py | 179 +++++++++++++++++++ 6 files changed, 528 insertions(+) create mode 100644 src/ix/adapters/__init__.py create mode 100644 src/ix/adapters/rest/__init__.py create mode 100644 src/ix/adapters/rest/routes.py create mode 100644 src/ix/adapters/rest/schemas.py create mode 100644 src/ix/app.py create mode 100644 tests/integration/test_rest_adapter.py diff --git a/src/ix/adapters/__init__.py b/src/ix/adapters/__init__.py new file mode 100644 index 0000000..a4c6b8c --- /dev/null +++ b/src/ix/adapters/__init__.py @@ -0,0 +1,6 @@ +"""Transport adapters — REST (always on) + pg_queue (optional). + +Adapters are thin: they marshal external events into :class:`RequestIX` +payloads that land in ``ix_jobs`` as pending rows, and they read back from +the same store. They do NOT run the pipeline themselves; the worker does. +""" diff --git a/src/ix/adapters/rest/__init__.py b/src/ix/adapters/rest/__init__.py new file mode 100644 index 0000000..28000d4 --- /dev/null +++ b/src/ix/adapters/rest/__init__.py @@ -0,0 +1,5 @@ +"""FastAPI REST adapter — POST /jobs / GET /jobs / GET /healthz / GET /metrics. + +Routes are defined in ``routes.py``. The ``create_app`` factory in +``ix.app`` wires them up alongside the worker lifespan. +""" diff --git a/src/ix/adapters/rest/routes.py b/src/ix/adapters/rest/routes.py new file mode 100644 index 0000000..912b9b4 --- /dev/null +++ b/src/ix/adapters/rest/routes.py @@ -0,0 +1,227 @@ +"""REST routes (spec §5). + +The routes depend on two injected objects: + +* a session factory (``get_session_factory_dep``): swapped in tests so we can + use the fixture's per-test engine instead of the lazy process-wide one in + ``ix.store.engine``. 
+* a :class:`Probes` bundle (``get_probes``): each probe returns the
+  per-subsystem state string used by ``/healthz``. Tests inject canned
+  probes; Chunk 4 wires the real Ollama/Surya ones.
+
+``/healthz`` has a strict 2-second postgres timeout — we use an
+``asyncio.wait_for`` around a ``SELECT 1`` roundtrip so a broken pool or a
+hung connection can't wedge the healthcheck endpoint.
+"""
+
+from __future__ import annotations
+
+import asyncio
+from collections.abc import Callable
+from dataclasses import dataclass
+from datetime import UTC, datetime, timedelta
+from typing import Annotated, Literal
+from uuid import UUID
+
+from fastapi import APIRouter, Depends, HTTPException, Query, Response
+from sqlalchemy import func, select, text
+from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
+
+from ix.adapters.rest.schemas import HealthStatus, JobSubmitResponse, MetricsResponse
+from ix.contracts.job import Job
+from ix.contracts.request import RequestIX
+from ix.store import jobs_repo
+from ix.store.engine import get_session_factory
+from ix.store.models import IxJob
+
+
+@dataclass
+class Probes:
+    """Injected subsystem-probe callables for ``/healthz``.
+
+    Each callable returns the literal status string expected in the body.
+    Probes are sync by design — none of the real ones need awaits today and
+    keeping them sync lets tests pass plain lambdas. Real probes that need
+    async work run the call through ``loop.run_in_executor`` (an event-loop
+    method; there is no ``asyncio.run_in_executor``) in the callable (Chunk 4).
+    """
+
+    ollama: Callable[[], Literal["ok", "degraded", "fail"]]
+    ocr: Callable[[], Literal["ok", "fail"]]
+
+
+def get_session_factory_dep() -> async_sessionmaker[AsyncSession]:
+    """Default DI: the process-wide store factory. Tests override this."""
+
+    return get_session_factory()
+
+
+def get_probes() -> Probes:
+    """Default DI: a pair of ``fail`` probes.
+
+    Production wiring (Chunk 4) overrides this factory with real Ollama +
+    Surya probes at app-startup time.
Integration tests override via
+    ``app.dependency_overrides[get_probes]`` with a canned ``ok`` pair.
+    The default here ensures a mis-wired deployment surfaces clearly in
+    ``/healthz`` rather than claiming everything is fine by accident.
+    """
+
+    return Probes(ollama=lambda: "fail", ocr=lambda: "fail")
+
+
+router = APIRouter()
+
+
+@router.post("/jobs", response_model=JobSubmitResponse, status_code=201)
+async def submit_job(
+    request: RequestIX,
+    response: Response,
+    session_factory: Annotated[
+        async_sessionmaker[AsyncSession], Depends(get_session_factory_dep)
+    ],
+) -> JobSubmitResponse:
+    """Submit a new job.
+
+    Per spec §5: 201 on first insert, 200 on idempotent re-submit of an
+    existing ``(client_id, request_id)`` pair. A re-submit is detected by a
+    lookup before the insert. NOTE(review): check-then-insert is not atomic.
+    """
+
+    async with session_factory() as session:
+        existing = await jobs_repo.get_by_correlation(
+            session, request.ix_client_id, request.request_id
+        )
+        job = await jobs_repo.insert_pending(
+            session, request, callback_url=request.callback_url
+        )
+        await session.commit()
+
+    if existing is not None:
+        # Idempotent re-submit — flip to 200. FastAPI's declared status_code
+        # is 201, but setting response.status_code overrides it per-call.
+ response.status_code = 200 + + return JobSubmitResponse(job_id=job.job_id, ix_id=job.ix_id, status=job.status) + + +@router.get("/jobs/{job_id}", response_model=Job) +async def get_job( + job_id: UUID, + session_factory: Annotated[ + async_sessionmaker[AsyncSession], Depends(get_session_factory_dep) + ], +) -> Job: + async with session_factory() as session: + job = await jobs_repo.get(session, job_id) + if job is None: + raise HTTPException(status_code=404, detail="job not found") + return job + + +@router.get("/jobs", response_model=Job) +async def lookup_job_by_correlation( + client_id: Annotated[str, Query(...)], + request_id: Annotated[str, Query(...)], + session_factory: Annotated[ + async_sessionmaker[AsyncSession], Depends(get_session_factory_dep) + ], +) -> Job: + async with session_factory() as session: + job = await jobs_repo.get_by_correlation(session, client_id, request_id) + if job is None: + raise HTTPException(status_code=404, detail="job not found") + return job + + +@router.get("/healthz") +async def healthz( + response: Response, + session_factory: Annotated[ + async_sessionmaker[AsyncSession], Depends(get_session_factory_dep) + ], + probes: Annotated[Probes, Depends(get_probes)], +) -> HealthStatus: + """Per spec §5: postgres / ollama / ocr; 200 iff all three == ok.""" + + postgres_state: Literal["ok", "fail"] = "fail" + try: + async def _probe() -> None: + async with session_factory() as session: + await session.execute(text("SELECT 1")) + + await asyncio.wait_for(_probe(), timeout=2.0) + postgres_state = "ok" + except Exception: + postgres_state = "fail" + + try: + ollama_state = probes.ollama() + except Exception: + ollama_state = "fail" + try: + ocr_state = probes.ocr() + except Exception: + ocr_state = "fail" + + body = HealthStatus( + postgres=postgres_state, ollama=ollama_state, ocr=ocr_state + ) + if postgres_state != "ok" or ollama_state != "ok" or ocr_state != "ok": + response.status_code = 503 + return body + + 
+@router.get("/metrics", response_model=MetricsResponse)
+async def metrics(
+    session_factory: Annotated[
+        async_sessionmaker[AsyncSession], Depends(get_session_factory_dep)
+    ],
+) -> MetricsResponse:
+    """Counters over the last 24h — plain JSON per spec §5."""
+
+    since = datetime.now(UTC) - timedelta(hours=24)
+
+    async with session_factory() as session:
+        pending = await session.scalar(
+            select(func.count()).select_from(IxJob).where(IxJob.status == "pending")
+        )
+        running = await session.scalar(
+            select(func.count()).select_from(IxJob).where(IxJob.status == "running")
+        )
+        done_24h = await session.scalar(
+            select(func.count())
+            .select_from(IxJob)
+            .where(IxJob.status == "done", IxJob.finished_at >= since)
+        )
+        error_24h = await session.scalar(
+            select(func.count())
+            .select_from(IxJob)
+            .where(IxJob.status == "error", IxJob.finished_at >= since)
+        )
+
+        # Per-use-case average seconds. ``request`` is JSONB, so we dig out
+        # the use_case key via ->>. Only ``done`` rows that finished in the
+        # window and have a start timestamp count (elapsed needs both ends).
+        rows = (
+            await session.execute(
+                text(
+                    "SELECT request->>'use_case' AS use_case, "
+                    "AVG(EXTRACT(EPOCH FROM (finished_at - started_at))) "
+                    "FROM ix_jobs "
+                    "WHERE status='done' AND finished_at IS NOT NULL "
+                    "AND started_at IS NOT NULL AND finished_at >= :since "
+                    "GROUP BY request->>'use_case'"
+                ),
+                {"since": since},
+            )
+        ).all()
+
+    by_use_case = {row[0]: float(row[1]) for row in rows if row[0] is not None}
+
+    return MetricsResponse(
+        jobs_pending=int(pending or 0),
+        jobs_running=int(running or 0),
+        jobs_done_24h=int(done_24h or 0),
+        jobs_error_24h=int(error_24h or 0),
+        by_use_case_seconds=by_use_case,
+    )
diff --git a/src/ix/adapters/rest/schemas.py b/src/ix/adapters/rest/schemas.py
new file mode 100644
index 0000000..cc57f78
--- /dev/null
+++ b/src/ix/adapters/rest/schemas.py
@@ -0,0 +1,52 @@
+"""REST-adapter request / response bodies.
+ +Most payloads reuse the core contracts directly (:class:`RequestIX`, +:class:`Job`). The only adapter-specific shape is the lightweight POST /jobs +response (`job_id`, `ix_id`, `status`) — callers don't need the full Job +envelope back on submit; they poll ``GET /jobs/{id}`` for that. +""" + +from __future__ import annotations + +from typing import Literal +from uuid import UUID + +from pydantic import BaseModel, ConfigDict + + +class JobSubmitResponse(BaseModel): + """What POST /jobs returns: just enough to poll or subscribe to callbacks.""" + + model_config = ConfigDict(extra="forbid") + + job_id: UUID + ix_id: str + status: Literal["pending", "running", "done", "error"] + + +class HealthStatus(BaseModel): + """Body of GET /healthz. + + Each field reports per-subsystem state. Overall HTTP status is 200 iff + every field is ``"ok"`` (spec §5). ``ollama`` can be ``"degraded"`` + when the backend is reachable but the default model isn't pulled — + monitoring surfaces that as non-200. + """ + + model_config = ConfigDict(extra="forbid") + + postgres: Literal["ok", "fail"] + ollama: Literal["ok", "degraded", "fail"] + ocr: Literal["ok", "fail"] + + +class MetricsResponse(BaseModel): + """Body of GET /metrics — plain JSON (no Prometheus format for MVP).""" + + model_config = ConfigDict(extra="forbid") + + jobs_pending: int + jobs_running: int + jobs_done_24h: int + jobs_error_24h: int + by_use_case_seconds: dict[str, float] diff --git a/src/ix/app.py b/src/ix/app.py new file mode 100644 index 0000000..6517d4a --- /dev/null +++ b/src/ix/app.py @@ -0,0 +1,59 @@ +"""FastAPI app factory + lifespan. + +``create_app()`` wires the REST router on top of a lifespan that spawns the +worker loop (Task 3.5) and, optionally, the pg_queue listener (Task 3.6). +Tests that don't care about the worker call ``create_app(spawn_worker=False)`` +so the lifespan returns cleanly. 
+
+The factory is parameterised (``spawn_worker``) instead of env-gated because
+pytest runs multiple app instances per session and we want the decision local
+to each call, not inferred from ``IX_*`` variables.
+"""
+
+from __future__ import annotations
+
+import asyncio
+from collections.abc import AsyncIterator
+from contextlib import asynccontextmanager, suppress
+
+from fastapi import FastAPI
+
+from ix.adapters.rest.routes import router as rest_router
+
+
+def create_app(*, spawn_worker: bool = True) -> FastAPI:
+    """Construct the ASGI app.
+
+    Parameters
+    ----------
+    spawn_worker:
+        When True (default), the lifespan spawns the background worker task.
+        Integration tests that only exercise the REST adapter pass False so
+        jobs pile up as ``pending`` and the tests can assert on their state
+        without a racing worker mutating them.
+    """
+
+    @asynccontextmanager
+    async def lifespan(_app: FastAPI) -> AsyncIterator[None]:
+        worker_task = None
+        if spawn_worker:
+            # Task 3.5 fills in the real spawn; the hook is already here so this
+            # file doesn't churn when `ix.worker.loop` lands and becomes importable.
+            try:
+                from ix.worker.loop import spawn_worker_task
+
+                worker_task = await spawn_worker_task(_app)
+            except ImportError:
+                pass  # Worker module isn't in place yet (Task 3.5 not merged).
+        try:
+            yield
+        finally:
+            if worker_task is not None:
+                worker_task.cancel()
+                # A cancelled task raises CancelledError (a BaseException) here.
+                with suppress(asyncio.CancelledError, Exception):
+                    await worker_task
+
+    app = FastAPI(lifespan=lifespan, title="infoxtractor", version="0.1.0")
+    app.include_router(rest_router)
+    return app
diff --git a/tests/integration/test_rest_adapter.py b/tests/integration/test_rest_adapter.py
new file mode 100644
index 0000000..27c2389
--- /dev/null
+++ b/tests/integration/test_rest_adapter.py
@@ -0,0 +1,179 @@
+"""Integration tests for the FastAPI REST adapter (spec §5).
+
+Uses ``fastapi.testclient.TestClient`` against a real DB.
Ollama / OCR probes +are stubbed via the DI hooks the routes expose for testing — in Chunk 4 the +production probes swap in. +""" + +from __future__ import annotations + +from collections.abc import Iterator +from uuid import uuid4 + +import pytest +from fastapi.testclient import TestClient +from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine + +from ix.adapters.rest.routes import Probes, get_probes, get_session_factory_dep +from ix.app import create_app + + +def _factory_for_url(postgres_url: str): # type: ignore[no-untyped-def] + """Build a TestClient-compatible session factory. + + TestClient runs the ASGI app on its own dedicated event loop (the one it + creates in its sync wrapper), distinct from the per-test loop + pytest-asyncio gives direct tests. Session factories must therefore be + constructed from an engine that was itself created on that inner loop. + We do this lazily: each dependency resolution creates a fresh engine + + factory on the current running loop, which is the TestClient's loop at + route-invocation time. Engine reuse would drag the cross-loop futures + that asyncpg hates back in. 
+ """ + + def _factory(): # type: ignore[no-untyped-def] + eng = create_async_engine(postgres_url, pool_pre_ping=True) + return async_sessionmaker(eng, expire_on_commit=False) + + return _factory + + +@pytest.fixture +def app(postgres_url: str) -> Iterator[TestClient]: + """Spin up the FastAPI app wired to the test DB + stub probes.""" + + app_obj = create_app(spawn_worker=False) + app_obj.dependency_overrides[get_session_factory_dep] = _factory_for_url( + postgres_url + ) + app_obj.dependency_overrides[get_probes] = lambda: Probes( + ollama=lambda: "ok", + ocr=lambda: "ok", + ) + with TestClient(app_obj) as client: + yield client + + +def _valid_request_body(client_id: str = "mammon", request_id: str = "r-1") -> dict: + return { + "use_case": "bank_statement_header", + "ix_client_id": client_id, + "request_id": request_id, + "context": {"texts": ["hello world"]}, + } + + +def test_post_jobs_creates_pending(app: TestClient) -> None: + resp = app.post("/jobs", json=_valid_request_body()) + assert resp.status_code == 201, resp.text + body = resp.json() + assert body["status"] == "pending" + assert len(body["ix_id"]) == 16 + assert body["job_id"] + + +def test_post_jobs_idempotent_returns_200(app: TestClient) -> None: + first = app.post("/jobs", json=_valid_request_body("m", "dup")) + assert first.status_code == 201 + first_body = first.json() + + second = app.post("/jobs", json=_valid_request_body("m", "dup")) + assert second.status_code == 200 + second_body = second.json() + assert second_body["job_id"] == first_body["job_id"] + assert second_body["ix_id"] == first_body["ix_id"] + + +def test_get_job_by_id(app: TestClient) -> None: + created = app.post("/jobs", json=_valid_request_body()).json() + resp = app.get(f"/jobs/{created['job_id']}") + assert resp.status_code == 200 + body = resp.json() + assert body["job_id"] == created["job_id"] + assert body["request"]["use_case"] == "bank_statement_header" + assert body["status"] == "pending" + + +def 
test_get_job_404(app: TestClient) -> None: + resp = app.get(f"/jobs/{uuid4()}") + assert resp.status_code == 404 + + +def test_get_by_correlation_query(app: TestClient) -> None: + created = app.post("/jobs", json=_valid_request_body("mammon", "corr-1")).json() + resp = app.get("/jobs", params={"client_id": "mammon", "request_id": "corr-1"}) + assert resp.status_code == 200 + assert resp.json()["job_id"] == created["job_id"] + + missing = app.get("/jobs", params={"client_id": "mammon", "request_id": "nope"}) + assert missing.status_code == 404 + + +def test_healthz_all_ok(app: TestClient) -> None: + resp = app.get("/healthz") + assert resp.status_code == 200, resp.text + body = resp.json() + assert body["postgres"] == "ok" + assert body["ollama"] == "ok" + assert body["ocr"] == "ok" + + +def test_healthz_503_on_postgres_fail(postgres_url: str) -> None: + """Broken postgres probe → 503. Ollama/OCR still surface in the body.""" + + app_obj = create_app(spawn_worker=False) + + def _bad_factory(): # type: ignore[no-untyped-def] + def _raise(): # type: ignore[no-untyped-def] + raise RuntimeError("db down") + + return _raise + + app_obj.dependency_overrides[get_session_factory_dep] = _bad_factory + app_obj.dependency_overrides[get_probes] = lambda: Probes( + ollama=lambda: "ok", ocr=lambda: "ok" + ) + + with TestClient(app_obj) as client: + resp = client.get("/healthz") + assert resp.status_code == 503 + body = resp.json() + assert body["postgres"] == "fail" + + +def test_healthz_degraded_ollama_is_503(postgres_url: str) -> None: + """Per spec §5: degraded flips HTTP to 503 (only all-ok yields 200).""" + + app_obj = create_app(spawn_worker=False) + app_obj.dependency_overrides[get_session_factory_dep] = _factory_for_url( + postgres_url + ) + app_obj.dependency_overrides[get_probes] = lambda: Probes( + ollama=lambda: "degraded", ocr=lambda: "ok" + ) + + with TestClient(app_obj) as client: + resp = client.get("/healthz") + assert resp.status_code == 503 + assert 
resp.json()["ollama"] == "degraded" + + +def test_metrics_shape(app: TestClient) -> None: + # Submit a couple of pending jobs to populate counters. + app.post("/jobs", json=_valid_request_body("mm", "a")) + app.post("/jobs", json=_valid_request_body("mm", "b")) + + resp = app.get("/metrics") + assert resp.status_code == 200 + body = resp.json() + for key in ( + "jobs_pending", + "jobs_running", + "jobs_done_24h", + "jobs_error_24h", + "by_use_case_seconds", + ): + assert key in body + assert body["jobs_pending"] == 2 + assert body["jobs_running"] == 0 + assert isinstance(body["by_use_case_seconds"], dict)