commit 124403252dfd2e13fc5eaf744c52d0555a6c4789 Author: Dirk Riemann Date: Sat Apr 18 10:23:17 2026 +0200 Initial design: on-prem LLM extraction microservice MVP Establishes ix as an async, on-prem, LLM-powered structured extraction microservice. Full reference spec stays in docs/spec-core-pipeline.md; MVP spec (strict subset — Ollama only, Surya OCR, REST + Postgres-queue transports in parallel, in-repo use cases, provenance-based reliability signals) lives at docs/superpowers/specs/2026-04-18-ix-mvp-design.md. First use case: bank_statement_header (feeds mammon's needs_parser flow). Co-Authored-By: Claude Opus 4.7 (1M context) diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..94e72d8 --- /dev/null +++ b/.gitignore @@ -0,0 +1,17 @@ +__pycache__/ +*.pyc +*.pyo +*.egg-info/ +.venv/ +venv/ +.env +.env.local +.pytest_cache/ +.mypy_cache/ +.ruff_cache/ +htmlcov/ +.coverage +dist/ +build/ +*.log +/tmp/ diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 0000000..88db862 --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,44 @@ +# InfoXtractor (ix) + +Async, on-prem, LLM-powered structured information extraction microservice. Given a document (PDF, image) or text plus a use case (a Pydantic schema), returns a structured JSON result with per-field provenance (source page, bounding box, OCR segment). + +Designed to be used by other on-prem services (e.g. mammon) as a reliable fallback / second opinion for format-specific deterministic parsers. + +Status: design phase. Full reference spec at `docs/spec-core-pipeline.md`. MVP spec will live at `docs/superpowers/specs/`. + +## Guiding Principles + +- **On-prem always.** All LLM inference, OCR, and user-data processing run on the home server (192.168.68.42). No cloud APIs — OpenAI, Anthropic, Azure, AWS Bedrock/Textract, Google Document AI, Mistral, etc. are not to be used for user data or inference. LLM backend is Ollama (:11434); OCR runs locally (pluggable `OCRClient` interface, first engine: Surya on the RTX 3090); job state lives in local Postgres on the postgis container. The spec's references to Azure / AWS / OpenAI are examples to *replace*, not inherit. +- **Grounded extraction, not DB truth.** ix returns best-effort extracted fields with segment citations, provenance, and cross-OCR agreement signals. ix does *not* claim its output is DB-grade; the calling service (e.g. mammon) owns the reliability decision (reconcile against anchors, stage for review, compare to deterministic parsers). +- **Transport-agnostic pipeline core.** The pipeline (`RequestIX` → `ResponseIX`) knows nothing about HTTP, queues, or databases. Transport adapters (REST, Postgres queue, …) run in parallel alongside the core and all converge on one job store. + +## Habits + +- **Feature branches + PRs.** New work: `git checkout -b feat/` → commit small, logical chunks → `git push forgejo feat/` → create PR via Forgejo API → **wait for tests to pass** → merge → `git push server main` to deploy. +- **Keep documentation up to date in the same commit as the code.** `README.md`, `docs/`, and `AGENTS.md` update alongside the change. Unpushed / undocumented work is work that isn't done. +- **Deploy after merging.** `git push server main` rebuilds the Docker image via `post-receive` and restarts the container. Smoke-test the live service before walking away. +- **Never skip hooks** (`--no-verify`, etc.) without explicit user approval. Prefer creating new commits over amending. Never force-push `main`. +- **Forgejo**: repo at `http://192.168.68.42:3030/goldstein/infoxtractor` (to be created). Use basic auth with `FORGEJO_USR` / `FORGEJO_PSD` from `~/Projects/infrastructure/.env`, or an API token once issued for this repo. + +## Tech Stack (MVP) + +- **Language**: Python 3.12, asyncio +- **Web/REST**: FastAPI + uvicorn +- **OCR (pluggable)**: Surya OCR first (GPU, shares RTX 3090 with Ollama / Immich ML) +- **LLM**: Ollama at `192.168.68.42:11434`, structured outputs via JSON schema. Initial model candidate: `qwen2.5:32b` / `gpt-oss:20b`, configurable per use case +- **State**: Postgres on the shared `postgis` container (:5431), new `infoxtractor` database +- **Deployment**: Docker, `git push server main` → post-receive rebuild (pattern from other apps) + +## Repository / Deploy + +- Git remotes: + - `forgejo`: `ssh://git@192.168.68.42:2222/goldstein/infoxtractor.git` (source of truth / PRs) + - `server`: bare repo with `post-receive` rebuild hook (to be set up) +- Workflow: feat branch → `git push forgejo feat/name` → PR via Forgejo API → merge → `git push server main` to deploy +- Monitoring label: `infrastructure.web_url=http://192.168.68.42:` +- Backup opt-in: `backup.enable=true` label on the container + +## Related Projects + +- **mammon** (`../mammon`) — first consumer. Uses ix as a fallback / second opinion for Paperless-imported bank statements where deterministic parsers don't match. +- **infrastructure** (`../infrastructure`) — server topology, deployment pattern, Ollama setup, shared `postgis` Postgres. diff --git a/README.md b/README.md new file mode 100644 index 0000000..71652b4 --- /dev/null +++ b/README.md @@ -0,0 +1,17 @@ +# InfoXtractor (ix) + +Async, on-prem, LLM-powered structured information extraction microservice. + +Given a document (PDF, image, text) and a named *use case*, ix returns a structured JSON result whose shape matches the use-case schema — together with per-field provenance (OCR segment IDs, bounding boxes, cross-OCR agreement flags) that let the caller decide how much to trust each extracted value. + +**Status:** design phase. Implementation about to start. + +- Full reference spec: [`docs/spec-core-pipeline.md`](docs/spec-core-pipeline.md) (aspirational; MVP is a strict subset) +- **MVP design:** [`docs/superpowers/specs/2026-04-18-ix-mvp-design.md`](docs/superpowers/specs/2026-04-18-ix-mvp-design.md) +- Agent / development notes: [`AGENTS.md`](AGENTS.md) + +## Principles + +- **On-prem always.** LLM = Ollama, OCR = local engines (Surya first). No OpenAI / Anthropic / Azure / AWS / cloud. +- **Grounded extraction, not DB truth.** ix returns best-effort fields + provenance; the caller decides what to trust. +- **Transport-agnostic pipeline core.** REST + Postgres-queue adapters in parallel on one job store. diff --git a/docs/spec-core-pipeline.md b/docs/spec-core-pipeline.md new file mode 100644 index 0000000..be617ba --- /dev/null +++ b/docs/spec-core-pipeline.md @@ -0,0 +1,912 @@ +# Core Pipeline Specification — InfoXtractor + + +## 1. What the system does + +InfoXtractor is an **async, LLM-powered document extraction pipeline**. Given one or more +documents (PDFs, images) or plain text and a *use case* that defines what to extract, it returns +a structured JSON result whose shape matches the use-case schema. + +Optional capabilities that are composed at runtime: + +| Capability | Trigger | What it adds | +|---|---|---| +| OCR | `options.ocr.use_ocr = true` (default) | Converts document pages to text via cloud OCR before passing to the LLM | +| Vision | `options.gen_ai.use_vision = true` | Also passes page images to the LLM as base64 | +| Provenance | `options.provenance.include_provenance = true` | Each extracted field is annotated with the page number, bounding box, and text snippet it came from | +| OCR-only | `options.ocr.ocr_only = true` | Returns OCR result without calling the LLM | + +--- + +## 2. Top-level data contracts + +### 2.1 RequestIX — pipeline input + +```python +class RequestIX: + use_case: str # Name, URL, or Base64-encoded use-case definition + ix_client_id: str # Client identifier (used for auth + metrics) + request_id: str # Correlation key set by the caller; echoed in response + ix_id: Optional[str] # Internal tracking ID; set by the transport layer, not the caller + context: Context # What to process + options: Options # How to process it + version: Optional[int] # Schema version (default 2) + +class Context: + files: List[str] # URLs to PDFs or images (S3, HTTP) + texts: List[str] # Raw text strings (treated as pages) + +class Options: + ocr: OCROptions + gen_ai: GenAIOptions + provenance: ProvenanceOptions +``` + +**OCROptions** + +| Field | Type | Default | Meaning | +|---|---|---|---| +| `use_ocr` | bool | `True` | Run cloud OCR on files | +| `ocr_only` | bool | `False` | Stop after OCR; skip LLM | +| `service` | enum | `document_intelligence` | Which OCR backend (`document_intelligence` / `computer_vision`) | +| `include_geometries` | bool | `False` | Include raw bounding boxes in `ocr_result` | +| `include_ocr_text` | bool | `False` | Attach flat OCR text to `ocr_result.result.text` | +| `include_page_tags` | bool | `True` | Inject `` XML markers into OCR lines | +| `computer_vision_scaling_factor` | int 1–10 | `1` | Scale multiplier for CV-based OCR | + +**GenAIOptions** + +| Field | Type | Default | Meaning | +|---|---|---|---| +| `gen_ai_model_name` | str | `gpt-4o` (from config) | LLM deployment name | +| `use_vision` | bool | `False` | Include page images in the LLM message | +| `vision_scaling_factor` | int 1–10 | `1` | Scale factor for rendered images | +| `vision_detail` | enum | `auto` | `auto` / `low` / `high` (OpenAI vision detail) | +| `reasoning_effort` | Optional[str] | `None` | Passed to reasoning models (e.g. `medium`, `high`) | + +**ProvenanceOptions** + +| Field | Type | Default | Meaning | +|---|---|---|---| +| `include_provenance` | bool | `False` | Enable provenance tracking | +| `granularity` | enum | `line` | `line` (only supported today) or `word` | +| `include_bounding_boxes` | bool | `True` | Attach normalised coordinates to each source | +| `max_sources_per_field` | int ≥1 | `10` | Cap on sources returned per field | +| `source_type` | enum | `value_and_context` | `value`: value segments only; `value_and_context`: value + label segments | +| `min_confidence` | float 0–1 | `0.0` | Minimum per-field confidence (currently a no-op; hook for future) | + +--- + +### 2.2 ResponseIX — pipeline output + +```python +class ResponseIX: + use_case: Optional[str] # Echoed from RequestIX.use_case + use_case_name: Optional[str] # Clean name from use-case definition + ix_client_id: Optional[str] + request_id: Optional[str] + ix_id: Optional[str] + error: Optional[str] # Non-None means processing failed; pipeline stopped + warning: List[str] # Non-fatal issues encountered during processing + ix_result: IXResult # LLM extraction result + ocr_result: OCRResult # OCR output (may be empty if OCR skipped) + provenance: Optional[ProvenanceData] # Field-level source tracking + metadata: Metadata # Timings, hostname + +class IXResult: + result: dict # Matches the use-case response schema + result_confidence: dict # Reserved; currently empty + meta_data: dict # token_usage {prompt_tokens, completion_tokens, total_tokens}, model_name + +class OCRResult: + result: OCRDetails # pages with lines + bounding boxes + meta_data: dict + +class OCRDetails: + text: Optional[str] # Flat text (populated only if include_ocr_text=True) + pages: List[Page] # Structured OCR pages + +class Page: + page_no: int + width: float # Page width in points + height: float # Page height in points + angle: float # Skew angle + unit: Optional[str] + lines: List[Line] + +class Line: + text: Optional[str] + bounding_box: List[float] # 8-coordinate polygon [x1,y1, x2,y2, x3,y3, x4,y4] (raw pixels) + words: Words + +class Metadata: + timings: List[Dict] # Step-level timing records + processed_by: Optional[str] # Hostname + use_case_truncated: Optional[bool] +``` + +**`ResponseIX.context`** is an internal-only mutable object that accumulates data as it passes +through pipeline steps. It is **excluded from serialisation** (`exclude=True`) and must be +stripped before the response is sent to any external consumer. + +--- + +## 3. The Step interface + +Every pipeline stage implements a two-method async interface: + +```python +class Step(ABC): + @abstractmethod + async def validate(self, request_ix: RequestIX, response_ix: ResponseIX) -> bool: + """ + Returns True → run process() + Returns False → silently skip this step + Raises → set ResponseIX.error and abort the pipeline + """ + + @abstractmethod + async def process(self, request_ix: RequestIX, response_ix: ResponseIX) -> ResponseIX: + """Execute the step; mutate response_ix; return it.""" +``` + +**Key invariants:** + +- Both `validate` and `process` receive the **same** `request_ix` and `response_ix` objects. +- `process` receives the `response_ix` as enriched by all previous steps (via `context`). +- If `validate` raises, the error string is written to `response_ix.error` and no further steps run. +- If `process` raises, same: error is captured and pipeline stops. +- A step that returns `False` from `validate` leaves `response_ix` unchanged and the pipeline continues. + +--- + +## 4. Pipeline orchestration (`Pipeline`) + +```python +class Pipeline: + steps = [SetupStep(), OCRStep(), GenAIStep(), ResponseHandlerStep()] + debug_mode: bool # Enables memory-usage logging and GC inspection + + async def start(self, request_ix: RequestIX) -> ResponseIX: + # Wraps _execute_pipeline in an observability span + ... + + async def _execute_pipeline(self, request_ix, obs) -> ResponseIX: + response_ix = ResponseIX() + for step in self.steps: + # wrapped in a step-level observability span + Timer + success, response_ix = await self._execute_step(step, request_ix, response_ix) + if not success: + return response_ix # abort on first error + return response_ix + + async def _execute_step(self, step, request_ix, response_ix) -> (bool, ResponseIX): + valid = await step.validate(request_ix, response_ix) + if valid: + response_ix = await step.process(request_ix, response_ix) + return True, response_ix + # exceptions caught → response_ix.error set → return False +``` + +The pipeline itself holds **no I/O references** — it does not know about Kafka, HTTP, or +databases. The transport layer instantiates `Pipeline`, calls `await pipeline.start(request_ix)`, +and handles routing the returned `ResponseIX`. + +--- + +## 5. Step 1 — SetupStep + +**Purpose:** Validate the request, fetch external files, load the use-case definition, and +convert all inputs into an internal `pages` representation for downstream steps. + +### 5.1 validate() + +Raises if: +- `request_ix` is `None` +- Both `context.files` and `context.texts` are empty + +### 5.2 process() + +**a) Copy text context** + +```python +response_ix.context.texts = list(request_ix.context.texts) +``` + +**b) Download files** (parallel, async) + +Each URL in `request_ix.context.files` is downloaded to a local temp path (`/tmp/ix/`). +The result is a list of `File(file_path, mime_type)` objects stored in +`response_ix.context.files`. + +Supported MIME types depend on the chosen OCR service; unsupported files raise immediately. + +**c) Load use case** + +The use case is fetched from the Config Server (see §9) by name, URL, or Base64. +On success, the two classes `UseCaseRequest` and `UseCaseResponse` are stored in +`response_ix.context.use_case_request` and `response_ix.context.use_case_response`. + +`use_case_name` is extracted from `UseCaseRequest().use_case_name` and stored in +`response_ix.use_case_name` for downstream steps and the final response. + +**d) Build pages** (parallel per document) + +The ingestor registers every file and classifies its MIME type. Rendering is done per document: + +- **PDF** (via PyMuPDF): one `Page` object per PDF page, holding `file_path`, `item_index` (0-based index into `request_ix.context.files`), `page_no`, `width`, `height`. +- **Image** (via PIL): one `Page` per frame. Multi-frame TIFFs produce multiple pages. +- **Text**: one `Page` per entry in `context.texts`. + +Maximum pages per PDF: 100 (hard cap; raises on violation). + +All pages are concatenated into a flat list stored in `response_ix.context.pages`. + +A `DocumentIngestor` instance is stored in `response_ix.context.ingestion` so OCRStep can make format-specific decisions (e.g. skip Word-embedded images with a warning). + +--- + +## 6. Step 2 — OCRStep + +**Purpose:** Convert document pages to structured text with bounding boxes. Optionally build +a `SegmentIndex` for provenance tracking. + +### 6.1 validate() + +Returns `True` when **any** of these flags are set AND `response_ix.context.files` is non-empty: + +- `use_ocr = True` +- `include_geometries = True` +- `include_ocr_text = True` +- `ocr_only = True` + +If none of the above apply (text-only request), returns `False` → step is skipped. + +Raises `IX_000_004` if `ocr_only`, `include_geometries`, or `include_ocr_text` is requested but +no files were provided. + +### 6.2 process() + +**a) Select OCR service** + +``` +service == "document_intelligence" → AzureDocumentIntelligence +service == "computer_vision" → AzureComputerVision +``` + +**b) `perform_ocr(request_ix, response_ix)`** (async, runs against all pages) + +Populates `response_ix.ocr_result` with an `OCRResult` containing pages with lines and +raw bounding boxes (8-coordinate polygons, pixel coordinates). + +**c) Inject page tags** (when `include_page_tags = True`) + +For each OCR page a synthetic `Line` is prepended and appended: + +``` + +… original lines … + +``` + +These tags are visible to the LLM as structural markers. The `SegmentIndex` builder +explicitly skips them. + +**d) Build SegmentIndex** (when `include_provenance = True`) + +```python +response_ix.context.segment_index = await SegmentIndex.build( + response_ix.ocr_result, + granularity, # line (default) or word + pages_metadata=response_ix.context.pages, +) +``` + +See §8 for the full SegmentIndex specification. + +--- + +## 7. Step 3 — GenAIStep + +**Purpose:** Call the LLM with the prepared context and parse its structured output into +`response_ix.ix_result`. Optionally map the LLM's segment citations to bounding boxes. + +### 7.1 validate() + +- Returns `False` when `ocr_only = True`. +- Raises when: + - `use_case` is empty + - `use_ocr = True` but no OCR text was produced + - neither `use_ocr` nor `use_vision` nor plain text context is available + - model is not found in Config Server + - `use_vision = True` but the model does not support vision + +If `include_provenance = True` but `use_ocr = False`, a warning is added (provenance will be +empty) but processing continues. + +### 7.2 process() + +**a) Prepare system prompt** + +```python +prompt = UseCaseRequest().system_prompt # falls back to prompt_template_base +``` + +If `include_provenance = True`, append the segment-citation instruction (see §8.2). + +**b) Build text context** + +When provenance is on and a `SegmentIndex` exists: + +``` +[p1_l0] Line text +[p1_l1] Next line +… +``` + +Otherwise: join OCR page lines with `\n\n` between pages, then append any raw texts. + +**c) Build visual context** (when `use_vision = True`) + +For each page: +1. Render to PIL image at `vision_scaling_factor × original_size` (capped by `render_max_pixels_per_page`). +2. Convert TIFF frames to JPEG. +3. Base64-encode. +4. Wrap as a provider-neutral image part: + +```json +{"type": "image", "media_type": "image/jpeg", "data": "", "detail": "auto"} +``` + +**d) Assemble LLM call parameters** + +```python +request_kwargs = { + "model": model_name, + "temperature": from_model_settings, + "messages": [ + {"role": "system", "content": prompt_string}, + {"role": "user", "content": [text_part, *image_parts]}, + ], + # "reasoning_effort": "medium" ← only for reasoning-capable models +} +``` + +**e) Determine response schema** + +- When `include_provenance = False`: schema is `UseCaseResponse` directly. +- When `include_provenance = True`: a dynamic wrapper is created at runtime: + +```python +ProvenanceWrappedResponse = create_model( + "ProvenanceWrappedResponse", + result=(UseCaseResponse, Field(...)), + segment_citations=(List[SegmentCitation], Field(default_factory=list)), +) +``` + +The use-case file is **never modified**; the wrapper is ephemeral. + +**f) Invoke LLM** + +```python +result = await gen_ai_service.invoke(request_kwargs, response_schema, model_settings) +# result.parsed → the Pydantic model instance +# result.usage → GenAIUsage(prompt_tokens, completion_tokens) +# result.model_name → string +``` + +**g) Write results** + +- `response_ix.ix_result.result = parsed.result.model_dump()` (provenance mode) or `parsed.model_dump()` (normal mode) +- `response_ix.ix_result.meta_data = {token_usage, model_name}` + +**h) Build provenance** (when `include_provenance = True`) + +```python +response_ix.provenance = ProvenanceUtils.map_segment_refs_to_provenance( + extraction_result={"result": ix_result}, + segment_citations=parsed.segment_citations, + segment_index=response_ix.context.segment_index, + max_sources_per_field=opts.max_sources_per_field, + min_confidence=opts.min_confidence, + include_bounding_boxes=opts.include_bounding_boxes, + source_type=opts.source_type, +) +``` + +See §8.4 for the provenance resolution algorithm. + +### 7.3 Provider routing + +```python +provider = model_settings["provider"] # "azure" or "bedrock" +if provider == "bedrock": service = AWSGenAI() +else: service = AzureOpenAI() +``` + +The `GenAIClient` interface is: + +```python +class GenAIClient(ABC): + def setup_gen_ai_client(self, request_ix, model_settings) -> None: ... + async def invoke(self, request_kwargs, response_schema, model_settings) -> GenAIInvocationResult: ... +``` + +`AzureOpenAI` translates provider-neutral image parts to OpenAI's `image_url` content format. +It caches one `AsyncOpenAI` instance per `(base_url, api_key)` pair. + +--- + +## 8. Step 4 — ResponseHandlerStep + +**Purpose:** Shape the final payload before handing it to the transport layer. + +### 8.1 validate() + +Always returns `True`. + +### 8.2 process() + +1. **Attach OCR text** (`include_ocr_text = True`): concatenate all line texts per page, + join pages with `\n\n`, store in `ocr_result.result.text`. + +2. **Strip geometries** (`include_geometries = False`, default): clear `ocr_result.result.pages` + and `ocr_result.meta_data`. The structured page data is large; callers who don't need it + should not receive it. + +3. **Trim context**: delete `response_ix.context` entirely. The mutable internal context is + never serialised. + +--- + +## 9. Provenance subsystem + +### 9.1 SegmentIndex + +Built from `ocr_result` after OCR completes. It assigns each non-tag OCR line a unique ID: + +``` +p{global_page_position}_l{line_index_within_page} +``` + +- `global_page_position` is 1-based and refers to the **flat pages list position**, not the + page number printed in the document. This guarantees uniqueness across multi-document requests. +- Page tag lines (`` / ``) are explicitly excluded. + +Internal storage: + +```python +_id_to_position: Dict[str, Dict] = { + "p1_l0": { + "page": 1, # global 1-based position + "bbox": BoundingBox(...), # normalised 0-1 coordinates + "text": "Invoice Number: INV-001", + "file_index": 0, # 0-based index into request.context.files + }, + ... +} +_ordered_ids: List[str] # insertion order, for deterministic prompt text +``` + +**Bounding box normalisation:** + +```python +# 8-coordinate polygon normalised to 0-1 (divide x-coords by page.width, y-coords by page.height) +[x1/W, y1/H, x2/W, y2/H, x3/W, y3/H, x4/W, y4/H] +``` + +Multiply back by rendered page dimensions to obtain pixel coordinates for overlay rendering. + +**Prompt format** (`SegmentIndex.to_prompt_text()`): + +``` +[p1_l0] Invoice Number: INV-001 +[p1_l1] Date: 2024-01-15 +[p1_l2] Total: CHF 1234.00 +… +``` + +Extra plain texts (from `context.texts`) are appended untagged. + +**O(1) lookup:** + +```python +position = segment_index.lookup_segment("p1_l0") # None if unknown +``` + +### 9.2 SegmentCitation (LLM output model) + +The LLM is instructed to populate `segment_citations` in its structured output: + +```python +class SegmentCitation(BaseModel): + field_path: str # e.g. "result.invoice_number" + value_segment_ids: List[str] # segments containing the extracted value + context_segment_ids: List[str] # surrounding label/anchor segments (may be empty) +``` + +**System prompt addition (injected by GenAIStep when provenance is on):** + +``` +For each extracted field, you must also populate the `segment_citations` list. +Each entry maps one field to the document segments that were its source. +Set `field_path` to the dot-separated JSON path of the field (e.g. 'result.invoice_number'). +Use two separate segment ID lists: +- `value_segment_ids`: segment IDs whose text directly contains the extracted value + (e.g. ['p1_l4'] for the line containing 'INV-001'). +- `context_segment_ids`: segment IDs for surrounding label or anchor text that helped you + identify the field but does not contain the value itself + (e.g. ['p1_l3'] for a label like 'Invoice Number:'). Leave empty if there is no distinct label. +Only use segment IDs that appear in the document text. +Omit fields for which you cannot identify a source segment. +``` + +### 9.3 ProvenanceData (response model) + +```python +class ProvenanceData(BaseModel): + fields: Dict[str, FieldProvenance] # field_path → provenance + quality_metrics: Dict[str, Any] # see below + segment_count: Optional[int] + granularity: Optional[str] # "line" or "word" + +class FieldProvenance(BaseModel): + field_name: str # last segment of field_path + field_path: str # e.g. "result.invoice_number" + value: Any # actual extracted value (resolved from ix_result.result) + sources: List[ExtractionSource] + confidence: Optional[float] # None today; reserved for future + +class ExtractionSource(BaseModel): + page_number: int # global 1-based position + file_index: Optional[int] # 0-based index into request.context.files + bounding_box: Optional[BoundingBox] # None when include_bounding_boxes=False + text_snippet: str # actual OCR text at this location + relevance_score: float # always 1.0 (exact segment ID match) + segment_id: Optional[str] # e.g. "p1_l3"; for internal use, not for rendering + +class BoundingBox(BaseModel): + coordinates: List[float] # [x1,y1, x2,y2, x3,y3, x4,y4], all 0-1 normalised +``` + +**quality_metrics:** + +```json +{ + "fields_with_provenance": 8, + "total_fields": 10, + "coverage_rate": 0.8, + "invalid_references": 2 +} +``` + +- `coverage_rate` = fields with at least one source ÷ total leaf fields in result +- `invalid_references` = segment IDs cited by LLM that were not found in SegmentIndex + +### 9.4 Provenance resolution algorithm + +```python +for citation in segment_citations: + if source_type == VALUE: + seg_ids = citation.value_segment_ids + else: + seg_ids = citation.value_segment_ids + citation.context_segment_ids + + sources = [] + for seg_id in seg_ids[:max_sources_per_field]: + pos = segment_index.lookup_segment(seg_id) + if pos is None: + invalid_ref_count += 1 + continue + sources.append(ExtractionSource( + page_number=pos["page"], + file_index=pos["file_index"], + bounding_box=pos["bbox"] if include_bounding_boxes else None, + text_snippet=pos["text"], + relevance_score=1.0, + segment_id=seg_id, + )) + + if not sources: + continue # skip fields that resolve to nothing + + # Resolve value: traverse result dict via dot-notation field_path + # e.g. "result.insured_vehicle.vin" → result["insured_vehicle"]["vin"] + value = resolve_nested_path(extraction_result, citation.field_path) + + provenance[citation.field_path] = FieldProvenance( + field_name=citation.field_path.split(".")[-1], + field_path=citation.field_path, + value=value, + sources=sources, + confidence=None, + ) +``` + +Array notation is normalised before traversal: `"items[0].name"` → `"items.0.name"`. + +--- + +## 10. Use-case definition contract + +A use case is a Python module (or dynamically loaded class) that defines two Pydantic models. +The Config Server serves them by name; the pipeline loads them via `config_server.use_cases.load_and_validate_use_case(use_case)`. + +### UseCaseRequest + +```python +class UseCaseRequest(BaseModel): + use_case_name: str # Human-readable name, stored in ResponseIX.use_case_name + system_prompt: str # System prompt sent to the LLM + # Legacy: prompt_template_base (deprecated alias for system_prompt) + # Legacy: kwargs_use_case (deprecated; triggers format(**kwargs)) +``` + +### UseCaseResponse + +A Pydantic model whose fields define the extraction schema. The LLM is instructed to return +JSON matching this schema (via `chat.completions.parse(response_format=UseCaseResponse)`). + +**Example:** + +```python +class InvoiceResponse(BaseModel): + invoice_number: str + invoice_date: str + total_amount: float + line_items: List[LineItem] + +class LineItem(BaseModel): + description: str + quantity: int + unit_price: float +``` + +The schema is passed as the `response_format` argument to the OpenAI structured-output API. +When provenance is on, it is wrapped in `ProvenanceWrappedResponse` at runtime (§7.2e). + +--- + +## 11. LLM invocation contract + +### 11.1 Provider-neutral request_kwargs + +```python +request_kwargs = { + "model": str, # deployment name + "temperature": Optional[float], # from model_settings; None for reasoning models + "messages": [ + {"role": "system", "content": str}, + {"role": "user", "content": List[ContentPart]}, + ], + "reasoning_effort": Optional[str], # only present when supported by model +} + +ContentPart = ( + {"type": "text", "text": str} + | {"type": "image", "media_type": "image/jpeg", "data": str, "detail": str} +) +``` + +### 11.2 GenAIInvocationResult + +```python +@dataclass +class GenAIInvocationResult: + parsed: Any # Parsed Pydantic model instance (matches response_schema) + usage: GenAIUsage # prompt_tokens, completion_tokens + model_name: str +``` + +### 11.3 Model settings (from Config Server) + +```json +{ + "provider": "azure", + "temperature": 0.0, + "vision_capability": true, + "supports_reasoning_efforts": ["low", "medium", "high"], + "default_reasoning_effort": "medium" +} +``` + +- `temperature` is `None` for reasoning models (OpenAI ignores it and it must be omitted). +- `vision_capability` must be `true` for `use_vision = True` requests to be accepted. +- `supports_reasoning_efforts` is the allowlist; requests for unsupported values fall back to + `default_reasoning_effort` with a warning. + +--- + +## 12. Error model + +### 12.1 IXException + +All domain errors are wrapped in `IXException(message_or_Error_enum)`. The pipeline catches +any exception from `validate()` or `process()`, writes `str(e)` to `response_ix.error`, and +stops. + +### 12.2 Named error codes + +| Code | Trigger | +|---|---| +| `IX_000_000` | `request_ix` is `None` | +| `IX_000_002` | No context (neither files nor texts) | +| `IX_000_004` | OCR required but no files provided | +| `IX_000_005` | File MIME type not supported by chosen OCR service | +| `IX_001_000` | `use_case` is empty or missing | +| `IX_001_001` | Use case could not be loaded or validated | +| `IX_003_003` | `use_vision=True` but model has no vision capability | +| `IX_004_001` | Response too large for transport (e.g. Kafka max message size) | + +### 12.3 Warnings + +Non-fatal issues are appended to `response_ix.warning` and logged. Examples: + +- Vision requested but no renderable documents found +- Image scale capped due to pixel limit +- Provenance requested with `use_ocr=False` +- Word document contains embedded images that will be skipped +- `use_case_name` not set in use-case definition + +--- + +## 13. Configuration (AppConfig) + +All settings are loaded from environment variables (with `.env` fallback via `pydantic-settings`). + +| Category | Key env vars | +|---|---| +| Azure OCR | `DOCUMENT_INTELLIGENCE_ENDPOINT`, `DOCUMENT_INTELLIGENCE_KEY`, `COMPUTER_VISION_ENDPOINT`, `COMPUTER_VISION_SUBSCRIPTION_KEY` | +| Azure OpenAI | `OPENAI_BASE_URL`, `OPENAI_API_KEY` | +| LLM defaults | `DEFAULT_LLM` (default: `gpt-4o`) | +| Config Server | `CONFIG_SERVER_BASE_URL` (default: `http://ix-config:8998/`) | +| Pipeline | `PIPELINE_WORKER_CONCURRENCY` (1–10), `PIPELINE_REQUEST_TIMEOUT_SECONDS` (default: 2700) | +| Rendering | `RENDER_MAX_PIXELS_PER_PAGE` (default: 75,000,000) | +| AWS | `S3_BUCKET`, `S3_OBJECT_PREFIX`, `AWS_REGION`, `TARGET_ROLE` | +| Kafka | `PROCESSING_TOPIC`, `KAFKA_BOOTSTRAP_SERVER`, … | +| Observability | `LOG_LEVEL`, `OTEL_EXPORTER_OTLP_ENDPOINT`, `PYROSCOPE_SERVER_ADDRESS` | + +--- + +## 14. Observability + +### 14.1 Logging + +Every log call carries `ix_id` as contextual metadata. The custom logger exposes: + +```python +logger.info_ix(msg, request_ix) +logger.warning_ix(msg, request_ix) +logger.error_ix(msg, request_ix) +``` + +If `ix_id` is not yet set on `request_ix`, the logger generates one lazily on first use. + +Step timing is recorded via a `Timer` context manager; results are stored in +`response_ix.metadata.timings`. + +### 14.2 Tracing (OpenTelemetry) + +- **Pipeline span** wraps the entire `_execute_pipeline()` call (attributes: `request_id`, `use_case`, `client_id`). +- **Step span** wraps each individual step. +- Errors are recorded on the span with `mark_failed(error_message)`. +- Exported via gRPC to `OTEL_EXPORTER_OTLP_ENDPOINT`. + +### 14.3 Metrics (Prometheus) + +| Metric | Labels | +|---|---| +| `pipeline_requests_total` | `client_id` | +| LLM token counter | `model`, `client_id`, `use_case`, `status` | +| LLM latency histogram | `model`, `client_id` | +| Extraction quality | `client_id`, `use_case`, `model`, `reasoning_effort` | + +Exposed on the health port (default 9009). + +--- + +## 15. Transport adapter contract + +The pipeline is transport-agnostic. Any delivery mechanism must: + +1. **Deserialise** the incoming payload into a `RequestIX` instance. +2. **Assign `ix_id`** (a random 16-character hex string) if not already present. +3. **Call** `await Pipeline().start(request_ix)` → `response_ix`. +4. **Serialise** `response_ix` (use `.model_dump(by_alias=True)` to honour `serialization_alias` on `metadata`). +5. **Deliver** the serialised response to the caller via the appropriate channel. + +Optionally: +- **Persist** `request_ix` and `response_ix` to a durable store (S3, database) keyed by `ix_id`. +- **Apply a timeout**: wrap the `await pipeline.start(...)` call and write a timeout error to `response_ix.error` on expiry. + +### Kafka adapter (existing) + +- Consumes from `ch-infoxtractor.processing.v2`; produces to `ch-infoxtractor.outbound.v2` and client-specific topics. +- Concurrency controlled via asyncio semaphore (`PIPELINE_WORKER_CONCURRENCY`). +- Per-partition offset commit tracking (gap-safe: only commits contiguous completed offsets). +- Graceful shutdown: drains in-flight messages before stopping. + +### FastAPI adapter (example) + +```python +@router.post("/extract") +async def extract(body: RequestIX) -> ResponseIX: + body.ix_id = body.ix_id or secrets.token_hex(8) + response = await pipeline.start(body) + return response +``` + +### Database adapter (example) + +```python +async def process_job(job_id: str, db: AsyncSession): + row = await db.get(Job, job_id) + request_ix = RequestIX.model_validate_json(row.payload) + request_ix.ix_id = row.ix_id + response_ix = await pipeline.start(request_ix) + row.result = response_ix.model_dump_json(by_alias=True) + row.status = "done" if not response_ix.error else "error" + await db.commit() +``` + +--- + +## 16. End-to-end data flow summary + +``` +[Transport layer] + ↓ RequestIX (use_case, context.files/texts, options, ix_id) + ↓ +[Pipeline.start()] + ↓ +[SetupStep] + • Download files → local paths → response_ix.context.files (File objects) + • Load use case from Config Server → response_ix.context.use_case_request/response + • Split PDFs / load images / wrap texts → response_ix.context.pages (flat list) + ↓ +[OCRStep] (skipped if no files or use_ocr=False) + • Call Azure OCR per page → response_ix.ocr_result (pages + lines + bbox) + • Inject XML markers → lines prepended/appended in ocr_result.pages + • Build SegmentIndex (if provenance) → response_ix.context.segment_index + ↓ +[GenAIStep] (skipped if ocr_only=True) + • Build system prompt (+ citation hint if provenance) + • Build user content: segment-tagged text OR plain OCR text + optional base64 images + • Determine response schema (wrapped if provenance) + • Call LLM via GenAIClient.invoke() + • Write ix_result.result + meta_data + • Resolve segment citations → ProvenanceData (if provenance) + ↓ +[ResponseHandlerStep] (always runs) + • Attach flat OCR text (if include_ocr_text) + • Strip bounding boxes (if not include_geometries) + • Delete response_ix.context (never serialised) + ↓ +[ResponseIX] + ix_result.result ← structured extraction matching UseCaseResponse schema + ocr_result ← OCR details (empty pages unless include_geometries) + provenance ← field-level source map (None unless include_provenance) + error / warning ← diagnostics + metadata ← timings, hostname + ↓ +[Transport layer] + Serialise → deliver to caller + Persist (optional) → keyed by ix_id +``` + +--- + +## 17. Key design decisions + +| Decision | Rationale | +|---|---| +| **Two-method Step interface** (`validate` + `process`) | Keeps routing logic (should this step run?) separate from execution; steps can be reordered or replaced without touching the pipeline loop | +| **Mutable `context` on ResponseIX** | Avoids passing a growing set of intermediate values as extra arguments; the entire state is in one object; `context` is excluded from serialisation so it never leaks | +| **Single LLM call for provenance** | The schema is wrapped at runtime to add `segment_citations`; no second round-trip to the LLM is needed | +| **Segment IDs instead of fuzzy text matching** | IDs are injected into the prompt; the LLM cites them; lookup is O(1); no approximate matching, no hallucinated coordinates | +| **Global page position for segment IDs** | Ensures uniqueness across multi-document requests (two documents that both have "page 1" produce `p1_l*` and `p2_l*`, not the same IDs) | +| **Provider-neutral `request_kwargs`** | The `GenAIClient` interface accepts a dict; provider-specific translation (e.g. OpenAI `image_url` format) happens inside each adapter, not in the step | +| **Normalised bounding boxes (0–1)** | Resolution-independent; frontend multiplies by rendered page dimensions to get pixel coordinates | +| **Transport-agnostic pipeline** | Kafka, FastAPI, and DB adapters all call the same `Pipeline.start()` | +| **Config Server for use cases** | Decouples use-case authoring from deployment; allows use cases to be updated without redeploying the pipeline | diff --git a/docs/superpowers/specs/2026-04-18-ix-mvp-design.md b/docs/superpowers/specs/2026-04-18-ix-mvp-design.md new file mode 100644 index 0000000..72aacb4 --- /dev/null +++ b/docs/superpowers/specs/2026-04-18-ix-mvp-design.md @@ -0,0 +1,388 @@ +# InfoXtractor (ix) MVP — Design + +Date: 2026-04-18 +Reference: `docs/spec-core-pipeline.md` (full, aspirational spec — MVP is a strict subset) +Status: Design approved (sections 1–8 walked through and accepted 2026-04-18) + +## 0. One-paragraph summary + +ix is an on-prem, async, LLM-powered microservice that extracts structured JSON from documents (PDFs, images, text) given a named *use case* (a Pydantic schema + system prompt). It returns the extracted fields together with per-field provenance (OCR segment IDs, bounding boxes, extracted-value agreement flags) that let calling services decide how much to trust each value. The MVP ships one use case (`bank_statement_header`), one OCR engine (Surya, pluggable), one LLM backend (Ollama, pluggable), and two transports in parallel (REST with optional webhook callback + a Postgres queue). Cloud services are explicitly forbidden. The first consumer is mammon, which uses ix as a fallback for `needs_parser` documents. + +## 1. Guiding principles + +- **On-prem always.** No OpenAI, Anthropic, Azure (DI/CV/OpenAI), AWS (Bedrock/Textract), Google Document AI, Mistral, etc. LLM = Ollama (:11434). OCR = local engines only. Secrets never leave the home server. The spec's cloud references are examples to replace, not inherit. +- **Grounded extraction, not DB truth.** ix returns best-effort fields with segment citations, provenance verification, and cross-OCR agreement flags. ix does *not* claim DB-grade truth. The reliability decision (trust / stage for review / reject) belongs to the caller. +- **Transport-agnostic pipeline core.** The pipeline (`RequestIX` → `ResponseIX`) knows nothing about HTTP, queues, or databases. Adapters (REST, Postgres queue) run alongside the core; both converge on one shared job store. +- **YAGNI for all spec features the MVP doesn't need.** Kafka, Config Server, Azure/AWS clients, vision, word-level provenance, reasoning-effort routing, Prometheus/OTEL exporters: deferred. Architecture leaves the interfaces so they can be added without touching the pipeline core. + +## 2. Architecture + +``` +┌──────────────────────────────────────────────────────────────────┐ +│ infoxtractor container (Docker on 192.168.68.42, port 8994) │ +│ │ +│ ┌──────────────────┐ ┌──────────────────────────┐ │ +│ │ rest_adapter │ │ pg_queue_adapter │ │ +│ │ (FastAPI) │ │ (asyncio worker) │ │ +│ │ POST /jobs │ │ LISTEN ix_jobs_new + │ │ +│ │ GET /jobs/{id} │ │ SELECT ... FOR UPDATE │ │ +│ │ + callback_url │ │ SKIP LOCKED │ │ +│ └────────┬─────────┘ └────────┬─────────────────┘ │ +│ │ │ │ +│ └──────────┬──────────────┘ │ +│ ▼ │ +│ ┌────────────────┐ │ +│ │ ix_jobs table │ ── postgis :5431, DB=infoxtractor│ +│ └────────┬───────┘ │ +│ ▼ │ +│ ┌─────────────────────────────┐ │ +│ │ Pipeline core (spec §3–§4) │ │ +│ │ │ │ +│ │ SetupStep → OCRStep → │ │ +│ │ GenAIStep → ReliabilityStep│ │ +│ │ → ResponseHandler │ │ +│ │ │ │ +│ │ Uses: OCRClient (Surya), │ │ +│ │ GenAIClient (Ollama),│ │ +│ │ UseCaseRegistry │ │ +│ └─────────────────────────────┘ │ +│ │ +└──────────────────────────────────────────────────────────────────┘ + │ ▲ + ▼ │ + host.docker.internal:11434 mammon or any on-prem caller — + Ollama (gpt-oss:20b default) polls GET /jobs/{id} until done +``` + +**Key shapes:** +- Spec's four steps + a new fifth: `ReliabilityStep` runs between `GenAIStep` and `ResponseHandlerStep`, computes per-field `provenance_verified` and `text_agreement` flags. Isolated so callers and tests can reason about reliability signals independently. +- Single worker at MVP (`PIPELINE_WORKER_CONCURRENCY=1`). Ollama + Surya both want the GPU serially. +- Two transports, one job store. REST is the primary; pg queue is scaffolded, uses the same table, same lifecycle. +- Use case registry in-repo (`ix/use_cases/__init__.py`); adding a new use case = new module + one registry line. + +## 3. Data contracts + +Subset of spec §2 / §9.3. Dropped fields are no-ops under the MVP's feature set. + +### RequestIX + +```python +class RequestIX(BaseModel): + use_case: str # registered name, e.g. "bank_statement_header" + ix_client_id: str # caller tag for logs/metrics, e.g. "mammon" + request_id: str # caller's correlation id; echoed back + ix_id: Optional[str] # transport-assigned short hex id + context: Context + options: Options = Options() + callback_url: Optional[str] # optional webhook delivery (one-shot, no retry) + +class Context(BaseModel): + files: list[str] = [] # URLs or file:// paths + texts: list[str] = [] # extra text (e.g. Paperless OCR output) + +class Options(BaseModel): + ocr: OCROptions = OCROptions() + gen_ai: GenAIOptions = GenAIOptions() + provenance: ProvenanceOptions = ProvenanceOptions() + +class OCROptions(BaseModel): + use_ocr: bool = True + ocr_only: bool = False + include_ocr_text: bool = False + include_geometries: bool = False + service: Literal["surya"] = "surya" # kept so the adapter point is visible + +class GenAIOptions(BaseModel): + gen_ai_model_name: Optional[str] = None # None → use-case default → IX_DEFAULT_MODEL + +class ProvenanceOptions(BaseModel): + include_provenance: bool = True # default ON (reliability is the point) + max_sources_per_field: int = 10 +``` + +**Dropped from spec (no-ops under MVP):** `OCROptions.computer_vision_scaling_factor`, `include_page_tags` (always on), `GenAIOptions.use_vision`/`vision_scaling_factor`/`vision_detail`/`reasoning_effort`, `ProvenanceOptions.granularity`/`include_bounding_boxes`/`source_type`/`min_confidence`, `RequestIX.version`. + +### ResponseIX + +Identical to spec §2.2 except `FieldProvenance` gains two fields: + +```python +class FieldProvenance(BaseModel): + field_name: str + field_path: str + value: Any + sources: list[ExtractionSource] + confidence: Optional[float] = None # reserved; always None in MVP + provenance_verified: bool # NEW: cited segment actually contains value (normalized) + text_agreement: Optional[bool] # NEW: value appears in RequestIX.context.texts; None if no texts +``` + +`quality_metrics` gains two counters: `verified_fields`, `text_agreement_fields`. + +### Job envelope (in `ix_jobs` table; returned by REST) + +```python +class Job(BaseModel): + job_id: UUID + ix_id: str + client_id: str + request_id: str + status: Literal["pending", "running", "done", "error"] + request: RequestIX + response: Optional[ResponseIX] + callback_url: Optional[str] + callback_status: Optional[Literal["pending", "delivered", "failed"]] + attempts: int = 0 + created_at: datetime + started_at: Optional[datetime] + finished_at: Optional[datetime] +``` + +## 4. Job store + +```sql +CREATE DATABASE infoxtractor; -- on the existing postgis container + +CREATE TABLE ix_jobs ( + job_id UUID PRIMARY KEY, + ix_id TEXT NOT NULL, + client_id TEXT NOT NULL, + request_id TEXT NOT NULL, + status TEXT NOT NULL, + request JSONB NOT NULL, + response JSONB, + callback_url TEXT, + callback_status TEXT, + attempts INT NOT NULL DEFAULT 0, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + started_at TIMESTAMPTZ, + finished_at TIMESTAMPTZ +); +CREATE INDEX ix_jobs_status_created ON ix_jobs (status, created_at) WHERE status = 'pending'; +CREATE INDEX ix_jobs_client_request ON ix_jobs (client_id, request_id); +-- Postgres NOTIFY channel used by the pg_queue_adapter: 'ix_jobs_new' +``` + +Callers that prefer direct SQL (the `pg_queue_adapter` contract): insert a row with `status='pending'`, then `NOTIFY ix_jobs_new, ''`. The worker also falls back to a 10 s poll so a missed notify or ix restart doesn't strand a job. + +## 5. REST surface + +| Method | Path | Purpose | +|---|---|---| +| `POST` | `/jobs` | Body = `RequestIX` (+ optional `callback_url`). → `201 {job_id, ix_id, status: "pending"}`. Idempotent on `(ix_client_id, request_id)` — same pair returns the existing `job_id` with `200`. | +| `GET` | `/jobs/{job_id}` | → full `Job`. Source of truth regardless of submission path or callback outcome. | +| `GET` | `/jobs?client_id=…&request_id=…` | Lookup-by-correlation (caller idempotency helper). Returns latest match or `404`. | +| `GET` | `/healthz` | `{ollama: ok/fail, postgres: ok/fail, ocr: ok/fail}`. Used by `infrastructure` monitoring dashboard. | +| `GET` | `/metrics` | Counters: `jobs_pending`, `jobs_running`, `jobs_done_24h`, `jobs_error_24h`, per-use-case avg seconds. Plain JSON, no Prometheus format for MVP. | + +**Callback delivery** (when `callback_url` is set): one POST of the full `Job` body, 10 s timeout. 2xx → `callback_status='delivered'`. Anything else → `'failed'`. No retry. Callers always have `GET /jobs/{id}` as the authoritative fallback. + +## 6. Pipeline steps + +Interface per spec §3 (`async validate` + `async process`). Pipeline orchestration per spec §4 (first error aborts; each step wrapped in a `Timer` landing in `Metadata.timings`). + +### SetupStep + +- **validate**: `request_ix` non-null; `context.files` or `context.texts` non-empty. +- **process**: + - Copy `request_ix.context.texts` → `response_ix.context.texts`. + - Download each URL in `context.files` to `/tmp/ix//` in parallel. MIME detection via `python-magic`. Supported: PDF, PNG, JPEG, TIFF. Unsupported → `IX_000_005`. + - Load use case: `request_cls, response_cls = REGISTRY[request_ix.use_case]`. Store instances in `response_ix.context.use_case_request` / `use_case_response`. Echo `use_case_request.use_case_name` → `response_ix.use_case_name`. + - Build flat `response_ix.context.pages`: one entry per PDF page (via PyMuPDF), one per image frame, one per text entry. Hard cap 100 pages/PDF → `IX_000_006` on violation. + +### OCRStep + +- **validate**: returns `True` iff `(use_ocr or ocr_only or include_geometries or include_ocr_text) and context.files`. Otherwise `False` → step skipped (text-only requests). +- **process**: `ocr_result = await OCRClient.ocr(context.pages)` → `response_ix.ocr_result`. Always inject `` tags (simplifies grounding). If `include_provenance`: build `SegmentIndex` (line granularity, normalized bboxes 0-1) and store in `context.segment_index`. +- **OCRClient interface**: + ```python + class OCRClient(Protocol): + async def ocr(self, pages: list[Page]) -> OCRResult: ... + ``` + MVP implementation: `SuryaOCRClient` (GPU via `surya-ocr` PyPI package, CUDA on the RTX 3090). + +### GenAIStep + +- **validate**: `ocr_only` → `False` (skip). Use case must exist. OCR text or `context.texts` must be non-empty (else `IX_001_000`). +- **process**: + - System prompt = `use_case_request.system_prompt`. If `include_provenance`: append spec §9.2 citation instruction verbatim. + - User text: segment-tagged (`[p1_l0] …`) when provenance is on; plain concatenated OCR + texts otherwise. + - Response schema: `UseCaseResponse` directly, or the dynamic `ProvenanceWrappedResponse(result=..., segment_citations=...)` per spec §7.2e when provenance is on. + - Model: `request_ix.options.gen_ai.gen_ai_model_name` → `use_case_request.default_model` → `IX_DEFAULT_MODEL`. + - Call `GenAIClient.invoke(request_kwargs, response_schema)`; parsed model → `ix_result.result`, usage + model_name → `ix_result.meta_data`. + - If provenance: call `ProvenanceUtils.map_segment_refs_to_provenance(...)` per spec §9.4, write `response_ix.provenance`. +- **GenAIClient interface**: + ```python + class GenAIClient(Protocol): + async def invoke(self, request_kwargs: dict, response_schema: type[BaseModel]) -> GenAIInvocationResult: ... + ``` + MVP implementation: `OllamaClient` — `POST http://host.docker.internal:11434/api/chat` with `format = ` (Ollama structured outputs). + +### ReliabilityStep (new; runs when `include_provenance` is True) + +For each `FieldProvenance` in `response_ix.provenance.fields`: + +- **`provenance_verified`**: for each cited segment, compare `text_snippet` to the extracted `value` after normalization (see below). If any cited segment agrees → `True`. Else `False`. +- **`text_agreement`**: if `request_ix.context.texts` is empty → `None`. Else run the same comparison against the concatenated texts → `True` / `False`. + +**Normalization rules** (cheap, language-neutral, applied to both sides before `in`-check): + +- Strings: Unicode NFKC, casefold, collapse whitespace, strip common punctuation. +- Numbers (`int`, `float`, `Decimal` values): digits-and-sign only; strip currency symbols, thousands separators, decimal-separator variants (`.`/`,`); require exact match to 2 decimal places for amounts. +- Dates: parse to ISO `YYYY-MM-DD`; compare as strings. Accept common German / Swiss / US formats. +- IBANs: uppercase, strip spaces. +- Very short values (≤ 2 chars, or numeric |value| < 10): `text_agreement` skipped (returns `None`) — too noisy to be a useful signal. + +Records are mutations to the provenance structure only; does **not** drop fields. Caller sees every extracted field + the flags. + +Writes `quality_metrics.verified_fields` and `quality_metrics.text_agreement_fields` summary counts. + +### ResponseHandlerStep + +Per spec §8, unchanged. Attach flat OCR text when `include_ocr_text`; strip `ocr_result.pages` unless `include_geometries`; delete `context` before serialization. + +## 7. Use case registry + +``` +ix/use_cases/ + __init__.py # REGISTRY: dict[str, tuple[type[UseCaseRequest], type[UseCaseResponse]]] + bank_statement_header.py +``` + +Adding a use case = new module exporting a `Request(BaseModel)` (`use_case_name`, `default_model`, `system_prompt`) and a `UseCaseResponse(BaseModel)`, then one `REGISTRY[""] = (Request, UseCaseResponse)` line. + +### First use case: `bank_statement_header` + +```python +class BankStatementHeader(BaseModel): + bank_name: str + account_iban: Optional[str] + account_type: Optional[Literal["checking", "credit", "savings"]] + currency: str + statement_date: Optional[date] + statement_period_start: Optional[date] + statement_period_end: Optional[date] + opening_balance: Optional[Decimal] + closing_balance: Optional[Decimal] + +class Request(BaseModel): + use_case_name: str = "Bank Statement Header" + default_model: str = "gpt-oss:20b" + system_prompt: str = ( + "You extract header metadata from a single bank or credit-card statement. " + "Return only facts that appear in the document; leave a field null if uncertain. " + "Balances must use the document's numeric format (e.g. '1234.56' or '-123.45'); " + "do not invent a currency symbol. Account type: 'checking' for current/Giro accounts, " + "'credit' for credit-card statements, 'savings' otherwise. Always return the IBAN " + "with spaces removed. Never fabricate a value to fill a required-looking field." + ) +``` + +**Why these fields:** each appears at most once per document (one cite per field → strong `provenance_verified` signal); all reconcile against something mammon already stores (IBAN → `Account.iban`, period → verified-range chain, closing_balance → next month's opening_balance and `StatementBalance`); schema is flat (no nested arrays where Ollama structured output tends to drift). + +## 8. Errors and warnings + +Error-code subset from spec §12.2 (reusing codes as-is where meaning is identical): + +| Code | Trigger | +|---|---| +| `IX_000_000` | `request_ix` is None | +| `IX_000_002` | No context (neither files nor texts) | +| `IX_000_004` | OCR required but no files provided | +| `IX_000_005` | File MIME type not supported | +| `IX_000_006` | PDF page-count cap exceeded | +| `IX_001_000` | `use_case` empty, or extraction context (OCR + texts) empty after setup | +| `IX_001_001` | Use case name not in `REGISTRY` | + +Warnings (non-fatal, appended to `response_ix.warning`): empty OCR result, provenance requested with `use_ocr=False`, unknown model falling back to default. + +## 9. Configuration (`AppConfig` via `pydantic-settings`) + +| Key env var | Default | Meaning | +|---|---|---| +| `IX_POSTGRES_URL` | `postgresql+asyncpg://infoxtractor:…@host.docker.internal:5431/infoxtractor` | Job store | +| `IX_OLLAMA_URL` | `http://host.docker.internal:11434` | LLM backend | +| `IX_DEFAULT_MODEL` | `gpt-oss:20b` | Fallback model | +| `IX_OCR_ENGINE` | `surya` | Adapter selector (only value in MVP) | +| `IX_TMP_DIR` | `/tmp/ix` | Download scratch | +| `IX_PIPELINE_WORKER_CONCURRENCY` | `1` | Worker semaphore cap | +| `IX_PIPELINE_REQUEST_TIMEOUT_SECONDS` | `2700` | Per-job timeout (45 min) | +| `IX_RENDER_MAX_PIXELS_PER_PAGE` | `75000000` | Per-page render cap | +| `IX_LOG_LEVEL` | `INFO` | | +| `IX_CALLBACK_TIMEOUT_SECONDS` | `10` | Webhook POST timeout | + +No Azure, OpenAI, or AWS variables — those paths do not exist in the codebase. + +## 10. Observability (minimal) + +- **Logs**: JSON-structured via `logging` + custom formatter. Every line carries `ix_id`, `client_id`, `request_id`, `use_case`. Steps emit `step_start` / `step_end` events with elapsed ms. +- **Timings**: every step's elapsed-seconds recorded in `response_ix.metadata.timings` (same shape as spec §2). +- **Traces**: OpenTelemetry span scaffolding present, no exporter wired. Drop-in later. +- **Prometheus**: deferred. + +## 11. Deployment + +- Repo: `goldstein/infoxtractor` on Forgejo, plus `server` bare-repo remote with `post-receive` hook mirroring mammon. +- Port 8994 (LAN-only via UFW; not exposed publicly — internal service). +- Postgres: new `infoxtractor` database on existing postgis container. +- Ollama reached via `host.docker.internal:11434`. +- Monitoring label: `infrastructure.web_url=http://192.168.68.42:8994`. +- Backup: `backup.enable=true`, `backup.type=postgres`, `backup.name=infoxtractor`. +- Dockerfile: CUDA-enabled base (`nvidia/cuda:12.4-runtime-ubuntu22.04` + Python 3.12) so Surya can use the 3090. CMD: `alembic upgrade head && uvicorn ix.app:create_app --factory --host 0.0.0.0 --port 8994`. + +## 12. Testing strategy + +Strict TDD — each unit is written test-first. + +1. **Unit tests** (fast, hermetic): every `Step`, `SegmentIndex`, provenance-verification normalizers, `OCRClient` contract, `GenAIClient` contract, error mapping. No DB, no Ollama, no network. +2. **Integration tests** (DB + fakes): pipeline end-to-end with stub `OCRClient` (replays canned OCR results) and stub `GenAIClient` (replays canned LLM JSON). Covers step wiring + transports + job lifecycle + callback success/failure + pg queue notify. Run against a real postgres service container in Forgejo Actions (mammon CI pattern). +3. **E2E smoke against deployed app**: `scripts/e2e_smoke.py` on the Mac calls `POST http://192.168.68.42:8994/jobs` with a redacted bank-statement fixture (`tests/fixtures/dkb_giro_2026_03.pdf`), polls `GET /jobs/{id}` until done, asserts: + - `status == "done"` + - `provenance.fields["result.closing_balance"].provenance_verified is True` + - `text_agreement is True` when Paperless-style texts are submitted + - Timings under 60 s + Runs after every `git push server main` as the deploy gate. If it fails, the commit is reverted before merging the deploy PR. + +## 13. Mammon integration (sketch — outside this spec's scope, owned by mammon) + +Belongs in a mammon-side follow-up spec. Captured here only so readers of ix know the MVP's first consumer. + +- Paperless poller keeps current behavior for format-matched docs. +- For `needs_parser` docs: submit to ix (`use_case="bank_statement_header"`, `files=[paperless_download_url]`, `texts=[paperless_content]`). +- ix job id recorded on the `Import` row. A new poller on the mammon side checks `GET /jobs/{id}` until done. +- Result is staged (new `pending_headers` table — not `StatementBalance`). A new "Investigate" panel surfaces staged headers with per-field `provenance_verified` + `text_agreement` flags. +- User confirms → write to `StatementBalance`. Over time, when a deterministic parser is added for the bank, compare ix's past extractions against the deterministic output to measure ix accuracy. + +## 14. Deferred from full spec (explicit) + +- Kafka transport (§15) +- Config Server (§9.1 in full spec, §10 here): use cases are in-repo for MVP +- Azure DI / Computer Vision OCR backends +- OpenAI, Anthropic, AWS Bedrock GenAI backends +- S3 adapter +- `use_vision` + vision scaling/detail +- Word-level provenance granularity +- `reasoning_effort` parameter routing +- Prometheus exporter (/metrics stays JSON for MVP) +- OTEL gRPC exporter (spans present, no exporter) +- Legacy aliases (`prompt_template_base`, `kwargs_use_case`) +- Second-opinion multi-model ensembling +- Schema `version` field +- Per-request rate limiting + +Every deferred item is additive: the `OCRClient` / `GenAIClient` / transport-adapter interfaces already leave the plug points, and the pipeline core is unaware of which implementation is in use. + +## 15. Implementation workflow (habit reminder) + +Every unit of work follows the cross-project habit: + +1. `git checkout -b feat/` +2. TDD: write failing test, write code, green, refactor +3. Commit in small logical chunks; update `AGENTS.md` / `README.md` / `docs/` in the same commit as the code +4. `git push forgejo feat/` +5. Create PR via Forgejo API +6. Wait for tests to pass +7. Merge +8. `git push server main` to deploy; run `scripts/e2e_smoke.py` against the live service + +Never skip hooks, never force-push main, never bypass tests.