Compare commits

...

48 commits

Author SHA1 Message Date
f6934bdf2a chore(compose): pin project name to infoxtractor
All checks were successful
tests / test (push) Successful in 2m6s
Without `name:`, Compose infers the project from the parent directory
(`app/` on the server), so containers show up under an "app" stack in
the infra monitoring dashboard instead of "infoxtractor".

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 19:57:16 +02:00
ce33aff174 chore: MVP deployed (#42)
All checks were successful
tests / test (push) Successful in 1m14s
2026-04-18 12:08:21 +00:00
842c4da90c chore: MVP deployed — readme, AGENTS.md status, deploy runbook filled in
All checks were successful
tests / test (push) Successful in 1m16s
tests / test (pull_request) Successful in 1m12s
First deploy done 2026-04-18. E2E extraction of the bank_statement_header
use case completes in 35 s against the live service, with 7 of 9 header
fields provenance-verified + text-agreement-green. closing_balance
asserts from spec §12 all pass.

Updates:
- README.md: status -> "MVP deployed"; worked example curl snippet;
  pointers to deployment runbook + spec + plan.
- AGENTS.md: status line updated with the live URL + date.
- pyproject.toml: version comment referencing the first deploy.
- docs/deployment.md: "First deploy" section filled in with times,
  field-level extraction result, plus a log of every small Docker/ops
  follow-up PR that had to land to make the first deploy healthy.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 14:08:07 +02:00
95a576f744 fix(genai): extract trailing JSON (#41)
Some checks are pending
tests / test (push) Waiting to run
2026-04-18 12:05:46 +00:00
81e3b9a7d0 fix(genai): drop Ollama format flag; extract trailing JSON from response
All checks were successful
tests / test (push) Successful in 1m30s
tests / test (pull_request) Successful in 1m21s
qwen3:14b (and deepseek-r1 and other reasoning models) emit a
<think>…</think> chain-of-thought before the real output.
With format=json the constrained sampler terminated immediately at
`{}` because the thinking block wasn't valid JSON; without format the
model thinks normally and appends the actual JSON at the end.

OllamaClient now omits the format flag and extracts the outermost
balanced `{…}` block from the response (brace depth counter, string-
literal aware). Works for reasoning models, ```json``` code-fenced
outputs, and plain JSON alike.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 14:05:28 +02:00
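
A minimal sketch of the string-literal-aware brace-depth scan described above (the helper name and the choice to keep the *last* balanced block are illustrative, not the repo's actual OllamaClient code):

```python
def extract_trailing_json(raw: str) -> str | None:
    """Return the last outermost balanced {...} block in raw, or None.

    Brace-depth counter that tracks string literals, so braces inside JSON
    strings don't end the scan early. Keeping the last block is what makes
    reasoning models work: a stray {} emitted while "thinking" is superseded
    by the real JSON appended at the end.
    """
    last: str | None = None
    depth = 0
    start = -1
    in_string = False
    escaped = False
    for i, ch in enumerate(raw):
        if in_string:
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                in_string = False
            continue
        if ch == '"' and depth > 0:
            in_string = True
        elif ch == "{":
            if depth == 0:
                start = i
            depth += 1
        elif ch == "}" and depth > 0:
            depth -= 1
            if depth == 0:
                last = raw[start : i + 1]
    return last
```
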
763407ba1c fix(genai): schema in prompt (#40)
Some checks failed
tests / test (push) Has been cancelled
2026-04-18 12:02:38 +00:00
34f8268cd5 fix(genai): inject JSON schema into Ollama system prompt
All checks were successful
tests / test (push) Successful in 1m8s
tests / test (pull_request) Successful in 1m18s
format=json loose mode gives valid JSON but no shape — models default
to emitting {} when the system prompt doesn't list fields. Prepend a
schema-guidance system message with the full Pydantic schema (after
the existing null-branch sanitiser) so the model sees exactly what
shape to produce. Pydantic still validates on parse.

Unit tests updated to check the schema message is prepended without
disturbing the caller's own messages.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 14:02:25 +02:00
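
For illustration only (the message wording and helper name are assumptions, not the repo's actual GenAIStep code), prepending a schema-guidance system message from the Pydantic model looks roughly like this:

```python
import json

from pydantic import BaseModel


def with_schema_guidance(messages: list[dict], model_cls: type[BaseModel]) -> list[dict]:
    """Prepend a system message carrying the full JSON schema of model_cls.

    The caller's own messages are left untouched; the guidance message is
    simply inserted in front of them.
    """
    schema = json.dumps(model_cls.model_json_schema(), indent=2)
    guidance = {
        "role": "system",
        "content": (
            "Answer with a single JSON object that conforms exactly to this "
            "JSON Schema. Do not add any other text.\n\n" + schema
        ),
    }
    return [guidance, *messages]
```
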
9c73895318 fix(genai): ollama loose JSON (#39)
Some checks failed
tests / test (push) Has been cancelled
2026-04-18 11:59:18 +00:00
2efc4d1088 fix(genai): send format="json" (loose mode) to Ollama
All checks were successful
tests / test (push) Successful in 1m13s
tests / test (pull_request) Successful in 1m23s
Ollama 0.11.8 segfaults on any Pydantic-shaped structured-output schema
with $ref, anyOf, or pattern — confirmed on the deploy host with the
simplest MVP case (BankStatementHeader alone). The earlier null-stripping
sanitiser wasn't enough.

Switch to format="json", which is "emit valid JSON" mode. We're already
describing the exact JSON shape in the system prompt (via GenAIStep +
the use case's citation instruction appendix) and validating the
response body through Pydantic on parse — which raises IX_002_001 on
schema mismatch, exactly as before.

Stronger guarantees can come back later via a newer Ollama, an API
fix, or a different GenAIClient impl. None of that is needed for the
MVP to work end to end.

Unit tests: the sanitiser is left in place (harmless, still tested). The
"happy path" test now asserts format == "json".

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 13:59:04 +02:00
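
A rough sketch of the request/validation shape described above, assuming an httpx-based client; `IXError`, the option values, and the helper name are illustrative, not the repo's code, while the `/api/chat` payload and response fields follow Ollama's documented API:

```python
import httpx
from pydantic import BaseModel, ValidationError


class IXError(Exception):
    """Stand-in for the service's error type; codes follow the spec's naming."""

    def __init__(self, code: str, detail: str) -> None:
        super().__init__(f"{code}: {detail}")
        self.code = code


async def chat_loose_json(
    client: httpx.AsyncClient,
    base_url: str,
    messages: list[dict],
    response_model: type[BaseModel],
) -> BaseModel:
    payload = {
        "model": "qwen3:14b",
        "messages": messages,  # the expected shape is already spelled out in the system prompt
        "format": "json",      # loose mode: "emit valid JSON", no schema enforcement
        "stream": False,
        "options": {"temperature": 0},
    }
    resp = await client.post(f"{base_url}/api/chat", json=payload, timeout=120.0)
    resp.raise_for_status()
    content = resp.json()["message"]["content"]
    try:
        return response_model.model_validate_json(content)
    except ValidationError as exc:
        # Schema mismatch still surfaces as IX_002_001, exactly as before.
        raise IXError("IX_002_001", str(exc)) from exc
```
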
f6ce97d7fd fix(compose): persist model caches (#38)
All checks were successful
tests / test (push) Successful in 1m8s
2026-04-18 11:49:24 +00:00
9e33923f71 fix(compose): persist Surya + HF caches so rebuilds don't redownload models
All checks were successful
tests / test (push) Successful in 2m1s
tests / test (pull_request) Successful in 1m18s
First /healthz call on a fresh container triggers Surya to fetch the
text-recognition (1.34 GB) and detection (73 MB) models from HuggingFace.
Without a volume they land in the container fs and vanish on every
rebuild, which is every deploy.

Mount named volumes for /root/.cache/datalab (Surya) and
/root/.cache/huggingface. Rebuild now keeps /healthz warm.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 13:49:09 +02:00
65670af78f fix(genai): sanitise Optional for Ollama (#37)
Some checks are pending
tests / test (push) Waiting to run
2026-04-18 11:48:43 +00:00
9cb62d69af fix(genai): strip null branches from anyOf before sending to Ollama
All checks were successful
tests / test (push) Successful in 1m33s
tests / test (pull_request) Successful in 4m29s
Ollama 0.11.8's llama.cpp structured-output implementation segfaults on
Pydantic v2's standard Optional pattern:

    {"anyOf": [{"type": "string"}, {"type": "null"}]}

Confirmed on the deploy host: /api/chat request with the MVP's
ProvenanceWrappedResponse schema crashed Ollama with SIGSEGV; the client
saw httpx RemoteProtocolError → IX_002_000.

New _sanitise_schema_for_ollama walks the schema recursively and drops
"type: null" branches from every anyOf. Single-branch unions are
inlined so sibling keys (default, title) survive. This only narrows
what the LLM is *told* it may emit; Pydantic still validates the real
response body against the original schema and accepts None for
Optional fields if they were absent or explicitly null.

Existing unit tests updated: the "happy path" test no longer pins the
format to `_Schema.model_json_schema()` verbatim — instead it asserts
the sanitisation effect on a known-Optional field.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 13:48:26 +02:00
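
A compact sketch of the recursive walk the commit above describes; the function name mirrors the commit, but the body here is illustrative, not the actual implementation:

```python
from typing import Any


def _sanitise_schema_for_ollama(node: Any) -> Any:
    """Drop {"type": "null"} branches from every anyOf, recursively.

    Single-branch unions are inlined so sibling keys (default, title, ...)
    survive next to the remaining branch. This only narrows what the LLM is
    told it may emit; validation still runs against the original schema.
    """
    if isinstance(node, list):
        return [_sanitise_schema_for_ollama(item) for item in node]
    if not isinstance(node, dict):
        return node
    node = {key: _sanitise_schema_for_ollama(value) for key, value in node.items()}
    branches = node.get("anyOf")
    if isinstance(branches, list):
        kept = [b for b in branches if not (isinstance(b, dict) and b.get("type") == "null")]
        if len(kept) == 1 and isinstance(kept[0], dict):
            merged = {k: v for k, v in node.items() if k != "anyOf"}
            merged.update(kept[0])
            return merged
        node["anyOf"] = kept
    return node
```
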
4c0746950e fix(deps): surya ^0.17 (#36)
All checks were successful
tests / test (push) Successful in 2m58s
2026-04-18 11:21:54 +00:00
a418969251 fix(deps): pin surya-ocr ^0.17 and drop cu124 index
All checks were successful
tests / test (push) Successful in 1m23s
tests / test (pull_request) Successful in 2m23s
Our client code imports surya.foundation (added in 0.17). The earlier
cu124 torch pin forced uv to downgrade surya to 0.14.1, which doesn't
have that module and depends on a transformers version that lacks
QuantizedCacheConfig. Net result: `ocr: fail` at /healthz.

Drop the cu124 index pin. surya 0.17.1 needs torch >= 2.7, which the
default pypi torch (2.11) satisfies. The deploy host's CUDA 12.4
driver doesn't match torch 2.11's cu13 wheels, so CUDA init warns and
the GPU isn't available — torch + Surya transparently fall back to CPU.
Slower than GPU but correct for MVP. A host driver upgrade later will
unlock GPU with no code changes.

Unit suite stays green.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 13:21:40 +02:00
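
A quick way to confirm the fallback described above from inside the container (assuming the `[ocr]` extra, and hence torch, is installed):

```python
import torch

# Against the host's CUDA 12.4 driver the default cu13 wheels warn that the
# driver is too old and report no usable GPU; Surya then runs on CPU.
print(torch.__version__, torch.cuda.is_available())
```
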
fae8c3267f fix(deps): torch cu124 (#35)
All checks were successful
tests / test (push) Successful in 3m48s
2026-04-18 11:02:38 +00:00
d90117807b fix(deps): pin torch to the CUDA 12.4 wheel channel
All checks were successful
tests / test (push) Successful in 3m21s
tests / test (pull_request) Successful in 3m40s
The default pypi torch (2.11 as of lockfile) ships cu13 wheels, which
refuse to initialise against the deploy host's NVIDIA 12.4 driver
(UserWarning: "driver on your system is too old (found version 12040)").
/healthz reported ocr: fail because Surya couldn't pick up the GPU.

Use `tool.uv.sources` to route torch through PyTorch's cu124 index.
That pulls torch 2.6.0+cu124 (still satisfies surya-ocr >= 0.9). Lock
updated. transformers downgraded to 4.57.6, triton to 3.2.0 — all
compatible with surya and each other.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 13:02:26 +02:00
44c3428993 fix(deploy): network_mode: host (#34)
Some checks failed
tests / test (push) Has been cancelled
2026-04-18 11:00:14 +00:00
c7dc40c51e fix(deploy): switch to network_mode: host — reach postgis + ollama on loopback
All checks were successful
tests / test (push) Successful in 1m12s
tests / test (pull_request) Successful in 1m10s
The shared postgis container is bound to 127.0.0.1 on the host (security
hardening, infrastructure §T12). Ollama is similarly LAN-hardened. The
previous `host.docker.internal + extra_hosts: host-gateway` approach
points at the bridge gateway IP, not loopback, so the container couldn't
reach either service.

Switch to `network_mode: host` (same pattern goldstein uses) and update
the default IX_POSTGRES_URL / IX_OLLAMA_URL to 127.0.0.1. Keep the GPU
reservation block; drop the now-meaningless ports: declaration (host mode
publishes directly).

AppConfig defaults + .env.example + test_config assertions + inline
docstring examples all follow.

Caught on fourth deploy attempt.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 13:00:02 +02:00
39a6c10634 fix(compose): drop runtime: nvidia (#33)
All checks were successful
tests / test (push) Successful in 1m19s
2026-04-18 10:56:15 +00:00
9f793da778 fix(compose): drop runtime: nvidia — use deploy.resources.devices only
All checks were successful
tests / test (push) Successful in 1m10s
tests / test (pull_request) Successful in 1m10s
Docker on the deploy host doesn't register 'nvidia' as a named runtime
(modern nvidia-container-toolkit hooks via --gpus all / resources.devices
instead). Immich-ml on the same host uses only deploy.resources.devices
with driver: nvidia, which is enough. Drop the legacy runtime line.

Caught on third deploy attempt.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 12:56:03 +02:00
4802e086a0 fix(docker): README.md for hatchling (#32)
All checks were successful
tests / test (push) Successful in 2m27s
2026-04-18 10:42:41 +00:00
f54f0d317d fix(docker): include README.md in the uv sync COPY so hatchling finds it
All checks were successful
tests / test (push) Successful in 1m22s
tests / test (pull_request) Successful in 1m36s
pyproject.toml names README.md as the readme; hatchling validates that
file exists when uv sync resolves the editable install of the project
itself. Caught on second deploy build.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 12:42:29 +02:00
e6fcd5fc54 fix(docker): uv via standalone installer (#31)
All checks were successful
tests / test (push) Successful in 1m14s
2026-04-18 10:33:11 +00:00
1c31444611 fix(docker): install uv via standalone installer (no system pip)
All checks were successful
tests / test (pull_request) Successful in 1m28s
tests / test (push) Successful in 1m19s
Python 3.12 from deadsnakes on Ubuntu 22.04 drops `distutils` from the
stdlib, and Ubuntu's system pip still imports from it — so `pip install`
fails immediately with ModuleNotFoundError: distutils. Switch to the uv
standalone installer, which doesn't need pip at all.

Caught during the first deploy build.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 12:32:55 +02:00
a9e510362d chore(model): qwen3:14b default (#30)
All checks were successful
tests / test (push) Successful in 2m55s
unblock first deploy
2026-04-18 10:20:38 +00:00
5ee74f367c chore(model): switch default IX_DEFAULT_MODEL to qwen3:14b (already on host)
All checks were successful
tests / test (push) Successful in 1m52s
tests / test (pull_request) Successful in 1m45s
The home server's Ollama doesn't have gpt-oss:20b pulled; qwen3:14b is
already there and is what mammon's chat agent uses. Switching the default
now so the first deploy passes the /healthz ollama probe without an extra
`ollama pull` step. The spec lists gpt-oss:20b as a concrete example;
qwen3:14b is equally on-prem and Ollama-structured-output-compatible.

Touched: AppConfig default, BankStatementHeader Request.default_model,
.env.example, setup_server.sh ollama-list check, AGENTS.md, deployment.md,
live tests. Unit tests that hard-coded the old model string but don't
assert the default were left alone.

Also: replaced an en-dash with an ASCII hyphen in e2e_smoke.py's
Paperless-style text (ruff RUF001).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 12:20:23 +02:00
f6cc99f062 feat(e2e): e2e_smoke.py deploy gate (#29)
Some checks are pending
tests / test (push) Waiting to run
Lands Task 5.4.
2026-04-18 10:18:23 +00:00
d0648fe01d feat(e2e): scripts/e2e_smoke.py — live deploy gate
All checks were successful
tests / test (push) Successful in 1m11s
tests / test (pull_request) Successful in 2m14s
Runs from the Mac after every `git push server main`.

Flow: starts a tiny HTTP server on the Mac's LAN IP serving
tests/fixtures/synthetic_giro.pdf → POST /jobs with bank_statement_header
+ Paperless-style texts so text_agreement has something to check against →
poll GET /jobs/{id} until terminal → assert status=done, bank_name
non-empty, closing_balance.provenance_verified=True, text_agreement=True,
elapsed < 60 s. Non-zero exit blocks the deploy.

Uses only stdlib (http.server, urllib) — no extra deps on the Mac side,
no test framework overhead.

Task 5.4 of MVP plan.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 12:18:07 +02:00
5841bc09c0 feat(deploy): setup_server.sh + deployment runbook (#28)
Some checks are pending
tests / test (push) Waiting to run
Lands Task 5.2.
2026-04-18 10:17:14 +00:00
6d1bc720b4 feat(deploy): setup_server.sh + deployment runbook
All checks were successful
tests / test (push) Successful in 1m9s
tests / test (pull_request) Successful in 1m10s
- scripts/setup_server.sh: idempotent one-shot. Creates bare repo,
  post-receive hook (which rebuilds docker compose + gates on /healthz),
  infoxtractor Postgres role + DB on the shared postgis container, .env
  (0600) from .env.example with the password substituted in, verifies
  gpt-oss:20b is pulled.
- docs/deployment.md: topology, one-time setup command, normal deploy
  workflow, rollback-via-revert pattern (never force-push main),
  operational checklists for the common /healthz degraded states.
- First deploy section reserved; filled in after Task 5.3 runs.

Task 5.2 of MVP plan.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 12:16:58 +02:00
3c7d607776 feat(docker): Dockerfile + compose (#27)
Some checks are pending
tests / test (push) Waiting to run
Lands Task 5.1.
2026-04-18 10:15:45 +00:00
4646180942 feat(docker): Dockerfile (CUDA+python3.12) + compose with GPU reservation
All checks were successful
tests / test (push) Successful in 1m13s
tests / test (pull_request) Successful in 1m10s
- nvidia/cuda:12.4 runtime base matches the deploy host's driver stack
  (immich-ml / monitoring use the same pattern).
- python3.12 via deadsnakes (Ubuntu 22.04 ships 3.10 only).
- System deps: libmagic1 (python-magic), libgl1/libglib2 (PIL + PyMuPDF
  headless), curl (post-receive /healthz probe), ca-certs (httpx TLS).
- uv sync --frozen --no-dev --extra ocr installs prod + Surya/torch;
  dev tooling stays out of the image.
- CMD runs `alembic upgrade head && uvicorn ix.app:create_app` — idempotent.
- Compose: single service, port 8994, GPU reservation mirroring immich-ml,
  labels for monitoring dashboard auto-discovery + backup opt-in.
- host.docker.internal:host-gateway lets ix reach the host's Ollama and
  postgis containers (same pattern mammon uses).

Task 5.1 of MVP plan.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 12:15:26 +02:00
c234b67bbf Merge pull request 'feat(app): production wiring factories + /healthz real probes (Task 4.3)' (#26) from feat/production-wiring into main
All checks were successful
tests / test (push) Successful in 1m10s
2026-04-18 10:09:34 +00:00
ebefee4184 feat(app): production wiring — factories, pipeline, /healthz real probes
All checks were successful
tests / test (push) Successful in 1m9s
tests / test (pull_request) Successful in 1m13s
Task 4.3 closes the loop on Chunk 4: the FastAPI lifespan now selects
fake vs real clients via IX_TEST_MODE (new AppConfig field), wires
/healthz probes to the live selfcheck() on OllamaClient / SuryaOCRClient,
and spawns the worker with a production Pipeline factory that builds
SetupStep -> OCRStep -> GenAIStep -> ReliabilityStep -> ResponseHandler
over the injected clients.

Factories:
- make_genai_client(cfg) -> FakeGenAIClient | OllamaClient
- make_ocr_client(cfg)   -> FakeOCRClient  | SuryaOCRClient (spec §6.2)

Probes run the async selfcheck on a fresh event loop in a short-lived
thread so they're safe to call from either sync callers or a live
FastAPI handler without stalling the request loop.

Drops the worker-loop spawn_worker_task stub — the app module owns the
production spawn directly.

Tests: +11 unit tests (5 factories + 6 app-wiring / probe adapter /
pipeline build). Full suite: 236 passed.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 12:09:11 +02:00
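
A sketch of the probe adapter described above (names, timeout, and the ok/fail strings are illustrative, not the actual app wiring): the async selfcheck runs on a private event loop inside a throwaway thread, so it stays clear of whatever loop the caller is on.

```python
import asyncio
import threading
from collections.abc import Awaitable, Callable


def probe_adapter(selfcheck: Callable[[], Awaitable[None]], timeout: float = 10.0) -> str:
    """Run an async selfcheck on a fresh event loop in a short-lived thread."""
    outcome = {"state": "fail"}

    def _run() -> None:
        try:
            # asyncio.run() builds (and tears down) a loop private to this
            # thread, so the coroutine never collides with a loop already
            # running in the calling thread.
            asyncio.run(asyncio.wait_for(selfcheck(), timeout))
            outcome["state"] = "ok"
        except Exception:
            outcome["state"] = "fail"

    worker = threading.Thread(target=_run, daemon=True)
    worker.start()
    worker.join(timeout + 1)
    return outcome["state"]
```
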
b737ed7b21 Merge pull request 'feat(ocr): SuryaOCRClient real OCR backend (spec 6.2)' (#25) from feat/surya-client into main
All checks were successful
tests / test (push) Successful in 1m10s
2026-04-18 10:04:41 +00:00
322f6b2b1b feat(ocr): SuryaOCRClient — real OCR backend (spec §6.2)
All checks were successful
tests / test (push) Successful in 1m14s
tests / test (pull_request) Successful in 1m14s
Runs Surya's detection + recognition over PIL images rendered from each
Page's source file (PDFs via PyMuPDF, images via Pillow). Lazy warm_up
so FastAPI lifespan start stays predictable. Deferred Surya/torch
imports keep the base install slim — the heavy deps stay under [ocr].

Extends OCRClient Protocol with optional files + page_metadata kwargs
so the engine can resolve each page back to its on-disk source; the Fake
accepts and ignores them so hermetic tests stay unchanged.

selfcheck() runs the predictors on a 1x1 PIL image — wired into /healthz
by Task 4.3.

Tests: 6 hermetic unit tests (Surya predictors mocked, no model
download); 2 live tests gated on IX_TEST_OLLAMA=1 (never run in CI).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 12:04:19 +02:00
0f045f814a Merge pull request 'feat(genai): OllamaClient structured-output /api/chat backend (spec 6)' (#24) from feat/ollama-client into main
All checks were successful
tests / test (push) Successful in 1m15s
2026-04-18 09:58:38 +00:00
90e46b707d feat(genai): OllamaClient — structured-output /api/chat backend (spec §6)
All checks were successful
tests / test (push) Successful in 1m10s
tests / test (pull_request) Successful in 1m5s
Real GenAIClient for the production pipeline. Sends `format=<pydantic JSON
schema>`, `stream=false`, and mapped options (`temperature`; drops
`reasoning_effort`). Content-parts lists joined to a single string since
MVP models don't speak native content-parts. Error mapping per spec:
connection/timeout/5xx → IX_002_000, schema violations → IX_002_001.
`selfcheck()` probes /api/tags with a fixed 5 s timeout for /healthz.

Tests: 10 hermetic pytest-httpx unit tests; 2 live tests gated on
IX_TEST_OLLAMA=1 (never run in CI).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 11:58:15 +02:00
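
The `selfcheck()` probe described above is small enough to sketch; only the `/api/tags` endpoint and the fixed 5 s timeout come from the commit, the return strings and method shape are assumptions:

```python
import httpx


async def ollama_selfcheck(base_url: str) -> str:
    """Probe Ollama's GET /api/tags with a fixed 5 s timeout, for /healthz."""
    try:
        async with httpx.AsyncClient(timeout=5.0) as client:
            resp = await client.get(f"{base_url}/api/tags")
            resp.raise_for_status()
        return "ok"
    except httpx.HTTPError:
        return "fail"
```
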
6183b9c886 Merge pull request 'feat(pg-queue): LISTEN ix_jobs_new + 10s fallback poll' (#23) from feat/pg-queue-adapter into main
All checks were successful
tests / test (push) Successful in 1m11s
2026-04-18 09:52:45 +00:00
050f80dcd7 feat(pg-queue): LISTEN ix_jobs_new + 10s fallback poll (spec §4)
All checks were successful
tests / test (push) Successful in 1m8s
tests / test (pull_request) Successful in 1m9s
PgQueueListener:
- Dedicated asyncpg connection outside the SQLAlchemy pool (LISTEN
  needs a persistent connection; pooled connections check in/out).
- Exposes wait_for_work(timeout) — resolves on NOTIFY or timeout,
  whichever fires first. The worker treats both wakes identically.
- asyncpg_dsn_from_sqlalchemy_url strips the +asyncpg driver segment
  and percent-decodes the password so the same URL in IX_POSTGRES_URL
  works for both SQLAlchemy and raw asyncpg.

app.py lifespan now also spawns the listener alongside the worker;
both are gated on spawn_worker=True so REST-only tests stay fast.

2 new integration tests: NOTIFY path (wake within 2 s despite 60 s
poll) + missed-NOTIFY path (fallback poll recovers within 5 s). 33
integration tests total, 209 unit. Forgejo Actions trigger is flaky;
local verification is the gate.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 11:52:26 +02:00
415e03fba1 Merge pull request 'feat(worker): async worker loop + one-shot callback delivery' (#22) from feat/worker-loop into main
Some checks are pending
tests / test (push) Waiting to run
2026-04-18 09:50:11 +00:00
406a7ea2fd feat(worker): async worker loop + one-shot callback delivery (spec §5)
All checks were successful
tests / test (push) Successful in 1m15s
tests / test (pull_request) Successful in 1m8s
Worker:
- Startup: sweep_orphans(now, max_running_seconds) rescues rows stuck
  in 'running' from a crashed prior process.
- Loop: claim_next_pending → build pipeline via injected factory → run
  → mark_done/mark_error → deliver callback if set → record outcome.
- Non-IX exceptions from the pipeline collapse to IX_002_000 so callers
  see a stable error code.
- Sleep loop uses a cancellable wait so the stop event reacts
  immediately; the wait_for_work hook is ready for Task 3.6 to plug in
  the LISTEN-driven event without the worker knowing about NOTIFY.

Callback:
- One-shot POST, 2xx → delivered, anything else (incl. connect/timeout
  exceptions) → failed. No retries.
- Callback record never reverts the job's terminal state — GET /jobs/{id}
  stays the authoritative fallback.

7 integration tests: happy path, pipeline-raise → error, callback 2xx,
callback 5xx, orphan sweep on startup, no-callback rows stay
callback_status=None (x2 via parametrize). Unit suite still 209.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 11:49:54 +02:00
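
Reduced to a sketch, the loop described above looks roughly like this; every name (the repo methods, the pipeline factory, the callback helper, the sweep window) is illustrative shorthand for the commit's description, not the actual worker module:

```python
from datetime import datetime, timezone


async def worker_loop(repo, build_pipeline, deliver_callback, wait_for_work, stop) -> None:
    # Startup: rescue rows left in 'running' by a crashed prior process.
    await repo.sweep_orphans(datetime.now(timezone.utc), max_running_seconds=900)

    while not stop.is_set():
        job = await repo.claim_next_pending()
        if job is None:
            # Cancellable wait: resolves on NOTIFY, fallback-poll timeout,
            # or the stop event, whichever fires first.
            await wait_for_work(timeout=10)
            continue
        try:
            response = await build_pipeline(job).run(job)
            await repo.mark_done(job.id, response)
        except Exception as exc:
            # Non-IX exceptions collapse to the stable IX_002_000 code.
            await repo.mark_error(job.id, code="IX_002_000", detail=str(exc))
        if job.callback_url:
            # One-shot POST: the outcome is recorded but never retried, and
            # it never reverts the job's terminal state.
            await deliver_callback(job)
```
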
ee023d6e34 Merge pull request 'feat(rest): FastAPI adapter + /jobs /healthz /metrics (spec 5)' (#21) from feat/rest-adapter into main
Some checks failed
tests / test (push) Has been cancelled
2026-04-18 09:47:35 +00:00
e46c44f1e0 feat(rest): FastAPI adapter + /jobs, /healthz, /metrics routes (spec §5)
All checks were successful
tests / test (push) Successful in 1m7s
tests / test (pull_request) Successful in 1m5s
Routes:
- POST /jobs: 201 on first insert, 200 on idempotent re-submit.
- GET /jobs/{id}: full Job envelope or 404.
- GET /jobs?client_id=&request_id=: correlation lookup or 404.
- GET /healthz: {postgres, ollama, ocr}; 200 iff all ok (degraded counts
  as non-200 per spec). Postgres probe guarded by a 2 s wait_for.
- GET /metrics: pending/running counts + 24h done/error counters +
  per-use-case avg seconds. Plain JSON, no Prometheus.

create_app(spawn_worker=bool) parameterises worker spawning so tests that
only need REST pass False. Worker spawn is tolerant of the loop module not
being importable yet (Task 3.5 fills it in).

Probes are a DI bundle — production wiring swaps them in at startup
(Chunk 4); tests inject canned ok/fail callables. Session factory is also
DI'd so tests can point at a per-loop engine and sidestep the asyncpg
cross-loop future issue that bit the jobs_repo fixture.

9 new integration tests; unit suite unchanged. Forgejo Actions trigger is
flaky; local verification is the gate (unit + integration green locally).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 11:47:04 +02:00
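
The 2 s postgres guard mentioned above, sketched with SQLAlchemy's async session (the probe name and the ok/fail strings are assumptions):

```python
import asyncio

from sqlalchemy import text


async def postgres_probe(session_factory) -> str:
    """SELECT 1 wrapped in a 2 s wait_for so a hung pool can't wedge /healthz."""

    async def _ping() -> None:
        async with session_factory() as session:
            await session.execute(text("SELECT 1"))

    try:
        await asyncio.wait_for(_ping(), timeout=2.0)
        return "ok"
    except Exception:
        return "fail"
```
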
04a415a191 Merge pull request 'feat(store): JobsRepo CRUD over ix_jobs + integration fixtures' (#20) from feat/jobs-repo into main
All checks were successful
tests / test (push) Successful in 1m14s
2026-04-18 09:43:28 +00:00
141153ffa7 feat(store): JobsRepo CRUD over ix_jobs + integration fixtures (spec §4)
All checks were successful
tests / test (push) Successful in 1m10s
tests / test (pull_request) Successful in 1m10s
JobsRepo covers the full job-lifecycle surface:

- insert_pending: idempotent on (client_id, request_id) via ON CONFLICT
  DO NOTHING + re-select; assigns a 16-hex ix_id.
- claim_next_pending: FOR UPDATE SKIP LOCKED so concurrent workers never
  double-dispatch a row.
- get / get_by_correlation: hydrates JSONB back through Pydantic.
- mark_done: done iff response.error is None, else error.
- mark_error: explicit convenience wrapper.
- update_callback_status: delivered | failed (no status transition).
- sweep_orphans: time-based rescue of stuck running rows; attempts++.

Integration fixtures (tests/integration/conftest.py):
- Skip cleanly when neither IX_TEST_DATABASE_URL nor IX_POSTGRES_URL is
  set (unit suite stays runnable on a bare laptop).
- Alembic upgrade/downgrade runs in a subprocess so its internal
  asyncio.run() doesn't collide with pytest-asyncio's loop.
- Per-test engine + truncate so loops never cross and tests start clean.

15 integration tests against a live postgres:16, including SKIP LOCKED
concurrency + orphan sweep.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 11:43:11 +02:00
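
The claim described above follows the standard Postgres queue idiom; the column names below are guesses from the commit text, not the actual `ix_jobs` DDL:

```python
from sqlalchemy import text

# Atomically claim one pending row. SKIP LOCKED makes concurrent workers
# skip rows another worker has already locked, so a job is never
# double-dispatched; the claimed row comes back in the same statement.
CLAIM_NEXT_PENDING = text(
    """
    UPDATE ix_jobs
       SET status = 'running',
           attempts = attempts + 1
     WHERE job_id = (
               SELECT job_id
                 FROM ix_jobs
                WHERE status = 'pending'
                ORDER BY created_at
                LIMIT 1
                  FOR UPDATE SKIP LOCKED
           )
 RETURNING *
    """
)
```
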
8bb220ae43 Merge pull request 'feat(config): AppConfig + cached get_config()' (#19) from feat/config into main
All checks were successful
tests / test (push) Successful in 59s
2026-04-18 09:39:00 +00:00
46 changed files with 4506 additions and 97 deletions


@ -4,11 +4,11 @@
# the Postgres password.
# --- Job store -----------------------------------------------------------
IX_POSTGRES_URL=postgresql+asyncpg://infoxtractor:<password>@host.docker.internal:5431/infoxtractor
IX_POSTGRES_URL=postgresql+asyncpg://infoxtractor:<password>@127.0.0.1:5431/infoxtractor
# --- LLM backend ---------------------------------------------------------
IX_OLLAMA_URL=http://host.docker.internal:11434
IX_DEFAULT_MODEL=gpt-oss:20b
IX_OLLAMA_URL=http://127.0.0.1:11434
IX_DEFAULT_MODEL=qwen3:14b
# --- OCR -----------------------------------------------------------------
IX_OCR_ENGINE=surya

1
.gitignore vendored

@ -15,6 +15,7 @@ dist/
build/
*.log
/tmp/
.claude/
# uv
# uv.lock is committed intentionally for reproducible builds.


@ -4,7 +4,7 @@ Async, on-prem, LLM-powered structured information extraction microservice. Give
Designed to be used by other on-prem services (e.g. mammon) as a reliable fallback / second opinion for format-specific deterministic parsers.
Status: design phase. Full reference spec at `docs/spec-core-pipeline.md`. MVP spec will live at `docs/superpowers/specs/`.
Status: MVP deployed (2026-04-18) at `http://192.168.68.42:8994` — LAN only. Full reference spec at `docs/spec-core-pipeline.md`; MVP spec at `docs/superpowers/specs/2026-04-18-ix-mvp-design.md`; deploy runbook at `docs/deployment.md`.
## Guiding Principles
@ -25,7 +25,7 @@ Status: design phase. Full reference spec at `docs/spec-core-pipeline.md`. MVP s
- **Language**: Python 3.12, asyncio
- **Web/REST**: FastAPI + uvicorn
- **OCR (pluggable)**: Surya OCR first (GPU, shares RTX 3090 with Ollama / Immich ML)
- **LLM**: Ollama at `192.168.68.42:11434`, structured outputs via JSON schema. Initial model candidate: `qwen2.5:32b` / `gpt-oss:20b`, configurable per use case
- **LLM**: Ollama at `192.168.68.42:11434`, structured outputs via JSON schema. Initial model candidate: `qwen2.5:32b` / `qwen3:14b`, configurable per use case
- **State**: Postgres on the shared `postgis` container (:5431), new `infoxtractor` database
- **Deployment**: Docker, `git push server main` → post-receive rebuild (pattern from other apps)

69
Dockerfile Normal file

@ -0,0 +1,69 @@
# InfoXtractor container image.
#
# Base image ships CUDA 12.4 runtime libraries so the Surya OCR client can
# use the RTX 3090 on the deploy host. Ubuntu 22.04 is the LTS used across
# the home-server stack (immich-ml, monitoring) so GPU drivers line up.
FROM nvidia/cuda:12.4.0-runtime-ubuntu22.04
ENV DEBIAN_FRONTEND=noninteractive \
PYTHONUNBUFFERED=1 \
PYTHONDONTWRITEBYTECODE=1 \
PIP_NO_CACHE_DIR=1 \
PIP_DISABLE_PIP_VERSION_CHECK=1
# --- System deps --------------------------------------------------------
# - python3.12 via deadsnakes PPA (pinned; Ubuntu 22.04 ships 3.10 only)
# - libmagic1 : python-magic backend for MIME sniffing
# - libgl1 : libGL.so needed by Pillow/OpenCV wheels used by Surya
# - libglib2.0 : shared by Pillow/PyMuPDF headless rendering
# - curl : post-receive hook's /healthz probe & general ops
# - ca-certs : httpx TLS verification
RUN apt-get update \
&& apt-get install -y --no-install-recommends \
software-properties-common \
ca-certificates \
curl \
gnupg \
&& add-apt-repository -y ppa:deadsnakes/ppa \
&& apt-get update \
&& apt-get install -y --no-install-recommends \
python3.12 \
python3.12-venv \
python3.12-dev \
libmagic1 \
libgl1 \
libglib2.0-0 \
&& ln -sf /usr/bin/python3.12 /usr/local/bin/python \
&& ln -sf /usr/bin/python3.12 /usr/local/bin/python3 \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*
# --- uv (dependency resolver used by the project) -----------------------
# Install via the standalone installer; avoids needing a working system pip
# (python3.12 on Ubuntu 22.04 has no `distutils`, which breaks Ubuntu pip).
RUN curl -LsSf https://astral.sh/uv/install.sh | sh \
&& ln -sf /root/.local/bin/uv /usr/local/bin/uv
WORKDIR /app
# Copy dependency manifests + README early so the heavy `uv sync` layer
# caches whenever only application code changes. README.md is required
# because pyproject.toml names it as the package's readme — hatchling
# validates it exists when resolving the editable install.
COPY pyproject.toml uv.lock .python-version README.md ./
# Prod + OCR extras, no dev tooling. --frozen means "must match uv.lock";
# CI catches drift before it reaches the image.
RUN uv sync --frozen --no-dev --extra ocr
# --- Application code ---------------------------------------------------
COPY src src
COPY alembic alembic
COPY alembic.ini ./
EXPOSE 8994
# Migrations are idempotent (alembic upgrade head is a no-op on a current
# DB) so running them on every start keeps the image + DB aligned without
# an extra orchestration step.
CMD ["sh", "-c", "uv run alembic upgrade head && uv run uvicorn ix.app:create_app --factory --host 0.0.0.0 --port 8994"]


@ -4,10 +4,12 @@ Async, on-prem, LLM-powered structured information extraction microservice.
Given a document (PDF, image, text) and a named *use case*, ix returns a structured JSON result whose shape matches the use-case schema — together with per-field provenance (OCR segment IDs, bounding boxes, cross-OCR agreement flags) that let the caller decide how much to trust each extracted value.
**Status:** design phase. Implementation about to start.
**Status:** MVP deployed. Live on the home LAN at `http://192.168.68.42:8994`.
- Full reference spec: [`docs/spec-core-pipeline.md`](docs/spec-core-pipeline.md) (aspirational; MVP is a strict subset)
- **MVP design:** [`docs/superpowers/specs/2026-04-18-ix-mvp-design.md`](docs/superpowers/specs/2026-04-18-ix-mvp-design.md)
- **Implementation plan:** [`docs/superpowers/plans/2026-04-18-ix-mvp-implementation.md`](docs/superpowers/plans/2026-04-18-ix-mvp-implementation.md)
- **Deployment runbook:** [`docs/deployment.md`](docs/deployment.md)
- Agent / development notes: [`AGENTS.md`](AGENTS.md)
## Principles
@ -15,3 +17,44 @@ Given a document (PDF, image, text) and a named *use case*, ix returns a structu
- **On-prem always.** LLM = Ollama, OCR = local engines (Surya first). No OpenAI / Anthropic / Azure / AWS / cloud.
- **Grounded extraction, not DB truth.** ix returns best-effort fields + provenance; the caller decides what to trust.
- **Transport-agnostic pipeline core.** REST + Postgres-queue adapters in parallel on one job store.
## Submitting a job
```bash
curl -X POST http://192.168.68.42:8994/jobs \
-H "Content-Type: application/json" \
-d '{
"use_case": "bank_statement_header",
"ix_client_id": "mammon",
"request_id": "some-correlation-id",
"context": {
"files": [{
"url": "http://paperless.local/api/documents/42/download/",
"headers": {"Authorization": "Token …"}
}],
"texts": ["<Paperless Tesseract OCR content>"]
}
}'
# → {"job_id":"…","ix_id":"…","status":"pending"}
```
Poll `GET /jobs/{job_id}` until `status` is `done` or `error`. Optionally pass `callback_url` to receive a webhook on completion (one-shot, no retry; polling stays authoritative).
Full REST surface + provenance response shape documented in the MVP design spec.
## Running locally
```bash
uv sync --extra dev
uv run pytest tests/unit -v # hermetic unit + integration suite
IX_TEST_OLLAMA=1 uv run pytest tests/live -v # needs LAN access to Ollama + GPU
```
## Deploying
```bash
git push server main # rebuilds Docker image, restarts container, /healthz deploy gate
python scripts/e2e_smoke.py # E2E acceptance against the live service
```
See [`docs/deployment.md`](docs/deployment.md) for full runbook + rollback.

42
docker-compose.yml Normal file

@ -0,0 +1,42 @@
# InfoXtractor Docker Compose stack.
#
# Single service. Uses host networking so the container can reach:
# - Ollama at 127.0.0.1:11434
# - postgis at 127.0.0.1:5431 (bound to loopback only; security hardening)
# Both services are LAN-hardened on the host and never exposed publicly,
# so host-network access stays on-prem. This matches the `goldstein`
# container pattern on the same server.
#
# The GPU reservation block matches immich-ml / the shape Docker Compose
# expects for GPU allocation on this host.
name: infoxtractor
services:
infoxtractor:
build: .
container_name: infoxtractor
network_mode: host
restart: always
env_file: .env
deploy:
resources:
reservations:
devices:
- driver: nvidia
count: 1
capabilities: [gpu]
volumes:
# Persist Surya (datalab) + HuggingFace model caches so rebuilds don't
# re-download ~1.5 GB of weights every time.
- ix_surya_cache:/root/.cache/datalab
- ix_hf_cache:/root/.cache/huggingface
labels:
infrastructure.web_url: "http://192.168.68.42:8994"
backup.enable: "true"
backup.type: "postgres"
backup.name: "infoxtractor"
volumes:
ix_surya_cache:
ix_hf_cache:

153
docs/deployment.md Normal file

@ -0,0 +1,153 @@
# Deployment
On-prem deploy to `192.168.68.42`. Push-to-deploy via a bare git repo + `post-receive` hook that rebuilds the Docker Compose stack. Pattern mirrors mammon and unified_messaging.
## Topology
```
Mac (dev)
│ git push server main
192.168.68.42:/home/server/Public/infoxtractor/repos.git (bare)
│ post-receive → GIT_WORK_TREE=/…/app git checkout -f main
│ docker compose up -d --build
│ curl /healthz (60 s gate)
Docker container `infoxtractor` (port 8994)
├─ 127.0.0.1:11434 → Ollama (qwen3:14b; host-network mode)
└─ 127.0.0.1:5431 → postgis (database `infoxtractor`; host-network mode)
```
## One-time server setup
Run **once** from the Mac. Idempotent.
```bash
export IX_POSTGRES_PASSWORD=<generate-a-strong-one>
./scripts/setup_server.sh
```
The script:
1. Creates `/home/server/Public/infoxtractor/repos.git` (bare) + `/home/server/Public/infoxtractor/app/` (worktree).
2. Installs the `post-receive` hook (see `scripts/setup_server.sh` for the template).
3. Creates the `infoxtractor` Postgres role + database on the shared `postgis` container.
4. Writes `/home/server/Public/infoxtractor/app/.env` (mode 0600) from `.env.example` with the password substituted in.
5. Verifies `qwen3:14b` is pulled in Ollama.
6. Prints a hint to open UFW for port 8994 on the LAN subnet if it's missing.
After the script finishes, add the deploy remote to the local repo:
```bash
git remote add server ssh://server@192.168.68.42/home/server/Public/infoxtractor/repos.git
```
## Normal deploy workflow
```bash
# after merging a feat branch into main
git push server main
# tail the server's deploy log
ssh server@192.168.68.42 "tail -f /tmp/infoxtractor-deploy.log"
# healthz gate (the post-receive hook also waits up to 60 s for this)
curl http://192.168.68.42:8994/healthz
# end-to-end smoke — this IS the real acceptance test
python scripts/e2e_smoke.py
```
If the post-receive hook exits non-zero (`/healthz` never reaches 200 within the gate), the deploy is considered failed. The hook swaps containers via `docker compose up -d --build`: if the build fails, the previous container keeps running; if the build succeeds but the new container never passes `/healthz`, the new (broken) container is what's left running. Investigate with `docker compose logs --tail 200` in `${APP_DIR}` and either fix forward or revert (see below).
## Rollback
Never force-push `main`. Rollbacks happen as **forward commits** via `git revert`:
```bash
git revert HEAD # creates a revert commit for the last change
git push forgejo main
git push server main
```
## First deploy
- **Date:** 2026-04-18
- **Commit:** `fix/ollama-extract-json` (#41, the last of several Docker/ops follow-ups after PR #27 shipped the initial Dockerfile)
- **`/healthz`:** all three probes (`postgres`, `ollama`, `ocr`) green. First-pass took ~7 min for the fresh container because Surya's recognition (1.34 GB) + detection (73 MB) models download from HuggingFace on first run; subsequent rebuilds reuse the named volumes declared in `docker-compose.yml` and come up in <30 s.
- **E2E extraction:** `bank_statement_header` against `tests/fixtures/synthetic_giro.pdf` with Paperless-style texts:
- Pipeline completes in **35 s**.
- Extracted: `bank_name=DKB`, `account_iban=DE89370400440532013000`, `currency=EUR`, `opening_balance=1234.56`, `closing_balance=1450.22`, `statement_date=2026-03-31`, `statement_period_end=2026-03-31`, `statement_period_start=2026-03-01`, `account_type=null`.
- Provenance: 8 / 9 leaf fields have sources; 7 / 8 `provenance_verified` and `text_agreement` are True. `statement_period_start` shows up in the OCR but normalisation fails (dateutil picks a different interpretation of the cited day); to be chased in a follow-up.
### Docker-ops follow-ups that landed during the first deploy
All small, each merged as its own PR. In commit order after the scaffold (#27):
- **#31** `fix(docker): uv via standalone installer` — Python 3.12 on Ubuntu 22.04 drops `distutils`; Ubuntu's pip needed it. Switched to the `uv` standalone installer, which has no pip dependency.
- **#32** `fix(docker): include README.md in the uv sync COPY` — `hatchling` validates the readme file exists when resolving the editable project install.
- **#33** `fix(compose): drop runtime: nvidia` — the deploy host's Docker daemon doesn't register a named `nvidia` runtime; `deploy.resources.devices` is sufficient and matches immich-ml.
- **#34** `fix(deploy): network_mode: host` — `postgis` is bound to `127.0.0.1` on the host (security hardening T12). `host.docker.internal` points at the bridge gateway, not loopback, so the container couldn't reach postgis. Goldstein uses the same pattern.
- **#36** `fix(deps): pin surya-ocr ^0.17` — the earlier cu124 torch pin had forced surya down to 0.14.1, which lacks our `surya.foundation` import and pulls in a transformers version without `QuantizedCacheConfig`.
- **#41** `fix(genai): drop Ollama format flag; extract trailing JSON` — Ollama 0.11.8 segfaults on Pydantic JSON Schemas (`$ref`, `anyOf`, `pattern`), and `format="json"` terminates reasoning models (qwen3) at `{}` because their `<think>…</think>` chain-of-thought isn't valid JSON. Omit the flag, inject the schema into the system prompt, extract the outermost `{…}` balanced block from the response.
- **volumes** — named `ix_surya_cache` + `ix_hf_cache` mount `/root/.cache/datalab` + `/root/.cache/huggingface` so rebuilds don't re-download ~1.5 GB of model weights.
Production notes:
- `IX_DEFAULT_MODEL=qwen3:14b` (already pulled on the host). Spec listed `gpt-oss:20b` as a concrete example; swapped to keep the deploy on-prem without an extra `ollama pull`.
- Torch 2.11 default cu13 wheels fall back to CPU against the host's CUDA 12.4 driver — Surya runs on CPU. Expected inference times: seconds per page. Upgrading the NVIDIA driver (or pinning a cu12-compatible torch wheel newer than 2.7) will unlock GPU with no code changes.
## E2E smoke test (`scripts/e2e_smoke.py`)
What it does (from the Mac):
1. Checks `/healthz`.
2. Starts a tiny HTTP server on the Mac's LAN IP serving `tests/fixtures/synthetic_giro.pdf`.
3. Submits a `POST /jobs` with `use_case=bank_statement_header`, the fixture URL in `context.files`, and a Paperless-style OCR text in `context.texts` (to exercise the `text_agreement` cross-check).
4. Polls `GET /jobs/{id}` every 2 s until terminal or 120 s timeout.
5. Asserts: `status=="done"`, `bank_name` non-empty, `provenance.fields["result.closing_balance"].provenance_verified=True`, `text_agreement=True`, total elapsed `< 60s`.
Non-zero exit means the deploy is not healthy. Roll back via `git revert HEAD`.
## Operational checklists
### After `ollama pull` on the host
The `IX_DEFAULT_MODEL` env var on the server's `.env` must match something in `ollama list`. Changing the default means:
1. Edit `/home/server/Public/infoxtractor/app/.env` → `IX_DEFAULT_MODEL=<new>`.
2. `docker compose --project-directory /home/server/Public/infoxtractor/app restart`.
3. `curl http://192.168.68.42:8994/healthz` → confirm `ollama: ok`.
### If `/healthz` shows `ollama: degraded`
`qwen3:14b` (or the configured default) is not pulled. On the host:
```bash
ssh server@192.168.68.42 "docker exec ollama ollama pull qwen3:14b"
```
### If `/healthz` shows `ocr: fail`
Surya couldn't initialize (model missing, CUDA unavailable, OOM). First run can be slow — models download on first call. Check container logs:
```bash
ssh server@192.168.68.42 "docker logs infoxtractor --tail 200"
```
### If the container fails to start
```bash
ssh server@192.168.68.42 "tail -100 /tmp/infoxtractor-deploy.log"
ssh server@192.168.68.42 "docker compose -f /home/server/Public/infoxtractor/app/docker-compose.yml logs --tail 200"
```
## Monitoring
- Monitoring dashboard auto-discovers via the `infrastructure.web_url` label on the container: `http://192.168.68.42:8001` → "infoxtractor" card.
- Backup opt-in via `backup.enable=true` + `backup.type=postgres` + `backup.name=infoxtractor` labels. The daily backup script picks up the `infoxtractor` Postgres database automatically.
## Ports
| Port | Direction | Source | Service |
|------|-----------|--------|---------|
| 8994/tcp | ALLOW | 192.168.68.0/24 | ix REST + healthz (LAN only; not publicly exposed) |
No VPS Caddy entry; no `infrastructure.docs_url` label — this is an internal service.


@ -1,6 +1,8 @@
[project]
name = "infoxtractor"
version = "0.1.0"
# Released 2026-04-18 with the first live deploy of the MVP. See
# docs/deployment.md §"First deploy" for the commit + /healthz times.
description = "Async on-prem LLM-powered structured information extraction microservice"
readme = "README.md"
requires-python = ">=3.12"
@ -31,10 +33,12 @@ dependencies = [
[project.optional-dependencies]
ocr = [
# Real OCR engine — pulls torch + CUDA wheels. Kept optional so CI
# (no GPU) can install the base package without the model deps.
"surya-ocr>=0.9",
"torch>=2.4",
# Real OCR engine. Kept optional so CI (no GPU) can install the base
# package without the model deps.
# surya >= 0.17 is required: the client code uses the
# `surya.foundation` module, which older releases don't expose.
"surya-ocr>=0.17,<0.18",
"torch>=2.7",
]
dev = [
"pytest>=8.3",
@ -44,6 +48,11 @@ dev = [
"mypy>=1.13",
]
# Note: the default pypi torch ships cu13 wheels, which emit a
# UserWarning and fall back to CPU against the deploy host's CUDA 12.4
# driver. Surya then runs on CPU — slower but correct for MVP. A future
# driver upgrade unlocks GPU Surya with no code changes.
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

210
scripts/e2e_smoke.py Executable file

@ -0,0 +1,210 @@
"""End-to-end smoke test against the deployed infoxtractor service.
Uploads a synthetic bank-statement fixture, polls for completion, and asserts
the provenance flags per spec §12 E2E. Intended to run from the Mac after
every `git push server main` as the deploy gate.
Prerequisites:
- The service is running and reachable at --base-url (default
http://192.168.68.42:8994).
- The fixture `tests/fixtures/synthetic_giro.pdf` is present.
- The Mac and the server are on the same LAN (the server must be able to
reach the Mac to download the fixture).
Exit codes:
0 all assertions passed within the timeout
1 at least one assertion failed
2 the job never reached a terminal state in time
3 the service was unreachable or returned an unexpected error
Usage:
python scripts/e2e_smoke.py
python scripts/e2e_smoke.py --base-url http://localhost:8994
"""
from __future__ import annotations
import argparse
import http.server
import json
import socket
import socketserver
import sys
import threading
import time
import urllib.error
import urllib.request
import uuid
from pathlib import Path
DEFAULT_BASE_URL = "http://192.168.68.42:8994"
FIXTURE = Path(__file__).parent.parent / "tests" / "fixtures" / "synthetic_giro.pdf"
TIMEOUT_SECONDS = 120
POLL_INTERVAL_SECONDS = 2
def find_lan_ip() -> str:
"""Return the Mac's LAN IP that the server can reach."""
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
# 192.168.68.42 is the server; getting the default route towards it
# yields the NIC with the matching subnet.
s.connect(("192.168.68.42", 80))
return s.getsockname()[0]
finally:
s.close()
def serve_fixture_in_background(fixture: Path) -> tuple[str, threading.Event]:
"""Serve the fixture on a temporary HTTP server; return the URL and a stop event."""
if not fixture.exists():
print(f"FIXTURE MISSING: {fixture}", file=sys.stderr)
sys.exit(3)
directory = fixture.parent
filename = fixture.name
lan_ip = find_lan_ip()
class Handler(http.server.SimpleHTTPRequestHandler):
def __init__(self, *args, **kwargs):
super().__init__(*args, directory=str(directory), **kwargs)
def log_message(self, format: str, *args) -> None: # quiet
pass
# Pick any free port.
httpd = socketserver.TCPServer((lan_ip, 0), Handler)
port = httpd.server_address[1]
url = f"http://{lan_ip}:{port}/{filename}"
stop = threading.Event()
def _serve():
try:
while not stop.is_set():
httpd.handle_request()
finally:
httpd.server_close()
# Run in a thread. Use a loose timeout so handle_request returns when stop is set.
httpd.timeout = 0.5
t = threading.Thread(target=_serve, daemon=True)
t.start()
return url, stop
def post_job(base_url: str, file_url: str, client_id: str, request_id: str) -> dict:
# Include a Paperless-style OCR of the fixture as context.texts so the
# text_agreement cross-check has something to compare against.
paperless_text = (
"DKB\n"
"DE89370400440532013000\n"
"Statement period: 01.03.2026 - 31.03.2026\n"
"Opening balance: 1234.56 EUR\n"
"Closing balance: 1450.22 EUR\n"
"31.03.2026\n"
)
payload = {
"use_case": "bank_statement_header",
"ix_client_id": client_id,
"request_id": request_id,
"context": {
"files": [file_url],
"texts": [paperless_text],
},
}
req = urllib.request.Request(
f"{base_url}/jobs",
data=json.dumps(payload).encode("utf-8"),
headers={"Content-Type": "application/json"},
method="POST",
)
with urllib.request.urlopen(req, timeout=10) as resp:
return json.loads(resp.read().decode("utf-8"))
def get_job(base_url: str, job_id: str) -> dict:
req = urllib.request.Request(f"{base_url}/jobs/{job_id}")
with urllib.request.urlopen(req, timeout=10) as resp:
return json.loads(resp.read().decode("utf-8"))
def main() -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--base-url", default=DEFAULT_BASE_URL)
parser.add_argument("--timeout", type=int, default=TIMEOUT_SECONDS)
args = parser.parse_args()
# Sanity-check the service is up.
try:
with urllib.request.urlopen(f"{args.base_url}/healthz", timeout=5) as resp:
health = json.loads(resp.read().decode("utf-8"))
print(f"healthz: {health}")
except urllib.error.URLError as e:
print(f"service unreachable: {e}", file=sys.stderr)
return 3
fixture_url, stop_server = serve_fixture_in_background(FIXTURE)
print(f"serving fixture at {fixture_url}")
try:
client_id = "e2e_smoke"
request_id = f"smoke-{uuid.uuid4().hex[:8]}"
submit = post_job(args.base_url, fixture_url, client_id, request_id)
job_id = submit["job_id"]
print(f"submitted job_id={job_id}")
started = time.monotonic()
last_status = None
job = None
while time.monotonic() - started < args.timeout:
job = get_job(args.base_url, job_id)
if job["status"] != last_status:
print(f"[{time.monotonic() - started:5.1f}s] status={job['status']}")
last_status = job["status"]
if job["status"] in ("done", "error"):
break
time.sleep(POLL_INTERVAL_SECONDS)
else:
print(f"FAIL: timed out after {args.timeout}s", file=sys.stderr)
return 2
assert job is not None
failed = []
if job["status"] != "done":
failed.append(f"status={job['status']!r} (want 'done')")
response = job.get("response") or {}
if response.get("error"):
failed.append(f"response.error={response['error']!r}")
result = (response.get("ix_result") or {}).get("result") or {}
bank = result.get("bank_name")
if not isinstance(bank, str) or not bank.strip():
failed.append(f"bank_name={bank!r} (want non-empty string)")
fields = (response.get("provenance") or {}).get("fields") or {}
closing = fields.get("result.closing_balance") or {}
if not closing.get("provenance_verified"):
failed.append(f"closing_balance.provenance_verified={closing.get('provenance_verified')!r}")
if closing.get("text_agreement") is not True:
failed.append(f"closing_balance.text_agreement={closing.get('text_agreement')!r} (Paperless-style text submitted)")
elapsed = time.monotonic() - started
if elapsed >= 60:
failed.append(f"elapsed={elapsed:.1f}s (≥ 60s; slow path)")
print(json.dumps(result, indent=2, default=str))
if failed:
print("\n".join(f"FAIL: {f}" for f in failed), file=sys.stderr)
return 1
print(f"\nPASS in {elapsed:.1f}s")
return 0
finally:
stop_server.set()
if __name__ == "__main__":
sys.exit(main())

127
scripts/setup_server.sh Executable file

@ -0,0 +1,127 @@
#!/usr/bin/env bash
# One-shot server setup for InfoXtractor. Idempotent: safe to re-run.
#
# Run from the Mac:
# IX_POSTGRES_PASSWORD=<pw> ./scripts/setup_server.sh
#
# What it does on 192.168.68.42:
# 1. Creates the bare git repo `/home/server/Public/infoxtractor/repos.git` if missing.
# 2. Writes the post-receive hook (or updates it) and makes it executable.
# 3. Creates the Postgres role + database on the shared `postgis` container.
# 4. Writes `/home/server/Public/infoxtractor/app/.env` (0600) from .env.example.
# 5. Verifies `qwen3:14b` is pulled in Ollama.
set -euo pipefail
SERVER="${IX_SERVER:-server@192.168.68.42}"
APP_BASE="/home/server/Public/infoxtractor"
REPOS_GIT="${APP_BASE}/repos.git"
APP_DIR="${APP_BASE}/app"
DB_NAME="infoxtractor"
DB_USER="infoxtractor"
if [ -z "${IX_POSTGRES_PASSWORD:-}" ]; then
read -r -s -p "Postgres password for role '${DB_USER}': " IX_POSTGRES_PASSWORD
echo
fi
if [ -z "${IX_POSTGRES_PASSWORD}" ]; then
echo "IX_POSTGRES_PASSWORD is required." >&2
exit 1
fi
echo "==> 1/5 Ensuring bare repo + post-receive hook on ${SERVER}"
ssh "${SERVER}" bash -s <<EOF
set -euo pipefail
mkdir -p "${REPOS_GIT}" "${APP_DIR}"
if [ ! -f "${REPOS_GIT}/HEAD" ]; then
git init --bare "${REPOS_GIT}"
fi
cat >"${REPOS_GIT}/hooks/post-receive" <<'HOOK'
#!/usr/bin/env bash
set -eo pipefail
APP_DIR="${APP_DIR}"
LOG="/tmp/infoxtractor-deploy.log"
echo "[\$(date -u '+%FT%TZ')] post-receive start" >> "\$LOG"
mkdir -p "\$APP_DIR"
GIT_WORK_TREE="\$APP_DIR" git --git-dir="${REPOS_GIT}" checkout -f main >> "\$LOG" 2>&1
cd "\$APP_DIR"
docker compose up -d --build >> "\$LOG" 2>&1
# Deploy gate: /healthz must return 200 within 60 s.
for i in \$(seq 1 30); do
if curl -fsS http://localhost:8994/healthz > /dev/null 2>&1; then
echo "[\$(date -u '+%FT%TZ')] healthz OK" >> "\$LOG"
exit 0
fi
sleep 2
done
echo "[\$(date -u '+%FT%TZ')] healthz never reached OK" >> "\$LOG"
docker compose logs --tail 100 >> "\$LOG" 2>&1 || true
exit 1
HOOK
chmod +x "${REPOS_GIT}/hooks/post-receive"
EOF
echo "==> 2/5 Verifying Ollama has qwen3:14b pulled"
if ! ssh "${SERVER}" "docker exec ollama ollama list | awk '{print \$1}' | grep -qx 'qwen3:14b'"; then
echo "FAIL: qwen3:14b not found in Ollama. Run: ssh ${SERVER} 'docker exec ollama ollama pull qwen3:14b'" >&2
exit 1
fi
echo "==> 3/5 Creating Postgres role '${DB_USER}' and database '${DB_NAME}' on postgis container"
# Idempotent via DO blocks; uses docker exec to avoid needing psql on the host.
ssh "${SERVER}" bash -s <<EOF
set -euo pipefail
docker exec -i postgis psql -U postgres <<SQL
DO \\\$\\\$
BEGIN
IF NOT EXISTS (SELECT FROM pg_roles WHERE rolname = '${DB_USER}') THEN
CREATE ROLE ${DB_USER} LOGIN PASSWORD '${IX_POSTGRES_PASSWORD}';
ELSE
ALTER ROLE ${DB_USER} WITH PASSWORD '${IX_POSTGRES_PASSWORD}';
END IF;
END
\\\$\\\$;
SQL
if ! docker exec -i postgis psql -U postgres -tc "SELECT 1 FROM pg_database WHERE datname = '${DB_NAME}'" | grep -q 1; then
docker exec -i postgis createdb -U postgres -O ${DB_USER} ${DB_NAME}
fi
EOF
echo "==> 4/5 Writing ${APP_DIR}/.env on the server"
# Render .env from the repo's .env.example, substituting the password placeholder.
LOCAL_ENV_CONTENT="$(
sed "s#<password>#${IX_POSTGRES_PASSWORD}#g" \
"$(dirname "$0")/../.env.example"
)"
# Append the IX_TEST_MODE=production for safety (fake mode stays off).
# .env is written atomically and permissioned 0600.
ssh "${SERVER}" "install -d -m 0755 '${APP_DIR}' && cat > '${APP_DIR}/.env' <<'ENVEOF'
${LOCAL_ENV_CONTENT}
ENVEOF
chmod 0600 '${APP_DIR}/.env'"
echo "==> 5/5 Checking UFW rule for port 8994 (LAN only)"
ssh "${SERVER}" "sudo ufw status numbered | grep -F 8994" >/dev/null 2>&1 || {
echo "NOTE: UFW doesn't yet allow 8994. Run on the server:"
echo " sudo ufw allow from 192.168.68.0/24 to any port 8994 proto tcp"
}
echo
echo "Done."
echo
echo "Next steps (on the Mac):"
echo " git remote add server ssh://server@192.168.68.42${REPOS_GIT}"
echo " git push server main"
echo " ssh ${SERVER} 'tail -f /tmp/infoxtractor-deploy.log'"
echo " curl http://192.168.68.42:8994/healthz"
echo " python scripts/e2e_smoke.py"


@ -0,0 +1,6 @@
"""Transport adapters — REST (always on) + pg_queue (optional).
Adapters are thin: they marshal external events into :class:`RequestIX`
payloads that land in ``ix_jobs`` as pending rows, and they read back from
the same store. They do NOT run the pipeline themselves; the worker does.
"""


@ -0,0 +1,15 @@
"""Postgres queue adapter — ``LISTEN ix_jobs_new`` + 10 s fallback poll.
This is a secondary transport: a direct-SQL writer can insert a row and
``NOTIFY ix_jobs_new, '<job_id>'`` and the worker wakes up within the roundtrip
time rather than the 10 s fallback poll. The REST adapter doesn't need the
listener because the worker is already running in-process; this exists for
external callers who bypass the REST API.
"""
from ix.adapters.pg_queue.listener import (
PgQueueListener,
asyncpg_dsn_from_sqlalchemy_url,
)
__all__ = ["PgQueueListener", "asyncpg_dsn_from_sqlalchemy_url"]


@ -0,0 +1,111 @@
"""Dedicated asyncpg connection that LISTENs to ``ix_jobs_new``.
We hold the connection *outside* the SQLAlchemy pool because SQLAlchemy's
asyncpg dialect doesn't expose the raw connection in a way that survives
the pool's checkout/checkin dance, and LISTEN needs a connection that
stays open for the full session to receive asynchronous notifications.
The adapter contract the worker sees is a single coroutine-factory,
``wait_for_work(poll_seconds)``, which completes either when a NOTIFY
arrives or when ``poll_seconds`` elapse. The worker doesn't care which
woke it; it just goes back to its claim query.
"""
from __future__ import annotations
import asyncio
from urllib.parse import unquote, urlparse
import asyncpg
def asyncpg_dsn_from_sqlalchemy_url(url: str) -> str:
"""Strip the SQLAlchemy ``postgresql+asyncpg://`` prefix for raw asyncpg.
asyncpg's connect() expects the plain ``postgres://user:pass@host/db``
shape; the ``+driver`` segment SQLAlchemy adds confuses it. We also
percent-decode the password: asyncpg accepts the raw form but not the
pre-encoded ``%21`` passwords we sometimes use in dev.
"""
parsed = urlparse(url)
scheme = parsed.scheme.split("+", 1)[0]
user = unquote(parsed.username) if parsed.username else ""
password = unquote(parsed.password) if parsed.password else ""
auth = ""
if user:
auth = f"{user}"
if password:
auth += f":{password}"
auth += "@"
netloc = parsed.hostname or ""
if parsed.port:
netloc += f":{parsed.port}"
return f"{scheme}://{auth}{netloc}{parsed.path}"
class PgQueueListener:
"""Long-lived asyncpg connection that sets an event on each NOTIFY.
The worker uses :meth:`wait_for_work` as its ``wait_for_work`` hook:
one call resolves when either a NOTIFY is received OR ``timeout``
seconds elapse, whichever comes first. The event is cleared after each
resolution so subsequent waits don't see stale state.
"""
def __init__(self, dsn: str, channel: str = "ix_jobs_new") -> None:
self._dsn = dsn
self._channel = channel
self._conn: asyncpg.Connection | None = None
self._event = asyncio.Event()
# Protect add_listener / remove_listener against concurrent
# start/stop — shouldn't happen in practice but a stray double-stop
# from a lifespan shutdown shouldn't raise ``listener not found``.
self._lock = asyncio.Lock()
async def start(self) -> None:
async with self._lock:
if self._conn is not None:
return
self._conn = await asyncpg.connect(self._dsn)
await self._conn.add_listener(self._channel, self._on_notify)
async def stop(self) -> None:
async with self._lock:
if self._conn is None:
return
try:
await self._conn.remove_listener(self._channel, self._on_notify)
finally:
await self._conn.close()
self._conn = None
def _on_notify(
self,
connection: asyncpg.Connection,
pid: int,
channel: str,
payload: str,
) -> None:
"""asyncpg listener callback — signals the waiter."""
# We don't care about payload/pid/channel — any NOTIFY on our
# channel means "go check for pending rows". Keep the body tiny so
# asyncpg's single dispatch loop stays snappy.
self._event.set()
async def wait_for_work(self, timeout: float) -> None:
"""Resolve when a NOTIFY arrives or ``timeout`` seconds pass.
We wait on the event with a timeout. ``asyncio.wait_for`` raises
:class:`asyncio.TimeoutError` on expiry; we swallow it because the
worker treats "either signal" identically. The event is cleared
after every wait so the next call starts fresh.
"""
try:
await asyncio.wait_for(self._event.wait(), timeout=timeout)
except TimeoutError:
pass
finally:
self._event.clear()
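A minimal usage sketch of the listener on its own (the production wiring lives in ``ix.app``):

import asyncio

async def demo(sqlalchemy_url: str) -> None:
    listener = PgQueueListener(asyncpg_dsn_from_sqlalchemy_url(sqlalchemy_url))
    await listener.start()
    try:
        # Resolves on NOTIFY ix_jobs_new or after 10 s, whichever comes first;
        # exactly the notify-or-poll shape the worker's wait hook expects.
        await listener.wait_for_work(10.0)
    finally:
        await listener.stop()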

View file

@ -0,0 +1,5 @@
"""FastAPI REST adapter — POST /jobs / GET /jobs / GET /healthz / GET /metrics.
Routes are defined in ``routes.py``. The ``create_app`` factory in
``ix.app`` wires them up alongside the worker lifespan.
"""

View file

@ -0,0 +1,227 @@
"""REST routes (spec §5).
The routes depend on two injected objects:
* a session factory (``get_session_factory_dep``): swapped in tests so we can
use the fixture's per-test engine instead of the lazy process-wide one in
``ix.store.engine``.
* a :class:`Probes` bundle (``get_probes``): each probe returns the
per-subsystem state string used by ``/healthz``. Tests inject canned
probes; Chunk 4 wires the real Ollama/Surya ones.
``/healthz`` has a strict 2-second postgres timeout: we use an
``asyncio.wait_for`` around a ``SELECT 1`` roundtrip so a broken pool or a
hung connection can't wedge the healthcheck endpoint.
"""
from __future__ import annotations
import asyncio
from collections.abc import Callable
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
from typing import Annotated, Literal
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Query, Response
from sqlalchemy import func, select, text
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from ix.adapters.rest.schemas import HealthStatus, JobSubmitResponse, MetricsResponse
from ix.contracts.job import Job
from ix.contracts.request import RequestIX
from ix.store import jobs_repo
from ix.store.engine import get_session_factory
from ix.store.models import IxJob
@dataclass
class Probes:
"""Injected subsystem-probe callables for ``/healthz``.
Each callable returns the literal status string expected in the body.
Probes are sync by design: none of the real ones need awaits today, and
keeping them sync lets tests pass plain lambdas. Real probes that wrap
async clients drive the coroutine from inside the sync callable (see the
``_run_async_sync`` helper in ``ix.app``, Chunk 4).
"""
ollama: Callable[[], Literal["ok", "degraded", "fail"]]
ocr: Callable[[], Literal["ok", "fail"]]
def get_session_factory_dep() -> async_sessionmaker[AsyncSession]:
"""Default DI: the process-wide store factory. Tests override this."""
return get_session_factory()
def get_probes() -> Probes:
"""Default DI: a pair of ``fail`` probes.
Production wiring (Chunk 4) overrides this factory with real Ollama +
Surya probes at app-startup time. Integration tests override via
``app.dependency_overrides[get_probes]`` with a canned ``ok`` pair.
The default here ensures a mis-wired deployment surfaces clearly in
``/healthz`` rather than claiming everything is fine by accident.
"""
return Probes(ollama=lambda: "fail", ocr=lambda: "fail")
router = APIRouter()
@router.post("/jobs", response_model=JobSubmitResponse, status_code=201)
async def submit_job(
request: RequestIX,
response: Response,
session_factory: Annotated[
async_sessionmaker[AsyncSession], Depends(get_session_factory_dep)
],
) -> JobSubmitResponse:
"""Submit a new job.
Per spec §5: 201 on first insert, 200 on idempotent re-submit of an
existing ``(client_id, request_id)`` pair. We detect the second case by
checking for an existing row before the insert: a hit means this call is a re-submit.
"""
async with session_factory() as session:
existing = await jobs_repo.get_by_correlation(
session, request.ix_client_id, request.request_id
)
job = await jobs_repo.insert_pending(
session, request, callback_url=request.callback_url
)
await session.commit()
if existing is not None:
# Idempotent re-submit — flip to 200. FastAPI's declared status_code
# is 201, but setting response.status_code overrides it per-call.
response.status_code = 200
return JobSubmitResponse(job_id=job.job_id, ix_id=job.ix_id, status=job.status)
@router.get("/jobs/{job_id}", response_model=Job)
async def get_job(
job_id: UUID,
session_factory: Annotated[
async_sessionmaker[AsyncSession], Depends(get_session_factory_dep)
],
) -> Job:
async with session_factory() as session:
job = await jobs_repo.get(session, job_id)
if job is None:
raise HTTPException(status_code=404, detail="job not found")
return job
@router.get("/jobs", response_model=Job)
async def lookup_job_by_correlation(
client_id: Annotated[str, Query(...)],
request_id: Annotated[str, Query(...)],
session_factory: Annotated[
async_sessionmaker[AsyncSession], Depends(get_session_factory_dep)
],
) -> Job:
async with session_factory() as session:
job = await jobs_repo.get_by_correlation(session, client_id, request_id)
if job is None:
raise HTTPException(status_code=404, detail="job not found")
return job
@router.get("/healthz")
async def healthz(
response: Response,
session_factory: Annotated[
async_sessionmaker[AsyncSession], Depends(get_session_factory_dep)
],
probes: Annotated[Probes, Depends(get_probes)],
) -> HealthStatus:
"""Per spec §5: postgres / ollama / ocr; 200 iff all three == ok."""
postgres_state: Literal["ok", "fail"] = "fail"
try:
async def _probe() -> None:
async with session_factory() as session:
await session.execute(text("SELECT 1"))
await asyncio.wait_for(_probe(), timeout=2.0)
postgres_state = "ok"
except Exception:
postgres_state = "fail"
try:
ollama_state = probes.ollama()
except Exception:
ollama_state = "fail"
try:
ocr_state = probes.ocr()
except Exception:
ocr_state = "fail"
body = HealthStatus(
postgres=postgres_state, ollama=ollama_state, ocr=ocr_state
)
if postgres_state != "ok" or ollama_state != "ok" or ocr_state != "ok":
response.status_code = 503
return body
@router.get("/metrics", response_model=MetricsResponse)
async def metrics(
session_factory: Annotated[
async_sessionmaker[AsyncSession], Depends(get_session_factory_dep)
],
) -> MetricsResponse:
"""Counters over the last 24h — plain JSON per spec §5."""
since = datetime.now(UTC) - timedelta(hours=24)
async with session_factory() as session:
pending = await session.scalar(
select(func.count()).select_from(IxJob).where(IxJob.status == "pending")
)
running = await session.scalar(
select(func.count()).select_from(IxJob).where(IxJob.status == "running")
)
done_24h = await session.scalar(
select(func.count())
.select_from(IxJob)
.where(IxJob.status == "done", IxJob.finished_at >= since)
)
error_24h = await session.scalar(
select(func.count())
.select_from(IxJob)
.where(IxJob.status == "error", IxJob.finished_at >= since)
)
# Per-use-case average seconds. ``request`` is JSONB, so we dig out
# the use_case key via ->>. Only consider rows that finished in the
# window and have a ``started_at`` (can't compute elapsed otherwise).
rows = (
await session.execute(
text(
"SELECT request->>'use_case' AS use_case, "
"AVG(EXTRACT(EPOCH FROM (finished_at - started_at))) "
"FROM ix_jobs "
"WHERE status='done' AND finished_at IS NOT NULL "
"AND started_at IS NOT NULL AND finished_at >= :since "
"GROUP BY request->>'use_case'"
),
{"since": since},
)
).all()
by_use_case = {row[0]: float(row[1]) for row in rows if row[0] is not None}
return MetricsResponse(
jobs_pending=int(pending or 0),
jobs_running=int(running or 0),
jobs_done_24h=int(done_24h or 0),
jobs_error_24h=int(error_24h or 0),
by_use_case_seconds=by_use_case,
)
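A rough client-side sketch of the submit/poll contract (the payload is illustrative only; the authoritative RequestIX shape lives in ``ix.contracts.request``, and host/port follow the deploy runbook):

import httpx

payload = {
    "ix_client_id": "paperless",   # correlation pair: (client_id, request_id)
    "request_id": "doc-1234",
    # ... remaining RequestIX fields (use_case, files, callback_url, ...)
}

with httpx.Client(base_url="http://127.0.0.1:8994") as client:
    first = client.post("/jobs", json=payload)    # 201 on first insert
    second = client.post("/jobs", json=payload)   # 200 on idempotent re-submit
    job_id = first.json()["job_id"]
    job = client.get(f"/jobs/{job_id}").json()    # poll for the full Job envelope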

View file

@ -0,0 +1,52 @@
"""REST-adapter request / response bodies.
Most payloads reuse the core contracts directly (:class:`RequestIX`,
:class:`Job`). The only adapter-specific shape is the lightweight POST /jobs
response (`job_id`, `ix_id`, `status`): callers don't need the full Job
envelope back on submit; they poll ``GET /jobs/{id}`` for that.
"""
from __future__ import annotations
from typing import Literal
from uuid import UUID
from pydantic import BaseModel, ConfigDict
class JobSubmitResponse(BaseModel):
"""What POST /jobs returns: just enough to poll or subscribe to callbacks."""
model_config = ConfigDict(extra="forbid")
job_id: UUID
ix_id: str
status: Literal["pending", "running", "done", "error"]
class HealthStatus(BaseModel):
"""Body of GET /healthz.
Each field reports per-subsystem state. Overall HTTP status is 200 iff
every field is ``"ok"`` (spec §5). ``ollama`` can be ``"degraded"``
when the backend is reachable but the default model isn't pulled —
monitoring surfaces that as non-200.
"""
model_config = ConfigDict(extra="forbid")
postgres: Literal["ok", "fail"]
ollama: Literal["ok", "degraded", "fail"]
ocr: Literal["ok", "fail"]
class MetricsResponse(BaseModel):
"""Body of GET /metrics — plain JSON (no Prometheus format for MVP)."""
model_config = ConfigDict(extra="forbid")
jobs_pending: int
jobs_running: int
jobs_done_24h: int
jobs_error_24h: int
by_use_case_seconds: dict[str, float]

232
src/ix/app.py Normal file
View file

@ -0,0 +1,232 @@
"""FastAPI app factory + lifespan.
``create_app()`` wires the REST router on top of a lifespan that spawns the
worker loop (Task 3.5) and the pg_queue listener (Task 3.6). Tests that
don't care about the worker call ``create_app(spawn_worker=False)`` so the
lifespan returns cleanly.
Task 4.3 fills in the production wiring:
* Factories (``make_genai_client`` / ``make_ocr_client``) pick between
fakes (``IX_TEST_MODE=fake``) and real Ollama/Surya clients.
* ``/healthz`` probes call ``selfcheck()`` on the active clients. In
``fake`` mode they always report ok.
* The worker's :class:`Pipeline` is built once per spawn with the real
chain of Steps; each call to the injected ``pipeline_factory`` returns
a fresh Pipeline so per-request state stays isolated.
"""
from __future__ import annotations
import asyncio
from collections.abc import AsyncIterator, Callable
from contextlib import asynccontextmanager, suppress
from typing import Literal
from fastapi import FastAPI
from ix.adapters.rest.routes import Probes, get_probes
from ix.adapters.rest.routes import router as rest_router
from ix.config import AppConfig, get_config
from ix.genai import make_genai_client
from ix.genai.client import GenAIClient
from ix.ocr import make_ocr_client
from ix.ocr.client import OCRClient
from ix.pipeline.genai_step import GenAIStep
from ix.pipeline.ocr_step import OCRStep
from ix.pipeline.pipeline import Pipeline
from ix.pipeline.reliability_step import ReliabilityStep
from ix.pipeline.response_handler_step import ResponseHandlerStep
from ix.pipeline.setup_step import SetupStep
def build_pipeline(
genai: GenAIClient, ocr: OCRClient, cfg: AppConfig
) -> Pipeline:
"""Assemble the production :class:`Pipeline` with injected clients.
Kept as a module-level helper so tests that want to exercise the
production wiring (without running the worker) can call it directly.
"""
from pathlib import Path
from ix.ingestion import FetchConfig
return Pipeline(
steps=[
SetupStep(
tmp_dir=Path(cfg.tmp_dir),
fetch_config=FetchConfig(
connect_timeout_s=float(cfg.file_connect_timeout_seconds),
read_timeout_s=float(cfg.file_read_timeout_seconds),
max_bytes=cfg.file_max_bytes,
),
),
OCRStep(ocr_client=ocr),
GenAIStep(genai_client=genai),
ReliabilityStep(),
ResponseHandlerStep(),
]
)
def _make_ollama_probe(
genai: GenAIClient, cfg: AppConfig
) -> Callable[[], Literal["ok", "degraded", "fail"]]:
"""Adapter: async ``selfcheck`` → sync callable the route expects.
Always drives the coroutine on a throwaway event loop in a separate
thread. This keeps the behavior identical whether the caller holds an
event loop (FastAPI request) or doesn't (a CLI tool), and avoids the
``asyncio.run`` vs. already-running-loop footgun.
"""
def probe() -> Literal["ok", "degraded", "fail"]:
if not hasattr(genai, "selfcheck"):
return "ok" # fake client — nothing to probe.
return _run_async_sync(
lambda: genai.selfcheck(expected_model=cfg.default_model), # type: ignore[attr-defined]
fallback="fail",
)
return probe
def _make_ocr_probe(ocr: OCRClient) -> Callable[[], Literal["ok", "fail"]]:
def probe() -> Literal["ok", "fail"]:
if not hasattr(ocr, "selfcheck"):
return "ok" # fake — nothing to probe.
return _run_async_sync(
lambda: ocr.selfcheck(), # type: ignore[attr-defined]
fallback="fail",
)
return probe
def _run_async_sync(make_coro, *, fallback: str) -> str: # type: ignore[no-untyped-def]
"""Run ``make_coro()`` on a fresh loop in a thread; return its result.
The thread owns its own event loop so the caller's loop (if any) keeps
running. Any exception collapses to ``fallback``.
"""
import threading
result: dict[str, object] = {}
def _runner() -> None:
loop = asyncio.new_event_loop()
try:
result["value"] = loop.run_until_complete(make_coro())
except Exception as exc: # any error collapses to fallback
result["error"] = exc
finally:
loop.close()
t = threading.Thread(target=_runner)
t.start()
t.join()
if "error" in result or "value" not in result:
return fallback
return str(result["value"])
def create_app(*, spawn_worker: bool = True) -> FastAPI:
"""Construct the ASGI app.
Parameters
----------
spawn_worker:
When True (default), the lifespan spawns the background worker task
and the pg_queue listener. Integration tests that only exercise the
REST adapter pass False so jobs pile up as ``pending`` and the tests
can assert on their state without a racing worker mutating them.
"""
@asynccontextmanager
async def lifespan(_app: FastAPI) -> AsyncIterator[None]:
cfg = get_config()
# Build the clients once per process. The worker's pipeline
# factory closes over these so every job runs through the same
# Ollama/Surya instance (Surya's predictors are heavy; re-loading
# them per job would be catastrophic).
genai_client = make_genai_client(cfg)
ocr_client = make_ocr_client(cfg)
# Override the route-level probe DI so /healthz reflects the
# actual clients. Tests that want canned probes can still override
# ``get_probes`` at the TestClient layer.
_app.dependency_overrides.setdefault(
get_probes,
lambda: Probes(
ollama=_make_ollama_probe(genai_client, cfg),
ocr=_make_ocr_probe(ocr_client),
),
)
worker_task = None
listener = None
if spawn_worker:
try:
from ix.adapters.pg_queue.listener import (
PgQueueListener,
asyncpg_dsn_from_sqlalchemy_url,
)
listener = PgQueueListener(
dsn=asyncpg_dsn_from_sqlalchemy_url(cfg.postgres_url)
)
await listener.start()
except Exception:
listener = None
try:
worker_task = await _spawn_production_worker(
cfg, genai_client, ocr_client, listener
)
except Exception:
worker_task = None
try:
yield
finally:
if worker_task is not None:
worker_task.cancel()
with suppress(Exception):
await worker_task
if listener is not None:
with suppress(Exception):
await listener.stop()
app = FastAPI(lifespan=lifespan, title="infoxtractor", version="0.1.0")
app.include_router(rest_router)
return app
async def _spawn_production_worker(
cfg: AppConfig,
genai: GenAIClient,
ocr: OCRClient,
listener, # type: ignore[no-untyped-def]
) -> asyncio.Task[None]:
"""Spawn the background worker with a production pipeline factory."""
from ix.store.engine import get_session_factory
from ix.worker.loop import Worker
def pipeline_factory() -> Pipeline:
return build_pipeline(genai, ocr, cfg)
worker = Worker(
session_factory=get_session_factory(),
pipeline_factory=pipeline_factory,
poll_interval_seconds=10.0,
max_running_seconds=2 * cfg.pipeline_request_timeout_seconds,
callback_timeout_seconds=cfg.callback_timeout_seconds,
wait_for_work=listener.wait_for_work if listener is not None else None,
)
stop = asyncio.Event()
return asyncio.create_task(worker.run(stop))
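A test-wiring sketch, assuming ``IX_TEST_MODE=fake`` is exported so the factories return fakes, and FastAPI's TestClient drives the lifespan:

from fastapi.testclient import TestClient

from ix.adapters.rest.routes import Probes, get_probes
from ix.app import create_app

app = create_app(spawn_worker=False)               # lifespan skips worker + listener
app.dependency_overrides[get_probes] = lambda: Probes(
    ollama=lambda: "ok", ocr=lambda: "ok"          # canned probes, as the tests do
)
with TestClient(app) as client:                    # the context manager runs the lifespan
    status = client.get("/healthz").status_code    # still 503 unless postgres is reachable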

View file

@ -12,6 +12,7 @@ re-read after ``monkeypatch.setenv``. Production code never clears the cache.
from __future__ import annotations
from functools import lru_cache
from typing import Literal
from pydantic_settings import BaseSettings, SettingsConfigDict
@ -32,14 +33,17 @@ class AppConfig(BaseSettings):
)
# --- Job store ---
# Defaults assume the ix container runs with `network_mode: host` and
# reaches the shared `postgis` and `ollama` containers on loopback;
# spec §11 / docker-compose.yml ship that configuration.
postgres_url: str = (
"postgresql+asyncpg://infoxtractor:<password>"
"@host.docker.internal:5431/infoxtractor"
"@127.0.0.1:5431/infoxtractor"
)
# --- LLM backend ---
ollama_url: str = "http://host.docker.internal:11434"
default_model: str = "gpt-oss:20b"
ollama_url: str = "http://127.0.0.1:11434"
default_model: str = "qwen3:14b"
# --- OCR ---
ocr_engine: str = "surya"
@ -62,6 +66,13 @@ class AppConfig(BaseSettings):
# --- Observability ---
log_level: str = "INFO"
# --- Test / wiring mode ---
# ``fake``: factories return FakeGenAIClient / FakeOCRClient and
# ``/healthz`` probes report ok. CI sets this so the Forgejo runner
# doesn't need access to Ollama or GPU-backed Surya. ``None`` (default)
# means production wiring: real OllamaClient + SuryaOCRClient.
test_mode: Literal["fake"] | None = None
@lru_cache(maxsize=1)
def get_config() -> AppConfig:

View file

@ -1,18 +1,43 @@
"""GenAI subsystem: protocol + fake client + invocation-result dataclasses.
Real backends (Ollama, etc.) plug in behind :class:`GenAIClient`. The MVP
ships only :class:`FakeGenAIClient` from this package; the real Ollama
client lands in Chunk 4.
Real backends (Ollama, …) plug in behind :class:`GenAIClient`. The factory
:func:`make_genai_client` picks between :class:`FakeGenAIClient` (for CI
/ hermetic tests via ``IX_TEST_MODE=fake``) and :class:`OllamaClient`
(production). Tests that want a real Ollama client anyway can call the
constructor directly.
"""
from __future__ import annotations
from ix.config import AppConfig
from ix.genai.client import GenAIClient, GenAIInvocationResult, GenAIUsage
from ix.genai.fake import FakeGenAIClient
from ix.genai.ollama_client import OllamaClient
def make_genai_client(cfg: AppConfig) -> GenAIClient:
"""Return the :class:`GenAIClient` configured for the current run.
When ``cfg.test_mode == "fake"`` the fake is returned; the pipeline
callers are expected to override the injected client via DI if they
want a non-default canned response. Otherwise a live
:class:`OllamaClient` bound to ``cfg.ollama_url`` and the per-call
timeout is returned.
"""
if cfg.test_mode == "fake":
return FakeGenAIClient(parsed=None)
return OllamaClient(
base_url=cfg.ollama_url,
per_call_timeout_s=float(cfg.genai_call_timeout_seconds),
)
__all__ = [
"FakeGenAIClient",
"GenAIClient",
"GenAIInvocationResult",
"GenAIUsage",
"OllamaClient",
"make_genai_client",
]

View file

@ -0,0 +1,340 @@
"""OllamaClient — real :class:`GenAIClient` implementation (spec §6 GenAIStep).
Wraps the Ollama ``/api/chat`` structured-output endpoint. Per spec:
* POST ``{base_url}/api/chat`` with ``stream = false`` and ``options`` carrying
provider-neutral knobs (``temperature`` mapped, ``reasoning_effort`` dropped;
Ollama ignores it). The spec's ``format = <pydantic JSON schema>`` flag is
deliberately not sent; see ``_translate_request`` for why.
* Messages are passed through. Content-parts lists (``[{"type":"text",...}]``)
are joined to a single string because MVP models (``gpt-oss:20b`` /
``qwen2.5:32b``) don't accept native content-parts.
* Per-call timeout is enforced via ``httpx``. A connection refusal, read
timeout, or 5xx maps to ``IX_002_000``. A 2xx whose ``message.content`` is
not valid JSON for the schema maps to ``IX_002_001``.
``selfcheck()`` targets ``/api/tags`` with a fixed 5 s timeout and is what
``/healthz`` consumes.
"""
from __future__ import annotations
from typing import Any, Literal
import httpx
from pydantic import BaseModel, ValidationError
from ix.errors import IXErrorCode, IXException
from ix.genai.client import GenAIInvocationResult, GenAIUsage
_OLLAMA_TAGS_TIMEOUT_S: float = 5.0
_BODY_SNIPPET_MAX_CHARS: int = 240
class OllamaClient:
"""Async Ollama backend satisfying :class:`~ix.genai.client.GenAIClient`.
Parameters
----------
base_url:
Root URL of the Ollama server (e.g. ``http://127.0.0.1:11434``).
Trailing slashes are stripped.
per_call_timeout_s:
Hard per-call timeout for ``/api/chat``. Spec default: 1500 s.
"""
def __init__(self, base_url: str, per_call_timeout_s: float) -> None:
self._base_url = base_url.rstrip("/")
self._per_call_timeout_s = per_call_timeout_s
async def invoke(
self,
request_kwargs: dict[str, Any],
response_schema: type[BaseModel],
) -> GenAIInvocationResult:
"""Run one structured-output chat call; parse into ``response_schema``."""
body = self._translate_request(request_kwargs, response_schema)
url = f"{self._base_url}/api/chat"
try:
async with httpx.AsyncClient(timeout=self._per_call_timeout_s) as http:
resp = await http.post(url, json=body)
except httpx.HTTPError as exc:
raise IXException(
IXErrorCode.IX_002_000,
detail=f"ollama {exc.__class__.__name__}: {exc}",
) from exc
except (ConnectionError, TimeoutError) as exc: # pragma: no cover - httpx wraps these
raise IXException(
IXErrorCode.IX_002_000,
detail=f"ollama {exc.__class__.__name__}: {exc}",
) from exc
if resp.status_code >= 500:
raise IXException(
IXErrorCode.IX_002_000,
detail=(
f"ollama HTTP {resp.status_code}: "
f"{resp.text[:_BODY_SNIPPET_MAX_CHARS]}"
),
)
if resp.status_code >= 400:
raise IXException(
IXErrorCode.IX_002_000,
detail=(
f"ollama HTTP {resp.status_code}: "
f"{resp.text[:_BODY_SNIPPET_MAX_CHARS]}"
),
)
try:
payload = resp.json()
except ValueError as exc:
raise IXException(
IXErrorCode.IX_002_000,
detail=f"ollama non-JSON body: {resp.text[:_BODY_SNIPPET_MAX_CHARS]}",
) from exc
content = (payload.get("message") or {}).get("content") or ""
json_blob = _extract_json_blob(content)
try:
parsed = response_schema.model_validate_json(json_blob)
except ValidationError as exc:
raise IXException(
IXErrorCode.IX_002_001,
detail=(
f"{response_schema.__name__}: {exc.__class__.__name__}: "
f"body={content[:_BODY_SNIPPET_MAX_CHARS]}"
),
) from exc
except ValueError as exc:
# ``model_validate_json`` raises ValueError on invalid JSON (not
# a ValidationError). Treat as structured-output failure.
raise IXException(
IXErrorCode.IX_002_001,
detail=(
f"{response_schema.__name__}: invalid JSON: "
f"body={content[:_BODY_SNIPPET_MAX_CHARS]}"
),
) from exc
usage = GenAIUsage(
prompt_tokens=int(payload.get("prompt_eval_count") or 0),
completion_tokens=int(payload.get("eval_count") or 0),
)
model_name = str(payload.get("model") or request_kwargs.get("model") or "")
return GenAIInvocationResult(parsed=parsed, usage=usage, model_name=model_name)
async def selfcheck(
self, expected_model: str
) -> Literal["ok", "degraded", "fail"]:
"""Probe ``/api/tags`` for ``/healthz``.
``ok`` when the server answers 2xx and ``expected_model`` is listed;
``degraded`` when reachable but the model is missing; ``fail``
otherwise. Spec §5, §11.
"""
try:
async with httpx.AsyncClient(timeout=_OLLAMA_TAGS_TIMEOUT_S) as http:
resp = await http.get(f"{self._base_url}/api/tags")
except (httpx.HTTPError, ConnectionError, TimeoutError):
return "fail"
if resp.status_code != 200:
return "fail"
try:
payload = resp.json()
except ValueError:
return "fail"
models = payload.get("models") or []
names = {str(entry.get("name", "")) for entry in models}
if expected_model in names:
return "ok"
return "degraded"
def _translate_request(
self,
request_kwargs: dict[str, Any],
response_schema: type[BaseModel],
) -> dict[str, Any]:
"""Map provider-neutral kwargs to Ollama's /api/chat body.
Schema strategy for Ollama 0.11.8: we omit the ``format`` flag entirely
(see the NOTE in the body), bake the Pydantic schema into a system message
ahead of the caller's own system prompt, and extract the trailing JSON
blob from the free-form response. Rationale:
* The full Pydantic schema as ``format=<schema>`` crashes llama.cpp's
structured-output implementation (SIGSEGV) on every non-trivial
shape: ``anyOf`` / ``$ref`` / ``pattern`` all trigger it.
* ``format="json"`` alone guarantees valid JSON but not the shape;
models routinely return ``{}`` when not told what fields to emit.
* Injecting the schema into the prompt is the cheapest way to
get both: the model sees the expected shape explicitly, Pydantic
validates the response at parse time (IX_002_001 on mismatch).
Non-Ollama ``GenAIClient`` impls can ignore this behaviour and use
native structured-output (``response_format`` on OpenAI, etc.).
"""
messages = self._translate_messages(
list(request_kwargs.get("messages") or [])
)
messages = _inject_schema_system_message(messages, response_schema)
body: dict[str, Any] = {
"model": request_kwargs.get("model"),
"messages": messages,
"stream": False,
# NOTE: format is deliberately omitted. `format="json"` made
# reasoning models (qwen3) abort after emitting `{}` because the
# constrained sampler terminated before the chain-of-thought
# finished; `format=<schema>` segfaulted Ollama 0.11.8. Letting
# the model stream freely and then extracting the trailing JSON
# blob works for both reasoning and non-reasoning models.
}
options: dict[str, Any] = {}
if "temperature" in request_kwargs:
options["temperature"] = request_kwargs["temperature"]
# reasoning_effort intentionally dropped — Ollama doesn't support it.
if options:
body["options"] = options
return body
@staticmethod
def _translate_messages(
messages: list[dict[str, Any]],
) -> list[dict[str, Any]]:
"""Collapse content-parts lists into single strings for Ollama."""
out: list[dict[str, Any]] = []
for msg in messages:
content = msg.get("content")
if isinstance(content, list):
text_parts = [
str(part.get("text", ""))
for part in content
if isinstance(part, dict) and part.get("type") == "text"
]
new_content = "\n".join(text_parts)
else:
new_content = content
out.append({**msg, "content": new_content})
return out
def _extract_json_blob(text: str) -> str:
"""Return the outermost balanced JSON object in ``text``.
Reasoning models (qwen3, deepseek-r1) wrap their real answer in
``<think></think>`` blocks. Other models sometimes prefix prose or
fence the JSON in ```json``` code blocks. Taking the outermost balanced
``{}`` block is the cheapest robust parse that works for all three shapes;
a malformed response yields the full text and Pydantic catches it
downstream as ``IX_002_001``.
"""
start = text.find("{")
if start < 0:
return text
depth = 0
in_string = False
escaped = False
for i in range(start, len(text)):
ch = text[i]
if in_string:
if escaped:
escaped = False
elif ch == "\\":
escaped = True
elif ch == '"':
in_string = False
continue
if ch == '"':
in_string = True
elif ch == "{":
depth += 1
elif ch == "}":
depth -= 1
if depth == 0:
return text[start : i + 1]
return text[start:]
def _inject_schema_system_message(
messages: list[dict[str, Any]],
response_schema: type[BaseModel],
) -> list[dict[str, Any]]:
"""Prepend a system message that pins the expected JSON shape.
Ollama's ``format="json"`` mode guarantees valid JSON but not the
field set or names. We emit the Pydantic schema as JSON and
instruct the model to match it. Ours is always prepended, so it becomes
the first system turn even when the caller supplies their own system
message.
"""
import json as _json
schema_json = _json.dumps(
_sanitise_schema_for_ollama(response_schema.model_json_schema()),
indent=2,
)
guidance = (
"Respond ONLY with a single JSON object matching this JSON Schema "
"exactly. No prose, no code fences, no explanations. All top-level "
"properties listed in `required` MUST be present. Use null for "
"fields you cannot confidently extract. The JSON Schema:\n"
f"{schema_json}"
)
return [{"role": "system", "content": guidance}, *messages]
def _sanitise_schema_for_ollama(schema: Any) -> Any:
"""Strip null branches from ``anyOf`` unions.
Ollama 0.11.8's llama.cpp structured-output implementation segfaults on
Pydantic v2's standard Optional pattern::
{"anyOf": [{"type": "string"}, {"type": "null"}]}
We collapse any ``anyOf`` that includes a ``{"type": "null"}`` entry to
its non-null branches: a single remaining branch is inlined; multiple
branches keep the union without null. This only narrows what the LLM is
*told* it may emit; Pydantic still validates the real response and can
accept ``None`` at parse time if the field is ``Optional``.
Walk is recursive and structure-preserving. Other ``anyOf`` shapes (e.g.
polymorphic unions without null) are left alone.
"""
if isinstance(schema, dict):
cleaned: dict[str, Any] = {}
for key, value in schema.items():
if key == "anyOf" and isinstance(value, list):
non_null = [
_sanitise_schema_for_ollama(branch)
for branch in value
if not (isinstance(branch, dict) and branch.get("type") == "null")
]
if len(non_null) == 1:
# Inline the single remaining branch; merge its keys into the
# parent so siblings like ``default``/``title`` are preserved.
only = non_null[0]
if isinstance(only, dict):
for ok, ov in only.items():
cleaned.setdefault(ok, ov)
else:
cleaned[key] = non_null
elif len(non_null) == 0:
# Pathological: nothing left. Fall back to a permissive type.
cleaned["type"] = "string"
else:
cleaned[key] = non_null
else:
cleaned[key] = _sanitise_schema_for_ollama(value)
return cleaned
if isinstance(schema, list):
return [_sanitise_schema_for_ollama(item) for item in schema]
return schema
__all__ = ["OllamaClient"]

View file

@ -1,13 +1,34 @@
"""OCR subsystem: protocol + fake client.
"""OCR subsystem: protocol + fake + real Surya client + factory.
Real engines (Surya, Azure DI, …) plug in behind :class:`OCRClient`. The
MVP ships only :class:`FakeOCRClient` from this package; the real Surya
client lands in Chunk 4.
Real engines (Surya today, Azure DI / AWS Textract deferred) plug in
behind :class:`OCRClient`. The factory :func:`make_ocr_client` picks
between :class:`FakeOCRClient` (when ``IX_TEST_MODE=fake``) and
:class:`SuryaOCRClient` (production). Unknown engine names raise so a
typo'd ``IX_OCR_ENGINE`` surfaces at startup, not later.
"""
from __future__ import annotations
from ix.config import AppConfig
from ix.contracts.response import OCRDetails, OCRResult
from ix.ocr.client import OCRClient
from ix.ocr.fake import FakeOCRClient
from ix.ocr.surya_client import SuryaOCRClient
__all__ = ["FakeOCRClient", "OCRClient"]
def make_ocr_client(cfg: AppConfig) -> OCRClient:
"""Return the :class:`OCRClient` configured for the current run."""
if cfg.test_mode == "fake":
return FakeOCRClient(canned=OCRResult(result=OCRDetails()))
if cfg.ocr_engine == "surya":
return SuryaOCRClient()
raise ValueError(f"Unknown ocr_engine: {cfg.ocr_engine!r}")
__all__ = [
"FakeOCRClient",
"OCRClient",
"SuryaOCRClient",
"make_ocr_client",
]

View file

@ -5,11 +5,19 @@ method satisfies the Protocol. :class:`~ix.pipeline.ocr_step.OCRStep`
depends on the Protocol, not a concrete class, so swapping engines
(``FakeOCRClient`` in tests, ``SuryaOCRClient`` in prod) stays a wiring
change at the app factory.
Per-page source location (``files`` + ``page_metadata``) flows in as
optional kwargs: fakes ignore them; the real
:class:`~ix.ocr.surya_client.SuryaOCRClient` uses them to render each
page's pixels back from disk. Keeping these optional lets unit tests stay
pages-only while production wiring (Task 4.3) plumbs through the real
filesystem handles.
"""
from __future__ import annotations
from typing import Protocol, runtime_checkable
from pathlib import Path
from typing import Any, Protocol, runtime_checkable
from ix.contracts import OCRResult, Page
@ -24,8 +32,18 @@ class OCRClient(Protocol):
per input page (in the same order).
"""
async def ocr(self, pages: list[Page]) -> OCRResult:
"""Run OCR over the input pages; return the structured result."""
async def ocr(
self,
pages: list[Page],
*,
files: list[tuple[Path, str]] | None = None,
page_metadata: list[Any] | None = None,
) -> OCRResult:
"""Run OCR over the input pages; return the structured result.
``files`` and ``page_metadata`` are optional for hermetic tests;
real engines that need to re-render from disk read them.
"""
...

View file

@ -30,8 +30,17 @@ class FakeOCRClient:
self._canned = canned
self._raise_on_call = raise_on_call
async def ocr(self, pages: list[Page]) -> OCRResult:
"""Return the canned result or raise the configured error."""
async def ocr(
self,
pages: list[Page],
**_kwargs: object,
) -> OCRResult:
"""Return the canned result or raise the configured error.
Accepts (and ignores) any keyword args the production Protocol may
carry; this keeps the fake swappable for :class:`SuryaOCRClient` at
call sites that pass ``files`` / ``page_metadata``.
"""
if self._raise_on_call is not None:
raise self._raise_on_call
return self._canned

235
src/ix/ocr/surya_client.py Normal file
View file

@ -0,0 +1,235 @@
"""SuryaOCRClient — real :class:`OCRClient` backed by ``surya-ocr``.
Per spec §6.2: the MVP OCR engine. Runs Surya's detection + recognition
predictors over per-page PIL images rendered from the downloaded sources
(PDFs via PyMuPDF, images via Pillow).
Design choices:
* **Lazy model loading.** ``__init__`` is cheap; the heavy predictors are
built on first :meth:`ocr` / :meth:`selfcheck` / explicit :meth:`warm_up`.
This keeps FastAPI's lifespan predictable — ops can decide whether to
pay the load cost up front or on first request.
* **Device is Surya's default.** CUDA on the prod box, MPS on M-series Macs.
We deliberately don't pin.
* **No text-token reuse from PyMuPDF.** The cross-check against Paperless'
Tesseract output (ReliabilityStep's ``text_agreement``) is only meaningful
with a truly independent OCR pass, so we always render-and-recognize
even for PDFs that carry embedded text.
The ``surya-ocr`` package pulls torch + heavy model deps, so it's kept
behind the ``[ocr]`` extra. All Surya imports are deferred into
:meth:`warm_up` so running the unit tests (which patch the predictors)
doesn't require the package to be installed.
"""
from __future__ import annotations
import asyncio
import contextlib
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal
from ix.contracts import Line, OCRDetails, OCRResult, Page
from ix.segmentation import PageMetadata
if TYPE_CHECKING: # pragma: no cover
from PIL import Image as PILImage
class SuryaOCRClient:
"""Surya-backed OCR engine.
Attributes are created lazily by :meth:`warm_up`. The unit tests inject
mocks directly onto ``_recognition_predictor`` / ``_detection_predictor``
to avoid the Surya import chain.
"""
def __init__(self) -> None:
self._recognition_predictor: Any = None
self._detection_predictor: Any = None
def warm_up(self) -> None:
"""Load the detection + recognition predictors. Idempotent.
Called automatically on the first :meth:`ocr` / :meth:`selfcheck`,
or explicitly from the app lifespan to front-load the cost.
"""
if (
self._recognition_predictor is not None
and self._detection_predictor is not None
):
return
# Deferred imports: only reachable when the optional [ocr] extra is
# installed. Keeping them inside the method so base-install unit
# tests (which patch the predictors) don't need surya on sys.path.
from surya.detection import DetectionPredictor # type: ignore[import-not-found]
from surya.foundation import FoundationPredictor # type: ignore[import-not-found]
from surya.recognition import RecognitionPredictor # type: ignore[import-not-found]
foundation = FoundationPredictor()
self._recognition_predictor = RecognitionPredictor(foundation)
self._detection_predictor = DetectionPredictor()
async def ocr(
self,
pages: list[Page],
*,
files: list[tuple[Path, str]] | None = None,
page_metadata: list[Any] | None = None,
) -> OCRResult:
"""Render each input page, run Surya, translate back to contracts."""
self.warm_up()
images = self._render_pages(pages, files, page_metadata)
# Surya is blocking — run it off the event loop.
loop = asyncio.get_running_loop()
surya_results = await loop.run_in_executor(
None, self._run_recognition, images
)
out_pages: list[Page] = []
all_text_fragments: list[str] = []
for input_page, surya_result in zip(pages, surya_results, strict=True):
lines: list[Line] = []
for tl in getattr(surya_result, "text_lines", []) or []:
flat = self._flatten_polygon(getattr(tl, "polygon", None))
text = getattr(tl, "text", None)
lines.append(Line(text=text, bounding_box=flat))
if text:
all_text_fragments.append(text)
out_pages.append(
Page(
page_no=input_page.page_no,
width=input_page.width,
height=input_page.height,
angle=input_page.angle,
unit=input_page.unit,
lines=lines,
)
)
details = OCRDetails(
text="\n".join(all_text_fragments) if all_text_fragments else None,
pages=out_pages,
)
return OCRResult(result=details, meta_data={"engine": "surya"})
async def selfcheck(self) -> Literal["ok", "fail"]:
"""Run the predictors on a 1x1 image to confirm the stack works."""
try:
self.warm_up()
except Exception:
return "fail"
try:
from PIL import Image as PILImageRuntime
img = PILImageRuntime.new("RGB", (1, 1), color="white")
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, self._run_recognition, [img])
except Exception:
return "fail"
return "ok"
def _run_recognition(self, images: list[PILImage.Image]) -> list[Any]:
"""Invoke the recognition predictor. Kept tiny for threadpool offload."""
return list(
self._recognition_predictor(
images, det_predictor=self._detection_predictor
)
)
def _render_pages(
self,
pages: list[Page],
files: list[tuple[Path, str]] | None,
page_metadata: list[Any] | None,
) -> list[PILImage.Image]:
"""Render each input :class:`Page` to a PIL image.
We walk pages + page_metadata in lockstep so we know which source
file each page came from and (for PDFs) what page-index to render.
Text-only pages (``file_index is None``) get a blank 1x1 placeholder
so Surya returns an empty result and downstream code still gets one
entry per input page.
"""
from PIL import Image as PILImageRuntime
metas: list[PageMetadata] = list(page_metadata or [])
file_records: list[tuple[Path, str]] = list(files or [])
# Per-file lazy PDF openers so we don't re-open across pages.
pdf_docs: dict[int, Any] = {}
# Per-file running page-within-file counter. For PDFs we emit one
# entry per PDF page in order; ``pages`` was built the same way by
# DocumentIngestor, so a parallel counter reconstructs the mapping.
per_file_cursor: dict[int, int] = {}
rendered: list[PILImage.Image] = []
try:
for idx, _page in enumerate(pages):
meta = metas[idx] if idx < len(metas) else PageMetadata()
file_index = meta.file_index
if file_index is None or file_index >= len(file_records):
# Text-only page — placeholder image; Surya returns empty.
rendered.append(
PILImageRuntime.new("RGB", (1, 1), color="white")
)
continue
local_path, mime = file_records[file_index]
if mime == "application/pdf":
doc = pdf_docs.get(file_index)
if doc is None:
import fitz # PyMuPDF
doc = fitz.open(str(local_path))
pdf_docs[file_index] = doc
pdf_page_no = per_file_cursor.get(file_index, 0)
per_file_cursor[file_index] = pdf_page_no + 1
pdf_page = doc.load_page(pdf_page_no)
pix = pdf_page.get_pixmap(dpi=200)
img = PILImageRuntime.frombytes(
"RGB", (pix.width, pix.height), pix.samples
)
rendered.append(img)
elif mime in ("image/png", "image/jpeg", "image/tiff"):
frame_no = per_file_cursor.get(file_index, 0)
per_file_cursor[file_index] = frame_no + 1
img = PILImageRuntime.open(local_path)
# Handle multi-frame (TIFF) — seek to the right frame.
with contextlib.suppress(EOFError):
img.seek(frame_no)
rendered.append(img.convert("RGB"))
else: # pragma: no cover - ingestor already rejected
rendered.append(
PILImageRuntime.new("RGB", (1, 1), color="white")
)
finally:
for doc in pdf_docs.values():
with contextlib.suppress(Exception):
doc.close()
return rendered
@staticmethod
def _flatten_polygon(polygon: Any) -> list[float]:
"""Flatten ``[[x1,y1],[x2,y2],[x3,y3],[x4,y4]]`` → 8-float list.
Surya emits 4 quad corners. The spec wants 8 raw-pixel coords so
downstream provenance normalisation can consume them directly.
"""
if not polygon:
return []
flat: list[float] = []
for point in polygon:
if isinstance(point, (list, tuple)) and len(point) >= 2:
flat.append(float(point[0]))
flat.append(float(point[1]))
return flat
__all__ = ["SuryaOCRClient"]

View file

@ -56,7 +56,11 @@ class OCRStep(Step):
assert ctx is not None, "SetupStep must populate response_ix.context"
pages = list(getattr(ctx, "pages", []))
ocr_result = await self._client.ocr(pages)
files = list(getattr(ctx, "files", []) or [])
page_metadata = list(getattr(ctx, "page_metadata", []) or [])
ocr_result = await self._client.ocr(
pages, files=files, page_metadata=page_metadata
)
# Inject page tags around each OCR page's content so the LLM can
# cross-reference the visual anchor without a separate prompt hack.

274
src/ix/store/jobs_repo.py Normal file
View file

@ -0,0 +1,274 @@
"""Async CRUD over ``ix_jobs`` — the one module the worker / REST touches.
Every method takes an :class:`AsyncSession` (caller-owned transaction). The
caller commits. We don't manage transactions inside repo methods because the
worker sometimes needs to claim + run-pipeline + mark-done inside one
long-running unit of work, and an inside-the-method commit would break that.
A few invariants worth stating up front:
* ``ix_id`` is a 16-char hex string assigned by :func:`insert_pending` on
first insert. Callers MUST NOT pass one (we generate it); if a
``RequestIX`` arrives with ``ix_id`` set it is ignored.
* ``(client_id, request_id)`` is unique; on collision we return the
existing row unchanged. Callback URLs on the second insert are ignored;
the first insert's metadata wins.
* Claim uses ``FOR UPDATE SKIP LOCKED`` so concurrent workers never pick the
same row, and a session holding a lock doesn't block a sibling claimer.
* Status transitions: ``pending → running → (done | error)``. The sweeper is
the only path back to ``pending`` (and only from ``running``); terminal
states are stable.
"""
from __future__ import annotations
import secrets
from datetime import UTC, datetime
from typing import TYPE_CHECKING, Literal
from uuid import UUID, uuid4
from sqlalchemy import func, select, update
from sqlalchemy.dialects.postgresql import insert as pg_insert
from ix.contracts.job import Job
from ix.contracts.request import RequestIX
from ix.contracts.response import ResponseIX
from ix.store.models import IxJob
if TYPE_CHECKING:
from sqlalchemy.ext.asyncio import AsyncSession
def _new_ix_id() -> str:
"""Transport-assigned 16-hex handle.
``secrets.token_hex(8)`` gives a 16-character hex string (64 bits of entropy); good enough to
tag logs per spec §3 without collision risk across the lifetime of the
service.
"""
return secrets.token_hex(8)
def _orm_to_job(row: IxJob) -> Job:
"""Round-trip ORM row back through the Pydantic ``Job`` contract.
The JSONB columns come out as plain dicts; we let Pydantic re-validate
them into :class:`RequestIX` / :class:`ResponseIX`. Catching validation
errors here would mask real bugs; we let them surface.
"""
return Job(
job_id=row.job_id,
ix_id=row.ix_id,
client_id=row.client_id,
request_id=row.request_id,
status=row.status, # type: ignore[arg-type]
request=RequestIX.model_validate(row.request),
response=(
ResponseIX.model_validate(row.response) if row.response is not None else None
),
callback_url=row.callback_url,
callback_status=row.callback_status, # type: ignore[arg-type]
attempts=row.attempts,
created_at=row.created_at,
started_at=row.started_at,
finished_at=row.finished_at,
)
async def insert_pending(
session: AsyncSession,
request: RequestIX,
callback_url: str | None,
) -> Job:
"""Insert a pending row; return the new or existing :class:`Job`.
Uses ``INSERT ... ON CONFLICT DO NOTHING`` on the
``(client_id, request_id)`` unique index, then re-selects. If the insert
was a no-op, the existing row is returned verbatim (status / callback_url
unchanged); callers rely on this for idempotent resubmission.
"""
ix_id = request.ix_id or _new_ix_id()
job_id = uuid4()
# Serialise the request through Pydantic so JSONB gets plain JSON types,
# not datetime / Decimal instances asyncpg would reject.
request_json = request.model_copy(update={"ix_id": ix_id}).model_dump(
mode="json"
)
stmt = (
pg_insert(IxJob)
.values(
job_id=job_id,
ix_id=ix_id,
client_id=request.ix_client_id,
request_id=request.request_id,
status="pending",
request=request_json,
response=None,
callback_url=callback_url,
callback_status=None,
attempts=0,
)
.on_conflict_do_nothing(index_elements=["client_id", "request_id"])
)
await session.execute(stmt)
row = await session.scalar(
select(IxJob).where(
IxJob.client_id == request.ix_client_id,
IxJob.request_id == request.request_id,
)
)
assert row is not None, "insert_pending: row missing after upsert"
return _orm_to_job(row)
async def claim_next_pending(session: AsyncSession) -> Job | None:
"""Atomically pick the oldest pending row and flip it to running.
``FOR UPDATE SKIP LOCKED`` means a sibling worker can never deadlock on
our row; they'll skip past it and grab the next pending entry. The
sibling test in :mod:`tests/integration/test_jobs_repo` asserts this.
"""
stmt = (
select(IxJob)
.where(IxJob.status == "pending")
.order_by(IxJob.created_at)
.limit(1)
.with_for_update(skip_locked=True)
)
row = await session.scalar(stmt)
if row is None:
return None
row.status = "running"
row.started_at = datetime.now(UTC)
await session.flush()
return _orm_to_job(row)
async def get(session: AsyncSession, job_id: UUID) -> Job | None:
row = await session.scalar(select(IxJob).where(IxJob.job_id == job_id))
return _orm_to_job(row) if row is not None else None
async def get_by_correlation(
session: AsyncSession, client_id: str, request_id: str
) -> Job | None:
row = await session.scalar(
select(IxJob).where(
IxJob.client_id == client_id,
IxJob.request_id == request_id,
)
)
return _orm_to_job(row) if row is not None else None
async def mark_done(
session: AsyncSession, job_id: UUID, response: ResponseIX
) -> None:
"""Write the pipeline's response and move to terminal state.
Status is ``done`` iff ``response.error is None``; any non-None error
flips us to ``error``. Spec §3 lifecycle invariant.
"""
status = "done" if response.error is None else "error"
await session.execute(
update(IxJob)
.where(IxJob.job_id == job_id)
.values(
status=status,
response=response.model_dump(mode="json"),
finished_at=datetime.now(UTC),
)
)
async def mark_error(
session: AsyncSession, job_id: UUID, response: ResponseIX
) -> None:
"""Convenience wrapper that always writes status='error'.
Separate from :func:`mark_done` for readability at call sites: when the
worker knows it caught an exception the pipeline didn't handle itself,
``mark_error`` signals intent even if the response body happens to have
a populated error field.
"""
await session.execute(
update(IxJob)
.where(IxJob.job_id == job_id)
.values(
status="error",
response=response.model_dump(mode="json"),
finished_at=datetime.now(UTC),
)
)
async def update_callback_status(
session: AsyncSession,
job_id: UUID,
status: Literal["delivered", "failed"],
) -> None:
await session.execute(
update(IxJob)
.where(IxJob.job_id == job_id)
.values(callback_status=status)
)
async def sweep_orphans(
session: AsyncSession,
now: datetime,
max_running_seconds: int,
) -> list[UUID]:
"""Reset stale ``running`` rows back to ``pending`` and bump ``attempts``.
Called once at worker startup (spec §3) to rescue jobs whose owner died
mid-pipeline. The threshold is time-based on ``started_at`` so a still-
running worker never reclaims its own in-flight job; callers pass
``2 * IX_PIPELINE_REQUEST_TIMEOUT_SECONDS`` per spec.
"""
# Pick candidates and return their ids so the worker can log what it
# did. Two-step (SELECT then UPDATE) is clearer than RETURNING for
# callers who want the id list alongside a plain UPDATE.
candidates = (
await session.scalars(
select(IxJob.job_id).where(
IxJob.status == "running",
IxJob.started_at < now - _as_interval(max_running_seconds),
)
)
).all()
if not candidates:
return []
await session.execute(
update(IxJob)
.where(IxJob.job_id.in_(candidates))
.values(
status="pending",
started_at=None,
attempts=IxJob.attempts + 1,
)
)
return list(candidates)
def _as_interval(seconds: int): # type: ignore[no-untyped-def]
"""Return a SQL interval expression for ``seconds``.
We build the interval via ``func.make_interval`` so asyncpg doesn't have
to guess at a text-form cast the server-side ``make_interval(secs :=)``
is unambiguous and avoids locale-dependent parsing.
"""
return func.make_interval(0, 0, 0, 0, 0, 0, seconds)
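A hedged sketch of the intended call pattern (the caller owns the transaction; the session factory and request are whatever the REST adapter or worker already hold):

from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker

from ix.contracts.request import RequestIX
from ix.store import jobs_repo

async def submit_then_claim(
    session_factory: async_sessionmaker[AsyncSession], request: RequestIX
):
    async with session_factory() as session:
        job = await jobs_repo.insert_pending(
            session, request, callback_url=request.callback_url
        )
        await session.commit()               # the repo never commits; callers do
    async with session_factory() as session:
        claimed = await jobs_repo.claim_next_pending(session)  # pending → running
        await session.commit()
    return job, claimed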

View file

@ -26,7 +26,7 @@ class Request(BaseModel):
model_config = ConfigDict(extra="forbid")
use_case_name: str = "Bank Statement Header"
default_model: str = "gpt-oss:20b"
default_model: str = "qwen3:14b"
system_prompt: str = (
"You extract header metadata from a single bank or credit-card statement. "
"Return only facts that appear in the document; leave a field null if uncertain. "

View file

@ -0,0 +1,7 @@
"""Async worker — pulls pending rows and runs the pipeline against them.
The worker is one asyncio task spawned by the FastAPI lifespan (see
``ix.app``). Single-concurrency per MVP spec (Ollama + Surya both want the
GPU serially). Production wiring lives in Chunk 4; until then the pipeline
factory is parameter-injected so tests pass a fakes-only Pipeline.
"""

44
src/ix/worker/callback.py Normal file
View file

@ -0,0 +1,44 @@
"""One-shot webhook callback delivery.
No retries: the caller always has ``GET /jobs/{id}`` as the authoritative
fallback. We record the delivery outcome (``delivered`` / ``failed``) on the
row but never change ``status`` based on it; terminal states are stable.
Spec §5 callback semantics: one POST; 2xx → ``delivered``, anything else or
an exception → ``failed``.
"""
from __future__ import annotations
from typing import Literal
import httpx
from ix.contracts.job import Job
async def deliver(
callback_url: str,
job: Job,
timeout_s: int,
) -> Literal["delivered", "failed"]:
"""POST the full :class:`Job` body to ``callback_url``; return the outcome.
``timeout_s`` caps both connect and read; we don't configure them
separately for callbacks because the endpoint is caller-supplied and we
don't have a reason to treat slow-to-connect differently from slow-to-
respond. Any exception (connection error, timeout, non-2xx) collapses to
``"failed"``.
"""
try:
async with httpx.AsyncClient(timeout=timeout_s) as client:
response = await client.post(
callback_url,
json=job.model_dump(mode="json"),
)
if 200 <= response.status_code < 300:
return "delivered"
return "failed"
except Exception:
return "failed"

179
src/ix/worker/loop.py Normal file
View file

@ -0,0 +1,179 @@
"""Worker loop — claim pending rows, run pipeline, write terminal state.
One ``Worker`` instance per process. The loop body is:
1. Claim the next pending row (``FOR UPDATE SKIP LOCKED``). If none, wait
for the notify event or the poll interval, whichever fires first.
2. Build a fresh Pipeline via the injected factory and run it.
3. Write the response via ``mark_done`` (spec's ``done iff error is None``
invariant). If the pipeline itself raised (it shouldn't — steps catch
IXException internally but belt-and-braces), we stuff an
``IX_002_000`` into ``response.error`` and mark_error.
4. If the job has a ``callback_url``, POST once, record the outcome.
Startup pre-amble:
* Run ``sweep_orphans(now, 2 * IX_PIPELINE_REQUEST_TIMEOUT_SECONDS)`` once
before the loop starts. Recovers rows left in ``running`` by a crashed
previous process.
The "wait for work" hook is a callable so Task 3.6's PgQueueListener can
plug in later without the worker needing to know anything about LISTEN.
"""
from __future__ import annotations
import asyncio
from collections.abc import Callable
from datetime import UTC, datetime
from typing import TYPE_CHECKING
from ix.contracts.response import ResponseIX
from ix.errors import IXErrorCode, IXException
from ix.pipeline.pipeline import Pipeline
from ix.store import jobs_repo
from ix.worker import callback as cb
if TYPE_CHECKING:
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
PipelineFactory = Callable[[], Pipeline]
WaitForWork = Callable[[float], "asyncio.Future[None] | asyncio.Task[None]"]
class Worker:
"""Single-concurrency worker loop.
Parameters
----------
session_factory:
async_sessionmaker bound to an engine on the current event loop.
pipeline_factory:
Zero-arg callable returning a fresh :class:`Pipeline`. In production
this builds the real pipeline with Ollama + Surya; in tests it
returns a Pipeline of fakes.
poll_interval_seconds:
Fallback poll cadence when no notify wakes us (spec: 10 s default).
max_running_seconds:
Threshold passed to :func:`sweep_orphans` at startup.
Production wiring passes ``2 * IX_PIPELINE_REQUEST_TIMEOUT_SECONDS``.
callback_timeout_seconds:
Timeout for the webhook POST per spec §5.
wait_for_work:
Optional coroutine-factory. When set, the worker awaits it instead
of ``asyncio.sleep``. Task 3.6 passes the PgQueueListener's
notify-or-poll helper.
"""
def __init__(
self,
*,
session_factory: async_sessionmaker[AsyncSession],
pipeline_factory: PipelineFactory,
poll_interval_seconds: float = 10.0,
max_running_seconds: int = 5400,
callback_timeout_seconds: int = 10,
wait_for_work: Callable[[float], asyncio.Future[None]] | None = None,
) -> None:
self._session_factory = session_factory
self._pipeline_factory = pipeline_factory
self._poll_interval = poll_interval_seconds
self._max_running_seconds = max_running_seconds
self._callback_timeout = callback_timeout_seconds
self._wait_for_work = wait_for_work
async def run(self, stop: asyncio.Event) -> None:
"""Drive the claim-run-write-callback loop until ``stop`` is set."""
await self._startup_sweep()
while not stop.is_set():
async with self._session_factory() as session:
job = await jobs_repo.claim_next_pending(session)
await session.commit()
if job is None:
await self._sleep_or_wake(stop)
continue
await self._run_one(job)
async def _startup_sweep(self) -> None:
"""Rescue ``running`` rows left behind by a previous crash."""
async with self._session_factory() as session:
await jobs_repo.sweep_orphans(
session,
datetime.now(UTC),
self._max_running_seconds,
)
await session.commit()
async def _sleep_or_wake(self, stop: asyncio.Event) -> None:
"""Either run the custom wait hook or sleep the poll interval.
We cap the wait at either the poll interval or the stop signal,
whichever fires first; without this, a worker with no notify hook
would happily sleep for 10 s while the outer app is trying to shut
down.
"""
stop_task = asyncio.create_task(stop.wait())
try:
if self._wait_for_work is not None:
wake_task = asyncio.ensure_future(
self._wait_for_work(self._poll_interval)
)
else:
wake_task = asyncio.create_task(
asyncio.sleep(self._poll_interval)
)
try:
await asyncio.wait(
{stop_task, wake_task},
return_when=asyncio.FIRST_COMPLETED,
)
finally:
if not wake_task.done():
wake_task.cancel()
finally:
if not stop_task.done():
stop_task.cancel()
async def _run_one(self, job) -> None: # type: ignore[no-untyped-def]
"""Run the pipeline for one job; persist the outcome + callback."""
pipeline = self._pipeline_factory()
try:
response = await pipeline.start(job.request)
except Exception as exc:
# The pipeline normally catches IXException itself. Non-IX
# failures land here. We wrap the message in IX_002_000 so the
# caller sees a stable code.
ix_exc = IXException(IXErrorCode.IX_002_000, detail=str(exc))
response = ResponseIX(error=str(ix_exc))
async with self._session_factory() as session:
await jobs_repo.mark_error(session, job.job_id, response)
await session.commit()
else:
async with self._session_factory() as session:
await jobs_repo.mark_done(session, job.job_id, response)
await session.commit()
if job.callback_url:
await self._deliver_callback(job.job_id, job.callback_url)
async def _deliver_callback(self, job_id, callback_url: str) -> None: # type: ignore[no-untyped-def]
# Re-fetch the job so the callback payload reflects the final terminal
# state + response. Cheaper than threading the freshly-marked state
# back out of ``mark_done``, and keeps the callback body canonical.
async with self._session_factory() as session:
final = await jobs_repo.get(session, job_id)
if final is None:
return
status = await cb.deliver(callback_url, final, self._callback_timeout)
async with self._session_factory() as session:
await jobs_repo.update_callback_status(session, job_id, status)
await session.commit()

View file

@ -0,0 +1,124 @@
"""Integration-test fixtures — real Postgres required.
Policy: tests that import these fixtures skip cleanly when no DB is
configured. We check ``IX_TEST_DATABASE_URL`` first (local developer
override, usually a disposable docker container), then ``IX_POSTGRES_URL``
(what Forgejo Actions already sets). If neither is present the fixture
short-circuits with ``pytest.skip`` so a developer running
``pytest tests/unit`` in an unconfigured shell doesn't see the integration
suite hang or raise cryptic ``OperationalError``.
Schema lifecycle:
* session scope: ``alembic upgrade head`` once, ``alembic downgrade base``
at session end. We tried ``Base.metadata.create_all`` first, which is faster,
but it meant migrations stayed untested by the integration suite and a
developer who broke ``001_initial_ix_jobs.py`` wouldn't find out until
deploy. Current shape keeps migrations in the hot path.
* per-test: ``TRUNCATE ix_jobs`` (via the ``_reset_schema`` autouse fixture).
This is faster than recreating the schema and preserves indexes/constraints, so
tests that want to assert on a unique-violation path actually get one.
"""
from __future__ import annotations
import os
import subprocess
import sys
from collections.abc import AsyncIterator, Iterator
from pathlib import Path
import pytest
import pytest_asyncio
from sqlalchemy.ext.asyncio import (
AsyncEngine,
AsyncSession,
async_sessionmaker,
create_async_engine,
)
REPO_ROOT = Path(__file__).resolve().parents[2]
def _resolve_postgres_url() -> str | None:
"""Pick the database URL per policy: test override → CI URL → none."""
return os.environ.get("IX_TEST_DATABASE_URL") or os.environ.get("IX_POSTGRES_URL")
@pytest.fixture(scope="session")
def postgres_url() -> str:
url = _resolve_postgres_url()
if not url:
pytest.skip(
"no postgres configured — set IX_TEST_DATABASE_URL or IX_POSTGRES_URL"
)
return url
def _run_alembic(direction: str, postgres_url: str) -> None:
"""Invoke Alembic in a subprocess so its ``asyncio.run`` inside ``env.py``
doesn't collide with the pytest-asyncio event loop.
We pass the URL via ``IX_POSTGRES_URL``, not ``-x url=...``, because
percent-encoded characters in developer passwords trip up alembic's
configparser-backed ini loader. The env var lane skips configparser.
"""
env = os.environ.copy()
env["IX_POSTGRES_URL"] = postgres_url
subprocess.run(
[sys.executable, "-m", "alembic", direction, "head" if direction == "upgrade" else "base"],
cwd=REPO_ROOT,
env=env,
check=True,
)
@pytest.fixture(scope="session", autouse=True)
def _prepare_schema(postgres_url: str) -> Iterator[None]:
"""Run migrations once per session, torn down at the end.
pytest-asyncio creates one event loop per test (function-scoped by
default) and asyncpg connections can't survive a loop switch. That
forces a function-scoped engine below, but migrations are expensive,
so we keep those session-scoped via a subprocess call (no loop
involvement at all).
"""
_run_alembic("downgrade", postgres_url)
_run_alembic("upgrade", postgres_url)
yield
_run_alembic("downgrade", postgres_url)
@pytest_asyncio.fixture
async def engine(postgres_url: str) -> AsyncIterator[AsyncEngine]:
"""Per-test async engine.
Built fresh each test so its asyncpg connections live on the same loop
as the test itself. Dispose on teardown otherwise asyncpg leaks tasks
into the next test's loop and we get ``got Future attached to a
different loop`` errors on the second test in a file.
"""
eng = create_async_engine(postgres_url, pool_pre_ping=True)
try:
yield eng
finally:
await eng.dispose()
@pytest_asyncio.fixture
async def session_factory(engine: AsyncEngine) -> async_sessionmaker[AsyncSession]:
"""Per-test session factory. ``expire_on_commit=False`` per prod parity."""
return async_sessionmaker(engine, expire_on_commit=False)
@pytest_asyncio.fixture(autouse=True)
async def _reset_schema(engine: AsyncEngine) -> None:
"""Truncate ix_jobs between tests so each test starts from empty state."""
async with engine.begin() as conn:
await conn.exec_driver_sql("TRUNCATE ix_jobs")

View file

@ -0,0 +1,367 @@
"""Integration tests for :mod:`ix.store.jobs_repo` — run against a real DB.
Every test exercises one repo method end-to-end. A few go further and
concurrently spin up two sessions to demonstrate the claim query behaves
correctly under ``SKIP LOCKED`` (two claimers should never see the same row).
Skipped cleanly when no Postgres is configured; see integration/conftest.py.
"""
from __future__ import annotations
import asyncio
from datetime import UTC, datetime, timedelta
from typing import TYPE_CHECKING
from uuid import UUID, uuid4
from ix.contracts.request import Context, RequestIX
from ix.contracts.response import ResponseIX
from ix.store import jobs_repo
if TYPE_CHECKING:
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
def _make_request(client: str = "mammon", request_id: str = "r-1") -> RequestIX:
return RequestIX(
use_case="bank_statement_header",
ix_client_id=client,
request_id=request_id,
context=Context(texts=["hello"]),
)
async def test_insert_pending_creates_row_and_assigns_ix_id(
session_factory: async_sessionmaker[AsyncSession],
) -> None:
async with session_factory() as session:
job = await jobs_repo.insert_pending(
session, _make_request(), callback_url=None
)
await session.commit()
assert job.status == "pending"
assert isinstance(job.job_id, UUID)
# ix_id is a 16-hex string per spec §3 — transport-assigned.
assert isinstance(job.ix_id, str)
assert len(job.ix_id) == 16
assert all(c in "0123456789abcdef" for c in job.ix_id)
assert job.attempts == 0
async def test_insert_pending_is_idempotent_on_correlation_key(
session_factory: async_sessionmaker[AsyncSession],
) -> None:
"""(client_id, request_id) collides → existing row comes back unchanged."""
async with session_factory() as session:
first = await jobs_repo.insert_pending(
session, _make_request("mammon", "same-id"), callback_url="http://x/cb"
)
await session.commit()
async with session_factory() as session:
second = await jobs_repo.insert_pending(
session, _make_request("mammon", "same-id"), callback_url="http://y/cb"
)
await session.commit()
assert second.job_id == first.job_id
assert second.ix_id == first.ix_id
# The callback_url of the FIRST insert wins — we don't overwrite.
assert second.callback_url == "http://x/cb"
async def test_get_returns_full_job(
session_factory: async_sessionmaker[AsyncSession],
) -> None:
async with session_factory() as session:
inserted = await jobs_repo.insert_pending(
session, _make_request(), callback_url=None
)
await session.commit()
async with session_factory() as session:
fetched = await jobs_repo.get(session, inserted.job_id)
assert fetched is not None
assert fetched.job_id == inserted.job_id
assert fetched.request.use_case == "bank_statement_header"
assert fetched.status == "pending"
async def test_get_unknown_id_returns_none(
session_factory: async_sessionmaker[AsyncSession],
) -> None:
async with session_factory() as session:
result = await jobs_repo.get(session, uuid4())
assert result is None
async def test_get_by_correlation(
session_factory: async_sessionmaker[AsyncSession],
) -> None:
async with session_factory() as session:
inserted = await jobs_repo.insert_pending(
session, _make_request("mammon", "req-42"), callback_url=None
)
await session.commit()
async with session_factory() as session:
found = await jobs_repo.get_by_correlation(session, "mammon", "req-42")
assert found is not None
assert found.job_id == inserted.job_id
async with session_factory() as session:
missing = await jobs_repo.get_by_correlation(session, "mammon", "nope")
assert missing is None
async def test_claim_next_pending_advances_status(
session_factory: async_sessionmaker[AsyncSession],
) -> None:
async with session_factory() as session:
inserted = await jobs_repo.insert_pending(
session, _make_request(), callback_url=None
)
await session.commit()
async with session_factory() as session:
claimed = await jobs_repo.claim_next_pending(session)
await session.commit()
assert claimed is not None
assert claimed.job_id == inserted.job_id
assert claimed.status == "running"
assert claimed.started_at is not None
async def test_claim_next_pending_returns_none_when_empty(
session_factory: async_sessionmaker[AsyncSession],
) -> None:
async with session_factory() as session:
claimed = await jobs_repo.claim_next_pending(session)
await session.commit()
assert claimed is None
async def test_claim_next_pending_skips_locked(
session_factory: async_sessionmaker[AsyncSession],
) -> None:
"""Two concurrent claimers pick different rows (SKIP LOCKED in action)."""
async with session_factory() as session:
a = await jobs_repo.insert_pending(
session, _make_request("c", "a"), callback_url=None
)
b = await jobs_repo.insert_pending(
session, _make_request("c", "b"), callback_url=None
)
await session.commit()
session_a = session_factory()
session_b = session_factory()
try:
# Start the first claim but *don't* commit yet — its row is locked.
first = await jobs_repo.claim_next_pending(session_a)
# Second claimer runs while the first is still holding its lock. It
# must see the 'a' row as pending but SKIP it, returning the 'b' row.
second = await jobs_repo.claim_next_pending(session_b)
assert first is not None and second is not None
assert {first.job_id, second.job_id} == {a.job_id, b.job_id}
assert first.job_id != second.job_id
await session_a.commit()
await session_b.commit()
finally:
await session_a.close()
await session_b.close()
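# What the two claims above exercise is not visible in this diff; a minimal
# sketch of a SKIP LOCKED claim (an assumption about ix.store.jobs_repo, not
# its actual source) looks like:
#
#   SELECT job_id FROM ix_jobs
#   WHERE status = 'pending'
#   ORDER BY created_at
#   LIMIT 1
#   FOR UPDATE SKIP LOCKED;
#
# FOR UPDATE SKIP LOCKED makes the second transaction skip the row the first
# one still holds, which is why two concurrent claimers never return the same
# job.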
async def test_mark_done_writes_response_and_finishes(
session_factory: async_sessionmaker[AsyncSession],
) -> None:
async with session_factory() as session:
inserted = await jobs_repo.insert_pending(
session, _make_request(), callback_url=None
)
await session.commit()
response = ResponseIX(
use_case="bank_statement_header",
ix_client_id="mammon",
request_id="r-1",
)
async with session_factory() as session:
await jobs_repo.mark_done(session, inserted.job_id, response)
await session.commit()
async with session_factory() as session:
after = await jobs_repo.get(session, inserted.job_id)
assert after is not None
assert after.status == "done"
assert after.response is not None
assert after.finished_at is not None
async def test_mark_done_with_error_response_moves_to_error(
session_factory: async_sessionmaker[AsyncSession],
) -> None:
"""`done` iff response.error is None — otherwise status='error'."""
async with session_factory() as session:
inserted = await jobs_repo.insert_pending(
session, _make_request(), callback_url=None
)
await session.commit()
bad = ResponseIX(error="IX_002_000: boom")
async with session_factory() as session:
await jobs_repo.mark_done(session, inserted.job_id, bad)
await session.commit()
async with session_factory() as session:
after = await jobs_repo.get(session, inserted.job_id)
assert after is not None
assert after.status == "error"
assert after.response is not None
assert (after.response.error or "").startswith("IX_002_000")
async def test_mark_error_always_error(
session_factory: async_sessionmaker[AsyncSession],
) -> None:
async with session_factory() as session:
inserted = await jobs_repo.insert_pending(
session, _make_request(), callback_url=None
)
await session.commit()
bad = ResponseIX(error="IX_000_005: unsupported")
async with session_factory() as session:
await jobs_repo.mark_error(session, inserted.job_id, bad)
await session.commit()
async with session_factory() as session:
after = await jobs_repo.get(session, inserted.job_id)
assert after is not None
assert after.status == "error"
async def test_update_callback_status(
session_factory: async_sessionmaker[AsyncSession],
) -> None:
async with session_factory() as session:
inserted = await jobs_repo.insert_pending(
session, _make_request(), callback_url="http://cb"
)
await session.commit()
async with session_factory() as session:
await jobs_repo.update_callback_status(session, inserted.job_id, "delivered")
await session.commit()
async with session_factory() as session:
after = await jobs_repo.get(session, inserted.job_id)
assert after is not None
assert after.callback_status == "delivered"
async def test_sweep_orphans_resets_stale_running(
session_factory: async_sessionmaker[AsyncSession],
) -> None:
"""Running rows older than (now - max_running_seconds) go back to pending."""
async with session_factory() as session:
inserted = await jobs_repo.insert_pending(
session, _make_request(), callback_url=None
)
await session.commit()
# Backdate started_at by an hour to simulate a crashed worker mid-job.
async with session_factory() as session:
from sqlalchemy import text
stale = datetime.now(UTC) - timedelta(hours=1)
await session.execute(
text(
"UPDATE ix_jobs SET status='running', started_at=:t "
"WHERE job_id=:jid"
),
{"t": stale, "jid": inserted.job_id},
)
await session.commit()
# Max age of 60 s → our hour-old row gets swept.
async with session_factory() as session:
rescued = await jobs_repo.sweep_orphans(
session, datetime.now(UTC), max_running_seconds=60
)
await session.commit()
assert inserted.job_id in rescued
async with session_factory() as session:
after = await jobs_repo.get(session, inserted.job_id)
assert after is not None
assert after.status == "pending"
assert after.attempts == 1
async def test_sweep_orphans_leaves_fresh_running_alone(
session_factory: async_sessionmaker[AsyncSession],
) -> None:
"""A just-claimed row must not get reclaimed by the sweeper."""
async with session_factory() as session:
await jobs_repo.insert_pending(session, _make_request(), callback_url=None)
await session.commit()
async with session_factory() as session:
claimed = await jobs_repo.claim_next_pending(session)
await session.commit()
assert claimed is not None
# Sweep with a huge threshold (1 hour). Our just-claimed row is fresh, so
# it stays running.
async with session_factory() as session:
rescued = await jobs_repo.sweep_orphans(
session, datetime.now(UTC), max_running_seconds=3600
)
await session.commit()
assert rescued == []
async with session_factory() as session:
after = await jobs_repo.get(session, claimed.job_id)
assert after is not None
assert after.status == "running"
async def test_concurrent_claim_never_double_dispatches(
session_factory: async_sessionmaker[AsyncSession],
) -> None:
"""Spin a batch of concurrent claimers; every insert is claimed exactly once."""
async with session_factory() as session:
ids = []
for i in range(5):
job = await jobs_repo.insert_pending(
session, _make_request("mass", f"r-{i}"), callback_url=None
)
ids.append(job.job_id)
await session.commit()
async def claim_one() -> UUID | None:
async with session_factory() as session:
claimed = await jobs_repo.claim_next_pending(session)
await session.commit()
return claimed.job_id if claimed else None
results = await asyncio.gather(*(claim_one() for _ in range(10)))
non_null = [r for r in results if r is not None]
# Every inserted id appears at most once.
assert sorted(non_null) == sorted(ids)

View file

@ -0,0 +1,153 @@
"""Integration tests for the PgQueueListener + worker integration (Task 3.6).
Two scenarios:
1. NOTIFY delivered: the worker wakes within ~1 s and picks the row up.
2. Missed NOTIFY: the row still gets picked up by the fallback poll.
Both run a real worker + listener against a live Postgres. We drive them via
``asyncio.gather`` + an "until done" watchdog.
"""
from __future__ import annotations
import asyncio
from typing import TYPE_CHECKING
from sqlalchemy import text
from ix.adapters.pg_queue.listener import PgQueueListener, asyncpg_dsn_from_sqlalchemy_url
from ix.contracts.request import Context, RequestIX
from ix.pipeline.pipeline import Pipeline
from ix.pipeline.step import Step
from ix.store import jobs_repo
from ix.worker.loop import Worker
if TYPE_CHECKING:
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
class _PassingStep(Step):
"""Same minimal fake as test_worker_loop — keeps these suites independent."""
step_name = "fake_pass"
async def validate(self, request_ix, response_ix): # type: ignore[no-untyped-def]
return True
async def process(self, request_ix, response_ix): # type: ignore[no-untyped-def]
response_ix.use_case = request_ix.use_case
return response_ix
def _factory() -> Pipeline:
return Pipeline(steps=[_PassingStep()])
async def _wait_for_status(
session_factory: async_sessionmaker[AsyncSession],
job_id,
target: str,
timeout_s: float,
) -> bool:
deadline = asyncio.get_event_loop().time() + timeout_s
while asyncio.get_event_loop().time() < deadline:
async with session_factory() as session:
job = await jobs_repo.get(session, job_id)
if job is not None and job.status == target:
return True
await asyncio.sleep(0.1)
return False
async def test_notify_wakes_worker_within_2s(
session_factory: async_sessionmaker[AsyncSession],
postgres_url: str,
) -> None:
"""Direct INSERT + NOTIFY → worker picks it up fast (not via the poll)."""
listener = PgQueueListener(dsn=asyncpg_dsn_from_sqlalchemy_url(postgres_url))
await listener.start()
worker = Worker(
session_factory=session_factory,
pipeline_factory=_factory,
# 60 s fallback poll — if we still find the row within 2 s it's
# because NOTIFY woke us, not the poll.
poll_interval_seconds=60.0,
max_running_seconds=3600,
wait_for_work=listener.wait_for_work,
)
stop = asyncio.Event()
worker_task = asyncio.create_task(worker.run(stop))
# Give the worker one tick to reach the sleep_or_wake branch.
await asyncio.sleep(0.3)
# Insert a pending row manually + NOTIFY — simulates a direct-SQL client
# like an external batch script.
request = RequestIX(
use_case="bank_statement_header",
ix_client_id="pgq",
request_id="notify-1",
context=Context(texts=["hi"]),
)
async with session_factory() as session:
job = await jobs_repo.insert_pending(session, request, callback_url=None)
await session.commit()
async with session_factory() as session:
await session.execute(
text(f"NOTIFY ix_jobs_new, '{job.job_id}'")
)
await session.commit()
assert await _wait_for_status(session_factory, job.job_id, "done", 3.0), (
"worker didn't pick up the NOTIFY'd row in time"
)
stop.set()
await worker_task
await listener.stop()
async def test_missed_notify_falls_back_to_poll(
session_factory: async_sessionmaker[AsyncSession],
postgres_url: str,
) -> None:
"""Row lands without a NOTIFY; fallback poll still picks it up."""
listener = PgQueueListener(dsn=asyncpg_dsn_from_sqlalchemy_url(postgres_url))
await listener.start()
worker = Worker(
session_factory=session_factory,
pipeline_factory=_factory,
# Short poll so the fallback kicks in quickly — we need the test
# to finish in seconds, not the spec's 10 s.
poll_interval_seconds=0.5,
max_running_seconds=3600,
wait_for_work=listener.wait_for_work,
)
stop = asyncio.Event()
worker_task = asyncio.create_task(worker.run(stop))
await asyncio.sleep(0.3)
# Insert without NOTIFY: simulate a buggy writer.
request = RequestIX(
use_case="bank_statement_header",
ix_client_id="pgq",
request_id="missed-1",
context=Context(texts=["hi"]),
)
async with session_factory() as session:
job = await jobs_repo.insert_pending(session, request, callback_url=None)
await session.commit()
assert await _wait_for_status(session_factory, job.job_id, "done", 5.0), (
"fallback poll didn't pick up the row"
)
stop.set()
await worker_task
await listener.stop()

View file

@ -0,0 +1,179 @@
"""Integration tests for the FastAPI REST adapter (spec §5).
Uses ``fastapi.testclient.TestClient`` against a real DB. Ollama / OCR probes
are stubbed via the DI hooks the routes expose for testing in Chunk 4 the
production probes swap in.
"""
from __future__ import annotations
from collections.abc import Iterator
from uuid import uuid4
import pytest
from fastapi.testclient import TestClient
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from ix.adapters.rest.routes import Probes, get_probes, get_session_factory_dep
from ix.app import create_app
def _factory_for_url(postgres_url: str): # type: ignore[no-untyped-def]
"""Build a TestClient-compatible session factory.
TestClient runs the ASGI app on its own dedicated event loop (the one it
creates in its sync wrapper), distinct from the per-test loop
pytest-asyncio gives direct tests. Session factories must therefore be
constructed from an engine that was itself created on that inner loop.
We do this lazily: each dependency resolution creates a fresh engine +
factory on the current running loop, which is the TestClient's loop at
route-invocation time. Engine reuse would drag the cross-loop futures
that asyncpg hates back in.
"""
def _factory(): # type: ignore[no-untyped-def]
eng = create_async_engine(postgres_url, pool_pre_ping=True)
return async_sessionmaker(eng, expire_on_commit=False)
return _factory
@pytest.fixture
def app(postgres_url: str) -> Iterator[TestClient]:
"""Spin up the FastAPI app wired to the test DB + stub probes."""
app_obj = create_app(spawn_worker=False)
app_obj.dependency_overrides[get_session_factory_dep] = _factory_for_url(
postgres_url
)
app_obj.dependency_overrides[get_probes] = lambda: Probes(
ollama=lambda: "ok",
ocr=lambda: "ok",
)
with TestClient(app_obj) as client:
yield client
def _valid_request_body(client_id: str = "mammon", request_id: str = "r-1") -> dict:
return {
"use_case": "bank_statement_header",
"ix_client_id": client_id,
"request_id": request_id,
"context": {"texts": ["hello world"]},
}
def test_post_jobs_creates_pending(app: TestClient) -> None:
resp = app.post("/jobs", json=_valid_request_body())
assert resp.status_code == 201, resp.text
body = resp.json()
assert body["status"] == "pending"
assert len(body["ix_id"]) == 16
assert body["job_id"]
def test_post_jobs_idempotent_returns_200(app: TestClient) -> None:
first = app.post("/jobs", json=_valid_request_body("m", "dup"))
assert first.status_code == 201
first_body = first.json()
second = app.post("/jobs", json=_valid_request_body("m", "dup"))
assert second.status_code == 200
second_body = second.json()
assert second_body["job_id"] == first_body["job_id"]
assert second_body["ix_id"] == first_body["ix_id"]
def test_get_job_by_id(app: TestClient) -> None:
created = app.post("/jobs", json=_valid_request_body()).json()
resp = app.get(f"/jobs/{created['job_id']}")
assert resp.status_code == 200
body = resp.json()
assert body["job_id"] == created["job_id"]
assert body["request"]["use_case"] == "bank_statement_header"
assert body["status"] == "pending"
def test_get_job_404(app: TestClient) -> None:
resp = app.get(f"/jobs/{uuid4()}")
assert resp.status_code == 404
def test_get_by_correlation_query(app: TestClient) -> None:
created = app.post("/jobs", json=_valid_request_body("mammon", "corr-1")).json()
resp = app.get("/jobs", params={"client_id": "mammon", "request_id": "corr-1"})
assert resp.status_code == 200
assert resp.json()["job_id"] == created["job_id"]
missing = app.get("/jobs", params={"client_id": "mammon", "request_id": "nope"})
assert missing.status_code == 404
def test_healthz_all_ok(app: TestClient) -> None:
resp = app.get("/healthz")
assert resp.status_code == 200, resp.text
body = resp.json()
assert body["postgres"] == "ok"
assert body["ollama"] == "ok"
assert body["ocr"] == "ok"
def test_healthz_503_on_postgres_fail(postgres_url: str) -> None:
"""Broken postgres probe → 503. Ollama/OCR still surface in the body."""
app_obj = create_app(spawn_worker=False)
def _bad_factory(): # type: ignore[no-untyped-def]
def _raise(): # type: ignore[no-untyped-def]
raise RuntimeError("db down")
return _raise
app_obj.dependency_overrides[get_session_factory_dep] = _bad_factory
app_obj.dependency_overrides[get_probes] = lambda: Probes(
ollama=lambda: "ok", ocr=lambda: "ok"
)
with TestClient(app_obj) as client:
resp = client.get("/healthz")
assert resp.status_code == 503
body = resp.json()
assert body["postgres"] == "fail"
def test_healthz_degraded_ollama_is_503(postgres_url: str) -> None:
"""Per spec §5: degraded flips HTTP to 503 (only all-ok yields 200)."""
app_obj = create_app(spawn_worker=False)
app_obj.dependency_overrides[get_session_factory_dep] = _factory_for_url(
postgres_url
)
app_obj.dependency_overrides[get_probes] = lambda: Probes(
ollama=lambda: "degraded", ocr=lambda: "ok"
)
with TestClient(app_obj) as client:
resp = client.get("/healthz")
assert resp.status_code == 503
assert resp.json()["ollama"] == "degraded"
def test_metrics_shape(app: TestClient) -> None:
# Submit a couple of pending jobs to populate counters.
app.post("/jobs", json=_valid_request_body("mm", "a"))
app.post("/jobs", json=_valid_request_body("mm", "b"))
resp = app.get("/metrics")
assert resp.status_code == 200
body = resp.json()
for key in (
"jobs_pending",
"jobs_running",
"jobs_done_24h",
"jobs_error_24h",
"by_use_case_seconds",
):
assert key in body
assert body["jobs_pending"] == 2
assert body["jobs_running"] == 0
assert isinstance(body["by_use_case_seconds"], dict)

View file

@ -0,0 +1,325 @@
"""Integration tests for the worker loop (Task 3.5).
We spin up a real worker with a fake pipeline factory and verify the lifecycle
transitions against a live DB. Callback delivery is exercised via
``pytest-httpx``; callers' webhook endpoints are mocked, not run.
"""
from __future__ import annotations
import asyncio
from datetime import UTC, datetime, timedelta
from typing import TYPE_CHECKING
import pytest
from pytest_httpx import HTTPXMock
from ix.contracts.request import Context, RequestIX
from ix.contracts.response import ResponseIX
from ix.pipeline.pipeline import Pipeline
from ix.pipeline.step import Step
from ix.store import jobs_repo
from ix.worker.loop import Worker
if TYPE_CHECKING:
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
class _PassingStep(Step):
"""Minimal fake step that writes a sentinel field on the response."""
step_name = "fake_pass"
async def validate(self, request_ix, response_ix): # type: ignore[no-untyped-def]
return True
async def process(self, request_ix, response_ix): # type: ignore[no-untyped-def]
response_ix.use_case = request_ix.use_case
response_ix.ix_client_id = request_ix.ix_client_id
response_ix.request_id = request_ix.request_id
response_ix.ix_id = request_ix.ix_id
return response_ix
class _RaisingStep(Step):
"""Fake step that raises a non-IX exception to exercise the worker's
belt-and-braces error path."""
step_name = "fake_raise"
async def validate(self, request_ix, response_ix): # type: ignore[no-untyped-def]
return True
async def process(self, request_ix, response_ix): # type: ignore[no-untyped-def]
raise RuntimeError("boom")
def _ok_factory() -> Pipeline:
return Pipeline(steps=[_PassingStep()])
def _bad_factory() -> Pipeline:
return Pipeline(steps=[_RaisingStep()])
async def _insert_pending(session_factory, **kwargs): # type: ignore[no-untyped-def]
request = RequestIX(
use_case="bank_statement_header",
ix_client_id=kwargs.get("client", "test"),
request_id=kwargs.get("rid", "r-1"),
context=Context(texts=["hi"]),
)
async with session_factory() as session:
job = await jobs_repo.insert_pending(
session, request, callback_url=kwargs.get("cb")
)
await session.commit()
return job
async def test_worker_runs_one_job_to_done(
session_factory: async_sessionmaker[AsyncSession],
) -> None:
job = await _insert_pending(session_factory)
worker = Worker(
session_factory=session_factory,
pipeline_factory=_ok_factory,
poll_interval_seconds=0.1,
max_running_seconds=3600,
)
stop = asyncio.Event()
async def _monitor() -> None:
"""Wait until the job lands in a terminal state, then stop the worker."""
for _ in range(50): # 5 seconds budget
await asyncio.sleep(0.1)
async with session_factory() as session:
current = await jobs_repo.get(session, job.job_id)
if current is not None and current.status in ("done", "error"):
stop.set()
return
stop.set() # timeout — let the worker exit so assertions run
await asyncio.gather(worker.run(stop), _monitor())
async with session_factory() as session:
final = await jobs_repo.get(session, job.job_id)
assert final is not None
assert final.status == "done"
assert final.finished_at is not None
async def test_worker_pipeline_exception_marks_error(
session_factory: async_sessionmaker[AsyncSession],
) -> None:
"""A step raising a non-IX exception → status=error, response carries the
code. The worker catches what the pipeline doesn't."""
job = await _insert_pending(session_factory)
worker = Worker(
session_factory=session_factory,
pipeline_factory=_bad_factory,
poll_interval_seconds=0.1,
max_running_seconds=3600,
)
stop = asyncio.Event()
async def _monitor() -> None:
for _ in range(50):
await asyncio.sleep(0.1)
async with session_factory() as session:
current = await jobs_repo.get(session, job.job_id)
if current is not None and current.status == "error":
stop.set()
return
stop.set()
await asyncio.gather(worker.run(stop), _monitor())
async with session_factory() as session:
final = await jobs_repo.get(session, job.job_id)
assert final is not None
assert final.status == "error"
assert final.response is not None
assert (final.response.error or "").startswith("IX_002_000")
async def test_worker_delivers_callback(
httpx_mock: HTTPXMock,
session_factory: async_sessionmaker[AsyncSession],
) -> None:
"""callback_url on a done job → one POST, callback_status=delivered."""
httpx_mock.add_response(url="http://caller/webhook", status_code=200)
job = await _insert_pending(session_factory, cb="http://caller/webhook")
worker = Worker(
session_factory=session_factory,
pipeline_factory=_ok_factory,
poll_interval_seconds=0.1,
max_running_seconds=3600,
callback_timeout_seconds=5,
)
stop = asyncio.Event()
async def _monitor() -> None:
for _ in range(80):
await asyncio.sleep(0.1)
async with session_factory() as session:
current = await jobs_repo.get(session, job.job_id)
if (
current is not None
and current.status == "done"
and current.callback_status is not None
):
stop.set()
return
stop.set()
await asyncio.gather(worker.run(stop), _monitor())
async with session_factory() as session:
final = await jobs_repo.get(session, job.job_id)
assert final is not None
assert final.callback_status == "delivered"
async def test_worker_marks_callback_failed_on_5xx(
httpx_mock: HTTPXMock,
session_factory: async_sessionmaker[AsyncSession],
) -> None:
httpx_mock.add_response(url="http://caller/bad", status_code=500)
job = await _insert_pending(session_factory, cb="http://caller/bad")
worker = Worker(
session_factory=session_factory,
pipeline_factory=_ok_factory,
poll_interval_seconds=0.1,
max_running_seconds=3600,
callback_timeout_seconds=5,
)
stop = asyncio.Event()
async def _monitor() -> None:
for _ in range(80):
await asyncio.sleep(0.1)
async with session_factory() as session:
current = await jobs_repo.get(session, job.job_id)
if (
current is not None
and current.status == "done"
and current.callback_status is not None
):
stop.set()
return
stop.set()
await asyncio.gather(worker.run(stop), _monitor())
async with session_factory() as session:
final = await jobs_repo.get(session, job.job_id)
assert final is not None
assert final.status == "done" # terminal state stays done
assert final.callback_status == "failed"
async def test_worker_sweeps_orphans_at_startup(
session_factory: async_sessionmaker[AsyncSession],
) -> None:
"""Stale running rows → pending before the loop starts picking work."""
# Insert a job and backdate it to mimic a crashed worker mid-run.
job = await _insert_pending(session_factory, rid="orphan")
async with session_factory() as session:
from sqlalchemy import text
stale = datetime.now(UTC) - timedelta(hours=2)
await session.execute(
text(
"UPDATE ix_jobs SET status='running', started_at=:t "
"WHERE job_id=:jid"
),
{"t": stale, "jid": job.job_id},
)
await session.commit()
worker = Worker(
session_factory=session_factory,
pipeline_factory=_ok_factory,
poll_interval_seconds=0.1,
# max_running_seconds=60 so our 2-hour-old row gets swept.
max_running_seconds=60,
)
stop = asyncio.Event()
async def _monitor() -> None:
for _ in range(80):
await asyncio.sleep(0.1)
async with session_factory() as session:
current = await jobs_repo.get(session, job.job_id)
if current is not None and current.status == "done":
stop.set()
return
stop.set()
await asyncio.gather(worker.run(stop), _monitor())
async with session_factory() as session:
final = await jobs_repo.get(session, job.job_id)
assert final is not None
assert final.status == "done"
# attempts starts at 0, gets +1 on sweep.
assert final.attempts >= 1
@pytest.mark.parametrize("non_matching_url", ["http://x/y", None])
async def test_worker_no_callback_leaves_callback_status_none(
session_factory: async_sessionmaker[AsyncSession],
httpx_mock: HTTPXMock,
non_matching_url: str | None,
) -> None:
"""Jobs without a callback_url should never get a callback_status set."""
if non_matching_url is not None:
# If we ever accidentally deliver, pytest-httpx will complain because
# no mock matches — which is the signal we want.
pass
job = await _insert_pending(session_factory) # cb=None by default
worker = Worker(
session_factory=session_factory,
pipeline_factory=_ok_factory,
poll_interval_seconds=0.1,
max_running_seconds=3600,
)
stop = asyncio.Event()
async def _monitor() -> None:
for _ in range(50):
await asyncio.sleep(0.1)
async with session_factory() as session:
current = await jobs_repo.get(session, job.job_id)
if current is not None and current.status == "done":
stop.set()
return
stop.set()
await asyncio.gather(worker.run(stop), _monitor())
async with session_factory() as session:
final = await jobs_repo.get(session, job.job_id)
assert final is not None
assert final.callback_status is None
def _unused() -> None:
"""Silence a ruff F401 for ResponseIX — kept for symmetry w/ other tests."""
_ = ResponseIX

0
tests/live/__init__.py Normal file
View file

View file

@ -0,0 +1,70 @@
"""Live tests for :class:`OllamaClient` — gated on ``IX_TEST_OLLAMA=1``.
Never runs in CI (Forgejo runner has no LAN access to Ollama). Run locally::
IX_TEST_OLLAMA=1 uv run pytest tests/live/test_ollama_client_live.py -v
Assumes the Ollama server at ``http://192.168.68.42:11434`` already has
``qwen3:14b`` pulled.
"""
from __future__ import annotations
import os
import pytest
from ix.genai.ollama_client import OllamaClient
from ix.use_cases.bank_statement_header import BankStatementHeader
pytestmark = [
pytest.mark.live,
pytest.mark.skipif(
os.environ.get("IX_TEST_OLLAMA") != "1",
reason="live: IX_TEST_OLLAMA=1 required",
),
]
_OLLAMA_URL = "http://192.168.68.42:11434"
_MODEL = "qwen3:14b"
async def test_structured_output_round_trip() -> None:
"""Real Ollama returns a parsed BankStatementHeader instance."""
client = OllamaClient(base_url=_OLLAMA_URL, per_call_timeout_s=300.0)
result = await client.invoke(
request_kwargs={
"model": _MODEL,
"messages": [
{
"role": "system",
"content": (
"You extract bank statement header fields. "
"Return valid JSON matching the given schema. "
"Do not invent values."
),
},
{
"role": "user",
"content": (
"Bank: Deutsche Kreditbank (DKB)\n"
"Currency: EUR\n"
"IBAN: DE89370400440532013000\n"
"Period: 2025-01-01 to 2025-01-31"
),
},
],
},
response_schema=BankStatementHeader,
)
assert isinstance(result.parsed, BankStatementHeader)
assert isinstance(result.parsed.bank_name, str)
assert result.parsed.bank_name # non-empty
assert isinstance(result.parsed.currency, str)
assert result.model_name # server echoes a model name
async def test_selfcheck_ok_against_real_server() -> None:
"""``selfcheck`` returns ``ok`` when the target model is pulled."""
client = OllamaClient(base_url=_OLLAMA_URL, per_call_timeout_s=5.0)
assert await client.selfcheck(expected_model=_MODEL) == "ok"

View file

@ -0,0 +1,83 @@
"""Live test for :class:`SuryaOCRClient` — gated on ``IX_TEST_OLLAMA=1``.
Downloads real Surya models (hundreds of MB) on first run. Never runs in
CI. Exercised locally with::
IX_TEST_OLLAMA=1 uv run pytest tests/live/test_surya_client_live.py -v
Note: requires the ``[ocr]`` extra, installed via ``uv sync --extra ocr --extra dev``.
"""
from __future__ import annotations
import os
from pathlib import Path
import pytest
from ix.contracts import Page
from ix.segmentation import PageMetadata
pytestmark = [
pytest.mark.live,
pytest.mark.skipif(
os.environ.get("IX_TEST_OLLAMA") != "1",
reason="live: IX_TEST_OLLAMA=1 required",
),
]
async def test_extracts_dkb_and_iban_from_synthetic_giro() -> None:
"""Real Surya run against ``tests/fixtures/synthetic_giro.pdf``.
Assert the flat text contains ``"DKB"`` and the IBAN without spaces.
"""
from ix.ocr.surya_client import SuryaOCRClient
fixture = Path(__file__).parent.parent / "fixtures" / "synthetic_giro.pdf"
assert fixture.exists(), f"missing fixture: {fixture}"
# Build Pages the way DocumentIngestor would for this PDF: count pages
# via PyMuPDF so we pass the right number of inputs.
import fitz
doc = fitz.open(str(fixture))
try:
pages = [
Page(
page_no=i + 1,
width=float(p.rect.width),
height=float(p.rect.height),
lines=[],
)
for i, p in enumerate(doc)
]
finally:
doc.close()
client = SuryaOCRClient()
result = await client.ocr(
pages,
files=[(fixture, "application/pdf")],
page_metadata=[PageMetadata(file_index=0) for _ in pages],
)
flat_text = result.result.text or ""
# Join page-level line texts if flat text missing (shape-safety).
if not flat_text:
flat_text = "\n".join(
line.text or ""
for page in result.result.pages
for line in page.lines
)
assert "DKB" in flat_text
assert "DE89370400440532013000" in flat_text.replace(" ", "")
async def test_selfcheck_ok_against_real_predictors() -> None:
"""``selfcheck()`` returns ``ok`` once Surya's predictors load."""
from ix.ocr.surya_client import SuryaOCRClient
client = SuryaOCRClient()
assert await client.selfcheck() == "ok"

View file

@ -0,0 +1,104 @@
"""Tests for ``ix.app`` lifespan / probe wiring (Task 4.3).
The lifespan selects fake clients when ``IX_TEST_MODE=fake`` and exposes
their probes via the route DI hook. These tests exercise the probe
adapter in isolation: no DB, no real Ollama/Surya.
"""
from __future__ import annotations
from typing import Literal
from ix.app import _make_ocr_probe, _make_ollama_probe, build_pipeline
from ix.config import AppConfig
from ix.genai.fake import FakeGenAIClient
from ix.ocr.fake import FakeOCRClient
from ix.pipeline.genai_step import GenAIStep
from ix.pipeline.ocr_step import OCRStep
from ix.pipeline.pipeline import Pipeline
from ix.pipeline.reliability_step import ReliabilityStep
from ix.pipeline.response_handler_step import ResponseHandlerStep
from ix.pipeline.setup_step import SetupStep
def _cfg(**overrides: object) -> AppConfig:
return AppConfig(_env_file=None, **overrides) # type: ignore[call-arg]
class _SelfcheckOllamaClient:
async def invoke(self, *a: object, **kw: object) -> object:
raise NotImplementedError
async def selfcheck(
self, expected_model: str
) -> Literal["ok", "degraded", "fail"]:
self.called_with = expected_model
return "ok"
class _SelfcheckOCRClient:
async def ocr(self, *a: object, **kw: object) -> object:
raise NotImplementedError
async def selfcheck(self) -> Literal["ok", "fail"]:
return "ok"
class _BrokenSelfcheckOllama:
async def invoke(self, *a: object, **kw: object) -> object:
raise NotImplementedError
async def selfcheck(
self, expected_model: str
) -> Literal["ok", "degraded", "fail"]:
raise RuntimeError("boom")
class TestOllamaProbe:
def test_fake_client_without_selfcheck_reports_ok(self) -> None:
cfg = _cfg(test_mode="fake", default_model="gpt-oss:20b")
probe = _make_ollama_probe(FakeGenAIClient(parsed=None), cfg)
assert probe() == "ok"
def test_real_selfcheck_returns_its_verdict(self) -> None:
cfg = _cfg(default_model="gpt-oss:20b")
client = _SelfcheckOllamaClient()
probe = _make_ollama_probe(client, cfg) # type: ignore[arg-type]
assert probe() == "ok"
assert client.called_with == "gpt-oss:20b"
def test_selfcheck_exception_falls_back_to_fail(self) -> None:
cfg = _cfg(default_model="gpt-oss:20b")
probe = _make_ollama_probe(_BrokenSelfcheckOllama(), cfg) # type: ignore[arg-type]
assert probe() == "fail"
class TestOCRProbe:
def test_fake_client_without_selfcheck_reports_ok(self) -> None:
from ix.contracts.response import OCRDetails, OCRResult
probe = _make_ocr_probe(FakeOCRClient(canned=OCRResult(result=OCRDetails())))
assert probe() == "ok"
def test_real_selfcheck_returns_its_verdict(self) -> None:
probe = _make_ocr_probe(_SelfcheckOCRClient()) # type: ignore[arg-type]
assert probe() == "ok"
class TestBuildPipeline:
def test_assembles_all_five_steps_in_order(self) -> None:
from ix.contracts.response import OCRDetails, OCRResult
genai = FakeGenAIClient(parsed=None)
ocr = FakeOCRClient(canned=OCRResult(result=OCRDetails()))
cfg = _cfg(test_mode="fake")
pipeline = build_pipeline(genai, ocr, cfg)
assert isinstance(pipeline, Pipeline)
steps = pipeline._steps # type: ignore[attr-defined]
assert [type(s) for s in steps] == [
SetupStep,
OCRStep,
GenAIStep,
ReliabilityStep,
ResponseHandlerStep,
]

View file

@ -51,10 +51,10 @@ def test_defaults_match_spec(monkeypatch: pytest.MonkeyPatch) -> None:
assert cfg.postgres_url == (
"postgresql+asyncpg://infoxtractor:<password>"
"@host.docker.internal:5431/infoxtractor"
"@127.0.0.1:5431/infoxtractor"
)
assert cfg.ollama_url == "http://host.docker.internal:11434"
assert cfg.default_model == "gpt-oss:20b"
assert cfg.ollama_url == "http://127.0.0.1:11434"
assert cfg.default_model == "qwen3:14b"
assert cfg.ocr_engine == "surya"
assert cfg.tmp_dir == "/tmp/ix"
assert cfg.pipeline_worker_concurrency == 1

View file

@ -0,0 +1,60 @@
"""Tests for the GenAI + OCR factories (Task 4.3).
The factories pick between fake and real clients based on
``IX_TEST_MODE``. CI runs with ``IX_TEST_MODE=fake``; production runs
without, so the selection knob is the one lever between hermetic CI and
real clients.
"""
from __future__ import annotations
from ix.config import AppConfig
from ix.genai import make_genai_client
from ix.genai.fake import FakeGenAIClient
from ix.genai.ollama_client import OllamaClient
from ix.ocr import make_ocr_client
from ix.ocr.fake import FakeOCRClient
from ix.ocr.surya_client import SuryaOCRClient
def _cfg(**overrides: object) -> AppConfig:
"""Build an AppConfig without loading the repo's .env.example."""
return AppConfig(_env_file=None, **overrides) # type: ignore[call-arg]
class TestGenAIFactory:
def test_fake_mode_returns_fake(self) -> None:
cfg = _cfg(test_mode="fake")
client = make_genai_client(cfg)
assert isinstance(client, FakeGenAIClient)
def test_production_returns_ollama_with_configured_url(self) -> None:
cfg = _cfg(
test_mode=None,
ollama_url="http://ollama.host:11434",
genai_call_timeout_seconds=42,
)
client = make_genai_client(cfg)
assert isinstance(client, OllamaClient)
# Inspect the private attrs for binding correctness.
assert client._base_url == "http://ollama.host:11434"
assert client._per_call_timeout_s == 42
class TestOCRFactory:
def test_fake_mode_returns_fake(self) -> None:
cfg = _cfg(test_mode="fake")
client = make_ocr_client(cfg)
assert isinstance(client, FakeOCRClient)
def test_production_surya_returns_surya(self) -> None:
cfg = _cfg(test_mode=None, ocr_engine="surya")
client = make_ocr_client(cfg)
assert isinstance(client, SuryaOCRClient)
def test_unknown_engine_raises(self) -> None:
cfg = _cfg(test_mode=None, ocr_engine="tesseract")
import pytest
with pytest.raises(ValueError, match="ocr_engine"):
make_ocr_client(cfg)

View file

@ -363,8 +363,8 @@ class TestModelSelection:
req = _make_request(include_provenance=False)
resp = _response_with_segment_index(lines=["hello"])
await step.process(req, resp)
# use-case default is gpt-oss:20b
assert client.request_kwargs["model"] == "gpt-oss:20b" # type: ignore[index]
# use-case default is qwen3:14b
assert client.request_kwargs["model"] == "qwen3:14b" # type: ignore[index]
# ----------------------------------------------------------------------------

View file

@ -0,0 +1,270 @@
"""Tests for :class:`OllamaClient` — hermetic, pytest-httpx-driven.
Covers the spec §6 GenAIStep → Ollama call contract:
* POST body shape (model / messages / format / stream / options).
* Response parsing → :class:`GenAIInvocationResult`.
* Error mapping: connection / timeout / 5xx → ``IX_002_000``;
schema-violating body → ``IX_002_001``.
* ``selfcheck()``: tags-reachable + model-listed → ``ok``;
reachable-but-missing → ``degraded``; unreachable → ``fail``.
"""
from __future__ import annotations
import httpx
import pytest
from pydantic import BaseModel
from pytest_httpx import HTTPXMock
from ix.errors import IXErrorCode, IXException
from ix.genai.ollama_client import OllamaClient
class _Schema(BaseModel):
"""Trivial structured-output schema for the round-trip tests."""
bank_name: str
account_number: str | None = None
def _ollama_chat_ok_body(content_json: str) -> dict:
"""Build a minimal Ollama /api/chat success body."""
return {
"model": "gpt-oss:20b",
"message": {"role": "assistant", "content": content_json},
"done": True,
"eval_count": 42,
"prompt_eval_count": 17,
}
class TestInvokeHappyPath:
async def test_posts_to_chat_endpoint_with_format_and_no_stream(
self, httpx_mock: HTTPXMock
) -> None:
httpx_mock.add_response(
url="http://ollama.test:11434/api/chat",
method="POST",
json=_ollama_chat_ok_body('{"bank_name":"DKB","account_number":"DE89"}'),
)
client = OllamaClient(
base_url="http://ollama.test:11434", per_call_timeout_s=5.0
)
result = await client.invoke(
request_kwargs={
"model": "gpt-oss:20b",
"messages": [
{"role": "system", "content": "You extract."},
{"role": "user", "content": "Doc body"},
],
"temperature": 0.2,
"reasoning_effort": "high", # dropped silently
},
response_schema=_Schema,
)
assert result.parsed == _Schema(bank_name="DKB", account_number="DE89")
assert result.model_name == "gpt-oss:20b"
assert result.usage.prompt_tokens == 17
assert result.usage.completion_tokens == 42
# Verify request shape.
requests = httpx_mock.get_requests()
assert len(requests) == 1
body = requests[0].read().decode()
import json
body_json = json.loads(body)
assert body_json["model"] == "gpt-oss:20b"
assert body_json["stream"] is False
# No `format` is sent: Ollama 0.11.8 segfaults on full schemas and
# aborts to `{}` with `format=json` on reasoning models. Schema is
# injected into the system prompt instead; we extract the trailing
# JSON blob from the response and validate via Pydantic.
assert "format" not in body_json
assert body_json["options"]["temperature"] == 0.2
assert "reasoning_effort" not in body_json
# A schema-guidance system message is prepended to the caller's
# messages so Ollama (format=json loose mode) emits the right shape.
msgs = body_json["messages"]
assert msgs[0]["role"] == "system"
assert "JSON Schema" in msgs[0]["content"]
assert msgs[1:] == [
{"role": "system", "content": "You extract."},
{"role": "user", "content": "Doc body"},
]
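# The trailing-JSON extraction referenced in the comment above lives in
# OllamaClient and is not shown here. A minimal sketch of the idea (assumed
# helper name and shape): track brace depth while skipping string literals
# and escapes, and keep the last balanced top-level {...} block:
#
#   def _extract_trailing_json(text: str) -> str | None:
#       best, depth, start, in_str, esc = None, 0, None, False, False
#       for i, ch in enumerate(text):
#           if in_str:
#               if esc:
#                   esc = False
#               elif ch == "\\":
#                   esc = True
#               elif ch == '"':
#                   in_str = False
#               continue
#           if ch == '"':
#               in_str = True
#           elif ch == "{":
#               if depth == 0:
#                   start = i
#               depth += 1
#           elif ch == "}" and depth:
#               depth -= 1
#               if depth == 0:
#                   best = text[start : i + 1]
#       return best
#
# This survives <think>...</think> preambles and ```json code fences alike:
# only the last balanced object reaches Pydantic validation.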
async def test_text_parts_content_list_is_joined(
self, httpx_mock: HTTPXMock
) -> None:
httpx_mock.add_response(
url="http://ollama.test:11434/api/chat",
method="POST",
json=_ollama_chat_ok_body('{"bank_name":"X"}'),
)
client = OllamaClient(
base_url="http://ollama.test:11434", per_call_timeout_s=5.0
)
await client.invoke(
request_kwargs={
"model": "gpt-oss:20b",
"messages": [
{
"role": "user",
"content": [
{"type": "text", "text": "part-a"},
{"type": "text", "text": "part-b"},
],
}
],
},
response_schema=_Schema,
)
import json
request_body = json.loads(httpx_mock.get_requests()[0].read())
# First message is the auto-injected schema guidance; after that
# the caller's user message has its text parts joined.
assert request_body["messages"][0]["role"] == "system"
assert request_body["messages"][1:] == [
{"role": "user", "content": "part-a\npart-b"}
]
class TestInvokeErrorPaths:
async def test_connection_error_maps_to_002_000(
self, httpx_mock: HTTPXMock
) -> None:
httpx_mock.add_exception(httpx.ConnectError("refused"))
client = OllamaClient(
base_url="http://ollama.test:11434", per_call_timeout_s=1.0
)
with pytest.raises(IXException) as ei:
await client.invoke(
request_kwargs={
"model": "gpt-oss:20b",
"messages": [{"role": "user", "content": "hi"}],
},
response_schema=_Schema,
)
assert ei.value.code is IXErrorCode.IX_002_000
async def test_read_timeout_maps_to_002_000(self, httpx_mock: HTTPXMock) -> None:
httpx_mock.add_exception(httpx.ReadTimeout("slow"))
client = OllamaClient(
base_url="http://ollama.test:11434", per_call_timeout_s=0.5
)
with pytest.raises(IXException) as ei:
await client.invoke(
request_kwargs={
"model": "gpt-oss:20b",
"messages": [{"role": "user", "content": "hi"}],
},
response_schema=_Schema,
)
assert ei.value.code is IXErrorCode.IX_002_000
async def test_500_maps_to_002_000_with_body_snippet(
self, httpx_mock: HTTPXMock
) -> None:
httpx_mock.add_response(
url="http://ollama.test:11434/api/chat",
method="POST",
status_code=500,
text="boom boom server broken",
)
client = OllamaClient(
base_url="http://ollama.test:11434", per_call_timeout_s=5.0
)
with pytest.raises(IXException) as ei:
await client.invoke(
request_kwargs={
"model": "gpt-oss:20b",
"messages": [{"role": "user", "content": "hi"}],
},
response_schema=_Schema,
)
assert ei.value.code is IXErrorCode.IX_002_000
assert "boom" in (ei.value.detail or "")
async def test_200_with_invalid_json_maps_to_002_001(
self, httpx_mock: HTTPXMock
) -> None:
httpx_mock.add_response(
url="http://ollama.test:11434/api/chat",
method="POST",
json=_ollama_chat_ok_body("not-json"),
)
client = OllamaClient(
base_url="http://ollama.test:11434", per_call_timeout_s=5.0
)
with pytest.raises(IXException) as ei:
await client.invoke(
request_kwargs={
"model": "gpt-oss:20b",
"messages": [{"role": "user", "content": "hi"}],
},
response_schema=_Schema,
)
assert ei.value.code is IXErrorCode.IX_002_001
async def test_200_with_schema_violation_maps_to_002_001(
self, httpx_mock: HTTPXMock
) -> None:
# Missing required `bank_name` field.
httpx_mock.add_response(
url="http://ollama.test:11434/api/chat",
method="POST",
json=_ollama_chat_ok_body('{"account_number":"DE89"}'),
)
client = OllamaClient(
base_url="http://ollama.test:11434", per_call_timeout_s=5.0
)
with pytest.raises(IXException) as ei:
await client.invoke(
request_kwargs={
"model": "gpt-oss:20b",
"messages": [{"role": "user", "content": "hi"}],
},
response_schema=_Schema,
)
assert ei.value.code is IXErrorCode.IX_002_001
class TestSelfcheck:
async def test_selfcheck_ok_when_model_listed(
self, httpx_mock: HTTPXMock
) -> None:
httpx_mock.add_response(
url="http://ollama.test:11434/api/tags",
method="GET",
json={"models": [{"name": "gpt-oss:20b"}, {"name": "qwen2.5:32b"}]},
)
client = OllamaClient(
base_url="http://ollama.test:11434", per_call_timeout_s=5.0
)
assert await client.selfcheck(expected_model="gpt-oss:20b") == "ok"
async def test_selfcheck_degraded_when_model_missing(
self, httpx_mock: HTTPXMock
) -> None:
httpx_mock.add_response(
url="http://ollama.test:11434/api/tags",
method="GET",
json={"models": [{"name": "qwen2.5:32b"}]},
)
client = OllamaClient(
base_url="http://ollama.test:11434", per_call_timeout_s=5.0
)
assert await client.selfcheck(expected_model="gpt-oss:20b") == "degraded"
async def test_selfcheck_fail_on_connection_error(
self, httpx_mock: HTTPXMock
) -> None:
httpx_mock.add_exception(httpx.ConnectError("refused"))
client = OllamaClient(
base_url="http://ollama.test:11434", per_call_timeout_s=5.0
)
assert await client.selfcheck(expected_model="gpt-oss:20b") == "fail"

View file

@ -0,0 +1,166 @@
"""Tests for :class:`SuryaOCRClient` — hermetic, no model download.
The real Surya predictors are patched out with :class:`unittest.mock.MagicMock`
that return trivially-shaped line objects. The tests assert the client's
translation layer: flattening polygons, mapping text_lines → ``Line``, and
preserving ``page_no``/``width``/``height`` per input page.
"""
from __future__ import annotations
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import MagicMock, patch
import pytest
from ix.contracts import Page
from ix.ocr.surya_client import SuryaOCRClient
from ix.segmentation import PageMetadata
def _make_surya_line(text: str, polygon: list[list[float]]) -> SimpleNamespace:
"""Mimic ``surya.recognition.schema.TextLine`` duck-typing-style."""
return SimpleNamespace(text=text, polygon=polygon, confidence=0.95)
def _make_surya_ocr_result(lines: list[SimpleNamespace]) -> SimpleNamespace:
"""Mimic ``surya.recognition.schema.OCRResult``."""
return SimpleNamespace(text_lines=lines, image_bbox=[0, 0, 100, 100])
class TestOCRBuildsOCRResultFromMockedPredictors:
    async def test_one_image_one_line_flatten_polygon(self, tmp_path: Path) -> None:
        img_path = tmp_path / "a.png"
        _write_tiny_png(img_path)
        mock_line = _make_surya_line(
            text="hello",
            polygon=[[1.0, 2.0], [3.0, 4.0], [5.0, 6.0], [7.0, 8.0]],
        )
        mock_predictor = MagicMock(
            return_value=[_make_surya_ocr_result([mock_line])]
        )
        client = SuryaOCRClient()
        # Skip the real warm_up; inject the mock directly.
        client._recognition_predictor = mock_predictor
        client._detection_predictor = MagicMock()
        pages = [Page(page_no=1, width=100.0, height=50.0, lines=[])]
        result = await client.ocr(
            pages,
            files=[(img_path, "image/png")],
            page_metadata=[PageMetadata(file_index=0)],
        )
        assert len(result.result.pages) == 1
        out_page = result.result.pages[0]
        assert out_page.page_no == 1
        assert out_page.width == 100.0
        assert out_page.height == 50.0
        assert len(out_page.lines) == 1
        assert out_page.lines[0].text == "hello"
        assert out_page.lines[0].bounding_box == [
            1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0
        ]

    async def test_multiple_pages_preserves_order(self, tmp_path: Path) -> None:
        img_a = tmp_path / "a.png"
        img_b = tmp_path / "b.png"
        _write_tiny_png(img_a)
        _write_tiny_png(img_b)
        mock_predictor = MagicMock(
            return_value=[
                _make_surya_ocr_result(
                    [_make_surya_line("a-line", [[0, 0], [1, 0], [1, 1], [0, 1]])]
                ),
                _make_surya_ocr_result(
                    [_make_surya_line("b-line", [[0, 0], [1, 0], [1, 1], [0, 1]])]
                ),
            ]
        )
        client = SuryaOCRClient()
        client._recognition_predictor = mock_predictor
        client._detection_predictor = MagicMock()
        pages = [
            Page(page_no=1, width=10.0, height=20.0, lines=[]),
            Page(page_no=2, width=10.0, height=20.0, lines=[]),
        ]
        result = await client.ocr(
            pages,
            files=[(img_a, "image/png"), (img_b, "image/png")],
            page_metadata=[
                PageMetadata(file_index=0),
                PageMetadata(file_index=1),
            ],
        )
        assert [p.lines[0].text for p in result.result.pages] == ["a-line", "b-line"]

    async def test_lazy_warm_up_on_first_ocr(self, tmp_path: Path) -> None:
        img = tmp_path / "x.png"
        _write_tiny_png(img)
        client = SuryaOCRClient()
        # Use patch.object on the instance's warm_up so we don't need real
        # Surya module loading.
        with patch.object(client, "warm_up", autospec=True) as mocked_warm_up:
            # After warm_up is called, the predictors must be assigned.
            def fake_warm_up(self: SuryaOCRClient) -> None:
                self._recognition_predictor = MagicMock(
                    return_value=[
                        _make_surya_ocr_result(
                            [
                                _make_surya_line(
                                    "hi", [[0, 0], [1, 0], [1, 1], [0, 1]]
                                )
                            ]
                        )
                    ]
                )
                self._detection_predictor = MagicMock()

            mocked_warm_up.side_effect = lambda: fake_warm_up(client)
            pages = [Page(page_no=1, width=10.0, height=10.0, lines=[])]
            await client.ocr(
                pages,
                files=[(img, "image/png")],
                page_metadata=[PageMetadata(file_index=0)],
            )
            mocked_warm_up.assert_called_once()


class TestSelfcheck:
    async def test_selfcheck_ok_with_mocked_predictors(self) -> None:
        client = SuryaOCRClient()
        client._recognition_predictor = MagicMock(
            return_value=[_make_surya_ocr_result([])]
        )
        client._detection_predictor = MagicMock()
        assert await client.selfcheck() == "ok"

    async def test_selfcheck_fail_when_predictor_raises(self) -> None:
        client = SuryaOCRClient()
        client._recognition_predictor = MagicMock(
            side_effect=RuntimeError("cuda broken")
        )
        client._detection_predictor = MagicMock()
        assert await client.selfcheck() == "fail"


def _write_tiny_png(path: Path) -> None:
    """Write a 2x2 white PNG so PIL can open it."""
    from PIL import Image

    Image.new("RGB", (2, 2), color="white").save(path, format="PNG")


@pytest.mark.parametrize("unused", [None])  # keep pytest happy if file ever runs alone
def test_module_imports(unused: None) -> None:
    assert SuryaOCRClient is not None
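Note: the three OCR tests above rebuild the same mocked-predictor client by hand. A pytest fixture along these lines could factor that out. This is a sketch only, reusing names already defined in this test module (SuryaOCRClient, _make_surya_ocr_result, _make_surya_line, and the private _recognition_predictor / _detection_predictor attributes); it is not code from the repository.

import pytest
from unittest.mock import MagicMock


@pytest.fixture
def mocked_client() -> SuryaOCRClient:
    """SuryaOCRClient with both predictors stubbed out, so no Surya models are loaded."""
    client = SuryaOCRClient()
    client._recognition_predictor = MagicMock(
        return_value=[
            _make_surya_ocr_result(
                [_make_surya_line("hello", [[0, 0], [1, 0], [1, 1], [0, 1]])]
            )
        ]
    )
    client._detection_predictor = MagicMock()
    return client

A test would then take mocked_client as an argument instead of constructing and patching the client inline.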


@@ -12,7 +12,7 @@ class TestRequest:
     def test_defaults(self) -> None:
         r = Request()
         assert r.use_case_name == "Bank Statement Header"
-        assert r.default_model == "gpt-oss:20b"
+        assert r.default_model == "qwen3:14b"
         # Stable substring for agent/worker tests that want to confirm the
         # prompt is what they think it is.
         assert "extract header metadata" in r.system_prompt

uv.lock (165 changed lines)

@@ -110,6 +110,79 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/db/3c/33bac158f8ab7f89b2e59426d5fe2e4f63f7ed25df84c036890172b412b5/cfgv-3.5.0-py2.py3-none-any.whl", hash = "sha256:a8dc6b26ad22ff227d2634a65cb388215ce6cc96bbcc5cfde7641ae87e8dacc0", size = 7445, upload-time = "2025-11-19T20:55:50.744Z" },
]
[[package]]
name = "charset-normalizer"
version = "3.4.7"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/e7/a1/67fe25fac3c7642725500a3f6cfe5821ad557c3abb11c9d20d12c7008d3e/charset_normalizer-3.4.7.tar.gz", hash = "sha256:ae89db9e5f98a11a4bf50407d4363e7b09b31e55bc117b4f7d80aab97ba009e5", size = 144271, upload-time = "2026-04-02T09:28:39.342Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/0c/eb/4fc8d0a7110eb5fc9cc161723a34a8a6c200ce3b4fbf681bc86feee22308/charset_normalizer-3.4.7-cp312-cp312-macosx_10_13_universal2.whl", hash = "sha256:eca9705049ad3c7345d574e3510665cb2cf844c2f2dcfe675332677f081cbd46", size = 311328, upload-time = "2026-04-02T09:26:24.331Z" },
{ url = "https://files.pythonhosted.org/packages/f8/e3/0fadc706008ac9d7b9b5be6dc767c05f9d3e5df51744ce4cc9605de7b9f4/charset_normalizer-3.4.7-cp312-cp312-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:6178f72c5508bfc5fd446a5905e698c6212932f25bcdd4b47a757a50605a90e2", size = 208061, upload-time = "2026-04-02T09:26:25.568Z" },
{ url = "https://files.pythonhosted.org/packages/42/f0/3dd1045c47f4a4604df85ec18ad093912ae1344ac706993aff91d38773a2/charset_normalizer-3.4.7-cp312-cp312-manylinux2014_ppc64le.manylinux_2_17_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:e1421b502d83040e6d7fb2fb18dff63957f720da3d77b2fbd3187ceb63755d7b", size = 229031, upload-time = "2026-04-02T09:26:26.865Z" },
{ url = "https://files.pythonhosted.org/packages/dc/67/675a46eb016118a2fbde5a277a5d15f4f69d5f3f5f338e5ee2f8948fcf43/charset_normalizer-3.4.7-cp312-cp312-manylinux2014_s390x.manylinux_2_17_s390x.manylinux_2_28_s390x.whl", hash = "sha256:edac0f1ab77644605be2cbba52e6b7f630731fc42b34cb0f634be1a6eface56a", size = 225239, upload-time = "2026-04-02T09:26:28.044Z" },
{ url = "https://files.pythonhosted.org/packages/4b/f8/d0118a2f5f23b02cd166fa385c60f9b0d4f9194f574e2b31cef350ad7223/charset_normalizer-3.4.7-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:5649fd1c7bade02f320a462fdefd0b4bd3ce036065836d4f42e0de958038e116", size = 216589, upload-time = "2026-04-02T09:26:29.239Z" },
{ url = "https://files.pythonhosted.org/packages/b1/f1/6d2b0b261b6c4ceef0fcb0d17a01cc5bc53586c2d4796fa04b5c540bc13d/charset_normalizer-3.4.7-cp312-cp312-manylinux_2_31_armv7l.whl", hash = "sha256:203104ed3e428044fd943bc4bf45fa73c0730391f9621e37fe39ecf477b128cb", size = 202733, upload-time = "2026-04-02T09:26:30.5Z" },
{ url = "https://files.pythonhosted.org/packages/6f/c0/7b1f943f7e87cc3db9626ba17807d042c38645f0a1d4415c7a14afb5591f/charset_normalizer-3.4.7-cp312-cp312-manylinux_2_31_riscv64.manylinux_2_39_riscv64.whl", hash = "sha256:298930cec56029e05497a76988377cbd7457ba864beeea92ad7e844fe74cd1f1", size = 212652, upload-time = "2026-04-02T09:26:31.709Z" },
{ url = "https://files.pythonhosted.org/packages/38/dd/5a9ab159fe45c6e72079398f277b7d2b523e7f716acc489726115a910097/charset_normalizer-3.4.7-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:708838739abf24b2ceb208d0e22403dd018faeef86ddac04319a62ae884c4f15", size = 211229, upload-time = "2026-04-02T09:26:33.282Z" },
{ url = "https://files.pythonhosted.org/packages/d5/ff/531a1cad5ca855d1c1a8b69cb71abfd6d85c0291580146fda7c82857caa1/charset_normalizer-3.4.7-cp312-cp312-musllinux_1_2_armv7l.whl", hash = "sha256:0f7eb884681e3938906ed0434f20c63046eacd0111c4ba96f27b76084cd679f5", size = 203552, upload-time = "2026-04-02T09:26:34.845Z" },
{ url = "https://files.pythonhosted.org/packages/c1/4c/a5fb52d528a8ca41f7598cb619409ece30a169fbdf9cdce592e53b46c3a6/charset_normalizer-3.4.7-cp312-cp312-musllinux_1_2_ppc64le.whl", hash = "sha256:4dc1e73c36828f982bfe79fadf5919923f8a6f4df2860804db9a98c48824ce8d", size = 230806, upload-time = "2026-04-02T09:26:36.152Z" },
{ url = "https://files.pythonhosted.org/packages/59/7a/071feed8124111a32b316b33ae4de83d36923039ef8cf48120266844285b/charset_normalizer-3.4.7-cp312-cp312-musllinux_1_2_riscv64.whl", hash = "sha256:aed52fea0513bac0ccde438c188c8a471c4e0f457c2dd20cdbf6ea7a450046c7", size = 212316, upload-time = "2026-04-02T09:26:37.672Z" },
{ url = "https://files.pythonhosted.org/packages/fd/35/f7dba3994312d7ba508e041eaac39a36b120f32d4c8662b8814dab876431/charset_normalizer-3.4.7-cp312-cp312-musllinux_1_2_s390x.whl", hash = "sha256:fea24543955a6a729c45a73fe90e08c743f0b3334bbf3201e6c4bc1b0c7fa464", size = 227274, upload-time = "2026-04-02T09:26:38.93Z" },
{ url = "https://files.pythonhosted.org/packages/8a/2d/a572df5c9204ab7688ec1edc895a73ebded3b023bb07364710b05dd1c9be/charset_normalizer-3.4.7-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:bb6d88045545b26da47aa879dd4a89a71d1dce0f0e549b1abcb31dfe4a8eac49", size = 218468, upload-time = "2026-04-02T09:26:40.17Z" },
{ url = "https://files.pythonhosted.org/packages/86/eb/890922a8b03a568ca2f336c36585a4713c55d4d67bf0f0c78924be6315ca/charset_normalizer-3.4.7-cp312-cp312-win32.whl", hash = "sha256:2257141f39fe65a3fdf38aeccae4b953e5f3b3324f4ff0daf9f15b8518666a2c", size = 148460, upload-time = "2026-04-02T09:26:41.416Z" },
{ url = "https://files.pythonhosted.org/packages/35/d9/0e7dffa06c5ab081f75b1b786f0aefc88365825dfcd0ac544bdb7b2b6853/charset_normalizer-3.4.7-cp312-cp312-win_amd64.whl", hash = "sha256:5ed6ab538499c8644b8a3e18debabcd7ce684f3fa91cf867521a7a0279cab2d6", size = 159330, upload-time = "2026-04-02T09:26:42.554Z" },
{ url = "https://files.pythonhosted.org/packages/9e/5d/481bcc2a7c88ea6b0878c299547843b2521ccbc40980cb406267088bc701/charset_normalizer-3.4.7-cp312-cp312-win_arm64.whl", hash = "sha256:56be790f86bfb2c98fb742ce566dfb4816e5a83384616ab59c49e0604d49c51d", size = 147828, upload-time = "2026-04-02T09:26:44.075Z" },
{ url = "https://files.pythonhosted.org/packages/c1/3b/66777e39d3ae1ddc77ee606be4ec6d8cbd4c801f65e5a1b6f2b11b8346dd/charset_normalizer-3.4.7-cp313-cp313-macosx_10_13_universal2.whl", hash = "sha256:f496c9c3cc02230093d8330875c4c3cdfc3b73612a5fd921c65d39cbcef08063", size = 309627, upload-time = "2026-04-02T09:26:45.198Z" },
{ url = "https://files.pythonhosted.org/packages/2e/4e/b7f84e617b4854ade48a1b7915c8ccfadeba444d2a18c291f696e37f0d3b/charset_normalizer-3.4.7-cp313-cp313-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:0ea948db76d31190bf08bd371623927ee1339d5f2a0b4b1b4a4439a65298703c", size = 207008, upload-time = "2026-04-02T09:26:46.824Z" },
{ url = "https://files.pythonhosted.org/packages/c4/bb/ec73c0257c9e11b268f018f068f5d00aa0ef8c8b09f7753ebd5f2880e248/charset_normalizer-3.4.7-cp313-cp313-manylinux2014_ppc64le.manylinux_2_17_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:a277ab8928b9f299723bc1a2dabb1265911b1a76341f90a510368ca44ad9ab66", size = 228303, upload-time = "2026-04-02T09:26:48.397Z" },
{ url = "https://files.pythonhosted.org/packages/85/fb/32d1f5033484494619f701e719429c69b766bfc4dbc61aa9e9c8c166528b/charset_normalizer-3.4.7-cp313-cp313-manylinux2014_s390x.manylinux_2_17_s390x.manylinux_2_28_s390x.whl", hash = "sha256:3bec022aec2c514d9cf199522a802bd007cd588ab17ab2525f20f9c34d067c18", size = 224282, upload-time = "2026-04-02T09:26:49.684Z" },
{ url = "https://files.pythonhosted.org/packages/fa/07/330e3a0dda4c404d6da83b327270906e9654a24f6c546dc886a0eb0ffb23/charset_normalizer-3.4.7-cp313-cp313-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:e044c39e41b92c845bc815e5ae4230804e8e7bc29e399b0437d64222d92809dd", size = 215595, upload-time = "2026-04-02T09:26:50.915Z" },
{ url = "https://files.pythonhosted.org/packages/e3/7c/fc890655786e423f02556e0216d4b8c6bcb6bdfa890160dc66bf52dee468/charset_normalizer-3.4.7-cp313-cp313-manylinux_2_31_armv7l.whl", hash = "sha256:f495a1652cf3fbab2eb0639776dad966c2fb874d79d87ca07f9d5f059b8bd215", size = 201986, upload-time = "2026-04-02T09:26:52.197Z" },
{ url = "https://files.pythonhosted.org/packages/d8/97/bfb18b3db2aed3b90cf54dc292ad79fdd5ad65c4eae454099475cbeadd0d/charset_normalizer-3.4.7-cp313-cp313-manylinux_2_31_riscv64.manylinux_2_39_riscv64.whl", hash = "sha256:e712b419df8ba5e42b226c510472b37bd57b38e897d3eca5e8cfd410a29fa859", size = 211711, upload-time = "2026-04-02T09:26:53.49Z" },
{ url = "https://files.pythonhosted.org/packages/6f/a5/a581c13798546a7fd557c82614a5c65a13df2157e9ad6373166d2a3e645d/charset_normalizer-3.4.7-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:7804338df6fcc08105c7745f1502ba68d900f45fd770d5bdd5288ddccb8a42d8", size = 210036, upload-time = "2026-04-02T09:26:54.975Z" },
{ url = "https://files.pythonhosted.org/packages/8c/bf/b3ab5bcb478e4193d517644b0fb2bf5497fbceeaa7a1bc0f4d5b50953861/charset_normalizer-3.4.7-cp313-cp313-musllinux_1_2_armv7l.whl", hash = "sha256:481551899c856c704d58119b5025793fa6730adda3571971af568f66d2424bb5", size = 202998, upload-time = "2026-04-02T09:26:56.303Z" },
{ url = "https://files.pythonhosted.org/packages/e7/4e/23efd79b65d314fa320ec6017b4b5834d5c12a58ba4610aa353af2e2f577/charset_normalizer-3.4.7-cp313-cp313-musllinux_1_2_ppc64le.whl", hash = "sha256:f59099f9b66f0d7145115e6f80dd8b1d847176df89b234a5a6b3f00437aa0832", size = 230056, upload-time = "2026-04-02T09:26:57.554Z" },
{ url = "https://files.pythonhosted.org/packages/b9/9f/1e1941bc3f0e01df116e68dc37a55c4d249df5e6fa77f008841aef68264f/charset_normalizer-3.4.7-cp313-cp313-musllinux_1_2_riscv64.whl", hash = "sha256:f59ad4c0e8f6bba240a9bb85504faa1ab438237199d4cce5f622761507b8f6a6", size = 211537, upload-time = "2026-04-02T09:26:58.843Z" },
{ url = "https://files.pythonhosted.org/packages/80/0f/088cbb3020d44428964a6c97fe1edfb1b9550396bf6d278330281e8b709c/charset_normalizer-3.4.7-cp313-cp313-musllinux_1_2_s390x.whl", hash = "sha256:3dedcc22d73ec993f42055eff4fcfed9318d1eeb9a6606c55892a26964964e48", size = 226176, upload-time = "2026-04-02T09:27:00.437Z" },
{ url = "https://files.pythonhosted.org/packages/6a/9f/130394f9bbe06f4f63e22641d32fc9b202b7e251c9aef4db044324dac493/charset_normalizer-3.4.7-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:64f02c6841d7d83f832cd97ccf8eb8a906d06eb95d5276069175c696b024b60a", size = 217723, upload-time = "2026-04-02T09:27:02.021Z" },
{ url = "https://files.pythonhosted.org/packages/73/55/c469897448a06e49f8fa03f6caae97074fde823f432a98f979cc42b90e69/charset_normalizer-3.4.7-cp313-cp313-win32.whl", hash = "sha256:4042d5c8f957e15221d423ba781e85d553722fc4113f523f2feb7b188cc34c5e", size = 148085, upload-time = "2026-04-02T09:27:03.192Z" },
{ url = "https://files.pythonhosted.org/packages/5d/78/1b74c5bbb3f99b77a1715c91b3e0b5bdb6fe302d95ace4f5b1bec37b0167/charset_normalizer-3.4.7-cp313-cp313-win_amd64.whl", hash = "sha256:3946fa46a0cf3e4c8cb1cc52f56bb536310d34f25f01ca9b6c16afa767dab110", size = 158819, upload-time = "2026-04-02T09:27:04.454Z" },
{ url = "https://files.pythonhosted.org/packages/68/86/46bd42279d323deb8687c4a5a811fd548cb7d1de10cf6535d099877a9a9f/charset_normalizer-3.4.7-cp313-cp313-win_arm64.whl", hash = "sha256:80d04837f55fc81da168b98de4f4b797ef007fc8a79ab71c6ec9bc4dd662b15b", size = 147915, upload-time = "2026-04-02T09:27:05.971Z" },
{ url = "https://files.pythonhosted.org/packages/97/c8/c67cb8c70e19ef1960b97b22ed2a1567711de46c4ddf19799923adc836c2/charset_normalizer-3.4.7-cp314-cp314-macosx_10_15_universal2.whl", hash = "sha256:c36c333c39be2dbca264d7803333c896ab8fa7d4d6f0ab7edb7dfd7aea6e98c0", size = 309234, upload-time = "2026-04-02T09:27:07.194Z" },
{ url = "https://files.pythonhosted.org/packages/99/85/c091fdee33f20de70d6c8b522743b6f831a2f1cd3ff86de4c6a827c48a76/charset_normalizer-3.4.7-cp314-cp314-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:1c2aed2e5e41f24ea8ef1590b8e848a79b56f3a5564a65ceec43c9d692dc7d8a", size = 208042, upload-time = "2026-04-02T09:27:08.749Z" },
{ url = "https://files.pythonhosted.org/packages/87/1c/ab2ce611b984d2fd5d86a5a8a19c1ae26acac6bad967da4967562c75114d/charset_normalizer-3.4.7-cp314-cp314-manylinux2014_ppc64le.manylinux_2_17_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:54523e136b8948060c0fa0bc7b1b50c32c186f2fceee897a495406bb6e311d2b", size = 228706, upload-time = "2026-04-02T09:27:09.951Z" },
{ url = "https://files.pythonhosted.org/packages/a8/29/2b1d2cb00bf085f59d29eb773ce58ec2d325430f8c216804a0a5cd83cbca/charset_normalizer-3.4.7-cp314-cp314-manylinux2014_s390x.manylinux_2_17_s390x.manylinux_2_28_s390x.whl", hash = "sha256:715479b9a2802ecac752a3b0efa2b0b60285cf962ee38414211abdfccc233b41", size = 224727, upload-time = "2026-04-02T09:27:11.175Z" },
{ url = "https://files.pythonhosted.org/packages/47/5c/032c2d5a07fe4d4855fea851209cca2b6f03ebeb6d4e3afdb3358386a684/charset_normalizer-3.4.7-cp314-cp314-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:bd6c2a1c7573c64738d716488d2cdd3c00e340e4835707d8fdb8dc1a66ef164e", size = 215882, upload-time = "2026-04-02T09:27:12.446Z" },
{ url = "https://files.pythonhosted.org/packages/2c/c2/356065d5a8b78ed04499cae5f339f091946a6a74f91e03476c33f0ab7100/charset_normalizer-3.4.7-cp314-cp314-manylinux_2_31_armv7l.whl", hash = "sha256:c45e9440fb78f8ddabcf714b68f936737a121355bf59f3907f4e17721b9d1aae", size = 200860, upload-time = "2026-04-02T09:27:13.721Z" },
{ url = "https://files.pythonhosted.org/packages/0c/cd/a32a84217ced5039f53b29f460962abb2d4420def55afabe45b1c3c7483d/charset_normalizer-3.4.7-cp314-cp314-manylinux_2_31_riscv64.manylinux_2_39_riscv64.whl", hash = "sha256:3534e7dcbdcf757da6b85a0bbf5b6868786d5982dd959b065e65481644817a18", size = 211564, upload-time = "2026-04-02T09:27:15.272Z" },
{ url = "https://files.pythonhosted.org/packages/44/86/58e6f13ce26cc3b8f4a36b94a0f22ae2f00a72534520f4ae6857c4b81f89/charset_normalizer-3.4.7-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:e8ac484bf18ce6975760921bb6148041faa8fef0547200386ea0b52b5d27bf7b", size = 211276, upload-time = "2026-04-02T09:27:16.834Z" },
{ url = "https://files.pythonhosted.org/packages/8f/fe/d17c32dc72e17e155e06883efa84514ca375f8a528ba2546bee73fc4df81/charset_normalizer-3.4.7-cp314-cp314-musllinux_1_2_armv7l.whl", hash = "sha256:a5fe03b42827c13cdccd08e6c0247b6a6d4b5e3cdc53fd1749f5896adcdc2356", size = 201238, upload-time = "2026-04-02T09:27:18.229Z" },
{ url = "https://files.pythonhosted.org/packages/6a/29/f33daa50b06525a237451cdb6c69da366c381a3dadcd833fa5676bc468b3/charset_normalizer-3.4.7-cp314-cp314-musllinux_1_2_ppc64le.whl", hash = "sha256:2d6eb928e13016cea4f1f21d1e10c1cebd5a421bc57ddf5b1142ae3f86824fab", size = 230189, upload-time = "2026-04-02T09:27:19.445Z" },
{ url = "https://files.pythonhosted.org/packages/b6/6e/52c84015394a6a0bdcd435210a7e944c5f94ea1055f5cc5d56c5fe368e7b/charset_normalizer-3.4.7-cp314-cp314-musllinux_1_2_riscv64.whl", hash = "sha256:e74327fb75de8986940def6e8dee4f127cc9752bee7355bb323cc5b2659b6d46", size = 211352, upload-time = "2026-04-02T09:27:20.79Z" },
{ url = "https://files.pythonhosted.org/packages/8c/d7/4353be581b373033fb9198bf1da3cf8f09c1082561e8e922aa7b39bf9fe8/charset_normalizer-3.4.7-cp314-cp314-musllinux_1_2_s390x.whl", hash = "sha256:d6038d37043bced98a66e68d3aa2b6a35505dc01328cd65217cefe82f25def44", size = 227024, upload-time = "2026-04-02T09:27:22.063Z" },
{ url = "https://files.pythonhosted.org/packages/30/45/99d18aa925bd1740098ccd3060e238e21115fffbfdcb8f3ece837d0ace6c/charset_normalizer-3.4.7-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:7579e913a5339fb8fa133f6bbcfd8e6749696206cf05acdbdca71a1b436d8e72", size = 217869, upload-time = "2026-04-02T09:27:23.486Z" },
{ url = "https://files.pythonhosted.org/packages/5c/05/5ee478aa53f4bb7996482153d4bfe1b89e0f087f0ab6b294fcf92d595873/charset_normalizer-3.4.7-cp314-cp314-win32.whl", hash = "sha256:5b77459df20e08151cd6f8b9ef8ef1f961ef73d85c21a555c7eed5b79410ec10", size = 148541, upload-time = "2026-04-02T09:27:25.146Z" },
{ url = "https://files.pythonhosted.org/packages/48/77/72dcb0921b2ce86420b2d79d454c7022bf5be40202a2a07906b9f2a35c97/charset_normalizer-3.4.7-cp314-cp314-win_amd64.whl", hash = "sha256:92a0a01ead5e668468e952e4238cccd7c537364eb7d851ab144ab6627dbbe12f", size = 159634, upload-time = "2026-04-02T09:27:26.642Z" },
{ url = "https://files.pythonhosted.org/packages/c6/a3/c2369911cd72f02386e4e340770f6e158c7980267da16af8f668217abaa0/charset_normalizer-3.4.7-cp314-cp314-win_arm64.whl", hash = "sha256:67f6279d125ca0046a7fd386d01b311c6363844deac3e5b069b514ba3e63c246", size = 148384, upload-time = "2026-04-02T09:27:28.271Z" },
{ url = "https://files.pythonhosted.org/packages/94/09/7e8a7f73d24dba1f0035fbbf014d2c36828fc1bf9c88f84093e57d315935/charset_normalizer-3.4.7-cp314-cp314t-macosx_10_15_universal2.whl", hash = "sha256:effc3f449787117233702311a1b7d8f59cba9ced946ba727bdc329ec69028e24", size = 330133, upload-time = "2026-04-02T09:27:29.474Z" },
{ url = "https://files.pythonhosted.org/packages/8d/da/96975ddb11f8e977f706f45cddd8540fd8242f71ecdb5d18a80723dcf62c/charset_normalizer-3.4.7-cp314-cp314t-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:fbccdc05410c9ee21bbf16a35f4c1d16123dcdeb8a1d38f33654fa21d0234f79", size = 216257, upload-time = "2026-04-02T09:27:30.793Z" },
{ url = "https://files.pythonhosted.org/packages/e5/e8/1d63bf8ef2d388e95c64b2098f45f84758f6d102a087552da1485912637b/charset_normalizer-3.4.7-cp314-cp314t-manylinux2014_ppc64le.manylinux_2_17_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:733784b6d6def852c814bce5f318d25da2ee65dd4839a0718641c696e09a2960", size = 234851, upload-time = "2026-04-02T09:27:32.44Z" },
{ url = "https://files.pythonhosted.org/packages/9b/40/e5ff04233e70da2681fa43969ad6f66ca5611d7e669be0246c4c7aaf6dc8/charset_normalizer-3.4.7-cp314-cp314t-manylinux2014_s390x.manylinux_2_17_s390x.manylinux_2_28_s390x.whl", hash = "sha256:a89c23ef8d2c6b27fd200a42aa4ac72786e7c60d40efdc76e6011260b6e949c4", size = 233393, upload-time = "2026-04-02T09:27:34.03Z" },
{ url = "https://files.pythonhosted.org/packages/be/c1/06c6c49d5a5450f76899992f1ee40b41d076aee9279b49cf9974d2f313d5/charset_normalizer-3.4.7-cp314-cp314t-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:6c114670c45346afedc0d947faf3c7f701051d2518b943679c8ff88befe14f8e", size = 223251, upload-time = "2026-04-02T09:27:35.369Z" },
{ url = "https://files.pythonhosted.org/packages/2b/9f/f2ff16fb050946169e3e1f82134d107e5d4ae72647ec8a1b1446c148480f/charset_normalizer-3.4.7-cp314-cp314t-manylinux_2_31_armv7l.whl", hash = "sha256:a180c5e59792af262bf263b21a3c49353f25945d8d9f70628e73de370d55e1e1", size = 206609, upload-time = "2026-04-02T09:27:36.661Z" },
{ url = "https://files.pythonhosted.org/packages/69/d5/a527c0cd8d64d2eab7459784fb4169a0ac76e5a6fc5237337982fd61347e/charset_normalizer-3.4.7-cp314-cp314t-manylinux_2_31_riscv64.manylinux_2_39_riscv64.whl", hash = "sha256:3c9a494bc5ec77d43cea229c4f6db1e4d8fe7e1bbffa8b6f0f0032430ff8ab44", size = 220014, upload-time = "2026-04-02T09:27:38.019Z" },
{ url = "https://files.pythonhosted.org/packages/7e/80/8a7b8104a3e203074dc9aa2c613d4b726c0e136bad1cc734594b02867972/charset_normalizer-3.4.7-cp314-cp314t-musllinux_1_2_aarch64.whl", hash = "sha256:8d828b6667a32a728a1ad1d93957cdf37489c57b97ae6c4de2860fa749b8fc1e", size = 218979, upload-time = "2026-04-02T09:27:39.37Z" },
{ url = "https://files.pythonhosted.org/packages/02/9a/b759b503d507f375b2b5c153e4d2ee0a75aa215b7f2489cf314f4541f2c0/charset_normalizer-3.4.7-cp314-cp314t-musllinux_1_2_armv7l.whl", hash = "sha256:cf1493cd8607bec4d8a7b9b004e699fcf8f9103a9284cc94962cb73d20f9d4a3", size = 209238, upload-time = "2026-04-02T09:27:40.722Z" },
{ url = "https://files.pythonhosted.org/packages/c2/4e/0f3f5d47b86bdb79256e7290b26ac847a2832d9a4033f7eb2cd4bcf4bb5b/charset_normalizer-3.4.7-cp314-cp314t-musllinux_1_2_ppc64le.whl", hash = "sha256:0c96c3b819b5c3e9e165495db84d41914d6894d55181d2d108cc1a69bfc9cce0", size = 236110, upload-time = "2026-04-02T09:27:42.33Z" },
{ url = "https://files.pythonhosted.org/packages/96/23/bce28734eb3ed2c91dcf93abeb8a5cf393a7b2749725030bb630e554fdd8/charset_normalizer-3.4.7-cp314-cp314t-musllinux_1_2_riscv64.whl", hash = "sha256:752a45dc4a6934060b3b0dab47e04edc3326575f82be64bc4fc293914566503e", size = 219824, upload-time = "2026-04-02T09:27:43.924Z" },
{ url = "https://files.pythonhosted.org/packages/2c/6f/6e897c6984cc4d41af319b077f2f600fc8214eb2fe2d6bcb79141b882400/charset_normalizer-3.4.7-cp314-cp314t-musllinux_1_2_s390x.whl", hash = "sha256:8778f0c7a52e56f75d12dae53ae320fae900a8b9b4164b981b9c5ce059cd1fcb", size = 233103, upload-time = "2026-04-02T09:27:45.348Z" },
{ url = "https://files.pythonhosted.org/packages/76/22/ef7bd0fe480a0ae9b656189ec00744b60933f68b4f42a7bb06589f6f576a/charset_normalizer-3.4.7-cp314-cp314t-musllinux_1_2_x86_64.whl", hash = "sha256:ce3412fbe1e31eb81ea42f4169ed94861c56e643189e1e75f0041f3fe7020abe", size = 225194, upload-time = "2026-04-02T09:27:46.706Z" },
{ url = "https://files.pythonhosted.org/packages/c5/a7/0e0ab3e0b5bc1219bd80a6a0d4d72ca74d9250cb2382b7c699c147e06017/charset_normalizer-3.4.7-cp314-cp314t-win32.whl", hash = "sha256:c03a41a8784091e67a39648f70c5f97b5b6a37f216896d44d2cdcb82615339a0", size = 159827, upload-time = "2026-04-02T09:27:48.053Z" },
{ url = "https://files.pythonhosted.org/packages/7a/1d/29d32e0fb40864b1f878c7f5a0b343ae676c6e2b271a2d55cc3a152391da/charset_normalizer-3.4.7-cp314-cp314t-win_amd64.whl", hash = "sha256:03853ed82eeebbce3c2abfdbc98c96dc205f32a79627688ac9a27370ea61a49c", size = 174168, upload-time = "2026-04-02T09:27:49.795Z" },
{ url = "https://files.pythonhosted.org/packages/de/32/d92444ad05c7a6e41fb2036749777c163baf7a0301a040cb672d6b2b1ae9/charset_normalizer-3.4.7-cp314-cp314t-win_arm64.whl", hash = "sha256:c35abb8bfff0185efac5878da64c45dafd2b37fb0383add1be155a763c1f083d", size = 153018, upload-time = "2026-04-02T09:27:51.116Z" },
{ url = "https://files.pythonhosted.org/packages/db/8f/61959034484a4a7c527811f4721e75d02d653a35afb0b6054474d8185d4c/charset_normalizer-3.4.7-py3-none-any.whl", hash = "sha256:3dce51d0f5e7951f8bb4900c257dad282f49190fdbebecd4ba99bcc41fef404d", size = 61958, upload-time = "2026-04-02T09:28:37.794Z" },
]
[[package]]
name = "click"
version = "8.3.2"
@@ -408,22 +481,21 @@ wheels = [
[[package]]
name = "huggingface-hub"
version = "1.11.0"
version = "0.36.2"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "filelock" },
{ name = "fsspec" },
{ name = "hf-xet", marker = "platform_machine == 'AMD64' or platform_machine == 'aarch64' or platform_machine == 'amd64' or platform_machine == 'arm64' or platform_machine == 'x86_64'" },
{ name = "httpx" },
{ name = "hf-xet", marker = "platform_machine == 'aarch64' or platform_machine == 'amd64' or platform_machine == 'arm64' or platform_machine == 'x86_64'" },
{ name = "packaging" },
{ name = "pyyaml" },
{ name = "requests" },
{ name = "tqdm" },
{ name = "typer" },
{ name = "typing-extensions" },
]
sdist = { url = "https://files.pythonhosted.org/packages/dc/89/e7aa12d8a6b9259bed10671abb25ae6fa437c0f88a86ecbf59617bae7759/huggingface_hub-1.11.0.tar.gz", hash = "sha256:15fb3713c7f9cdff7b808a94fd91664f661ab142796bb48c9cd9493e8d166278", size = 761749, upload-time = "2026-04-16T13:07:39.73Z" }
sdist = { url = "https://files.pythonhosted.org/packages/7c/b7/8cb61d2eece5fb05a83271da168186721c450eb74e3c31f7ef3169fa475b/huggingface_hub-0.36.2.tar.gz", hash = "sha256:1934304d2fb224f8afa3b87007d58501acfda9215b334eed53072dd5e815ff7a", size = 649782, upload-time = "2026-02-06T09:24:13.098Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/37/02/4f3f8997d1ea7fe0146b343e5e14bd065fa87af790d07e5576d31b31cc18/huggingface_hub-1.11.0-py3-none-any.whl", hash = "sha256:42a6de0afbfeb5e022222d36398f029679db4eb4778801aafda32257ae9131ab", size = 645499, upload-time = "2026-04-16T13:07:37.716Z" },
{ url = "https://files.pythonhosted.org/packages/a8/af/48ac8483240de756d2438c380746e7130d1c6f75802ef22f3c6d49982787/huggingface_hub-0.36.2-py3-none-any.whl", hash = "sha256:48f0c8eac16145dfce371e9d2d7772854a4f591bcb56c9cf548accf531d54270", size = 566395, upload-time = "2026-02-06T09:24:11.133Z" },
]
[[package]]
@@ -494,8 +566,8 @@ requires-dist = [
     { name = "python-magic", specifier = ">=0.4.27" },
     { name = "ruff", marker = "extra == 'dev'", specifier = ">=0.8" },
     { name = "sqlalchemy", extras = ["asyncio"], specifier = ">=2.0.36" },
-    { name = "surya-ocr", marker = "extra == 'ocr'", specifier = ">=0.9" },
-    { name = "torch", marker = "extra == 'ocr'", specifier = ">=2.4" },
+    { name = "surya-ocr", marker = "extra == 'ocr'", specifier = ">=0.17,<0.18" },
+    { name = "torch", marker = "extra == 'ocr'", specifier = ">=2.7" },
     { name = "uvicorn", extras = ["standard"], specifier = ">=0.32" },
 ]
 provides-extras = ["ocr", "dev"]
@@ -593,18 +665,6 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/68/a5/19d7aaa7e433713ffe881df33705925a196afb9532efc8475d26593921a6/mako-1.3.11-py3-none-any.whl", hash = "sha256:e372c6e333cf004aa736a15f425087ec977e1fcbd2966aae7f17c8dc1da27a77", size = 78503, upload-time = "2026-04-14T20:19:53.233Z" },
]
[[package]]
name = "markdown-it-py"
version = "4.0.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "mdurl" },
]
sdist = { url = "https://files.pythonhosted.org/packages/5b/f5/4ec618ed16cc4f8fb3b701563655a69816155e79e24a17b651541804721d/markdown_it_py-4.0.0.tar.gz", hash = "sha256:cb0a2b4aa34f932c007117b194e945bd74e0ec24133ceb5bac59009cda1cb9f3", size = 73070, upload-time = "2025-08-11T12:57:52.854Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/94/54/e7d793b573f298e1c9013b8c4dade17d481164aa517d1d7148619c2cedbf/markdown_it_py-4.0.0-py3-none-any.whl", hash = "sha256:87327c59b172c5011896038353a81343b6754500a08cd7a4973bb48c6d578147", size = 87321, upload-time = "2025-08-11T12:57:51.923Z" },
]
[[package]]
name = "markupsafe"
version = "3.0.3"
@@ -668,15 +728,6 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/70/bc/6f1c2f612465f5fa89b95bead1f44dcb607670fd42891d8fdcd5d039f4f4/markupsafe-3.0.3-cp314-cp314t-win_arm64.whl", hash = "sha256:32001d6a8fc98c8cb5c947787c5d08b0a50663d139f1305bac5885d98d9b40fa", size = 14146, upload-time = "2025-09-27T18:37:28.327Z" },
]
[[package]]
name = "mdurl"
version = "0.1.2"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/d6/54/cfe61301667036ec958cb99bd3efefba235e65cdeb9c84d24a8293ba1d90/mdurl-0.1.2.tar.gz", hash = "sha256:bb413d29f5eea38f31dd4754dd7377d4465116fb207585f97bf925588687c1ba", size = 8729, upload-time = "2022-08-14T12:40:10.846Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/b3/38/89ba8ad64ae25be8de66a6d463314cf1eb366222074cfda9ee839c56a4b4/mdurl-0.1.2-py3-none-any.whl", hash = "sha256:84008a41e51615a49fc9966191ff91509e3c40b939176e643fd50a5c2196b8f8", size = 9979, upload-time = "2022-08-14T12:40:09.779Z" },
]
[[package]]
name = "mpmath"
version = "1.3.0"
@@ -1434,16 +1485,18 @@ wheels = [
]
[[package]]
name = "rich"
version = "15.0.0"
name = "requests"
version = "2.33.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "markdown-it-py" },
{ name = "pygments" },
{ name = "certifi" },
{ name = "charset-normalizer" },
{ name = "idna" },
{ name = "urllib3" },
]
sdist = { url = "https://files.pythonhosted.org/packages/c0/8f/0722ca900cc807c13a6a0c696dacf35430f72e0ec571c4275d2371fca3e9/rich-15.0.0.tar.gz", hash = "sha256:edd07a4824c6b40189fb7ac9bc4c52536e9780fbbfbddf6f1e2502c31b068c36", size = 230680, upload-time = "2026-04-12T08:24:00.75Z" }
sdist = { url = "https://files.pythonhosted.org/packages/5f/a4/98b9c7c6428a668bf7e42ebb7c79d576a1c3c1e3ae2d47e674b468388871/requests-2.33.1.tar.gz", hash = "sha256:18817f8c57c6263968bc123d237e3b8b08ac046f5456bd1e307ee8f4250d3517", size = 134120, upload-time = "2026-03-30T16:09:15.531Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/82/3b/64d4899d73f91ba49a8c18a8ff3f0ea8f1c1d75481760df8c68ef5235bf5/rich-15.0.0-py3-none-any.whl", hash = "sha256:33bd4ef74232fb73fe9279a257718407f169c09b78a87ad3d296f548e27de0bb", size = 310654, upload-time = "2026-04-12T08:24:02.83Z" },
{ url = "https://files.pythonhosted.org/packages/d7/8e/7540e8a2036f79a125c1d2ebadf69ed7901608859186c856fa0388ef4197/requests-2.33.1-py3-none-any.whl", hash = "sha256:4e6d1ef462f3626a1f0a0a9c42dd93c63bad33f9f1c1937509b8c5c8718ab56a", size = 64947, upload-time = "2026-03-30T16:09:13.83Z" },
]
[[package]]
@@ -1502,15 +1555,6 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/e1/e3/c164c88b2e5ce7b24d667b9bd83589cf4f3520d97cad01534cd3c4f55fdb/setuptools-81.0.0-py3-none-any.whl", hash = "sha256:fdd925d5c5d9f62e4b74b30d6dd7828ce236fd6ed998a08d81de62ce5a6310d6", size = 1062021, upload-time = "2026-02-06T21:10:37.175Z" },
]
[[package]]
name = "shellingham"
version = "1.5.4"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/58/15/8b3609fd3830ef7b27b655beb4b4e9c62313a4e8da8c676e142cc210d58e/shellingham-1.5.4.tar.gz", hash = "sha256:8dbca0739d487e5bd35ab3ca4b36e11c4078f3a234bfce294b0a0291363404de", size = 10310, upload-time = "2023-10-24T04:13:40.426Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/e0/f9/0595336914c5619e5f28a1fb793285925a8cd4b432c9da0a987836c7f822/shellingham-1.5.4-py2.py3-none-any.whl", hash = "sha256:7ecfff8f2fd72616f7481040475a65b2bf8af90a56c89140852d1120324e8686", size = 9755, upload-time = "2023-10-24T04:13:38.866Z" },
]
[[package]]
name = "six"
version = "1.17.0"
@@ -1703,22 +1747,23 @@ wheels = [
[[package]]
name = "transformers"
version = "5.5.4"
version = "4.57.6"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "filelock" },
{ name = "huggingface-hub" },
{ name = "numpy" },
{ name = "packaging" },
{ name = "pyyaml" },
{ name = "regex" },
{ name = "requests" },
{ name = "safetensors" },
{ name = "tokenizers" },
{ name = "tqdm" },
{ name = "typer" },
]
sdist = { url = "https://files.pythonhosted.org/packages/a5/1e/1e244ab2ab50a863e6b52cc55761910567fa532b69a6740f6e99c5fdbd98/transformers-5.5.4.tar.gz", hash = "sha256:2e67cadba81fc7608cc07c4dd54f524820bc3d95b1cabd0ef3db7733c4f8b82e", size = 8227649, upload-time = "2026-04-13T16:55:55.181Z" }
sdist = { url = "https://files.pythonhosted.org/packages/c4/35/67252acc1b929dc88b6602e8c4a982e64f31e733b804c14bc24b47da35e6/transformers-4.57.6.tar.gz", hash = "sha256:55e44126ece9dc0a291521b7e5492b572e6ef2766338a610b9ab5afbb70689d3", size = 10134912, upload-time = "2026-01-16T10:38:39.284Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/29/fb/162a66789c65e5afa3b051309240c26bf37fbc8fea285b4546ae747995a2/transformers-5.5.4-py3-none-any.whl", hash = "sha256:0bd6281b82966fe5a7a16f553ea517a9db1dee6284d7cb224dfd88fc0dd1c167", size = 10236696, upload-time = "2026-04-13T16:55:51.497Z" },
{ url = "https://files.pythonhosted.org/packages/03/b8/e484ef633af3887baeeb4b6ad12743363af7cce68ae51e938e00aaa0529d/transformers-4.57.6-py3-none-any.whl", hash = "sha256:4c9e9de11333ddfe5114bc872c9f370509198acf0b87a832a0ab9458e2bd0550", size = 11993498, upload-time = "2026-01-16T10:38:31.289Z" },
]
[[package]]
@@ -1738,21 +1783,6 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/f6/56/6113c23ff46c00aae423333eb58b3e60bdfe9179d542781955a5e1514cb3/triton-3.6.0-cp314-cp314t-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:46bd1c1af4b6704e554cad2eeb3b0a6513a980d470ccfa63189737340c7746a7", size = 188397994, upload-time = "2026-01-20T16:01:14.236Z" },
]
[[package]]
name = "typer"
version = "0.24.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "annotated-doc" },
{ name = "click" },
{ name = "rich" },
{ name = "shellingham" },
]
sdist = { url = "https://files.pythonhosted.org/packages/f5/24/cb09efec5cc954f7f9b930bf8279447d24618bb6758d4f6adf2574c41780/typer-0.24.1.tar.gz", hash = "sha256:e39b4732d65fbdcde189ae76cf7cd48aeae72919dea1fdfc16593be016256b45", size = 118613, upload-time = "2026-02-21T16:54:40.609Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/4a/91/48db081e7a63bb37284f9fbcefda7c44c277b18b0e13fbc36ea2335b71e6/typer-0.24.1-py3-none-any.whl", hash = "sha256:112c1f0ce578bfb4cab9ffdabc68f031416ebcc216536611ba21f04e9aa84c9e", size = 56085, upload-time = "2026-02-21T16:54:41.616Z" },
]
[[package]]
name = "typing-extensions"
version = "4.15.0"
@@ -1774,6 +1804,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/dc/9b/47798a6c91d8bdb567fe2698fe81e0c6b7cb7ef4d13da4114b41d239f65d/typing_inspection-0.4.2-py3-none-any.whl", hash = "sha256:4ed1cacbdc298c220f1bd249ed5287caa16f34d44ef4e9c3d0cbad5b521545e7", size = 14611, upload-time = "2025-10-01T02:14:40.154Z" },
]
[[package]]
name = "urllib3"
version = "2.6.3"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/c7/24/5f1b3bdffd70275f6661c76461e25f024d5a38a46f04aaca912426a2b1d3/urllib3-2.6.3.tar.gz", hash = "sha256:1b62b6884944a57dbe321509ab94fd4d3b307075e0c2eae991ac71ee15ad38ed", size = 435556, upload-time = "2026-01-07T16:24:43.925Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/39/08/aaaad47bc4e9dc8c725e68f9d04865dbcb2052843ff09c97b08904852d84/urllib3-2.6.3-py3-none-any.whl", hash = "sha256:bf272323e553dfb2e87d9bfd225ca7b0f467b919d7bbd355436d3fd37cb0acd4", size = 131584, upload-time = "2026-01-07T16:24:42.685Z" },
]
[[package]]
name = "uvicorn"
version = "0.44.0"