Compare commits

...

60 commits

Author SHA1 Message Date
f6934bdf2a chore(compose): pin project name to infoxtractor
All checks were successful
tests / test (push) Successful in 2m6s
Without `name:`, Compose infers the project from the parent directory
(`app/` on the server), so containers show up under an "app" stack in
the infra monitoring dashboard instead of "infoxtractor".

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 19:57:16 +02:00
ce33aff174 chore: MVP deployed (#42)
All checks were successful
tests / test (push) Successful in 1m14s
2026-04-18 12:08:21 +00:00
842c4da90c chore: MVP deployed — readme, AGENTS.md status, deploy runbook filled in
All checks were successful
tests / test (push) Successful in 1m16s
tests / test (pull_request) Successful in 1m12s
First deploy done 2026-04-18. E2E extraction of the bank_statement_header
use case completes in 35 s against the live service, with 7 of 9 header
fields provenance-verified + text-agreement-green. closing_balance
asserts from spec §12 all pass.

Updates:
- README.md: status -> "MVP deployed"; worked example curl snippet;
  pointers to deployment runbook + spec + plan.
- AGENTS.md: status line updated with the live URL + date.
- pyproject.toml: version comment referencing the first deploy.
- docs/deployment.md: "First deploy" section filled in with times,
  field-level extraction result, plus a log of every small Docker/ops
  follow-up PR that had to land to make the first deploy healthy.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 14:08:07 +02:00
95a576f744 fix(genai): extract trailing JSON (#41)
Some checks are pending
tests / test (push) Waiting to run
2026-04-18 12:05:46 +00:00
81e3b9a7d0 fix(genai): drop Ollama format flag; extract trailing JSON from response
All checks were successful
tests / test (push) Successful in 1m30s
tests / test (pull_request) Successful in 1m21s
qwen3:14b (and deepseek-r1, other reasoning models) wrap their output
in <think>…</think> chains-of-thought before emitting real output.
With format=json the constrained sampler terminated immediately at
`{}` because the thinking block wasn't valid JSON; without format the
model thinks normally and appends the actual JSON at the end.

OllamaClient now omits the format flag and extracts the outermost
balanced `{…}` block from the response (brace depth counter, string-
literal aware). Works for reasoning models, ```json``` code-fenced
outputs, and plain JSON alike.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 14:05:28 +02:00
763407ba1c fix(genai): schema in prompt (#40)
Some checks failed
tests / test (push) Has been cancelled
2026-04-18 12:02:38 +00:00
34f8268cd5 fix(genai): inject JSON schema into Ollama system prompt
All checks were successful
tests / test (push) Successful in 1m8s
tests / test (pull_request) Successful in 1m18s
format=json loose mode gives valid JSON but no shape — models default
to emitting {} when the system prompt doesn't list fields. Prepend a
schema-guidance system message with the full Pydantic schema (after
the existing null-branch sanitiser) so the model sees exactly what
shape to produce. Pydantic still validates on parse.

Unit tests updated to check the schema message is prepended without
disturbing the caller's own messages.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 14:02:25 +02:00
9c73895318 fix(genai): ollama loose JSON (#39)
Some checks failed
tests / test (push) Has been cancelled
2026-04-18 11:59:18 +00:00
2efc4d1088 fix(genai): send format="json" (loose mode) to Ollama
All checks were successful
tests / test (push) Successful in 1m13s
tests / test (pull_request) Successful in 1m23s
Ollama 0.11.8 segfaults on any Pydantic-shaped structured-output schema
with $ref, anyOf, or pattern — confirmed on the deploy host with the
simplest MVP case (BankStatementHeader alone). The earlier null-stripping
sanitiser wasn't enough.

Switch to format="json", which is "emit valid JSON" mode. We're already
describing the exact JSON shape in the system prompt (via GenAIStep +
the use case's citation instruction appendix) and validating the
response body through Pydantic on parse — which raises IX_002_001 on
schema mismatch, exactly as before.

Stronger guarantees can come back later via a newer Ollama, an API
fix, or a different GenAIClient impl. None of that is needed for the
MVP to work end to end.

Unit tests: the sanitiser left in place (harmless, still tested). The
"happy path" test now asserts format == "json".

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 13:59:04 +02:00
f6ce97d7fd fix(compose): persist model caches (#38)
All checks were successful
tests / test (push) Successful in 1m8s
2026-04-18 11:49:24 +00:00
9e33923f71 fix(compose): persist Surya + HF caches so rebuilds don't redownload models
All checks were successful
tests / test (push) Successful in 2m1s
tests / test (pull_request) Successful in 1m18s
First /healthz call on a fresh container triggers Surya to fetch the
text-recognition (1.34 GB) and detection (73 MB) models from HuggingFace.
Without a volume they land in the container fs and vanish on every
rebuild, which is every deploy.

Mount named volumes for /root/.cache/datalab (Surya) and
/root/.cache/huggingface. Rebuild now keeps /healthz warm.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 13:49:09 +02:00
65670af78f fix(genai): sanitise Optional for Ollama (#37)
Some checks are pending
tests / test (push) Waiting to run
2026-04-18 11:48:43 +00:00
9cb62d69af fix(genai): strip null branches from anyOf before sending to Ollama
All checks were successful
tests / test (push) Successful in 1m33s
tests / test (pull_request) Successful in 4m29s
Ollama 0.11.8's llama.cpp structured-output implementation segfaults on
Pydantic v2's standard Optional pattern:

    {"anyOf": [{"type": "string"}, {"type": "null"}]}

Confirmed on the deploy host: /api/chat request with the MVP's
ProvenanceWrappedResponse schema crashed Ollama with SIGSEGV; the client
saw httpx RemoteProtocolError → IX_002_000.

New _sanitise_schema_for_ollama walks the schema recursively and drops
"type: null" branches from every anyOf. Single-branch unions are
inlined so sibling keys (default, title) survive. This only narrows
what the LLM is *told* it may emit; Pydantic still validates the real
response body against the original schema and accepts None for
Optional fields if they were absent or explicitly null.

Existing unit tests updated: the "happy path" test no longer pins the
format to `_Schema.model_json_schema()` verbatim — instead it asserts
the sanitisation effect on a known-Optional field.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 13:48:26 +02:00
4c0746950e fix(deps): surya ^0.17 (#36)
All checks were successful
tests / test (push) Successful in 2m58s
2026-04-18 11:21:54 +00:00
a418969251 fix(deps): pin surya-ocr ^0.17 and drop cu124 index
All checks were successful
tests / test (push) Successful in 1m23s
tests / test (pull_request) Successful in 2m23s
Our client code imports surya.foundation (added in 0.17). The earlier
cu124 torch pin forced uv to downgrade surya to 0.14.1, which doesn't
have that module and depends on a transformers version that lacks
QuantizedCacheConfig. Net: ocr: fail at /healthz.

Drop the cu124 index pin. surya 0.17.1 needs torch >= 2.7, which the
default pypi torch (2.11) satisfies. The deploy host's CUDA 12.4
driver doesn't match torch 2.11's cu13 wheels, so CUDA init warns and
the GPU isn't available — torch + Surya transparently fall back to CPU.
Slower than GPU but correct for MVP. A host driver upgrade later will
unlock GPU with no code changes.

Unit suite stays green.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 13:21:40 +02:00
fae8c3267f fix(deps): torch cu124 (#35)
All checks were successful
tests / test (push) Successful in 3m48s
2026-04-18 11:02:38 +00:00
d90117807b fix(deps): pin torch to the CUDA 12.4 wheel channel
All checks were successful
tests / test (push) Successful in 3m21s
tests / test (pull_request) Successful in 3m40s
The default pypi torch (2.11 as of lockfile) ships cu13 wheels, which
refuse to initialise against the deploy host's NVIDIA 12.4 driver
(UserWarning: "driver on your system is too old (found version 12040)").
/healthz reported ocr: fail because Surya couldn't pick up the GPU.

Use `tool.uv.sources` to route torch through PyTorch's cu124 index.
That pulls torch 2.6.0+cu124 (still satisfies surya-ocr >= 0.9). Lock
updated. transformers downgraded to 4.57.6, triton to 3.2.0 — all
compatible with surya and each other.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 13:02:26 +02:00
44c3428993 fix(deploy): network_mode: host (#34)
Some checks failed
tests / test (push) Has been cancelled
2026-04-18 11:00:14 +00:00
c7dc40c51e fix(deploy): switch to network_mode: host — reach postgis + ollama on loopback
All checks were successful
tests / test (push) Successful in 1m12s
tests / test (pull_request) Successful in 1m10s
The shared postgis container is bound to 127.0.0.1 on the host (security
hardening, infrastructure §T12). Ollama is similarly LAN-hardened. The
previous `host.docker.internal + extra_hosts: host-gateway` approach
points at the bridge gateway IP, not loopback, so the container couldn't
reach either service.

Switch to `network_mode: host` (same pattern goldstein uses) and update
the default IX_POSTGRES_URL / IX_OLLAMA_URL to 127.0.0.1. Keep the GPU
reservation block; drop the now-meaningless ports: declaration (host mode
publishes directly).

AppConfig defaults + .env.example + test_config assertions + inline
docstring examples all follow.

Caught on fourth deploy attempt.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 13:00:02 +02:00
39a6c10634 fix(compose): drop runtime: nvidia (#33)
All checks were successful
tests / test (push) Successful in 1m19s
2026-04-18 10:56:15 +00:00
9f793da778 fix(compose): drop runtime: nvidia — use deploy.resources.devices only
All checks were successful
tests / test (push) Successful in 1m10s
tests / test (pull_request) Successful in 1m10s
Docker on the deploy host doesn't register 'nvidia' as a named runtime
(modern nvidia-container-toolkit hooks via --gpus all / resources.devices
instead). Immich-ml on the same host uses only deploy.resources.devices
with driver: nvidia, which is enough. Drop the legacy runtime line.

Caught on third deploy attempt.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 12:56:03 +02:00
4802e086a0 fix(docker): README.md for hatchling (#32)
All checks were successful
tests / test (push) Successful in 2m27s
2026-04-18 10:42:41 +00:00
f54f0d317d fix(docker): include README.md in the uv sync COPY so hatchling finds it
All checks were successful
tests / test (push) Successful in 1m22s
tests / test (pull_request) Successful in 1m36s
pyproject.toml names README.md as the readme; hatchling validates that
file exists when uv sync resolves the editable install of the project
itself. Caught on second deploy build.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 12:42:29 +02:00
e6fcd5fc54 fix(docker): uv via standalone installer (#31)
All checks were successful
tests / test (push) Successful in 1m14s
2026-04-18 10:33:11 +00:00
1c31444611 fix(docker): install uv via standalone installer (no system pip)
All checks were successful
tests / test (pull_request) Successful in 1m28s
tests / test (push) Successful in 1m19s
Python 3.12 from deadsnakes on Ubuntu 22.04 drops `distutils` from the
stdlib, and Ubuntu's system pip still imports from it — so `pip install`
fails immediately with ModuleNotFoundError: distutils. Switch to the uv
standalone installer, which doesn't need pip at all.

Caught during the first deploy build.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 12:32:55 +02:00
a9e510362d chore(model): qwen3:14b default (#30)
All checks were successful
tests / test (push) Successful in 2m55s
unblock first deploy
2026-04-18 10:20:38 +00:00
5ee74f367c chore(model): switch default IX_DEFAULT_MODEL to qwen3:14b (already on host)
All checks were successful
tests / test (push) Successful in 1m52s
tests / test (pull_request) Successful in 1m45s
The home server's Ollama doesn't have gpt-oss:20b pulled; qwen3:14b is
already there and is what mammon's chat agent uses. Switching the default
now so the first deploy passes the /healthz ollama probe without an extra
`ollama pull` step. The spec lists gpt-oss:20b as a concrete example;
qwen3:14b is equally on-prem and Ollama-structured-output-compatible.

Touched: AppConfig default, BankStatementHeader Request.default_model,
.env.example, setup_server.sh ollama-list check, AGENTS.md, deployment.md,
live tests. Unit tests that hard-coded the old model string but don't
assert the default were left alone.

Also: ASCII en-dash in e2e_smoke.py Paperless-style text (ruff RUF001).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 12:20:23 +02:00
f6cc99f062 feat(e2e): e2e_smoke.py deploy gate (#29)
Some checks are pending
tests / test (push) Waiting to run
Lands Task 5.4.
2026-04-18 10:18:23 +00:00
d0648fe01d feat(e2e): scripts/e2e_smoke.py — live deploy gate
All checks were successful
tests / test (push) Successful in 1m11s
tests / test (pull_request) Successful in 2m14s
Runs from the Mac after every `git push server main`.

Flow: starts a tiny HTTP server on the Mac's LAN IP serving
tests/fixtures/synthetic_giro.pdf → POST /jobs with bank_statement_header
+ Paperless-style texts so text_agreement has something to check against →
poll GET /jobs/{id} until terminal → assert status=done, bank_name
non-empty, closing_balance.provenance_verified=True, text_agreement=True,
elapsed < 60 s. Non-zero exit blocks the deploy.

Uses only stdlib (http.server, urllib) — no extra deps on the Mac-side,
no test framework overhead.

Task 5.4 of MVP plan.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 12:18:07 +02:00
5841bc09c0 feat(deploy): setup_server.sh + deployment runbook (#28)
Some checks are pending
tests / test (push) Waiting to run
Lands Task 5.2.
2026-04-18 10:17:14 +00:00
6d1bc720b4 feat(deploy): setup_server.sh + deployment runbook
All checks were successful
tests / test (push) Successful in 1m9s
tests / test (pull_request) Successful in 1m10s
- scripts/setup_server.sh: idempotent one-shot. Creates bare repo,
  post-receive hook (which rebuilds docker compose + gates on /healthz),
  infoxtractor Postgres role + DB on the shared postgis container, .env
  (0600) from .env.example with the password substituted in, verifies
  gpt-oss:20b is pulled.
- docs/deployment.md: topology, one-time setup command, normal deploy
  workflow, rollback-via-revert pattern (never force-push main),
  operational checklists for the common /healthz degraded states.
- First deploy section reserved; filled in after Task 5.3 runs.

Task 5.2 of MVP plan.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 12:16:58 +02:00
3c7d607776 feat(docker): Dockerfile + compose (#27)
Some checks are pending
tests / test (push) Waiting to run
Lands Task 5.1.
2026-04-18 10:15:45 +00:00
4646180942 feat(docker): Dockerfile (CUDA+python3.12) + compose with GPU reservation
All checks were successful
tests / test (push) Successful in 1m13s
tests / test (pull_request) Successful in 1m10s
- nvidia/cuda:12.4 runtime base matches the deploy host's driver stack
  (immich-ml / monitoring use the same pattern).
- python3.12 via deadsnakes (Ubuntu 22.04 ships 3.10 only).
- System deps: libmagic1 (python-magic), libgl1/libglib2 (PIL + PyMuPDF
  headless), curl (post-receive /healthz probe), ca-certs (httpx TLS).
- uv sync --frozen --no-dev --extra ocr installs prod + Surya/torch;
  dev tooling stays out of the image.
- CMD runs `alembic upgrade head && uvicorn ix.app:create_app` — idempotent.
- Compose: single service, port 8994, GPU reservation mirroring immich-ml,
  labels for monitoring dashboard auto-discovery + backup opt-in.
- host.docker.internal:host-gateway lets ix reach the host's Ollama and
  postgis containers (same pattern mammon uses).

Task 5.1 of MVP plan.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 12:15:26 +02:00
c234b67bbf Merge pull request 'feat(app): production wiring factories + /healthz real probes (Task 4.3)' (#26) from feat/production-wiring into main
All checks were successful
tests / test (push) Successful in 1m10s
2026-04-18 10:09:34 +00:00
ebefee4184 feat(app): production wiring — factories, pipeline, /healthz real probes
All checks were successful
tests / test (push) Successful in 1m9s
tests / test (pull_request) Successful in 1m13s
Task 4.3 closes the loop on Chunk 4: the FastAPI lifespan now selects
fake vs real clients via IX_TEST_MODE (new AppConfig field), wires
/healthz probes to the live selfcheck() on OllamaClient / SuryaOCRClient,
and spawns the worker with a production Pipeline factory that builds
SetupStep -> OCRStep -> GenAIStep -> ReliabilityStep -> ResponseHandler
over the injected clients.

Factories:
- make_genai_client(cfg) -> FakeGenAIClient | OllamaClient
- make_ocr_client(cfg)   -> FakeOCRClient  | SuryaOCRClient (spec §6.2)

Probes run the async selfcheck on a fresh event loop in a short-lived
thread so they're safe to call from either sync callers or a live
FastAPI handler without stalling the request loop.

Drops the worker-loop spawn_worker_task stub — the app module owns the
production spawn directly.

Tests: +11 unit tests (5 factories + 6 app-wiring / probe adapter /
pipeline build). Full suite: 236 passed.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 12:09:11 +02:00
b737ed7b21 Merge pull request 'feat(ocr): SuryaOCRClient real OCR backend (spec 6.2)' (#25) from feat/surya-client into main
All checks were successful
tests / test (push) Successful in 1m10s
2026-04-18 10:04:41 +00:00
322f6b2b1b feat(ocr): SuryaOCRClient — real OCR backend (spec §6.2)
All checks were successful
tests / test (push) Successful in 1m14s
tests / test (pull_request) Successful in 1m14s
Runs Surya's detection + recognition over PIL images rendered from each
Page's source file (PDFs via PyMuPDF, images via Pillow). Lazy warm_up
so FastAPI lifespan start stays predictable. Deferred Surya/torch
imports keep the base install slim — the heavy deps stay under [ocr].

Extends OCRClient Protocol with optional files + page_metadata kwargs
so the engine can resolve each page back to its on-disk source; Fake
accepts-and-ignores to keep hermetic tests unchanged.

selfcheck() runs the predictors on a 1x1 PIL image — wired into /healthz
by Task 4.3.

Tests: 6 hermetic unit tests (Surya predictors mocked, no model
download); 2 live tests gated on IX_TEST_OLLAMA=1 (never run in CI).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 12:04:19 +02:00
0f045f814a Merge pull request 'feat(genai): OllamaClient structured-output /api/chat backend (spec 6)' (#24) from feat/ollama-client into main
All checks were successful
tests / test (push) Successful in 1m15s
2026-04-18 09:58:38 +00:00
90e46b707d feat(genai): OllamaClient — structured-output /api/chat backend (spec §6)
All checks were successful
tests / test (push) Successful in 1m10s
tests / test (pull_request) Successful in 1m5s
Real GenAIClient for the production pipeline. Sends `format=<pydantic JSON
schema>`, `stream=false`, and mapped options (`temperature`; drops
`reasoning_effort`). Content-parts lists joined to a single string since
MVP models don't speak native content-parts. Error mapping per spec:
connection/timeout/5xx → IX_002_000, schema violations → IX_002_001.
`selfcheck()` probes /api/tags with a fixed 5 s timeout for /healthz.

Tests: 10 hermetic pytest-httpx unit tests; 2 live tests gated on
IX_TEST_OLLAMA=1 (never run in CI).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 11:58:15 +02:00
6183b9c886 Merge pull request 'feat(pg-queue): LISTEN ix_jobs_new + 10s fallback poll' (#23) from feat/pg-queue-adapter into main
All checks were successful
tests / test (push) Successful in 1m11s
2026-04-18 09:52:45 +00:00
050f80dcd7 feat(pg-queue): LISTEN ix_jobs_new + 10s fallback poll (spec §4)
All checks were successful
tests / test (push) Successful in 1m8s
tests / test (pull_request) Successful in 1m9s
PgQueueListener:
- Dedicated asyncpg connection outside the SQLAlchemy pool (LISTEN
  needs a persistent connection; pooled connections check in/out).
- Exposes wait_for_work(timeout) — resolves on NOTIFY or timeout,
  whichever fires first. The worker treats both wakes identically.
- asyncpg_dsn_from_sqlalchemy_url strips the +asyncpg driver segment
  and percent-decodes the password so the same URL in IX_POSTGRES_URL
  works for both SQLAlchemy and raw asyncpg.

app.py lifespan now also spawns the listener alongside the worker;
both are gated on spawn_worker=True so REST-only tests stay fast.

2 new integration tests: NOTIFY path (wake within 2 s despite 60 s
poll) + missed-NOTIFY path (fallback poll recovers within 5 s). 33
integration tests total, 209 unit. Forgejo Actions trigger is flaky;
local verification is the gate.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 11:52:26 +02:00
415e03fba1 Merge pull request 'feat(worker): async worker loop + one-shot callback delivery' (#22) from feat/worker-loop into main
Some checks are pending
tests / test (push) Waiting to run
2026-04-18 09:50:11 +00:00
406a7ea2fd feat(worker): async worker loop + one-shot callback delivery (spec §5)
All checks were successful
tests / test (push) Successful in 1m15s
tests / test (pull_request) Successful in 1m8s
Worker:
- Startup: sweep_orphans(now, max_running_seconds) rescues rows stuck
  in 'running' from a crashed prior process.
- Loop: claim_next_pending → build pipeline via injected factory → run
  → mark_done/mark_error → deliver callback if set → record outcome.
- Non-IX exceptions from the pipeline collapse to IX_002_000 so callers
  see a stable error code.
- Sleep loop uses a cancellable wait so the stop event reacts
  immediately; the wait_for_work hook is ready for Task 3.6 to plug in
  the LISTEN-driven event without the worker knowing about NOTIFY.

Callback:
- One-shot POST, 2xx → delivered, anything else (incl. connect/timeout
  exceptions) → failed. No retries.
- Callback record never reverts the job's terminal state — GET /jobs/{id}
  stays the authoritative fallback.

7 integration tests: happy path, pipeline-raise → error, callback 2xx,
callback 5xx, orphan sweep on startup, no-callback rows stay
callback_status=None (x2 via parametrize). Unit suite still 209.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 11:49:54 +02:00
ee023d6e34 Merge pull request 'feat(rest): FastAPI adapter + /jobs /healthz /metrics (spec 5)' (#21) from feat/rest-adapter into main
Some checks failed
tests / test (push) Has been cancelled
2026-04-18 09:47:35 +00:00
e46c44f1e0 feat(rest): FastAPI adapter + /jobs, /healthz, /metrics routes (spec §5)
All checks were successful
tests / test (push) Successful in 1m7s
tests / test (pull_request) Successful in 1m5s
Routes:
- POST /jobs: 201 on first insert, 200 on idempotent re-submit.
- GET /jobs/{id}: full Job envelope or 404.
- GET /jobs?client_id=&request_id=: correlation lookup or 404.
- GET /healthz: {postgres, ollama, ocr}; 200 iff all ok (degraded counts
  as non-200 per spec). Postgres probe guarded by a 2 s wait_for.
- GET /metrics: pending/running counts + 24h done/error counters +
  per-use-case avg seconds. Plain JSON, no Prometheus.

create_app(spawn_worker=bool) parameterises worker spawning so tests that
only need REST pass False. Worker spawn is tolerant of the loop module not
being importable yet (Task 3.5 fills it in).

Probes are a DI bundle — production wiring swaps them in at startup
(Chunk 4); tests inject canned ok/fail callables. Session factory is also
DI'd so tests can point at a per-loop engine and sidestep the async-pg
cross-loop future issue that bit the jobs_repo fixture.

9 new integration tests; unit suite unchanged. Forgejo Actions trigger is
flaky; local verification is the gate (unit + integration green locally).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 11:47:04 +02:00
04a415a191 Merge pull request 'feat(store): JobsRepo CRUD over ix_jobs + integration fixtures' (#20) from feat/jobs-repo into main
All checks were successful
tests / test (push) Successful in 1m14s
2026-04-18 09:43:28 +00:00
141153ffa7 feat(store): JobsRepo CRUD over ix_jobs + integration fixtures (spec §4)
All checks were successful
tests / test (push) Successful in 1m10s
tests / test (pull_request) Successful in 1m10s
JobsRepo covers the full job-lifecycle surface:

- insert_pending: idempotent on (client_id, request_id) via ON CONFLICT
  DO NOTHING + re-select; assigns a 16-hex ix_id.
- claim_next_pending: FOR UPDATE SKIP LOCKED so concurrent workers never
  double-dispatch a row.
- get / get_by_correlation: hydrates JSONB back through Pydantic.
- mark_done: done iff response.error is None, else error.
- mark_error: explicit convenience wrapper.
- update_callback_status: delivered | failed (no status transition).
- sweep_orphans: time-based rescue of stuck running rows; attempts++.

Integration fixtures (tests/integration/conftest.py):
- Skip cleanly when neither IX_TEST_DATABASE_URL nor IX_POSTGRES_URL is
  set (unit suite stays runnable on a bare laptop).
- Alembic upgrade/downgrade runs in a subprocess so its internal
  asyncio.run() doesn't collide with pytest-asyncio's loop.
- Per-test engine + truncate so loops never cross and tests start clean.

15 integration tests against a live postgres:16, including SKIP LOCKED
concurrency + orphan sweep.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 11:43:11 +02:00
8bb220ae43 Merge pull request 'feat(config): AppConfig + cached get_config()' (#19) from feat/config into main
All checks were successful
tests / test (push) Successful in 59s
2026-04-18 09:39:00 +00:00
95728accbf feat(config): AppConfig + cached get_config() (spec §9)
All checks were successful
tests / test (push) Successful in 1m1s
tests / test (pull_request) Successful in 58s
Typed pydantic-settings view over every IX_* env var, defaults matching
spec §9 exactly. @lru_cache-wrapped accessor so parsing/validation happens
once per process; tests clear the cache via get_config.cache_clear().

extra="ignore" keeps the container robust against typo'd env vars in
production .env files. engine.py's URL resolver now goes through
get_config() when ix.config is importable (bootstrap fallback remains so
hypothetical early-import callers don't crash).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 11:38:44 +02:00
dc6d28bda1 Merge pull request 'feat(store): Alembic scaffolding + initial ix_jobs migration' (#18) from feat/alembic-init into main
Some checks are pending
tests / test (push) Waiting to run
2026-04-18 09:37:37 +00:00
1c60c30084 feat(store): Alembic scaffolding + initial ix_jobs migration (spec §4)
All checks were successful
tests / test (push) Successful in 1m15s
tests / test (pull_request) Successful in 1m2s
Lands the async-friendly Alembic env (NullPool, reads IX_POSTGRES_URL), the
hand-written 001 migration matching the spec's table layout exactly
(CHECK on status, partial index on pending rows, UNIQUE on
(client_id, request_id)), the SQLAlchemy 2.0 ORM mapping, and a lazy
engine/session factory. The factory reads the URL through ix.config when
available; Task 3.2 makes that the only path.

Smoke-tested: alembic upgrade head + downgrade base against a live
postgres:16 produce the expected table shape and tear down cleanly.
Unit tests assert the migration source contains every required column/index
so the migration can't drift from spec at import time.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 11:37:21 +02:00
a54a968313 Merge pull request 'test(pipeline): end-to-end hermetic test with fakes + synthetic fixture' (#17) from feat/pipeline-e2e-fakes into main
Some checks failed
tests / test (push) Has been cancelled
2026-04-18 09:24:51 +00:00
b109bba873 test(pipeline): end-to-end hermetic test with fakes + synthetic fixture
All checks were successful
tests / test (push) Successful in 59s
tests / test (pull_request) Successful in 57s
Wires the five pipeline steps together with FakeOCRClient +
FakeGenAIClient, feeds the committed synthetic_giro.pdf fixture via
file:// URL, and asserts the full response shape.

- scripts/create_fixture_pdf.py: PyMuPDF-based builder. One-page A4 PDF
  with six known header strings (bank name, IBAN, period, balances,
  statement date). Re-runnable on demand; the committed PDF is what CI
  consumes.
- tests/fixtures/synthetic_giro.pdf: committed output.
- tests/unit/test_pipeline_end_to_end.py: 5 tests covering
  * ix_result.result fields populated from the fake LLM
  * provenance.fields["result.closing_balance"].provenance_verified True
  * text_agreement True when Paperless-style texts match the value
  * metadata.timings has one entry per step in the right order
  * response.error is None and context is not serialised

197 tests total; ruff clean. No integration tests, no real clients,
no network.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 11:24:29 +02:00
118d77c428 Merge pull request 'feat(pipeline): ResponseHandlerStep (spec §8)' (#16) from feat/step-response-handler into main
Some checks are pending
tests / test (push) Waiting to run
2026-04-18 09:21:50 +00:00
565d8d0676 feat(pipeline): ResponseHandlerStep — shape-up final payload (spec §8)
All checks were successful
tests / test (push) Successful in 1m0s
tests / test (pull_request) Successful in 1m2s
Final pipeline step. Three mechanical transforms:

1. include_ocr_text -> concatenate non-tag line texts, pages joined
   with \n\n, write to ocr_result.result.text.
2. include_geometries=False (default) -> strip ocr_result.result.pages
   + ocr_result.meta_data. Geometries are heavy; callers opt in.
3. Delete response.context so the internal accumulator never leaks to
   the caller (belt-and-braces; Field(exclude=True) already does this).

validate() always returns True per spec.

7 unit tests in tests/unit/test_response_handler_step.py cover all
three branches + context-not-in-model_dump check.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 11:21:36 +02:00
83c1996702 Merge pull request 'feat(pipeline): ReliabilityStep (spec §6)' (#15) from feat/step-reliability into main
Some checks are pending
tests / test (push) Waiting to run
2026-04-18 09:20:38 +00:00
132f110463 feat(pipeline): ReliabilityStep — writes reliability flags (spec §6)
All checks were successful
tests / test (push) Successful in 1m3s
tests / test (pull_request) Successful in 1m1s
Thin wrapper around ix.provenance.apply_reliability_flags. Validate
skips entirely when include_provenance is off OR when no provenance
data was built (text-only request, etc.). Process reads
context.texts + context.use_case_response and lets the verifier mutate
the FieldProvenance entries + fill quality_metrics counters in place.

11 unit tests in tests/unit/test_reliability_step.py cover: validate
skips on flag off / missing provenance, runs otherwise; per-type
flag behaviour (string verified + text_agreement, Literal -> None,
None value -> None, short numeric -> text_agreement None, date with
both sides parsed, IBAN whitespace-insensitive, disagreement -> False);
quality_metrics verified_fields / text_agreement_fields counters.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 11:20:18 +02:00
6d9c239e82 Merge pull request 'feat(pipeline): GenAIStep (spec §6.3, §7, §9.2)' (#14) from feat/step-genai into main
Some checks are pending
tests / test (push) Waiting to run
2026-04-18 09:18:59 +00:00
abee9cea7b feat(pipeline): GenAIStep — LLM call + provenance mapping (spec §6.3, §7, §9.2)
All checks were successful
tests / test (push) Successful in 1m14s
tests / test (pull_request) Successful in 1m10s
Assembles the prompt, picks the structured-output schema, calls the
injected GenAIClient, and maps any emitted segment_citations into
response.provenance. Reliability flags stay None here; ReliabilityStep
fills them in Task 2.7.

- System prompt = use_case.system_prompt + (provenance-on) the verbatim
  citation instruction from spec §9.2.
- User text = SegmentIndex.to_prompt_text([p1_l0] style) when provenance
  is on, else plain OCR flat text + texts joined.
- Response schema = UseCaseResponse directly, or a runtime
  create_model("ProvenanceWrappedResponse", result=(UCR, ...),
  segment_citations=(list[SegmentCitation], Field(default_factory=list)))
  when provenance is on.
- Model = request override -> use-case default.
- Failure modes: httpx / connection / timeout errors -> IX_002_000;
  pydantic.ValidationError -> IX_002_001.
- Writes ix_result.result + ix_result.meta_data (model_name +
  token_usage); builds response.provenance via
  map_segment_refs_to_provenance when provenance is on.

17 unit tests in tests/unit/test_genai_step.py cover validate
(ocr_only skip, empty -> IX_001_000, text-only, ocr-text path), process
happy path, system-prompt shape with/without citation instruction, user
text tagged vs. plain, response schema plain vs. wrapped, provenance
mapping, error mapping (IX_002_000 + IX_002_001), and model selection
(request override + use-case default).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 11:18:44 +02:00
acb2d55ce3 Merge pull request 'feat(pipeline): OCRStep (spec §6.2)' (#13) from feat/step-ocr into main
Some checks are pending
tests / test (push) Waiting to run
2026-04-18 09:16:04 +00:00
61 changed files with 6762 additions and 89 deletions

View file

@ -4,11 +4,11 @@
# the Postgres password.
# --- Job store -----------------------------------------------------------
IX_POSTGRES_URL=postgresql+asyncpg://infoxtractor:<password>@host.docker.internal:5431/infoxtractor
IX_POSTGRES_URL=postgresql+asyncpg://infoxtractor:<password>@127.0.0.1:5431/infoxtractor
# --- LLM backend ---------------------------------------------------------
IX_OLLAMA_URL=http://host.docker.internal:11434
IX_DEFAULT_MODEL=gpt-oss:20b
IX_OLLAMA_URL=http://127.0.0.1:11434
IX_DEFAULT_MODEL=qwen3:14b
# --- OCR -----------------------------------------------------------------
IX_OCR_ENGINE=surya

1
.gitignore vendored
View file

@ -15,6 +15,7 @@ dist/
build/
*.log
/tmp/
.claude/
# uv
# uv.lock is committed intentionally for reproducible builds.

View file

@ -4,7 +4,7 @@ Async, on-prem, LLM-powered structured information extraction microservice. Give
Designed to be used by other on-prem services (e.g. mammon) as a reliable fallback / second opinion for format-specific deterministic parsers.
Status: design phase. Full reference spec at `docs/spec-core-pipeline.md`. MVP spec will live at `docs/superpowers/specs/`.
Status: MVP deployed (2026-04-18) at `http://192.168.68.42:8994` — LAN only. Full reference spec at `docs/spec-core-pipeline.md`; MVP spec at `docs/superpowers/specs/2026-04-18-ix-mvp-design.md`; deploy runbook at `docs/deployment.md`.
## Guiding Principles
@ -25,7 +25,7 @@ Status: design phase. Full reference spec at `docs/spec-core-pipeline.md`. MVP s
- **Language**: Python 3.12, asyncio
- **Web/REST**: FastAPI + uvicorn
- **OCR (pluggable)**: Surya OCR first (GPU, shares RTX 3090 with Ollama / Immich ML)
- **LLM**: Ollama at `192.168.68.42:11434`, structured outputs via JSON schema. Initial model candidate: `qwen2.5:32b` / `gpt-oss:20b`, configurable per use case
- **LLM**: Ollama at `192.168.68.42:11434`, structured outputs via JSON schema. Initial model candidate: `qwen2.5:32b` / `qwen3:14b`, configurable per use case
- **State**: Postgres on the shared `postgis` container (:5431), new `infoxtractor` database
- **Deployment**: Docker, `git push server main` → post-receive rebuild (pattern from other apps)

69
Dockerfile Normal file
View file

@ -0,0 +1,69 @@
# InfoXtractor container image.
#
# Base image ships CUDA 12.4 runtime libraries so the Surya OCR client can
# use the RTX 3090 on the deploy host. Ubuntu 22.04 is the LTS used across
# the home-server stack (immich-ml, monitoring) so GPU drivers line up.
FROM nvidia/cuda:12.4.0-runtime-ubuntu22.04
ENV DEBIAN_FRONTEND=noninteractive \
PYTHONUNBUFFERED=1 \
PYTHONDONTWRITEBYTECODE=1 \
PIP_NO_CACHE_DIR=1 \
PIP_DISABLE_PIP_VERSION_CHECK=1
# --- System deps --------------------------------------------------------
# - python3.12 via deadsnakes PPA (pinned; Ubuntu 22.04 ships 3.10 only)
# - libmagic1 : python-magic backend for MIME sniffing
# - libgl1 : libGL.so needed by Pillow/OpenCV wheels used by Surya
# - libglib2.0 : shared by Pillow/PyMuPDF headless rendering
# - curl : post-receive hook's /healthz probe & general ops
# - ca-certs : httpx TLS verification
RUN apt-get update \
&& apt-get install -y --no-install-recommends \
software-properties-common \
ca-certificates \
curl \
gnupg \
&& add-apt-repository -y ppa:deadsnakes/ppa \
&& apt-get update \
&& apt-get install -y --no-install-recommends \
python3.12 \
python3.12-venv \
python3.12-dev \
libmagic1 \
libgl1 \
libglib2.0-0 \
&& ln -sf /usr/bin/python3.12 /usr/local/bin/python \
&& ln -sf /usr/bin/python3.12 /usr/local/bin/python3 \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*
# --- uv (dependency resolver used by the project) -----------------------
# Install via the standalone installer; avoids needing a working system pip
# (python3.12 on Ubuntu 22.04 has no `distutils`, which breaks Ubuntu pip).
RUN curl -LsSf https://astral.sh/uv/install.sh | sh \
&& ln -sf /root/.local/bin/uv /usr/local/bin/uv
WORKDIR /app
# Copy dependency manifests + README early so the heavy `uv sync` layer
# caches whenever only application code changes. README.md is required
# because pyproject.toml names it as the package's readme — hatchling
# validates it exists when resolving the editable install.
COPY pyproject.toml uv.lock .python-version README.md ./
# Prod + OCR extras, no dev tooling. --frozen means "must match uv.lock";
# CI catches drift before it reaches the image.
RUN uv sync --frozen --no-dev --extra ocr
# --- Application code ---------------------------------------------------
COPY src src
COPY alembic alembic
COPY alembic.ini ./
EXPOSE 8994
# Migrations are idempotent (alembic upgrade head is a no-op on a current
# DB) so running them on every start keeps the image + DB aligned without
# an extra orchestration step.
CMD ["sh", "-c", "uv run alembic upgrade head && uv run uvicorn ix.app:create_app --factory --host 0.0.0.0 --port 8994"]

View file

@ -4,10 +4,12 @@ Async, on-prem, LLM-powered structured information extraction microservice.
Given a document (PDF, image, text) and a named *use case*, ix returns a structured JSON result whose shape matches the use-case schema — together with per-field provenance (OCR segment IDs, bounding boxes, cross-OCR agreement flags) that let the caller decide how much to trust each extracted value.
**Status:** design phase. Implementation about to start.
**Status:** MVP deployed. Live on the home LAN at `http://192.168.68.42:8994`.
- Full reference spec: [`docs/spec-core-pipeline.md`](docs/spec-core-pipeline.md) (aspirational; MVP is a strict subset)
- **MVP design:** [`docs/superpowers/specs/2026-04-18-ix-mvp-design.md`](docs/superpowers/specs/2026-04-18-ix-mvp-design.md)
- **Implementation plan:** [`docs/superpowers/plans/2026-04-18-ix-mvp-implementation.md`](docs/superpowers/plans/2026-04-18-ix-mvp-implementation.md)
- **Deployment runbook:** [`docs/deployment.md`](docs/deployment.md)
- Agent / development notes: [`AGENTS.md`](AGENTS.md)
## Principles
@ -15,3 +17,44 @@ Given a document (PDF, image, text) and a named *use case*, ix returns a structu
- **On-prem always.** LLM = Ollama, OCR = local engines (Surya first). No OpenAI / Anthropic / Azure / AWS / cloud.
- **Grounded extraction, not DB truth.** ix returns best-effort fields + provenance; the caller decides what to trust.
- **Transport-agnostic pipeline core.** REST + Postgres-queue adapters in parallel on one job store.
## Submitting a job
```bash
curl -X POST http://192.168.68.42:8994/jobs \
-H "Content-Type: application/json" \
-d '{
"use_case": "bank_statement_header",
"ix_client_id": "mammon",
"request_id": "some-correlation-id",
"context": {
"files": [{
"url": "http://paperless.local/api/documents/42/download/",
"headers": {"Authorization": "Token …"}
}],
"texts": ["<Paperless Tesseract OCR content>"]
}
}'
# → {"job_id":"…","ix_id":"…","status":"pending"}
```
Poll `GET /jobs/{job_id}` until `status` is `done` or `error`. Optionally pass `callback_url` to receive a webhook on completion (one-shot, no retry; polling stays authoritative).
Full REST surface + provenance response shape documented in the MVP design spec.
## Running locally
```bash
uv sync --extra dev
uv run pytest tests/unit -v # hermetic unit + integration suite
IX_TEST_OLLAMA=1 uv run pytest tests/live -v # needs LAN access to Ollama + GPU
```
## Deploying
```bash
git push server main # rebuilds Docker image, restarts container, /healthz deploy gate
python scripts/e2e_smoke.py # E2E acceptance against the live service
```
See [`docs/deployment.md`](docs/deployment.md) for full runbook + rollback.

47
alembic.ini Normal file
View file

@ -0,0 +1,47 @@
; Alembic configuration for infoxtractor.
;
; The sqlalchemy.url is filled at runtime from the IX_POSTGRES_URL env var
; (alembic/env.py does the substitution). We keep the template here so
; ``alembic check`` / ``alembic history`` tools work without an env var set.
[alembic]
script_location = alembic
file_template = %%(rev)s_%%(slug)s
prepend_sys_path = .
sqlalchemy.url = driver://user:pass@localhost/dbname
[post_write_hooks]
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

89
alembic/env.py Normal file
View file

@ -0,0 +1,89 @@
"""Alembic async env — reads ``IX_POSTGRES_URL`` from the environment.
Mirrors mammon's ``alembic/env.py`` pattern (async engine + ``run_sync`` bridge)
so anyone familiar with that repo can read this one without context switch.
The only deviations:
* We source the URL from ``IX_POSTGRES_URL`` via ``os.environ`` rather than via
the alembic.ini ``sqlalchemy.url`` setting. Config parsing happens at import
time and depending on pydantic-settings here would introduce a cycle with
``src/ix/config.py`` (which lands in Task 3.2).
* We use ``NullPool`` migrations open/close their connection once, pooling
would hold an unused async connection open after ``alembic upgrade head``
returned, which breaks the container's CMD chain.
Run offline by setting ``-x url=...`` or the env var + ``--sql``.
"""
from __future__ import annotations
import asyncio
import os
from logging.config import fileConfig
from alembic import context
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.pool import NullPool
from ix.store.models import Base
config = context.config
if config.config_file_name is not None:
fileConfig(config.config_file_name)
target_metadata = Base.metadata
def _database_url() -> str:
"""Resolve the connection URL from env, falling back to alembic.ini.
The env var is the primary source (container CMD sets it). The ini value
remains available so ``alembic -x url=...`` or a manual ``alembic.ini``
edit still work for one-off scripts.
"""
env_url = os.environ.get("IX_POSTGRES_URL")
if env_url:
return env_url
ini_url = config.get_main_option("sqlalchemy.url")
if ini_url and ini_url != "driver://user:pass@localhost/dbname":
return ini_url
raise RuntimeError(
"IX_POSTGRES_URL not set and alembic.ini sqlalchemy.url not configured"
)
def run_migrations_offline() -> None:
"""Emit migrations as SQL without a live connection."""
context.configure(
url=_database_url(),
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def do_run_migrations(connection) -> None: # type: ignore[no-untyped-def]
context.configure(connection=connection, target_metadata=target_metadata)
with context.begin_transaction():
context.run_migrations()
async def run_async_migrations() -> None:
engine = create_async_engine(_database_url(), poolclass=NullPool)
async with engine.connect() as connection:
await connection.run_sync(do_run_migrations)
await engine.dispose()
def run_migrations_online() -> None:
asyncio.run(run_async_migrations())
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()

View file

@ -0,0 +1,90 @@
"""Initial migration — creates the ``ix_jobs`` table per spec §4.
Hand-written (do NOT ``alembic revision --autogenerate``) so the table layout
stays byte-exact with the MVP spec. autogenerate tends to add/drop indexes in
an order that makes diffs noisy and occasionally swaps JSONB for JSON on
dialects that don't distinguish them.
Revision ID: 001
Revises:
Create Date: 2026-04-18
"""
from __future__ import annotations
import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision = "001"
down_revision = None
branch_labels = None
depends_on = None
def upgrade() -> None:
"""Create ``ix_jobs`` + its indexes exactly as spec §4 describes.
JSONB for ``request`` and ``response`` (Postgres-only; the MVP doesn't
support any other backend). CHECK constraint bakes the status enum into
the DDL so direct SQL inserts (the pg_queue_adapter path) can't land
bogus values. The partial index on ``status='pending'`` matches the
claim query's ``WHERE status='pending' ORDER BY created_at`` pattern.
"""
op.create_table(
"ix_jobs",
sa.Column("job_id", postgresql.UUID(as_uuid=True), primary_key=True),
sa.Column("ix_id", sa.Text(), nullable=False),
sa.Column("client_id", sa.Text(), nullable=False),
sa.Column("request_id", sa.Text(), nullable=False),
sa.Column("status", sa.Text(), nullable=False),
sa.Column("request", postgresql.JSONB(), nullable=False),
sa.Column("response", postgresql.JSONB(), nullable=True),
sa.Column("callback_url", sa.Text(), nullable=True),
sa.Column("callback_status", sa.Text(), nullable=True),
sa.Column("attempts", sa.Integer(), nullable=False, server_default="0"),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
nullable=False,
server_default=sa.text("now()"),
),
sa.Column("started_at", sa.DateTime(timezone=True), nullable=True),
sa.Column("finished_at", sa.DateTime(timezone=True), nullable=True),
sa.CheckConstraint(
"status IN ('pending', 'running', 'done', 'error')",
name="ix_jobs_status_check",
),
sa.CheckConstraint(
"callback_status IS NULL OR callback_status IN "
"('pending', 'delivered', 'failed')",
name="ix_jobs_callback_status_check",
),
)
# Partial index: the claim query hits only pending rows ordered by age.
# Partial-ness keeps the index small as done/error rows accumulate.
op.create_index(
"ix_jobs_status_created",
"ix_jobs",
["status", "created_at"],
postgresql_where=sa.text("status = 'pending'"),
)
# Unique index on (client_id, request_id) enforces caller-side idempotency
# at the DB layer. The repo relies on the unique violation to detect an
# existing pending/running row and return it unchanged.
op.create_index(
"ix_jobs_client_request",
"ix_jobs",
["client_id", "request_id"],
unique=True,
)
def downgrade() -> None:
op.drop_index("ix_jobs_client_request", table_name="ix_jobs")
op.drop_index("ix_jobs_status_created", table_name="ix_jobs")
op.drop_table("ix_jobs")

42
docker-compose.yml Normal file
View file

@ -0,0 +1,42 @@
# InfoXtractor Docker Compose stack.
#
# Single service. Uses host networking so the container can reach:
# - Ollama at 127.0.0.1:11434
# - postgis at 127.0.0.1:5431 (bound to loopback only; security hardening)
# Both services are LAN-hardened on the host and never exposed publicly,
# so host-network access stays on-prem. This matches the `goldstein`
# container pattern on the same server.
#
# The GPU reservation block matches immich-ml / the shape Docker Compose
# expects for GPU allocation on this host.
name: infoxtractor
services:
infoxtractor:
build: .
container_name: infoxtractor
network_mode: host
restart: always
env_file: .env
deploy:
resources:
reservations:
devices:
- driver: nvidia
count: 1
capabilities: [gpu]
volumes:
# Persist Surya (datalab) + HuggingFace model caches so rebuilds don't
# re-download ~1.5 GB of weights every time.
- ix_surya_cache:/root/.cache/datalab
- ix_hf_cache:/root/.cache/huggingface
labels:
infrastructure.web_url: "http://192.168.68.42:8994"
backup.enable: "true"
backup.type: "postgres"
backup.name: "infoxtractor"
volumes:
ix_surya_cache:
ix_hf_cache:

153
docs/deployment.md Normal file
View file

@ -0,0 +1,153 @@
# Deployment
On-prem deploy to `192.168.68.42`. Push-to-deploy via a bare git repo + `post-receive` hook that rebuilds the Docker Compose stack. Pattern mirrors mammon and unified_messaging.
## Topology
```
Mac (dev)
│ git push server main
192.168.68.42:/home/server/Public/infoxtractor/repos.git (bare)
│ post-receive → GIT_WORK_TREE=/…/app git checkout -f main
│ docker compose up -d --build
│ curl /healthz (60 s gate)
Docker container `infoxtractor` (port 8994)
├─ 127.0.0.1:11434 → Ollama (qwen3:14b; host-network mode)
└─ 127.0.0.1:5431 → postgis (database `infoxtractor`; host-network mode)
```
## One-time server setup
Run **once** from the Mac. Idempotent.
```bash
export IX_POSTGRES_PASSWORD=<generate-a-strong-one>
./scripts/setup_server.sh
```
The script:
1. Creates `/home/server/Public/infoxtractor/repos.git` (bare) + `/home/server/Public/infoxtractor/app/` (worktree).
2. Installs the `post-receive` hook (see `scripts/setup_server.sh` for the template).
3. Creates the `infoxtractor` Postgres role + database on the shared `postgis` container.
4. Writes `/home/server/Public/infoxtractor/app/.env` (mode 0600) from `.env.example` with the password substituted in.
5. Verifies `qwen3:14b` is pulled in Ollama.
6. Prints a hint to open UFW for port 8994 on the LAN subnet if it's missing.
After the script finishes, add the deploy remote to the local repo:
```bash
git remote add server ssh://server@192.168.68.42/home/server/Public/infoxtractor/repos.git
```
## Normal deploy workflow
```bash
# after merging a feat branch into main
git push server main
# tail the server's deploy log
ssh server@192.168.68.42 "tail -f /tmp/infoxtractor-deploy.log"
# healthz gate (the post-receive hook also waits up to 60 s for this)
curl http://192.168.68.42:8994/healthz
# end-to-end smoke — this IS the real acceptance test
python scripts/e2e_smoke.py
```
If the post-receive hook exits non-zero (healthz never reaches 200), the deploy is considered failed. The previous container keeps running (the hook swaps via `docker compose up -d --build`, which first builds the new image and only swaps if the build succeeds; if the new container fails `/healthz`, it's still up but broken). Investigate with `docker compose logs --tail 200` in `${APP_DIR}` and either fix forward or revert (see below).
## Rollback
Never force-push `main`. Rollbacks happen as **forward commits** via `git revert`:
```bash
git revert HEAD # creates a revert commit for the last change
git push forgejo main
git push server main
```
## First deploy
- **Date:** 2026-04-18
- **Commit:** `fix/ollama-extract-json` (#36, the last of several Docker/ops follow-ups after PR #27 shipped the initial Dockerfile)
- **`/healthz`:** all three probes (`postgres`, `ollama`, `ocr`) green. First-pass took ~7 min for the fresh container because Surya's recognition (1.34 GB) + detection (73 MB) models download from HuggingFace on first run; subsequent rebuilds reuse the named volumes declared in `docker-compose.yml` and come up in <30 s.
- **E2E extraction:** `bank_statement_header` against `tests/fixtures/synthetic_giro.pdf` with Paperless-style texts:
- Pipeline completes in **35 s**.
- Extracted: `bank_name=DKB`, `account_iban=DE89370400440532013000`, `currency=EUR`, `opening_balance=1234.56`, `closing_balance=1450.22`, `statement_date=2026-03-31`, `statement_period_end=2026-03-31`, `statement_period_start=2026-03-01`, `account_type=null`.
- Provenance: 8 / 9 leaf fields have sources; 7 / 8 `provenance_verified` and `text_agreement` are True. `statement_period_start` shows up in the OCR but normalisation fails (dateutil picks a different interpretation of the cited day); to be chased in a follow-up.
### Docker-ops follow-ups that landed during the first deploy
All small, each merged as its own PR. In commit order after the scaffold (#27):
- **#31** `fix(docker): uv via standalone installer` — Python 3.12 on Ubuntu 22.04 drops `distutils`; Ubuntu's pip needed it. Switched to the `uv` standalone installer, which has no pip dependency.
- **#32** `fix(docker): include README.md in the uv sync COPY``hatchling` validates the readme file exists when resolving the editable project install.
- **#33** `fix(compose): drop runtime: nvidia` — the deploy host's Docker daemon doesn't register a named `nvidia` runtime; `deploy.resources.devices` is sufficient and matches immich-ml.
- **#34** `fix(deploy): network_mode: host``postgis` is bound to `127.0.0.1` on the host (security hardening T12). `host.docker.internal` points at the bridge gateway, not loopback, so the container couldn't reach postgis. Goldstein uses the same pattern.
- **#35** `fix(deps): pin surya-ocr ^0.17` — earlier cu124 torch pin had forced surya to 0.14.1, which breaks our `surya.foundation` import and needs a transformers version that lacks `QuantizedCacheConfig`.
- **#36** `fix(genai): drop Ollama format flag; extract trailing JSON` — Ollama 0.11.8 segfaults on Pydantic JSON Schemas (`$ref`, `anyOf`, `pattern`), and `format="json"` terminates reasoning models (qwen3) at `{}` because their `<think>…</think>` chain-of-thought isn't valid JSON. Omit the flag, inject the schema into the system prompt, extract the outermost `{…}` balanced block from the response.
- **volumes** — named `ix_surya_cache` + `ix_hf_cache` mount `/root/.cache/datalab` + `/root/.cache/huggingface` so rebuilds don't re-download ~1.5 GB of model weights.
Production notes:
- `IX_DEFAULT_MODEL=qwen3:14b` (already pulled on the host). Spec listed `gpt-oss:20b` as a concrete example; swapped to keep the deploy on-prem without an extra `ollama pull`.
- Torch 2.11 default cu13 wheels fall back to CPU against the host's CUDA 12.4 driver — Surya runs on CPU. Expected inference times: seconds per page. Upgrading the NVIDIA driver (or pinning a cu12-compatible torch wheel newer than 2.7) will unlock GPU with no code changes.
## E2E smoke test (`scripts/e2e_smoke.py`)
What it does (from the Mac):
1. Checks `/healthz`.
2. Starts a tiny HTTP server on the Mac's LAN IP serving `tests/fixtures/synthetic_giro.pdf`.
3. Submits a `POST /jobs` with `use_case=bank_statement_header`, the fixture URL in `context.files`, and a Paperless-style OCR text in `context.texts` (to exercise the `text_agreement` cross-check).
4. Polls `GET /jobs/{id}` every 2 s until terminal or 120 s timeout.
5. Asserts: `status=="done"`, `bank_name` non-empty, `provenance.fields["result.closing_balance"].provenance_verified=True`, `text_agreement=True`, total elapsed `< 60s`.
Non-zero exit means the deploy is not healthy. Roll back via `git revert HEAD`.
## Operational checklists
### After `ollama pull` on the host
The `IX_DEFAULT_MODEL` env var on the server's `.env` must match something in `ollama list`. Changing the default means:
1. Edit `/home/server/Public/infoxtractor/app/.env``IX_DEFAULT_MODEL=<new>`.
2. `docker compose --project-directory /home/server/Public/infoxtractor/app restart`.
3. `curl http://192.168.68.42:8994/healthz` → confirm `ollama: ok`.
### If `/healthz` shows `ollama: degraded`
`qwen3:14b` (or the configured default) is not pulled. On the host:
```bash
ssh server@192.168.68.42 "docker exec ollama ollama pull qwen3:14b"
```
### If `/healthz` shows `ocr: fail`
Surya couldn't initialize (model missing, CUDA unavailable, OOM). First run can be slow — models download on first call. Check container logs:
```bash
ssh server@192.168.68.42 "docker logs infoxtractor --tail 200"
```
### If the container fails to start
```bash
ssh server@192.168.68.42 "tail -100 /tmp/infoxtractor-deploy.log"
ssh server@192.168.68.42 "docker compose -f /home/server/Public/infoxtractor/app/docker-compose.yml logs --tail 200"
```
## Monitoring
- Monitoring dashboard auto-discovers via the `infrastructure.web_url` label on the container: `http://192.168.68.42:8001` → "infoxtractor" card.
- Backup opt-in via `backup.enable=true` + `backup.type=postgres` + `backup.name=infoxtractor` labels. The daily backup script picks up the `infoxtractor` Postgres database automatically.
## Ports
| Port | Direction | Source | Service |
|------|-----------|--------|---------|
| 8994/tcp | ALLOW | 192.168.68.0/24 | ix REST + healthz (LAN only; not publicly exposed) |
No VPS Caddy entry; no `infrastructure.docs_url` label — this is an internal service.

View file

@ -1,6 +1,8 @@
[project]
name = "infoxtractor"
version = "0.1.0"
# Released 2026-04-18 with the first live deploy of the MVP. See
# docs/deployment.md §"First deploy" for the commit + /healthz times.
description = "Async on-prem LLM-powered structured information extraction microservice"
readme = "README.md"
requires-python = ">=3.12"
@ -31,10 +33,12 @@ dependencies = [
[project.optional-dependencies]
ocr = [
# Real OCR engine — pulls torch + CUDA wheels. Kept optional so CI
# (no GPU) can install the base package without the model deps.
"surya-ocr>=0.9",
"torch>=2.4",
# Real OCR engine. Kept optional so CI (no GPU) can install the base
# package without the model deps.
# surya >= 0.17 is required: the client code uses the
# `surya.foundation` module, which older releases don't expose.
"surya-ocr>=0.17,<0.18",
"torch>=2.7",
]
dev = [
"pytest>=8.3",
@ -44,6 +48,11 @@ dev = [
"mypy>=1.13",
]
# Note: the default pypi torch ships cu13 wheels, which emit a
# UserWarning and fall back to CPU against the deploy host's CUDA 12.4
# driver. Surya then runs on CPU — slower but correct for MVP. A future
# driver upgrade unlocks GPU Surya with no code changes.
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

View file

@ -0,0 +1,66 @@
"""Build the synthetic E2E fixture PDF at ``tests/fixtures/synthetic_giro.pdf``.
Re-runnable on demand. Output bytes are stable across runs in page
content, layout, and text only the PDF's embedded timestamps change,
which pipeline tests don't read. The committed fixture is what CI
consumes; re-run this script locally if you change the ground truth.
Contents: one A4 portrait page with six known strings placed at fixed
positions near the top. The goal is reproducible ground truth, not a
realistic bank statement. The pipeline's fake OCR client is seeded with
those same strings (at plausible bboxes) so the E2E test can assert
exact matches.
Usage::
uv run python scripts/create_fixture_pdf.py
"""
from __future__ import annotations
from pathlib import Path
import fitz # PyMuPDF
OUT_PATH = (
Path(__file__).resolve().parent.parent / "tests" / "fixtures" / "synthetic_giro.pdf"
)
LINES: list[str] = [
"DKB",
"IBAN: DE89370400440532013000",
"Statement period: 01.03.2026 - 31.03.2026",
"Opening balance: 1234.56 EUR",
"Closing balance: 1450.22 EUR",
"Statement date: 31.03.2026",
]
def build() -> None:
doc = fitz.open()
# A4 @ 72 dpi -> 595 x 842 points.
page = doc.new_page(width=595, height=842)
y = 72.0
for line in LINES:
page.insert_text(
(72.0, y),
line,
fontsize=12,
fontname="helv",
)
y += 24.0
OUT_PATH.parent.mkdir(parents=True, exist_ok=True)
# deflate=False + garbage=0 keeps the output byte-stable.
doc.save(
str(OUT_PATH),
deflate=False,
deflate_images=False,
garbage=0,
clean=False,
)
doc.close()
if __name__ == "__main__":
build()
print(f"wrote {OUT_PATH}")

210
scripts/e2e_smoke.py Executable file
View file

@ -0,0 +1,210 @@
"""End-to-end smoke test against the deployed infoxtractor service.
Uploads a synthetic bank-statement fixture, polls for completion, and asserts
the provenance flags per spec §12 E2E. Intended to run from the Mac after
every `git push server main` as the deploy gate.
Prerequisites:
- The service is running and reachable at --base-url (default
http://192.168.68.42:8994).
- The fixture `tests/fixtures/synthetic_giro.pdf` is present.
- The Mac and the server are on the same LAN (the server must be able to
reach the Mac to download the fixture).
Exit codes:
0 all assertions passed within the timeout
1 at least one assertion failed
2 the job never reached a terminal state in time
3 the service was unreachable or returned an unexpected error
Usage:
python scripts/e2e_smoke.py
python scripts/e2e_smoke.py --base-url http://localhost:8994
"""
from __future__ import annotations
import argparse
import http.server
import json
import socket
import socketserver
import sys
import threading
import time
import urllib.error
import urllib.request
import uuid
from pathlib import Path
DEFAULT_BASE_URL = "http://192.168.68.42:8994"
FIXTURE = Path(__file__).parent.parent / "tests" / "fixtures" / "synthetic_giro.pdf"
TIMEOUT_SECONDS = 120
POLL_INTERVAL_SECONDS = 2
def find_lan_ip() -> str:
"""Return the Mac's LAN IP that the server can reach."""
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
# 192.168.68.42 is the server; getting the default route towards it
# yields the NIC with the matching subnet.
s.connect(("192.168.68.42", 80))
return s.getsockname()[0]
finally:
s.close()
def serve_fixture_in_background(fixture: Path) -> tuple[str, threading.Event]:
"""Serve the fixture on a temporary HTTP server; return the URL and a stop event."""
if not fixture.exists():
print(f"FIXTURE MISSING: {fixture}", file=sys.stderr)
sys.exit(3)
directory = fixture.parent
filename = fixture.name
lan_ip = find_lan_ip()
class Handler(http.server.SimpleHTTPRequestHandler):
def __init__(self, *args, **kwargs):
super().__init__(*args, directory=str(directory), **kwargs)
def log_message(self, format: str, *args) -> None: # quiet
pass
# Pick any free port.
httpd = socketserver.TCPServer((lan_ip, 0), Handler)
port = httpd.server_address[1]
url = f"http://{lan_ip}:{port}/{filename}"
stop = threading.Event()
def _serve():
try:
while not stop.is_set():
httpd.handle_request()
finally:
httpd.server_close()
# Run in a thread. Use a loose timeout so handle_request returns when stop is set.
httpd.timeout = 0.5
t = threading.Thread(target=_serve, daemon=True)
t.start()
return url, stop
def post_job(base_url: str, file_url: str, client_id: str, request_id: str) -> dict:
# Include a Paperless-style OCR of the fixture as context.texts so the
# text_agreement cross-check has something to compare against.
paperless_text = (
"DKB\n"
"DE89370400440532013000\n"
"Statement period: 01.03.2026 - 31.03.2026\n"
"Opening balance: 1234.56 EUR\n"
"Closing balance: 1450.22 EUR\n"
"31.03.2026\n"
)
payload = {
"use_case": "bank_statement_header",
"ix_client_id": client_id,
"request_id": request_id,
"context": {
"files": [file_url],
"texts": [paperless_text],
},
}
req = urllib.request.Request(
f"{base_url}/jobs",
data=json.dumps(payload).encode("utf-8"),
headers={"Content-Type": "application/json"},
method="POST",
)
with urllib.request.urlopen(req, timeout=10) as resp:
return json.loads(resp.read().decode("utf-8"))
def get_job(base_url: str, job_id: str) -> dict:
req = urllib.request.Request(f"{base_url}/jobs/{job_id}")
with urllib.request.urlopen(req, timeout=10) as resp:
return json.loads(resp.read().decode("utf-8"))
def main() -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--base-url", default=DEFAULT_BASE_URL)
parser.add_argument("--timeout", type=int, default=TIMEOUT_SECONDS)
args = parser.parse_args()
# Sanity-check the service is up.
try:
with urllib.request.urlopen(f"{args.base_url}/healthz", timeout=5) as resp:
health = json.loads(resp.read().decode("utf-8"))
print(f"healthz: {health}")
except urllib.error.URLError as e:
print(f"service unreachable: {e}", file=sys.stderr)
return 3
fixture_url, stop_server = serve_fixture_in_background(FIXTURE)
print(f"serving fixture at {fixture_url}")
try:
client_id = "e2e_smoke"
request_id = f"smoke-{uuid.uuid4().hex[:8]}"
submit = post_job(args.base_url, fixture_url, client_id, request_id)
job_id = submit["job_id"]
print(f"submitted job_id={job_id}")
started = time.monotonic()
last_status = None
job = None
while time.monotonic() - started < args.timeout:
job = get_job(args.base_url, job_id)
if job["status"] != last_status:
print(f"[{time.monotonic() - started:5.1f}s] status={job['status']}")
last_status = job["status"]
if job["status"] in ("done", "error"):
break
time.sleep(POLL_INTERVAL_SECONDS)
else:
print(f"FAIL: timed out after {args.timeout}s", file=sys.stderr)
return 2
assert job is not None
failed = []
if job["status"] != "done":
failed.append(f"status={job['status']!r} (want 'done')")
response = job.get("response") or {}
if response.get("error"):
failed.append(f"response.error={response['error']!r}")
result = (response.get("ix_result") or {}).get("result") or {}
bank = result.get("bank_name")
if not isinstance(bank, str) or not bank.strip():
failed.append(f"bank_name={bank!r} (want non-empty string)")
fields = (response.get("provenance") or {}).get("fields") or {}
closing = fields.get("result.closing_balance") or {}
if not closing.get("provenance_verified"):
failed.append(f"closing_balance.provenance_verified={closing.get('provenance_verified')!r}")
if closing.get("text_agreement") is not True:
failed.append(f"closing_balance.text_agreement={closing.get('text_agreement')!r} (Paperless-style text submitted)")
elapsed = time.monotonic() - started
if elapsed >= 60:
failed.append(f"elapsed={elapsed:.1f}s (≥ 60s; slow path)")
print(json.dumps(result, indent=2, default=str))
if failed:
print("\n".join(f"FAIL: {f}" for f in failed), file=sys.stderr)
return 1
print(f"\nPASS in {elapsed:.1f}s")
return 0
finally:
stop_server.set()
if __name__ == "__main__":
sys.exit(main())

127
scripts/setup_server.sh Executable file
View file

@ -0,0 +1,127 @@
#!/usr/bin/env bash
# One-shot server setup for InfoXtractor. Idempotent: safe to re-run.
#
# Run from the Mac:
# IX_POSTGRES_PASSWORD=<pw> ./scripts/setup_server.sh
#
# What it does on 192.168.68.42:
# 1. Creates the bare git repo `/home/server/Public/infoxtractor/repos.git` if missing.
# 2. Writes the post-receive hook (or updates it) and makes it executable.
# 3. Creates the Postgres role + database on the shared `postgis` container.
# 4. Writes `/home/server/Public/infoxtractor/app/.env` (0600) from .env.example.
# 5. Verifies `qwen3:14b` is pulled in Ollama.
set -euo pipefail
SERVER="${IX_SERVER:-server@192.168.68.42}"
APP_BASE="/home/server/Public/infoxtractor"
REPOS_GIT="${APP_BASE}/repos.git"
APP_DIR="${APP_BASE}/app"
DB_NAME="infoxtractor"
DB_USER="infoxtractor"
if [ -z "${IX_POSTGRES_PASSWORD:-}" ]; then
read -r -s -p "Postgres password for role '${DB_USER}': " IX_POSTGRES_PASSWORD
echo
fi
if [ -z "${IX_POSTGRES_PASSWORD}" ]; then
echo "IX_POSTGRES_PASSWORD is required." >&2
exit 1
fi
echo "==> 1/5 Ensuring bare repo + post-receive hook on ${SERVER}"
ssh "${SERVER}" bash -s <<EOF
set -euo pipefail
mkdir -p "${REPOS_GIT}" "${APP_DIR}"
if [ ! -f "${REPOS_GIT}/HEAD" ]; then
git init --bare "${REPOS_GIT}"
fi
cat >"${REPOS_GIT}/hooks/post-receive" <<'HOOK'
#!/usr/bin/env bash
set -eo pipefail
APP_DIR="${APP_DIR}"
LOG="/tmp/infoxtractor-deploy.log"
echo "[\$(date -u '+%FT%TZ')] post-receive start" >> "\$LOG"
mkdir -p "\$APP_DIR"
GIT_WORK_TREE="\$APP_DIR" git --git-dir="${REPOS_GIT}" checkout -f main >> "\$LOG" 2>&1
cd "\$APP_DIR"
docker compose up -d --build >> "\$LOG" 2>&1
# Deploy gate: /healthz must return 200 within 60 s.
for i in \$(seq 1 30); do
if curl -fsS http://localhost:8994/healthz > /dev/null 2>&1; then
echo "[\$(date -u '+%FT%TZ')] healthz OK" >> "\$LOG"
exit 0
fi
sleep 2
done
echo "[\$(date -u '+%FT%TZ')] healthz never reached OK" >> "\$LOG"
docker compose logs --tail 100 >> "\$LOG" 2>&1 || true
exit 1
HOOK
chmod +x "${REPOS_GIT}/hooks/post-receive"
EOF
echo "==> 2/5 Verifying Ollama has qwen3:14b pulled"
if ! ssh "${SERVER}" "docker exec ollama ollama list | awk '{print \$1}' | grep -qx 'qwen3:14b'"; then
echo "FAIL: qwen3:14b not found in Ollama. Run: ssh ${SERVER} 'docker exec ollama ollama pull qwen3:14b'" >&2
exit 1
fi
echo "==> 3/5 Creating Postgres role '${DB_USER}' and database '${DB_NAME}' on postgis container"
# Idempotent via DO blocks; uses docker exec to avoid needing psql on the host.
ssh "${SERVER}" bash -s <<EOF
set -euo pipefail
docker exec -i postgis psql -U postgres <<SQL
DO \\\$\\\$
BEGIN
IF NOT EXISTS (SELECT FROM pg_roles WHERE rolname = '${DB_USER}') THEN
CREATE ROLE ${DB_USER} LOGIN PASSWORD '${IX_POSTGRES_PASSWORD}';
ELSE
ALTER ROLE ${DB_USER} WITH PASSWORD '${IX_POSTGRES_PASSWORD}';
END IF;
END
\\\$\\\$;
SQL
if ! docker exec -i postgis psql -U postgres -tc "SELECT 1 FROM pg_database WHERE datname = '${DB_NAME}'" | grep -q 1; then
docker exec -i postgis createdb -U postgres -O ${DB_USER} ${DB_NAME}
fi
EOF
echo "==> 4/5 Writing ${APP_DIR}/.env on the server"
# Render .env from the repo's .env.example, substituting the password placeholder.
LOCAL_ENV_CONTENT="$(
sed "s#<password>#${IX_POSTGRES_PASSWORD}#g" \
"$(dirname "$0")/../.env.example"
)"
# Append the IX_TEST_MODE=production for safety (fake mode stays off).
# .env is written atomically and permissioned 0600.
ssh "${SERVER}" "install -d -m 0755 '${APP_DIR}' && cat > '${APP_DIR}/.env' <<'ENVEOF'
${LOCAL_ENV_CONTENT}
ENVEOF
chmod 0600 '${APP_DIR}/.env'"
echo "==> 5/5 Checking UFW rule for port 8994 (LAN only)"
ssh "${SERVER}" "sudo ufw status numbered | grep -F 8994" >/dev/null 2>&1 || {
echo "NOTE: UFW doesn't yet allow 8994. Run on the server:"
echo " sudo ufw allow from 192.168.68.0/24 to any port 8994 proto tcp"
}
echo
echo "Done."
echo
echo "Next steps (on the Mac):"
echo " git remote add server ssh://server@192.168.68.42${REPOS_GIT}"
echo " git push server main"
echo " ssh ${SERVER} 'tail -f /tmp/infoxtractor-deploy.log'"
echo " curl http://192.168.68.42:8994/healthz"
echo " python scripts/e2e_smoke.py"

View file

@ -0,0 +1,6 @@
"""Transport adapters — REST (always on) + pg_queue (optional).
Adapters are thin: they marshal external events into :class:`RequestIX`
payloads that land in ``ix_jobs`` as pending rows, and they read back from
the same store. They do NOT run the pipeline themselves; the worker does.
"""

View file

@ -0,0 +1,15 @@
"""Postgres queue adapter — ``LISTEN ix_jobs_new`` + 10 s fallback poll.
This is a secondary transport: a direct-SQL writer can insert a row and
``NOTIFY ix_jobs_new, '<job_id>'`` and the worker wakes up within the roundtrip
time rather than the 10 s fallback poll. The REST adapter doesn't need the
listener because the worker is already running in-process; this exists for
external callers who bypass the REST API.
"""
from ix.adapters.pg_queue.listener import (
PgQueueListener,
asyncpg_dsn_from_sqlalchemy_url,
)
__all__ = ["PgQueueListener", "asyncpg_dsn_from_sqlalchemy_url"]

View file

@ -0,0 +1,111 @@
"""Dedicated asyncpg connection that LISTENs to ``ix_jobs_new``.
We hold the connection *outside* the SQLAlchemy pool because SQLAlchemy's
asyncpg dialect doesn't expose the raw connection in a way that survives
the pool's checkout/checkin dance, and LISTEN needs a connection that
stays open for the full session to receive asynchronous notifications.
The adapter contract the worker sees is a single coroutine-factory,
``wait_for_work(poll_seconds)``, which completes either when a NOTIFY
arrives or when ``poll_seconds`` elapse. The worker doesn't care which
woke it it just goes back to its claim query.
"""
from __future__ import annotations
import asyncio
from urllib.parse import unquote, urlparse
import asyncpg
def asyncpg_dsn_from_sqlalchemy_url(url: str) -> str:
"""Strip the SQLAlchemy ``postgresql+asyncpg://`` prefix for raw asyncpg.
asyncpg's connect() expects the plain ``postgres://user:pass@host/db``
shape; the ``+driver`` segment SQLAlchemy adds confuses it. We also
percent-decode the password asyncpg accepts the raw form but not the
pre-encoded ``%21`` passwords we sometimes use in dev.
"""
parsed = urlparse(url)
scheme = parsed.scheme.split("+", 1)[0]
user = unquote(parsed.username) if parsed.username else ""
password = unquote(parsed.password) if parsed.password else ""
auth = ""
if user:
auth = f"{user}"
if password:
auth += f":{password}"
auth += "@"
netloc = parsed.hostname or ""
if parsed.port:
netloc += f":{parsed.port}"
return f"{scheme}://{auth}{netloc}{parsed.path}"
class PgQueueListener:
"""Long-lived asyncpg connection that sets an event on each NOTIFY.
The worker uses :meth:`wait_for_work` as its ``wait_for_work`` hook:
one call resolves when either a NOTIFY is received OR ``timeout``
seconds elapse, whichever comes first. The event is cleared after each
resolution so subsequent waits don't see stale state.
"""
def __init__(self, dsn: str, channel: str = "ix_jobs_new") -> None:
self._dsn = dsn
self._channel = channel
self._conn: asyncpg.Connection | None = None
self._event = asyncio.Event()
# Protect add_listener / remove_listener against concurrent
# start/stop — shouldn't happen in practice but a stray double-stop
# from a lifespan shutdown shouldn't raise ``listener not found``.
self._lock = asyncio.Lock()
async def start(self) -> None:
async with self._lock:
if self._conn is not None:
return
self._conn = await asyncpg.connect(self._dsn)
await self._conn.add_listener(self._channel, self._on_notify)
async def stop(self) -> None:
async with self._lock:
if self._conn is None:
return
try:
await self._conn.remove_listener(self._channel, self._on_notify)
finally:
await self._conn.close()
self._conn = None
def _on_notify(
self,
connection: asyncpg.Connection,
pid: int,
channel: str,
payload: str,
) -> None:
"""asyncpg listener callback — signals the waiter."""
# We don't care about payload/pid/channel — any NOTIFY on our
# channel means "go check for pending rows". Keep the body tiny so
# asyncpg's single dispatch loop stays snappy.
self._event.set()
async def wait_for_work(self, timeout: float) -> None:
"""Resolve when a NOTIFY arrives or ``timeout`` seconds pass.
We wait on the event with a timeout. ``asyncio.wait_for`` raises
:class:`asyncio.TimeoutError` on expiry; we swallow it because the
worker treats "either signal" identically. The event is cleared
after every wait so the next call starts fresh.
"""
try:
await asyncio.wait_for(self._event.wait(), timeout=timeout)
except TimeoutError:
pass
finally:
self._event.clear()

View file

@ -0,0 +1,5 @@
"""FastAPI REST adapter — POST /jobs / GET /jobs / GET /healthz / GET /metrics.
Routes are defined in ``routes.py``. The ``create_app`` factory in
``ix.app`` wires them up alongside the worker lifespan.
"""

View file

@ -0,0 +1,227 @@
"""REST routes (spec §5).
The routes depend on two injected objects:
* a session factory (``get_session_factory_dep``): swapped in tests so we can
use the fixture's per-test engine instead of the lazy process-wide one in
``ix.store.engine``.
* a :class:`Probes` bundle (``get_probes``): each probe returns the
per-subsystem state string used by ``/healthz``. Tests inject canned
probes; Chunk 4 wires the real Ollama/Surya ones.
``/healthz`` has a strict 2-second postgres timeout we use an
``asyncio.wait_for`` around a ``SELECT 1`` roundtrip so a broken pool or a
hung connection can't wedge the healthcheck endpoint.
"""
from __future__ import annotations
import asyncio
from collections.abc import Callable
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
from typing import Annotated, Literal
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Query, Response
from sqlalchemy import func, select, text
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from ix.adapters.rest.schemas import HealthStatus, JobSubmitResponse, MetricsResponse
from ix.contracts.job import Job
from ix.contracts.request import RequestIX
from ix.store import jobs_repo
from ix.store.engine import get_session_factory
from ix.store.models import IxJob
@dataclass
class Probes:
"""Injected subsystem-probe callables for ``/healthz``.
Each callable returns the literal status string expected in the body.
Probes are sync by design none of the real ones need awaits today and
keeping them sync lets tests pass plain lambdas. Real probes that need
async work run the call through ``asyncio.run_in_executor`` inside the
callable (Chunk 4).
"""
ollama: Callable[[], Literal["ok", "degraded", "fail"]]
ocr: Callable[[], Literal["ok", "fail"]]
def get_session_factory_dep() -> async_sessionmaker[AsyncSession]:
"""Default DI: the process-wide store factory. Tests override this."""
return get_session_factory()
def get_probes() -> Probes:
"""Default DI: a pair of ``fail`` probes.
Production wiring (Chunk 4) overrides this factory with real Ollama +
Surya probes at app-startup time. Integration tests override via
``app.dependency_overrides[get_probes]`` with a canned ``ok`` pair.
The default here ensures a mis-wired deployment surfaces clearly in
``/healthz`` rather than claiming everything is fine by accident.
"""
return Probes(ollama=lambda: "fail", ocr=lambda: "fail")
router = APIRouter()
@router.post("/jobs", response_model=JobSubmitResponse, status_code=201)
async def submit_job(
request: RequestIX,
response: Response,
session_factory: Annotated[
async_sessionmaker[AsyncSession], Depends(get_session_factory_dep)
],
) -> JobSubmitResponse:
"""Submit a new job.
Per spec §5: 201 on first insert, 200 on idempotent re-submit of an
existing ``(client_id, request_id)`` pair. We detect the second case by
snapshotting the pre-insert row set and comparing ``created_at``.
"""
async with session_factory() as session:
existing = await jobs_repo.get_by_correlation(
session, request.ix_client_id, request.request_id
)
job = await jobs_repo.insert_pending(
session, request, callback_url=request.callback_url
)
await session.commit()
if existing is not None:
# Idempotent re-submit — flip to 200. FastAPI's declared status_code
# is 201, but setting response.status_code overrides it per-call.
response.status_code = 200
return JobSubmitResponse(job_id=job.job_id, ix_id=job.ix_id, status=job.status)
@router.get("/jobs/{job_id}", response_model=Job)
async def get_job(
job_id: UUID,
session_factory: Annotated[
async_sessionmaker[AsyncSession], Depends(get_session_factory_dep)
],
) -> Job:
async with session_factory() as session:
job = await jobs_repo.get(session, job_id)
if job is None:
raise HTTPException(status_code=404, detail="job not found")
return job
@router.get("/jobs", response_model=Job)
async def lookup_job_by_correlation(
client_id: Annotated[str, Query(...)],
request_id: Annotated[str, Query(...)],
session_factory: Annotated[
async_sessionmaker[AsyncSession], Depends(get_session_factory_dep)
],
) -> Job:
async with session_factory() as session:
job = await jobs_repo.get_by_correlation(session, client_id, request_id)
if job is None:
raise HTTPException(status_code=404, detail="job not found")
return job
@router.get("/healthz")
async def healthz(
response: Response,
session_factory: Annotated[
async_sessionmaker[AsyncSession], Depends(get_session_factory_dep)
],
probes: Annotated[Probes, Depends(get_probes)],
) -> HealthStatus:
"""Per spec §5: postgres / ollama / ocr; 200 iff all three == ok."""
postgres_state: Literal["ok", "fail"] = "fail"
try:
async def _probe() -> None:
async with session_factory() as session:
await session.execute(text("SELECT 1"))
await asyncio.wait_for(_probe(), timeout=2.0)
postgres_state = "ok"
except Exception:
postgres_state = "fail"
try:
ollama_state = probes.ollama()
except Exception:
ollama_state = "fail"
try:
ocr_state = probes.ocr()
except Exception:
ocr_state = "fail"
body = HealthStatus(
postgres=postgres_state, ollama=ollama_state, ocr=ocr_state
)
if postgres_state != "ok" or ollama_state != "ok" or ocr_state != "ok":
response.status_code = 503
return body
@router.get("/metrics", response_model=MetricsResponse)
async def metrics(
session_factory: Annotated[
async_sessionmaker[AsyncSession], Depends(get_session_factory_dep)
],
) -> MetricsResponse:
"""Counters over the last 24h — plain JSON per spec §5."""
since = datetime.now(UTC) - timedelta(hours=24)
async with session_factory() as session:
pending = await session.scalar(
select(func.count()).select_from(IxJob).where(IxJob.status == "pending")
)
running = await session.scalar(
select(func.count()).select_from(IxJob).where(IxJob.status == "running")
)
done_24h = await session.scalar(
select(func.count())
.select_from(IxJob)
.where(IxJob.status == "done", IxJob.finished_at >= since)
)
error_24h = await session.scalar(
select(func.count())
.select_from(IxJob)
.where(IxJob.status == "error", IxJob.finished_at >= since)
)
# Per-use-case average seconds. ``request`` is JSONB, so we dig out
# the use_case key via ->>. Only consider rows that both started and
# finished in the window (can't compute elapsed otherwise).
rows = (
await session.execute(
text(
"SELECT request->>'use_case' AS use_case, "
"AVG(EXTRACT(EPOCH FROM (finished_at - started_at))) "
"FROM ix_jobs "
"WHERE status='done' AND finished_at IS NOT NULL "
"AND started_at IS NOT NULL AND finished_at >= :since "
"GROUP BY request->>'use_case'"
),
{"since": since},
)
).all()
by_use_case = {row[0]: float(row[1]) for row in rows if row[0] is not None}
return MetricsResponse(
jobs_pending=int(pending or 0),
jobs_running=int(running or 0),
jobs_done_24h=int(done_24h or 0),
jobs_error_24h=int(error_24h or 0),
by_use_case_seconds=by_use_case,
)

View file

@ -0,0 +1,52 @@
"""REST-adapter request / response bodies.
Most payloads reuse the core contracts directly (:class:`RequestIX`,
:class:`Job`). The only adapter-specific shape is the lightweight POST /jobs
response (`job_id`, `ix_id`, `status`) callers don't need the full Job
envelope back on submit; they poll ``GET /jobs/{id}`` for that.
"""
from __future__ import annotations
from typing import Literal
from uuid import UUID
from pydantic import BaseModel, ConfigDict
class JobSubmitResponse(BaseModel):
"""What POST /jobs returns: just enough to poll or subscribe to callbacks."""
model_config = ConfigDict(extra="forbid")
job_id: UUID
ix_id: str
status: Literal["pending", "running", "done", "error"]
class HealthStatus(BaseModel):
"""Body of GET /healthz.
Each field reports per-subsystem state. Overall HTTP status is 200 iff
every field is ``"ok"`` (spec §5). ``ollama`` can be ``"degraded"``
when the backend is reachable but the default model isn't pulled —
monitoring surfaces that as non-200.
"""
model_config = ConfigDict(extra="forbid")
postgres: Literal["ok", "fail"]
ollama: Literal["ok", "degraded", "fail"]
ocr: Literal["ok", "fail"]
class MetricsResponse(BaseModel):
"""Body of GET /metrics — plain JSON (no Prometheus format for MVP)."""
model_config = ConfigDict(extra="forbid")
jobs_pending: int
jobs_running: int
jobs_done_24h: int
jobs_error_24h: int
by_use_case_seconds: dict[str, float]

232
src/ix/app.py Normal file
View file

@ -0,0 +1,232 @@
"""FastAPI app factory + lifespan.
``create_app()`` wires the REST router on top of a lifespan that spawns the
worker loop (Task 3.5) and the pg_queue listener (Task 3.6). Tests that
don't care about the worker call ``create_app(spawn_worker=False)`` so the
lifespan returns cleanly.
Task 4.3 fills in the production wiring:
* Factories (``make_genai_client`` / ``make_ocr_client``) pick between
fakes (``IX_TEST_MODE=fake``) and real Ollama/Surya clients.
* ``/healthz`` probes call ``selfcheck()`` on the active clients. In
``fake`` mode they always report ok.
* The worker's :class:`Pipeline` is built once per spawn with the real
chain of Steps; each call to the injected ``pipeline_factory`` returns
a fresh Pipeline so per-request state stays isolated.
"""
from __future__ import annotations
import asyncio
from collections.abc import AsyncIterator, Callable
from contextlib import asynccontextmanager, suppress
from typing import Literal
from fastapi import FastAPI
from ix.adapters.rest.routes import Probes, get_probes
from ix.adapters.rest.routes import router as rest_router
from ix.config import AppConfig, get_config
from ix.genai import make_genai_client
from ix.genai.client import GenAIClient
from ix.ocr import make_ocr_client
from ix.ocr.client import OCRClient
from ix.pipeline.genai_step import GenAIStep
from ix.pipeline.ocr_step import OCRStep
from ix.pipeline.pipeline import Pipeline
from ix.pipeline.reliability_step import ReliabilityStep
from ix.pipeline.response_handler_step import ResponseHandlerStep
from ix.pipeline.setup_step import SetupStep
def build_pipeline(
genai: GenAIClient, ocr: OCRClient, cfg: AppConfig
) -> Pipeline:
"""Assemble the production :class:`Pipeline` with injected clients.
Kept as a module-level helper so tests that want to exercise the
production wiring (without running the worker) can call it directly.
"""
from pathlib import Path
from ix.ingestion import FetchConfig
return Pipeline(
steps=[
SetupStep(
tmp_dir=Path(cfg.tmp_dir),
fetch_config=FetchConfig(
connect_timeout_s=float(cfg.file_connect_timeout_seconds),
read_timeout_s=float(cfg.file_read_timeout_seconds),
max_bytes=cfg.file_max_bytes,
),
),
OCRStep(ocr_client=ocr),
GenAIStep(genai_client=genai),
ReliabilityStep(),
ResponseHandlerStep(),
]
)
def _make_ollama_probe(
genai: GenAIClient, cfg: AppConfig
) -> Callable[[], Literal["ok", "degraded", "fail"]]:
"""Adapter: async ``selfcheck`` → sync callable the route expects.
Always drives the coroutine on a throwaway event loop in a separate
thread. This keeps the behavior identical whether the caller holds an
event loop (FastAPI request) or doesn't (a CLI tool), and avoids the
``asyncio.run`` vs. already-running-loop footgun.
"""
def probe() -> Literal["ok", "degraded", "fail"]:
if not hasattr(genai, "selfcheck"):
return "ok" # fake client — nothing to probe.
return _run_async_sync(
lambda: genai.selfcheck(expected_model=cfg.default_model), # type: ignore[attr-defined]
fallback="fail",
)
return probe
def _make_ocr_probe(ocr: OCRClient) -> Callable[[], Literal["ok", "fail"]]:
def probe() -> Literal["ok", "fail"]:
if not hasattr(ocr, "selfcheck"):
return "ok" # fake — nothing to probe.
return _run_async_sync(
lambda: ocr.selfcheck(), # type: ignore[attr-defined]
fallback="fail",
)
return probe
def _run_async_sync(make_coro, *, fallback: str) -> str: # type: ignore[no-untyped-def]
"""Run ``make_coro()`` on a fresh loop in a thread; return its result.
The thread owns its own event loop so the caller's loop (if any) keeps
running. Any exception collapses to ``fallback``.
"""
import threading
result: dict[str, object] = {}
def _runner() -> None:
loop = asyncio.new_event_loop()
try:
result["value"] = loop.run_until_complete(make_coro())
except Exception as exc: # any error collapses to fallback
result["error"] = exc
finally:
loop.close()
t = threading.Thread(target=_runner)
t.start()
t.join()
if "error" in result or "value" not in result:
return fallback
return str(result["value"])
def create_app(*, spawn_worker: bool = True) -> FastAPI:
"""Construct the ASGI app.
Parameters
----------
spawn_worker:
When True (default), the lifespan spawns the background worker task
and the pg_queue listener. Integration tests that only exercise the
REST adapter pass False so jobs pile up as ``pending`` and the tests
can assert on their state without a racing worker mutating them.
"""
@asynccontextmanager
async def lifespan(_app: FastAPI) -> AsyncIterator[None]:
cfg = get_config()
# Build the clients once per process. The worker's pipeline
# factory closes over these so every job runs through the same
# Ollama/Surya instance (Surya's predictors are heavy; re-loading
# them per job would be catastrophic).
genai_client = make_genai_client(cfg)
ocr_client = make_ocr_client(cfg)
# Override the route-level probe DI so /healthz reflects the
# actual clients. Tests that want canned probes can still override
# ``get_probes`` at the TestClient layer.
_app.dependency_overrides.setdefault(
get_probes,
lambda: Probes(
ollama=_make_ollama_probe(genai_client, cfg),
ocr=_make_ocr_probe(ocr_client),
),
)
worker_task = None
listener = None
if spawn_worker:
try:
from ix.adapters.pg_queue.listener import (
PgQueueListener,
asyncpg_dsn_from_sqlalchemy_url,
)
listener = PgQueueListener(
dsn=asyncpg_dsn_from_sqlalchemy_url(cfg.postgres_url)
)
await listener.start()
except Exception:
listener = None
try:
worker_task = await _spawn_production_worker(
cfg, genai_client, ocr_client, listener
)
except Exception:
worker_task = None
try:
yield
finally:
if worker_task is not None:
worker_task.cancel()
with suppress(Exception):
await worker_task
if listener is not None:
with suppress(Exception):
await listener.stop()
app = FastAPI(lifespan=lifespan, title="infoxtractor", version="0.1.0")
app.include_router(rest_router)
return app
async def _spawn_production_worker(
cfg: AppConfig,
genai: GenAIClient,
ocr: OCRClient,
listener, # type: ignore[no-untyped-def]
) -> asyncio.Task[None]:
"""Spawn the background worker with a production pipeline factory."""
from ix.store.engine import get_session_factory
from ix.worker.loop import Worker
def pipeline_factory() -> Pipeline:
return build_pipeline(genai, ocr, cfg)
worker = Worker(
session_factory=get_session_factory(),
pipeline_factory=pipeline_factory,
poll_interval_seconds=10.0,
max_running_seconds=2 * cfg.pipeline_request_timeout_seconds,
callback_timeout_seconds=cfg.callback_timeout_seconds,
wait_for_work=listener.wait_for_work if listener is not None else None,
)
stop = asyncio.Event()
return asyncio.create_task(worker.run(stop))

86
src/ix/config.py Normal file
View file

@ -0,0 +1,86 @@
"""Application configuration — loaded from ``IX_*`` env vars via pydantic-settings.
Spec §9 lists every tunable. This module is the single read-point for them;
callers that need runtime config should go through :func:`get_config` rather
than ``os.environ``. The LRU cache makes the first call materialise + validate
the full config and every subsequent call return the same instance.
Cache-clearing is public (``get_config.cache_clear()``) because tests need to
re-read after ``monkeypatch.setenv``. Production code never clears the cache.
"""
from __future__ import annotations
from functools import lru_cache
from typing import Literal
from pydantic_settings import BaseSettings, SettingsConfigDict
class AppConfig(BaseSettings):
"""Typed view over the ``IX_*`` environment.
Field names drop the ``IX_`` prefix pydantic-settings puts it back via
``env_prefix``. Defaults match the spec exactly; do not change a default
here without updating spec §9 in the same commit.
"""
model_config = SettingsConfigDict(
env_prefix="IX_",
env_file=".env",
env_file_encoding="utf-8",
extra="ignore",
)
# --- Job store ---
# Defaults assume the ix container runs with `network_mode: host` and
# reaches the shared `postgis` and `ollama` containers on loopback;
# spec §11 / docker-compose.yml ship that configuration.
postgres_url: str = (
"postgresql+asyncpg://infoxtractor:<password>"
"@127.0.0.1:5431/infoxtractor"
)
# --- LLM backend ---
ollama_url: str = "http://127.0.0.1:11434"
default_model: str = "qwen3:14b"
# --- OCR ---
ocr_engine: str = "surya"
# --- Pipeline behavior ---
pipeline_worker_concurrency: int = 1
pipeline_request_timeout_seconds: int = 2700
genai_call_timeout_seconds: int = 1500
render_max_pixels_per_page: int = 75_000_000
# --- File fetching ---
tmp_dir: str = "/tmp/ix"
file_max_bytes: int = 52_428_800
file_connect_timeout_seconds: int = 10
file_read_timeout_seconds: int = 30
# --- Transport / callbacks ---
callback_timeout_seconds: int = 10
# --- Observability ---
log_level: str = "INFO"
# --- Test / wiring mode ---
# ``fake``: factories return FakeGenAIClient / FakeOCRClient and
# ``/healthz`` probes report ok. CI sets this so the Forgejo runner
# doesn't need access to Ollama or GPU-backed Surya. ``None`` (default)
# means production wiring: real OllamaClient + SuryaOCRClient.
test_mode: Literal["fake"] | None = None
@lru_cache(maxsize=1)
def get_config() -> AppConfig:
"""Return the process-wide :class:`AppConfig` (materialise on first call).
Wrapped in ``lru_cache`` so config is parsed + validated once per process.
Tests call ``get_config.cache_clear()`` between scenarios; nothing in
production should touch the cache.
"""
return AppConfig()

View file

@ -1,18 +1,43 @@
"""GenAI subsystem: protocol + fake client + invocation-result dataclasses.
Real backends (Ollama, etc.) plug in behind :class:`GenAIClient`. The MVP
ships only :class:`FakeGenAIClient` from this package; the real Ollama
client lands in Chunk 4.
Real backends (Ollama, ) plug in behind :class:`GenAIClient`. The factory
:func:`make_genai_client` picks between :class:`FakeGenAIClient` (for CI
/ hermetic tests via ``IX_TEST_MODE=fake``) and :class:`OllamaClient`
(production). Tests that want a real Ollama client anyway can call the
constructor directly.
"""
from __future__ import annotations
from ix.config import AppConfig
from ix.genai.client import GenAIClient, GenAIInvocationResult, GenAIUsage
from ix.genai.fake import FakeGenAIClient
from ix.genai.ollama_client import OllamaClient
def make_genai_client(cfg: AppConfig) -> GenAIClient:
"""Return the :class:`GenAIClient` configured for the current run.
When ``cfg.test_mode == "fake"`` the fake is returned; the pipeline
callers are expected to override the injected client via DI if they
want a non-default canned response. Otherwise a live
:class:`OllamaClient` bound to ``cfg.ollama_url`` and the per-call
timeout is returned.
"""
if cfg.test_mode == "fake":
return FakeGenAIClient(parsed=None)
return OllamaClient(
base_url=cfg.ollama_url,
per_call_timeout_s=float(cfg.genai_call_timeout_seconds),
)
__all__ = [
"FakeGenAIClient",
"GenAIClient",
"GenAIInvocationResult",
"GenAIUsage",
"OllamaClient",
"make_genai_client",
]

View file

@ -0,0 +1,340 @@
"""OllamaClient — real :class:`GenAIClient` implementation (spec §6 GenAIStep).
Wraps the Ollama ``/api/chat`` structured-output endpoint. Per spec:
* POST ``{base_url}/api/chat`` with ``format = <pydantic JSON schema>``,
``stream = false``, and ``options`` carrying provider-neutral knobs
(``temperature`` mapped, ``reasoning_effort`` dropped Ollama ignores it).
* Messages are passed through. Content-parts lists (``[{"type":"text",...}]``)
are joined to a single string because MVP models (``gpt-oss:20b`` /
``qwen2.5:32b``) don't accept native content-parts.
* Per-call timeout is enforced via ``httpx``. A connection refusal, read
timeout, or 5xx maps to ``IX_002_000``. A 2xx whose ``message.content`` is
not valid JSON for the schema maps to ``IX_002_001``.
``selfcheck()`` targets ``/api/tags`` with a fixed 5 s timeout and is what
``/healthz`` consumes.
"""
from __future__ import annotations
from typing import Any, Literal
import httpx
from pydantic import BaseModel, ValidationError
from ix.errors import IXErrorCode, IXException
from ix.genai.client import GenAIInvocationResult, GenAIUsage
_OLLAMA_TAGS_TIMEOUT_S: float = 5.0
_BODY_SNIPPET_MAX_CHARS: int = 240
class OllamaClient:
"""Async Ollama backend satisfying :class:`~ix.genai.client.GenAIClient`.
Parameters
----------
base_url:
Root URL of the Ollama server (e.g. ``http://127.0.0.1:11434``).
Trailing slashes are stripped.
per_call_timeout_s:
Hard per-call timeout for ``/api/chat``. Spec default: 1500 s.
"""
def __init__(self, base_url: str, per_call_timeout_s: float) -> None:
self._base_url = base_url.rstrip("/")
self._per_call_timeout_s = per_call_timeout_s
async def invoke(
self,
request_kwargs: dict[str, Any],
response_schema: type[BaseModel],
) -> GenAIInvocationResult:
"""Run one structured-output chat call; parse into ``response_schema``."""
body = self._translate_request(request_kwargs, response_schema)
url = f"{self._base_url}/api/chat"
try:
async with httpx.AsyncClient(timeout=self._per_call_timeout_s) as http:
resp = await http.post(url, json=body)
except httpx.HTTPError as exc:
raise IXException(
IXErrorCode.IX_002_000,
detail=f"ollama {exc.__class__.__name__}: {exc}",
) from exc
except (ConnectionError, TimeoutError) as exc: # pragma: no cover - httpx wraps these
raise IXException(
IXErrorCode.IX_002_000,
detail=f"ollama {exc.__class__.__name__}: {exc}",
) from exc
if resp.status_code >= 500:
raise IXException(
IXErrorCode.IX_002_000,
detail=(
f"ollama HTTP {resp.status_code}: "
f"{resp.text[:_BODY_SNIPPET_MAX_CHARS]}"
),
)
if resp.status_code >= 400:
raise IXException(
IXErrorCode.IX_002_000,
detail=(
f"ollama HTTP {resp.status_code}: "
f"{resp.text[:_BODY_SNIPPET_MAX_CHARS]}"
),
)
try:
payload = resp.json()
except ValueError as exc:
raise IXException(
IXErrorCode.IX_002_000,
detail=f"ollama non-JSON body: {resp.text[:_BODY_SNIPPET_MAX_CHARS]}",
) from exc
content = (payload.get("message") or {}).get("content") or ""
json_blob = _extract_json_blob(content)
try:
parsed = response_schema.model_validate_json(json_blob)
except ValidationError as exc:
raise IXException(
IXErrorCode.IX_002_001,
detail=(
f"{response_schema.__name__}: {exc.__class__.__name__}: "
f"body={content[:_BODY_SNIPPET_MAX_CHARS]}"
),
) from exc
except ValueError as exc:
# ``model_validate_json`` raises ValueError on invalid JSON (not
# a ValidationError). Treat as structured-output failure.
raise IXException(
IXErrorCode.IX_002_001,
detail=(
f"{response_schema.__name__}: invalid JSON: "
f"body={content[:_BODY_SNIPPET_MAX_CHARS]}"
),
) from exc
usage = GenAIUsage(
prompt_tokens=int(payload.get("prompt_eval_count") or 0),
completion_tokens=int(payload.get("eval_count") or 0),
)
model_name = str(payload.get("model") or request_kwargs.get("model") or "")
return GenAIInvocationResult(parsed=parsed, usage=usage, model_name=model_name)
async def selfcheck(
self, expected_model: str
) -> Literal["ok", "degraded", "fail"]:
"""Probe ``/api/tags`` for ``/healthz``.
``ok`` when the server answers 2xx and ``expected_model`` is listed;
``degraded`` when reachable but the model is missing; ``fail``
otherwise. Spec §5, §11.
"""
try:
async with httpx.AsyncClient(timeout=_OLLAMA_TAGS_TIMEOUT_S) as http:
resp = await http.get(f"{self._base_url}/api/tags")
except (httpx.HTTPError, ConnectionError, TimeoutError):
return "fail"
if resp.status_code != 200:
return "fail"
try:
payload = resp.json()
except ValueError:
return "fail"
models = payload.get("models") or []
names = {str(entry.get("name", "")) for entry in models}
if expected_model in names:
return "ok"
return "degraded"
def _translate_request(
self,
request_kwargs: dict[str, Any],
response_schema: type[BaseModel],
) -> dict[str, Any]:
"""Map provider-neutral kwargs to Ollama's /api/chat body.
Schema strategy for Ollama 0.11.8: we pass ``format="json"`` (loose
JSON mode) and bake the Pydantic schema into a system message
ahead of the caller's own system prompt. Rationale:
* The full Pydantic schema as ``format=<schema>`` crashes llama.cpp's
structured-output implementation (SIGSEGV) on every non-trivial
shape ``anyOf`` / ``$ref`` / ``pattern`` all trigger it.
* ``format="json"`` alone guarantees valid JSON but not the shape;
models routinely return ``{}`` when not told what fields to emit.
* Injecting the schema into the prompt is the cheapest way to
get both: the model sees the expected shape explicitly, Pydantic
validates the response at parse time (IX_002_001 on mismatch).
Non-Ollama ``GenAIClient`` impls can ignore this behaviour and use
native structured-output (``response_format`` on OpenAI, etc.).
"""
messages = self._translate_messages(
list(request_kwargs.get("messages") or [])
)
messages = _inject_schema_system_message(messages, response_schema)
body: dict[str, Any] = {
"model": request_kwargs.get("model"),
"messages": messages,
"stream": False,
# NOTE: format is deliberately omitted. `format="json"` made
# reasoning models (qwen3) abort after emitting `{}` because the
# constrained sampler terminated before the chain-of-thought
# finished; `format=<schema>` segfaulted Ollama 0.11.8. Letting
# the model stream freely and then extracting the trailing JSON
# blob works for both reasoning and non-reasoning models.
}
options: dict[str, Any] = {}
if "temperature" in request_kwargs:
options["temperature"] = request_kwargs["temperature"]
# reasoning_effort intentionally dropped — Ollama doesn't support it.
if options:
body["options"] = options
return body
@staticmethod
def _translate_messages(
messages: list[dict[str, Any]],
) -> list[dict[str, Any]]:
"""Collapse content-parts lists into single strings for Ollama."""
out: list[dict[str, Any]] = []
for msg in messages:
content = msg.get("content")
if isinstance(content, list):
text_parts = [
str(part.get("text", ""))
for part in content
if isinstance(part, dict) and part.get("type") == "text"
]
new_content = "\n".join(text_parts)
else:
new_content = content
out.append({**msg, "content": new_content})
return out
def _extract_json_blob(text: str) -> str:
"""Return the outermost balanced JSON object in ``text``.
Reasoning models (qwen3, deepseek-r1) wrap their real answer in
``<think></think>`` blocks. Other models sometimes prefix prose or
fence the JSON in ```json``` code blocks. Finding the last balanced
``{}`` is the cheapest robust parse that works for all three shapes;
a malformed response yields the full text and Pydantic catches it
downstream as ``IX_002_001``.
"""
start = text.find("{")
if start < 0:
return text
depth = 0
in_string = False
escaped = False
for i in range(start, len(text)):
ch = text[i]
if in_string:
if escaped:
escaped = False
elif ch == "\\":
escaped = True
elif ch == '"':
in_string = False
continue
if ch == '"':
in_string = True
elif ch == "{":
depth += 1
elif ch == "}":
depth -= 1
if depth == 0:
return text[start : i + 1]
return text[start:]
def _inject_schema_system_message(
messages: list[dict[str, Any]],
response_schema: type[BaseModel],
) -> list[dict[str, Any]]:
"""Prepend a system message that pins the expected JSON shape.
Ollama's ``format="json"`` mode guarantees valid JSON but not the
field set or names. We emit the Pydantic schema as JSON and
instruct the model to match it. If the caller already provides a
system message, we prepend ours; otherwise ours becomes the first
system turn.
"""
import json as _json
schema_json = _json.dumps(
_sanitise_schema_for_ollama(response_schema.model_json_schema()),
indent=2,
)
guidance = (
"Respond ONLY with a single JSON object matching this JSON Schema "
"exactly. No prose, no code fences, no explanations. All top-level "
"properties listed in `required` MUST be present. Use null for "
"fields you cannot confidently extract. The JSON Schema:\n"
f"{schema_json}"
)
return [{"role": "system", "content": guidance}, *messages]
def _sanitise_schema_for_ollama(schema: Any) -> Any:
"""Strip null branches from ``anyOf`` unions.
Ollama 0.11.8's llama.cpp structured-output implementation segfaults on
Pydantic v2's standard Optional pattern::
{"anyOf": [{"type": "string"}, {"type": "null"}]}
We collapse any ``anyOf`` that includes a ``{"type": "null"}`` entry to
its non-null branch single branch becomes that branch inline; multiple
branches keep the union without null. This only narrows what the LLM is
*told* it may emit; Pydantic still validates the real response and can
accept ``None`` at parse time if the field is ``Optional``.
Walk is recursive and structure-preserving. Other ``anyOf`` shapes (e.g.
polymorphic unions without null) are left alone.
"""
if isinstance(schema, dict):
cleaned: dict[str, Any] = {}
for key, value in schema.items():
if key == "anyOf" and isinstance(value, list):
non_null = [
_sanitise_schema_for_ollama(branch)
for branch in value
if not (isinstance(branch, dict) and branch.get("type") == "null")
]
if len(non_null) == 1:
# Inline the single remaining branch; merge its keys into the
# parent so siblings like ``default``/``title`` are preserved.
only = non_null[0]
if isinstance(only, dict):
for ok, ov in only.items():
cleaned.setdefault(ok, ov)
else:
cleaned[key] = non_null
elif len(non_null) == 0:
# Pathological: nothing left. Fall back to a permissive type.
cleaned["type"] = "string"
else:
cleaned[key] = non_null
else:
cleaned[key] = _sanitise_schema_for_ollama(value)
return cleaned
if isinstance(schema, list):
return [_sanitise_schema_for_ollama(item) for item in schema]
return schema
__all__ = ["OllamaClient"]

View file

@ -1,13 +1,34 @@
"""OCR subsystem: protocol + fake client.
"""OCR subsystem: protocol + fake + real Surya client + factory.
Real engines (Surya, Azure DI, ) plug in behind :class:`OCRClient`. The
MVP ships only :class:`FakeOCRClient` from this package; the real Surya
client lands in Chunk 4.
Real engines (Surya today, Azure DI / AWS Textract deferred) plug in
behind :class:`OCRClient`. The factory :func:`make_ocr_client` picks
between :class:`FakeOCRClient` (when ``IX_TEST_MODE=fake``) and
:class:`SuryaOCRClient` (production). Unknown engine names raise so a
typo'd ``IX_OCR_ENGINE`` surfaces at startup, not later.
"""
from __future__ import annotations
from ix.config import AppConfig
from ix.contracts.response import OCRDetails, OCRResult
from ix.ocr.client import OCRClient
from ix.ocr.fake import FakeOCRClient
from ix.ocr.surya_client import SuryaOCRClient
__all__ = ["FakeOCRClient", "OCRClient"]
def make_ocr_client(cfg: AppConfig) -> OCRClient:
"""Return the :class:`OCRClient` configured for the current run."""
if cfg.test_mode == "fake":
return FakeOCRClient(canned=OCRResult(result=OCRDetails()))
if cfg.ocr_engine == "surya":
return SuryaOCRClient()
raise ValueError(f"Unknown ocr_engine: {cfg.ocr_engine!r}")
__all__ = [
"FakeOCRClient",
"OCRClient",
"SuryaOCRClient",
"make_ocr_client",
]

View file

@ -5,11 +5,19 @@ method satisfies the Protocol. :class:`~ix.pipeline.ocr_step.OCRStep`
depends on the Protocol, not a concrete class, so swapping engines
(``FakeOCRClient`` in tests, ``SuryaOCRClient`` in prod) stays a wiring
change at the app factory.
Per-page source location (``files`` + ``page_metadata``) flows in as
optional kwargs: fakes ignore them; the real
:class:`~ix.ocr.surya_client.SuryaOCRClient` uses them to render each
page's pixels back from disk. Keeping these optional lets unit tests stay
pages-only while production wiring (Task 4.3) plumbs through the real
filesystem handles.
"""
from __future__ import annotations
from typing import Protocol, runtime_checkable
from pathlib import Path
from typing import Any, Protocol, runtime_checkable
from ix.contracts import OCRResult, Page
@ -24,8 +32,18 @@ class OCRClient(Protocol):
per input page (in the same order).
"""
async def ocr(self, pages: list[Page]) -> OCRResult:
"""Run OCR over the input pages; return the structured result."""
async def ocr(
self,
pages: list[Page],
*,
files: list[tuple[Path, str]] | None = None,
page_metadata: list[Any] | None = None,
) -> OCRResult:
"""Run OCR over the input pages; return the structured result.
``files`` and ``page_metadata`` are optional for hermetic tests;
real engines that need to re-render from disk read them.
"""
...

View file

@ -30,8 +30,17 @@ class FakeOCRClient:
self._canned = canned
self._raise_on_call = raise_on_call
async def ocr(self, pages: list[Page]) -> OCRResult:
"""Return the canned result or raise the configured error."""
async def ocr(
self,
pages: list[Page],
**_kwargs: object,
) -> OCRResult:
"""Return the canned result or raise the configured error.
Accepts (and ignores) any keyword args the production Protocol may
carry keeps the fake swappable for :class:`SuryaOCRClient` at
call sites that pass ``files`` / ``page_metadata``.
"""
if self._raise_on_call is not None:
raise self._raise_on_call
return self._canned

235
src/ix/ocr/surya_client.py Normal file
View file

@ -0,0 +1,235 @@
"""SuryaOCRClient — real :class:`OCRClient` backed by ``surya-ocr``.
Per spec §6.2: the MVP OCR engine. Runs Surya's detection + recognition
predictors over per-page PIL images rendered from the downloaded sources
(PDFs via PyMuPDF, images via Pillow).
Design choices:
* **Lazy model loading.** ``__init__`` is cheap; the heavy predictors are
built on first :meth:`ocr` / :meth:`selfcheck` / explicit :meth:`warm_up`.
This keeps FastAPI's lifespan predictable — ops can decide whether to
pay the load cost up front or on first request.
* **Device is Surya's default.** CUDA on the prod box, MPS on M-series Macs.
We deliberately don't pin.
* **No text-token reuse from PyMuPDF.** The cross-check against Paperless'
Tesseract output (ReliabilityStep's ``text_agreement``) is only meaningful
with a truly independent OCR pass, so we always render-and-recognize
even for PDFs that carry embedded text.
The ``surya-ocr`` package pulls torch + heavy model deps, so it's kept
behind the ``[ocr]`` extra. All Surya imports are deferred into
:meth:`warm_up` so running the unit tests (which patch the predictors)
doesn't require the package to be installed.
"""
from __future__ import annotations
import asyncio
import contextlib
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal
from ix.contracts import Line, OCRDetails, OCRResult, Page
from ix.segmentation import PageMetadata
if TYPE_CHECKING: # pragma: no cover
from PIL import Image as PILImage
class SuryaOCRClient:
"""Surya-backed OCR engine.
Attributes are created lazily by :meth:`warm_up`. The unit tests inject
mocks directly onto ``_recognition_predictor`` / ``_detection_predictor``
to avoid the Surya import chain.
"""
def __init__(self) -> None:
self._recognition_predictor: Any = None
self._detection_predictor: Any = None
def warm_up(self) -> None:
"""Load the detection + recognition predictors. Idempotent.
Called automatically on the first :meth:`ocr` / :meth:`selfcheck`,
or explicitly from the app lifespan to front-load the cost.
"""
if (
self._recognition_predictor is not None
and self._detection_predictor is not None
):
return
# Deferred imports: only reachable when the optional [ocr] extra is
# installed. Keeping them inside the method so base-install unit
# tests (which patch the predictors) don't need surya on sys.path.
from surya.detection import DetectionPredictor # type: ignore[import-not-found]
from surya.foundation import FoundationPredictor # type: ignore[import-not-found]
from surya.recognition import RecognitionPredictor # type: ignore[import-not-found]
foundation = FoundationPredictor()
self._recognition_predictor = RecognitionPredictor(foundation)
self._detection_predictor = DetectionPredictor()
async def ocr(
self,
pages: list[Page],
*,
files: list[tuple[Path, str]] | None = None,
page_metadata: list[Any] | None = None,
) -> OCRResult:
"""Render each input page, run Surya, translate back to contracts."""
self.warm_up()
images = self._render_pages(pages, files, page_metadata)
# Surya is blocking — run it off the event loop.
loop = asyncio.get_running_loop()
surya_results = await loop.run_in_executor(
None, self._run_recognition, images
)
out_pages: list[Page] = []
all_text_fragments: list[str] = []
for input_page, surya_result in zip(pages, surya_results, strict=True):
lines: list[Line] = []
for tl in getattr(surya_result, "text_lines", []) or []:
flat = self._flatten_polygon(getattr(tl, "polygon", None))
text = getattr(tl, "text", None)
lines.append(Line(text=text, bounding_box=flat))
if text:
all_text_fragments.append(text)
out_pages.append(
Page(
page_no=input_page.page_no,
width=input_page.width,
height=input_page.height,
angle=input_page.angle,
unit=input_page.unit,
lines=lines,
)
)
details = OCRDetails(
text="\n".join(all_text_fragments) if all_text_fragments else None,
pages=out_pages,
)
return OCRResult(result=details, meta_data={"engine": "surya"})
async def selfcheck(self) -> Literal["ok", "fail"]:
"""Run the predictors on a 1x1 image to confirm the stack works."""
try:
self.warm_up()
except Exception:
return "fail"
try:
from PIL import Image as PILImageRuntime
img = PILImageRuntime.new("RGB", (1, 1), color="white")
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, self._run_recognition, [img])
except Exception:
return "fail"
return "ok"
def _run_recognition(self, images: list[PILImage.Image]) -> list[Any]:
"""Invoke the recognition predictor. Kept tiny for threadpool offload."""
return list(
self._recognition_predictor(
images, det_predictor=self._detection_predictor
)
)
def _render_pages(
self,
pages: list[Page],
files: list[tuple[Path, str]] | None,
page_metadata: list[Any] | None,
) -> list[PILImage.Image]:
"""Render each input :class:`Page` to a PIL image.
We walk pages + page_metadata in lockstep so we know which source
file each page came from and (for PDFs) what page-index to render.
Text-only pages (``file_index is None``) get a blank 1x1 placeholder
so Surya returns an empty result and downstream code still gets one
entry per input page.
"""
from PIL import Image as PILImageRuntime
metas: list[PageMetadata] = list(page_metadata or [])
file_records: list[tuple[Path, str]] = list(files or [])
# Per-file lazy PDF openers so we don't re-open across pages.
pdf_docs: dict[int, Any] = {}
# Per-file running page-within-file counter. For PDFs we emit one
# entry per PDF page in order; ``pages`` was built the same way by
# DocumentIngestor, so a parallel counter reconstructs the mapping.
per_file_cursor: dict[int, int] = {}
rendered: list[PILImage.Image] = []
try:
for idx, _page in enumerate(pages):
meta = metas[idx] if idx < len(metas) else PageMetadata()
file_index = meta.file_index
if file_index is None or file_index >= len(file_records):
# Text-only page — placeholder image; Surya returns empty.
rendered.append(
PILImageRuntime.new("RGB", (1, 1), color="white")
)
continue
local_path, mime = file_records[file_index]
if mime == "application/pdf":
doc = pdf_docs.get(file_index)
if doc is None:
import fitz # PyMuPDF
doc = fitz.open(str(local_path))
pdf_docs[file_index] = doc
pdf_page_no = per_file_cursor.get(file_index, 0)
per_file_cursor[file_index] = pdf_page_no + 1
pdf_page = doc.load_page(pdf_page_no)
pix = pdf_page.get_pixmap(dpi=200)
img = PILImageRuntime.frombytes(
"RGB", (pix.width, pix.height), pix.samples
)
rendered.append(img)
elif mime in ("image/png", "image/jpeg", "image/tiff"):
frame_no = per_file_cursor.get(file_index, 0)
per_file_cursor[file_index] = frame_no + 1
img = PILImageRuntime.open(local_path)
# Handle multi-frame (TIFF) — seek to the right frame.
with contextlib.suppress(EOFError):
img.seek(frame_no)
rendered.append(img.convert("RGB"))
else: # pragma: no cover - ingestor already rejected
rendered.append(
PILImageRuntime.new("RGB", (1, 1), color="white")
)
finally:
for doc in pdf_docs.values():
with contextlib.suppress(Exception):
doc.close()
return rendered
@staticmethod
def _flatten_polygon(polygon: Any) -> list[float]:
"""Flatten ``[[x1,y1],[x2,y2],[x3,y3],[x4,y4]]`` → 8-float list.
Surya emits 4 quad corners. The spec wants 8 raw-pixel coords so
downstream provenance normalisation can consume them directly.
"""
if not polygon:
return []
flat: list[float] = []
for point in polygon:
if isinstance(point, (list, tuple)) and len(point) >= 2:
flat.append(float(point[0]))
flat.append(float(point[1]))
return flat
__all__ = ["SuryaOCRClient"]

View file

@ -0,0 +1,216 @@
"""GenAIStep — assemble prompt, call LLM, map provenance (spec §6.3, §7, §9.2).
Runs after :class:`~ix.pipeline.ocr_step.OCRStep`. Builds the chat-style
``request_kwargs`` (messages + model name), picks the structured-output
schema (plain ``UseCaseResponse`` or a runtime
``ProvenanceWrappedResponse(result=..., segment_citations=...)`` when
provenance is on), hands both to the injected :class:`GenAIClient`, and
writes the parsed payload onto ``response_ix.ix_result``.
When provenance is on, the LLM-emitted ``segment_citations`` flow into
:func:`~ix.provenance.map_segment_refs_to_provenance` to build
``response_ix.provenance``. The per-field reliability flags
(``provenance_verified`` / ``text_agreement``) stay ``None`` here they
land in :class:`~ix.pipeline.reliability_step.ReliabilityStep`.
Failure modes:
* Network / timeout / non-2xx surfaced by the client ``IX_002_000``.
* :class:`pydantic.ValidationError` (structured output didn't match the
schema) ``IX_002_001``.
"""
from __future__ import annotations
from typing import Any, cast
import httpx
from pydantic import BaseModel, Field, ValidationError, create_model
from ix.contracts import RequestIX, ResponseIX, SegmentCitation
from ix.errors import IXErrorCode, IXException
from ix.genai.client import GenAIClient
from ix.pipeline.step import Step
from ix.provenance import map_segment_refs_to_provenance
from ix.segmentation import SegmentIndex
# Verbatim from spec §9.2 (core-pipeline spec) — inserted after the
# use-case system prompt when provenance is on.
_CITATION_INSTRUCTION = (
"For each extracted field, you must also populate the `segment_citations` list.\n"
"Each entry maps one field to the document segments that were its source.\n"
"Set `field_path` to the dot-separated JSON path of the field "
"(e.g. 'result.invoice_number').\n"
"Use two separate segment ID lists:\n"
"- `value_segment_ids`: segment IDs whose text directly contains the extracted "
"value (e.g. ['p1_l4'] for the line containing 'INV-001').\n"
"- `context_segment_ids`: segment IDs for surrounding label or anchor text that "
"helped you identify the field but does not contain the value itself "
"(e.g. ['p1_l3'] for a label like 'Invoice Number:'). Leave empty if there is "
"no distinct label.\n"
"Only use segment IDs that appear in the document text.\n"
"Omit fields for which you cannot identify a source segment."
)
class GenAIStep(Step):
"""LLM extraction + (optional) provenance mapping."""
def __init__(self, genai_client: GenAIClient) -> None:
self._client = genai_client
async def validate(self, request_ix: RequestIX, response_ix: ResponseIX) -> bool:
if request_ix.options.ocr.ocr_only:
return False
ctx = response_ix.context
ocr_text = (
response_ix.ocr_result.result.text
if response_ix.ocr_result is not None
else None
)
texts = list(getattr(ctx, "texts", []) or []) if ctx is not None else []
if not (ocr_text and ocr_text.strip()) and not texts:
raise IXException(IXErrorCode.IX_001_000)
return True
async def process(
self, request_ix: RequestIX, response_ix: ResponseIX
) -> ResponseIX:
ctx = response_ix.context
assert ctx is not None, "SetupStep must populate response_ix.context"
use_case_request: Any = getattr(ctx, "use_case_request", None)
use_case_response_cls: type[BaseModel] = getattr(ctx, "use_case_response", None)
assert use_case_request is not None and use_case_response_cls is not None
opts = request_ix.options
provenance_on = opts.provenance.include_provenance
# 1. System prompt — use-case default + optional citation instruction.
system_prompt = use_case_request.system_prompt
if provenance_on:
system_prompt = f"{system_prompt}\n\n{_CITATION_INSTRUCTION}"
# 2. User text — segment-tagged when provenance is on, else plain OCR + texts.
user_text = self._build_user_text(response_ix, provenance_on)
# 3. Response schema — plain or wrapped.
response_schema = self._resolve_response_schema(
use_case_response_cls, provenance_on
)
# 4. Model selection — request override → use-case default.
model_name = (
opts.gen_ai.gen_ai_model_name
or getattr(use_case_request, "default_model", None)
)
request_kwargs = {
"model": model_name,
"messages": [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_text},
],
}
# 5. Call the backend, translate errors.
try:
result = await self._client.invoke(
request_kwargs=request_kwargs,
response_schema=response_schema,
)
except ValidationError as exc:
raise IXException(
IXErrorCode.IX_002_001,
detail=f"{use_case_response_cls.__name__}: {exc}",
) from exc
except (httpx.HTTPError, ConnectionError, TimeoutError) as exc:
raise IXException(
IXErrorCode.IX_002_000,
detail=f"{model_name}: {exc.__class__.__name__}: {exc}",
) from exc
except IXException:
raise
# 6. Split parsed output; write result + meta.
if provenance_on:
wrapped = result.parsed
extraction: BaseModel = wrapped.result
segment_citations: list[SegmentCitation] = list(
getattr(wrapped, "segment_citations", []) or []
)
else:
extraction = result.parsed
segment_citations = []
response_ix.ix_result.result = extraction.model_dump(mode="json")
response_ix.ix_result.meta_data = {
"model_name": result.model_name,
"token_usage": {
"prompt_tokens": result.usage.prompt_tokens,
"completion_tokens": result.usage.completion_tokens,
},
}
# 7. Provenance mapping — only the structural assembly. Reliability
# flags get written in ReliabilityStep.
if provenance_on:
seg_idx = cast(SegmentIndex, getattr(ctx, "segment_index", None))
if seg_idx is None:
# No OCR was run (text-only request); skip provenance.
response_ix.provenance = None
else:
response_ix.provenance = map_segment_refs_to_provenance(
extraction_result={"result": response_ix.ix_result.result},
segment_citations=segment_citations,
segment_index=seg_idx,
max_sources_per_field=opts.provenance.max_sources_per_field,
min_confidence=0.0,
include_bounding_boxes=True,
source_type="value_and_context",
)
return response_ix
def _build_user_text(self, response_ix: ResponseIX, provenance_on: bool) -> str:
ctx = response_ix.context
assert ctx is not None
texts: list[str] = list(getattr(ctx, "texts", []) or [])
seg_idx: SegmentIndex | None = getattr(ctx, "segment_index", None)
if provenance_on and seg_idx is not None:
return seg_idx.to_prompt_text(context_texts=texts)
# Plain concat — OCR flat text + any extra paperless-style texts.
parts: list[str] = []
ocr_text = (
response_ix.ocr_result.result.text
if response_ix.ocr_result is not None
else None
)
if ocr_text:
parts.append(ocr_text)
parts.extend(texts)
return "\n\n".join(p for p in parts if p)
def _resolve_response_schema(
self,
use_case_response_cls: type[BaseModel],
provenance_on: bool,
) -> type[BaseModel]:
if not provenance_on:
return use_case_response_cls
# Dynamic wrapper — one per call is fine; Pydantic caches the
# generated JSON schema internally.
return create_model(
"ProvenanceWrappedResponse",
result=(use_case_response_cls, ...),
segment_citations=(
list[SegmentCitation],
Field(default_factory=list),
),
)
__all__ = ["GenAIStep"]

View file

@ -56,7 +56,11 @@ class OCRStep(Step):
assert ctx is not None, "SetupStep must populate response_ix.context"
pages = list(getattr(ctx, "pages", []))
ocr_result = await self._client.ocr(pages)
files = list(getattr(ctx, "files", []) or [])
page_metadata = list(getattr(ctx, "page_metadata", []) or [])
ocr_result = await self._client.ocr(
pages, files=files, page_metadata=page_metadata
)
# Inject page tags around each OCR page's content so the LLM can
# cross-reference the visual anchor without a separate prompt hack.

View file

@ -0,0 +1,56 @@
"""ReliabilityStep — writes provenance_verified + text_agreement (spec §6).
Runs after :class:`~ix.pipeline.genai_step.GenAIStep`. Skips entirely
when provenance is off OR when no provenance data was built (OCR-skipped
text-only request, for example). Otherwise delegates to
:func:`~ix.provenance.apply_reliability_flags`, which mutates each
:class:`~ix.contracts.FieldProvenance` in place and fills the two
summary counters (``verified_fields``, ``text_agreement_fields``) on
``quality_metrics``.
No own dispatch logic everything interesting lives in the normalisers
+ verifier modules and is unit-tested there.
"""
from __future__ import annotations
from typing import cast
from pydantic import BaseModel
from ix.contracts import RequestIX, ResponseIX
from ix.pipeline.step import Step
from ix.provenance import apply_reliability_flags
class ReliabilityStep(Step):
"""Fills per-field reliability flags on ``response.provenance``."""
async def validate(self, request_ix: RequestIX, response_ix: ResponseIX) -> bool:
if not request_ix.options.provenance.include_provenance:
return False
return response_ix.provenance is not None
async def process(
self, request_ix: RequestIX, response_ix: ResponseIX
) -> ResponseIX:
assert response_ix.provenance is not None # validate() guarantees
ctx = response_ix.context
texts: list[str] = (
list(getattr(ctx, "texts", []) or []) if ctx is not None else []
)
use_case_response_cls = cast(
type[BaseModel],
getattr(ctx, "use_case_response", None) if ctx is not None else None,
)
apply_reliability_flags(
provenance_data=response_ix.provenance,
use_case_response=use_case_response_cls,
texts=texts,
)
return response_ix
__all__ = ["ReliabilityStep"]

View file

@ -0,0 +1,67 @@
"""ResponseHandlerStep — final shape-up before the caller sees the payload (spec §8).
Does three purely mechanical things:
1. When ``include_ocr_text`` is set, concatenate every non-tag line text
into ``ocr_result.result.text`` (pages joined with blank line).
2. When ``include_geometries`` is **not** set (the default), strip
``ocr_result.result.pages`` and ``ocr_result.meta_data`` geometries
are heavyweight; callers opt in.
3. Clear ``response_ix.context`` (belt-and-braces ``Field(exclude=True)``
already keeps it out of ``model_dump`` output).
:meth:`validate` always returns True per spec.
"""
from __future__ import annotations
import re
from ix.contracts import RequestIX, ResponseIX
from ix.pipeline.step import Step
_PAGE_TAG_RE = re.compile(r"^\s*<\s*/?\s*page\b", re.IGNORECASE)
def _is_page_tag(text: str | None) -> bool:
if not text:
return False
return bool(_PAGE_TAG_RE.match(text))
class ResponseHandlerStep(Step):
"""Final shape-up step."""
async def validate(self, request_ix: RequestIX, response_ix: ResponseIX) -> bool:
return True
async def process(
self, request_ix: RequestIX, response_ix: ResponseIX
) -> ResponseIX:
ocr_opts = request_ix.options.ocr
# 1. Attach flat OCR text if requested.
if ocr_opts.include_ocr_text:
page_texts: list[str] = []
for page in response_ix.ocr_result.result.pages:
line_texts = [
line.text or ""
for line in page.lines
if not _is_page_tag(line.text)
]
page_texts.append("\n".join(line_texts))
response_ix.ocr_result.result.text = "\n\n".join(page_texts) or None
# 2. Strip geometries unless explicitly retained.
if not ocr_opts.include_geometries:
response_ix.ocr_result.result.pages = []
response_ix.ocr_result.meta_data = {}
# 3. Drop the internal context — already Field(exclude=True),
# this is defense in depth.
response_ix.context = None
return response_ix
__all__ = ["ResponseHandlerStep"]

20
src/ix/store/__init__.py Normal file
View file

@ -0,0 +1,20 @@
"""Async Postgres job store — SQLAlchemy 2.0 ORM + repo.
Exports are intentionally minimal: the engine factory and the declarative
``Base`` + ``IxJob`` ORM. The ``jobs_repo`` module lives next to this one and
exposes the CRUD methods callers actually need; we don't re-export from the
package so it stays obvious where each function lives.
"""
from __future__ import annotations
from ix.store.engine import get_engine, get_session_factory, reset_engine
from ix.store.models import Base, IxJob
__all__ = [
"Base",
"IxJob",
"get_engine",
"get_session_factory",
"reset_engine",
]

76
src/ix/store/engine.py Normal file
View file

@ -0,0 +1,76 @@
"""Lazy async engine + session-factory singletons.
The factories read ``IX_POSTGRES_URL`` from the environment on first call. In
Task 3.2 this switches to ``get_config()``; for now we go through ``os.environ``
directly so the store module doesn't depend on config that doesn't exist yet.
Both factories are idempotent on success repeat calls return the same
engine / sessionmaker. ``reset_engine`` nukes the cache and should only be
used in tests (where we teardown-recreate the DB between sessions).
"""
from __future__ import annotations
import os
from sqlalchemy.ext.asyncio import (
AsyncEngine,
AsyncSession,
async_sessionmaker,
create_async_engine,
)
_engine: AsyncEngine | None = None
_session_factory: async_sessionmaker[AsyncSession] | None = None
def _resolve_url() -> str:
"""Grab the Postgres URL from the environment.
Task 3.2 refactors this to go through ``ix.config.get_config()``; this
version keeps the store module usable during the bootstrap window where
``ix.config`` doesn't exist yet. Behaviour after refactor is identical —
both paths ultimately read ``IX_POSTGRES_URL``.
"""
try:
from ix.config import get_config
except ImportError:
url = os.environ.get("IX_POSTGRES_URL")
if not url:
raise RuntimeError(
"IX_POSTGRES_URL is not set and ix.config is unavailable"
) from None
return url
return get_config().postgres_url
def get_engine() -> AsyncEngine:
"""Return the process-wide async engine; create on first call."""
global _engine
if _engine is None:
_engine = create_async_engine(_resolve_url(), pool_pre_ping=True)
return _engine
def get_session_factory() -> async_sessionmaker[AsyncSession]:
"""Return the process-wide session factory; create on first call.
``expire_on_commit=False`` so ORM instances stay usable after ``commit()``
we frequently commit inside a repo method and then ``model_validate``
the row outside the session.
"""
global _session_factory
if _session_factory is None:
_session_factory = async_sessionmaker(get_engine(), expire_on_commit=False)
return _session_factory
def reset_engine() -> None:
"""Drop the cached engine + session factory. Test-only."""
global _engine, _session_factory
_engine = None
_session_factory = None

274
src/ix/store/jobs_repo.py Normal file
View file

@ -0,0 +1,274 @@
"""Async CRUD over ``ix_jobs`` — the one module the worker / REST touches.
Every method takes an :class:`AsyncSession` (caller-owned transaction). The
caller commits. We don't manage transactions inside repo methods because the
worker sometimes needs to claim + run-pipeline + mark-done inside one
long-running unit of work, and an inside-the-method commit would break that.
A few invariants worth stating up front:
* ``ix_id`` is a 16-char hex string assigned by :func:`insert_pending` on
first insert. Callers MUST NOT pass one (we generate it); if a
``RequestIX`` arrives with ``ix_id`` set it is ignored.
* ``(client_id, request_id)`` is unique on collision we return the
existing row unchanged. Callback URLs on the second insert are ignored;
the first insert's metadata wins.
* Claim uses ``FOR UPDATE SKIP LOCKED`` so concurrent workers never pick the
same row, and a session holding a lock doesn't block a sibling claimer.
* Status transitions: ``pending running (done | error)``. The sweeper is
the only path back to ``pending`` (and only from ``running``); terminal
states are stable.
"""
from __future__ import annotations
import secrets
from datetime import UTC, datetime
from typing import TYPE_CHECKING, Literal
from uuid import UUID, uuid4
from sqlalchemy import func, select, update
from sqlalchemy.dialects.postgresql import insert as pg_insert
from ix.contracts.job import Job
from ix.contracts.request import RequestIX
from ix.contracts.response import ResponseIX
from ix.store.models import IxJob
if TYPE_CHECKING:
from sqlalchemy.ext.asyncio import AsyncSession
def _new_ix_id() -> str:
"""Transport-assigned 16-hex handle.
``secrets.token_hex(8)`` gives 16 characters of entropy; good enough to
tag logs per spec §3 without collision risk across the lifetime of the
service.
"""
return secrets.token_hex(8)
def _orm_to_job(row: IxJob) -> Job:
"""Round-trip ORM row back through the Pydantic ``Job`` contract.
The JSONB columns come out as plain dicts; we let Pydantic re-validate
them into :class:`RequestIX` / :class:`ResponseIX`. Catching validation
errors here would mask real bugs; we let them surface.
"""
return Job(
job_id=row.job_id,
ix_id=row.ix_id,
client_id=row.client_id,
request_id=row.request_id,
status=row.status, # type: ignore[arg-type]
request=RequestIX.model_validate(row.request),
response=(
ResponseIX.model_validate(row.response) if row.response is not None else None
),
callback_url=row.callback_url,
callback_status=row.callback_status, # type: ignore[arg-type]
attempts=row.attempts,
created_at=row.created_at,
started_at=row.started_at,
finished_at=row.finished_at,
)
async def insert_pending(
session: AsyncSession,
request: RequestIX,
callback_url: str | None,
) -> Job:
"""Insert a pending row; return the new or existing :class:`Job`.
Uses ``INSERT ... ON CONFLICT DO NOTHING`` on the
``(client_id, request_id)`` unique index, then re-selects. If the insert
was a no-op the existing row is returned verbatim (status / callback_url
unchanged) callers rely on this for idempotent resubmission.
"""
ix_id = request.ix_id or _new_ix_id()
job_id = uuid4()
# Serialise the request through Pydantic so JSONB gets plain JSON types,
# not datetime / Decimal instances asyncpg would reject.
request_json = request.model_copy(update={"ix_id": ix_id}).model_dump(
mode="json"
)
stmt = (
pg_insert(IxJob)
.values(
job_id=job_id,
ix_id=ix_id,
client_id=request.ix_client_id,
request_id=request.request_id,
status="pending",
request=request_json,
response=None,
callback_url=callback_url,
callback_status=None,
attempts=0,
)
.on_conflict_do_nothing(index_elements=["client_id", "request_id"])
)
await session.execute(stmt)
row = await session.scalar(
select(IxJob).where(
IxJob.client_id == request.ix_client_id,
IxJob.request_id == request.request_id,
)
)
assert row is not None, "insert_pending: row missing after upsert"
return _orm_to_job(row)
async def claim_next_pending(session: AsyncSession) -> Job | None:
"""Atomically pick the oldest pending row and flip it to running.
``FOR UPDATE SKIP LOCKED`` means a sibling worker can never deadlock on
our row; they'll skip past it and grab the next pending entry. The
sibling test in :mod:`tests/integration/test_jobs_repo` asserts this.
"""
stmt = (
select(IxJob)
.where(IxJob.status == "pending")
.order_by(IxJob.created_at)
.limit(1)
.with_for_update(skip_locked=True)
)
row = await session.scalar(stmt)
if row is None:
return None
row.status = "running"
row.started_at = datetime.now(UTC)
await session.flush()
return _orm_to_job(row)
async def get(session: AsyncSession, job_id: UUID) -> Job | None:
row = await session.scalar(select(IxJob).where(IxJob.job_id == job_id))
return _orm_to_job(row) if row is not None else None
async def get_by_correlation(
session: AsyncSession, client_id: str, request_id: str
) -> Job | None:
row = await session.scalar(
select(IxJob).where(
IxJob.client_id == client_id,
IxJob.request_id == request_id,
)
)
return _orm_to_job(row) if row is not None else None
async def mark_done(
session: AsyncSession, job_id: UUID, response: ResponseIX
) -> None:
"""Write the pipeline's response and move to terminal state.
Status is ``done`` iff ``response.error is None``; any non-None error
flips us to ``error``. Spec §3 lifecycle invariant.
"""
status = "done" if response.error is None else "error"
await session.execute(
update(IxJob)
.where(IxJob.job_id == job_id)
.values(
status=status,
response=response.model_dump(mode="json"),
finished_at=datetime.now(UTC),
)
)
async def mark_error(
session: AsyncSession, job_id: UUID, response: ResponseIX
) -> None:
"""Convenience wrapper that always writes status='error'.
Separate from :func:`mark_done` for readability at call sites: when the
worker knows it caught an exception the pipeline didn't handle itself,
``mark_error`` signals intent even if the response body happens to have
a populated error field.
"""
await session.execute(
update(IxJob)
.where(IxJob.job_id == job_id)
.values(
status="error",
response=response.model_dump(mode="json"),
finished_at=datetime.now(UTC),
)
)
async def update_callback_status(
session: AsyncSession,
job_id: UUID,
status: Literal["delivered", "failed"],
) -> None:
await session.execute(
update(IxJob)
.where(IxJob.job_id == job_id)
.values(callback_status=status)
)
async def sweep_orphans(
session: AsyncSession,
now: datetime,
max_running_seconds: int,
) -> list[UUID]:
"""Reset stale ``running`` rows back to ``pending`` and bump ``attempts``.
Called once at worker startup (spec §3) to rescue jobs whose owner died
mid-pipeline. The threshold is time-based on ``started_at`` so a still-
running worker never reclaims its own in-flight job callers pass
``2 * IX_PIPELINE_REQUEST_TIMEOUT_SECONDS`` per spec.
"""
# Pick candidates and return their ids so the worker can log what it
# did. Two-step (SELECT then UPDATE) is clearer than RETURNING for
# callers who want the id list alongside a plain UPDATE.
candidates = (
await session.scalars(
select(IxJob.job_id).where(
IxJob.status == "running",
IxJob.started_at < now - _as_interval(max_running_seconds),
)
)
).all()
if not candidates:
return []
await session.execute(
update(IxJob)
.where(IxJob.job_id.in_(candidates))
.values(
status="pending",
started_at=None,
attempts=IxJob.attempts + 1,
)
)
return list(candidates)
def _as_interval(seconds: int): # type: ignore[no-untyped-def]
"""Return a SQL interval expression for ``seconds``.
We build the interval via ``func.make_interval`` so asyncpg doesn't have
to guess at a text-form cast the server-side ``make_interval(secs :=)``
is unambiguous and avoids locale-dependent parsing.
"""
return func.make_interval(0, 0, 0, 0, 0, 0, seconds)

86
src/ix/store/models.py Normal file
View file

@ -0,0 +1,86 @@
"""SQLAlchemy 2.0 ORM for ``ix_jobs``.
Shape matches the initial migration (``alembic/versions/001_initial_ix_jobs.py``)
which in turn matches spec §4. JSONB columns carry the RequestIX / ResponseIX
Pydantic payloads; we don't wrap them in custom TypeDecorators — the repo does
an explicit ``model_dump(mode="json")`` on write and ``model_validate`` on read
so the ORM stays a thin mapping layer and the Pydantic round-trip logic stays
colocated with the other contract code.
The status column is a plain string the CHECK constraint in the DB enforces
the allowed values. Using a SQLAlchemy ``Enum`` type here would double-bind
the enum values on both sides and force a migration each time we add a state.
"""
from __future__ import annotations
from datetime import datetime
from typing import Any
from uuid import UUID
from sqlalchemy import CheckConstraint, DateTime, Index, Integer, Text, text
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.dialects.postgresql import UUID as PgUUID
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
class Base(DeclarativeBase):
"""Shared declarative base for the store package."""
class IxJob(Base):
"""ORM mapping for the ``ix_jobs`` table.
One row per submitted extraction job. Lifecycle: pending running
(done | error). The worker is the only writer that flips status past
pending; the REST / pg_queue adapters only insert.
"""
__tablename__ = "ix_jobs"
__table_args__ = (
CheckConstraint(
"status IN ('pending', 'running', 'done', 'error')",
name="ix_jobs_status_check",
),
CheckConstraint(
"callback_status IS NULL OR callback_status IN "
"('pending', 'delivered', 'failed')",
name="ix_jobs_callback_status_check",
),
Index(
"ix_jobs_status_created",
"status",
"created_at",
postgresql_where=text("status = 'pending'"),
),
Index(
"ix_jobs_client_request",
"client_id",
"request_id",
unique=True,
),
)
job_id: Mapped[UUID] = mapped_column(PgUUID(as_uuid=True), primary_key=True)
ix_id: Mapped[str] = mapped_column(Text, nullable=False)
client_id: Mapped[str] = mapped_column(Text, nullable=False)
request_id: Mapped[str] = mapped_column(Text, nullable=False)
status: Mapped[str] = mapped_column(Text, nullable=False)
request: Mapped[dict[str, Any]] = mapped_column(JSONB, nullable=False)
response: Mapped[dict[str, Any] | None] = mapped_column(JSONB, nullable=True)
callback_url: Mapped[str | None] = mapped_column(Text, nullable=True)
callback_status: Mapped[str | None] = mapped_column(Text, nullable=True)
attempts: Mapped[int] = mapped_column(
Integer, nullable=False, server_default=text("0")
)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
nullable=False,
server_default=text("now()"),
)
started_at: Mapped[datetime | None] = mapped_column(
DateTime(timezone=True), nullable=True
)
finished_at: Mapped[datetime | None] = mapped_column(
DateTime(timezone=True), nullable=True
)

View file

@ -26,7 +26,7 @@ class Request(BaseModel):
model_config = ConfigDict(extra="forbid")
use_case_name: str = "Bank Statement Header"
default_model: str = "gpt-oss:20b"
default_model: str = "qwen3:14b"
system_prompt: str = (
"You extract header metadata from a single bank or credit-card statement. "
"Return only facts that appear in the document; leave a field null if uncertain. "

View file

@ -0,0 +1,7 @@
"""Async worker — pulls pending rows and runs the pipeline against them.
The worker is one asyncio task spawned by the FastAPI lifespan (see
``ix.app``). Single-concurrency per MVP spec (Ollama + Surya both want the
GPU serially). Production wiring lives in Chunk 4; until then the pipeline
factory is parameter-injected so tests pass a fakes-only Pipeline.
"""

44
src/ix/worker/callback.py Normal file
View file

@ -0,0 +1,44 @@
"""One-shot webhook callback delivery.
No retries the caller always has ``GET /jobs/{id}`` as the authoritative
fallback. We record the delivery outcome (``delivered`` / ``failed``) on the
row but never change ``status`` based on it; terminal states are stable.
Spec §5 callback semantics: one POST, 2xx delivered, anything else or
exception failed.
"""
from __future__ import annotations
from typing import Literal
import httpx
from ix.contracts.job import Job
async def deliver(
callback_url: str,
job: Job,
timeout_s: int,
) -> Literal["delivered", "failed"]:
"""POST the full :class:`Job` body to ``callback_url``; return the outcome.
``timeout_s`` caps both connect and read we don't configure them
separately for callbacks because the endpoint is caller-supplied and we
don't have a reason to treat slow-to-connect differently from slow-to-
respond. Any exception (connection error, timeout, non-2xx) collapses to
``"failed"``.
"""
try:
async with httpx.AsyncClient(timeout=timeout_s) as client:
response = await client.post(
callback_url,
json=job.model_dump(mode="json"),
)
if 200 <= response.status_code < 300:
return "delivered"
return "failed"
except Exception:
return "failed"

179
src/ix/worker/loop.py Normal file
View file

@ -0,0 +1,179 @@
"""Worker loop — claim pending rows, run pipeline, write terminal state.
One ``Worker`` instance per process. The loop body is:
1. Claim the next pending row (``FOR UPDATE SKIP LOCKED``). If none, wait
for the notify event or the poll interval, whichever fires first.
2. Build a fresh Pipeline via the injected factory and run it.
3. Write the response via ``mark_done`` (spec's ``done iff error is None``
invariant). If the pipeline itself raised (it shouldn't — steps catch
IXException internally but belt-and-braces), we stuff an
``IX_002_000`` into ``response.error`` and mark_error.
4. If the job has a ``callback_url``, POST once, record the outcome.
Startup pre-amble:
* Run ``sweep_orphans(now, 2 * IX_PIPELINE_REQUEST_TIMEOUT_SECONDS)`` once
before the loop starts. Recovers rows left in ``running`` by a crashed
previous process.
The "wait for work" hook is a callable so Task 3.6's PgQueueListener can
plug in later without the worker needing to know anything about LISTEN.
"""
from __future__ import annotations
import asyncio
from collections.abc import Callable
from datetime import UTC, datetime
from typing import TYPE_CHECKING
from ix.contracts.response import ResponseIX
from ix.errors import IXErrorCode, IXException
from ix.pipeline.pipeline import Pipeline
from ix.store import jobs_repo
from ix.worker import callback as cb
if TYPE_CHECKING:
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
PipelineFactory = Callable[[], Pipeline]
WaitForWork = Callable[[float], "asyncio.Future[None] | asyncio.Task[None]"]
class Worker:
"""Single-concurrency worker loop.
Parameters
----------
session_factory:
async_sessionmaker bound to an engine on the current event loop.
pipeline_factory:
Zero-arg callable returning a fresh :class:`Pipeline`. In production
this builds the real pipeline with Ollama + Surya; in tests it
returns a Pipeline of fakes.
poll_interval_seconds:
Fallback poll cadence when no notify wakes us (spec: 10 s default).
max_running_seconds:
Threshold passed to :func:`sweep_orphans` at startup.
Production wiring passes ``2 * IX_PIPELINE_REQUEST_TIMEOUT_SECONDS``.
callback_timeout_seconds:
Timeout for the webhook POST per spec §5.
wait_for_work:
Optional coroutine-factory. When set, the worker awaits it instead
of ``asyncio.sleep``. Task 3.6 passes the PgQueueListener's
notify-or-poll helper.
"""
def __init__(
self,
*,
session_factory: async_sessionmaker[AsyncSession],
pipeline_factory: PipelineFactory,
poll_interval_seconds: float = 10.0,
max_running_seconds: int = 5400,
callback_timeout_seconds: int = 10,
wait_for_work: Callable[[float], asyncio.Future[None]] | None = None,
) -> None:
self._session_factory = session_factory
self._pipeline_factory = pipeline_factory
self._poll_interval = poll_interval_seconds
self._max_running_seconds = max_running_seconds
self._callback_timeout = callback_timeout_seconds
self._wait_for_work = wait_for_work
async def run(self, stop: asyncio.Event) -> None:
"""Drive the claim-run-write-callback loop until ``stop`` is set."""
await self._startup_sweep()
while not stop.is_set():
async with self._session_factory() as session:
job = await jobs_repo.claim_next_pending(session)
await session.commit()
if job is None:
await self._sleep_or_wake(stop)
continue
await self._run_one(job)
async def _startup_sweep(self) -> None:
"""Rescue ``running`` rows left behind by a previous crash."""
async with self._session_factory() as session:
await jobs_repo.sweep_orphans(
session,
datetime.now(UTC),
self._max_running_seconds,
)
await session.commit()
async def _sleep_or_wake(self, stop: asyncio.Event) -> None:
"""Either run the custom wait hook or sleep the poll interval.
We cap the wait at either the poll interval or the stop signal,
whichever fires first without this, a worker with no notify hook
would happily sleep for 10 s while the outer app is trying to shut
down.
"""
stop_task = asyncio.create_task(stop.wait())
try:
if self._wait_for_work is not None:
wake_task = asyncio.ensure_future(
self._wait_for_work(self._poll_interval)
)
else:
wake_task = asyncio.create_task(
asyncio.sleep(self._poll_interval)
)
try:
await asyncio.wait(
{stop_task, wake_task},
return_when=asyncio.FIRST_COMPLETED,
)
finally:
if not wake_task.done():
wake_task.cancel()
finally:
if not stop_task.done():
stop_task.cancel()
async def _run_one(self, job) -> None: # type: ignore[no-untyped-def]
"""Run the pipeline for one job; persist the outcome + callback."""
pipeline = self._pipeline_factory()
try:
response = await pipeline.start(job.request)
except Exception as exc:
# The pipeline normally catches IXException itself. Non-IX
# failures land here. We wrap the message in IX_002_000 so the
# caller sees a stable code.
ix_exc = IXException(IXErrorCode.IX_002_000, detail=str(exc))
response = ResponseIX(error=str(ix_exc))
async with self._session_factory() as session:
await jobs_repo.mark_error(session, job.job_id, response)
await session.commit()
else:
async with self._session_factory() as session:
await jobs_repo.mark_done(session, job.job_id, response)
await session.commit()
if job.callback_url:
await self._deliver_callback(job.job_id, job.callback_url)
async def _deliver_callback(self, job_id, callback_url: str) -> None: # type: ignore[no-untyped-def]
# Re-fetch the job so the callback payload reflects the final terminal
# state + response. Cheaper than threading the freshly-marked state
# back out of ``mark_done``, and keeps the callback body canonical.
async with self._session_factory() as session:
final = await jobs_repo.get(session, job_id)
if final is None:
return
status = await cb.deliver(callback_url, final, self._callback_timeout)
async with self._session_factory() as session:
await jobs_repo.update_callback_status(session, job_id, status)
await session.commit()

98
tests/fixtures/synthetic_giro.pdf vendored Normal file
View file

@ -0,0 +1,98 @@
%PDF-1.7
%µ¶
% Written by MuPDF 1.27.2
1 0 obj
<</Type/Catalog/Pages 2 0 R/Info<</Producer(MuPDF 1.27.2)>>>>
endobj
2 0 obj
<</Type/Pages/Count 1/Kids[4 0 R]>>
endobj
3 0 obj
<</Font<</helv 5 0 R>>>>
endobj
4 0 obj
<</Type/Page/MediaBox[0 0 595 842]/Rotate 0/Resources 3 0 R/Parent 2 0 R/Contents[6 0 R 7 0 R 8 0 R 9 0 R 10 0 R 11 0 R]>>
endobj
5 0 obj
<</Type/Font/Subtype/Type1/BaseFont/Helvetica/Encoding/WinAnsiEncoding>>
endobj
6 0 obj
<</Length 54>>
stream
q
BT
1 0 0 1 72 770 Tm
/helv 12 Tf [<444b42>]TJ
ET
Q
endstream
endobj
7 0 obj
<</Length 95/Filter/FlateDecode>>
stream
ˆ1
€@ û¼"?ð¬žX6vB:±°P,´°ñýæXf†^<1A>„SL8+g4ìU×q,Ê~òÚ£ƒBpØ® @m­uf-òÅu4 K¸Ô4l>Óä´Ð9
endstream
endobj
8 0 obj
<</Length 105/Filter/FlateDecode>>
stream
xÚe‰±
ACûùŠùg2»3b!ØØ ÛÉ·‡…6~¿é%ÉK ò‘ËW£\ 4t¼å𜯯:÷®<C3B7>S<EFBFBD>jéLÏ<4C>™Õ`eÙ yÌ=[¬°°pL2H° ÃÆ'þŸó2nrr—Ò
endstream
endobj
9 0 obj
<</Length 100/Filter/FlateDecode>>
stream
xÚ Ã±
Â@EÑ~¾bþÀ™7»o ˆ…`c'LR„°Á")lü~÷^Ž|åâjc×åtÕ<åòéÇOš»Î·²7ceç44Aç6tk¬°ð@Dô¨AX©#Ü—|É3å-Åyd
endstream
endobj
10 0 obj
<</Length 99/Filter/FlateDecode>>
stream
ˆ1
B1û=ÅÞÀÝ÷’±lì„íÄB$ -l<¿™©fìk§²ôX¦¸FóúØî5ß?Oxm~;4ê©mP{M „ \'WQ<57><><E2809C><EFBFBD>IˆÖ8Þëb粫ý·V
endstream
endobj
11 0 obj
<</Length 93/Filter/FlateDecode>>
stream
xÚ-ˆ;
€@ ûœ"70ŸÝl#ÍvB:±\±ÐÂÆóBó)ÆX-ú ÝÙ®\ú¬%Ùö •$dÑMHUYš†ã%,jÃê&‡>NT
endstream
endobj
xref
0 12
0000000000 65535 f
0000000042 00000 n
0000000120 00000 n
0000000172 00000 n
0000000213 00000 n
0000000352 00000 n
0000000441 00000 n
0000000544 00000 n
0000000707 00000 n
0000000881 00000 n
0000001050 00000 n
0000001218 00000 n
trailer
<</Size 12/Root 1 0 R/ID[<C3B4C38E004FC2B6C3A0C2BF4C00C282><890F3E53B827FF9B00CB90D2895721FC>]>>
startxref
1380
%%EOF

View file

@ -0,0 +1,124 @@
"""Integration-test fixtures — real Postgres required.
Policy: tests that import these fixtures skip cleanly when no DB is
configured. We check ``IX_TEST_DATABASE_URL`` first (local developer
override, usually a disposable docker container), then ``IX_POSTGRES_URL``
(what Forgejo Actions already sets). If neither is present the fixture
short-circuits with ``pytest.skip`` so a developer running
``pytest tests/unit`` in an unconfigured shell doesn't see the integration
suite hang or raise cryptic ``OperationalError``.
Schema lifecycle:
* session scope: ``alembic upgrade head`` once, ``alembic downgrade base``
at session end. We tried ``Base.metadata.create_all`` at first faster,
but it meant migrations stayed untested by the integration suite and a
developer who broke ``001_initial_ix_jobs.py`` wouldn't find out until
deploy. Current shape keeps migrations in the hot path.
* per-test: ``TRUNCATE ix_jobs`` (via the ``_reset_schema`` autouse fixture)
faster than recreating the schema and preserves indexes/constraints so
tests that want to assert ON a unique-violation path actually get one.
"""
from __future__ import annotations
import os
import subprocess
import sys
from collections.abc import AsyncIterator, Iterator
from pathlib import Path
import pytest
import pytest_asyncio
from sqlalchemy.ext.asyncio import (
AsyncEngine,
AsyncSession,
async_sessionmaker,
create_async_engine,
)
REPO_ROOT = Path(__file__).resolve().parents[2]
def _resolve_postgres_url() -> str | None:
"""Pick the database URL per policy: test override → CI URL → none."""
return os.environ.get("IX_TEST_DATABASE_URL") or os.environ.get("IX_POSTGRES_URL")
@pytest.fixture(scope="session")
def postgres_url() -> str:
url = _resolve_postgres_url()
if not url:
pytest.skip(
"no postgres configured — set IX_TEST_DATABASE_URL or IX_POSTGRES_URL"
)
return url
def _run_alembic(direction: str, postgres_url: str) -> None:
"""Invoke Alembic in a subprocess so its ``asyncio.run`` inside ``env.py``
doesn't collide with the pytest-asyncio event loop.
We pass the URL via ``IX_POSTGRES_URL`` not ``-x url=...`` because
percent-encoded characters in developer passwords trip up alembic's
configparser-backed ini loader. The env var lane skips configparser.
"""
env = os.environ.copy()
env["IX_POSTGRES_URL"] = postgres_url
subprocess.run(
[sys.executable, "-m", "alembic", direction, "head" if direction == "upgrade" else "base"],
cwd=REPO_ROOT,
env=env,
check=True,
)
@pytest.fixture(scope="session", autouse=True)
def _prepare_schema(postgres_url: str) -> Iterator[None]:
"""Run migrations once per session, torn down at the end.
pytest-asyncio creates one event loop per test (function-scoped by
default) and asyncpg connections can't survive a loop switch. That
forces a function-scoped engine below but migrations are expensive,
so we keep those session-scoped via a subprocess call (no loop
involvement at all).
"""
_run_alembic("downgrade", postgres_url)
_run_alembic("upgrade", postgres_url)
yield
_run_alembic("downgrade", postgres_url)
@pytest_asyncio.fixture
async def engine(postgres_url: str) -> AsyncIterator[AsyncEngine]:
"""Per-test async engine.
Built fresh each test so its asyncpg connections live on the same loop
as the test itself. Dispose on teardown otherwise asyncpg leaks tasks
into the next test's loop and we get ``got Future attached to a
different loop`` errors on the second test in a file.
"""
eng = create_async_engine(postgres_url, pool_pre_ping=True)
try:
yield eng
finally:
await eng.dispose()
@pytest_asyncio.fixture
async def session_factory(engine: AsyncEngine) -> async_sessionmaker[AsyncSession]:
"""Per-test session factory. ``expire_on_commit=False`` per prod parity."""
return async_sessionmaker(engine, expire_on_commit=False)
@pytest_asyncio.fixture(autouse=True)
async def _reset_schema(engine: AsyncEngine) -> None:
"""Truncate ix_jobs between tests so each test starts from empty state."""
async with engine.begin() as conn:
await conn.exec_driver_sql("TRUNCATE ix_jobs")

View file

@ -0,0 +1,367 @@
"""Integration tests for :mod:`ix.store.jobs_repo` — run against a real DB.
Every test exercises one repo method end-to-end. A few go further and
concurrently spin up two sessions to demonstrate the claim query behaves
correctly under ``SKIP LOCKED`` (two claimers should never see the same row).
Skipped cleanly when no Postgres is configured see integration/conftest.py.
"""
from __future__ import annotations
import asyncio
from datetime import UTC, datetime, timedelta
from typing import TYPE_CHECKING
from uuid import UUID, uuid4
from ix.contracts.request import Context, RequestIX
from ix.contracts.response import ResponseIX
from ix.store import jobs_repo
if TYPE_CHECKING:
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
def _make_request(client: str = "mammon", request_id: str = "r-1") -> RequestIX:
return RequestIX(
use_case="bank_statement_header",
ix_client_id=client,
request_id=request_id,
context=Context(texts=["hello"]),
)
async def test_insert_pending_creates_row_and_assigns_ix_id(
session_factory: async_sessionmaker[AsyncSession],
) -> None:
async with session_factory() as session:
job = await jobs_repo.insert_pending(
session, _make_request(), callback_url=None
)
await session.commit()
assert job.status == "pending"
assert isinstance(job.job_id, UUID)
# ix_id is a 16-hex string per spec §3 — transport-assigned.
assert isinstance(job.ix_id, str)
assert len(job.ix_id) == 16
assert all(c in "0123456789abcdef" for c in job.ix_id)
assert job.attempts == 0
async def test_insert_pending_is_idempotent_on_correlation_key(
session_factory: async_sessionmaker[AsyncSession],
) -> None:
"""(client_id, request_id) collides → existing row comes back unchanged."""
async with session_factory() as session:
first = await jobs_repo.insert_pending(
session, _make_request("mammon", "same-id"), callback_url="http://x/cb"
)
await session.commit()
async with session_factory() as session:
second = await jobs_repo.insert_pending(
session, _make_request("mammon", "same-id"), callback_url="http://y/cb"
)
await session.commit()
assert second.job_id == first.job_id
assert second.ix_id == first.ix_id
# The callback_url of the FIRST insert wins — we don't overwrite.
assert second.callback_url == "http://x/cb"
async def test_get_returns_full_job(
session_factory: async_sessionmaker[AsyncSession],
) -> None:
async with session_factory() as session:
inserted = await jobs_repo.insert_pending(
session, _make_request(), callback_url=None
)
await session.commit()
async with session_factory() as session:
fetched = await jobs_repo.get(session, inserted.job_id)
assert fetched is not None
assert fetched.job_id == inserted.job_id
assert fetched.request.use_case == "bank_statement_header"
assert fetched.status == "pending"
async def test_get_unknown_id_returns_none(
session_factory: async_sessionmaker[AsyncSession],
) -> None:
async with session_factory() as session:
result = await jobs_repo.get(session, uuid4())
assert result is None
async def test_get_by_correlation(
session_factory: async_sessionmaker[AsyncSession],
) -> None:
async with session_factory() as session:
inserted = await jobs_repo.insert_pending(
session, _make_request("mammon", "req-42"), callback_url=None
)
await session.commit()
async with session_factory() as session:
found = await jobs_repo.get_by_correlation(session, "mammon", "req-42")
assert found is not None
assert found.job_id == inserted.job_id
async with session_factory() as session:
missing = await jobs_repo.get_by_correlation(session, "mammon", "nope")
assert missing is None
async def test_claim_next_pending_advances_status(
session_factory: async_sessionmaker[AsyncSession],
) -> None:
async with session_factory() as session:
inserted = await jobs_repo.insert_pending(
session, _make_request(), callback_url=None
)
await session.commit()
async with session_factory() as session:
claimed = await jobs_repo.claim_next_pending(session)
await session.commit()
assert claimed is not None
assert claimed.job_id == inserted.job_id
assert claimed.status == "running"
assert claimed.started_at is not None
async def test_claim_next_pending_returns_none_when_empty(
session_factory: async_sessionmaker[AsyncSession],
) -> None:
async with session_factory() as session:
claimed = await jobs_repo.claim_next_pending(session)
await session.commit()
assert claimed is None
async def test_claim_next_pending_skips_locked(
session_factory: async_sessionmaker[AsyncSession],
) -> None:
"""Two concurrent claimers pick different rows (SKIP LOCKED in action)."""
async with session_factory() as session:
a = await jobs_repo.insert_pending(
session, _make_request("c", "a"), callback_url=None
)
b = await jobs_repo.insert_pending(
session, _make_request("c", "b"), callback_url=None
)
await session.commit()
session_a = session_factory()
session_b = session_factory()
try:
# Start the first claim but *don't* commit yet — its row is locked.
first = await jobs_repo.claim_next_pending(session_a)
# Second claimer runs while the first is still holding its lock. It
# must see the 'a' row as pending but SKIP it, returning the 'b' row.
second = await jobs_repo.claim_next_pending(session_b)
assert first is not None and second is not None
assert {first.job_id, second.job_id} == {a.job_id, b.job_id}
assert first.job_id != second.job_id
await session_a.commit()
await session_b.commit()
finally:
await session_a.close()
await session_b.close()
async def test_mark_done_writes_response_and_finishes(
session_factory: async_sessionmaker[AsyncSession],
) -> None:
async with session_factory() as session:
inserted = await jobs_repo.insert_pending(
session, _make_request(), callback_url=None
)
await session.commit()
response = ResponseIX(
use_case="bank_statement_header",
ix_client_id="mammon",
request_id="r-1",
)
async with session_factory() as session:
await jobs_repo.mark_done(session, inserted.job_id, response)
await session.commit()
async with session_factory() as session:
after = await jobs_repo.get(session, inserted.job_id)
assert after is not None
assert after.status == "done"
assert after.response is not None
assert after.finished_at is not None
async def test_mark_done_with_error_response_moves_to_error(
session_factory: async_sessionmaker[AsyncSession],
) -> None:
"""`done` iff response.error is None — otherwise status='error'."""
async with session_factory() as session:
inserted = await jobs_repo.insert_pending(
session, _make_request(), callback_url=None
)
await session.commit()
bad = ResponseIX(error="IX_002_000: boom")
async with session_factory() as session:
await jobs_repo.mark_done(session, inserted.job_id, bad)
await session.commit()
async with session_factory() as session:
after = await jobs_repo.get(session, inserted.job_id)
assert after is not None
assert after.status == "error"
assert after.response is not None
assert (after.response.error or "").startswith("IX_002_000")
async def test_mark_error_always_error(
session_factory: async_sessionmaker[AsyncSession],
) -> None:
async with session_factory() as session:
inserted = await jobs_repo.insert_pending(
session, _make_request(), callback_url=None
)
await session.commit()
bad = ResponseIX(error="IX_000_005: unsupported")
async with session_factory() as session:
await jobs_repo.mark_error(session, inserted.job_id, bad)
await session.commit()
async with session_factory() as session:
after = await jobs_repo.get(session, inserted.job_id)
assert after is not None
assert after.status == "error"
async def test_update_callback_status(
session_factory: async_sessionmaker[AsyncSession],
) -> None:
async with session_factory() as session:
inserted = await jobs_repo.insert_pending(
session, _make_request(), callback_url="http://cb"
)
await session.commit()
async with session_factory() as session:
await jobs_repo.update_callback_status(session, inserted.job_id, "delivered")
await session.commit()
async with session_factory() as session:
after = await jobs_repo.get(session, inserted.job_id)
assert after is not None
assert after.callback_status == "delivered"
async def test_sweep_orphans_resets_stale_running(
session_factory: async_sessionmaker[AsyncSession],
) -> None:
"""Running rows older than (now - max_running_seconds) go back to pending."""
async with session_factory() as session:
inserted = await jobs_repo.insert_pending(
session, _make_request(), callback_url=None
)
await session.commit()
# Backdate started_at by an hour to simulate a crashed worker mid-job.
async with session_factory() as session:
from sqlalchemy import text
stale = datetime.now(UTC) - timedelta(hours=1)
await session.execute(
text(
"UPDATE ix_jobs SET status='running', started_at=:t "
"WHERE job_id=:jid"
),
{"t": stale, "jid": inserted.job_id},
)
await session.commit()
# Max age of 60 s → our hour-old row gets swept.
async with session_factory() as session:
rescued = await jobs_repo.sweep_orphans(
session, datetime.now(UTC), max_running_seconds=60
)
await session.commit()
assert inserted.job_id in rescued
async with session_factory() as session:
after = await jobs_repo.get(session, inserted.job_id)
assert after is not None
assert after.status == "pending"
assert after.attempts == 1
async def test_sweep_orphans_leaves_fresh_running_alone(
session_factory: async_sessionmaker[AsyncSession],
) -> None:
"""A just-claimed row must not get reclaimed by the sweeper."""
async with session_factory() as session:
await jobs_repo.insert_pending(session, _make_request(), callback_url=None)
await session.commit()
async with session_factory() as session:
claimed = await jobs_repo.claim_next_pending(session)
await session.commit()
assert claimed is not None
# Sweep with a huge threshold (1 hour). Our just-claimed row is fresh, so
# it stays running.
async with session_factory() as session:
rescued = await jobs_repo.sweep_orphans(
session, datetime.now(UTC), max_running_seconds=3600
)
await session.commit()
assert rescued == []
async with session_factory() as session:
after = await jobs_repo.get(session, claimed.job_id)
assert after is not None
assert after.status == "running"
async def test_concurrent_claim_never_double_dispatches(
session_factory: async_sessionmaker[AsyncSession],
) -> None:
"""Spin a batch of concurrent claimers; every insert is claimed exactly once."""
async with session_factory() as session:
ids = []
for i in range(5):
job = await jobs_repo.insert_pending(
session, _make_request("mass", f"r-{i}"), callback_url=None
)
ids.append(job.job_id)
await session.commit()
async def claim_one() -> UUID | None:
async with session_factory() as session:
claimed = await jobs_repo.claim_next_pending(session)
await session.commit()
return claimed.job_id if claimed else None
results = await asyncio.gather(*(claim_one() for _ in range(10)))
non_null = [r for r in results if r is not None]
# Every inserted id appears at most once.
assert sorted(non_null) == sorted(ids)

View file

@ -0,0 +1,153 @@
"""Integration tests for the PgQueueListener + worker integration (Task 3.6).
Two scenarios:
1. NOTIFY delivered worker wakes within ~1 s and picks the row up.
2. Missed NOTIFY the row still gets picked up by the fallback poll.
Both run a real worker + listener against a live Postgres. We drive them via
``asyncio.gather`` + a "until done" watchdog.
"""
from __future__ import annotations
import asyncio
from typing import TYPE_CHECKING
from sqlalchemy import text
from ix.adapters.pg_queue.listener import PgQueueListener, asyncpg_dsn_from_sqlalchemy_url
from ix.contracts.request import Context, RequestIX
from ix.pipeline.pipeline import Pipeline
from ix.pipeline.step import Step
from ix.store import jobs_repo
from ix.worker.loop import Worker
if TYPE_CHECKING:
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
class _PassingStep(Step):
"""Same minimal fake as test_worker_loop — keeps these suites independent."""
step_name = "fake_pass"
async def validate(self, request_ix, response_ix): # type: ignore[no-untyped-def]
return True
async def process(self, request_ix, response_ix): # type: ignore[no-untyped-def]
response_ix.use_case = request_ix.use_case
return response_ix
def _factory() -> Pipeline:
return Pipeline(steps=[_PassingStep()])
async def _wait_for_status(
session_factory: async_sessionmaker[AsyncSession],
job_id,
target: str,
timeout_s: float,
) -> bool:
deadline = asyncio.get_event_loop().time() + timeout_s
while asyncio.get_event_loop().time() < deadline:
async with session_factory() as session:
job = await jobs_repo.get(session, job_id)
if job is not None and job.status == target:
return True
await asyncio.sleep(0.1)
return False
async def test_notify_wakes_worker_within_2s(
session_factory: async_sessionmaker[AsyncSession],
postgres_url: str,
) -> None:
"""Direct INSERT + NOTIFY → worker picks it up fast (not via the poll)."""
listener = PgQueueListener(dsn=asyncpg_dsn_from_sqlalchemy_url(postgres_url))
await listener.start()
worker = Worker(
session_factory=session_factory,
pipeline_factory=_factory,
# 60 s fallback poll — if we still find the row within 2 s it's
# because NOTIFY woke us, not the poll.
poll_interval_seconds=60.0,
max_running_seconds=3600,
wait_for_work=listener.wait_for_work,
)
stop = asyncio.Event()
worker_task = asyncio.create_task(worker.run(stop))
# Give the worker one tick to reach the sleep_or_wake branch.
await asyncio.sleep(0.3)
# Insert a pending row manually + NOTIFY — simulates a direct-SQL client
# like an external batch script.
request = RequestIX(
use_case="bank_statement_header",
ix_client_id="pgq",
request_id="notify-1",
context=Context(texts=["hi"]),
)
async with session_factory() as session:
job = await jobs_repo.insert_pending(session, request, callback_url=None)
await session.commit()
async with session_factory() as session:
await session.execute(
text(f"NOTIFY ix_jobs_new, '{job.job_id}'")
)
await session.commit()
assert await _wait_for_status(session_factory, job.job_id, "done", 3.0), (
"worker didn't pick up the NOTIFY'd row in time"
)
stop.set()
await worker_task
await listener.stop()
async def test_missed_notify_falls_back_to_poll(
session_factory: async_sessionmaker[AsyncSession],
postgres_url: str,
) -> None:
"""Row lands without a NOTIFY; fallback poll still picks it up."""
listener = PgQueueListener(dsn=asyncpg_dsn_from_sqlalchemy_url(postgres_url))
await listener.start()
worker = Worker(
session_factory=session_factory,
pipeline_factory=_factory,
# Short poll so the fallback kicks in quickly — we need the test
# to finish in seconds, not the spec's 10 s.
poll_interval_seconds=0.5,
max_running_seconds=3600,
wait_for_work=listener.wait_for_work,
)
stop = asyncio.Event()
worker_task = asyncio.create_task(worker.run(stop))
await asyncio.sleep(0.3)
# Insert without NOTIFY: simulate a buggy writer.
request = RequestIX(
use_case="bank_statement_header",
ix_client_id="pgq",
request_id="missed-1",
context=Context(texts=["hi"]),
)
async with session_factory() as session:
job = await jobs_repo.insert_pending(session, request, callback_url=None)
await session.commit()
assert await _wait_for_status(session_factory, job.job_id, "done", 5.0), (
"fallback poll didn't pick up the row"
)
stop.set()
await worker_task
await listener.stop()

View file

@ -0,0 +1,179 @@
"""Integration tests for the FastAPI REST adapter (spec §5).
Uses ``fastapi.testclient.TestClient`` against a real DB. Ollama / OCR probes
are stubbed via the DI hooks the routes expose for testing in Chunk 4 the
production probes swap in.
"""
from __future__ import annotations
from collections.abc import Iterator
from uuid import uuid4
import pytest
from fastapi.testclient import TestClient
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from ix.adapters.rest.routes import Probes, get_probes, get_session_factory_dep
from ix.app import create_app
def _factory_for_url(postgres_url: str): # type: ignore[no-untyped-def]
"""Build a TestClient-compatible session factory.
TestClient runs the ASGI app on its own dedicated event loop (the one it
creates in its sync wrapper), distinct from the per-test loop
pytest-asyncio gives direct tests. Session factories must therefore be
constructed from an engine that was itself created on that inner loop.
We do this lazily: each dependency resolution creates a fresh engine +
factory on the current running loop, which is the TestClient's loop at
route-invocation time. Engine reuse would drag the cross-loop futures
that asyncpg hates back in.
"""
def _factory(): # type: ignore[no-untyped-def]
eng = create_async_engine(postgres_url, pool_pre_ping=True)
return async_sessionmaker(eng, expire_on_commit=False)
return _factory
@pytest.fixture
def app(postgres_url: str) -> Iterator[TestClient]:
"""Spin up the FastAPI app wired to the test DB + stub probes."""
app_obj = create_app(spawn_worker=False)
app_obj.dependency_overrides[get_session_factory_dep] = _factory_for_url(
postgres_url
)
app_obj.dependency_overrides[get_probes] = lambda: Probes(
ollama=lambda: "ok",
ocr=lambda: "ok",
)
with TestClient(app_obj) as client:
yield client
def _valid_request_body(client_id: str = "mammon", request_id: str = "r-1") -> dict:
return {
"use_case": "bank_statement_header",
"ix_client_id": client_id,
"request_id": request_id,
"context": {"texts": ["hello world"]},
}
def test_post_jobs_creates_pending(app: TestClient) -> None:
resp = app.post("/jobs", json=_valid_request_body())
assert resp.status_code == 201, resp.text
body = resp.json()
assert body["status"] == "pending"
assert len(body["ix_id"]) == 16
assert body["job_id"]
def test_post_jobs_idempotent_returns_200(app: TestClient) -> None:
first = app.post("/jobs", json=_valid_request_body("m", "dup"))
assert first.status_code == 201
first_body = first.json()
second = app.post("/jobs", json=_valid_request_body("m", "dup"))
assert second.status_code == 200
second_body = second.json()
assert second_body["job_id"] == first_body["job_id"]
assert second_body["ix_id"] == first_body["ix_id"]
def test_get_job_by_id(app: TestClient) -> None:
created = app.post("/jobs", json=_valid_request_body()).json()
resp = app.get(f"/jobs/{created['job_id']}")
assert resp.status_code == 200
body = resp.json()
assert body["job_id"] == created["job_id"]
assert body["request"]["use_case"] == "bank_statement_header"
assert body["status"] == "pending"
def test_get_job_404(app: TestClient) -> None:
resp = app.get(f"/jobs/{uuid4()}")
assert resp.status_code == 404
def test_get_by_correlation_query(app: TestClient) -> None:
created = app.post("/jobs", json=_valid_request_body("mammon", "corr-1")).json()
resp = app.get("/jobs", params={"client_id": "mammon", "request_id": "corr-1"})
assert resp.status_code == 200
assert resp.json()["job_id"] == created["job_id"]
missing = app.get("/jobs", params={"client_id": "mammon", "request_id": "nope"})
assert missing.status_code == 404
def test_healthz_all_ok(app: TestClient) -> None:
resp = app.get("/healthz")
assert resp.status_code == 200, resp.text
body = resp.json()
assert body["postgres"] == "ok"
assert body["ollama"] == "ok"
assert body["ocr"] == "ok"
def test_healthz_503_on_postgres_fail(postgres_url: str) -> None:
"""Broken postgres probe → 503. Ollama/OCR still surface in the body."""
app_obj = create_app(spawn_worker=False)
def _bad_factory(): # type: ignore[no-untyped-def]
def _raise(): # type: ignore[no-untyped-def]
raise RuntimeError("db down")
return _raise
app_obj.dependency_overrides[get_session_factory_dep] = _bad_factory
app_obj.dependency_overrides[get_probes] = lambda: Probes(
ollama=lambda: "ok", ocr=lambda: "ok"
)
with TestClient(app_obj) as client:
resp = client.get("/healthz")
assert resp.status_code == 503
body = resp.json()
assert body["postgres"] == "fail"
def test_healthz_degraded_ollama_is_503(postgres_url: str) -> None:
"""Per spec §5: degraded flips HTTP to 503 (only all-ok yields 200)."""
app_obj = create_app(spawn_worker=False)
app_obj.dependency_overrides[get_session_factory_dep] = _factory_for_url(
postgres_url
)
app_obj.dependency_overrides[get_probes] = lambda: Probes(
ollama=lambda: "degraded", ocr=lambda: "ok"
)
with TestClient(app_obj) as client:
resp = client.get("/healthz")
assert resp.status_code == 503
assert resp.json()["ollama"] == "degraded"
def test_metrics_shape(app: TestClient) -> None:
# Submit a couple of pending jobs to populate counters.
app.post("/jobs", json=_valid_request_body("mm", "a"))
app.post("/jobs", json=_valid_request_body("mm", "b"))
resp = app.get("/metrics")
assert resp.status_code == 200
body = resp.json()
for key in (
"jobs_pending",
"jobs_running",
"jobs_done_24h",
"jobs_error_24h",
"by_use_case_seconds",
):
assert key in body
assert body["jobs_pending"] == 2
assert body["jobs_running"] == 0
assert isinstance(body["by_use_case_seconds"], dict)

View file

@ -0,0 +1,325 @@
"""Integration tests for the worker loop (Task 3.5).
We spin up a real worker with a fake pipeline factory and verify the lifecycle
transitions against a live DB. Callback delivery is exercised via
``pytest-httpx`` callers' webhook endpoints are mocked, not run.
"""
from __future__ import annotations
import asyncio
from datetime import UTC, datetime, timedelta
from typing import TYPE_CHECKING
import pytest
from pytest_httpx import HTTPXMock
from ix.contracts.request import Context, RequestIX
from ix.contracts.response import ResponseIX
from ix.pipeline.pipeline import Pipeline
from ix.pipeline.step import Step
from ix.store import jobs_repo
from ix.worker.loop import Worker
if TYPE_CHECKING:
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
class _PassingStep(Step):
"""Minimal fake step that writes a sentinel field on the response."""
step_name = "fake_pass"
async def validate(self, request_ix, response_ix): # type: ignore[no-untyped-def]
return True
async def process(self, request_ix, response_ix): # type: ignore[no-untyped-def]
response_ix.use_case = request_ix.use_case
response_ix.ix_client_id = request_ix.ix_client_id
response_ix.request_id = request_ix.request_id
response_ix.ix_id = request_ix.ix_id
return response_ix
class _RaisingStep(Step):
"""Fake step that raises a non-IX exception to exercise the worker's
belt-and-braces error path."""
step_name = "fake_raise"
async def validate(self, request_ix, response_ix): # type: ignore[no-untyped-def]
return True
async def process(self, request_ix, response_ix): # type: ignore[no-untyped-def]
raise RuntimeError("boom")
def _ok_factory() -> Pipeline:
return Pipeline(steps=[_PassingStep()])
def _bad_factory() -> Pipeline:
return Pipeline(steps=[_RaisingStep()])
async def _insert_pending(session_factory, **kwargs): # type: ignore[no-untyped-def]
request = RequestIX(
use_case="bank_statement_header",
ix_client_id=kwargs.get("client", "test"),
request_id=kwargs.get("rid", "r-1"),
context=Context(texts=["hi"]),
)
async with session_factory() as session:
job = await jobs_repo.insert_pending(
session, request, callback_url=kwargs.get("cb")
)
await session.commit()
return job
async def test_worker_runs_one_job_to_done(
session_factory: async_sessionmaker[AsyncSession],
) -> None:
job = await _insert_pending(session_factory)
worker = Worker(
session_factory=session_factory,
pipeline_factory=_ok_factory,
poll_interval_seconds=0.1,
max_running_seconds=3600,
)
stop = asyncio.Event()
async def _monitor() -> None:
"""Wait until the job lands in a terminal state, then stop the worker."""
for _ in range(50): # 5 seconds budget
await asyncio.sleep(0.1)
async with session_factory() as session:
current = await jobs_repo.get(session, job.job_id)
if current is not None and current.status in ("done", "error"):
stop.set()
return
stop.set() # timeout — let the worker exit so assertions run
await asyncio.gather(worker.run(stop), _monitor())
async with session_factory() as session:
final = await jobs_repo.get(session, job.job_id)
assert final is not None
assert final.status == "done"
assert final.finished_at is not None
async def test_worker_pipeline_exception_marks_error(
session_factory: async_sessionmaker[AsyncSession],
) -> None:
"""A step raising a non-IX exception → status=error, response carries the
code. The worker catches what the pipeline doesn't."""
job = await _insert_pending(session_factory)
worker = Worker(
session_factory=session_factory,
pipeline_factory=_bad_factory,
poll_interval_seconds=0.1,
max_running_seconds=3600,
)
stop = asyncio.Event()
async def _monitor() -> None:
for _ in range(50):
await asyncio.sleep(0.1)
async with session_factory() as session:
current = await jobs_repo.get(session, job.job_id)
if current is not None and current.status == "error":
stop.set()
return
stop.set()
await asyncio.gather(worker.run(stop), _monitor())
async with session_factory() as session:
final = await jobs_repo.get(session, job.job_id)
assert final is not None
assert final.status == "error"
assert final.response is not None
assert (final.response.error or "").startswith("IX_002_000")
async def test_worker_delivers_callback(
httpx_mock: HTTPXMock,
session_factory: async_sessionmaker[AsyncSession],
) -> None:
"""callback_url on a done job → one POST, callback_status=delivered."""
httpx_mock.add_response(url="http://caller/webhook", status_code=200)
job = await _insert_pending(session_factory, cb="http://caller/webhook")
worker = Worker(
session_factory=session_factory,
pipeline_factory=_ok_factory,
poll_interval_seconds=0.1,
max_running_seconds=3600,
callback_timeout_seconds=5,
)
stop = asyncio.Event()
async def _monitor() -> None:
for _ in range(80):
await asyncio.sleep(0.1)
async with session_factory() as session:
current = await jobs_repo.get(session, job.job_id)
if (
current is not None
and current.status == "done"
and current.callback_status is not None
):
stop.set()
return
stop.set()
await asyncio.gather(worker.run(stop), _monitor())
async with session_factory() as session:
final = await jobs_repo.get(session, job.job_id)
assert final is not None
assert final.callback_status == "delivered"
async def test_worker_marks_callback_failed_on_5xx(
httpx_mock: HTTPXMock,
session_factory: async_sessionmaker[AsyncSession],
) -> None:
httpx_mock.add_response(url="http://caller/bad", status_code=500)
job = await _insert_pending(session_factory, cb="http://caller/bad")
worker = Worker(
session_factory=session_factory,
pipeline_factory=_ok_factory,
poll_interval_seconds=0.1,
max_running_seconds=3600,
callback_timeout_seconds=5,
)
stop = asyncio.Event()
async def _monitor() -> None:
for _ in range(80):
await asyncio.sleep(0.1)
async with session_factory() as session:
current = await jobs_repo.get(session, job.job_id)
if (
current is not None
and current.status == "done"
and current.callback_status is not None
):
stop.set()
return
stop.set()
await asyncio.gather(worker.run(stop), _monitor())
async with session_factory() as session:
final = await jobs_repo.get(session, job.job_id)
assert final is not None
assert final.status == "done" # terminal state stays done
assert final.callback_status == "failed"
async def test_worker_sweeps_orphans_at_startup(
session_factory: async_sessionmaker[AsyncSession],
) -> None:
"""Stale running rows → pending before the loop starts picking work."""
# Insert a job and backdate it to mimic a crashed worker mid-run.
job = await _insert_pending(session_factory, rid="orphan")
async with session_factory() as session:
from sqlalchemy import text
stale = datetime.now(UTC) - timedelta(hours=2)
await session.execute(
text(
"UPDATE ix_jobs SET status='running', started_at=:t "
"WHERE job_id=:jid"
),
{"t": stale, "jid": job.job_id},
)
await session.commit()
worker = Worker(
session_factory=session_factory,
pipeline_factory=_ok_factory,
poll_interval_seconds=0.1,
# max_running_seconds=60 so our 2-hour-old row gets swept.
max_running_seconds=60,
)
stop = asyncio.Event()
async def _monitor() -> None:
for _ in range(80):
await asyncio.sleep(0.1)
async with session_factory() as session:
current = await jobs_repo.get(session, job.job_id)
if current is not None and current.status == "done":
stop.set()
return
stop.set()
await asyncio.gather(worker.run(stop), _monitor())
async with session_factory() as session:
final = await jobs_repo.get(session, job.job_id)
assert final is not None
assert final.status == "done"
# attempts starts at 0, gets +1 on sweep.
assert final.attempts >= 1
@pytest.mark.parametrize("non_matching_url", ["http://x/y", None])
async def test_worker_no_callback_leaves_callback_status_none(
session_factory: async_sessionmaker[AsyncSession],
httpx_mock: HTTPXMock,
non_matching_url: str | None,
) -> None:
"""Jobs without a callback_url should never get a callback_status set."""
if non_matching_url is not None:
# If we ever accidentally deliver, pytest-httpx will complain because
# no mock matches — which is the signal we want.
pass
job = await _insert_pending(session_factory) # cb=None by default
worker = Worker(
session_factory=session_factory,
pipeline_factory=_ok_factory,
poll_interval_seconds=0.1,
max_running_seconds=3600,
)
stop = asyncio.Event()
async def _monitor() -> None:
for _ in range(50):
await asyncio.sleep(0.1)
async with session_factory() as session:
current = await jobs_repo.get(session, job.job_id)
if current is not None and current.status == "done":
stop.set()
return
stop.set()
await asyncio.gather(worker.run(stop), _monitor())
async with session_factory() as session:
final = await jobs_repo.get(session, job.job_id)
assert final is not None
assert final.callback_status is None
def _unused() -> None:
"""Silence a ruff F401 for ResponseIX — kept for symmetry w/ other tests."""
_ = ResponseIX

0
tests/live/__init__.py Normal file
View file

View file

@ -0,0 +1,70 @@
"""Live tests for :class:`OllamaClient` — gated on ``IX_TEST_OLLAMA=1``.
Never runs in CI (Forgejo runner has no LAN access to Ollama). Run locally::
IX_TEST_OLLAMA=1 uv run pytest tests/live/test_ollama_client_live.py -v
Assumes the Ollama server at ``http://192.168.68.42:11434`` already has
``qwen3:14b`` pulled.
"""
from __future__ import annotations
import os
import pytest
from ix.genai.ollama_client import OllamaClient
from ix.use_cases.bank_statement_header import BankStatementHeader
pytestmark = [
pytest.mark.live,
pytest.mark.skipif(
os.environ.get("IX_TEST_OLLAMA") != "1",
reason="live: IX_TEST_OLLAMA=1 required",
),
]
_OLLAMA_URL = "http://192.168.68.42:11434"
_MODEL = "qwen3:14b"
async def test_structured_output_round_trip() -> None:
"""Real Ollama returns a parsed BankStatementHeader instance."""
client = OllamaClient(base_url=_OLLAMA_URL, per_call_timeout_s=300.0)
result = await client.invoke(
request_kwargs={
"model": _MODEL,
"messages": [
{
"role": "system",
"content": (
"You extract bank statement header fields. "
"Return valid JSON matching the given schema. "
"Do not invent values."
),
},
{
"role": "user",
"content": (
"Bank: Deutsche Kreditbank (DKB)\n"
"Currency: EUR\n"
"IBAN: DE89370400440532013000\n"
"Period: 2025-01-01 to 2025-01-31"
),
},
],
},
response_schema=BankStatementHeader,
)
assert isinstance(result.parsed, BankStatementHeader)
assert isinstance(result.parsed.bank_name, str)
assert result.parsed.bank_name # non-empty
assert isinstance(result.parsed.currency, str)
assert result.model_name # server echoes a model name
async def test_selfcheck_ok_against_real_server() -> None:
"""``selfcheck`` returns ``ok`` when the target model is pulled."""
client = OllamaClient(base_url=_OLLAMA_URL, per_call_timeout_s=5.0)
assert await client.selfcheck(expected_model=_MODEL) == "ok"

View file

@ -0,0 +1,83 @@
"""Live test for :class:`SuryaOCRClient` — gated on ``IX_TEST_OLLAMA=1``.
Downloads real Surya models (hundreds of MB) on first run. Never runs in
CI. Exercised locally with::
IX_TEST_OLLAMA=1 uv run pytest tests/live/test_surya_client_live.py -v
Note: requires the ``[ocr]`` extra ``uv sync --extra ocr --extra dev``.
"""
from __future__ import annotations
import os
from pathlib import Path
import pytest
from ix.contracts import Page
from ix.segmentation import PageMetadata
pytestmark = [
pytest.mark.live,
pytest.mark.skipif(
os.environ.get("IX_TEST_OLLAMA") != "1",
reason="live: IX_TEST_OLLAMA=1 required",
),
]
async def test_extracts_dkb_and_iban_from_synthetic_giro() -> None:
"""Real Surya run against ``tests/fixtures/synthetic_giro.pdf``.
Assert the flat text contains ``"DKB"`` and the IBAN without spaces.
"""
from ix.ocr.surya_client import SuryaOCRClient
fixture = Path(__file__).parent.parent / "fixtures" / "synthetic_giro.pdf"
assert fixture.exists(), f"missing fixture: {fixture}"
# Build Pages the way DocumentIngestor would for this PDF: count pages
# via PyMuPDF so we pass the right number of inputs.
import fitz
doc = fitz.open(str(fixture))
try:
pages = [
Page(
page_no=i + 1,
width=float(p.rect.width),
height=float(p.rect.height),
lines=[],
)
for i, p in enumerate(doc)
]
finally:
doc.close()
client = SuryaOCRClient()
result = await client.ocr(
pages,
files=[(fixture, "application/pdf")],
page_metadata=[PageMetadata(file_index=0) for _ in pages],
)
flat_text = result.result.text or ""
# Join page-level line texts if flat text missing (shape-safety).
if not flat_text:
flat_text = "\n".join(
line.text or ""
for page in result.result.pages
for line in page.lines
)
assert "DKB" in flat_text
assert "DE89370400440532013000" in flat_text.replace(" ", "")
async def test_selfcheck_ok_against_real_predictors() -> None:
"""``selfcheck()`` returns ``ok`` once Surya's predictors load."""
from ix.ocr.surya_client import SuryaOCRClient
client = SuryaOCRClient()
assert await client.selfcheck() == "ok"

View file

@ -0,0 +1,111 @@
"""Hermetic smoke test for the Alembic migration module.
Does NOT run ``alembic upgrade head`` that requires a real database and is
exercised by the integration suite. This test only verifies the migration
module's structural integrity:
* the initial migration can be imported without side effects,
* its revision / down_revision pair is well-formed,
* ``upgrade()`` and ``downgrade()`` are callable,
* the SQL emitted by ``upgrade()`` mentions every column the spec requires.
We capture emitted SQL via ``alembic.op`` in offline mode so we don't need a
live connection. The point is that callers can look at this one test and know
the migration won't silently drift from spec §4 at import time.
"""
from __future__ import annotations
import importlib.util
from pathlib import Path
ALEMBIC_DIR = Path(__file__).resolve().parents[2] / "alembic"
INITIAL_PATH = ALEMBIC_DIR / "versions" / "001_initial_ix_jobs.py"
def _load_migration_module(path: Path):
spec = importlib.util.spec_from_file_location(f"_test_migration_{path.stem}", path)
assert spec is not None and spec.loader is not None
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module
def test_initial_migration_file_exists() -> None:
assert INITIAL_PATH.exists(), f"missing migration: {INITIAL_PATH}"
def test_initial_migration_revision_ids() -> None:
module = _load_migration_module(INITIAL_PATH)
# Revision must be a non-empty string; down_revision must be None for the
# initial migration (no parent).
assert isinstance(module.revision, str) and module.revision
assert module.down_revision is None
def test_initial_migration_has_upgrade_and_downgrade() -> None:
module = _load_migration_module(INITIAL_PATH)
assert callable(module.upgrade)
assert callable(module.downgrade)
def test_initial_migration_source_mentions_required_columns() -> None:
"""Spec §4 columns must all appear in the migration source.
We grep the source file rather than running the migration because running
it needs Postgres. This is belt-and-braces: if someone renames a column
they'll see this test fail and go update both sides in lockstep.
"""
source = INITIAL_PATH.read_text(encoding="utf-8")
for column in (
"job_id",
"ix_id",
"client_id",
"request_id",
"status",
"request",
"response",
"callback_url",
"callback_status",
"attempts",
"created_at",
"started_at",
"finished_at",
):
assert column in source, f"migration missing column {column!r}"
def test_initial_migration_source_mentions_indexes_and_constraint() -> None:
source = INITIAL_PATH.read_text(encoding="utf-8")
# Unique correlation index on (client_id, request_id).
assert "ix_jobs_client_request" in source
# Partial index on pending rows for the claim query.
assert "ix_jobs_status_created" in source
# CHECK constraint on status values.
assert "pending" in source and "running" in source
assert "done" in source and "error" in source
def test_models_module_declares_ix_job() -> None:
"""The ORM model mirrors the migration; both must stay in sync."""
from ix.store.models import Base, IxJob
assert IxJob.__tablename__ == "ix_jobs"
# Registered in the shared Base.metadata so alembic autogenerate could
# in principle see it — we don't rely on autogenerate, but having the
# model in the shared metadata is what lets integration tests do
# ``Base.metadata.create_all`` as a fast path when Alembic isn't desired.
assert "ix_jobs" in Base.metadata.tables
def test_engine_module_exposes_factory() -> None:
from ix.store.engine import get_engine, reset_engine
# The engine factory is lazy and idempotent. We don't actually call
# ``get_engine()`` here — that would require IX_POSTGRES_URL and a real
# DB. Just confirm the symbols exist and ``reset_engine`` is safe to call
# on a cold cache.
assert callable(get_engine)
reset_engine() # no-op when nothing is cached

View file

@ -0,0 +1,104 @@
"""Tests for ``ix.app`` lifespan / probe wiring (Task 4.3).
The lifespan selects fake clients when ``IX_TEST_MODE=fake`` and exposes
their probes via the route DI hook. These tests exercise the probe
adapter in isolation no DB, no real Ollama/Surya.
"""
from __future__ import annotations
from typing import Literal
from ix.app import _make_ocr_probe, _make_ollama_probe, build_pipeline
from ix.config import AppConfig
from ix.genai.fake import FakeGenAIClient
from ix.ocr.fake import FakeOCRClient
from ix.pipeline.genai_step import GenAIStep
from ix.pipeline.ocr_step import OCRStep
from ix.pipeline.pipeline import Pipeline
from ix.pipeline.reliability_step import ReliabilityStep
from ix.pipeline.response_handler_step import ResponseHandlerStep
from ix.pipeline.setup_step import SetupStep
def _cfg(**overrides: object) -> AppConfig:
return AppConfig(_env_file=None, **overrides) # type: ignore[call-arg]
class _SelfcheckOllamaClient:
async def invoke(self, *a: object, **kw: object) -> object:
raise NotImplementedError
async def selfcheck(
self, expected_model: str
) -> Literal["ok", "degraded", "fail"]:
self.called_with = expected_model
return "ok"
class _SelfcheckOCRClient:
async def ocr(self, *a: object, **kw: object) -> object:
raise NotImplementedError
async def selfcheck(self) -> Literal["ok", "fail"]:
return "ok"
class _BrokenSelfcheckOllama:
async def invoke(self, *a: object, **kw: object) -> object:
raise NotImplementedError
async def selfcheck(
self, expected_model: str
) -> Literal["ok", "degraded", "fail"]:
raise RuntimeError("boom")
class TestOllamaProbe:
def test_fake_client_without_selfcheck_reports_ok(self) -> None:
cfg = _cfg(test_mode="fake", default_model="gpt-oss:20b")
probe = _make_ollama_probe(FakeGenAIClient(parsed=None), cfg)
assert probe() == "ok"
def test_real_selfcheck_returns_its_verdict(self) -> None:
cfg = _cfg(default_model="gpt-oss:20b")
client = _SelfcheckOllamaClient()
probe = _make_ollama_probe(client, cfg) # type: ignore[arg-type]
assert probe() == "ok"
assert client.called_with == "gpt-oss:20b"
def test_selfcheck_exception_falls_back_to_fail(self) -> None:
cfg = _cfg(default_model="gpt-oss:20b")
probe = _make_ollama_probe(_BrokenSelfcheckOllama(), cfg) # type: ignore[arg-type]
assert probe() == "fail"
class TestOCRProbe:
def test_fake_client_without_selfcheck_reports_ok(self) -> None:
from ix.contracts.response import OCRDetails, OCRResult
probe = _make_ocr_probe(FakeOCRClient(canned=OCRResult(result=OCRDetails())))
assert probe() == "ok"
def test_real_selfcheck_returns_its_verdict(self) -> None:
probe = _make_ocr_probe(_SelfcheckOCRClient()) # type: ignore[arg-type]
assert probe() == "ok"
class TestBuildPipeline:
def test_assembles_all_five_steps_in_order(self) -> None:
from ix.contracts.response import OCRDetails, OCRResult
genai = FakeGenAIClient(parsed=None)
ocr = FakeOCRClient(canned=OCRResult(result=OCRDetails()))
cfg = _cfg(test_mode="fake")
pipeline = build_pipeline(genai, ocr, cfg)
assert isinstance(pipeline, Pipeline)
steps = pipeline._steps # type: ignore[attr-defined]
assert [type(s) for s in steps] == [
SetupStep,
OCRStep,
GenAIStep,
ReliabilityStep,
ResponseHandlerStep,
]

131
tests/unit/test_config.py Normal file
View file

@ -0,0 +1,131 @@
"""Tests for :mod:`ix.config` — the pydantic-settings ``AppConfig``.
Guardrails we care about:
1. Every env var in spec §9 round-trips with the right type.
2. Defaults match the spec exactly when no env is set.
3. Unknown IX_ vars are ignored (``extra="ignore"``) so a typo doesn't crash
the container at startup.
4. ``get_config()`` is cached same instance per process and
``get_config.cache_clear()`` rebuilds from the current environment (used by
every test here to keep them independent of process state).
"""
from __future__ import annotations
import pytest
from ix.config import AppConfig, get_config
@pytest.fixture(autouse=True)
def _reset_config_cache() -> None:
"""Flush the LRU cache around every test.
Without this, tests that set env vars would see stale data from earlier
runs because ``get_config()`` caches the first materialised instance.
"""
get_config.cache_clear()
def _clear_ix_env(monkeypatch: pytest.MonkeyPatch) -> None:
"""Scrub every IX_* var so defaults surface predictably.
Tests that exercise env-based overrides still call ``monkeypatch.setenv``
after this to dial in specific values; tests for defaults rely on this
scrubbing so a developer's local ``.env`` can't contaminate the assertion.
"""
import os
for key in list(os.environ):
if key.startswith("IX_"):
monkeypatch.delenv(key, raising=False)
def test_defaults_match_spec(monkeypatch: pytest.MonkeyPatch) -> None:
_clear_ix_env(monkeypatch)
# Don't let pydantic-settings pick up the repo's .env.example.
cfg = AppConfig(_env_file=None) # type: ignore[call-arg]
assert cfg.postgres_url == (
"postgresql+asyncpg://infoxtractor:<password>"
"@127.0.0.1:5431/infoxtractor"
)
assert cfg.ollama_url == "http://127.0.0.1:11434"
assert cfg.default_model == "qwen3:14b"
assert cfg.ocr_engine == "surya"
assert cfg.tmp_dir == "/tmp/ix"
assert cfg.pipeline_worker_concurrency == 1
assert cfg.pipeline_request_timeout_seconds == 2700
assert cfg.genai_call_timeout_seconds == 1500
assert cfg.file_max_bytes == 52428800
assert cfg.file_connect_timeout_seconds == 10
assert cfg.file_read_timeout_seconds == 30
assert cfg.render_max_pixels_per_page == 75000000
assert cfg.log_level == "INFO"
assert cfg.callback_timeout_seconds == 10
def test_env_overrides(monkeypatch: pytest.MonkeyPatch) -> None:
_clear_ix_env(monkeypatch)
monkeypatch.setenv("IX_POSTGRES_URL", "postgresql+asyncpg://u:p@db:5432/x")
monkeypatch.setenv("IX_OLLAMA_URL", "http://llm:11434")
monkeypatch.setenv("IX_DEFAULT_MODEL", "llama3:8b")
monkeypatch.setenv("IX_PIPELINE_WORKER_CONCURRENCY", "4")
monkeypatch.setenv("IX_GENAI_CALL_TIMEOUT_SECONDS", "60")
monkeypatch.setenv("IX_LOG_LEVEL", "DEBUG")
monkeypatch.setenv("IX_CALLBACK_TIMEOUT_SECONDS", "30")
cfg = AppConfig(_env_file=None) # type: ignore[call-arg]
assert cfg.postgres_url == "postgresql+asyncpg://u:p@db:5432/x"
assert cfg.ollama_url == "http://llm:11434"
assert cfg.default_model == "llama3:8b"
assert cfg.pipeline_worker_concurrency == 4
assert cfg.genai_call_timeout_seconds == 60
assert cfg.log_level == "DEBUG"
assert cfg.callback_timeout_seconds == 30
def test_get_config_is_cached(monkeypatch: pytest.MonkeyPatch) -> None:
_clear_ix_env(monkeypatch)
monkeypatch.setenv("IX_POSTGRES_URL", "postgresql+asyncpg://a:b@c:5432/d1")
first = get_config()
# Later mutation must NOT be seen until cache_clear — this is a feature,
# not a bug: config is process-level state, not per-call.
monkeypatch.setenv("IX_POSTGRES_URL", "postgresql+asyncpg://a:b@c:5432/d2")
second = get_config()
assert first is second
assert second.postgres_url.endswith("/d1")
get_config.cache_clear()
third = get_config()
assert third is not first
assert third.postgres_url.endswith("/d2")
def test_extra_env_keys_are_ignored(monkeypatch: pytest.MonkeyPatch) -> None:
"""A typo'd IX_FOOBAR should not raise ValidationError at startup."""
_clear_ix_env(monkeypatch)
monkeypatch.setenv("IX_FOOBAR", "whatever")
# Should not raise.
cfg = AppConfig(_env_file=None) # type: ignore[call-arg]
assert cfg.ollama_url.startswith("http://")
def test_engine_uses_config_url(monkeypatch: pytest.MonkeyPatch) -> None:
"""``ix.store.engine`` reads the URL through ``AppConfig``.
Task 3.2 refactors engine.py to go through ``get_config()`` instead of
reading ``os.environ`` directly. We can't actually construct an async
engine in a unit test (would need the DB), so we verify the resolution
function exists and returns the configured URL.
"""
_clear_ix_env(monkeypatch)
monkeypatch.setenv("IX_POSTGRES_URL", "postgresql+asyncpg://a:b@c:5432/d")
from ix.store.engine import _resolve_url
assert _resolve_url() == "postgresql+asyncpg://a:b@c:5432/d"

View file

@ -0,0 +1,60 @@
"""Tests for the GenAI + OCR factories (Task 4.3).
The factories pick between fake and real clients based on
``IX_TEST_MODE``. CI runs with ``IX_TEST_MODE=fake``, production runs
without so the selection knob is the one lever between hermetic CI and
real clients.
"""
from __future__ import annotations
from ix.config import AppConfig
from ix.genai import make_genai_client
from ix.genai.fake import FakeGenAIClient
from ix.genai.ollama_client import OllamaClient
from ix.ocr import make_ocr_client
from ix.ocr.fake import FakeOCRClient
from ix.ocr.surya_client import SuryaOCRClient
def _cfg(**overrides: object) -> AppConfig:
"""Build an AppConfig without loading the repo's .env.example."""
return AppConfig(_env_file=None, **overrides) # type: ignore[call-arg]
class TestGenAIFactory:
def test_fake_mode_returns_fake(self) -> None:
cfg = _cfg(test_mode="fake")
client = make_genai_client(cfg)
assert isinstance(client, FakeGenAIClient)
def test_production_returns_ollama_with_configured_url(self) -> None:
cfg = _cfg(
test_mode=None,
ollama_url="http://ollama.host:11434",
genai_call_timeout_seconds=42,
)
client = make_genai_client(cfg)
assert isinstance(client, OllamaClient)
# Inspect the private attrs for binding correctness.
assert client._base_url == "http://ollama.host:11434"
assert client._per_call_timeout_s == 42
class TestOCRFactory:
def test_fake_mode_returns_fake(self) -> None:
cfg = _cfg(test_mode="fake")
client = make_ocr_client(cfg)
assert isinstance(client, FakeOCRClient)
def test_production_surya_returns_surya(self) -> None:
cfg = _cfg(test_mode=None, ocr_engine="surya")
client = make_ocr_client(cfg)
assert isinstance(client, SuryaOCRClient)
def test_unknown_engine_raises(self) -> None:
cfg = _cfg(test_mode=None, ocr_engine="tesseract")
import pytest
with pytest.raises(ValueError, match="ocr_engine"):
make_ocr_client(cfg)

View file

@ -0,0 +1,378 @@
"""Tests for :class:`ix.pipeline.genai_step.GenAIStep` (spec §6.3, §7, §9.2)."""
from __future__ import annotations
from typing import Any
import httpx
import pytest
from pydantic import BaseModel, ValidationError
from ix.contracts import (
Context,
GenAIOptions,
Line,
OCRDetails,
OCROptions,
OCRResult,
Options,
Page,
ProvenanceData,
ProvenanceOptions,
RequestIX,
ResponseIX,
SegmentCitation,
)
from ix.contracts.response import _InternalContext
from ix.errors import IXErrorCode, IXException
from ix.genai import FakeGenAIClient, GenAIInvocationResult, GenAIUsage
from ix.pipeline.genai_step import GenAIStep
from ix.segmentation import PageMetadata, SegmentIndex
from ix.use_cases.bank_statement_header import BankStatementHeader
from ix.use_cases.bank_statement_header import Request as BankReq
def _make_request(
*,
use_ocr: bool = True,
ocr_only: bool = False,
include_provenance: bool = True,
model_name: str | None = None,
) -> RequestIX:
return RequestIX(
use_case="bank_statement_header",
ix_client_id="test",
request_id="r-1",
context=Context(files=[], texts=[]),
options=Options(
ocr=OCROptions(use_ocr=use_ocr, ocr_only=ocr_only),
gen_ai=GenAIOptions(gen_ai_model_name=model_name),
provenance=ProvenanceOptions(
include_provenance=include_provenance,
max_sources_per_field=5,
),
),
)
def _ocr_with_lines(lines: list[str]) -> OCRResult:
return OCRResult(
result=OCRDetails(
text="\n".join(lines),
pages=[
Page(
page_no=1,
width=100.0,
height=200.0,
lines=[
Line(text=t, bounding_box=[0, i * 10, 10, i * 10, 10, i * 10 + 5, 0, i * 10 + 5])
for i, t in enumerate(lines)
],
)
],
)
)
def _response_with_segment_index(
lines: list[str], texts: list[str] | None = None
) -> ResponseIX:
ocr = _ocr_with_lines(lines)
resp = ResponseIX(ocr_result=ocr)
seg_idx = SegmentIndex.build(
ocr_result=ocr,
granularity="line",
pages_metadata=[PageMetadata(file_index=0)],
)
resp.context = _InternalContext(
use_case_request=BankReq(),
use_case_response=BankStatementHeader,
segment_index=seg_idx,
texts=texts or [],
pages=ocr.result.pages,
page_metadata=[PageMetadata(file_index=0)],
)
return resp
class CapturingClient:
"""Records the request_kwargs + response_schema handed to invoke()."""
def __init__(self, parsed: Any) -> None:
self._parsed = parsed
self.request_kwargs: dict[str, Any] | None = None
self.response_schema: type[BaseModel] | None = None
async def invoke(
self,
request_kwargs: dict[str, Any],
response_schema: type[BaseModel],
) -> GenAIInvocationResult:
self.request_kwargs = request_kwargs
self.response_schema = response_schema
return GenAIInvocationResult(
parsed=self._parsed,
usage=GenAIUsage(prompt_tokens=5, completion_tokens=7),
model_name="captured-model",
)
class TestValidate:
async def test_ocr_only_skips(self) -> None:
step = GenAIStep(
genai_client=FakeGenAIClient(parsed=BankStatementHeader(bank_name="x", currency="EUR"))
)
req = _make_request(ocr_only=True)
resp = _response_with_segment_index(lines=["hello"])
assert await step.validate(req, resp) is False
async def test_empty_context_raises_IX_001_000(self) -> None:
step = GenAIStep(
genai_client=FakeGenAIClient(parsed=BankStatementHeader(bank_name="x", currency="EUR"))
)
req = _make_request()
resp = ResponseIX(ocr_result=OCRResult(result=OCRDetails(text="")))
resp.context = _InternalContext(
use_case_request=BankReq(),
use_case_response=BankStatementHeader,
texts=[],
)
with pytest.raises(IXException) as ei:
await step.validate(req, resp)
assert ei.value.code is IXErrorCode.IX_001_000
async def test_runs_when_texts_only(self) -> None:
step = GenAIStep(
genai_client=FakeGenAIClient(parsed=BankStatementHeader(bank_name="x", currency="EUR"))
)
req = _make_request()
resp = ResponseIX(ocr_result=OCRResult(result=OCRDetails(text="")))
resp.context = _InternalContext(
use_case_request=BankReq(),
use_case_response=BankStatementHeader,
texts=["some paperless text"],
)
assert await step.validate(req, resp) is True
async def test_runs_when_ocr_text_present(self) -> None:
step = GenAIStep(
genai_client=FakeGenAIClient(parsed=BankStatementHeader(bank_name="x", currency="EUR"))
)
req = _make_request()
resp = _response_with_segment_index(lines=["hello"])
assert await step.validate(req, resp) is True
class TestProcessBasic:
async def test_writes_ix_result_and_meta(self) -> None:
parsed = BankStatementHeader(bank_name="DKB", currency="EUR")
client = CapturingClient(parsed=parsed)
step = GenAIStep(genai_client=client)
req = _make_request(include_provenance=False)
resp = _response_with_segment_index(lines=["hello"])
resp = await step.process(req, resp)
assert resp.ix_result.result["bank_name"] == "DKB"
assert resp.ix_result.result["currency"] == "EUR"
assert resp.ix_result.meta_data["model_name"] == "captured-model"
assert resp.ix_result.meta_data["token_usage"]["prompt_tokens"] == 5
assert resp.ix_result.meta_data["token_usage"]["completion_tokens"] == 7
class TestSystemPromptAssembly:
async def test_citation_instruction_appended_when_provenance_on(self) -> None:
parsed_wrapped: Any = _WrappedResponse(
result=BankStatementHeader(bank_name="DKB", currency="EUR"),
segment_citations=[],
)
client = CapturingClient(parsed=parsed_wrapped)
step = GenAIStep(genai_client=client)
req = _make_request(include_provenance=True)
resp = _response_with_segment_index(lines=["hello"])
await step.process(req, resp)
messages = client.request_kwargs["messages"] # type: ignore[index]
system = messages[0]["content"]
# Use-case system prompt is always there.
assert "extract header metadata" in system
# Citation instruction added.
assert "segment_citations" in system
assert "value_segment_ids" in system
async def test_citation_instruction_absent_when_provenance_off(self) -> None:
parsed = BankStatementHeader(bank_name="DKB", currency="EUR")
client = CapturingClient(parsed=parsed)
step = GenAIStep(genai_client=client)
req = _make_request(include_provenance=False)
resp = _response_with_segment_index(lines=["hello"])
await step.process(req, resp)
messages = client.request_kwargs["messages"] # type: ignore[index]
system = messages[0]["content"]
assert "segment_citations" not in system
class TestUserTextFormat:
async def test_tagged_prompt_when_provenance_on(self) -> None:
parsed_wrapped: Any = _WrappedResponse(
result=BankStatementHeader(bank_name="DKB", currency="EUR"),
segment_citations=[],
)
client = CapturingClient(parsed=parsed_wrapped)
step = GenAIStep(genai_client=client)
req = _make_request(include_provenance=True)
resp = _response_with_segment_index(lines=["alpha line", "beta line"])
await step.process(req, resp)
user_content = client.request_kwargs["messages"][1]["content"] # type: ignore[index]
assert "[p1_l0] alpha line" in user_content
assert "[p1_l1] beta line" in user_content
async def test_plain_prompt_when_provenance_off(self) -> None:
parsed = BankStatementHeader(bank_name="DKB", currency="EUR")
client = CapturingClient(parsed=parsed)
step = GenAIStep(genai_client=client)
req = _make_request(include_provenance=False)
resp = _response_with_segment_index(lines=["alpha line", "beta line"])
await step.process(req, resp)
user_content = client.request_kwargs["messages"][1]["content"] # type: ignore[index]
assert "[p1_l0]" not in user_content
assert "alpha line" in user_content
assert "beta line" in user_content
class TestResponseSchemaChoice:
async def test_plain_schema_when_provenance_off(self) -> None:
parsed = BankStatementHeader(bank_name="DKB", currency="EUR")
client = CapturingClient(parsed=parsed)
step = GenAIStep(genai_client=client)
req = _make_request(include_provenance=False)
resp = _response_with_segment_index(lines=["hello"])
await step.process(req, resp)
assert client.response_schema is BankStatementHeader
async def test_wrapped_schema_when_provenance_on(self) -> None:
parsed_wrapped: Any = _WrappedResponse(
result=BankStatementHeader(bank_name="DKB", currency="EUR"),
segment_citations=[],
)
client = CapturingClient(parsed=parsed_wrapped)
step = GenAIStep(genai_client=client)
req = _make_request(include_provenance=True)
resp = _response_with_segment_index(lines=["hello"])
await step.process(req, resp)
schema = client.response_schema
assert schema is not None
field_names = set(schema.model_fields.keys())
assert field_names == {"result", "segment_citations"}
class TestProvenanceMapping:
async def test_provenance_populated_from_citations(self) -> None:
parsed_wrapped: Any = _WrappedResponse(
result=BankStatementHeader(bank_name="DKB", currency="EUR"),
segment_citations=[
SegmentCitation(
field_path="result.bank_name",
value_segment_ids=["p1_l0"],
context_segment_ids=[],
),
],
)
client = CapturingClient(parsed=parsed_wrapped)
step = GenAIStep(genai_client=client)
req = _make_request(include_provenance=True)
resp = _response_with_segment_index(lines=["DKB"])
resp = await step.process(req, resp)
assert isinstance(resp.provenance, ProvenanceData)
fields = resp.provenance.fields
assert "result.bank_name" in fields
fp = fields["result.bank_name"]
assert fp.value == "DKB"
assert len(fp.sources) == 1
assert fp.sources[0].segment_id == "p1_l0"
# Reliability flags are NOT set here — ReliabilityStep does that.
assert fp.provenance_verified is None
assert fp.text_agreement is None
class TestErrorHandling:
async def test_network_error_maps_to_IX_002_000(self) -> None:
err = httpx.ConnectError("refused")
client = FakeGenAIClient(
parsed=BankStatementHeader(bank_name="x", currency="EUR"),
raise_on_call=err,
)
step = GenAIStep(genai_client=client)
req = _make_request(include_provenance=False)
resp = _response_with_segment_index(lines=["hello"])
with pytest.raises(IXException) as ei:
await step.process(req, resp)
assert ei.value.code is IXErrorCode.IX_002_000
async def test_timeout_maps_to_IX_002_000(self) -> None:
err = httpx.ReadTimeout("slow")
client = FakeGenAIClient(
parsed=BankStatementHeader(bank_name="x", currency="EUR"),
raise_on_call=err,
)
step = GenAIStep(genai_client=client)
req = _make_request(include_provenance=False)
resp = _response_with_segment_index(lines=["hello"])
with pytest.raises(IXException) as ei:
await step.process(req, resp)
assert ei.value.code is IXErrorCode.IX_002_000
async def test_validation_error_maps_to_IX_002_001(self) -> None:
class _M(BaseModel):
x: int
try:
_M(x="not-an-int") # type: ignore[arg-type]
except ValidationError as err:
raise_err = err
client = FakeGenAIClient(
parsed=BankStatementHeader(bank_name="x", currency="EUR"),
raise_on_call=raise_err,
)
step = GenAIStep(genai_client=client)
req = _make_request(include_provenance=False)
resp = _response_with_segment_index(lines=["hello"])
with pytest.raises(IXException) as ei:
await step.process(req, resp)
assert ei.value.code is IXErrorCode.IX_002_001
class TestModelSelection:
async def test_request_model_override_wins(self) -> None:
parsed = BankStatementHeader(bank_name="DKB", currency="EUR")
client = CapturingClient(parsed=parsed)
step = GenAIStep(genai_client=client)
req = _make_request(include_provenance=False, model_name="explicit-model")
resp = _response_with_segment_index(lines=["hello"])
await step.process(req, resp)
assert client.request_kwargs["model"] == "explicit-model" # type: ignore[index]
async def test_falls_back_to_use_case_default(self) -> None:
parsed = BankStatementHeader(bank_name="DKB", currency="EUR")
client = CapturingClient(parsed=parsed)
step = GenAIStep(genai_client=client)
req = _make_request(include_provenance=False)
resp = _response_with_segment_index(lines=["hello"])
await step.process(req, resp)
# use-case default is qwen3:14b
assert client.request_kwargs["model"] == "qwen3:14b" # type: ignore[index]
# ----------------------------------------------------------------------------
# Helpers
class _WrappedResponse(BaseModel):
"""Stand-in for the runtime-created ProvenanceWrappedResponse."""
result: BankStatementHeader
segment_citations: list[SegmentCitation] = []

View file

@ -0,0 +1,270 @@
"""Tests for :class:`OllamaClient` — hermetic, pytest-httpx-driven.
Covers spec §6 GenAIStep Ollama call contract:
* POST body shape (model / messages / format / stream / options).
* Response parsing :class:`GenAIInvocationResult`.
* Error mapping: connection / timeout / 5xx ``IX_002_000``;
schema-violating body ``IX_002_001``.
* ``selfcheck()``: tags-reachable + model-listed ``ok``;
reachable-but-missing ``degraded``; unreachable ``fail``.
"""
from __future__ import annotations
import httpx
import pytest
from pydantic import BaseModel
from pytest_httpx import HTTPXMock
from ix.errors import IXErrorCode, IXException
from ix.genai.ollama_client import OllamaClient
class _Schema(BaseModel):
"""Trivial structured-output schema for the round-trip tests."""
bank_name: str
account_number: str | None = None
def _ollama_chat_ok_body(content_json: str) -> dict:
"""Build a minimal Ollama /api/chat success body."""
return {
"model": "gpt-oss:20b",
"message": {"role": "assistant", "content": content_json},
"done": True,
"eval_count": 42,
"prompt_eval_count": 17,
}
class TestInvokeHappyPath:
async def test_posts_to_chat_endpoint_with_format_and_no_stream(
self, httpx_mock: HTTPXMock
) -> None:
httpx_mock.add_response(
url="http://ollama.test:11434/api/chat",
method="POST",
json=_ollama_chat_ok_body('{"bank_name":"DKB","account_number":"DE89"}'),
)
client = OllamaClient(
base_url="http://ollama.test:11434", per_call_timeout_s=5.0
)
result = await client.invoke(
request_kwargs={
"model": "gpt-oss:20b",
"messages": [
{"role": "system", "content": "You extract."},
{"role": "user", "content": "Doc body"},
],
"temperature": 0.2,
"reasoning_effort": "high", # dropped silently
},
response_schema=_Schema,
)
assert result.parsed == _Schema(bank_name="DKB", account_number="DE89")
assert result.model_name == "gpt-oss:20b"
assert result.usage.prompt_tokens == 17
assert result.usage.completion_tokens == 42
# Verify request shape.
requests = httpx_mock.get_requests()
assert len(requests) == 1
body = requests[0].read().decode()
import json
body_json = json.loads(body)
assert body_json["model"] == "gpt-oss:20b"
assert body_json["stream"] is False
# No `format` is sent: Ollama 0.11.8 segfaults on full schemas and
# aborts to `{}` with `format=json` on reasoning models. Schema is
# injected into the system prompt instead; we extract the trailing
# JSON blob from the response and validate via Pydantic.
assert "format" not in body_json
assert body_json["options"]["temperature"] == 0.2
assert "reasoning_effort" not in body_json
# A schema-guidance system message is prepended to the caller's
# messages so Ollama (format=json loose mode) emits the right shape.
msgs = body_json["messages"]
assert msgs[0]["role"] == "system"
assert "JSON Schema" in msgs[0]["content"]
assert msgs[1:] == [
{"role": "system", "content": "You extract."},
{"role": "user", "content": "Doc body"},
]
async def test_text_parts_content_list_is_joined(
self, httpx_mock: HTTPXMock
) -> None:
httpx_mock.add_response(
url="http://ollama.test:11434/api/chat",
method="POST",
json=_ollama_chat_ok_body('{"bank_name":"X"}'),
)
client = OllamaClient(
base_url="http://ollama.test:11434", per_call_timeout_s=5.0
)
await client.invoke(
request_kwargs={
"model": "gpt-oss:20b",
"messages": [
{
"role": "user",
"content": [
{"type": "text", "text": "part-a"},
{"type": "text", "text": "part-b"},
],
}
],
},
response_schema=_Schema,
)
import json
request_body = json.loads(httpx_mock.get_requests()[0].read())
# First message is the auto-injected schema guidance; after that
# the caller's user message has its text parts joined.
assert request_body["messages"][0]["role"] == "system"
assert request_body["messages"][1:] == [
{"role": "user", "content": "part-a\npart-b"}
]
class TestInvokeErrorPaths:
async def test_connection_error_maps_to_002_000(
self, httpx_mock: HTTPXMock
) -> None:
httpx_mock.add_exception(httpx.ConnectError("refused"))
client = OllamaClient(
base_url="http://ollama.test:11434", per_call_timeout_s=1.0
)
with pytest.raises(IXException) as ei:
await client.invoke(
request_kwargs={
"model": "gpt-oss:20b",
"messages": [{"role": "user", "content": "hi"}],
},
response_schema=_Schema,
)
assert ei.value.code is IXErrorCode.IX_002_000
async def test_read_timeout_maps_to_002_000(self, httpx_mock: HTTPXMock) -> None:
httpx_mock.add_exception(httpx.ReadTimeout("slow"))
client = OllamaClient(
base_url="http://ollama.test:11434", per_call_timeout_s=0.5
)
with pytest.raises(IXException) as ei:
await client.invoke(
request_kwargs={
"model": "gpt-oss:20b",
"messages": [{"role": "user", "content": "hi"}],
},
response_schema=_Schema,
)
assert ei.value.code is IXErrorCode.IX_002_000
async def test_500_maps_to_002_000_with_body_snippet(
self, httpx_mock: HTTPXMock
) -> None:
httpx_mock.add_response(
url="http://ollama.test:11434/api/chat",
method="POST",
status_code=500,
text="boom boom server broken",
)
client = OllamaClient(
base_url="http://ollama.test:11434", per_call_timeout_s=5.0
)
with pytest.raises(IXException) as ei:
await client.invoke(
request_kwargs={
"model": "gpt-oss:20b",
"messages": [{"role": "user", "content": "hi"}],
},
response_schema=_Schema,
)
assert ei.value.code is IXErrorCode.IX_002_000
assert "boom" in (ei.value.detail or "")
async def test_200_with_invalid_json_maps_to_002_001(
self, httpx_mock: HTTPXMock
) -> None:
httpx_mock.add_response(
url="http://ollama.test:11434/api/chat",
method="POST",
json=_ollama_chat_ok_body("not-json"),
)
client = OllamaClient(
base_url="http://ollama.test:11434", per_call_timeout_s=5.0
)
with pytest.raises(IXException) as ei:
await client.invoke(
request_kwargs={
"model": "gpt-oss:20b",
"messages": [{"role": "user", "content": "hi"}],
},
response_schema=_Schema,
)
assert ei.value.code is IXErrorCode.IX_002_001
async def test_200_with_schema_violation_maps_to_002_001(
self, httpx_mock: HTTPXMock
) -> None:
# Missing required `bank_name` field.
httpx_mock.add_response(
url="http://ollama.test:11434/api/chat",
method="POST",
json=_ollama_chat_ok_body('{"account_number":"DE89"}'),
)
client = OllamaClient(
base_url="http://ollama.test:11434", per_call_timeout_s=5.0
)
with pytest.raises(IXException) as ei:
await client.invoke(
request_kwargs={
"model": "gpt-oss:20b",
"messages": [{"role": "user", "content": "hi"}],
},
response_schema=_Schema,
)
assert ei.value.code is IXErrorCode.IX_002_001
class TestSelfcheck:
async def test_selfcheck_ok_when_model_listed(
self, httpx_mock: HTTPXMock
) -> None:
httpx_mock.add_response(
url="http://ollama.test:11434/api/tags",
method="GET",
json={"models": [{"name": "gpt-oss:20b"}, {"name": "qwen2.5:32b"}]},
)
client = OllamaClient(
base_url="http://ollama.test:11434", per_call_timeout_s=5.0
)
assert await client.selfcheck(expected_model="gpt-oss:20b") == "ok"
async def test_selfcheck_degraded_when_model_missing(
self, httpx_mock: HTTPXMock
) -> None:
httpx_mock.add_response(
url="http://ollama.test:11434/api/tags",
method="GET",
json={"models": [{"name": "qwen2.5:32b"}]},
)
client = OllamaClient(
base_url="http://ollama.test:11434", per_call_timeout_s=5.0
)
assert await client.selfcheck(expected_model="gpt-oss:20b") == "degraded"
async def test_selfcheck_fail_on_connection_error(
self, httpx_mock: HTTPXMock
) -> None:
httpx_mock.add_exception(httpx.ConnectError("refused"))
client = OllamaClient(
base_url="http://ollama.test:11434", per_call_timeout_s=5.0
)
assert await client.selfcheck(expected_model="gpt-oss:20b") == "fail"

View file

@ -0,0 +1,272 @@
"""End-to-end pipeline test with the fake OCR + GenAI clients (spec sections 6-9).
Feeds the committed ``tests/fixtures/synthetic_giro.pdf`` through the
full five-step pipeline with canned OCR + canned LLM responses.
Hermetic: no Surya, no Ollama, no network.
"""
from __future__ import annotations
from datetime import date
from decimal import Decimal
from pathlib import Path
from typing import Any
import pytest
from pydantic import BaseModel
from ix.contracts import (
Context,
Line,
OCRDetails,
OCROptions,
OCRResult,
Options,
Page,
ProvenanceOptions,
RequestIX,
SegmentCitation,
)
from ix.genai import FakeGenAIClient, GenAIUsage
from ix.ocr import FakeOCRClient
from ix.pipeline import Pipeline
from ix.pipeline.genai_step import GenAIStep
from ix.pipeline.ocr_step import OCRStep
from ix.pipeline.reliability_step import ReliabilityStep
from ix.pipeline.response_handler_step import ResponseHandlerStep
from ix.pipeline.setup_step import SetupStep
from ix.use_cases.bank_statement_header import BankStatementHeader
FIXTURE_PDF = Path(__file__).resolve().parent.parent / "fixtures" / "synthetic_giro.pdf"
# Ground-truth values. Must match the strings the fixture builder drops on
# the page AND the canned OCR output below.
EXPECTED_BANK_NAME = "DKB"
EXPECTED_IBAN = "DE89370400440532013000"
EXPECTED_OPENING = Decimal("1234.56")
EXPECTED_CLOSING = Decimal("1450.22")
EXPECTED_CURRENCY = "EUR"
EXPECTED_STATEMENT_DATE = date(2026, 3, 31)
EXPECTED_PERIOD_START = date(2026, 3, 1)
EXPECTED_PERIOD_END = date(2026, 3, 31)
def _canned_ocr_result() -> OCRResult:
"""Canned Surya-shaped result for the synthetic_giro fixture.
Line texts match the strings placed by create_fixture_pdf.py. Bboxes
are plausible-but-not-exact: the fixture builder uses 72 pt left
margin and 24 pt line height on a 595x842 page, so we mirror those
coords here so normalisation gives sensible 0-1 values.
"""
width, height = 595.0, 842.0
lines_meta = [
("DKB", 60.0),
("IBAN: DE89370400440532013000", 84.0),
("Statement period: 01.03.2026 - 31.03.2026", 108.0),
("Opening balance: 1234.56 EUR", 132.0),
("Closing balance: 1450.22 EUR", 156.0),
("Statement date: 31.03.2026", 180.0),
]
lines: list[Line] = []
for text, y_top in lines_meta:
y_bot = y_top + 16.0
lines.append(
Line(
text=text,
bounding_box=[72.0, y_top, 500.0, y_top, 500.0, y_bot, 72.0, y_bot],
)
)
return OCRResult(
result=OCRDetails(
text="\n".join(t for t, _ in lines_meta),
pages=[
Page(
page_no=1,
width=width,
height=height,
lines=lines,
)
],
),
meta_data={"engine": "fake"},
)
class _WrappedResponse(BaseModel):
"""Mirrors the runtime ProvenanceWrappedResponse GenAIStep creates."""
result: BankStatementHeader
segment_citations: list[SegmentCitation] = []
def _canned_llm_output() -> _WrappedResponse:
# After OCRStep injects <page> tag lines, the real OCR line at local
# index 0 gets segment id p1_l0 (tag lines are skipped by
# SegmentIndex.build). So:
# p1_l0 -> "DKB"
# p1_l1 -> "IBAN: DE89370400440532013000"
# p1_l2 -> "Statement period: 01.03.2026 - 31.03.2026"
# p1_l3 -> "Opening balance: 1234.56 EUR"
# p1_l4 -> "Closing balance: 1450.22 EUR"
# p1_l5 -> "Statement date: 31.03.2026"
return _WrappedResponse(
result=BankStatementHeader(
bank_name=EXPECTED_BANK_NAME,
account_iban=EXPECTED_IBAN,
account_type="checking",
currency=EXPECTED_CURRENCY,
statement_date=EXPECTED_STATEMENT_DATE,
statement_period_start=EXPECTED_PERIOD_START,
statement_period_end=EXPECTED_PERIOD_END,
opening_balance=EXPECTED_OPENING,
closing_balance=EXPECTED_CLOSING,
),
segment_citations=[
SegmentCitation(
field_path="result.bank_name",
value_segment_ids=["p1_l0"],
context_segment_ids=[],
),
SegmentCitation(
field_path="result.account_iban",
value_segment_ids=["p1_l1"],
context_segment_ids=[],
),
SegmentCitation(
field_path="result.account_type",
value_segment_ids=[],
context_segment_ids=["p1_l0"],
),
SegmentCitation(
field_path="result.currency",
value_segment_ids=["p1_l3", "p1_l4"],
context_segment_ids=[],
),
SegmentCitation(
field_path="result.statement_date",
value_segment_ids=["p1_l5"],
context_segment_ids=[],
),
SegmentCitation(
field_path="result.statement_period_start",
value_segment_ids=["p1_l2"],
context_segment_ids=[],
),
SegmentCitation(
field_path="result.statement_period_end",
value_segment_ids=["p1_l2"],
context_segment_ids=[],
),
SegmentCitation(
field_path="result.opening_balance",
value_segment_ids=["p1_l3"],
context_segment_ids=[],
),
SegmentCitation(
field_path="result.closing_balance",
value_segment_ids=["p1_l4"],
context_segment_ids=[],
),
],
)
def _build_pipeline(fetch_config: Any = None) -> Pipeline:
ocr_client = FakeOCRClient(canned=_canned_ocr_result())
genai_client = FakeGenAIClient(
parsed=_canned_llm_output(),
usage=GenAIUsage(prompt_tokens=200, completion_tokens=400),
model_name="fake-gpt",
)
setup = SetupStep(fetch_config=fetch_config) if fetch_config else SetupStep()
return Pipeline(
steps=[
setup,
OCRStep(ocr_client=ocr_client),
GenAIStep(genai_client=genai_client),
ReliabilityStep(),
ResponseHandlerStep(),
]
)
class TestEndToEnd:
@pytest.fixture
def request_ix(self, tmp_path: Path) -> RequestIX:
# Canonical single-file request pointing to the committed fixture
# via file:// URL. Also includes a matching Paperless-style text
# so text_agreement has real data to compare against.
paperless_text = (
"DKB statement. IBAN: DE89370400440532013000. Period 01.03.2026 - "
"31.03.2026. Opening balance 1234.56 EUR. Closing balance 1450.22 EUR. "
"Date 31.03.2026."
)
return RequestIX(
use_case="bank_statement_header",
ix_client_id="mammon-test",
request_id="end-to-end-1",
ix_id="abcd0123ef456789",
context=Context(
files=[FIXTURE_PDF.as_uri()],
texts=[paperless_text],
),
options=Options(
ocr=OCROptions(use_ocr=True),
provenance=ProvenanceOptions(
include_provenance=True, max_sources_per_field=5
),
),
)
async def test_ix_result_populated_from_fake_llm(self, request_ix: RequestIX) -> None:
pipeline = _build_pipeline()
response = await pipeline.start(request_ix)
assert response.error is None
result = response.ix_result.result
assert result["bank_name"] == EXPECTED_BANK_NAME
assert result["account_iban"] == EXPECTED_IBAN
assert result["currency"] == EXPECTED_CURRENCY
# Pydantic v2 dumps Decimals as strings in mode="json".
assert result["closing_balance"] == str(EXPECTED_CLOSING)
async def test_provenance_verified_for_closing_balance(
self, request_ix: RequestIX
) -> None:
pipeline = _build_pipeline()
response = await pipeline.start(request_ix)
assert response.provenance is not None
fp = response.provenance.fields["result.closing_balance"]
assert fp.provenance_verified is True
async def test_text_agreement_true_when_texts_match_value(
self, request_ix: RequestIX
) -> None:
pipeline = _build_pipeline()
response = await pipeline.start(request_ix)
assert response.provenance is not None
fp = response.provenance.fields["result.closing_balance"]
assert fp.text_agreement is True
async def test_timings_per_step(self, request_ix: RequestIX) -> None:
pipeline = _build_pipeline()
response = await pipeline.start(request_ix)
# Each of the five steps executed and recorded a timing.
names = [t["step"] for t in response.metadata.timings]
assert names == [
"SetupStep",
"OCRStep",
"GenAIStep",
"ReliabilityStep",
"ResponseHandlerStep",
]
for entry in response.metadata.timings:
assert isinstance(entry["elapsed_seconds"], float)
async def test_no_error_and_context_stripped(self, request_ix: RequestIX) -> None:
pipeline = _build_pipeline()
response = await pipeline.start(request_ix)
assert response.error is None
dump = response.model_dump()
assert "context" not in dump

View file

@ -0,0 +1,250 @@
"""Tests for :class:`ix.pipeline.reliability_step.ReliabilityStep` (spec §6)."""
from __future__ import annotations
from datetime import date
from decimal import Decimal
from ix.contracts import (
BoundingBox,
Context,
ExtractionSource,
FieldProvenance,
OCROptions,
Options,
ProvenanceData,
ProvenanceOptions,
RequestIX,
ResponseIX,
)
from ix.contracts.response import _InternalContext
from ix.pipeline.reliability_step import ReliabilityStep
from ix.use_cases.bank_statement_header import BankStatementHeader
def _src(
segment_id: str,
text: str,
page: int = 1,
bbox: list[float] | None = None,
) -> ExtractionSource:
return ExtractionSource(
page_number=page,
file_index=0,
bounding_box=BoundingBox(coordinates=bbox or [0, 0, 1, 0, 1, 1, 0, 1]),
text_snippet=text,
relevance_score=1.0,
segment_id=segment_id,
)
def _make_request(
include_provenance: bool = True, texts: list[str] | None = None
) -> RequestIX:
return RequestIX(
use_case="bank_statement_header",
ix_client_id="test",
request_id="r-1",
context=Context(files=[], texts=texts or []),
options=Options(
ocr=OCROptions(),
provenance=ProvenanceOptions(include_provenance=include_provenance),
),
)
def _response_with_provenance(
fields: dict[str, FieldProvenance],
texts: list[str] | None = None,
) -> ResponseIX:
resp = ResponseIX()
resp.provenance = ProvenanceData(
fields=fields,
quality_metrics={},
segment_count=10,
granularity="line",
)
resp.context = _InternalContext(
texts=texts or [],
use_case_response=BankStatementHeader,
)
return resp
class TestValidate:
async def test_skipped_when_provenance_off(self) -> None:
step = ReliabilityStep()
req = _make_request(include_provenance=False)
resp = _response_with_provenance(fields={})
assert await step.validate(req, resp) is False
async def test_skipped_when_no_provenance_data(self) -> None:
step = ReliabilityStep()
req = _make_request(include_provenance=True)
resp = ResponseIX()
assert await step.validate(req, resp) is False
async def test_runs_when_provenance_data_present(self) -> None:
step = ReliabilityStep()
req = _make_request(include_provenance=True)
resp = _response_with_provenance(fields={})
assert await step.validate(req, resp) is True
class TestProcessFlags:
async def test_string_field_verified_and_text_agreement(self) -> None:
fp = FieldProvenance(
field_name="bank_name",
field_path="result.bank_name",
value="DKB",
sources=[_src("p1_l0", "DKB")],
)
resp = _response_with_provenance(
fields={"result.bank_name": fp},
texts=["DKB statement content"],
)
step = ReliabilityStep()
resp = await step.process(_make_request(texts=["DKB statement content"]), resp)
out = resp.provenance.fields["result.bank_name"]
assert out.provenance_verified is True
assert out.text_agreement is True
async def test_literal_field_flags_none(self) -> None:
fp = FieldProvenance(
field_name="account_type",
field_path="result.account_type",
value="checking",
sources=[_src("p1_l0", "anything")],
)
resp = _response_with_provenance(
fields={"result.account_type": fp},
texts=["some text"],
)
step = ReliabilityStep()
resp = await step.process(_make_request(texts=["some text"]), resp)
out = resp.provenance.fields["result.account_type"]
assert out.provenance_verified is None
assert out.text_agreement is None
async def test_none_value_flags_none(self) -> None:
fp = FieldProvenance(
field_name="account_iban",
field_path="result.account_iban",
value=None,
sources=[_src("p1_l0", "IBAN blah")],
)
resp = _response_with_provenance(
fields={"result.account_iban": fp},
texts=["text"],
)
step = ReliabilityStep()
resp = await step.process(_make_request(texts=["text"]), resp)
out = resp.provenance.fields["result.account_iban"]
assert out.provenance_verified is None
assert out.text_agreement is None
async def test_short_value_text_agreement_skipped(self) -> None:
# Closing balance value < 10 → short numeric skip rule.
fp = FieldProvenance(
field_name="opening_balance",
field_path="result.opening_balance",
value=Decimal("5.00"),
sources=[_src("p1_l0", "balance 5.00")],
)
resp = _response_with_provenance(
fields={"result.opening_balance": fp},
texts=["balance 5.00"],
)
step = ReliabilityStep()
resp = await step.process(_make_request(texts=["balance 5.00"]), resp)
out = resp.provenance.fields["result.opening_balance"]
assert out.provenance_verified is True # bbox cite still runs
assert out.text_agreement is None # short-value skip
async def test_date_field_parses_both_sides(self) -> None:
fp = FieldProvenance(
field_name="statement_date",
field_path="result.statement_date",
value=date(2026, 3, 31),
sources=[_src("p1_l0", "Statement date 31.03.2026")],
)
resp = _response_with_provenance(
fields={"result.statement_date": fp},
texts=["Statement date 2026-03-31"],
)
step = ReliabilityStep()
resp = await step.process(_make_request(texts=["Statement date 2026-03-31"]), resp)
out = resp.provenance.fields["result.statement_date"]
assert out.provenance_verified is True
assert out.text_agreement is True
async def test_iban_field_whitespace_ignored(self) -> None:
fp = FieldProvenance(
field_name="account_iban",
field_path="result.account_iban",
value="DE89370400440532013000",
sources=[_src("p1_l0", "IBAN DE89 3704 0044 0532 0130 00")],
)
resp = _response_with_provenance(
fields={"result.account_iban": fp},
texts=["IBAN DE89 3704 0044 0532 0130 00"],
)
step = ReliabilityStep()
resp = await step.process(_make_request(texts=["IBAN DE89 3704 0044 0532 0130 00"]), resp)
out = resp.provenance.fields["result.account_iban"]
assert out.provenance_verified is True
assert out.text_agreement is True
async def test_disagreeing_snippet_sets_false(self) -> None:
fp = FieldProvenance(
field_name="bank_name",
field_path="result.bank_name",
value="DKB",
sources=[_src("p1_l0", "Commerzbank")],
)
resp = _response_with_provenance(
fields={"result.bank_name": fp},
texts=["Commerzbank header"],
)
step = ReliabilityStep()
resp = await step.process(_make_request(texts=["Commerzbank header"]), resp)
out = resp.provenance.fields["result.bank_name"]
assert out.provenance_verified is False
assert out.text_agreement is False
class TestCounters:
async def test_quality_metrics_counters_written(self) -> None:
fp_ok = FieldProvenance(
field_name="bank_name",
field_path="result.bank_name",
value="DKB",
sources=[_src("p1_l0", "DKB")],
)
fp_bad = FieldProvenance(
field_name="currency",
field_path="result.currency",
value="EUR",
sources=[_src("p1_l1", "nothing to see")],
)
fp_literal = FieldProvenance(
field_name="account_type",
field_path="result.account_type",
value="checking",
sources=[_src("p1_l2", "anything")],
)
resp = _response_with_provenance(
fields={
"result.bank_name": fp_ok,
"result.currency": fp_bad,
"result.account_type": fp_literal,
},
texts=["DKB statement"],
)
step = ReliabilityStep()
resp = await step.process(_make_request(texts=["DKB statement"]), resp)
qm = resp.provenance.quality_metrics
# bank_name verified+agree (2 flags), others not.
assert qm["verified_fields"] == 1
assert qm["text_agreement_fields"] == 1

View file

@ -0,0 +1,136 @@
"""Tests for :class:`ix.pipeline.response_handler_step.ResponseHandlerStep` (spec §8)."""
from __future__ import annotations
from ix.contracts import (
Context,
Line,
OCRDetails,
OCROptions,
OCRResult,
Options,
Page,
RequestIX,
ResponseIX,
)
from ix.contracts.response import _InternalContext
from ix.pipeline.response_handler_step import ResponseHandlerStep
def _make_request(
*,
include_geometries: bool = False,
include_ocr_text: bool = False,
) -> RequestIX:
return RequestIX(
use_case="bank_statement_header",
ix_client_id="test",
request_id="r-1",
context=Context(files=[], texts=[]),
options=Options(
ocr=OCROptions(
include_geometries=include_geometries,
include_ocr_text=include_ocr_text,
)
),
)
def _populated_response() -> ResponseIX:
resp = ResponseIX(
ocr_result=OCRResult(
result=OCRDetails(
text=None,
pages=[
Page(
page_no=1,
width=100.0,
height=200.0,
lines=[
Line(text='<page file="0" number="1">', bounding_box=[]),
Line(text="hello", bounding_box=[0, 0, 1, 0, 1, 1, 0, 1]),
Line(text="world", bounding_box=[0, 2, 1, 2, 1, 3, 0, 3]),
Line(text="</page>", bounding_box=[]),
],
),
Page(
page_no=2,
width=100.0,
height=200.0,
lines=[
Line(text="p2 line", bounding_box=[0, 0, 1, 0, 1, 1, 0, 1]),
],
),
],
),
meta_data={"adapter": "fake"},
)
)
resp.context = _InternalContext()
return resp
class TestValidateAlwaysTrue:
async def test_validate_always_true(self) -> None:
step = ResponseHandlerStep()
req = _make_request()
assert await step.validate(req, _populated_response()) is True
class TestAttachOcrText:
async def test_include_ocr_text_concatenates_lines(self) -> None:
step = ResponseHandlerStep()
req = _make_request(include_ocr_text=True, include_geometries=True)
resp = _populated_response()
resp = await step.process(req, resp)
# Page tag lines excluded; real lines joined within page with \n,
# pages with \n\n.
text = resp.ocr_result.result.text
assert text is not None
assert "hello\nworld" in text
assert "p2 line" in text
assert "<page" not in text
async def test_include_ocr_text_false_leaves_text_alone(self) -> None:
step = ResponseHandlerStep()
req = _make_request(include_ocr_text=False, include_geometries=True)
resp = _populated_response()
resp.ocr_result.result.text = None
resp = await step.process(req, resp)
assert resp.ocr_result.result.text is None
class TestStripGeometries:
async def test_strips_pages_and_meta_when_off(self) -> None:
step = ResponseHandlerStep()
req = _make_request(include_geometries=False)
resp = _populated_response()
resp = await step.process(req, resp)
assert resp.ocr_result.result.pages == []
assert resp.ocr_result.meta_data == {}
async def test_keeps_pages_when_on(self) -> None:
step = ResponseHandlerStep()
req = _make_request(include_geometries=True)
resp = _populated_response()
pages_before = [p.page_no for p in resp.ocr_result.result.pages]
resp = await step.process(req, resp)
assert [p.page_no for p in resp.ocr_result.result.pages] == pages_before
assert resp.ocr_result.meta_data == {"adapter": "fake"}
class TestContextDeletion:
async def test_context_removed(self) -> None:
step = ResponseHandlerStep()
req = _make_request()
resp = _populated_response()
resp = await step.process(req, resp)
assert resp.context is None
async def test_context_not_in_model_dump(self) -> None:
step = ResponseHandlerStep()
req = _make_request()
resp = _populated_response()
resp = await step.process(req, resp)
dump = resp.model_dump()
assert "context" not in dump

View file

@ -0,0 +1,166 @@
"""Tests for :class:`SuryaOCRClient` — hermetic, no model download.
The real Surya predictors are patched out with :class:`unittest.mock.MagicMock`
that return trivially-shaped line objects. The tests assert the client's
translation layer flattening polygons, mapping text_lines ``Line``,
preserving ``page_no``/``width``/``height`` per input page.
"""
from __future__ import annotations
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import MagicMock, patch
import pytest
from ix.contracts import Page
from ix.ocr.surya_client import SuryaOCRClient
from ix.segmentation import PageMetadata
def _make_surya_line(text: str, polygon: list[list[float]]) -> SimpleNamespace:
"""Mimic ``surya.recognition.schema.TextLine`` duck-typing-style."""
return SimpleNamespace(text=text, polygon=polygon, confidence=0.95)
def _make_surya_ocr_result(lines: list[SimpleNamespace]) -> SimpleNamespace:
"""Mimic ``surya.recognition.schema.OCRResult``."""
return SimpleNamespace(text_lines=lines, image_bbox=[0, 0, 100, 100])
class TestOCRBuildsOCRResultFromMockedPredictors:
async def test_one_image_one_line_flatten_polygon(self, tmp_path: Path) -> None:
img_path = tmp_path / "a.png"
_write_tiny_png(img_path)
mock_line = _make_surya_line(
text="hello",
polygon=[[1.0, 2.0], [3.0, 4.0], [5.0, 6.0], [7.0, 8.0]],
)
mock_predictor = MagicMock(
return_value=[_make_surya_ocr_result([mock_line])]
)
client = SuryaOCRClient()
# Skip the real warm_up; inject the mock directly.
client._recognition_predictor = mock_predictor
client._detection_predictor = MagicMock()
pages = [Page(page_no=1, width=100.0, height=50.0, lines=[])]
result = await client.ocr(
pages,
files=[(img_path, "image/png")],
page_metadata=[PageMetadata(file_index=0)],
)
assert len(result.result.pages) == 1
out_page = result.result.pages[0]
assert out_page.page_no == 1
assert out_page.width == 100.0
assert out_page.height == 50.0
assert len(out_page.lines) == 1
assert out_page.lines[0].text == "hello"
assert out_page.lines[0].bounding_box == [
1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0
]
async def test_multiple_pages_preserves_order(self, tmp_path: Path) -> None:
img_a = tmp_path / "a.png"
img_b = tmp_path / "b.png"
_write_tiny_png(img_a)
_write_tiny_png(img_b)
mock_predictor = MagicMock(
return_value=[
_make_surya_ocr_result(
[_make_surya_line("a-line", [[0, 0], [1, 0], [1, 1], [0, 1]])]
),
_make_surya_ocr_result(
[_make_surya_line("b-line", [[0, 0], [1, 0], [1, 1], [0, 1]])]
),
]
)
client = SuryaOCRClient()
client._recognition_predictor = mock_predictor
client._detection_predictor = MagicMock()
pages = [
Page(page_no=1, width=10.0, height=20.0, lines=[]),
Page(page_no=2, width=10.0, height=20.0, lines=[]),
]
result = await client.ocr(
pages,
files=[(img_a, "image/png"), (img_b, "image/png")],
page_metadata=[
PageMetadata(file_index=0),
PageMetadata(file_index=1),
],
)
assert [p.lines[0].text for p in result.result.pages] == ["a-line", "b-line"]
async def test_lazy_warm_up_on_first_ocr(self, tmp_path: Path) -> None:
img = tmp_path / "x.png"
_write_tiny_png(img)
client = SuryaOCRClient()
# Use patch.object on the instance's warm_up so we don't need real
# Surya module loading.
with patch.object(client, "warm_up", autospec=True) as mocked_warm_up:
# After warm_up is called, the predictors must be assigned.
def fake_warm_up(self: SuryaOCRClient) -> None:
self._recognition_predictor = MagicMock(
return_value=[
_make_surya_ocr_result(
[
_make_surya_line(
"hi", [[0, 0], [1, 0], [1, 1], [0, 1]]
)
]
)
]
)
self._detection_predictor = MagicMock()
mocked_warm_up.side_effect = lambda: fake_warm_up(client)
pages = [Page(page_no=1, width=10.0, height=10.0, lines=[])]
await client.ocr(
pages,
files=[(img, "image/png")],
page_metadata=[PageMetadata(file_index=0)],
)
mocked_warm_up.assert_called_once()
class TestSelfcheck:
async def test_selfcheck_ok_with_mocked_predictors(self) -> None:
client = SuryaOCRClient()
client._recognition_predictor = MagicMock(
return_value=[_make_surya_ocr_result([])]
)
client._detection_predictor = MagicMock()
assert await client.selfcheck() == "ok"
async def test_selfcheck_fail_when_predictor_raises(self) -> None:
client = SuryaOCRClient()
client._recognition_predictor = MagicMock(
side_effect=RuntimeError("cuda broken")
)
client._detection_predictor = MagicMock()
assert await client.selfcheck() == "fail"
def _write_tiny_png(path: Path) -> None:
"""Write a 2x2 white PNG so PIL can open it."""
from PIL import Image
Image.new("RGB", (2, 2), color="white").save(path, format="PNG")
@pytest.mark.parametrize("unused", [None]) # keep pytest happy if file ever runs alone
def test_module_imports(unused: None) -> None:
assert SuryaOCRClient is not None

View file

@ -12,7 +12,7 @@ class TestRequest:
def test_defaults(self) -> None:
r = Request()
assert r.use_case_name == "Bank Statement Header"
assert r.default_model == "gpt-oss:20b"
assert r.default_model == "qwen3:14b"
# Stable substring for agent/worker tests that want to confirm the
# prompt is what they think it is.
assert "extract header metadata" in r.system_prompt

165
uv.lock
View file

@ -110,6 +110,79 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/db/3c/33bac158f8ab7f89b2e59426d5fe2e4f63f7ed25df84c036890172b412b5/cfgv-3.5.0-py2.py3-none-any.whl", hash = "sha256:a8dc6b26ad22ff227d2634a65cb388215ce6cc96bbcc5cfde7641ae87e8dacc0", size = 7445, upload-time = "2025-11-19T20:55:50.744Z" },
]
[[package]]
name = "charset-normalizer"
version = "3.4.7"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/e7/a1/67fe25fac3c7642725500a3f6cfe5821ad557c3abb11c9d20d12c7008d3e/charset_normalizer-3.4.7.tar.gz", hash = "sha256:ae89db9e5f98a11a4bf50407d4363e7b09b31e55bc117b4f7d80aab97ba009e5", size = 144271, upload-time = "2026-04-02T09:28:39.342Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/0c/eb/4fc8d0a7110eb5fc9cc161723a34a8a6c200ce3b4fbf681bc86feee22308/charset_normalizer-3.4.7-cp312-cp312-macosx_10_13_universal2.whl", hash = "sha256:eca9705049ad3c7345d574e3510665cb2cf844c2f2dcfe675332677f081cbd46", size = 311328, upload-time = "2026-04-02T09:26:24.331Z" },
{ url = "https://files.pythonhosted.org/packages/f8/e3/0fadc706008ac9d7b9b5be6dc767c05f9d3e5df51744ce4cc9605de7b9f4/charset_normalizer-3.4.7-cp312-cp312-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:6178f72c5508bfc5fd446a5905e698c6212932f25bcdd4b47a757a50605a90e2", size = 208061, upload-time = "2026-04-02T09:26:25.568Z" },
{ url = "https://files.pythonhosted.org/packages/42/f0/3dd1045c47f4a4604df85ec18ad093912ae1344ac706993aff91d38773a2/charset_normalizer-3.4.7-cp312-cp312-manylinux2014_ppc64le.manylinux_2_17_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:e1421b502d83040e6d7fb2fb18dff63957f720da3d77b2fbd3187ceb63755d7b", size = 229031, upload-time = "2026-04-02T09:26:26.865Z" },
{ url = "https://files.pythonhosted.org/packages/dc/67/675a46eb016118a2fbde5a277a5d15f4f69d5f3f5f338e5ee2f8948fcf43/charset_normalizer-3.4.7-cp312-cp312-manylinux2014_s390x.manylinux_2_17_s390x.manylinux_2_28_s390x.whl", hash = "sha256:edac0f1ab77644605be2cbba52e6b7f630731fc42b34cb0f634be1a6eface56a", size = 225239, upload-time = "2026-04-02T09:26:28.044Z" },
{ url = "https://files.pythonhosted.org/packages/4b/f8/d0118a2f5f23b02cd166fa385c60f9b0d4f9194f574e2b31cef350ad7223/charset_normalizer-3.4.7-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:5649fd1c7bade02f320a462fdefd0b4bd3ce036065836d4f42e0de958038e116", size = 216589, upload-time = "2026-04-02T09:26:29.239Z" },
{ url = "https://files.pythonhosted.org/packages/b1/f1/6d2b0b261b6c4ceef0fcb0d17a01cc5bc53586c2d4796fa04b5c540bc13d/charset_normalizer-3.4.7-cp312-cp312-manylinux_2_31_armv7l.whl", hash = "sha256:203104ed3e428044fd943bc4bf45fa73c0730391f9621e37fe39ecf477b128cb", size = 202733, upload-time = "2026-04-02T09:26:30.5Z" },
{ url = "https://files.pythonhosted.org/packages/6f/c0/7b1f943f7e87cc3db9626ba17807d042c38645f0a1d4415c7a14afb5591f/charset_normalizer-3.4.7-cp312-cp312-manylinux_2_31_riscv64.manylinux_2_39_riscv64.whl", hash = "sha256:298930cec56029e05497a76988377cbd7457ba864beeea92ad7e844fe74cd1f1", size = 212652, upload-time = "2026-04-02T09:26:31.709Z" },
{ url = "https://files.pythonhosted.org/packages/38/dd/5a9ab159fe45c6e72079398f277b7d2b523e7f716acc489726115a910097/charset_normalizer-3.4.7-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:708838739abf24b2ceb208d0e22403dd018faeef86ddac04319a62ae884c4f15", size = 211229, upload-time = "2026-04-02T09:26:33.282Z" },
{ url = "https://files.pythonhosted.org/packages/d5/ff/531a1cad5ca855d1c1a8b69cb71abfd6d85c0291580146fda7c82857caa1/charset_normalizer-3.4.7-cp312-cp312-musllinux_1_2_armv7l.whl", hash = "sha256:0f7eb884681e3938906ed0434f20c63046eacd0111c4ba96f27b76084cd679f5", size = 203552, upload-time = "2026-04-02T09:26:34.845Z" },
{ url = "https://files.pythonhosted.org/packages/c1/4c/a5fb52d528a8ca41f7598cb619409ece30a169fbdf9cdce592e53b46c3a6/charset_normalizer-3.4.7-cp312-cp312-musllinux_1_2_ppc64le.whl", hash = "sha256:4dc1e73c36828f982bfe79fadf5919923f8a6f4df2860804db9a98c48824ce8d", size = 230806, upload-time = "2026-04-02T09:26:36.152Z" },
{ url = "https://files.pythonhosted.org/packages/59/7a/071feed8124111a32b316b33ae4de83d36923039ef8cf48120266844285b/charset_normalizer-3.4.7-cp312-cp312-musllinux_1_2_riscv64.whl", hash = "sha256:aed52fea0513bac0ccde438c188c8a471c4e0f457c2dd20cdbf6ea7a450046c7", size = 212316, upload-time = "2026-04-02T09:26:37.672Z" },
{ url = "https://files.pythonhosted.org/packages/fd/35/f7dba3994312d7ba508e041eaac39a36b120f32d4c8662b8814dab876431/charset_normalizer-3.4.7-cp312-cp312-musllinux_1_2_s390x.whl", hash = "sha256:fea24543955a6a729c45a73fe90e08c743f0b3334bbf3201e6c4bc1b0c7fa464", size = 227274, upload-time = "2026-04-02T09:26:38.93Z" },
{ url = "https://files.pythonhosted.org/packages/8a/2d/a572df5c9204ab7688ec1edc895a73ebded3b023bb07364710b05dd1c9be/charset_normalizer-3.4.7-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:bb6d88045545b26da47aa879dd4a89a71d1dce0f0e549b1abcb31dfe4a8eac49", size = 218468, upload-time = "2026-04-02T09:26:40.17Z" },
{ url = "https://files.pythonhosted.org/packages/86/eb/890922a8b03a568ca2f336c36585a4713c55d4d67bf0f0c78924be6315ca/charset_normalizer-3.4.7-cp312-cp312-win32.whl", hash = "sha256:2257141f39fe65a3fdf38aeccae4b953e5f3b3324f4ff0daf9f15b8518666a2c", size = 148460, upload-time = "2026-04-02T09:26:41.416Z" },
{ url = "https://files.pythonhosted.org/packages/35/d9/0e7dffa06c5ab081f75b1b786f0aefc88365825dfcd0ac544bdb7b2b6853/charset_normalizer-3.4.7-cp312-cp312-win_amd64.whl", hash = "sha256:5ed6ab538499c8644b8a3e18debabcd7ce684f3fa91cf867521a7a0279cab2d6", size = 159330, upload-time = "2026-04-02T09:26:42.554Z" },
{ url = "https://files.pythonhosted.org/packages/9e/5d/481bcc2a7c88ea6b0878c299547843b2521ccbc40980cb406267088bc701/charset_normalizer-3.4.7-cp312-cp312-win_arm64.whl", hash = "sha256:56be790f86bfb2c98fb742ce566dfb4816e5a83384616ab59c49e0604d49c51d", size = 147828, upload-time = "2026-04-02T09:26:44.075Z" },
{ url = "https://files.pythonhosted.org/packages/c1/3b/66777e39d3ae1ddc77ee606be4ec6d8cbd4c801f65e5a1b6f2b11b8346dd/charset_normalizer-3.4.7-cp313-cp313-macosx_10_13_universal2.whl", hash = "sha256:f496c9c3cc02230093d8330875c4c3cdfc3b73612a5fd921c65d39cbcef08063", size = 309627, upload-time = "2026-04-02T09:26:45.198Z" },
{ url = "https://files.pythonhosted.org/packages/2e/4e/b7f84e617b4854ade48a1b7915c8ccfadeba444d2a18c291f696e37f0d3b/charset_normalizer-3.4.7-cp313-cp313-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:0ea948db76d31190bf08bd371623927ee1339d5f2a0b4b1b4a4439a65298703c", size = 207008, upload-time = "2026-04-02T09:26:46.824Z" },
{ url = "https://files.pythonhosted.org/packages/c4/bb/ec73c0257c9e11b268f018f068f5d00aa0ef8c8b09f7753ebd5f2880e248/charset_normalizer-3.4.7-cp313-cp313-manylinux2014_ppc64le.manylinux_2_17_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:a277ab8928b9f299723bc1a2dabb1265911b1a76341f90a510368ca44ad9ab66", size = 228303, upload-time = "2026-04-02T09:26:48.397Z" },
{ url = "https://files.pythonhosted.org/packages/85/fb/32d1f5033484494619f701e719429c69b766bfc4dbc61aa9e9c8c166528b/charset_normalizer-3.4.7-cp313-cp313-manylinux2014_s390x.manylinux_2_17_s390x.manylinux_2_28_s390x.whl", hash = "sha256:3bec022aec2c514d9cf199522a802bd007cd588ab17ab2525f20f9c34d067c18", size = 224282, upload-time = "2026-04-02T09:26:49.684Z" },
{ url = "https://files.pythonhosted.org/packages/fa/07/330e3a0dda4c404d6da83b327270906e9654a24f6c546dc886a0eb0ffb23/charset_normalizer-3.4.7-cp313-cp313-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:e044c39e41b92c845bc815e5ae4230804e8e7bc29e399b0437d64222d92809dd", size = 215595, upload-time = "2026-04-02T09:26:50.915Z" },
{ url = "https://files.pythonhosted.org/packages/e3/7c/fc890655786e423f02556e0216d4b8c6bcb6bdfa890160dc66bf52dee468/charset_normalizer-3.4.7-cp313-cp313-manylinux_2_31_armv7l.whl", hash = "sha256:f495a1652cf3fbab2eb0639776dad966c2fb874d79d87ca07f9d5f059b8bd215", size = 201986, upload-time = "2026-04-02T09:26:52.197Z" },
{ url = "https://files.pythonhosted.org/packages/d8/97/bfb18b3db2aed3b90cf54dc292ad79fdd5ad65c4eae454099475cbeadd0d/charset_normalizer-3.4.7-cp313-cp313-manylinux_2_31_riscv64.manylinux_2_39_riscv64.whl", hash = "sha256:e712b419df8ba5e42b226c510472b37bd57b38e897d3eca5e8cfd410a29fa859", size = 211711, upload-time = "2026-04-02T09:26:53.49Z" },
{ url = "https://files.pythonhosted.org/packages/6f/a5/a581c13798546a7fd557c82614a5c65a13df2157e9ad6373166d2a3e645d/charset_normalizer-3.4.7-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:7804338df6fcc08105c7745f1502ba68d900f45fd770d5bdd5288ddccb8a42d8", size = 210036, upload-time = "2026-04-02T09:26:54.975Z" },
{ url = "https://files.pythonhosted.org/packages/8c/bf/b3ab5bcb478e4193d517644b0fb2bf5497fbceeaa7a1bc0f4d5b50953861/charset_normalizer-3.4.7-cp313-cp313-musllinux_1_2_armv7l.whl", hash = "sha256:481551899c856c704d58119b5025793fa6730adda3571971af568f66d2424bb5", size = 202998, upload-time = "2026-04-02T09:26:56.303Z" },
{ url = "https://files.pythonhosted.org/packages/e7/4e/23efd79b65d314fa320ec6017b4b5834d5c12a58ba4610aa353af2e2f577/charset_normalizer-3.4.7-cp313-cp313-musllinux_1_2_ppc64le.whl", hash = "sha256:f59099f9b66f0d7145115e6f80dd8b1d847176df89b234a5a6b3f00437aa0832", size = 230056, upload-time = "2026-04-02T09:26:57.554Z" },
{ url = "https://files.pythonhosted.org/packages/b9/9f/1e1941bc3f0e01df116e68dc37a55c4d249df5e6fa77f008841aef68264f/charset_normalizer-3.4.7-cp313-cp313-musllinux_1_2_riscv64.whl", hash = "sha256:f59ad4c0e8f6bba240a9bb85504faa1ab438237199d4cce5f622761507b8f6a6", size = 211537, upload-time = "2026-04-02T09:26:58.843Z" },
{ url = "https://files.pythonhosted.org/packages/80/0f/088cbb3020d44428964a6c97fe1edfb1b9550396bf6d278330281e8b709c/charset_normalizer-3.4.7-cp313-cp313-musllinux_1_2_s390x.whl", hash = "sha256:3dedcc22d73ec993f42055eff4fcfed9318d1eeb9a6606c55892a26964964e48", size = 226176, upload-time = "2026-04-02T09:27:00.437Z" },
{ url = "https://files.pythonhosted.org/packages/6a/9f/130394f9bbe06f4f63e22641d32fc9b202b7e251c9aef4db044324dac493/charset_normalizer-3.4.7-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:64f02c6841d7d83f832cd97ccf8eb8a906d06eb95d5276069175c696b024b60a", size = 217723, upload-time = "2026-04-02T09:27:02.021Z" },
{ url = "https://files.pythonhosted.org/packages/73/55/c469897448a06e49f8fa03f6caae97074fde823f432a98f979cc42b90e69/charset_normalizer-3.4.7-cp313-cp313-win32.whl", hash = "sha256:4042d5c8f957e15221d423ba781e85d553722fc4113f523f2feb7b188cc34c5e", size = 148085, upload-time = "2026-04-02T09:27:03.192Z" },
{ url = "https://files.pythonhosted.org/packages/5d/78/1b74c5bbb3f99b77a1715c91b3e0b5bdb6fe302d95ace4f5b1bec37b0167/charset_normalizer-3.4.7-cp313-cp313-win_amd64.whl", hash = "sha256:3946fa46a0cf3e4c8cb1cc52f56bb536310d34f25f01ca9b6c16afa767dab110", size = 158819, upload-time = "2026-04-02T09:27:04.454Z" },
{ url = "https://files.pythonhosted.org/packages/68/86/46bd42279d323deb8687c4a5a811fd548cb7d1de10cf6535d099877a9a9f/charset_normalizer-3.4.7-cp313-cp313-win_arm64.whl", hash = "sha256:80d04837f55fc81da168b98de4f4b797ef007fc8a79ab71c6ec9bc4dd662b15b", size = 147915, upload-time = "2026-04-02T09:27:05.971Z" },
{ url = "https://files.pythonhosted.org/packages/97/c8/c67cb8c70e19ef1960b97b22ed2a1567711de46c4ddf19799923adc836c2/charset_normalizer-3.4.7-cp314-cp314-macosx_10_15_universal2.whl", hash = "sha256:c36c333c39be2dbca264d7803333c896ab8fa7d4d6f0ab7edb7dfd7aea6e98c0", size = 309234, upload-time = "2026-04-02T09:27:07.194Z" },
{ url = "https://files.pythonhosted.org/packages/99/85/c091fdee33f20de70d6c8b522743b6f831a2f1cd3ff86de4c6a827c48a76/charset_normalizer-3.4.7-cp314-cp314-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:1c2aed2e5e41f24ea8ef1590b8e848a79b56f3a5564a65ceec43c9d692dc7d8a", size = 208042, upload-time = "2026-04-02T09:27:08.749Z" },
{ url = "https://files.pythonhosted.org/packages/87/1c/ab2ce611b984d2fd5d86a5a8a19c1ae26acac6bad967da4967562c75114d/charset_normalizer-3.4.7-cp314-cp314-manylinux2014_ppc64le.manylinux_2_17_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:54523e136b8948060c0fa0bc7b1b50c32c186f2fceee897a495406bb6e311d2b", size = 228706, upload-time = "2026-04-02T09:27:09.951Z" },
{ url = "https://files.pythonhosted.org/packages/a8/29/2b1d2cb00bf085f59d29eb773ce58ec2d325430f8c216804a0a5cd83cbca/charset_normalizer-3.4.7-cp314-cp314-manylinux2014_s390x.manylinux_2_17_s390x.manylinux_2_28_s390x.whl", hash = "sha256:715479b9a2802ecac752a3b0efa2b0b60285cf962ee38414211abdfccc233b41", size = 224727, upload-time = "2026-04-02T09:27:11.175Z" },
{ url = "https://files.pythonhosted.org/packages/47/5c/032c2d5a07fe4d4855fea851209cca2b6f03ebeb6d4e3afdb3358386a684/charset_normalizer-3.4.7-cp314-cp314-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:bd6c2a1c7573c64738d716488d2cdd3c00e340e4835707d8fdb8dc1a66ef164e", size = 215882, upload-time = "2026-04-02T09:27:12.446Z" },
{ url = "https://files.pythonhosted.org/packages/2c/c2/356065d5a8b78ed04499cae5f339f091946a6a74f91e03476c33f0ab7100/charset_normalizer-3.4.7-cp314-cp314-manylinux_2_31_armv7l.whl", hash = "sha256:c45e9440fb78f8ddabcf714b68f936737a121355bf59f3907f4e17721b9d1aae", size = 200860, upload-time = "2026-04-02T09:27:13.721Z" },
{ url = "https://files.pythonhosted.org/packages/0c/cd/a32a84217ced5039f53b29f460962abb2d4420def55afabe45b1c3c7483d/charset_normalizer-3.4.7-cp314-cp314-manylinux_2_31_riscv64.manylinux_2_39_riscv64.whl", hash = "sha256:3534e7dcbdcf757da6b85a0bbf5b6868786d5982dd959b065e65481644817a18", size = 211564, upload-time = "2026-04-02T09:27:15.272Z" },
{ url = "https://files.pythonhosted.org/packages/44/86/58e6f13ce26cc3b8f4a36b94a0f22ae2f00a72534520f4ae6857c4b81f89/charset_normalizer-3.4.7-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:e8ac484bf18ce6975760921bb6148041faa8fef0547200386ea0b52b5d27bf7b", size = 211276, upload-time = "2026-04-02T09:27:16.834Z" },
{ url = "https://files.pythonhosted.org/packages/8f/fe/d17c32dc72e17e155e06883efa84514ca375f8a528ba2546bee73fc4df81/charset_normalizer-3.4.7-cp314-cp314-musllinux_1_2_armv7l.whl", hash = "sha256:a5fe03b42827c13cdccd08e6c0247b6a6d4b5e3cdc53fd1749f5896adcdc2356", size = 201238, upload-time = "2026-04-02T09:27:18.229Z" },
{ url = "https://files.pythonhosted.org/packages/6a/29/f33daa50b06525a237451cdb6c69da366c381a3dadcd833fa5676bc468b3/charset_normalizer-3.4.7-cp314-cp314-musllinux_1_2_ppc64le.whl", hash = "sha256:2d6eb928e13016cea4f1f21d1e10c1cebd5a421bc57ddf5b1142ae3f86824fab", size = 230189, upload-time = "2026-04-02T09:27:19.445Z" },
{ url = "https://files.pythonhosted.org/packages/b6/6e/52c84015394a6a0bdcd435210a7e944c5f94ea1055f5cc5d56c5fe368e7b/charset_normalizer-3.4.7-cp314-cp314-musllinux_1_2_riscv64.whl", hash = "sha256:e74327fb75de8986940def6e8dee4f127cc9752bee7355bb323cc5b2659b6d46", size = 211352, upload-time = "2026-04-02T09:27:20.79Z" },
{ url = "https://files.pythonhosted.org/packages/8c/d7/4353be581b373033fb9198bf1da3cf8f09c1082561e8e922aa7b39bf9fe8/charset_normalizer-3.4.7-cp314-cp314-musllinux_1_2_s390x.whl", hash = "sha256:d6038d37043bced98a66e68d3aa2b6a35505dc01328cd65217cefe82f25def44", size = 227024, upload-time = "2026-04-02T09:27:22.063Z" },
{ url = "https://files.pythonhosted.org/packages/30/45/99d18aa925bd1740098ccd3060e238e21115fffbfdcb8f3ece837d0ace6c/charset_normalizer-3.4.7-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:7579e913a5339fb8fa133f6bbcfd8e6749696206cf05acdbdca71a1b436d8e72", size = 217869, upload-time = "2026-04-02T09:27:23.486Z" },
{ url = "https://files.pythonhosted.org/packages/5c/05/5ee478aa53f4bb7996482153d4bfe1b89e0f087f0ab6b294fcf92d595873/charset_normalizer-3.4.7-cp314-cp314-win32.whl", hash = "sha256:5b77459df20e08151cd6f8b9ef8ef1f961ef73d85c21a555c7eed5b79410ec10", size = 148541, upload-time = "2026-04-02T09:27:25.146Z" },
{ url = "https://files.pythonhosted.org/packages/48/77/72dcb0921b2ce86420b2d79d454c7022bf5be40202a2a07906b9f2a35c97/charset_normalizer-3.4.7-cp314-cp314-win_amd64.whl", hash = "sha256:92a0a01ead5e668468e952e4238cccd7c537364eb7d851ab144ab6627dbbe12f", size = 159634, upload-time = "2026-04-02T09:27:26.642Z" },
{ url = "https://files.pythonhosted.org/packages/c6/a3/c2369911cd72f02386e4e340770f6e158c7980267da16af8f668217abaa0/charset_normalizer-3.4.7-cp314-cp314-win_arm64.whl", hash = "sha256:67f6279d125ca0046a7fd386d01b311c6363844deac3e5b069b514ba3e63c246", size = 148384, upload-time = "2026-04-02T09:27:28.271Z" },
{ url = "https://files.pythonhosted.org/packages/94/09/7e8a7f73d24dba1f0035fbbf014d2c36828fc1bf9c88f84093e57d315935/charset_normalizer-3.4.7-cp314-cp314t-macosx_10_15_universal2.whl", hash = "sha256:effc3f449787117233702311a1b7d8f59cba9ced946ba727bdc329ec69028e24", size = 330133, upload-time = "2026-04-02T09:27:29.474Z" },
{ url = "https://files.pythonhosted.org/packages/8d/da/96975ddb11f8e977f706f45cddd8540fd8242f71ecdb5d18a80723dcf62c/charset_normalizer-3.4.7-cp314-cp314t-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:fbccdc05410c9ee21bbf16a35f4c1d16123dcdeb8a1d38f33654fa21d0234f79", size = 216257, upload-time = "2026-04-02T09:27:30.793Z" },
{ url = "https://files.pythonhosted.org/packages/e5/e8/1d63bf8ef2d388e95c64b2098f45f84758f6d102a087552da1485912637b/charset_normalizer-3.4.7-cp314-cp314t-manylinux2014_ppc64le.manylinux_2_17_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:733784b6d6def852c814bce5f318d25da2ee65dd4839a0718641c696e09a2960", size = 234851, upload-time = "2026-04-02T09:27:32.44Z" },
{ url = "https://files.pythonhosted.org/packages/9b/40/e5ff04233e70da2681fa43969ad6f66ca5611d7e669be0246c4c7aaf6dc8/charset_normalizer-3.4.7-cp314-cp314t-manylinux2014_s390x.manylinux_2_17_s390x.manylinux_2_28_s390x.whl", hash = "sha256:a89c23ef8d2c6b27fd200a42aa4ac72786e7c60d40efdc76e6011260b6e949c4", size = 233393, upload-time = "2026-04-02T09:27:34.03Z" },
{ url = "https://files.pythonhosted.org/packages/be/c1/06c6c49d5a5450f76899992f1ee40b41d076aee9279b49cf9974d2f313d5/charset_normalizer-3.4.7-cp314-cp314t-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:6c114670c45346afedc0d947faf3c7f701051d2518b943679c8ff88befe14f8e", size = 223251, upload-time = "2026-04-02T09:27:35.369Z" },
{ url = "https://files.pythonhosted.org/packages/2b/9f/f2ff16fb050946169e3e1f82134d107e5d4ae72647ec8a1b1446c148480f/charset_normalizer-3.4.7-cp314-cp314t-manylinux_2_31_armv7l.whl", hash = "sha256:a180c5e59792af262bf263b21a3c49353f25945d8d9f70628e73de370d55e1e1", size = 206609, upload-time = "2026-04-02T09:27:36.661Z" },
{ url = "https://files.pythonhosted.org/packages/69/d5/a527c0cd8d64d2eab7459784fb4169a0ac76e5a6fc5237337982fd61347e/charset_normalizer-3.4.7-cp314-cp314t-manylinux_2_31_riscv64.manylinux_2_39_riscv64.whl", hash = "sha256:3c9a494bc5ec77d43cea229c4f6db1e4d8fe7e1bbffa8b6f0f0032430ff8ab44", size = 220014, upload-time = "2026-04-02T09:27:38.019Z" },
{ url = "https://files.pythonhosted.org/packages/7e/80/8a7b8104a3e203074dc9aa2c613d4b726c0e136bad1cc734594b02867972/charset_normalizer-3.4.7-cp314-cp314t-musllinux_1_2_aarch64.whl", hash = "sha256:8d828b6667a32a728a1ad1d93957cdf37489c57b97ae6c4de2860fa749b8fc1e", size = 218979, upload-time = "2026-04-02T09:27:39.37Z" },
{ url = "https://files.pythonhosted.org/packages/02/9a/b759b503d507f375b2b5c153e4d2ee0a75aa215b7f2489cf314f4541f2c0/charset_normalizer-3.4.7-cp314-cp314t-musllinux_1_2_armv7l.whl", hash = "sha256:cf1493cd8607bec4d8a7b9b004e699fcf8f9103a9284cc94962cb73d20f9d4a3", size = 209238, upload-time = "2026-04-02T09:27:40.722Z" },
{ url = "https://files.pythonhosted.org/packages/c2/4e/0f3f5d47b86bdb79256e7290b26ac847a2832d9a4033f7eb2cd4bcf4bb5b/charset_normalizer-3.4.7-cp314-cp314t-musllinux_1_2_ppc64le.whl", hash = "sha256:0c96c3b819b5c3e9e165495db84d41914d6894d55181d2d108cc1a69bfc9cce0", size = 236110, upload-time = "2026-04-02T09:27:42.33Z" },
{ url = "https://files.pythonhosted.org/packages/96/23/bce28734eb3ed2c91dcf93abeb8a5cf393a7b2749725030bb630e554fdd8/charset_normalizer-3.4.7-cp314-cp314t-musllinux_1_2_riscv64.whl", hash = "sha256:752a45dc4a6934060b3b0dab47e04edc3326575f82be64bc4fc293914566503e", size = 219824, upload-time = "2026-04-02T09:27:43.924Z" },
{ url = "https://files.pythonhosted.org/packages/2c/6f/6e897c6984cc4d41af319b077f2f600fc8214eb2fe2d6bcb79141b882400/charset_normalizer-3.4.7-cp314-cp314t-musllinux_1_2_s390x.whl", hash = "sha256:8778f0c7a52e56f75d12dae53ae320fae900a8b9b4164b981b9c5ce059cd1fcb", size = 233103, upload-time = "2026-04-02T09:27:45.348Z" },
{ url = "https://files.pythonhosted.org/packages/76/22/ef7bd0fe480a0ae9b656189ec00744b60933f68b4f42a7bb06589f6f576a/charset_normalizer-3.4.7-cp314-cp314t-musllinux_1_2_x86_64.whl", hash = "sha256:ce3412fbe1e31eb81ea42f4169ed94861c56e643189e1e75f0041f3fe7020abe", size = 225194, upload-time = "2026-04-02T09:27:46.706Z" },
{ url = "https://files.pythonhosted.org/packages/c5/a7/0e0ab3e0b5bc1219bd80a6a0d4d72ca74d9250cb2382b7c699c147e06017/charset_normalizer-3.4.7-cp314-cp314t-win32.whl", hash = "sha256:c03a41a8784091e67a39648f70c5f97b5b6a37f216896d44d2cdcb82615339a0", size = 159827, upload-time = "2026-04-02T09:27:48.053Z" },
{ url = "https://files.pythonhosted.org/packages/7a/1d/29d32e0fb40864b1f878c7f5a0b343ae676c6e2b271a2d55cc3a152391da/charset_normalizer-3.4.7-cp314-cp314t-win_amd64.whl", hash = "sha256:03853ed82eeebbce3c2abfdbc98c96dc205f32a79627688ac9a27370ea61a49c", size = 174168, upload-time = "2026-04-02T09:27:49.795Z" },
{ url = "https://files.pythonhosted.org/packages/de/32/d92444ad05c7a6e41fb2036749777c163baf7a0301a040cb672d6b2b1ae9/charset_normalizer-3.4.7-cp314-cp314t-win_arm64.whl", hash = "sha256:c35abb8bfff0185efac5878da64c45dafd2b37fb0383add1be155a763c1f083d", size = 153018, upload-time = "2026-04-02T09:27:51.116Z" },
{ url = "https://files.pythonhosted.org/packages/db/8f/61959034484a4a7c527811f4721e75d02d653a35afb0b6054474d8185d4c/charset_normalizer-3.4.7-py3-none-any.whl", hash = "sha256:3dce51d0f5e7951f8bb4900c257dad282f49190fdbebecd4ba99bcc41fef404d", size = 61958, upload-time = "2026-04-02T09:28:37.794Z" },
]
[[package]]
name = "click"
version = "8.3.2"
@ -408,22 +481,21 @@ wheels = [
[[package]]
name = "huggingface-hub"
version = "1.11.0"
version = "0.36.2"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "filelock" },
{ name = "fsspec" },
{ name = "hf-xet", marker = "platform_machine == 'AMD64' or platform_machine == 'aarch64' or platform_machine == 'amd64' or platform_machine == 'arm64' or platform_machine == 'x86_64'" },
{ name = "httpx" },
{ name = "hf-xet", marker = "platform_machine == 'aarch64' or platform_machine == 'amd64' or platform_machine == 'arm64' or platform_machine == 'x86_64'" },
{ name = "packaging" },
{ name = "pyyaml" },
{ name = "requests" },
{ name = "tqdm" },
{ name = "typer" },
{ name = "typing-extensions" },
]
sdist = { url = "https://files.pythonhosted.org/packages/dc/89/e7aa12d8a6b9259bed10671abb25ae6fa437c0f88a86ecbf59617bae7759/huggingface_hub-1.11.0.tar.gz", hash = "sha256:15fb3713c7f9cdff7b808a94fd91664f661ab142796bb48c9cd9493e8d166278", size = 761749, upload-time = "2026-04-16T13:07:39.73Z" }
sdist = { url = "https://files.pythonhosted.org/packages/7c/b7/8cb61d2eece5fb05a83271da168186721c450eb74e3c31f7ef3169fa475b/huggingface_hub-0.36.2.tar.gz", hash = "sha256:1934304d2fb224f8afa3b87007d58501acfda9215b334eed53072dd5e815ff7a", size = 649782, upload-time = "2026-02-06T09:24:13.098Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/37/02/4f3f8997d1ea7fe0146b343e5e14bd065fa87af790d07e5576d31b31cc18/huggingface_hub-1.11.0-py3-none-any.whl", hash = "sha256:42a6de0afbfeb5e022222d36398f029679db4eb4778801aafda32257ae9131ab", size = 645499, upload-time = "2026-04-16T13:07:37.716Z" },
{ url = "https://files.pythonhosted.org/packages/a8/af/48ac8483240de756d2438c380746e7130d1c6f75802ef22f3c6d49982787/huggingface_hub-0.36.2-py3-none-any.whl", hash = "sha256:48f0c8eac16145dfce371e9d2d7772854a4f591bcb56c9cf548accf531d54270", size = 566395, upload-time = "2026-02-06T09:24:11.133Z" },
]
[[package]]
@ -494,8 +566,8 @@ requires-dist = [
{ name = "python-magic", specifier = ">=0.4.27" },
{ name = "ruff", marker = "extra == 'dev'", specifier = ">=0.8" },
{ name = "sqlalchemy", extras = ["asyncio"], specifier = ">=2.0.36" },
{ name = "surya-ocr", marker = "extra == 'ocr'", specifier = ">=0.9" },
{ name = "torch", marker = "extra == 'ocr'", specifier = ">=2.4" },
{ name = "surya-ocr", marker = "extra == 'ocr'", specifier = ">=0.17,<0.18" },
{ name = "torch", marker = "extra == 'ocr'", specifier = ">=2.7" },
{ name = "uvicorn", extras = ["standard"], specifier = ">=0.32" },
]
provides-extras = ["ocr", "dev"]
@ -593,18 +665,6 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/68/a5/19d7aaa7e433713ffe881df33705925a196afb9532efc8475d26593921a6/mako-1.3.11-py3-none-any.whl", hash = "sha256:e372c6e333cf004aa736a15f425087ec977e1fcbd2966aae7f17c8dc1da27a77", size = 78503, upload-time = "2026-04-14T20:19:53.233Z" },
]
[[package]]
name = "markdown-it-py"
version = "4.0.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "mdurl" },
]
sdist = { url = "https://files.pythonhosted.org/packages/5b/f5/4ec618ed16cc4f8fb3b701563655a69816155e79e24a17b651541804721d/markdown_it_py-4.0.0.tar.gz", hash = "sha256:cb0a2b4aa34f932c007117b194e945bd74e0ec24133ceb5bac59009cda1cb9f3", size = 73070, upload-time = "2025-08-11T12:57:52.854Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/94/54/e7d793b573f298e1c9013b8c4dade17d481164aa517d1d7148619c2cedbf/markdown_it_py-4.0.0-py3-none-any.whl", hash = "sha256:87327c59b172c5011896038353a81343b6754500a08cd7a4973bb48c6d578147", size = 87321, upload-time = "2025-08-11T12:57:51.923Z" },
]
[[package]]
name = "markupsafe"
version = "3.0.3"
@ -668,15 +728,6 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/70/bc/6f1c2f612465f5fa89b95bead1f44dcb607670fd42891d8fdcd5d039f4f4/markupsafe-3.0.3-cp314-cp314t-win_arm64.whl", hash = "sha256:32001d6a8fc98c8cb5c947787c5d08b0a50663d139f1305bac5885d98d9b40fa", size = 14146, upload-time = "2025-09-27T18:37:28.327Z" },
]
[[package]]
name = "mdurl"
version = "0.1.2"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/d6/54/cfe61301667036ec958cb99bd3efefba235e65cdeb9c84d24a8293ba1d90/mdurl-0.1.2.tar.gz", hash = "sha256:bb413d29f5eea38f31dd4754dd7377d4465116fb207585f97bf925588687c1ba", size = 8729, upload-time = "2022-08-14T12:40:10.846Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/b3/38/89ba8ad64ae25be8de66a6d463314cf1eb366222074cfda9ee839c56a4b4/mdurl-0.1.2-py3-none-any.whl", hash = "sha256:84008a41e51615a49fc9966191ff91509e3c40b939176e643fd50a5c2196b8f8", size = 9979, upload-time = "2022-08-14T12:40:09.779Z" },
]
[[package]]
name = "mpmath"
version = "1.3.0"
@ -1434,16 +1485,18 @@ wheels = [
]
[[package]]
name = "rich"
version = "15.0.0"
name = "requests"
version = "2.33.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "markdown-it-py" },
{ name = "pygments" },
{ name = "certifi" },
{ name = "charset-normalizer" },
{ name = "idna" },
{ name = "urllib3" },
]
sdist = { url = "https://files.pythonhosted.org/packages/c0/8f/0722ca900cc807c13a6a0c696dacf35430f72e0ec571c4275d2371fca3e9/rich-15.0.0.tar.gz", hash = "sha256:edd07a4824c6b40189fb7ac9bc4c52536e9780fbbfbddf6f1e2502c31b068c36", size = 230680, upload-time = "2026-04-12T08:24:00.75Z" }
sdist = { url = "https://files.pythonhosted.org/packages/5f/a4/98b9c7c6428a668bf7e42ebb7c79d576a1c3c1e3ae2d47e674b468388871/requests-2.33.1.tar.gz", hash = "sha256:18817f8c57c6263968bc123d237e3b8b08ac046f5456bd1e307ee8f4250d3517", size = 134120, upload-time = "2026-03-30T16:09:15.531Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/82/3b/64d4899d73f91ba49a8c18a8ff3f0ea8f1c1d75481760df8c68ef5235bf5/rich-15.0.0-py3-none-any.whl", hash = "sha256:33bd4ef74232fb73fe9279a257718407f169c09b78a87ad3d296f548e27de0bb", size = 310654, upload-time = "2026-04-12T08:24:02.83Z" },
{ url = "https://files.pythonhosted.org/packages/d7/8e/7540e8a2036f79a125c1d2ebadf69ed7901608859186c856fa0388ef4197/requests-2.33.1-py3-none-any.whl", hash = "sha256:4e6d1ef462f3626a1f0a0a9c42dd93c63bad33f9f1c1937509b8c5c8718ab56a", size = 64947, upload-time = "2026-03-30T16:09:13.83Z" },
]
[[package]]
@ -1502,15 +1555,6 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/e1/e3/c164c88b2e5ce7b24d667b9bd83589cf4f3520d97cad01534cd3c4f55fdb/setuptools-81.0.0-py3-none-any.whl", hash = "sha256:fdd925d5c5d9f62e4b74b30d6dd7828ce236fd6ed998a08d81de62ce5a6310d6", size = 1062021, upload-time = "2026-02-06T21:10:37.175Z" },
]
[[package]]
name = "shellingham"
version = "1.5.4"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/58/15/8b3609fd3830ef7b27b655beb4b4e9c62313a4e8da8c676e142cc210d58e/shellingham-1.5.4.tar.gz", hash = "sha256:8dbca0739d487e5bd35ab3ca4b36e11c4078f3a234bfce294b0a0291363404de", size = 10310, upload-time = "2023-10-24T04:13:40.426Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/e0/f9/0595336914c5619e5f28a1fb793285925a8cd4b432c9da0a987836c7f822/shellingham-1.5.4-py2.py3-none-any.whl", hash = "sha256:7ecfff8f2fd72616f7481040475a65b2bf8af90a56c89140852d1120324e8686", size = 9755, upload-time = "2023-10-24T04:13:38.866Z" },
]
[[package]]
name = "six"
version = "1.17.0"
@ -1703,22 +1747,23 @@ wheels = [
[[package]]
name = "transformers"
version = "5.5.4"
version = "4.57.6"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "filelock" },
{ name = "huggingface-hub" },
{ name = "numpy" },
{ name = "packaging" },
{ name = "pyyaml" },
{ name = "regex" },
{ name = "requests" },
{ name = "safetensors" },
{ name = "tokenizers" },
{ name = "tqdm" },
{ name = "typer" },
]
sdist = { url = "https://files.pythonhosted.org/packages/a5/1e/1e244ab2ab50a863e6b52cc55761910567fa532b69a6740f6e99c5fdbd98/transformers-5.5.4.tar.gz", hash = "sha256:2e67cadba81fc7608cc07c4dd54f524820bc3d95b1cabd0ef3db7733c4f8b82e", size = 8227649, upload-time = "2026-04-13T16:55:55.181Z" }
sdist = { url = "https://files.pythonhosted.org/packages/c4/35/67252acc1b929dc88b6602e8c4a982e64f31e733b804c14bc24b47da35e6/transformers-4.57.6.tar.gz", hash = "sha256:55e44126ece9dc0a291521b7e5492b572e6ef2766338a610b9ab5afbb70689d3", size = 10134912, upload-time = "2026-01-16T10:38:39.284Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/29/fb/162a66789c65e5afa3b051309240c26bf37fbc8fea285b4546ae747995a2/transformers-5.5.4-py3-none-any.whl", hash = "sha256:0bd6281b82966fe5a7a16f553ea517a9db1dee6284d7cb224dfd88fc0dd1c167", size = 10236696, upload-time = "2026-04-13T16:55:51.497Z" },
{ url = "https://files.pythonhosted.org/packages/03/b8/e484ef633af3887baeeb4b6ad12743363af7cce68ae51e938e00aaa0529d/transformers-4.57.6-py3-none-any.whl", hash = "sha256:4c9e9de11333ddfe5114bc872c9f370509198acf0b87a832a0ab9458e2bd0550", size = 11993498, upload-time = "2026-01-16T10:38:31.289Z" },
]
[[package]]
@ -1738,21 +1783,6 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/f6/56/6113c23ff46c00aae423333eb58b3e60bdfe9179d542781955a5e1514cb3/triton-3.6.0-cp314-cp314t-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:46bd1c1af4b6704e554cad2eeb3b0a6513a980d470ccfa63189737340c7746a7", size = 188397994, upload-time = "2026-01-20T16:01:14.236Z" },
]
[[package]]
name = "typer"
version = "0.24.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "annotated-doc" },
{ name = "click" },
{ name = "rich" },
{ name = "shellingham" },
]
sdist = { url = "https://files.pythonhosted.org/packages/f5/24/cb09efec5cc954f7f9b930bf8279447d24618bb6758d4f6adf2574c41780/typer-0.24.1.tar.gz", hash = "sha256:e39b4732d65fbdcde189ae76cf7cd48aeae72919dea1fdfc16593be016256b45", size = 118613, upload-time = "2026-02-21T16:54:40.609Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/4a/91/48db081e7a63bb37284f9fbcefda7c44c277b18b0e13fbc36ea2335b71e6/typer-0.24.1-py3-none-any.whl", hash = "sha256:112c1f0ce578bfb4cab9ffdabc68f031416ebcc216536611ba21f04e9aa84c9e", size = 56085, upload-time = "2026-02-21T16:54:41.616Z" },
]
[[package]]
name = "typing-extensions"
version = "4.15.0"
@ -1774,6 +1804,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/dc/9b/47798a6c91d8bdb567fe2698fe81e0c6b7cb7ef4d13da4114b41d239f65d/typing_inspection-0.4.2-py3-none-any.whl", hash = "sha256:4ed1cacbdc298c220f1bd249ed5287caa16f34d44ef4e9c3d0cbad5b521545e7", size = 14611, upload-time = "2025-10-01T02:14:40.154Z" },
]
[[package]]
name = "urllib3"
version = "2.6.3"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/c7/24/5f1b3bdffd70275f6661c76461e25f024d5a38a46f04aaca912426a2b1d3/urllib3-2.6.3.tar.gz", hash = "sha256:1b62b6884944a57dbe321509ab94fd4d3b307075e0c2eae991ac71ee15ad38ed", size = 435556, upload-time = "2026-01-07T16:24:43.925Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/39/08/aaaad47bc4e9dc8c725e68f9d04865dbcb2052843ff09c97b08904852d84/urllib3-2.6.3-py3-none-any.whl", hash = "sha256:bf272323e553dfb2e87d9bfd225ca7b0f467b919d7bbd355436d3fd37cb0acd4", size = 131584, upload-time = "2026-01-07T16:24:42.685Z" },
]
[[package]]
name = "uvicorn"
version = "0.44.0"