# InfoXtractor MVP — Implementation Plan

> **For agentic workers:** REQUIRED: Use superpowers:subagent-driven-development (if subagents available) or superpowers:executing-plans to implement this plan. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Ship an on-prem, async, LLM-powered structured-extraction microservice with one end-to-end use case (`bank_statement_header`) reachable from mammon via REST.

**Architecture:** FastAPI container + single asyncio worker + shared postgis job store. Transport-agnostic pipeline core with pluggable `OCRClient` (Surya) and `GenAIClient` (Ollama). Provenance-based reliability signals per field. No cloud services.

**Tech Stack:** Python 3.12, FastAPI, uvicorn, SQLAlchemy 2.0 async, asyncpg, Alembic, Pydantic v2, pydantic-settings, httpx, PyMuPDF, python-magic, Pillow, surya-ocr (CUDA), Ollama REST API, pytest (+ pytest-asyncio, pytest-postgresql fixture), uv for dependency management.

**Spec reference:** `docs/superpowers/specs/2026-04-18-ix-mvp-design.md` (approved 2026-04-18 after 2 review rounds).

**Habits** (inherited from mammon; see `AGENTS.md`):

- Every task lands as its own feature branch: `feat/`.
- TDD: write failing test → minimal implementation → green → refactor → commit.
- Every commit updates code + tests + docs (`AGENTS.md`, `README.md`, section in this plan) in one shot.
- Push: `git push forgejo feat/` → PR via Forgejo API → wait for CI green → merge → `git push server main` to deploy.
- Never skip hooks, never force-push `main`, never amend merged commits.
- After each deploy, run `scripts/e2e_smoke.py` against the live service.
---

## File structure (target)

```
infoxtractor/
├── AGENTS.md                          # Guiding principles, habits, stack, deploy
├── README.md                          # One-paragraph + pointers
├── Dockerfile                         # nvidia/cuda:12.4 + python 3.12 + surya
├── docker-compose.yml                 # GPU reservation, env_file, monitoring labels
├── pyproject.toml                     # deps via uv; no setup.py
├── uv.lock
├── alembic.ini
├── alembic/
│   ├── env.py
│   └── versions/
│       └── 001_initial_ix_jobs.py
├── src/ix/
│   ├── __init__.py
│   ├── app.py                         # FastAPI create_app factory + lifespan
│   ├── config.py                      # AppConfig (pydantic-settings)
│   ├── errors.py                      # IXException + IX_* error codes
│   ├── logging.py                     # JSON formatter, ix_id context
│   ├── contracts/                     # Pydantic data contracts
│   │   ├── __init__.py
│   │   ├── request.py                 # RequestIX, Context, FileRef, Options, *Options
│   │   ├── response.py                # ResponseIX, IXResult, OCRResult, OCRDetails, Page, Line, Metadata
│   │   ├── provenance.py              # ProvenanceData, FieldProvenance, ExtractionSource, BoundingBox, SegmentCitation
│   │   └── job.py                     # Job envelope (status enum, lifecycle)
│   ├── use_cases/
│   │   ├── __init__.py                # REGISTRY
│   │   └── bank_statement_header.py
│   ├── pipeline/
│   │   ├── __init__.py
│   │   ├── step.py                    # Step ABC
│   │   ├── pipeline.py                # Pipeline orchestrator + Timer
│   │   ├── setup_step.py
│   │   ├── ocr_step.py
│   │   ├── genai_step.py
│   │   ├── reliability_step.py
│   │   └── response_handler_step.py
│   ├── segmentation/
│   │   ├── __init__.py
│   │   └── segment_index.py           # SegmentIndex + prompt formatting
│   ├── provenance/
│   │   ├── __init__.py
│   │   ├── mapper.py                  # map_segment_refs_to_provenance
│   │   ├── normalize.py               # normalizers (string, number, date, iban)
│   │   └── verify.py                  # provenance_verified + text_agreement logic
│   ├── ocr/
│   │   ├── __init__.py
│   │   ├── client.py                  # OCRClient Protocol
│   │   ├── fake.py                    # FakeOCRClient (for tests)
│   │   └── surya_client.py            # SuryaOCRClient (real)
│   ├── genai/
│   │   ├── __init__.py
│   │   ├── client.py                  # GenAIClient Protocol, GenAIInvocationResult, GenAIUsage
│   │   ├── fake.py                    # FakeGenAIClient (for tests)
│   │   └── ollama_client.py           # OllamaClient (real)
│   ├── ingestion/
│   │   ├── __init__.py
│   │   ├── fetch.py                   # file downloader w/ auth headers, timeouts, size cap
│   │   ├── mime.py                    # python-magic wrapper
│   │   └── pages.py                   # DocumentIngestor: PDF/image/text → Page list
│   ├── store/
│   │   ├── __init__.py
│   │   ├── models.py                  # SQLAlchemy ORM for ix_jobs
│   │   ├── engine.py                  # lazy async engine, session factory
│   │   └── jobs_repo.py               # claim_next, insert, get, list_by_correlation, sweep_orphans, update
│   ├── worker/
│   │   ├── __init__.py
│   │   ├── loop.py                    # worker task: claim → run pipeline → deliver callback
│   │   └── callback.py                # one-shot webhook delivery
│   ├── adapters/
│   │   ├── __init__.py
│   │   ├── rest/
│   │   │   ├── __init__.py
│   │   │   ├── routes.py              # /jobs, /jobs/{id}, /jobs (list), /healthz, /metrics
│   │   │   └── schemas.py             # request/response bodies
│   │   └── pg_queue/
│   │       ├── __init__.py
│   │       └── listener.py            # LISTEN ix_jobs_new + 10s fallback poll
│   └── metrics/
│       ├── __init__.py
│       └── counters.py                # plain-JSON counter queries
├── tests/
│   ├── __init__.py
│   ├── conftest.py
│   ├── fixtures/
│   │   ├── synthetic_giro.pdf         # generated from template
│   │   └── ocr_canned/                # canned Surya outputs for integration tests
│   ├── unit/
│   │   ├── test_contracts.py
│   │   ├── test_errors.py
│   │   ├── test_segment_index.py
│   │   ├── test_provenance_normalize.py
│   │   ├── test_provenance_verify.py
│   │   ├── test_provenance_mapper.py
│   │   ├── test_setup_step.py
│   │   ├── test_ocr_step.py
│   │   ├── test_genai_step.py
│   │   ├── test_reliability_step.py
│   │   ├── test_response_handler_step.py
│   │   ├── test_pipeline.py
│   │   ├── test_use_case_registry.py
│   │   ├── test_ingestion_fetch.py
│   │   ├── test_ingestion_pages.py
│   │   └── test_use_case_bank_statement_header.py
│   ├── integration/
│   │   ├── test_jobs_repo.py
│   │   ├── test_rest_adapter.py
│   │   ├── test_pg_queue_adapter.py
│   │   ├── test_worker_loop.py
│   │   └── test_pipeline_end_to_end.py  # fakes, real DB
│   └── live/
│       └── test_ollama_surya_smoke.py   # gated on IX_TEST_OLLAMA=1
├── scripts/
│   ├── e2e_smoke.py                   # post-deploy gate (Mac → :8994)
│   ├── create_fixture_pdf.py          # builds synthetic_giro.pdf deterministically
│   └── forgejo_pr.py                  # wrapper: create branch → PR → merge
├── .env.example
├── .gitignore
└── docs/
    ├── spec-core-pipeline.md
    └── superpowers/
        ├── specs/2026-04-18-ix-mvp-design.md
        └── plans/2026-04-18-ix-mvp-implementation.md   (this file)
```

**Boundary rules:**

- `pipeline/` knows about `contracts/`, `segmentation/`, `provenance/`, `ocr.client`, `genai.client`. NOT `store/`, `adapters/`, `worker/`.
- `adapters/` knows about `store/` and `contracts/`, NOT `pipeline/` directly (it hands requests to the job store; the worker pulls from the store).
- `worker/` knows about `store/`, `pipeline/`, `contracts/`. Bridges the two.
- `store/` knows only about `contracts/` (for JSONB serialization).
- `ocr/surya_client.py` and `genai/ollama_client.py` are the only files that import external libraries beyond stdlib/FastAPI/SQLAlchemy — all other modules stay hermetic.

---

## Chunk 1: Foundation (scaffolding + contracts + use case + SegmentIndex)

**Purpose:** Set up the project skeleton and land the data contracts, error model, use-case registry with the first use case, and SegmentIndex. No pipeline, no transport, no LLM — just the Pydantic/types core plus the scaffolding to run tests and CI.

### Task 1.1: Project scaffolding

**Branch:** `feat/scaffold`

**Files:**
- Create: `pyproject.toml`, `.python-version`, `uv.lock`, `.env.example`
- Create: `src/ix/__init__.py`
- Create: `tests/__init__.py`, `tests/conftest.py`
- Create: `.forgejo/workflows/ci.yml` (pytest on push + PR)
- Create: `pytest.ini` with `asyncio_mode = auto`

- [ ] Write `pyproject.toml` with deps: `fastapi`, `uvicorn[standard]`, `sqlalchemy[asyncio]>=2`, `asyncpg`, `alembic`, `pydantic>=2`, `pydantic-settings`, `httpx`, `pymupdf`, `python-magic`, `pillow`, `python-dateutil`. Dev: `pytest`, `pytest-asyncio`, `pytest-httpx`, `ruff`, `mypy`.
- [ ] `.env.example`: every var from spec §9, all placeholders obvious (``, ``).
- [ ] `pytest.ini`: `asyncio_mode = auto`, collect from `tests/`.
- [ ] `.forgejo/workflows/ci.yml`: runs `uv sync && uv run pytest tests/unit tests/integration -v`; service container `postgres:16`; env `IX_POSTGRES_URL` points at the service. Excludes `tests/live/` (real Ollama).
- [ ] Commit, push branch, create PR, merge. No deploy needed yet (nothing to deploy) — stop at merge.

### Task 1.2: Error model

**Branch:** `feat/errors`

**Files:**
- Create: `src/ix/errors.py`
- Create: `tests/unit/test_errors.py`

- [ ] Write failing tests for `IXException` and every `IX_*` code in spec §8. Each code is a class attribute: `IXError.IX_000_000 = "IX_000_000: request_ix is None"`. The exception carries `code` + `detail`.
- [ ] Implement an enum-like class with `__str__` producing `"IX_000_000: message (detail=...)"`.
- [ ] Green, commit, PR, merge.

### Task 1.3: Data contracts — RequestIX / Options / Context / FileRef

**Branch:** `feat/contracts-request`

**Files:**
- Create: `src/ix/contracts/request.py`
- Create: `src/ix/contracts/__init__.py`
- Create: `tests/unit/test_contracts.py`

- [ ] Failing tests: round-trip `RequestIX.model_validate_json(...)` with each shape in spec §3. String-or-FileRef union. Defaults: `include_provenance=True`, `service="surya"`, etc. Validation errors on unknown fields.
- [ ] Pydantic models per spec §3: `Options`, `OCROptions`, `GenAIOptions`, `ProvenanceOptions`, `Context`, `FileRef`, `RequestIX`.
- [ ] Green, commit, PR, merge.
### Task 1.4: Data contracts — ResponseIX / Provenance / IXResult / OCRResult / Metadata / Job

**Branch:** `feat/contracts-response`

**Files:**
- Create: `src/ix/contracts/response.py`
- Create: `src/ix/contracts/provenance.py`
- Create: `src/ix/contracts/job.py`
- Modify: `src/ix/contracts/__init__.py` (export)
- Modify: `tests/unit/test_contracts.py`

- [ ] Failing tests:
  - `FieldProvenance` with the new `provenance_verified` / `text_agreement` fields; `None` allowed.
  - `quality_metrics` keys: `fields_with_provenance`, `total_fields`, `coverage_rate`, `invalid_references`, `verified_fields`, `text_agreement_fields`.
  - `ResponseIX.context` excluded from `model_dump` (use `exclude`).
  - `Job` envelope: `status` is a Literal, `callback_status` starts as `None`.
- [ ] Implement per spec §3 + §9.3. `ResponseIX.context` holds `pages`, `files`, `texts`, `use_case_request`, `use_case_response`, `segment_index`. A sibling internal model `_InternalContext` was considered, but keep it simple: one class, with `Field(exclude=True)` on the attribute.
- [ ] Green, commit, PR, merge.

### Task 1.5: Use-case registry and first use case

**Branch:** `feat/use-case-bank-statement-header`

**Files:**
- Create: `src/ix/use_cases/__init__.py` (REGISTRY)
- Create: `src/ix/use_cases/bank_statement_header.py`
- Create: `tests/unit/test_use_case_registry.py`
- Create: `tests/unit/test_use_case_bank_statement_header.py`

- [ ] Failing tests: `REGISTRY["bank_statement_header"]` returns `(Request, BankStatementHeader)`; an unknown name raises `IX_001_001`; `Request().system_prompt` contains the substring "extract header metadata".
- [ ] Implement per spec §7 as Pydantic models. Registration could happen on import of the module (side-effect registration) or via explicit registry assembly in `__init__.py`. Prefer explicit — `REGISTRY = {"bank_statement_header": (Request, BankStatementHeader)}` — no import-time side effects.
- [ ] Green, commit, PR, merge.
### Task 1.6: SegmentIndex

**Branch:** `feat/segment-index`

**Files:**
- Create: `src/ix/segmentation/__init__.py`
- Create: `src/ix/segmentation/segment_index.py`
- Create: `tests/unit/test_segment_index.py`

- [ ] Failing tests (take from spec §9.1):
  - `build()` assigns IDs `p1_l0`, `p1_l1`, … across the flat page list.
  - Page-tag lines are excluded from IDs.
  - `lookup_segment("p1_l0")` returns `{page, bbox, text, file_index}`; unknown → `None`.
  - `to_prompt_text()` emits `"[p1_l0] text\n…"` and appends raw `context.texts` untagged at the end.
  - BoundingBox normalization divides by page width/height.
- [ ] Implement. `SegmentIndex` is built from an `OCRResult` + `pages` metadata; holds `_id_to_position: dict[str, dict]` and `_ordered_ids: list[str]`.
- [ ] Green, commit, PR, merge.

### Task 1.7: Provenance normalizers

**Branch:** `feat/provenance-normalize`

**Files:**
- Create: `src/ix/provenance/__init__.py`
- Create: `src/ix/provenance/normalize.py`
- Create: `tests/unit/test_provenance_normalize.py`

- [ ] Failing tests for each normalizer in spec §6 ReliabilityStep:
  - String: `" FOO bar!!! "` → `"foo bar"` (after NFKC + casefold + whitespace collapse + punctuation strip).
  - Number: `"CHF 1'234.56"` ↔ `Decimal("1234.56")` → same canonical form.
  - Date: `"31.03.2026"` ↔ `date(2026,3,31)` → `"2026-03-31"` via `dateutil(dayfirst=True)`.
  - IBAN: `"de 89 3704 0044 0532 0130 00"` → `"DE89370400440532013000"`.
  - Short-value rule: `_should_skip_text_agreement("0", field_type=int)` → `True`; `"AB"` for str → `True`.
- [ ] Implement. Pure functions, no external state, fully unit-testable.
- [ ] Green, commit, PR, merge.
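Three of the Task 1.7 normalizers can be sketched directly from the listed test cases (the date normalizer is omitted because it depends on `python-dateutil`; function names and the exact punctuation regex are assumptions, and locale handling for number parsing is deliberately simplified):

```python
import re
import unicodedata
from decimal import Decimal


def normalize_string(value: str) -> str:
    """NFKC + casefold + punctuation strip + whitespace collapse."""
    value = unicodedata.normalize("NFKC", value).casefold()
    value = re.sub(r"[^\w\s]", " ", value)      # punctuation → spaces
    return re.sub(r"\s+", " ", value).strip()   # collapse whitespace


def normalize_number(value: str) -> Decimal:
    """Strip currency codes and grouping marks (e.g. the Swiss apostrophe)."""
    cleaned = re.sub(r"[^\d.,\-]", "", value.replace("'", ""))
    # Assumes '.' is the decimal separator; full locale handling is out of scope.
    return Decimal(cleaned.replace(",", ""))


def normalize_iban(value: str) -> str:
    """Drop whitespace, uppercase — enough for exact-match comparison."""
    return re.sub(r"\s+", "", value).upper()
```

Because these are pure functions, the spec §6 examples translate one-to-one into the unit tests of `test_provenance_normalize.py`.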
### Task 1.8: Provenance mapper + verifier

**Branch:** `feat/provenance-mapper-verifier`

**Files:**
- Create: `src/ix/provenance/mapper.py` (`map_segment_refs_to_provenance` per spec §9.4)
- Create: `src/ix/provenance/verify.py` (`verify_field_value` + `text_agreement_for_field`)
- Create: `tests/unit/test_provenance_mapper.py`
- Create: `tests/unit/test_provenance_verify.py`

- [ ] Failing tests for mapper: given fake `SegmentIndex` + fake `segment_citations` → correct `FieldProvenance.sources`; `invalid_references` count; value resolution via dot-path (`"result.invoice_number"`, `"items.0.name"`); `max_sources_per_field` cap.
- [ ] Failing tests for verifier: `provenance_verified` true/false per field type; `text_agreement` with and without `context.texts`; Literal → `None`; None value → `None`; short value → `text_agreement` `None`; date parses both sides.
- [ ] Implement; pure functions.
- [ ] Green, commit, PR, merge.

**Chunk 1 end state:** `pytest tests/unit` runs green locally and in Forgejo Actions. No runtime service yet. ~8 merged PRs to main. Time estimate: one focused afternoon.

---

## Chunk 2: Pipeline core

**Purpose:** Wire up the Step ABC + Pipeline orchestrator + all five steps + fake OCR/GenAI clients. At end of chunk, the pipeline runs end-to-end with fakes and produces a full `ResponseIX` for `bank_statement_header`, entirely hermetic.

### Task 2.1: Step ABC + Pipeline orchestrator + Timer

**Branch:** `feat/pipeline-core`

**Files:**
- Create: `src/ix/pipeline/__init__.py`
- Create: `src/ix/pipeline/step.py`
- Create: `src/ix/pipeline/pipeline.py`
- Create: `tests/unit/test_pipeline.py`

- [ ] Failing tests using synthetic steps: order preserved; `validate=False` skips the step; `validate` raise → error written + abort; `process` raise → error written + abort; each step's elapsed seconds added to `metadata.timings`.
- [ ] Implement per spec §3/§4. `Pipeline(steps=[...])`. `_execute_step` wraps in timer + try/except, sets `response_ix.error` on raise.
- [ ] Green, commit, PR, merge.

### Task 2.2: OCRClient and GenAIClient protocols + fakes

**Branch:** `feat/client-protocols`

**Files:**
- Create: `src/ix/ocr/__init__.py`, `src/ix/ocr/client.py`, `src/ix/ocr/fake.py`
- Create: `src/ix/genai/__init__.py`, `src/ix/genai/client.py`, `src/ix/genai/fake.py`
- Create: `tests/unit/test_ocr_fake.py`, `tests/unit/test_genai_fake.py`

- [ ] Failing tests: `FakeOCRClient(canned=OCRResult(...))` returns the canned result; `FakeGenAIClient(parsed=MyModel(...))` returns a `GenAIInvocationResult` with that parsed instance + stubbed usage.
- [ ] Implement Protocols + fakes. Protocols are `@runtime_checkable`.
- [ ] Green, commit, PR, merge.

### Task 2.3: Ingestion — fetch + MIME + pages

**Branch:** `feat/ingestion`

**Files:**
- Create: `src/ix/ingestion/__init__.py`
- Create: `src/ix/ingestion/fetch.py`
- Create: `src/ix/ingestion/mime.py`
- Create: `src/ix/ingestion/pages.py`
- Create: `tests/unit/test_ingestion_fetch.py` (pytest-httpx mocks)
- Create: `tests/unit/test_ingestion_pages.py` (fixture PDFs/images)

- [ ] Failing tests:
  - `fetch_file(FileRef, …)` passes headers; size cap raises `IX_000_007`; timeout raises `IX_000_007`; non-2xx raises `IX_000_007`.
  - `detect_mime(bytes)` classifies PDF/PNG/JPEG/TIFF correctly; unknown raises `IX_000_005`.
  - `DocumentIngestor.build_pages(files, texts)`: PDF with 3 pages → 3 `Page` objects with `page_no`/`width`/`height`; multi-frame TIFF → multiple Pages; plain-text entry → one Page; >100 PDF pages raises `IX_000_006`.
- [ ] Implement. `fetch_file` uses an httpx AsyncClient with timeouts from config and `stream=True` to enforce the size cap incrementally. `DocumentIngestor` uses PyMuPDF for PDFs, PIL for images.
- [ ] Green, commit, PR, merge.

### Task 2.4: SetupStep

**Branch:** `feat/step-setup`

**Files:**
- Create: `src/ix/pipeline/setup_step.py`
- Create: `tests/unit/test_setup_step.py`

- [ ] Failing tests:
  - `validate` raises `IX_000_000` if the request is None; `IX_000_002` if no files and no texts.
  - `process` downloads files (pytest-httpx mocks), assembles `response_ix.context.pages`, loads the use case; unknown use case → `IX_001_001`.
- [ ] Implement per spec §6. Use `ingestion.fetch_file` + `DocumentIngestor`.
- [ ] Green, commit, PR, merge.

### Task 2.5: OCRStep

**Branch:** `feat/step-ocr`

**Files:**
- Create: `src/ix/pipeline/ocr_step.py`
- Create: `tests/unit/test_ocr_step.py`

- [ ] Failing tests:
  - `validate` raises `IX_000_004` when geometries/text/ocr_only is set but there are no files.
  - `validate` returns `False` for pure-text requests.
  - `process` runs `FakeOCRClient`, injects page tags, builds `SegmentIndex` when provenance is on.
- [ ] Implement per spec §6.
- [ ] Green, commit, PR, merge.

### Task 2.6: GenAIStep

**Branch:** `feat/step-genai`

**Files:**
- Create: `src/ix/pipeline/genai_step.py`
- Create: `tests/unit/test_genai_step.py`

- [ ] Failing tests:
  - System prompt concatenated with the citation instruction when provenance is on.
  - Text content format: `[p1_l0] foo\n[p1_l1] bar`.
  - Response schema wrapped in `ProvenanceWrappedResponse` when provenance is on.
  - `FakeGenAIClient` returns parsed result → written to `ix_result.result`.
  - `IX_002_000` / `IX_002_001` surfaced on client raise.
  - Provenance mapping produces `ProvenanceData` with the expected field paths.
- [ ] Implement per spec §6. Use `provenance.mapper.map_segment_refs_to_provenance`.
- [ ] Green, commit, PR, merge.

### Task 2.7: ReliabilityStep

**Branch:** `feat/step-reliability`

**Files:**
- Create: `src/ix/pipeline/reliability_step.py`
- Create: `tests/unit/test_reliability_step.py`

- [ ] Failing tests:
  - Skipped when `include_provenance=False`.
  - Per-type dispatch: Literal field → `None` flags; None value → `None` flags; short value → `text_agreement=None`.
  - Dates parse both sides before comparison.
  - Counters `verified_fields` and `text_agreement_fields` written.
  - Tests using `BankStatementHeader` + concrete `ProvenanceData` → exact flag values.
- [ ] Implement using `provenance.verify` + `provenance.normalize`; dispatch on the type hints of the use-case response schema (introspect via `get_type_hints`).
- [ ] Green, commit, PR, merge.

### Task 2.8: ResponseHandlerStep

**Branch:** `feat/step-response-handler`

**Files:**
- Create: `src/ix/pipeline/response_handler_step.py`
- Create: `tests/unit/test_response_handler_step.py`

- [ ] Failing tests per spec §8: attach OCR text; strip geometries when not requested; delete `context`.
- [ ] Implement.
- [ ] Green, commit, PR, merge.

### Task 2.9: End-to-end pipeline test with fakes

**Branch:** `feat/pipeline-e2e-fakes`

**Files:**
- Create: `tests/unit/test_pipeline_end_to_end.py`
- Create: `tests/fixtures/synthetic_giro.pdf` (generated)
- Create: `scripts/create_fixture_pdf.py`

- [ ] `scripts/create_fixture_pdf.py` builds a deterministic PDF with known header fields (bank name, IBAN, period, balances) using reportlab or PyMuPDF. The script re-runs on demand; output is committed.
- [ ] Failing test: feed the fixture + canned OCR + canned LLM response through the full `Pipeline([Setup, OCR, GenAI, Reliability, ResponseHandler])` and assert `response_ix.ix_result.result == expected`, `provenance_verified[closing_balance] is True`, timings populated.
- [ ] Implement — only wiring; no new logic.
- [ ] Green, commit, PR, merge.

**Chunk 2 end state:** Full pipeline runs in tests with fakes. No DB, no transport, no real clients. Running `pytest tests/unit -v` goes green end-to-end. ~9 merged PRs.

---

## Chunk 3: Job store + REST adapter + worker loop

**Purpose:** Persist jobs in Postgres, expose REST endpoints, run the worker task in the FastAPI lifespan, deliver callbacks. At chunk end, the container (locally or on the server) accepts `POST /jobs`, runs the fake-backed pipeline against a real DB, and returns results via polling or callback.
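The Step/Pipeline contract from Task 2.1 can be sketched as below. This is a simplified, synchronous sketch: the real steps are async, `response` is a `ResponseIX` rather than a dict, the `Timer` helper is folded into `run`, and error handling raises typed `IXException`s — only the skip/abort/timing semantics come from the plan:

```python
import time
from abc import ABC, abstractmethod


class Step(ABC):
    name = "step"

    def validate(self, response) -> bool:
        """Return False to skip this step; raise to abort the pipeline."""
        return True

    @abstractmethod
    def process(self, response) -> None: ...


class Pipeline:
    def __init__(self, steps: list[Step]):
        self.steps = steps

    def run(self, response: dict) -> None:
        for step in self.steps:
            start = time.perf_counter()
            try:
                if step.validate(response):
                    step.process(response)
            except Exception as exc:
                response["error"] = str(exc)  # real code sets response_ix.error
                return                        # abort on first failure
            finally:
                # elapsed seconds per step, as metadata.timings requires
                response.setdefault("timings", {})[step.name] = (
                    time.perf_counter() - start
                )
```

Note that the `finally` block records a timing even for the aborting step, so `metadata.timings` shows where the pipeline died.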
### Task 3.1: Alembic scaffolding + initial migration

**Branch:** `feat/alembic-init`

**Files:**
- Create: `alembic.ini`, `alembic/env.py`, `alembic/versions/001_initial_ix_jobs.py`
- Create: `src/ix/store/__init__.py`, `src/ix/store/models.py`, `src/ix/store/engine.py`

- [ ] `alembic/env.py`: async engine, `NullPool`, reads `IX_POSTGRES_URL`.
- [ ] `models.py`: `IxJob` ORM mapping to the `ix_jobs` table per spec §4.
- [ ] `001_initial_ix_jobs.py`: CREATE TABLE + indexes (including `UNIQUE` on `(client_id, request_id)`). No NOTIFY trigger (NOTIFY is fired by writers, not DDL).
- [ ] Smoke test: `alembic upgrade head` against a disposable postgres (via docker) creates the table and indexes. No unit test here — verified in integration tests.
- [ ] Commit, PR, merge.

### Task 3.2: Config module (AppConfig)

**Branch:** `feat/config`

**Files:**
- Create: `src/ix/config.py`
- Create: `tests/unit/test_config.py`

- [ ] Failing tests: every env var in spec §9 loads from `IX_*` env; defaults match the spec.
- [ ] Implement `AppConfig` via `pydantic-settings`; no `.env` auto-load in tests — use `monkeypatch`.
- [ ] Green, commit, PR, merge.

### Task 3.3: JobsRepo (store CRUD)

**Branch:** `feat/jobs-repo`

**Files:**
- Create: `src/ix/store/jobs_repo.py`
- Create: `tests/integration/test_jobs_repo.py`
- Create: `tests/conftest.py` fixtures — `postgres_url`, `engine`, `session_factory` (Forgejo CI service container).

- [ ] Failing tests (integration, need a real DB):
  - `insert_pending(request, callback_url)` creates a row; returns `job_id` + `ix_id`.
  - Inserting with an existing `(client_id, request_id)` returns the *existing* `job_id` (idempotency), status unchanged.
  - `claim_next_pending()` → locks a pending row and updates it to `running`; returns `None` if none available; concurrent callers each claim distinct rows (SKIP LOCKED).
  - `get(job_id)` returns the full `Job` with nested request/response parsed.
- `mark_done(job_id, response)` / `mark_error(job_id, response_with_error)` / `update_callback_status(...)`. - `sweep_orphans(now, max_running_age)` → returns list of rescued job IDs; their status goes back to `pending`, `attempts++`. - [ ] Implement using SQLAlchemy 2.0 async. Each method is a single transaction. - [ ] Green, commit, PR, merge. ### Task 3.4: FastAPI app + REST routes **Branch:** `feat/rest-adapter` **Files:** - Create: `src/ix/app.py` - Create: `src/ix/adapters/__init__.py`, `src/ix/adapters/rest/__init__.py`, `src/ix/adapters/rest/routes.py`, `src/ix/adapters/rest/schemas.py` - Create: `tests/integration/test_rest_adapter.py` - [ ] Failing tests (integration, FastAPI TestClient + real DB): - `POST /jobs` with valid body → 201, returns `{job_id, ix_id, status: "pending"}`. - `POST /jobs` idempotent on `(client_id, request_id)` — second call returns same `job_id` with 200. - `GET /jobs/{id}` returns the `Job` shape; 404 on unknown. - `GET /jobs?client_id=…&request_id=…` returns the row or 404. - `GET /healthz` returns JSON with `postgres`/`ollama`/`ocr` keys. In tests, `ollama` and `ocr` are mocked via dependency-injection hook. - `GET /metrics` returns 24h counters. - [ ] Implement `create_app()` factory. Lifespan: create engine, run `alembic upgrade head`, spawn worker task (Chunk 3.5), tear down on shutdown. - [ ] Green, commit, PR, merge. ### Task 3.5: Worker loop + callback delivery **Branch:** `feat/worker-loop` **Files:** - Create: `src/ix/worker/__init__.py`, `src/ix/worker/loop.py`, `src/ix/worker/callback.py` - Modify: `src/ix/app.py` (lifespan spawns worker task) - Create: `tests/integration/test_worker_loop.py` - [ ] Failing tests (integration): - Worker claims a pending job, runs a fake pipeline, writes response, updates status to `done`. - On pipeline exception: status → `error`, response carries the error code. - On `callback_url` set and 200 response: `callback_status` → `delivered`. 
  - On callback 500 or timeout: `callback_status` → `failed`; `status` stays `done`/`error`.
  - Worker startup orphan sweep: a job left in `running` with `started_at < now - 2 * per_job_timeout` → reset to `pending`, `attempts++`.
- [ ] Implement. The worker pipeline factory is injected — tests pass a stub; production wiring builds the real `Pipeline` with `FakeOCRClient` / `FakeGenAIClient` for now (Chunk 4 swaps them).
- [ ] Green, commit, PR, merge.

### Task 3.6: Postgres queue adapter

**Branch:** `feat/pg-queue-adapter`

**Files:**
- Create: `src/ix/adapters/pg_queue/__init__.py`, `src/ix/adapters/pg_queue/listener.py`
- Modify: `src/ix/app.py` (lifespan spawns the listener task if enabled)
- Create: `tests/integration/test_pg_queue_adapter.py`

- [ ] Failing tests (integration):
  - Caller inserts a row directly and runs `NOTIFY ix_jobs_new, ''` → worker picks it up within 1 s.
  - Missed NOTIFY (e.g., listener not started yet) → the 10 s fallback poll finds the row.
- [ ] Implement. `listener.py` opens a dedicated asyncpg connection (outside the SQLAlchemy pool) to run `LISTEN`; on notify, it emits an asyncio event that the worker's `wait_for_notify_or_poll(10s)` reacts to.
- [ ] Green, commit, PR, merge.

**Chunk 3 end state:** FastAPI container serves the REST API, backed by a real Postgres. The pipeline still uses fakes under the hood (real Surya + Ollama land in Chunk 4). ~6 PRs.

---

## Chunk 4: Real OCR + real LLM clients

**Purpose:** Wire SuryaOCRClient and OllamaClient into production. Tests gated on `IX_TEST_OLLAMA=1`. The pipeline factory switches from fakes to real clients based on env.
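The notify-or-poll core of Task 3.6 can be sketched as below. Only the event-wait helper is real code here; the asyncpg wiring is shown in the docstring as an assumption of how the listener sets the event (`asyncpg.Connection.add_listener` is a real API, but the exact callback shape in production may differ):

```python
import asyncio


async def wait_for_notify_or_poll(
    notify_event: asyncio.Event, poll_interval: float = 10.0
) -> str:
    """Wake when a NOTIFY arrives, else fall back after the poll interval.

    In production the event is set from a dedicated asyncpg connection, e.g.:

        conn = await asyncpg.connect(dsn)
        await conn.add_listener("ix_jobs_new", lambda *_: notify_event.set())
    """
    try:
        await asyncio.wait_for(notify_event.wait(), timeout=poll_interval)
        notify_event.clear()   # consume the notification
        return "notify"
    except asyncio.TimeoutError:
        return "poll"          # fallback: caller polls the table anyway
```

Either return value leads to the same `claim_next_pending()` call; the distinction only affects latency, which is why a missed NOTIFY is harmless.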
### Task 4.1: OllamaClient (real)

**Branch:** `feat/ollama-client`

**Files:**
- Create: `src/ix/genai/ollama_client.py`
- Create: `tests/unit/test_ollama_client.py` (uses pytest-httpx to mock)
- Create: `tests/live/test_ollama_client_live.py` (gated on `IX_TEST_OLLAMA=1`)

- [ ] Failing unit tests: `invoke` POSTs to `/api/chat` with `format` set to the response JSON schema; parses the response into the Pydantic schema; surfaces `IX_002_000` on connection error / timeout; surfaces `IX_002_001` on schema-parse failure.
- [ ] Live test: real call to `host.docker.internal:11434` (or `192.168.68.42:11434` from the Mac) with `gpt-oss:20b` against a tiny `BankStatementHeader`-shaped schema; skipped unless `IX_TEST_OLLAMA=1`.
- [ ] Implement. httpx AsyncClient with per-call timeout from config.
- [ ] Green, commit, PR, merge.

### Task 4.2: SuryaOCRClient (real)

**Branch:** `feat/surya-client`

**Files:**
- Create: `src/ix/ocr/surya_client.py`
- Create: `tests/unit/test_surya_client.py` (mocked `surya.recognition.RecognitionPredictor`)
- Create: `tests/live/test_surya_client_live.py` (gated on `IX_TEST_OLLAMA=1` — reuses the flag; rename to `IX_TEST_LIVE=1` if that collides)

- [ ] Failing unit tests with the Surya API mocked: given 3 Pages, returns an `OCRResult` with 3 matching pages, each with lines + 8-coord polygons.
- [ ] Live test: runs Surya against `tests/fixtures/synthetic_giro.pdf`; asserts the extracted text contains the known IBAN substring.
- [ ] Implement. `selfcheck()` loads the model at startup and runs a 1-page sanity OCR on a blank page; used by `/healthz`.
- [ ] Add surya to `pyproject.toml`: `surya-ocr` + `torch>=2.2` (CUDA 12.4 wheels).
- [ ] Green, commit, PR, merge.
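The request body that Task 4.1's `invoke` sends can be sketched as a pure payload builder (Ollama's `/api/chat` accepts a JSON schema in the `format` field for structured output; the function name `build_chat_payload` is illustrative, and the real client attaches this via an httpx AsyncClient):

```python
import json


def build_chat_payload(
    model: str, system_prompt: str, user_content: str, response_schema: dict
) -> dict:
    """Request body for POST /api/chat with structured output.

    `format` carries the JSON schema the model must conform to; `stream` is
    disabled so the reply arrives as a single message to parse with Pydantic.
    """
    return {
        "model": model,
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_content},
        ],
        "format": response_schema,
        "stream": False,
    }
```

Keeping payload construction as a pure function lets `test_ollama_client.py` assert on the exact body without any HTTP mocking, leaving pytest-httpx to cover only the transport-level error paths (`IX_002_000`/`IX_002_001`).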
### Task 4.3: Pipeline factory + `/healthz` wiring

**Branch:** `feat/production-wiring`

**Files:**
- Modify: `src/ix/app.py` (build the production pipeline in lifespan, not fakes)
- Modify: `src/ix/genai/__init__.py` (factory: `make_client(config) -> GenAIClient`)
- Modify: `src/ix/ocr/__init__.py` (factory: `make_client(config) -> OCRClient`)
- Modify: `src/ix/adapters/rest/routes.py` (`/healthz` probes real clients)

- [ ] Failing tests: factory returns `OllamaClient` / `SuryaOCRClient` in production mode; `FakeOCRClient` / `FakeGenAIClient` when the `IX_TEST_MODE=fake` env is set (used by integration tests).
- [ ] Implement.
- [ ] Green, commit, PR, merge.

**Chunk 4 end state:** The running container can handle a real PDF end-to-end with real OCR and real LLM. Unit tests stay hermetic; live tests run on the Mac against the home server. ~3 PRs.

---

## Chunk 5: Containerization + deployment + E2E

**Purpose:** Dockerize, configure the server, push-to-deploy, run the first live smoke test.

### Task 5.1: Dockerfile + docker-compose

**Branch:** `feat/dockerize`

**Files:**
- Create: `Dockerfile`
- Create: `docker-compose.yml`
- Modify: `.env.example` (final list of vars)

- [ ] Dockerfile: base `nvidia/cuda:12.4.0-runtime-ubuntu22.04`, install Python 3.12 via `deadsnakes`, install `uv`, copy source, `uv sync --no-dev`, CMD `alembic upgrade head && uvicorn ix.app:create_app --factory --host 0.0.0.0 --port 8994`.
- [ ] docker-compose.yml: single service `infoxtractor`, port 8994, `runtime: nvidia`, GPU reservation, env_file `.env`, monitoring labels, backup labels, `extra_hosts: host.docker.internal:host-gateway`.
- [ ] Build locally (`docker compose build`) to verify.
- [ ] Commit, PR, merge (no deploy yet — see next task).
### Task 5.2: Server setup + post-receive hook

**Branch:** `feat/deploy-setup`

**Files:**
- Create: `docs/deployment.md`
- Create: `scripts/setup_server.sh` (one-shot: creates bare repo + post-receive hook + `infoxtractor` DB on postgis + `.env` on the server)

- [ ] `setup_server.sh` (run manually once): SSH to `server@192.168.68.42`, create the `/home/server/Public/infoxtractor/repos.git` bare repo with a post-receive hook that checks out to `/home/server/Public/infoxtractor/app/`, runs `docker compose up -d --build`, polls `/healthz` for 60 s, exits non-zero on failure.
- [ ] Creates the `infoxtractor` DB + role on the postgis container.
- [ ] Writes `/home/server/Public/infoxtractor/app/.env` with real passwords (user provides via environment or prompt).
- [ ] Commit, PR, merge. Run the script manually; document the run in `deployment.md`.

### Task 5.3: Add `server` git remote + first deploy

**Branch:** `feat/first-deploy`

- [ ] Local: `git remote add server ssh://server@192.168.68.42/home/server/Public/infoxtractor/repos.git`.
- [ ] Verify `ollama pull gpt-oss:20b` is done on the host (check `docker exec ollama ollama list`).
- [ ] `git push server main`. Hook rebuilds. `/healthz` check. Smoke: `curl http://192.168.68.42:8994/healthz`.
- [ ] Document the remote setup in `deployment.md`.
- [ ] No code PR — this task is ops.

### Task 5.4: E2E smoke test script

**Branch:** `feat/e2e-smoke`

**Files:**
- Create: `scripts/e2e_smoke.py`

- [ ] Submits `tests/fixtures/synthetic_giro.pdf` via `POST http://192.168.68.42:8994/jobs` (from the Mac), polls, asserts per spec §12. Exits non-zero on failure. Prints timings.
- [ ] Runs from the Mac after every `git push server main` (documented as part of the deploy habit in `AGENTS.md`).
- [ ] Commit, PR, merge, deploy. Run the smoke script; paste the output into the PR description.

**Chunk 5 end state:** Service live on `http://192.168.68.42:8994`, deploy gated by `/healthz` + the E2E smoke test. The first consumer (mammon) can start building its integration.
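The poll-until-done loop at the heart of Task 5.4's `e2e_smoke.py` can be sketched with the HTTP call injected, which keeps the loop unit-testable without the live service (`poll_job`, `get_status`, and the timeout defaults are illustrative; the real script hits `GET /jobs/{id}` on `http://192.168.68.42:8994`):

```python
import time


def poll_job(get_status, job_id: str,
             timeout_s: float = 120.0, interval_s: float = 1.0) -> dict:
    """Call `get_status(job_id)` until the job leaves pending/running.

    `get_status` stands in for an HTTP GET against /jobs/{id}; raises
    TimeoutError so the smoke script can exit non-zero on a stuck job.
    """
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        job = get_status(job_id)
        if job["status"] not in ("pending", "running"):
            return job  # done or error — caller asserts per spec §12
        time.sleep(interval_s)
    raise TimeoutError(f"job {job_id} did not finish within {timeout_s}s")
```

The smoke script then asserts on the returned job (status, extracted fields, timings) and exits non-zero if any assertion or the timeout fires.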
---

## Out of scope for this plan (owned by mammon or future ix work)

- **Mammon integration** — owned by the mammon repo; spec'd separately.
- **Second use case** (receipt/invoice) — after `bank_statement_header` is proven live.
- **Multi-container worker**, Prometheus exporter, OpenTelemetry exporter, vision path, Config Server, Kafka transport, Azure/AWS/OpenAI adapters — all in spec §14.

---

## Review / handoff

After all chunks are merged and deployed:

1. Run `scripts/e2e_smoke.py` against the live service; screenshot / paste the output.
2. Ensure the monitoring dashboard shows `infoxtractor` healthy at `http://192.168.68.42:8001`.
3. Confirm `/healthz` returns 200 for 5 minutes straight (no Surya OOMs, no missing Ollama model).
4. Tag the release: `git tag v0.1.0 && git push forgejo v0.1.0`.
5. Open a follow-up in mammon: "plan ix integration for needs_parser docs", referencing this spec + the service URL.