Establishes ix as an async, on-prem, LLM-powered structured extraction microservice. Full reference spec stays in docs/spec-core-pipeline.md; MVP spec (strict subset — Ollama only, Surya OCR, REST + Postgres-queue transports in parallel, in-repo use cases, provenance-based reliability signals) lives at docs/superpowers/specs/2026-04-18-ix-mvp-design.md. First use case: bank_statement_header (feeds mammon's needs_parser flow). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
912 lines
32 KiB
Markdown
912 lines
32 KiB
Markdown
# Core Pipeline Specification — InfoXtractor
|
||
|
||
|
||
## 1. What the system does
|
||
|
||
InfoXtractor is an **async, LLM-powered document extraction pipeline**. Given one or more
|
||
documents (PDFs, images) or plain text and a *use case* that defines what to extract, it returns
|
||
a structured JSON result whose shape matches the use-case schema.
|
||
|
||
Optional capabilities that are composed at runtime:
|
||
|
||
| Capability | Trigger | What it adds |
|
||
|---|---|---|
|
||
| OCR | `options.ocr.use_ocr = true` (default) | Converts document pages to text via cloud OCR before passing to the LLM |
|
||
| Vision | `options.gen_ai.use_vision = true` | Also passes page images to the LLM as base64 |
|
||
| Provenance | `options.provenance.include_provenance = true` | Each extracted field is annotated with the page number, bounding box, and text snippet it came from |
|
||
| OCR-only | `options.ocr.ocr_only = true` | Returns OCR result without calling the LLM |
|
||
|
||
---
|
||
|
||
## 2. Top-level data contracts
|
||
|
||
### 2.1 RequestIX — pipeline input
|
||
|
||
```python
|
||
class RequestIX:
|
||
use_case: str # Name, URL, or Base64-encoded use-case definition
|
||
ix_client_id: str # Client identifier (used for auth + metrics)
|
||
request_id: str # Correlation key set by the caller; echoed in response
|
||
ix_id: Optional[str] # Internal tracking ID; set by the transport layer, not the caller
|
||
context: Context # What to process
|
||
options: Options # How to process it
|
||
version: Optional[int] # Schema version (default 2)
|
||
|
||
class Context:
|
||
files: List[str] # URLs to PDFs or images (S3, HTTP)
|
||
texts: List[str] # Raw text strings (treated as pages)
|
||
|
||
class Options:
|
||
ocr: OCROptions
|
||
gen_ai: GenAIOptions
|
||
provenance: ProvenanceOptions
|
||
```
|
||
|
||
**OCROptions**
|
||
|
||
| Field | Type | Default | Meaning |
|
||
|---|---|---|---|
|
||
| `use_ocr` | bool | `True` | Run cloud OCR on files |
|
||
| `ocr_only` | bool | `False` | Stop after OCR; skip LLM |
|
||
| `service` | enum | `document_intelligence` | Which OCR backend (`document_intelligence` / `computer_vision`) |
|
||
| `include_geometries` | bool | `False` | Include raw bounding boxes in `ocr_result` |
|
||
| `include_ocr_text` | bool | `False` | Attach flat OCR text to `ocr_result.result.text` |
|
||
| `include_page_tags` | bool | `True` | Inject `<page file="…" number="…">` XML markers into OCR lines |
|
||
| `computer_vision_scaling_factor` | int 1–10 | `1` | Scale multiplier for CV-based OCR |
|
||
|
||
**GenAIOptions**
|
||
|
||
| Field | Type | Default | Meaning |
|
||
|---|---|---|---|
|
||
| `gen_ai_model_name` | str | `gpt-4o` (from config) | LLM deployment name |
|
||
| `use_vision` | bool | `False` | Include page images in the LLM message |
|
||
| `vision_scaling_factor` | int 1–10 | `1` | Scale factor for rendered images |
|
||
| `vision_detail` | enum | `auto` | `auto` / `low` / `high` (OpenAI vision detail) |
|
||
| `reasoning_effort` | Optional[str] | `None` | Passed to reasoning models (e.g. `medium`, `high`) |
|
||
|
||
**ProvenanceOptions**
|
||
|
||
| Field | Type | Default | Meaning |
|
||
|---|---|---|---|
|
||
| `include_provenance` | bool | `False` | Enable provenance tracking |
|
||
| `granularity` | enum | `line` | `line` (only supported today) or `word` |
|
||
| `include_bounding_boxes` | bool | `True` | Attach normalised coordinates to each source |
|
||
| `max_sources_per_field` | int ≥1 | `10` | Cap on sources returned per field |
|
||
| `source_type` | enum | `value_and_context` | `value`: value segments only; `value_and_context`: value + label segments |
|
||
| `min_confidence` | float 0–1 | `0.0` | Minimum per-field confidence (currently a no-op; hook for future) |
|
||
|
||
---
|
||
|
||
### 2.2 ResponseIX — pipeline output
|
||
|
||
```python
|
||
class ResponseIX:
|
||
use_case: Optional[str] # Echoed from RequestIX.use_case
|
||
use_case_name: Optional[str] # Clean name from use-case definition
|
||
ix_client_id: Optional[str]
|
||
request_id: Optional[str]
|
||
ix_id: Optional[str]
|
||
error: Optional[str] # Non-None means processing failed; pipeline stopped
|
||
warning: List[str] # Non-fatal issues encountered during processing
|
||
ix_result: IXResult # LLM extraction result
|
||
ocr_result: OCRResult # OCR output (may be empty if OCR skipped)
|
||
provenance: Optional[ProvenanceData] # Field-level source tracking
|
||
metadata: Metadata # Timings, hostname
|
||
|
||
class IXResult:
|
||
result: dict # Matches the use-case response schema
|
||
result_confidence: dict # Reserved; currently empty
|
||
meta_data: dict # token_usage {prompt_tokens, completion_tokens, total_tokens}, model_name
|
||
|
||
class OCRResult:
|
||
result: OCRDetails # pages with lines + bounding boxes
|
||
meta_data: dict
|
||
|
||
class OCRDetails:
|
||
text: Optional[str] # Flat text (populated only if include_ocr_text=True)
|
||
pages: List[Page] # Structured OCR pages
|
||
|
||
class Page:
|
||
page_no: int
|
||
width: float # Page width in points
|
||
height: float # Page height in points
|
||
angle: float # Skew angle
|
||
unit: Optional[str]
|
||
lines: List[Line]
|
||
|
||
class Line:
|
||
text: Optional[str]
|
||
bounding_box: List[float] # 8-coordinate polygon [x1,y1, x2,y2, x3,y3, x4,y4] (raw pixels)
|
||
words: Words
|
||
|
||
class Metadata:
|
||
timings: List[Dict] # Step-level timing records
|
||
processed_by: Optional[str] # Hostname
|
||
use_case_truncated: Optional[bool]
|
||
```
|
||
|
||
**`ResponseIX.context`** is an internal-only mutable object that accumulates data as it passes
|
||
through pipeline steps. It is **excluded from serialisation** (`exclude=True`) and must be
|
||
stripped before the response is sent to any external consumer.
|
||
|
||
---
|
||
|
||
## 3. The Step interface
|
||
|
||
Every pipeline stage implements a two-method async interface:
|
||
|
||
```python
|
||
class Step(ABC):
|
||
@abstractmethod
|
||
async def validate(self, request_ix: RequestIX, response_ix: ResponseIX) -> bool:
|
||
"""
|
||
Returns True → run process()
|
||
Returns False → silently skip this step
|
||
Raises → set ResponseIX.error and abort the pipeline
|
||
"""
|
||
|
||
@abstractmethod
|
||
async def process(self, request_ix: RequestIX, response_ix: ResponseIX) -> ResponseIX:
|
||
"""Execute the step; mutate response_ix; return it."""
|
||
```
|
||
|
||
**Key invariants:**
|
||
|
||
- Both `validate` and `process` receive the **same** `request_ix` and `response_ix` objects.
|
||
- `process` receives the `response_ix` as enriched by all previous steps (via `context`).
|
||
- If `validate` raises, the error string is written to `response_ix.error` and no further steps run.
|
||
- If `process` raises, same: error is captured and pipeline stops.
|
||
- A step that returns `False` from `validate` leaves `response_ix` unchanged and the pipeline continues.
|
||
|
||
---
|
||
|
||
## 4. Pipeline orchestration (`Pipeline`)
|
||
|
||
```python
|
||
class Pipeline:
|
||
steps = [SetupStep(), OCRStep(), GenAIStep(), ResponseHandlerStep()]
|
||
debug_mode: bool # Enables memory-usage logging and GC inspection
|
||
|
||
async def start(self, request_ix: RequestIX) -> ResponseIX:
|
||
# Wraps _execute_pipeline in an observability span
|
||
...
|
||
|
||
async def _execute_pipeline(self, request_ix, obs) -> ResponseIX:
|
||
response_ix = ResponseIX()
|
||
for step in self.steps:
|
||
# wrapped in a step-level observability span + Timer
|
||
success, response_ix = await self._execute_step(step, request_ix, response_ix)
|
||
if not success:
|
||
return response_ix # abort on first error
|
||
return response_ix
|
||
|
||
async def _execute_step(self, step, request_ix, response_ix) -> (bool, ResponseIX):
|
||
valid = await step.validate(request_ix, response_ix)
|
||
if valid:
|
||
response_ix = await step.process(request_ix, response_ix)
|
||
return True, response_ix
|
||
# exceptions caught → response_ix.error set → return False
|
||
```
|
||
|
||
The pipeline itself holds **no I/O references** — it does not know about Kafka, HTTP, or
|
||
databases. The transport layer instantiates `Pipeline`, calls `await pipeline.start(request_ix)`,
|
||
and handles routing the returned `ResponseIX`.
|
||
|
||
---
|
||
|
||
## 5. Step 1 — SetupStep
|
||
|
||
**Purpose:** Validate the request, fetch external files, load the use-case definition, and
|
||
convert all inputs into an internal `pages` representation for downstream steps.
|
||
|
||
### 5.1 validate()
|
||
|
||
Raises if:
|
||
- `request_ix` is `None`
|
||
- Both `context.files` and `context.texts` are empty
|
||
|
||
### 5.2 process()
|
||
|
||
**a) Copy text context**
|
||
|
||
```python
|
||
response_ix.context.texts = list(request_ix.context.texts)
|
||
```
|
||
|
||
**b) Download files** (parallel, async)
|
||
|
||
Each URL in `request_ix.context.files` is downloaded to a local temp path (`/tmp/ix/`).
|
||
The result is a list of `File(file_path, mime_type)` objects stored in
|
||
`response_ix.context.files`.
|
||
|
||
Supported MIME types depend on the chosen OCR service; unsupported files raise immediately.
|
||
|
||
**c) Load use case**
|
||
|
||
The use case is fetched from the Config Server (see §9) by name, URL, or Base64.
|
||
On success, the two classes `UseCaseRequest` and `UseCaseResponse` are stored in
|
||
`response_ix.context.use_case_request` and `response_ix.context.use_case_response`.
|
||
|
||
`use_case_name` is extracted from `UseCaseRequest().use_case_name` and stored in
|
||
`response_ix.use_case_name` for downstream steps and the final response.
|
||
|
||
**d) Build pages** (parallel per document)
|
||
|
||
The ingestor registers every file and classifies its MIME type. Rendering is done per document:
|
||
|
||
- **PDF** (via PyMuPDF): one `Page` object per PDF page, holding `file_path`, `item_index` (0-based index into `request_ix.context.files`), `page_no`, `width`, `height`.
|
||
- **Image** (via PIL): one `Page` per frame. Multi-frame TIFFs produce multiple pages.
|
||
- **Text**: one `Page` per entry in `context.texts`.
|
||
|
||
Maximum pages per PDF: 100 (hard cap; raises on violation).
|
||
|
||
All pages are concatenated into a flat list stored in `response_ix.context.pages`.
|
||
|
||
A `DocumentIngestor` instance is stored in `response_ix.context.ingestion` so OCRStep can make format-specific decisions (e.g. skip Word-embedded images with a warning).
|
||
|
||
---
|
||
|
||
## 6. Step 2 — OCRStep
|
||
|
||
**Purpose:** Convert document pages to structured text with bounding boxes. Optionally build
|
||
a `SegmentIndex` for provenance tracking.
|
||
|
||
### 6.1 validate()
|
||
|
||
Returns `True` when **any** of these flags are set AND `response_ix.context.files` is non-empty:
|
||
|
||
- `use_ocr = True`
|
||
- `include_geometries = True`
|
||
- `include_ocr_text = True`
|
||
- `ocr_only = True`
|
||
|
||
If none of the above apply (text-only request), returns `False` → step is skipped.
|
||
|
||
Raises `IX_000_004` if `ocr_only`, `include_geometries`, or `include_ocr_text` is requested but
|
||
no files were provided.
|
||
|
||
### 6.2 process()
|
||
|
||
**a) Select OCR service**
|
||
|
||
```
|
||
service == "document_intelligence" → AzureDocumentIntelligence
|
||
service == "computer_vision" → AzureComputerVision
|
||
```
|
||
|
||
**b) `perform_ocr(request_ix, response_ix)`** (async, runs against all pages)
|
||
|
||
Populates `response_ix.ocr_result` with an `OCRResult` containing pages with lines and
|
||
raw bounding boxes (8-coordinate polygons, pixel coordinates).
|
||
|
||
**c) Inject page tags** (when `include_page_tags = True`)
|
||
|
||
For each OCR page a synthetic `Line` is prepended and appended:
|
||
|
||
```
|
||
<page file="{item_index}" number="{page_no}">
|
||
… original lines …
|
||
</page>
|
||
```
|
||
|
||
These tags are visible to the LLM as structural markers. The `SegmentIndex` builder
|
||
explicitly skips them.
|
||
|
||
**d) Build SegmentIndex** (when `include_provenance = True`)
|
||
|
||
```python
|
||
response_ix.context.segment_index = await SegmentIndex.build(
|
||
response_ix.ocr_result,
|
||
granularity, # line (default) or word
|
||
pages_metadata=response_ix.context.pages,
|
||
)
|
||
```
|
||
|
||
See §8 for the full SegmentIndex specification.
|
||
|
||
---
|
||
|
||
## 7. Step 3 — GenAIStep
|
||
|
||
**Purpose:** Call the LLM with the prepared context and parse its structured output into
|
||
`response_ix.ix_result`. Optionally map the LLM's segment citations to bounding boxes.
|
||
|
||
### 7.1 validate()
|
||
|
||
- Returns `False` when `ocr_only = True`.
|
||
- Raises when:
|
||
- `use_case` is empty
|
||
- `use_ocr = True` but no OCR text was produced
|
||
- neither `use_ocr` nor `use_vision` nor plain text context is available
|
||
- model is not found in Config Server
|
||
- `use_vision = True` but the model does not support vision
|
||
|
||
If `include_provenance = True` but `use_ocr = False`, a warning is added (provenance will be
|
||
empty) but processing continues.
|
||
|
||
### 7.2 process()
|
||
|
||
**a) Prepare system prompt**
|
||
|
||
```python
|
||
prompt = UseCaseRequest().system_prompt # falls back to prompt_template_base
|
||
```
|
||
|
||
If `include_provenance = True`, append the segment-citation instruction (see §8.2).
|
||
|
||
**b) Build text context**
|
||
|
||
When provenance is on and a `SegmentIndex` exists:
|
||
|
||
```
|
||
[p1_l0] Line text
|
||
[p1_l1] Next line
|
||
…
|
||
```
|
||
|
||
Otherwise: join OCR page lines with `\n\n` between pages, then append any raw texts.
|
||
|
||
**c) Build visual context** (when `use_vision = True`)
|
||
|
||
For each page:
|
||
1. Render to PIL image at `vision_scaling_factor × original_size` (capped by `render_max_pixels_per_page`).
|
||
2. Convert TIFF frames to JPEG.
|
||
3. Base64-encode.
|
||
4. Wrap as a provider-neutral image part:
|
||
|
||
```json
|
||
{"type": "image", "media_type": "image/jpeg", "data": "<base64>", "detail": "auto"}
|
||
```
|
||
|
||
**d) Assemble LLM call parameters**
|
||
|
||
```python
|
||
request_kwargs = {
|
||
"model": model_name,
|
||
"temperature": from_model_settings,
|
||
"messages": [
|
||
{"role": "system", "content": prompt_string},
|
||
{"role": "user", "content": [text_part, *image_parts]},
|
||
],
|
||
# "reasoning_effort": "medium" ← only for reasoning-capable models
|
||
}
|
||
```
|
||
|
||
**e) Determine response schema**
|
||
|
||
- When `include_provenance = False`: schema is `UseCaseResponse` directly.
|
||
- When `include_provenance = True`: a dynamic wrapper is created at runtime:
|
||
|
||
```python
|
||
ProvenanceWrappedResponse = create_model(
|
||
"ProvenanceWrappedResponse",
|
||
result=(UseCaseResponse, Field(...)),
|
||
segment_citations=(List[SegmentCitation], Field(default_factory=list)),
|
||
)
|
||
```
|
||
|
||
The use-case file is **never modified**; the wrapper is ephemeral.
|
||
|
||
**f) Invoke LLM**
|
||
|
||
```python
|
||
result = await gen_ai_service.invoke(request_kwargs, response_schema, model_settings)
|
||
# result.parsed → the Pydantic model instance
|
||
# result.usage → GenAIUsage(prompt_tokens, completion_tokens)
|
||
# result.model_name → string
|
||
```
|
||
|
||
**g) Write results**
|
||
|
||
- `response_ix.ix_result.result = parsed.result.model_dump()` (provenance mode) or `parsed.model_dump()` (normal mode)
|
||
- `response_ix.ix_result.meta_data = {token_usage, model_name}`
|
||
|
||
**h) Build provenance** (when `include_provenance = True`)
|
||
|
||
```python
|
||
response_ix.provenance = ProvenanceUtils.map_segment_refs_to_provenance(
|
||
extraction_result={"result": ix_result},
|
||
segment_citations=parsed.segment_citations,
|
||
segment_index=response_ix.context.segment_index,
|
||
max_sources_per_field=opts.max_sources_per_field,
|
||
min_confidence=opts.min_confidence,
|
||
include_bounding_boxes=opts.include_bounding_boxes,
|
||
source_type=opts.source_type,
|
||
)
|
||
```
|
||
|
||
See §8.4 for the provenance resolution algorithm.
|
||
|
||
### 7.3 Provider routing
|
||
|
||
```python
|
||
provider = model_settings["provider"] # "azure" or "bedrock"
|
||
if provider == "bedrock": service = AWSGenAI()
|
||
else: service = AzureOpenAI()
|
||
```
|
||
|
||
The `GenAIClient` interface is:
|
||
|
||
```python
|
||
class GenAIClient(ABC):
|
||
def setup_gen_ai_client(self, request_ix, model_settings) -> None: ...
|
||
async def invoke(self, request_kwargs, response_schema, model_settings) -> GenAIInvocationResult: ...
|
||
```
|
||
|
||
`AzureOpenAI` translates provider-neutral image parts to OpenAI's `image_url` content format.
|
||
It caches one `AsyncOpenAI` instance per `(base_url, api_key)` pair.
|
||
|
||
---
|
||
|
||
## 8. Step 4 — ResponseHandlerStep
|
||
|
||
**Purpose:** Shape the final payload before handing it to the transport layer.
|
||
|
||
### 8.1 validate()
|
||
|
||
Always returns `True`.
|
||
|
||
### 8.2 process()
|
||
|
||
1. **Attach OCR text** (`include_ocr_text = True`): concatenate all line texts per page,
|
||
join pages with `\n\n`, store in `ocr_result.result.text`.
|
||
|
||
2. **Strip geometries** (`include_geometries = False`, default): clear `ocr_result.result.pages`
|
||
and `ocr_result.meta_data`. The structured page data is large; callers who don't need it
|
||
should not receive it.
|
||
|
||
3. **Trim context**: delete `response_ix.context` entirely. The mutable internal context is
|
||
never serialised.
|
||
|
||
---
|
||
|
||
## 9. Provenance subsystem
|
||
|
||
### 9.1 SegmentIndex
|
||
|
||
Built from `ocr_result` after OCR completes. It assigns each non-tag OCR line a unique ID:
|
||
|
||
```
|
||
p{global_page_position}_l{line_index_within_page}
|
||
```
|
||
|
||
- `global_page_position` is 1-based and refers to the **flat pages list position**, not the
|
||
page number printed in the document. This guarantees uniqueness across multi-document requests.
|
||
- Page tag lines (`<page …>` / `</page>`) are explicitly excluded.
|
||
|
||
Internal storage:
|
||
|
||
```python
|
||
_id_to_position: Dict[str, Dict] = {
|
||
"p1_l0": {
|
||
"page": 1, # global 1-based position
|
||
"bbox": BoundingBox(...), # normalised 0-1 coordinates
|
||
"text": "Invoice Number: INV-001",
|
||
"file_index": 0, # 0-based index into request.context.files
|
||
},
|
||
...
|
||
}
|
||
_ordered_ids: List[str] # insertion order, for deterministic prompt text
|
||
```
|
||
|
||
**Bounding box normalisation:**
|
||
|
||
```python
|
||
# 8-coordinate polygon normalised to 0-1 (divide x-coords by page.width, y-coords by page.height)
|
||
[x1/W, y1/H, x2/W, y2/H, x3/W, y3/H, x4/W, y4/H]
|
||
```
|
||
|
||
Multiply back by rendered page dimensions to obtain pixel coordinates for overlay rendering.
|
||
|
||
**Prompt format** (`SegmentIndex.to_prompt_text()`):
|
||
|
||
```
|
||
[p1_l0] Invoice Number: INV-001
|
||
[p1_l1] Date: 2024-01-15
|
||
[p1_l2] Total: CHF 1234.00
|
||
…
|
||
```
|
||
|
||
Extra plain texts (from `context.texts`) are appended untagged.
|
||
|
||
**O(1) lookup:**
|
||
|
||
```python
|
||
position = segment_index.lookup_segment("p1_l0") # None if unknown
|
||
```
|
||
|
||
### 9.2 SegmentCitation (LLM output model)
|
||
|
||
The LLM is instructed to populate `segment_citations` in its structured output:
|
||
|
||
```python
|
||
class SegmentCitation(BaseModel):
|
||
field_path: str # e.g. "result.invoice_number"
|
||
value_segment_ids: List[str] # segments containing the extracted value
|
||
context_segment_ids: List[str] # surrounding label/anchor segments (may be empty)
|
||
```
|
||
|
||
**System prompt addition (injected by GenAIStep when provenance is on):**
|
||
|
||
```
|
||
For each extracted field, you must also populate the `segment_citations` list.
|
||
Each entry maps one field to the document segments that were its source.
|
||
Set `field_path` to the dot-separated JSON path of the field (e.g. 'result.invoice_number').
|
||
Use two separate segment ID lists:
|
||
- `value_segment_ids`: segment IDs whose text directly contains the extracted value
|
||
(e.g. ['p1_l4'] for the line containing 'INV-001').
|
||
- `context_segment_ids`: segment IDs for surrounding label or anchor text that helped you
|
||
identify the field but does not contain the value itself
|
||
(e.g. ['p1_l3'] for a label like 'Invoice Number:'). Leave empty if there is no distinct label.
|
||
Only use segment IDs that appear in the document text.
|
||
Omit fields for which you cannot identify a source segment.
|
||
```
|
||
|
||
### 9.3 ProvenanceData (response model)
|
||
|
||
```python
|
||
class ProvenanceData(BaseModel):
|
||
fields: Dict[str, FieldProvenance] # field_path → provenance
|
||
quality_metrics: Dict[str, Any] # see below
|
||
segment_count: Optional[int]
|
||
granularity: Optional[str] # "line" or "word"
|
||
|
||
class FieldProvenance(BaseModel):
|
||
field_name: str # last segment of field_path
|
||
field_path: str # e.g. "result.invoice_number"
|
||
value: Any # actual extracted value (resolved from ix_result.result)
|
||
sources: List[ExtractionSource]
|
||
confidence: Optional[float] # None today; reserved for future
|
||
|
||
class ExtractionSource(BaseModel):
|
||
page_number: int # global 1-based position
|
||
file_index: Optional[int] # 0-based index into request.context.files
|
||
bounding_box: Optional[BoundingBox] # None when include_bounding_boxes=False
|
||
text_snippet: str # actual OCR text at this location
|
||
relevance_score: float # always 1.0 (exact segment ID match)
|
||
segment_id: Optional[str] # e.g. "p1_l3"; for internal use, not for rendering
|
||
|
||
class BoundingBox(BaseModel):
|
||
coordinates: List[float] # [x1,y1, x2,y2, x3,y3, x4,y4], all 0-1 normalised
|
||
```
|
||
|
||
**quality_metrics:**
|
||
|
||
```json
|
||
{
|
||
"fields_with_provenance": 8,
|
||
"total_fields": 10,
|
||
"coverage_rate": 0.8,
|
||
"invalid_references": 2
|
||
}
|
||
```
|
||
|
||
- `coverage_rate` = fields with at least one source ÷ total leaf fields in result
|
||
- `invalid_references` = segment IDs cited by LLM that were not found in SegmentIndex
|
||
|
||
### 9.4 Provenance resolution algorithm
|
||
|
||
```python
|
||
for citation in segment_citations:
|
||
if source_type == VALUE:
|
||
seg_ids = citation.value_segment_ids
|
||
else:
|
||
seg_ids = citation.value_segment_ids + citation.context_segment_ids
|
||
|
||
sources = []
|
||
for seg_id in seg_ids[:max_sources_per_field]:
|
||
pos = segment_index.lookup_segment(seg_id)
|
||
if pos is None:
|
||
invalid_ref_count += 1
|
||
continue
|
||
sources.append(ExtractionSource(
|
||
page_number=pos["page"],
|
||
file_index=pos["file_index"],
|
||
bounding_box=pos["bbox"] if include_bounding_boxes else None,
|
||
text_snippet=pos["text"],
|
||
relevance_score=1.0,
|
||
segment_id=seg_id,
|
||
))
|
||
|
||
if not sources:
|
||
continue # skip fields that resolve to nothing
|
||
|
||
# Resolve value: traverse result dict via dot-notation field_path
|
||
# e.g. "result.insured_vehicle.vin" → result["insured_vehicle"]["vin"]
|
||
value = resolve_nested_path(extraction_result, citation.field_path)
|
||
|
||
provenance[citation.field_path] = FieldProvenance(
|
||
field_name=citation.field_path.split(".")[-1],
|
||
field_path=citation.field_path,
|
||
value=value,
|
||
sources=sources,
|
||
confidence=None,
|
||
)
|
||
```
|
||
|
||
Array notation is normalised before traversal: `"items[0].name"` → `"items.0.name"`.
|
||
|
||
---
|
||
|
||
## 10. Use-case definition contract
|
||
|
||
A use case is a Python module (or dynamically loaded class) that defines two Pydantic models.
|
||
The Config Server serves them by name; the pipeline loads them via `config_server.use_cases.load_and_validate_use_case(use_case)`.
|
||
|
||
### UseCaseRequest
|
||
|
||
```python
|
||
class UseCaseRequest(BaseModel):
|
||
use_case_name: str # Human-readable name, stored in ResponseIX.use_case_name
|
||
system_prompt: str # System prompt sent to the LLM
|
||
# Legacy: prompt_template_base (deprecated alias for system_prompt)
|
||
# Legacy: kwargs_use_case (deprecated; triggers format(**kwargs))
|
||
```
|
||
|
||
### UseCaseResponse
|
||
|
||
A Pydantic model whose fields define the extraction schema. The LLM is instructed to return
|
||
JSON matching this schema (via `chat.completions.parse(response_format=UseCaseResponse)`).
|
||
|
||
**Example:**
|
||
|
||
```python
|
||
class InvoiceResponse(BaseModel):
|
||
invoice_number: str
|
||
invoice_date: str
|
||
total_amount: float
|
||
line_items: List[LineItem]
|
||
|
||
class LineItem(BaseModel):
|
||
description: str
|
||
quantity: int
|
||
unit_price: float
|
||
```
|
||
|
||
The schema is passed as the `response_format` argument to the OpenAI structured-output API.
|
||
When provenance is on, it is wrapped in `ProvenanceWrappedResponse` at runtime (§7.2e).
|
||
|
||
---
|
||
|
||
## 11. LLM invocation contract
|
||
|
||
### 11.1 Provider-neutral request_kwargs
|
||
|
||
```python
|
||
request_kwargs = {
|
||
"model": str, # deployment name
|
||
"temperature": Optional[float], # from model_settings; None for reasoning models
|
||
"messages": [
|
||
{"role": "system", "content": str},
|
||
{"role": "user", "content": List[ContentPart]},
|
||
],
|
||
"reasoning_effort": Optional[str], # only present when supported by model
|
||
}
|
||
|
||
ContentPart = (
|
||
{"type": "text", "text": str}
|
||
| {"type": "image", "media_type": "image/jpeg", "data": str, "detail": str}
|
||
)
|
||
```
|
||
|
||
### 11.2 GenAIInvocationResult
|
||
|
||
```python
|
||
@dataclass
|
||
class GenAIInvocationResult:
|
||
parsed: Any # Parsed Pydantic model instance (matches response_schema)
|
||
usage: GenAIUsage # prompt_tokens, completion_tokens
|
||
model_name: str
|
||
```
|
||
|
||
### 11.3 Model settings (from Config Server)
|
||
|
||
```json
|
||
{
|
||
"provider": "azure",
|
||
"temperature": 0.0,
|
||
"vision_capability": true,
|
||
"supports_reasoning_efforts": ["low", "medium", "high"],
|
||
"default_reasoning_effort": "medium"
|
||
}
|
||
```
|
||
|
||
- `temperature` is `None` for reasoning models (OpenAI ignores it and it must be omitted).
|
||
- `vision_capability` must be `true` for `use_vision = True` requests to be accepted.
|
||
- `supports_reasoning_efforts` is the allowlist; requests for unsupported values fall back to
|
||
`default_reasoning_effort` with a warning.
|
||
|
||
---
|
||
|
||
## 12. Error model
|
||
|
||
### 12.1 IXException
|
||
|
||
All domain errors are wrapped in `IXException(message_or_Error_enum)`. The pipeline catches
|
||
any exception from `validate()` or `process()`, writes `str(e)` to `response_ix.error`, and
|
||
stops.
|
||
|
||
### 12.2 Named error codes
|
||
|
||
| Code | Trigger |
|
||
|---|---|
|
||
| `IX_000_000` | `request_ix` is `None` |
|
||
| `IX_000_002` | No context (neither files nor texts) |
|
||
| `IX_000_004` | OCR required but no files provided |
|
||
| `IX_000_005` | File MIME type not supported by chosen OCR service |
|
||
| `IX_001_000` | `use_case` is empty or missing |
|
||
| `IX_001_001` | Use case could not be loaded or validated |
|
||
| `IX_003_003` | `use_vision=True` but model has no vision capability |
|
||
| `IX_004_001` | Response too large for transport (e.g. Kafka max message size) |
|
||
|
||
### 12.3 Warnings
|
||
|
||
Non-fatal issues are appended to `response_ix.warning` and logged. Examples:
|
||
|
||
- Vision requested but no renderable documents found
|
||
- Image scale capped due to pixel limit
|
||
- Provenance requested with `use_ocr=False`
|
||
- Word document contains embedded images that will be skipped
|
||
- `use_case_name` not set in use-case definition
|
||
|
||
---
|
||
|
||
## 13. Configuration (AppConfig)
|
||
|
||
All settings are loaded from environment variables (with `.env` fallback via `pydantic-settings`).
|
||
|
||
| Category | Key env vars |
|
||
|---|---|
|
||
| Azure OCR | `DOCUMENT_INTELLIGENCE_ENDPOINT`, `DOCUMENT_INTELLIGENCE_KEY`, `COMPUTER_VISION_ENDPOINT`, `COMPUTER_VISION_SUBSCRIPTION_KEY` |
|
||
| Azure OpenAI | `OPENAI_BASE_URL`, `OPENAI_API_KEY` |
|
||
| LLM defaults | `DEFAULT_LLM` (default: `gpt-4o`) |
|
||
| Config Server | `CONFIG_SERVER_BASE_URL` (default: `http://ix-config:8998/`) |
|
||
| Pipeline | `PIPELINE_WORKER_CONCURRENCY` (1–10), `PIPELINE_REQUEST_TIMEOUT_SECONDS` (default: 2700) |
|
||
| Rendering | `RENDER_MAX_PIXELS_PER_PAGE` (default: 75,000,000) |
|
||
| AWS | `S3_BUCKET`, `S3_OBJECT_PREFIX`, `AWS_REGION`, `TARGET_ROLE` |
|
||
| Kafka | `PROCESSING_TOPIC`, `KAFKA_BOOTSTRAP_SERVER`, … |
|
||
| Observability | `LOG_LEVEL`, `OTEL_EXPORTER_OTLP_ENDPOINT`, `PYROSCOPE_SERVER_ADDRESS` |
|
||
|
||
---
|
||
|
||
## 14. Observability
|
||
|
||
### 14.1 Logging
|
||
|
||
Every log call carries `ix_id` as contextual metadata. The custom logger exposes:
|
||
|
||
```python
|
||
logger.info_ix(msg, request_ix)
|
||
logger.warning_ix(msg, request_ix)
|
||
logger.error_ix(msg, request_ix)
|
||
```
|
||
|
||
If `ix_id` is not yet set on `request_ix`, the logger generates one lazily on first use.
|
||
|
||
Step timing is recorded via a `Timer` context manager; results are stored in
|
||
`response_ix.metadata.timings`.
|
||
|
||
### 14.2 Tracing (OpenTelemetry)
|
||
|
||
- **Pipeline span** wraps the entire `_execute_pipeline()` call (attributes: `request_id`, `use_case`, `client_id`).
|
||
- **Step span** wraps each individual step.
|
||
- Errors are recorded on the span with `mark_failed(error_message)`.
|
||
- Exported via gRPC to `OTEL_EXPORTER_OTLP_ENDPOINT`.
|
||
|
||
### 14.3 Metrics (Prometheus)
|
||
|
||
| Metric | Labels |
|
||
|---|---|
|
||
| `pipeline_requests_total` | `client_id` |
|
||
| LLM token counter | `model`, `client_id`, `use_case`, `status` |
|
||
| LLM latency histogram | `model`, `client_id` |
|
||
| Extraction quality | `client_id`, `use_case`, `model`, `reasoning_effort` |
|
||
|
||
Exposed on the health port (default 9009).
|
||
|
||
---
|
||
|
||
## 15. Transport adapter contract
|
||
|
||
The pipeline is transport-agnostic. Any delivery mechanism must:
|
||
|
||
1. **Deserialise** the incoming payload into a `RequestIX` instance.
|
||
2. **Assign `ix_id`** (a random 16-character hex string) if not already present.
|
||
3. **Call** `await Pipeline().start(request_ix)` → `response_ix`.
|
||
4. **Serialise** `response_ix` (use `.model_dump(by_alias=True)` to honour `serialization_alias` on `metadata`).
|
||
5. **Deliver** the serialised response to the caller via the appropriate channel.
|
||
|
||
Optionally:
|
||
- **Persist** `request_ix` and `response_ix` to a durable store (S3, database) keyed by `ix_id`.
|
||
- **Apply a timeout**: wrap the `await pipeline.start(...)` call and write a timeout error to `response_ix.error` on expiry.
|
||
|
||
### Kafka adapter (existing)
|
||
|
||
- Consumes from `ch-infoxtractor.processing.v2`; produces to `ch-infoxtractor.outbound.v2` and client-specific topics.
|
||
- Concurrency controlled via asyncio semaphore (`PIPELINE_WORKER_CONCURRENCY`).
|
||
- Per-partition offset commit tracking (gap-safe: only commits contiguous completed offsets).
|
||
- Graceful shutdown: drains in-flight messages before stopping.
|
||
|
||
### FastAPI adapter (example)
|
||
|
||
```python
|
||
@router.post("/extract")
|
||
async def extract(body: RequestIX) -> ResponseIX:
|
||
body.ix_id = body.ix_id or secrets.token_hex(8)
|
||
response = await pipeline.start(body)
|
||
return response
|
||
```
|
||
|
||
### Database adapter (example)
|
||
|
||
```python
|
||
async def process_job(job_id: str, db: AsyncSession):
|
||
row = await db.get(Job, job_id)
|
||
request_ix = RequestIX.model_validate_json(row.payload)
|
||
request_ix.ix_id = row.ix_id
|
||
response_ix = await pipeline.start(request_ix)
|
||
row.result = response_ix.model_dump_json(by_alias=True)
|
||
row.status = "done" if not response_ix.error else "error"
|
||
await db.commit()
|
||
```
|
||
|
||
---
|
||
|
||
## 16. End-to-end data flow summary
|
||
|
||
```
|
||
[Transport layer]
|
||
↓ RequestIX (use_case, context.files/texts, options, ix_id)
|
||
↓
|
||
[Pipeline.start()]
|
||
↓
|
||
[SetupStep]
|
||
• Download files → local paths → response_ix.context.files (File objects)
|
||
• Load use case from Config Server → response_ix.context.use_case_request/response
|
||
• Split PDFs / load images / wrap texts → response_ix.context.pages (flat list)
|
||
↓
|
||
[OCRStep] (skipped if no files or use_ocr=False)
|
||
• Call Azure OCR per page → response_ix.ocr_result (pages + lines + bbox)
|
||
• Inject <page> XML markers → lines prepended/appended in ocr_result.pages
|
||
• Build SegmentIndex (if provenance) → response_ix.context.segment_index
|
||
↓
|
||
[GenAIStep] (skipped if ocr_only=True)
|
||
• Build system prompt (+ citation hint if provenance)
|
||
• Build user content: segment-tagged text OR plain OCR text + optional base64 images
|
||
• Determine response schema (wrapped if provenance)
|
||
• Call LLM via GenAIClient.invoke()
|
||
• Write ix_result.result + meta_data
|
||
• Resolve segment citations → ProvenanceData (if provenance)
|
||
↓
|
||
[ResponseHandlerStep] (always runs)
|
||
• Attach flat OCR text (if include_ocr_text)
|
||
• Strip bounding boxes (if not include_geometries)
|
||
• Delete response_ix.context (never serialised)
|
||
↓
|
||
[ResponseIX]
|
||
ix_result.result ← structured extraction matching UseCaseResponse schema
|
||
ocr_result ← OCR details (empty pages unless include_geometries)
|
||
provenance ← field-level source map (None unless include_provenance)
|
||
error / warning ← diagnostics
|
||
metadata ← timings, hostname
|
||
↓
|
||
[Transport layer]
|
||
Serialise → deliver to caller
|
||
Persist (optional) → keyed by ix_id
|
||
```
|
||
|
||
---
|
||
|
||
## 17. Key design decisions
|
||
|
||
| Decision | Rationale |
|
||
|---|---|
|
||
| **Two-method Step interface** (`validate` + `process`) | Keeps routing logic (should this step run?) separate from execution; steps can be reordered or replaced without touching the pipeline loop |
|
||
| **Mutable `context` on ResponseIX** | Avoids passing a growing set of intermediate values as extra arguments; the entire state is in one object; `context` is excluded from serialisation so it never leaks |
|
||
| **Single LLM call for provenance** | The schema is wrapped at runtime to add `segment_citations`; no second round-trip to the LLM is needed |
|
||
| **Segment IDs instead of fuzzy text matching** | IDs are injected into the prompt; the LLM cites them; lookup is O(1); no approximate matching, no hallucinated coordinates |
|
||
| **Global page position for segment IDs** | Ensures uniqueness across multi-document requests (two documents that both have "page 1" produce `p1_l*` and `p2_l*`, not the same IDs) |
|
||
| **Provider-neutral `request_kwargs`** | The `GenAIClient` interface accepts a dict; provider-specific translation (e.g. OpenAI `image_url` format) happens inside each adapter, not in the step |
|
||
| **Normalised bounding boxes (0–1)** | Resolution-independent; frontend multiplies by rendered page dimensions to get pixel coordinates |
|
||
| **Transport-agnostic pipeline** | Kafka, FastAPI, and DB adapters all call the same `Pipeline.start()` |
|
||
| **Config Server for use cases** | Decouples use-case authoring from deployment; allows use cases to be updated without redeploying the pipeline |
|