feat(rest): FastAPI adapter + /jobs, /healthz, /metrics routes (spec §5)
All checks were successful
tests / test (push) Successful in 1m7s
tests / test (pull_request) Successful in 1m5s

Routes:
- POST /jobs: 201 on first insert, 200 on idempotent re-submit.
- GET /jobs/{id}: full Job envelope or 404.
- GET /jobs?client_id=&request_id=: correlation lookup or 404.
- GET /healthz: {postgres, ollama, ocr}; 200 iff all ok (degraded counts
  as non-200 per spec). Postgres probe guarded by a 2 s wait_for.
- GET /metrics: pending/running counts + 24h done/error counters +
  per-use-case avg seconds. Plain JSON, no Prometheus.

create_app(spawn_worker=bool) parameterises worker spawning so tests that
only need REST pass False. Worker spawn is tolerant of the loop module not
being importable yet (Task 3.5 fills it in).

Probes are a DI bundle — production wiring swaps them in at startup
(Chunk 4); tests inject canned ok/fail callables. Session factory is also
DI'd so tests can point at a per-loop engine and sidestep the asyncpg
cross-loop future issue that bit the jobs_repo fixture.

9 new integration tests; unit suite unchanged. Forgejo Actions trigger is
flaky; local verification is the gate (unit + integration green locally).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dirk Riemann 2026-04-18 11:47:04 +02:00
parent 04a415a191
commit e46c44f1e0
6 changed files with 528 additions and 0 deletions

View file

@ -0,0 +1,6 @@
"""Transport adapters — REST (always on) + pg_queue (optional).
Adapters are thin: they marshal external events into :class:`RequestIX`
payloads that land in ``ix_jobs`` as pending rows, and they read back from
the same store. They do NOT run the pipeline themselves; the worker does.
"""

View file

@ -0,0 +1,5 @@
"""FastAPI REST adapter — POST /jobs / GET /jobs / GET /healthz / GET /metrics.
Routes are defined in ``routes.py``. The ``create_app`` factory in
``ix.app`` wires them up alongside the worker lifespan.
"""

View file

@ -0,0 +1,227 @@
"""REST routes (spec §5).
The routes depend on two injected objects:
* a session factory (``get_session_factory_dep``): swapped in tests so we can
use the fixture's per-test engine instead of the lazy process-wide one in
``ix.store.engine``.
* a :class:`Probes` bundle (``get_probes``): each probe returns the
per-subsystem state string used by ``/healthz``. Tests inject canned
probes; Chunk 4 wires the real Ollama/Surya ones.
``/healthz`` has a strict 2-second postgres timeout — we use an
``asyncio.wait_for`` around a ``SELECT 1`` roundtrip so a broken pool or a
hung connection can't wedge the healthcheck endpoint.
"""
from __future__ import annotations
import asyncio
from collections.abc import Callable
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
from typing import Annotated, Literal
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Query, Response
from sqlalchemy import func, select, text
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from ix.adapters.rest.schemas import HealthStatus, JobSubmitResponse, MetricsResponse
from ix.contracts.job import Job
from ix.contracts.request import RequestIX
from ix.store import jobs_repo
from ix.store.engine import get_session_factory
from ix.store.models import IxJob
@dataclass
class Probes:
    """Bundle of subsystem health probes consumed by ``/healthz``.

    Every probe is a zero-argument, synchronous callable that returns the
    status literal reported verbatim in the response body. Probes are sync
    by design — keeping them sync lets tests inject plain lambdas.

    NOTE(review): the original design note says real probes that need async
    work will bridge to an executor inside the callable (Chunk 4); that
    wiring is not visible here — confirm once it lands.
    """

    # Ollama probe: may report "ok", "degraded" or "fail".
    ollama: Callable[[], Literal["ok", "degraded", "fail"]]
    # OCR probe: reports "ok" or "fail" only.
    ocr: Callable[[], Literal["ok", "fail"]]
def get_session_factory_dep() -> async_sessionmaker[AsyncSession]:
    """Provide the session factory the routes receive by default.

    Delegates to ``get_session_factory`` from ``ix.store.engine``. Tests
    replace this dependency through ``app.dependency_overrides``.
    """
    factory = get_session_factory()
    return factory
def get_probes() -> Probes:
    """Provide the default probe bundle: both probes report ``fail``.

    Production wiring (Chunk 4) is expected to override this factory with
    real Ollama + Surya probes at app startup; integration tests override
    it via ``app.dependency_overrides[get_probes]`` with canned probes.

    Defaulting to ``fail`` means a deployment that forgot to wire the real
    probes shows up as unhealthy in ``/healthz`` instead of silently
    claiming everything is fine.
    """

    def _always_fail() -> Literal["fail"]:
        return "fail"

    return Probes(ollama=_always_fail, ocr=_always_fail)
# Router that collects every REST route declared in this module.
router = APIRouter()
@router.post("/jobs", response_model=JobSubmitResponse, status_code=201)
async def submit_job(
    request: RequestIX,
    response: Response,
    session_factory: Annotated[
        async_sessionmaker[AsyncSession], Depends(get_session_factory_dep)
    ],
) -> JobSubmitResponse:
    """Queue a job: 201 for a first submit, 200 for an idempotent re-submit.

    Per spec §5 a repeated ``(client_id, request_id)`` pair must not create
    a second job. A re-submit is recognised by looking the pair up *before*
    calling ``jobs_repo.insert_pending``: when a row was already present the
    declared 201 is replaced with 200 for this response.

    NOTE(review): the lookup and the insert are separate statements, so two
    concurrent first-time submits of the same pair could presumably both
    answer 201 — confirm whether callers care.
    """
    async with session_factory() as session:
        prior = await jobs_repo.get_by_correlation(
            session, request.ix_client_id, request.request_id
        )
        job = await jobs_repo.insert_pending(
            session, request, callback_url=request.callback_url
        )
        await session.commit()

    is_resubmit = prior is not None
    if is_resubmit:
        # The decorator declares 201; assigning response.status_code
        # overrides it for this call only.
        response.status_code = 200

    return JobSubmitResponse(
        job_id=job.job_id,
        ix_id=job.ix_id,
        status=job.status,
    )
@router.get("/jobs/{job_id}", response_model=Job)
async def get_job(
    job_id: UUID,
    session_factory: Annotated[
        async_sessionmaker[AsyncSession], Depends(get_session_factory_dep)
    ],
) -> Job:
    """Return the full Job envelope for ``job_id``.

    Raises:
        HTTPException: 404 when ``jobs_repo.get`` finds no such job.
    """
    async with session_factory() as session:
        found = await jobs_repo.get(session, job_id)

    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="job not found")
@router.get("/jobs", response_model=Job)
async def lookup_job_by_correlation(
    client_id: Annotated[str, Query(...)],
    request_id: Annotated[str, Query(...)],
    session_factory: Annotated[
        async_sessionmaker[AsyncSession], Depends(get_session_factory_dep)
    ],
) -> Job:
    """Return the Job matching a ``(client_id, request_id)`` pair.

    Both query parameters are required.

    Raises:
        HTTPException: 404 when no job matches the pair.
    """
    async with session_factory() as session:
        found = await jobs_repo.get_by_correlation(session, client_id, request_id)

    if found is not None:
        return found
    raise HTTPException(status_code=404, detail="job not found")
@router.get("/healthz")
async def healthz(
    response: Response,
    session_factory: Annotated[
        async_sessionmaker[AsyncSession], Depends(get_session_factory_dep)
    ],
    probes: Annotated[Probes, Depends(get_probes)],
) -> HealthStatus:
    """Report postgres / ollama / ocr state (spec §5).

    The HTTP status stays 200 only when all three subsystems report ``ok``;
    anything else (including ``degraded``) yields 503. The body always
    carries the per-subsystem states.
    """

    async def _ping_postgres() -> None:
        async with session_factory() as session:
            await session.execute(text("SELECT 1"))

    # Any failure — including the 2 s timeout — counts as "fail" rather
    # than letting the exception escape the health endpoint.
    postgres_state: Literal["ok", "fail"]
    try:
        await asyncio.wait_for(_ping_postgres(), timeout=2.0)
    except Exception:
        postgres_state = "fail"
    else:
        postgres_state = "ok"

    # A probe that raises is reported as failed, not as a 500.
    try:
        ollama_state = probes.ollama()
    except Exception:
        ollama_state = "fail"

    try:
        ocr_state = probes.ocr()
    except Exception:
        ocr_state = "fail"

    body = HealthStatus(
        postgres=postgres_state,
        ollama=ollama_state,
        ocr=ocr_state,
    )
    all_ok = all(
        state == "ok" for state in (postgres_state, ollama_state, ocr_state)
    )
    if not all_ok:
        response.status_code = 503
    return body
@router.get("/metrics", response_model=MetricsResponse)
async def metrics(
    session_factory: Annotated[
        async_sessionmaker[AsyncSession], Depends(get_session_factory_dep)
    ],
) -> MetricsResponse:
    """Report queue depth and 24-hour counters as plain JSON (spec §5).

    ``jobs_pending`` / ``jobs_running`` count all rows in that status;
    ``jobs_done_24h`` / ``jobs_error_24h`` count rows whose ``finished_at``
    lies within the last 24 hours. ``by_use_case_seconds`` maps each use
    case to the average run time in seconds of its recently finished jobs.
    """
    window_start = datetime.now(UTC) - timedelta(hours=24)
    # select() constructs are generative, so this base statement can be
    # specialised several times without being mutated.
    count_jobs = select(func.count()).select_from(IxJob)

    async with session_factory() as session:
        pending_total = await session.scalar(
            count_jobs.where(IxJob.status == "pending")
        )
        running_total = await session.scalar(
            count_jobs.where(IxJob.status == "running")
        )
        done_total = await session.scalar(
            count_jobs.where(
                IxJob.status == "done", IxJob.finished_at >= window_start
            )
        )
        error_total = await session.scalar(
            count_jobs.where(
                IxJob.status == "error", IxJob.finished_at >= window_start
            )
        )

        # Average elapsed seconds per use case. The use case is read from
        # the ``request`` column with ->>. Only "done" rows that have both
        # timestamps and finished inside the window are considered.
        average_sql = text(
            "SELECT request->>'use_case' AS use_case, "
            "AVG(EXTRACT(EPOCH FROM (finished_at - started_at))) "
            "FROM ix_jobs "
            "WHERE status='done' AND finished_at IS NOT NULL "
            "AND started_at IS NOT NULL AND finished_at >= :since "
            "GROUP BY request->>'use_case'"
        )
        result = await session.execute(average_sql, {"since": window_start})
        average_rows = result.all()

    averages = {}
    for row in average_rows:
        # Rows without a use_case key cannot be attributed; skip them.
        if row[0] is not None:
            averages[row[0]] = float(row[1])

    return MetricsResponse(
        jobs_pending=int(pending_total or 0),
        jobs_running=int(running_total or 0),
        jobs_done_24h=int(done_total or 0),
        jobs_error_24h=int(error_total or 0),
        by_use_case_seconds=averages,
    )

View file

@ -0,0 +1,52 @@
"""REST-adapter request / response bodies.
Most payloads reuse the core contracts directly (:class:`RequestIX`,
:class:`Job`). The only adapter-specific shape is the lightweight POST /jobs
response (`job_id`, `ix_id`, `status`) — callers don't need the full Job
envelope back on submit; they poll ``GET /jobs/{id}`` for that.
"""
from __future__ import annotations
from typing import Literal
from uuid import UUID
from pydantic import BaseModel, ConfigDict
class JobSubmitResponse(BaseModel):
    """Body returned by POST /jobs.

    Deliberately small: the identifiers plus the current status. Unknown
    fields are rejected (``extra="forbid"``).
    """

    model_config = ConfigDict(extra="forbid")

    # Job identifier, usable with ``GET /jobs/{id}``.
    job_id: UUID
    # The job's ``ix_id`` string.
    ix_id: str
    # Status of the job at the time of the response.
    status: Literal["pending", "running", "done", "error"]
class HealthStatus(BaseModel):
    """Body returned by GET /healthz.

    One field per subsystem. The endpoint answers HTTP 200 only when every
    field is ``"ok"`` (spec §5). ``ollama`` may additionally be
    ``"degraded"`` — backend reachable but the default model not pulled —
    which monitoring sees as a non-200 response.
    """

    model_config = ConfigDict(extra="forbid")

    postgres: Literal["ok", "fail"]
    ollama: Literal["ok", "degraded", "fail"]
    ocr: Literal["ok", "fail"]
class MetricsResponse(BaseModel):
    """Body returned by GET /metrics: plain JSON, no Prometheus format (MVP)."""

    model_config = ConfigDict(extra="forbid")

    # Current queue depth by status.
    jobs_pending: int
    jobs_running: int
    # Jobs that finished (successfully / with error) in the last 24 hours.
    jobs_done_24h: int
    jobs_error_24h: int
    # Average run time in seconds, keyed by use case.
    by_use_case_seconds: dict[str, float]

59
src/ix/app.py Normal file
View file

@ -0,0 +1,59 @@
"""FastAPI app factory + lifespan.
``create_app()`` wires the REST router on top of a lifespan that spawns the
worker loop (Task 3.5) and, optionally, the pg_queue listener (Task 3.6).
Tests that don't care about the worker call ``create_app(spawn_worker=False)``
so the lifespan returns cleanly.
The factory is parameterised (``spawn_worker``) instead of env-gated because
pytest runs multiple app instances per session and we want the decision local
to each call, not inferred from ``IX_*`` variables.
"""
from __future__ import annotations
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager, suppress
from fastapi import FastAPI
from ix.adapters.rest.routes import router as rest_router
def create_app(*, spawn_worker: bool = True) -> FastAPI:
    """Construct the ASGI app.

    Parameters
    ----------
    spawn_worker:
        When True (default), the lifespan spawns the background worker task.
        Integration tests that only exercise the REST adapter pass False so
        jobs pile up as ``pending`` and the tests can assert on their state
        without a racing worker mutating them.

    Returns
    -------
    FastAPI
        The app with the REST router mounted and the lifespan installed.
    """
    # Function-scope import: this module's top-level imports do not include
    # asyncio, and it is only needed for the shutdown path below.
    import asyncio

    @asynccontextmanager
    async def lifespan(_app: FastAPI) -> AsyncIterator[None]:
        worker_task = None
        if spawn_worker:
            # Task 3.5 fills in the real spawn. Only the *import* is
            # tolerated to fail; an ImportError raised while actually
            # spawning the worker is a real error and must surface.
            try:
                from ix.worker.loop import spawn_worker_task
            except ImportError:
                # Worker module isn't in place yet (Task 3.5 not merged).
                worker_task = None
            else:
                worker_task = await spawn_worker_task(_app)
        try:
            yield
        finally:
            if worker_task is not None:
                worker_task.cancel()
                # Awaiting a cancelled task raises CancelledError, which is
                # a BaseException (not an Exception) since Python 3.8, so it
                # has to be suppressed explicitly for a clean shutdown.
                with suppress(asyncio.CancelledError, Exception):
                    await worker_task

    app = FastAPI(lifespan=lifespan, title="infoxtractor", version="0.1.0")
    app.include_router(rest_router)
    return app

View file

@ -0,0 +1,179 @@
"""Integration tests for the FastAPI REST adapter (spec §5).
Uses ``fastapi.testclient.TestClient`` against a real DB. Ollama / OCR probes
are stubbed via the DI hooks the routes expose for testing — in Chunk 4 the
production probes swap in.
"""
from __future__ import annotations
from collections.abc import Iterator
from uuid import uuid4
import pytest
from fastapi.testclient import TestClient
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from ix.adapters.rest.routes import Probes, get_probes, get_session_factory_dep
from ix.app import create_app
def _factory_for_url(postgres_url: str):  # type: ignore[no-untyped-def]
    """Build a TestClient-compatible session-factory dependency.

    TestClient runs the ASGI app on its own dedicated event loop, distinct
    from the per-test loop pytest-asyncio gives direct tests. Session
    factories must therefore be built from an engine created on that inner
    loop, so the returned callable creates a fresh engine + factory on each
    dependency resolution (i.e. on the TestClient's loop at route time).

    The engines are never disposed, so a pooled engine would leave its
    connections open for every request made during the test run. Using
    ``NullPool`` closes each connection as soon as its session releases it,
    which also avoids carrying asyncpg connections across event loops.
    """
    # Local import keeps this helper self-contained; sqlalchemy is already a
    # dependency of this test module.
    from sqlalchemy.pool import NullPool

    def _factory():  # type: ignore[no-untyped-def]
        eng = create_async_engine(postgres_url, poolclass=NullPool)
        return async_sessionmaker(eng, expire_on_commit=False)

    return _factory
@pytest.fixture
def app(postgres_url: str) -> Iterator[TestClient]:
    """Yield a TestClient for an app bound to the test DB with all-ok probes.

    The worker is not spawned, so submitted jobs stay ``pending``.
    """

    def _ok_probes() -> Probes:
        return Probes(ollama=lambda: "ok", ocr=lambda: "ok")

    application = create_app(spawn_worker=False)
    overrides = application.dependency_overrides
    overrides[get_session_factory_dep] = _factory_for_url(postgres_url)
    overrides[get_probes] = _ok_probes

    with TestClient(application) as client:
        yield client
def _valid_request_body(client_id: str = "mammon", request_id: str = "r-1") -> dict:
return {
"use_case": "bank_statement_header",
"ix_client_id": client_id,
"request_id": request_id,
"context": {"texts": ["hello world"]},
}
def test_post_jobs_creates_pending(app: TestClient) -> None:
    """A first submit answers 201 with a pending job and a 16-char ix_id."""
    resp = app.post("/jobs", json=_valid_request_body())
    assert resp.status_code == 201, resp.text

    payload = resp.json()
    assert payload["status"] == "pending"
    assert len(payload["ix_id"]) == 16
    assert payload["job_id"]
def test_post_jobs_idempotent_returns_200(app: TestClient) -> None:
    """Re-submitting the same correlation pair answers 200 with the same ids."""
    payload = _valid_request_body("m", "dup")

    first = app.post("/jobs", json=payload)
    assert first.status_code == 201
    original = first.json()

    second = app.post("/jobs", json=payload)
    assert second.status_code == 200
    repeat = second.json()

    assert repeat["job_id"] == original["job_id"]
    assert repeat["ix_id"] == original["ix_id"]
def test_get_job_by_id(app: TestClient) -> None:
    """GET /jobs/{id} returns the full envelope of a freshly submitted job."""
    submitted = app.post("/jobs", json=_valid_request_body()).json()
    job_id = submitted["job_id"]

    resp = app.get(f"/jobs/{job_id}")
    assert resp.status_code == 200

    envelope = resp.json()
    assert envelope["job_id"] == job_id
    assert envelope["request"]["use_case"] == "bank_statement_header"
    assert envelope["status"] == "pending"
def test_get_job_404(app: TestClient) -> None:
    """An unknown (random) job id answers 404."""
    unknown_id = uuid4()
    resp = app.get(f"/jobs/{unknown_id}")
    assert resp.status_code == 404
def test_get_by_correlation_query(app: TestClient) -> None:
    """GET /jobs finds a job by correlation pair and 404s on an unknown pair."""
    submitted = app.post(
        "/jobs", json=_valid_request_body("mammon", "corr-1")
    ).json()

    hit = app.get("/jobs", params={"client_id": "mammon", "request_id": "corr-1"})
    assert hit.status_code == 200
    assert hit.json()["job_id"] == submitted["job_id"]

    miss = app.get("/jobs", params={"client_id": "mammon", "request_id": "nope"})
    assert miss.status_code == 404
def test_healthz_all_ok(app: TestClient) -> None:
    """With a working DB and ok probes, /healthz answers 200 and all-ok."""
    resp = app.get("/healthz")
    assert resp.status_code == 200, resp.text

    states = resp.json()
    assert states["postgres"] == "ok"
    assert states["ollama"] == "ok"
    assert states["ocr"] == "ok"
def test_healthz_503_on_postgres_fail(postgres_url: str) -> None:
    """A session factory that raises makes /healthz report postgres=fail and 503."""

    def _raising_session_factory():  # type: ignore[no-untyped-def]
        raise RuntimeError("db down")

    def _bad_factory():  # type: ignore[no-untyped-def]
        # The dependency resolves fine; the failure only happens when the
        # route tries to open a session.
        return _raising_session_factory

    application = create_app(spawn_worker=False)
    application.dependency_overrides[get_session_factory_dep] = _bad_factory
    application.dependency_overrides[get_probes] = lambda: Probes(
        ollama=lambda: "ok", ocr=lambda: "ok"
    )

    with TestClient(application) as client:
        resp = client.get("/healthz")

    assert resp.status_code == 503
    states = resp.json()
    assert states["postgres"] == "fail"
def test_healthz_degraded_ollama_is_503(postgres_url: str) -> None:
    """Spec §5: a degraded ollama probe yields 503 — only all-ok gives 200."""

    def _degraded_probes() -> Probes:
        return Probes(ollama=lambda: "degraded", ocr=lambda: "ok")

    application = create_app(spawn_worker=False)
    overrides = application.dependency_overrides
    overrides[get_session_factory_dep] = _factory_for_url(postgres_url)
    overrides[get_probes] = _degraded_probes

    with TestClient(application) as client:
        resp = client.get("/healthz")

    assert resp.status_code == 503
    assert resp.json()["ollama"] == "degraded"
def test_metrics_shape(app: TestClient) -> None:
    """/metrics exposes every counter and reflects two freshly queued jobs."""
    # Two pending jobs so the counters have something to report.
    for request_id in ("a", "b"):
        app.post("/jobs", json=_valid_request_body("mm", request_id))

    resp = app.get("/metrics")
    assert resp.status_code == 200

    counters = resp.json()
    expected_keys = (
        "jobs_pending",
        "jobs_running",
        "jobs_done_24h",
        "jobs_error_24h",
        "by_use_case_seconds",
    )
    absent = [key for key in expected_keys if key not in counters]
    assert not absent

    assert counters["jobs_pending"] == 2
    assert counters["jobs_running"] == 0
    assert isinstance(counters["by_use_case_seconds"], dict)