Initial design: on-prem LLM extraction microservice MVP

Establishes ix as an async, on-prem, LLM-powered structured extraction
microservice. Full reference spec stays in docs/spec-core-pipeline.md;
MVP spec (strict subset — Ollama only, Surya OCR, REST + Postgres-queue
transports in parallel, in-repo use cases, provenance-based reliability
signals) lives at docs/superpowers/specs/2026-04-18-ix-mvp-design.md.

First use case: bank_statement_header (feeds mammon's needs_parser flow).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dirk Riemann 2026-04-18 10:23:17 +02:00
commit 124403252d
5 changed files with 1378 additions and 0 deletions

17
.gitignore vendored Normal file
View file

@ -0,0 +1,17 @@
__pycache__/
*.pyc
*.pyo
*.egg-info/
.venv/
venv/
.env
.env.local
.pytest_cache/
.mypy_cache/
.ruff_cache/
htmlcov/
.coverage
dist/
build/
*.log
/tmp/

44
AGENTS.md Normal file
View file

@ -0,0 +1,44 @@
# InfoXtractor (ix)
Async, on-prem, LLM-powered structured information extraction microservice. Given a document (PDF, image) or text plus a use case (a Pydantic schema), returns a structured JSON result with per-field provenance (source page, bounding box, OCR segment).
Designed to be used by other on-prem services (e.g. mammon) as a reliable fallback / second opinion for format-specific deterministic parsers.
Status: design phase. Full reference spec at `docs/spec-core-pipeline.md`. MVP spec will live at `docs/superpowers/specs/`.
## Guiding Principles
- **On-prem always.** All LLM inference, OCR, and user-data processing run on the home server (192.168.68.42). No cloud APIs — OpenAI, Anthropic, Azure, AWS Bedrock/Textract, Google Document AI, Mistral, etc. are not to be used for user data or inference. LLM backend is Ollama (:11434); OCR runs locally (pluggable `OCRClient` interface, first engine: Surya on the RTX 3090); job state lives in local Postgres on the postgis container. The spec's references to Azure / AWS / OpenAI are examples to *replace*, not inherit.
- **Grounded extraction, not DB truth.** ix returns best-effort extracted fields with segment citations, provenance, and cross-OCR agreement signals. ix does *not* claim its output is DB-grade; the calling service (e.g. mammon) owns the reliability decision (reconcile against anchors, stage for review, compare to deterministic parsers).
- **Transport-agnostic pipeline core.** The pipeline (`RequestIX` → `ResponseIX`) knows nothing about HTTP, queues, or databases. Transport adapters (REST, Postgres queue, …) run in parallel alongside the core and all converge on one job store.
## Habits
- **Feature branches + PRs.** New work: `git checkout -b feat/<name>` → commit small, logical chunks → `git push forgejo feat/<name>` → create PR via Forgejo API → **wait for tests to pass** → merge → `git push server main` to deploy.
- **Keep documentation up to date in the same commit as the code.** `README.md`, `docs/`, and `AGENTS.md` update alongside the change. Unpushed / undocumented work is work that isn't done.
- **Deploy after merging.** `git push server main` rebuilds the Docker image via `post-receive` and restarts the container. Smoke-test the live service before walking away.
- **Never skip hooks** (`--no-verify`, etc.) without explicit user approval. Prefer creating new commits over amending. Never force-push `main`.
- **Forgejo**: repo at `http://192.168.68.42:3030/goldstein/infoxtractor` (to be created). Use basic auth with `FORGEJO_USR` / `FORGEJO_PSD` from `~/Projects/infrastructure/.env`, or an API token once issued for this repo.
## Tech Stack (MVP)
- **Language**: Python 3.12, asyncio
- **Web/REST**: FastAPI + uvicorn
- **OCR (pluggable)**: Surya OCR first (GPU, shares RTX 3090 with Ollama / Immich ML)
- **LLM**: Ollama at `192.168.68.42:11434`, structured outputs via JSON schema. Initial model candidate: `qwen2.5:32b` / `gpt-oss:20b`, configurable per use case
- **State**: Postgres on the shared `postgis` container (:5431), new `infoxtractor` database
- **Deployment**: Docker, `git push server main` → post-receive rebuild (pattern from other apps)
## Repository / Deploy
- Git remotes:
- `forgejo`: `ssh://git@192.168.68.42:2222/goldstein/infoxtractor.git` (source of truth / PRs)
- `server`: bare repo with `post-receive` rebuild hook (to be set up)
- Workflow: feat branch → `git push forgejo feat/name` → PR via Forgejo API → merge → `git push server main` to deploy
- Monitoring label: `infrastructure.web_url=http://192.168.68.42:<PORT>`
- Backup opt-in: `backup.enable=true` label on the container
## Related Projects
- **mammon** (`../mammon`) — first consumer. Uses ix as a fallback / second opinion for Paperless-imported bank statements where deterministic parsers don't match.
- **infrastructure** (`../infrastructure`) — server topology, deployment pattern, Ollama setup, shared `postgis` Postgres.

17
README.md Normal file
View file

@ -0,0 +1,17 @@
# InfoXtractor (ix)
Async, on-prem, LLM-powered structured information extraction microservice.
Given a document (PDF, image, text) and a named *use case*, ix returns a structured JSON result whose shape matches the use-case schema — together with per-field provenance (OCR segment IDs, bounding boxes, cross-OCR agreement flags) that let the caller decide how much to trust each extracted value.
**Status:** design phase. Implementation about to start.
- Full reference spec: [`docs/spec-core-pipeline.md`](docs/spec-core-pipeline.md) (aspirational; MVP is a strict subset)
- **MVP design:** [`docs/superpowers/specs/2026-04-18-ix-mvp-design.md`](docs/superpowers/specs/2026-04-18-ix-mvp-design.md)
- Agent / development notes: [`AGENTS.md`](AGENTS.md)
## Principles
- **On-prem always.** LLM = Ollama, OCR = local engines (Surya first). No OpenAI / Anthropic / Azure / AWS / cloud.
- **Grounded extraction, not DB truth.** ix returns best-effort fields + provenance; the caller decides what to trust.
- **Transport-agnostic pipeline core.** REST + Postgres-queue adapters in parallel on one job store.

912
docs/spec-core-pipeline.md Normal file
View file

@ -0,0 +1,912 @@
# Core Pipeline Specification — InfoXtractor
## 1. What the system does
InfoXtractor is an **async, LLM-powered document extraction pipeline**. Given one or more
documents (PDFs, images) or plain text and a *use case* that defines what to extract, it returns
a structured JSON result whose shape matches the use-case schema.
Optional capabilities that are composed at runtime:
| Capability | Trigger | What it adds |
|---|---|---|
| OCR | `options.ocr.use_ocr = true` (default) | Converts document pages to text via cloud OCR before passing to the LLM |
| Vision | `options.gen_ai.use_vision = true` | Also passes page images to the LLM as base64 |
| Provenance | `options.provenance.include_provenance = true` | Each extracted field is annotated with the page number, bounding box, and text snippet it came from |
| OCR-only | `options.ocr.ocr_only = true` | Returns OCR result without calling the LLM |
---
## 2. Top-level data contracts
### 2.1 RequestIX — pipeline input
```python
class RequestIX:
use_case: str # Name, URL, or Base64-encoded use-case definition
ix_client_id: str # Client identifier (used for auth + metrics)
request_id: str # Correlation key set by the caller; echoed in response
ix_id: Optional[str] # Internal tracking ID; set by the transport layer, not the caller
context: Context # What to process
options: Options # How to process it
version: Optional[int] # Schema version (default 2)
class Context:
files: List[str] # URLs to PDFs or images (S3, HTTP)
texts: List[str] # Raw text strings (treated as pages)
class Options:
ocr: OCROptions
gen_ai: GenAIOptions
provenance: ProvenanceOptions
```
**OCROptions**
| Field | Type | Default | Meaning |
|---|---|---|---|
| `use_ocr` | bool | `True` | Run cloud OCR on files |
| `ocr_only` | bool | `False` | Stop after OCR; skip LLM |
| `service` | enum | `document_intelligence` | Which OCR backend (`document_intelligence` / `computer_vision`) |
| `include_geometries` | bool | `False` | Include raw bounding boxes in `ocr_result` |
| `include_ocr_text` | bool | `False` | Attach flat OCR text to `ocr_result.result.text` |
| `include_page_tags` | bool | `True` | Inject `<page file="…" number="…">` XML markers into OCR lines |
| `computer_vision_scaling_factor` | int 110 | `1` | Scale multiplier for CV-based OCR |
**GenAIOptions**
| Field | Type | Default | Meaning |
|---|---|---|---|
| `gen_ai_model_name` | str | `gpt-4o` (from config) | LLM deployment name |
| `use_vision` | bool | `False` | Include page images in the LLM message |
| `vision_scaling_factor` | int 110 | `1` | Scale factor for rendered images |
| `vision_detail` | enum | `auto` | `auto` / `low` / `high` (OpenAI vision detail) |
| `reasoning_effort` | Optional[str] | `None` | Passed to reasoning models (e.g. `medium`, `high`) |
**ProvenanceOptions**
| Field | Type | Default | Meaning |
|---|---|---|---|
| `include_provenance` | bool | `False` | Enable provenance tracking |
| `granularity` | enum | `line` | `line` (only supported today) or `word` |
| `include_bounding_boxes` | bool | `True` | Attach normalised coordinates to each source |
| `max_sources_per_field` | int ≥1 | `10` | Cap on sources returned per field |
| `source_type` | enum | `value_and_context` | `value`: value segments only; `value_and_context`: value + label segments |
| `min_confidence` | float 01 | `0.0` | Minimum per-field confidence (currently a no-op; hook for future) |
---
### 2.2 ResponseIX — pipeline output
```python
class ResponseIX:
use_case: Optional[str] # Echoed from RequestIX.use_case
use_case_name: Optional[str] # Clean name from use-case definition
ix_client_id: Optional[str]
request_id: Optional[str]
ix_id: Optional[str]
error: Optional[str] # Non-None means processing failed; pipeline stopped
warning: List[str] # Non-fatal issues encountered during processing
ix_result: IXResult # LLM extraction result
ocr_result: OCRResult # OCR output (may be empty if OCR skipped)
provenance: Optional[ProvenanceData] # Field-level source tracking
metadata: Metadata # Timings, hostname
class IXResult:
result: dict # Matches the use-case response schema
result_confidence: dict # Reserved; currently empty
meta_data: dict # token_usage {prompt_tokens, completion_tokens, total_tokens}, model_name
class OCRResult:
result: OCRDetails # pages with lines + bounding boxes
meta_data: dict
class OCRDetails:
text: Optional[str] # Flat text (populated only if include_ocr_text=True)
pages: List[Page] # Structured OCR pages
class Page:
page_no: int
width: float # Page width in points
height: float # Page height in points
angle: float # Skew angle
unit: Optional[str]
lines: List[Line]
class Line:
text: Optional[str]
bounding_box: List[float] # 8-coordinate polygon [x1,y1, x2,y2, x3,y3, x4,y4] (raw pixels)
words: Words
class Metadata:
timings: List[Dict] # Step-level timing records
processed_by: Optional[str] # Hostname
use_case_truncated: Optional[bool]
```
**`ResponseIX.context`** is an internal-only mutable object that accumulates data as it passes
through pipeline steps. It is **excluded from serialisation** (`exclude=True`) and must be
stripped before the response is sent to any external consumer.
---
## 3. The Step interface
Every pipeline stage implements a two-method async interface:
```python
class Step(ABC):
@abstractmethod
async def validate(self, request_ix: RequestIX, response_ix: ResponseIX) -> bool:
"""
Returns True → run process()
Returns False → silently skip this step
Raises → set ResponseIX.error and abort the pipeline
"""
@abstractmethod
async def process(self, request_ix: RequestIX, response_ix: ResponseIX) -> ResponseIX:
"""Execute the step; mutate response_ix; return it."""
```
**Key invariants:**
- Both `validate` and `process` receive the **same** `request_ix` and `response_ix` objects.
- `process` receives the `response_ix` as enriched by all previous steps (via `context`).
- If `validate` raises, the error string is written to `response_ix.error` and no further steps run.
- If `process` raises, same: error is captured and pipeline stops.
- A step that returns `False` from `validate` leaves `response_ix` unchanged and the pipeline continues.
---
## 4. Pipeline orchestration (`Pipeline`)
```python
class Pipeline:
steps = [SetupStep(), OCRStep(), GenAIStep(), ResponseHandlerStep()]
debug_mode: bool # Enables memory-usage logging and GC inspection
async def start(self, request_ix: RequestIX) -> ResponseIX:
# Wraps _execute_pipeline in an observability span
...
async def _execute_pipeline(self, request_ix, obs) -> ResponseIX:
response_ix = ResponseIX()
for step in self.steps:
# wrapped in a step-level observability span + Timer
success, response_ix = await self._execute_step(step, request_ix, response_ix)
if not success:
return response_ix # abort on first error
return response_ix
async def _execute_step(self, step, request_ix, response_ix) -> (bool, ResponseIX):
valid = await step.validate(request_ix, response_ix)
if valid:
response_ix = await step.process(request_ix, response_ix)
return True, response_ix
# exceptions caught → response_ix.error set → return False
```
The pipeline itself holds **no I/O references** — it does not know about Kafka, HTTP, or
databases. The transport layer instantiates `Pipeline`, calls `await pipeline.start(request_ix)`,
and handles routing the returned `ResponseIX`.
---
## 5. Step 1 — SetupStep
**Purpose:** Validate the request, fetch external files, load the use-case definition, and
convert all inputs into an internal `pages` representation for downstream steps.
### 5.1 validate()
Raises if:
- `request_ix` is `None`
- Both `context.files` and `context.texts` are empty
### 5.2 process()
**a) Copy text context**
```python
response_ix.context.texts = list(request_ix.context.texts)
```
**b) Download files** (parallel, async)
Each URL in `request_ix.context.files` is downloaded to a local temp path (`/tmp/ix/`).
The result is a list of `File(file_path, mime_type)` objects stored in
`response_ix.context.files`.
Supported MIME types depend on the chosen OCR service; unsupported files raise immediately.
**c) Load use case**
The use case is fetched from the Config Server (see §9) by name, URL, or Base64.
On success, the two classes `UseCaseRequest` and `UseCaseResponse` are stored in
`response_ix.context.use_case_request` and `response_ix.context.use_case_response`.
`use_case_name` is extracted from `UseCaseRequest().use_case_name` and stored in
`response_ix.use_case_name` for downstream steps and the final response.
**d) Build pages** (parallel per document)
The ingestor registers every file and classifies its MIME type. Rendering is done per document:
- **PDF** (via PyMuPDF): one `Page` object per PDF page, holding `file_path`, `item_index` (0-based index into `request_ix.context.files`), `page_no`, `width`, `height`.
- **Image** (via PIL): one `Page` per frame. Multi-frame TIFFs produce multiple pages.
- **Text**: one `Page` per entry in `context.texts`.
Maximum pages per PDF: 100 (hard cap; raises on violation).
All pages are concatenated into a flat list stored in `response_ix.context.pages`.
A `DocumentIngestor` instance is stored in `response_ix.context.ingestion` so OCRStep can make format-specific decisions (e.g. skip Word-embedded images with a warning).
---
## 6. Step 2 — OCRStep
**Purpose:** Convert document pages to structured text with bounding boxes. Optionally build
a `SegmentIndex` for provenance tracking.
### 6.1 validate()
Returns `True` when **any** of these flags are set AND `response_ix.context.files` is non-empty:
- `use_ocr = True`
- `include_geometries = True`
- `include_ocr_text = True`
- `ocr_only = True`
If none of the above apply (text-only request), returns `False` → step is skipped.
Raises `IX_000_004` if `ocr_only`, `include_geometries`, or `include_ocr_text` is requested but
no files were provided.
### 6.2 process()
**a) Select OCR service**
```
service == "document_intelligence" → AzureDocumentIntelligence
service == "computer_vision" → AzureComputerVision
```
**b) `perform_ocr(request_ix, response_ix)`** (async, runs against all pages)
Populates `response_ix.ocr_result` with an `OCRResult` containing pages with lines and
raw bounding boxes (8-coordinate polygons, pixel coordinates).
**c) Inject page tags** (when `include_page_tags = True`)
For each OCR page a synthetic `Line` is prepended and appended:
```
<page file="{item_index}" number="{page_no}">
… original lines …
</page>
```
These tags are visible to the LLM as structural markers. The `SegmentIndex` builder
explicitly skips them.
**d) Build SegmentIndex** (when `include_provenance = True`)
```python
response_ix.context.segment_index = await SegmentIndex.build(
response_ix.ocr_result,
granularity, # line (default) or word
pages_metadata=response_ix.context.pages,
)
```
See §8 for the full SegmentIndex specification.
---
## 7. Step 3 — GenAIStep
**Purpose:** Call the LLM with the prepared context and parse its structured output into
`response_ix.ix_result`. Optionally map the LLM's segment citations to bounding boxes.
### 7.1 validate()
- Returns `False` when `ocr_only = True`.
- Raises when:
- `use_case` is empty
- `use_ocr = True` but no OCR text was produced
- neither `use_ocr` nor `use_vision` nor plain text context is available
- model is not found in Config Server
- `use_vision = True` but the model does not support vision
If `include_provenance = True` but `use_ocr = False`, a warning is added (provenance will be
empty) but processing continues.
### 7.2 process()
**a) Prepare system prompt**
```python
prompt = UseCaseRequest().system_prompt # falls back to prompt_template_base
```
If `include_provenance = True`, append the segment-citation instruction (see §8.2).
**b) Build text context**
When provenance is on and a `SegmentIndex` exists:
```
[p1_l0] Line text
[p1_l1] Next line
```
Otherwise: join OCR page lines with `\n\n` between pages, then append any raw texts.
**c) Build visual context** (when `use_vision = True`)
For each page:
1. Render to PIL image at `vision_scaling_factor × original_size` (capped by `render_max_pixels_per_page`).
2. Convert TIFF frames to JPEG.
3. Base64-encode.
4. Wrap as a provider-neutral image part:
```json
{"type": "image", "media_type": "image/jpeg", "data": "<base64>", "detail": "auto"}
```
**d) Assemble LLM call parameters**
```python
request_kwargs = {
"model": model_name,
"temperature": from_model_settings,
"messages": [
{"role": "system", "content": prompt_string},
{"role": "user", "content": [text_part, *image_parts]},
],
# "reasoning_effort": "medium" ← only for reasoning-capable models
}
```
**e) Determine response schema**
- When `include_provenance = False`: schema is `UseCaseResponse` directly.
- When `include_provenance = True`: a dynamic wrapper is created at runtime:
```python
ProvenanceWrappedResponse = create_model(
"ProvenanceWrappedResponse",
result=(UseCaseResponse, Field(...)),
segment_citations=(List[SegmentCitation], Field(default_factory=list)),
)
```
The use-case file is **never modified**; the wrapper is ephemeral.
**f) Invoke LLM**
```python
result = await gen_ai_service.invoke(request_kwargs, response_schema, model_settings)
# result.parsed → the Pydantic model instance
# result.usage → GenAIUsage(prompt_tokens, completion_tokens)
# result.model_name → string
```
**g) Write results**
- `response_ix.ix_result.result = parsed.result.model_dump()` (provenance mode) or `parsed.model_dump()` (normal mode)
- `response_ix.ix_result.meta_data = {token_usage, model_name}`
**h) Build provenance** (when `include_provenance = True`)
```python
response_ix.provenance = ProvenanceUtils.map_segment_refs_to_provenance(
extraction_result={"result": ix_result},
segment_citations=parsed.segment_citations,
segment_index=response_ix.context.segment_index,
max_sources_per_field=opts.max_sources_per_field,
min_confidence=opts.min_confidence,
include_bounding_boxes=opts.include_bounding_boxes,
source_type=opts.source_type,
)
```
See §8.4 for the provenance resolution algorithm.
### 7.3 Provider routing
```python
provider = model_settings["provider"] # "azure" or "bedrock"
if provider == "bedrock": service = AWSGenAI()
else: service = AzureOpenAI()
```
The `GenAIClient` interface is:
```python
class GenAIClient(ABC):
def setup_gen_ai_client(self, request_ix, model_settings) -> None: ...
async def invoke(self, request_kwargs, response_schema, model_settings) -> GenAIInvocationResult: ...
```
`AzureOpenAI` translates provider-neutral image parts to OpenAI's `image_url` content format.
It caches one `AsyncOpenAI` instance per `(base_url, api_key)` pair.
---
## 8. Step 4 — ResponseHandlerStep
**Purpose:** Shape the final payload before handing it to the transport layer.
### 8.1 validate()
Always returns `True`.
### 8.2 process()
1. **Attach OCR text** (`include_ocr_text = True`): concatenate all line texts per page,
join pages with `\n\n`, store in `ocr_result.result.text`.
2. **Strip geometries** (`include_geometries = False`, default): clear `ocr_result.result.pages`
and `ocr_result.meta_data`. The structured page data is large; callers who don't need it
should not receive it.
3. **Trim context**: delete `response_ix.context` entirely. The mutable internal context is
never serialised.
---
## 9. Provenance subsystem
### 9.1 SegmentIndex
Built from `ocr_result` after OCR completes. It assigns each non-tag OCR line a unique ID:
```
p{global_page_position}_l{line_index_within_page}
```
- `global_page_position` is 1-based and refers to the **flat pages list position**, not the
page number printed in the document. This guarantees uniqueness across multi-document requests.
- Page tag lines (`<page >` / `</page>`) are explicitly excluded.
Internal storage:
```python
_id_to_position: Dict[str, Dict] = {
"p1_l0": {
"page": 1, # global 1-based position
"bbox": BoundingBox(...), # normalised 0-1 coordinates
"text": "Invoice Number: INV-001",
"file_index": 0, # 0-based index into request.context.files
},
...
}
_ordered_ids: List[str] # insertion order, for deterministic prompt text
```
**Bounding box normalisation:**
```python
# 8-coordinate polygon normalised to 0-1 (divide x-coords by page.width, y-coords by page.height)
[x1/W, y1/H, x2/W, y2/H, x3/W, y3/H, x4/W, y4/H]
```
Multiply back by rendered page dimensions to obtain pixel coordinates for overlay rendering.
**Prompt format** (`SegmentIndex.to_prompt_text()`):
```
[p1_l0] Invoice Number: INV-001
[p1_l1] Date: 2024-01-15
[p1_l2] Total: CHF 1234.00
```
Extra plain texts (from `context.texts`) are appended untagged.
**O(1) lookup:**
```python
position = segment_index.lookup_segment("p1_l0") # None if unknown
```
### 9.2 SegmentCitation (LLM output model)
The LLM is instructed to populate `segment_citations` in its structured output:
```python
class SegmentCitation(BaseModel):
field_path: str # e.g. "result.invoice_number"
value_segment_ids: List[str] # segments containing the extracted value
context_segment_ids: List[str] # surrounding label/anchor segments (may be empty)
```
**System prompt addition (injected by GenAIStep when provenance is on):**
```
For each extracted field, you must also populate the `segment_citations` list.
Each entry maps one field to the document segments that were its source.
Set `field_path` to the dot-separated JSON path of the field (e.g. 'result.invoice_number').
Use two separate segment ID lists:
- `value_segment_ids`: segment IDs whose text directly contains the extracted value
(e.g. ['p1_l4'] for the line containing 'INV-001').
- `context_segment_ids`: segment IDs for surrounding label or anchor text that helped you
identify the field but does not contain the value itself
(e.g. ['p1_l3'] for a label like 'Invoice Number:'). Leave empty if there is no distinct label.
Only use segment IDs that appear in the document text.
Omit fields for which you cannot identify a source segment.
```
### 9.3 ProvenanceData (response model)
```python
class ProvenanceData(BaseModel):
fields: Dict[str, FieldProvenance] # field_path → provenance
quality_metrics: Dict[str, Any] # see below
segment_count: Optional[int]
granularity: Optional[str] # "line" or "word"
class FieldProvenance(BaseModel):
field_name: str # last segment of field_path
field_path: str # e.g. "result.invoice_number"
value: Any # actual extracted value (resolved from ix_result.result)
sources: List[ExtractionSource]
confidence: Optional[float] # None today; reserved for future
class ExtractionSource(BaseModel):
page_number: int # global 1-based position
file_index: Optional[int] # 0-based index into request.context.files
bounding_box: Optional[BoundingBox] # None when include_bounding_boxes=False
text_snippet: str # actual OCR text at this location
relevance_score: float # always 1.0 (exact segment ID match)
segment_id: Optional[str] # e.g. "p1_l3"; for internal use, not for rendering
class BoundingBox(BaseModel):
coordinates: List[float] # [x1,y1, x2,y2, x3,y3, x4,y4], all 0-1 normalised
```
**quality_metrics:**
```json
{
"fields_with_provenance": 8,
"total_fields": 10,
"coverage_rate": 0.8,
"invalid_references": 2
}
```
- `coverage_rate` = fields with at least one source ÷ total leaf fields in result
- `invalid_references` = segment IDs cited by LLM that were not found in SegmentIndex
### 9.4 Provenance resolution algorithm
```python
for citation in segment_citations:
if source_type == VALUE:
seg_ids = citation.value_segment_ids
else:
seg_ids = citation.value_segment_ids + citation.context_segment_ids
sources = []
for seg_id in seg_ids[:max_sources_per_field]:
pos = segment_index.lookup_segment(seg_id)
if pos is None:
invalid_ref_count += 1
continue
sources.append(ExtractionSource(
page_number=pos["page"],
file_index=pos["file_index"],
bounding_box=pos["bbox"] if include_bounding_boxes else None,
text_snippet=pos["text"],
relevance_score=1.0,
segment_id=seg_id,
))
if not sources:
continue # skip fields that resolve to nothing
# Resolve value: traverse result dict via dot-notation field_path
# e.g. "result.insured_vehicle.vin" → result["insured_vehicle"]["vin"]
value = resolve_nested_path(extraction_result, citation.field_path)
provenance[citation.field_path] = FieldProvenance(
field_name=citation.field_path.split(".")[-1],
field_path=citation.field_path,
value=value,
sources=sources,
confidence=None,
)
```
Array notation is normalised before traversal: `"items[0].name"``"items.0.name"`.
---
## 10. Use-case definition contract
A use case is a Python module (or dynamically loaded class) that defines two Pydantic models.
The Config Server serves them by name; the pipeline loads them via `config_server.use_cases.load_and_validate_use_case(use_case)`.
### UseCaseRequest
```python
class UseCaseRequest(BaseModel):
use_case_name: str # Human-readable name, stored in ResponseIX.use_case_name
system_prompt: str # System prompt sent to the LLM
# Legacy: prompt_template_base (deprecated alias for system_prompt)
# Legacy: kwargs_use_case (deprecated; triggers format(**kwargs))
```
### UseCaseResponse
A Pydantic model whose fields define the extraction schema. The LLM is instructed to return
JSON matching this schema (via `chat.completions.parse(response_format=UseCaseResponse)`).
**Example:**
```python
class InvoiceResponse(BaseModel):
invoice_number: str
invoice_date: str
total_amount: float
line_items: List[LineItem]
class LineItem(BaseModel):
description: str
quantity: int
unit_price: float
```
The schema is passed as the `response_format` argument to the OpenAI structured-output API.
When provenance is on, it is wrapped in `ProvenanceWrappedResponse` at runtime (§7.2e).
---
## 11. LLM invocation contract
### 11.1 Provider-neutral request_kwargs
```python
request_kwargs = {
"model": str, # deployment name
"temperature": Optional[float], # from model_settings; None for reasoning models
"messages": [
{"role": "system", "content": str},
{"role": "user", "content": List[ContentPart]},
],
"reasoning_effort": Optional[str], # only present when supported by model
}
ContentPart = (
{"type": "text", "text": str}
| {"type": "image", "media_type": "image/jpeg", "data": str, "detail": str}
)
```
### 11.2 GenAIInvocationResult
```python
@dataclass
class GenAIInvocationResult:
parsed: Any # Parsed Pydantic model instance (matches response_schema)
usage: GenAIUsage # prompt_tokens, completion_tokens
model_name: str
```
### 11.3 Model settings (from Config Server)
```json
{
"provider": "azure",
"temperature": 0.0,
"vision_capability": true,
"supports_reasoning_efforts": ["low", "medium", "high"],
"default_reasoning_effort": "medium"
}
```
- `temperature` is `None` for reasoning models (OpenAI ignores it and it must be omitted).
- `vision_capability` must be `true` for `use_vision = True` requests to be accepted.
- `supports_reasoning_efforts` is the allowlist; requests for unsupported values fall back to
`default_reasoning_effort` with a warning.
---
## 12. Error model
### 12.1 IXException
All domain errors are wrapped in `IXException(message_or_Error_enum)`. The pipeline catches
any exception from `validate()` or `process()`, writes `str(e)` to `response_ix.error`, and
stops.
### 12.2 Named error codes
| Code | Trigger |
|---|---|
| `IX_000_000` | `request_ix` is `None` |
| `IX_000_002` | No context (neither files nor texts) |
| `IX_000_004` | OCR required but no files provided |
| `IX_000_005` | File MIME type not supported by chosen OCR service |
| `IX_001_000` | `use_case` is empty or missing |
| `IX_001_001` | Use case could not be loaded or validated |
| `IX_003_003` | `use_vision=True` but model has no vision capability |
| `IX_004_001` | Response too large for transport (e.g. Kafka max message size) |
### 12.3 Warnings
Non-fatal issues are appended to `response_ix.warning` and logged. Examples:
- Vision requested but no renderable documents found
- Image scale capped due to pixel limit
- Provenance requested with `use_ocr=False`
- Word document contains embedded images that will be skipped
- `use_case_name` not set in use-case definition
---
## 13. Configuration (AppConfig)
All settings are loaded from environment variables (with `.env` fallback via `pydantic-settings`).
| Category | Key env vars |
|---|---|
| Azure OCR | `DOCUMENT_INTELLIGENCE_ENDPOINT`, `DOCUMENT_INTELLIGENCE_KEY`, `COMPUTER_VISION_ENDPOINT`, `COMPUTER_VISION_SUBSCRIPTION_KEY` |
| Azure OpenAI | `OPENAI_BASE_URL`, `OPENAI_API_KEY` |
| LLM defaults | `DEFAULT_LLM` (default: `gpt-4o`) |
| Config Server | `CONFIG_SERVER_BASE_URL` (default: `http://ix-config:8998/`) |
| Pipeline | `PIPELINE_WORKER_CONCURRENCY` (110), `PIPELINE_REQUEST_TIMEOUT_SECONDS` (default: 2700) |
| Rendering | `RENDER_MAX_PIXELS_PER_PAGE` (default: 75,000,000) |
| AWS | `S3_BUCKET`, `S3_OBJECT_PREFIX`, `AWS_REGION`, `TARGET_ROLE` |
| Kafka | `PROCESSING_TOPIC`, `KAFKA_BOOTSTRAP_SERVER`, … |
| Observability | `LOG_LEVEL`, `OTEL_EXPORTER_OTLP_ENDPOINT`, `PYROSCOPE_SERVER_ADDRESS` |
---
## 14. Observability
### 14.1 Logging
Every log call carries `ix_id` as contextual metadata. The custom logger exposes:
```python
logger.info_ix(msg, request_ix)
logger.warning_ix(msg, request_ix)
logger.error_ix(msg, request_ix)
```
If `ix_id` is not yet set on `request_ix`, the logger generates one lazily on first use.
Step timing is recorded via a `Timer` context manager; results are stored in
`response_ix.metadata.timings`.
### 14.2 Tracing (OpenTelemetry)
- **Pipeline span** wraps the entire `_execute_pipeline()` call (attributes: `request_id`, `use_case`, `client_id`).
- **Step span** wraps each individual step.
- Errors are recorded on the span with `mark_failed(error_message)`.
- Exported via gRPC to `OTEL_EXPORTER_OTLP_ENDPOINT`.
### 14.3 Metrics (Prometheus)
| Metric | Labels |
|---|---|
| `pipeline_requests_total` | `client_id` |
| LLM token counter | `model`, `client_id`, `use_case`, `status` |
| LLM latency histogram | `model`, `client_id` |
| Extraction quality | `client_id`, `use_case`, `model`, `reasoning_effort` |
Exposed on the health port (default 9009).
---
## 15. Transport adapter contract
The pipeline is transport-agnostic. Any delivery mechanism must:
1. **Deserialise** the incoming payload into a `RequestIX` instance.
2. **Assign `ix_id`** (a random 16-character hex string) if not already present.
3. **Call** `await Pipeline().start(request_ix)``response_ix`.
4. **Serialise** `response_ix` (use `.model_dump(by_alias=True)` to honour `serialization_alias` on `metadata`).
5. **Deliver** the serialised response to the caller via the appropriate channel.
Optionally:
- **Persist** `request_ix` and `response_ix` to a durable store (S3, database) keyed by `ix_id`.
- **Apply a timeout**: wrap the `await pipeline.start(...)` call and write a timeout error to `response_ix.error` on expiry.
### Kafka adapter (existing)
- Consumes from `ch-infoxtractor.processing.v2`; produces to `ch-infoxtractor.outbound.v2` and client-specific topics.
- Concurrency controlled via asyncio semaphore (`PIPELINE_WORKER_CONCURRENCY`).
- Per-partition offset commit tracking (gap-safe: only commits contiguous completed offsets).
- Graceful shutdown: drains in-flight messages before stopping.
### FastAPI adapter (example)
```python
@router.post("/extract")
async def extract(body: RequestIX) -> ResponseIX:
body.ix_id = body.ix_id or secrets.token_hex(8)
response = await pipeline.start(body)
return response
```
### Database adapter (example)
```python
async def process_job(job_id: str, db: AsyncSession):
row = await db.get(Job, job_id)
request_ix = RequestIX.model_validate_json(row.payload)
request_ix.ix_id = row.ix_id
response_ix = await pipeline.start(request_ix)
row.result = response_ix.model_dump_json(by_alias=True)
row.status = "done" if not response_ix.error else "error"
await db.commit()
```
---
## 16. End-to-end data flow summary
```
[Transport layer]
↓ RequestIX (use_case, context.files/texts, options, ix_id)
[Pipeline.start()]
[SetupStep]
• Download files → local paths → response_ix.context.files (File objects)
• Load use case from Config Server → response_ix.context.use_case_request/response
• Split PDFs / load images / wrap texts → response_ix.context.pages (flat list)
[OCRStep] (skipped if no files or use_ocr=False)
• Call Azure OCR per page → response_ix.ocr_result (pages + lines + bbox)
• Inject <page> XML markers → lines prepended/appended in ocr_result.pages
• Build SegmentIndex (if provenance) → response_ix.context.segment_index
[GenAIStep] (skipped if ocr_only=True)
• Build system prompt (+ citation hint if provenance)
• Build user content: segment-tagged text OR plain OCR text + optional base64 images
• Determine response schema (wrapped if provenance)
• Call LLM via GenAIClient.invoke()
• Write ix_result.result + meta_data
• Resolve segment citations → ProvenanceData (if provenance)
[ResponseHandlerStep] (always runs)
• Attach flat OCR text (if include_ocr_text)
• Strip bounding boxes (if not include_geometries)
• Delete response_ix.context (never serialised)
[ResponseIX]
ix_result.result ← structured extraction matching UseCaseResponse schema
ocr_result ← OCR details (empty pages unless include_geometries)
provenance ← field-level source map (None unless include_provenance)
error / warning ← diagnostics
metadata ← timings, hostname
[Transport layer]
Serialise → deliver to caller
Persist (optional) → keyed by ix_id
```
---
## 17. Key design decisions
| Decision | Rationale |
|---|---|
| **Two-method Step interface** (`validate` + `process`) | Keeps routing logic (should this step run?) separate from execution; steps can be reordered or replaced without touching the pipeline loop |
| **Mutable `context` on ResponseIX** | Avoids passing a growing set of intermediate values as extra arguments; the entire state is in one object; `context` is excluded from serialisation so it never leaks |
| **Single LLM call for provenance** | The schema is wrapped at runtime to add `segment_citations`; no second round-trip to the LLM is needed |
| **Segment IDs instead of fuzzy text matching** | IDs are injected into the prompt; the LLM cites them; lookup is O(1); no approximate matching, no hallucinated coordinates |
| **Global page position for segment IDs** | Ensures uniqueness across multi-document requests (two documents that both have "page 1" produce `p1_l*` and `p2_l*`, not the same IDs) |
| **Provider-neutral `request_kwargs`** | The `GenAIClient` interface accepts a dict; provider-specific translation (e.g. OpenAI `image_url` format) happens inside each adapter, not in the step |
| **Normalised bounding boxes (01)** | Resolution-independent; frontend multiplies by rendered page dimensions to get pixel coordinates |
| **Transport-agnostic pipeline** | Kafka, FastAPI, and DB adapters all call the same `Pipeline.start()` |
| **Config Server for use cases** | Decouples use-case authoring from deployment; allows use cases to be updated without redeploying the pipeline |

View file

@ -0,0 +1,388 @@
# InfoXtractor (ix) MVP — Design
Date: 2026-04-18
Reference: `docs/spec-core-pipeline.md` (full, aspirational spec — MVP is a strict subset)
Status: Design approved (sections 18 walked through and accepted 2026-04-18)
## 0. One-paragraph summary
ix is an on-prem, async, LLM-powered microservice that extracts structured JSON from documents (PDFs, images, text) given a named *use case* (a Pydantic schema + system prompt). It returns the extracted fields together with per-field provenance (OCR segment IDs, bounding boxes, extracted-value agreement flags) that let calling services decide how much to trust each value. The MVP ships one use case (`bank_statement_header`), one OCR engine (Surya, pluggable), one LLM backend (Ollama, pluggable), and two transports in parallel (REST with optional webhook callback + a Postgres queue). Cloud services are explicitly forbidden. The first consumer is mammon, which uses ix as a fallback for `needs_parser` documents.
## 1. Guiding principles
- **On-prem always.** No OpenAI, Anthropic, Azure (DI/CV/OpenAI), AWS (Bedrock/Textract), Google Document AI, Mistral, etc. LLM = Ollama (:11434). OCR = local engines only. Secrets never leave the home server. The spec's cloud references are examples to replace, not inherit.
- **Grounded extraction, not DB truth.** ix returns best-effort fields with segment citations, provenance verification, and cross-OCR agreement flags. ix does *not* claim DB-grade truth. The reliability decision (trust / stage for review / reject) belongs to the caller.
- **Transport-agnostic pipeline core.** The pipeline (`RequestIX` → `ResponseIX`) knows nothing about HTTP, queues, or databases. Adapters (REST, Postgres queue) run alongside the core; both converge on one shared job store.
- **YAGNI for all spec features the MVP doesn't need.** Kafka, Config Server, Azure/AWS clients, vision, word-level provenance, reasoning-effort routing, Prometheus/OTEL exporters: deferred. Architecture leaves the interfaces so they can be added without touching the pipeline core.
## 2. Architecture
```
┌──────────────────────────────────────────────────────────────────┐
│ infoxtractor container (Docker on 192.168.68.42, port 8994) │
│ │
│ ┌──────────────────┐ ┌──────────────────────────┐ │
│ │ rest_adapter │ │ pg_queue_adapter │ │
│ │ (FastAPI) │ │ (asyncio worker) │ │
│ │ POST /jobs │ │ LISTEN ix_jobs_new + │ │
│ │ GET /jobs/{id} │ │ SELECT ... FOR UPDATE │ │
│ │ + callback_url │ │ SKIP LOCKED │ │
│ └────────┬─────────┘ └────────┬─────────────────┘ │
│ │ │ │
│ └──────────┬──────────────┘ │
│ ▼ │
│ ┌────────────────┐ │
│ │ ix_jobs table │ ── postgis :5431, DB=infoxtractor│
│ └────────┬───────┘ │
│ ▼ │
│ ┌─────────────────────────────┐ │
│ │ Pipeline core (spec §3§4) │ │
│ │ │ │
│ │ SetupStep → OCRStep → │ │
│ │ GenAIStep → ReliabilityStep│ │
│ │ → ResponseHandler │ │
│ │ │ │
│ │ Uses: OCRClient (Surya), │ │
│ │ GenAIClient (Ollama),│ │
│ │ UseCaseRegistry │ │
│ └─────────────────────────────┘ │
│ │
└──────────────────────────────────────────────────────────────────┘
│ ▲
▼ │
host.docker.internal:11434 mammon or any on-prem caller —
Ollama (gpt-oss:20b default) polls GET /jobs/{id} until done
```
**Key shapes:**
- Spec's four steps + a new fifth: `ReliabilityStep` runs between `GenAIStep` and `ResponseHandlerStep`, computes per-field `provenance_verified` and `text_agreement` flags. Isolated so callers and tests can reason about reliability signals independently.
- Single worker at MVP (`PIPELINE_WORKER_CONCURRENCY=1`). Ollama + Surya both want the GPU serially.
- Two transports, one job store. REST is the primary; pg queue is scaffolded, uses the same table, same lifecycle.
- Use case registry in-repo (`ix/use_cases/__init__.py`); adding a new use case = new module + one registry line.
## 3. Data contracts
Subset of spec §2 / §9.3. Dropped fields are no-ops under the MVP's feature set.
### RequestIX
```python
class RequestIX(BaseModel):
use_case: str # registered name, e.g. "bank_statement_header"
ix_client_id: str # caller tag for logs/metrics, e.g. "mammon"
request_id: str # caller's correlation id; echoed back
ix_id: Optional[str] # transport-assigned short hex id
context: Context
options: Options = Options()
callback_url: Optional[str] # optional webhook delivery (one-shot, no retry)
class Context(BaseModel):
files: list[str] = [] # URLs or file:// paths
texts: list[str] = [] # extra text (e.g. Paperless OCR output)
class Options(BaseModel):
ocr: OCROptions = OCROptions()
gen_ai: GenAIOptions = GenAIOptions()
provenance: ProvenanceOptions = ProvenanceOptions()
class OCROptions(BaseModel):
use_ocr: bool = True
ocr_only: bool = False
include_ocr_text: bool = False
include_geometries: bool = False
service: Literal["surya"] = "surya" # kept so the adapter point is visible
class GenAIOptions(BaseModel):
gen_ai_model_name: Optional[str] = None # None → use-case default → IX_DEFAULT_MODEL
class ProvenanceOptions(BaseModel):
include_provenance: bool = True # default ON (reliability is the point)
max_sources_per_field: int = 10
```
**Dropped from spec (no-ops under MVP):** `OCROptions.computer_vision_scaling_factor`, `include_page_tags` (always on), `GenAIOptions.use_vision`/`vision_scaling_factor`/`vision_detail`/`reasoning_effort`, `ProvenanceOptions.granularity`/`include_bounding_boxes`/`source_type`/`min_confidence`, `RequestIX.version`.
### ResponseIX
Identical to spec §2.2 except `FieldProvenance` gains two fields:
```python
class FieldProvenance(BaseModel):
field_name: str
field_path: str
value: Any
sources: list[ExtractionSource]
confidence: Optional[float] = None # reserved; always None in MVP
provenance_verified: bool # NEW: cited segment actually contains value (normalized)
text_agreement: Optional[bool] # NEW: value appears in RequestIX.context.texts; None if no texts
```
`quality_metrics` gains two counters: `verified_fields`, `text_agreement_fields`.
### Job envelope (in `ix_jobs` table; returned by REST)
```python
class Job(BaseModel):
job_id: UUID
ix_id: str
client_id: str
request_id: str
status: Literal["pending", "running", "done", "error"]
request: RequestIX
response: Optional[ResponseIX]
callback_url: Optional[str]
callback_status: Optional[Literal["pending", "delivered", "failed"]]
attempts: int = 0
created_at: datetime
started_at: Optional[datetime]
finished_at: Optional[datetime]
```
## 4. Job store
```sql
CREATE DATABASE infoxtractor; -- on the existing postgis container
CREATE TABLE ix_jobs (
job_id UUID PRIMARY KEY,
ix_id TEXT NOT NULL,
client_id TEXT NOT NULL,
request_id TEXT NOT NULL,
status TEXT NOT NULL,
request JSONB NOT NULL,
response JSONB,
callback_url TEXT,
callback_status TEXT,
attempts INT NOT NULL DEFAULT 0,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
started_at TIMESTAMPTZ,
finished_at TIMESTAMPTZ
);
CREATE INDEX ix_jobs_status_created ON ix_jobs (status, created_at) WHERE status = 'pending';
CREATE INDEX ix_jobs_client_request ON ix_jobs (client_id, request_id);
-- Postgres NOTIFY channel used by the pg_queue_adapter: 'ix_jobs_new'
```
Callers that prefer direct SQL (the `pg_queue_adapter` contract): insert a row with `status='pending'`, then `NOTIFY ix_jobs_new, '<job_id>'`. The worker also falls back to a 10 s poll so a missed notify or ix restart doesn't strand a job.
## 5. REST surface
| Method | Path | Purpose |
|---|---|---|
| `POST` | `/jobs` | Body = `RequestIX` (+ optional `callback_url`). → `201 {job_id, ix_id, status: "pending"}`. Idempotent on `(ix_client_id, request_id)` — same pair returns the existing `job_id` with `200`. |
| `GET` | `/jobs/{job_id}` | → full `Job`. Source of truth regardless of submission path or callback outcome. |
| `GET` | `/jobs?client_id=…&request_id=…` | Lookup-by-correlation (caller idempotency helper). Returns latest match or `404`. |
| `GET` | `/healthz` | `{ollama: ok/fail, postgres: ok/fail, ocr: ok/fail}`. Used by `infrastructure` monitoring dashboard. |
| `GET` | `/metrics` | Counters: `jobs_pending`, `jobs_running`, `jobs_done_24h`, `jobs_error_24h`, per-use-case avg seconds. Plain JSON, no Prometheus format for MVP. |
**Callback delivery** (when `callback_url` is set): one POST of the full `Job` body, 10 s timeout. 2xx → `callback_status='delivered'`. Anything else → `'failed'`. No retry. Callers always have `GET /jobs/{id}` as the authoritative fallback.
## 6. Pipeline steps
Interface per spec §3 (`async validate` + `async process`). Pipeline orchestration per spec §4 (first error aborts; each step wrapped in a `Timer` landing in `Metadata.timings`).
### SetupStep
- **validate**: `request_ix` non-null; `context.files` or `context.texts` non-empty.
- **process**:
- Copy `request_ix.context.texts``response_ix.context.texts`.
- Download each URL in `context.files` to `/tmp/ix/<ix_id>/` in parallel. MIME detection via `python-magic`. Supported: PDF, PNG, JPEG, TIFF. Unsupported → `IX_000_005`.
- Load use case: `request_cls, response_cls = REGISTRY[request_ix.use_case]`. Store instances in `response_ix.context.use_case_request` / `use_case_response`. Echo `use_case_request.use_case_name``response_ix.use_case_name`.
- Build flat `response_ix.context.pages`: one entry per PDF page (via PyMuPDF), one per image frame, one per text entry. Hard cap 100 pages/PDF → `IX_000_006` on violation.
### OCRStep
- **validate**: returns `True` iff `(use_ocr or ocr_only or include_geometries or include_ocr_text) and context.files`. Otherwise `False` → step skipped (text-only requests).
- **process**: `ocr_result = await OCRClient.ocr(context.pages)``response_ix.ocr_result`. Always inject `<page file="{item_index}" number="{page_no}">` tags (simplifies grounding). If `include_provenance`: build `SegmentIndex` (line granularity, normalized bboxes 0-1) and store in `context.segment_index`.
- **OCRClient interface**:
```python
class OCRClient(Protocol):
async def ocr(self, pages: list[Page]) -> OCRResult: ...
```
MVP implementation: `SuryaOCRClient` (GPU via `surya-ocr` PyPI package, CUDA on the RTX 3090).
### GenAIStep
- **validate**: `ocr_only``False` (skip). Use case must exist. OCR text or `context.texts` must be non-empty (else `IX_001_000`).
- **process**:
- System prompt = `use_case_request.system_prompt`. If `include_provenance`: append spec §9.2 citation instruction verbatim.
- User text: segment-tagged (`[p1_l0] …`) when provenance is on; plain concatenated OCR + texts otherwise.
- Response schema: `UseCaseResponse` directly, or the dynamic `ProvenanceWrappedResponse(result=..., segment_citations=...)` per spec §7.2e when provenance is on.
- Model: `request_ix.options.gen_ai.gen_ai_model_name``use_case_request.default_model``IX_DEFAULT_MODEL`.
- Call `GenAIClient.invoke(request_kwargs, response_schema)`; parsed model → `ix_result.result`, usage + model_name → `ix_result.meta_data`.
- If provenance: call `ProvenanceUtils.map_segment_refs_to_provenance(...)` per spec §9.4, write `response_ix.provenance`.
- **GenAIClient interface**:
```python
class GenAIClient(Protocol):
async def invoke(self, request_kwargs: dict, response_schema: type[BaseModel]) -> GenAIInvocationResult: ...
```
MVP implementation: `OllamaClient``POST http://host.docker.internal:11434/api/chat` with `format = <JSON schema from Pydantic>` (Ollama structured outputs).
### ReliabilityStep (new; runs when `include_provenance` is True)
For each `FieldProvenance` in `response_ix.provenance.fields`:
- **`provenance_verified`**: for each cited segment, compare `text_snippet` to the extracted `value` after normalization (see below). If any cited segment agrees → `True`. Else `False`.
- **`text_agreement`**: if `request_ix.context.texts` is empty → `None`. Else run the same comparison against the concatenated texts → `True` / `False`.
**Normalization rules** (cheap, language-neutral, applied to both sides before `in`-check):
- Strings: Unicode NFKC, casefold, collapse whitespace, strip common punctuation.
- Numbers (`int`, `float`, `Decimal` values): digits-and-sign only; strip currency symbols, thousands separators, decimal-separator variants (`.`/`,`); require exact match to 2 decimal places for amounts.
- Dates: parse to ISO `YYYY-MM-DD`; compare as strings. Accept common German / Swiss / US formats.
- IBANs: uppercase, strip spaces.
- Very short values (≤ 2 chars, or numeric |value| < 10): `text_agreement` skipped (returns `None`) too noisy to be a useful signal.
Records are mutations to the provenance structure only; does **not** drop fields. Caller sees every extracted field + the flags.
Writes `quality_metrics.verified_fields` and `quality_metrics.text_agreement_fields` summary counts.
### ResponseHandlerStep
Per spec §8, unchanged. Attach flat OCR text when `include_ocr_text`; strip `ocr_result.pages` unless `include_geometries`; delete `context` before serialization.
## 7. Use case registry
```
ix/use_cases/
__init__.py # REGISTRY: dict[str, tuple[type[UseCaseRequest], type[UseCaseResponse]]]
bank_statement_header.py
```
Adding a use case = new module exporting a `Request(BaseModel)` (`use_case_name`, `default_model`, `system_prompt`) and a `UseCaseResponse(BaseModel)`, then one `REGISTRY["<name>"] = (Request, UseCaseResponse)` line.
### First use case: `bank_statement_header`
```python
class BankStatementHeader(BaseModel):
bank_name: str
account_iban: Optional[str]
account_type: Optional[Literal["checking", "credit", "savings"]]
currency: str
statement_date: Optional[date]
statement_period_start: Optional[date]
statement_period_end: Optional[date]
opening_balance: Optional[Decimal]
closing_balance: Optional[Decimal]
class Request(BaseModel):
use_case_name: str = "Bank Statement Header"
default_model: str = "gpt-oss:20b"
system_prompt: str = (
"You extract header metadata from a single bank or credit-card statement. "
"Return only facts that appear in the document; leave a field null if uncertain. "
"Balances must use the document's numeric format (e.g. '1234.56' or '-123.45'); "
"do not invent a currency symbol. Account type: 'checking' for current/Giro accounts, "
"'credit' for credit-card statements, 'savings' otherwise. Always return the IBAN "
"with spaces removed. Never fabricate a value to fill a required-looking field."
)
```
**Why these fields:** each appears at most once per document (one cite per field → strong `provenance_verified` signal); all reconcile against something mammon already stores (IBAN → `Account.iban`, period → verified-range chain, closing_balance → next month's opening_balance and `StatementBalance`); schema is flat (no nested arrays where Ollama structured output tends to drift).
## 8. Errors and warnings
Error-code subset from spec §12.2 (reusing codes as-is where meaning is identical):
| Code | Trigger |
|---|---|
| `IX_000_000` | `request_ix` is None |
| `IX_000_002` | No context (neither files nor texts) |
| `IX_000_004` | OCR required but no files provided |
| `IX_000_005` | File MIME type not supported |
| `IX_000_006` | PDF page-count cap exceeded |
| `IX_001_000` | `use_case` empty, or extraction context (OCR + texts) empty after setup |
| `IX_001_001` | Use case name not in `REGISTRY` |
Warnings (non-fatal, appended to `response_ix.warning`): empty OCR result, provenance requested with `use_ocr=False`, unknown model falling back to default.
## 9. Configuration (`AppConfig` via `pydantic-settings`)
| Key env var | Default | Meaning |
|---|---|---|
| `IX_POSTGRES_URL` | `postgresql+asyncpg://infoxtractor:…@host.docker.internal:5431/infoxtractor` | Job store |
| `IX_OLLAMA_URL` | `http://host.docker.internal:11434` | LLM backend |
| `IX_DEFAULT_MODEL` | `gpt-oss:20b` | Fallback model |
| `IX_OCR_ENGINE` | `surya` | Adapter selector (only value in MVP) |
| `IX_TMP_DIR` | `/tmp/ix` | Download scratch |
| `IX_PIPELINE_WORKER_CONCURRENCY` | `1` | Worker semaphore cap |
| `IX_PIPELINE_REQUEST_TIMEOUT_SECONDS` | `2700` | Per-job timeout (45 min) |
| `IX_RENDER_MAX_PIXELS_PER_PAGE` | `75000000` | Per-page render cap |
| `IX_LOG_LEVEL` | `INFO` | |
| `IX_CALLBACK_TIMEOUT_SECONDS` | `10` | Webhook POST timeout |
No Azure, OpenAI, or AWS variables — those paths do not exist in the codebase.
## 10. Observability (minimal)
- **Logs**: JSON-structured via `logging` + custom formatter. Every line carries `ix_id`, `client_id`, `request_id`, `use_case`. Steps emit `step_start` / `step_end` events with elapsed ms.
- **Timings**: every step's elapsed-seconds recorded in `response_ix.metadata.timings` (same shape as spec §2).
- **Traces**: OpenTelemetry span scaffolding present, no exporter wired. Drop-in later.
- **Prometheus**: deferred.
## 11. Deployment
- Repo: `goldstein/infoxtractor` on Forgejo, plus `server` bare-repo remote with `post-receive` hook mirroring mammon.
- Port 8994 (LAN-only via UFW; not exposed publicly — internal service).
- Postgres: new `infoxtractor` database on existing postgis container.
- Ollama reached via `host.docker.internal:11434`.
- Monitoring label: `infrastructure.web_url=http://192.168.68.42:8994`.
- Backup: `backup.enable=true`, `backup.type=postgres`, `backup.name=infoxtractor`.
- Dockerfile: CUDA-enabled base (`nvidia/cuda:12.4-runtime-ubuntu22.04` + Python 3.12) so Surya can use the 3090. CMD: `alembic upgrade head && uvicorn ix.app:create_app --factory --host 0.0.0.0 --port 8994`.
## 12. Testing strategy
Strict TDD — each unit is written test-first.
1. **Unit tests** (fast, hermetic): every `Step`, `SegmentIndex`, provenance-verification normalizers, `OCRClient` contract, `GenAIClient` contract, error mapping. No DB, no Ollama, no network.
2. **Integration tests** (DB + fakes): pipeline end-to-end with stub `OCRClient` (replays canned OCR results) and stub `GenAIClient` (replays canned LLM JSON). Covers step wiring + transports + job lifecycle + callback success/failure + pg queue notify. Run against a real postgres service container in Forgejo Actions (mammon CI pattern).
3. **E2E smoke against deployed app**: `scripts/e2e_smoke.py` on the Mac calls `POST http://192.168.68.42:8994/jobs` with a redacted bank-statement fixture (`tests/fixtures/dkb_giro_2026_03.pdf`), polls `GET /jobs/{id}` until done, asserts:
- `status == "done"`
- `provenance.fields["result.closing_balance"].provenance_verified is True`
- `text_agreement is True` when Paperless-style texts are submitted
- Timings under 60 s
Runs after every `git push server main` as the deploy gate. If it fails, the commit is reverted before merging the deploy PR.
## 13. Mammon integration (sketch — outside this spec's scope, owned by mammon)
Belongs in a mammon-side follow-up spec. Captured here only so readers of ix know the MVP's first consumer.
- Paperless poller keeps current behavior for format-matched docs.
- For `needs_parser` docs: submit to ix (`use_case="bank_statement_header"`, `files=[paperless_download_url]`, `texts=[paperless_content]`).
- ix job id recorded on the `Import` row. A new poller on the mammon side checks `GET /jobs/{id}` until done.
- Result is staged (new `pending_headers` table — not `StatementBalance`). A new "Investigate" panel surfaces staged headers with per-field `provenance_verified` + `text_agreement` flags.
- User confirms → write to `StatementBalance`. Over time, when a deterministic parser is added for the bank, compare ix's past extractions against the deterministic output to measure ix accuracy.
## 14. Deferred from full spec (explicit)
- Kafka transport (§15)
- Config Server (§9.1 in full spec, §10 here): use cases are in-repo for MVP
- Azure DI / Computer Vision OCR backends
- OpenAI, Anthropic, AWS Bedrock GenAI backends
- S3 adapter
- `use_vision` + vision scaling/detail
- Word-level provenance granularity
- `reasoning_effort` parameter routing
- Prometheus exporter (/metrics stays JSON for MVP)
- OTEL gRPC exporter (spans present, no exporter)
- Legacy aliases (`prompt_template_base`, `kwargs_use_case`)
- Second-opinion multi-model ensembling
- Schema `version` field
- Per-request rate limiting
Every deferred item is additive: the `OCRClient` / `GenAIClient` / transport-adapter interfaces already leave the plug points, and the pipeline core is unaware of which implementation is in use.
## 15. Implementation workflow (habit reminder)
Every unit of work follows the cross-project habit:
1. `git checkout -b feat/<name>`
2. TDD: write failing test, write code, green, refactor
3. Commit in small logical chunks; update `AGENTS.md` / `README.md` / `docs/` in the same commit as the code
4. `git push forgejo feat/<name>`
5. Create PR via Forgejo API
6. Wait for tests to pass
7. Merge
8. `git push server main` to deploy; run `scripts/e2e_smoke.py` against the live service
Never skip hooks, never force-push main, never bypass tests.