infoxtractor/docs/spec-core-pipeline.md
Dirk Riemann 124403252d Initial design: on-prem LLM extraction microservice MVP
Establishes ix as an async, on-prem, LLM-powered structured extraction
microservice. Full reference spec stays in docs/spec-core-pipeline.md;
MVP spec (strict subset — Ollama only, Surya OCR, REST + Postgres-queue
transports in parallel, in-repo use cases, provenance-based reliability
signals) lives at docs/superpowers/specs/2026-04-18-ix-mvp-design.md.

First use case: bank_statement_header (feeds mammon's needs_parser flow).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 10:23:17 +02:00


# Core Pipeline Specification — InfoXtractor
## 1. What the system does
InfoXtractor is an **async, LLM-powered document extraction pipeline**. Given one or more
documents (PDFs, images) or plain text and a *use case* that defines what to extract, it returns
a structured JSON result whose shape matches the use-case schema.
Optional capabilities that are composed at runtime:
| Capability | Trigger | What it adds |
|---|---|---|
| OCR | `options.ocr.use_ocr = true` (default) | Converts document pages to text via cloud OCR before passing to the LLM |
| Vision | `options.gen_ai.use_vision = true` | Also passes page images to the LLM as base64 |
| Provenance | `options.provenance.include_provenance = true` | Each extracted field is annotated with the page number, bounding box, and text snippet it came from |
| OCR-only | `options.ocr.ocr_only = true` | Returns OCR result without calling the LLM |
---
## 2. Top-level data contracts
### 2.1 RequestIX — pipeline input
```python
class RequestIX:
    use_case: str           # Name, URL, or Base64-encoded use-case definition
    ix_client_id: str       # Client identifier (used for auth + metrics)
    request_id: str         # Correlation key set by the caller; echoed in response
    ix_id: Optional[str]    # Internal tracking ID; set by the transport layer, not the caller
    context: Context        # What to process
    options: Options        # How to process it
    version: Optional[int]  # Schema version (default 2)

class Context:
    files: List[str]        # URLs to PDFs or images (S3, HTTP)
    texts: List[str]        # Raw text strings (treated as pages)

class Options:
    ocr: OCROptions
    gen_ai: GenAIOptions
    provenance: ProvenanceOptions
```
**OCROptions**
| Field | Type | Default | Meaning |
|---|---|---|---|
| `use_ocr` | bool | `True` | Run cloud OCR on files |
| `ocr_only` | bool | `False` | Stop after OCR; skip LLM |
| `service` | enum | `document_intelligence` | Which OCR backend (`document_intelligence` / `computer_vision`) |
| `include_geometries` | bool | `False` | Include raw bounding boxes in `ocr_result` |
| `include_ocr_text` | bool | `False` | Attach flat OCR text to `ocr_result.result.text` |
| `include_page_tags` | bool | `True` | Inject `<page file="…" number="…">` XML markers into OCR lines |
| `computer_vision_scaling_factor` | int 1-10 | `1` | Scale multiplier for CV-based OCR |
**GenAIOptions**
| Field | Type | Default | Meaning |
|---|---|---|---|
| `gen_ai_model_name` | str | `gpt-4o` (from config) | LLM deployment name |
| `use_vision` | bool | `False` | Include page images in the LLM message |
| `vision_scaling_factor` | int 1-10 | `1` | Scale factor for rendered images |
| `vision_detail` | enum | `auto` | `auto` / `low` / `high` (OpenAI vision detail) |
| `reasoning_effort` | Optional[str] | `None` | Passed to reasoning models (e.g. `medium`, `high`) |
**ProvenanceOptions**
| Field | Type | Default | Meaning |
|---|---|---|---|
| `include_provenance` | bool | `False` | Enable provenance tracking |
| `granularity` | enum | `line` | `line` (only supported today) or `word` |
| `include_bounding_boxes` | bool | `True` | Attach normalised coordinates to each source |
| `max_sources_per_field` | int ≥1 | `10` | Cap on sources returned per field |
| `source_type` | enum | `value_and_context` | `value`: value segments only; `value_and_context`: value + label segments |
| `min_confidence` | float 0-1 | `0.0` | Minimum per-field confidence (currently a no-op; hook for future) |
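Assembled from the tables above, a request enabling line-level provenance might look like the following sketch. Field names follow the contract; all concrete values (URLs, IDs) are invented for illustration:

```python
# Illustrative RequestIX payload; field names from §2.1, values made up.
request_payload = {
    "use_case": "bank_statement_header",
    "ix_client_id": "mammon",           # hypothetical client ID
    "request_id": "req-2026-0001",      # hypothetical correlation key
    "context": {
        "files": ["https://example.org/statement.pdf"],  # hypothetical URL
        "texts": [],
    },
    "options": {
        "ocr": {"use_ocr": True, "include_page_tags": True},
        "gen_ai": {"use_vision": False},
        "provenance": {"include_provenance": True, "granularity": "line"},
    },
    "version": 2,
}
```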
---
### 2.2 ResponseIX — pipeline output
```python
class ResponseIX:
    use_case: Optional[str]       # Echoed from RequestIX.use_case
    use_case_name: Optional[str]  # Clean name from use-case definition
    ix_client_id: Optional[str]
    request_id: Optional[str]
    ix_id: Optional[str]
    error: Optional[str]          # Non-None means processing failed; pipeline stopped
    warning: List[str]            # Non-fatal issues encountered during processing
    ix_result: IXResult           # LLM extraction result
    ocr_result: OCRResult         # OCR output (may be empty if OCR skipped)
    provenance: Optional[ProvenanceData]  # Field-level source tracking
    metadata: Metadata            # Timings, hostname

class IXResult:
    result: dict             # Matches the use-case response schema
    result_confidence: dict  # Reserved; currently empty
    meta_data: dict          # token_usage {prompt_tokens, completion_tokens, total_tokens}, model_name

class OCRResult:
    result: OCRDetails       # Pages with lines + bounding boxes
    meta_data: dict

class OCRDetails:
    text: Optional[str]      # Flat text (populated only if include_ocr_text=True)
    pages: List[Page]        # Structured OCR pages

class Page:
    page_no: int
    width: float             # Page width in points
    height: float            # Page height in points
    angle: float             # Skew angle
    unit: Optional[str]
    lines: List[Line]

class Line:
    text: Optional[str]
    bounding_box: List[float]  # 8-coordinate polygon [x1,y1, x2,y2, x3,y3, x4,y4] (raw pixels)
    words: Words

class Metadata:
    timings: List[Dict]            # Step-level timing records
    processed_by: Optional[str]    # Hostname
    use_case_truncated: Optional[bool]
```
**`ResponseIX.context`** is an internal-only mutable object that accumulates data as it passes
through pipeline steps. It is **excluded from serialisation** (`exclude=True`) and must be
stripped before the response is sent to any external consumer.
---
## 3. The Step interface
Every pipeline stage implements a two-method async interface:
```python
class Step(ABC):
    @abstractmethod
    async def validate(self, request_ix: RequestIX, response_ix: ResponseIX) -> bool:
        """
        Returns True  → run process()
        Returns False → silently skip this step
        Raises        → set ResponseIX.error and abort the pipeline
        """

    @abstractmethod
    async def process(self, request_ix: RequestIX, response_ix: ResponseIX) -> ResponseIX:
        """Execute the step; mutate response_ix; return it."""
```
**Key invariants:**
- Both `validate` and `process` receive the **same** `request_ix` and `response_ix` objects.
- `process` receives the `response_ix` as enriched by all previous steps (via `context`).
- If `validate` raises, the error string is written to `response_ix.error` and no further steps run.
- If `process` raises, same: error is captured and pipeline stops.
- A step that returns `False` from `validate` leaves `response_ix` unchanged and the pipeline continues.
---
## 4. Pipeline orchestration (`Pipeline`)
```python
class Pipeline:
    steps = [SetupStep(), OCRStep(), GenAIStep(), ResponseHandlerStep()]
    debug_mode: bool  # Enables memory-usage logging and GC inspection

    async def start(self, request_ix: RequestIX) -> ResponseIX:
        # Wraps _execute_pipeline in an observability span
        ...

    async def _execute_pipeline(self, request_ix, obs) -> ResponseIX:
        response_ix = ResponseIX()
        for step in self.steps:
            # wrapped in a step-level observability span + Timer
            success, response_ix = await self._execute_step(step, request_ix, response_ix)
            if not success:
                return response_ix  # abort on first error
        return response_ix

    async def _execute_step(self, step, request_ix, response_ix) -> (bool, ResponseIX):
        valid = await step.validate(request_ix, response_ix)
        if valid:
            response_ix = await step.process(request_ix, response_ix)
        return True, response_ix
        # exceptions caught → response_ix.error set → return False
```
The pipeline itself holds **no I/O references** — it does not know about Kafka, HTTP, or
databases. The transport layer instantiates `Pipeline`, calls `await pipeline.start(request_ix)`,
and handles routing the returned `ResponseIX`.
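The orchestration above can be sketched with plain asyncio. The `Step` base and the error-to-string convention follow §3; the failing step and the dict-shaped `response_ix` are stand-ins for the real classes:

```python
import asyncio
from abc import ABC, abstractmethod

class Step(ABC):
    @abstractmethod
    async def validate(self, request_ix, response_ix) -> bool: ...

    @abstractmethod
    async def process(self, request_ix, response_ix): ...

class FailingStep(Step):
    """Stand-in step whose validate() raises, to show error capture."""
    async def validate(self, request_ix, response_ix) -> bool:
        raise ValueError("IX_000_002: no context")

    async def process(self, request_ix, response_ix):
        return response_ix

async def execute_step(step, request_ix, response_ix):
    try:
        if await step.validate(request_ix, response_ix):
            response_ix = await step.process(request_ix, response_ix)
        return True, response_ix
    except Exception as exc:
        response_ix["error"] = str(exc)  # the real code sets response_ix.error
        return False, response_ix

async def run_pipeline(steps, request_ix):
    response_ix = {"error": None}
    for step in steps:
        ok, response_ix = await execute_step(step, request_ix, response_ix)
        if not ok:
            return response_ix  # abort on first error
    return response_ix

result = asyncio.run(run_pipeline([FailingStep()], request_ix={}))
```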
---
## 5. Step 1 — SetupStep
**Purpose:** Validate the request, fetch external files, load the use-case definition, and
convert all inputs into an internal `pages` representation for downstream steps.
### 5.1 validate()
Raises if:
- `request_ix` is `None`
- Both `context.files` and `context.texts` are empty
### 5.2 process()
**a) Copy text context**
```python
response_ix.context.texts = list(request_ix.context.texts)
```
**b) Download files** (parallel, async)
Each URL in `request_ix.context.files` is downloaded to a local temp path (`/tmp/ix/`).
The result is a list of `File(file_path, mime_type)` objects stored in
`response_ix.context.files`.
Supported MIME types depend on the chosen OCR service; unsupported files raise immediately.
**c) Load use case**
The use case is fetched from the Config Server (see §10) by name, URL, or Base64.
On success, the two classes `UseCaseRequest` and `UseCaseResponse` are stored in
`response_ix.context.use_case_request` and `response_ix.context.use_case_response`.
`use_case_name` is extracted from `UseCaseRequest().use_case_name` and stored in
`response_ix.use_case_name` for downstream steps and the final response.
**d) Build pages** (parallel per document)
The ingestor registers every file and classifies its MIME type. Rendering is done per document:
- **PDF** (via PyMuPDF): one `Page` object per PDF page, holding `file_path`, `item_index` (0-based index into `request_ix.context.files`), `page_no`, `width`, `height`.
- **Image** (via PIL): one `Page` per frame. Multi-frame TIFFs produce multiple pages.
- **Text**: one `Page` per entry in `context.texts`.
Maximum pages per PDF: 100 (hard cap; raises on violation).
All pages are concatenated into a flat list stored in `response_ix.context.pages`.
A `DocumentIngestor` instance is stored in `response_ix.context.ingestion` so OCRStep can make format-specific decisions (e.g. skip Word-embedded images with a warning).
---
## 6. Step 2 — OCRStep
**Purpose:** Convert document pages to structured text with bounding boxes. Optionally build
a `SegmentIndex` for provenance tracking.
### 6.1 validate()
Returns `True` when **any** of these flags are set AND `response_ix.context.files` is non-empty:
- `use_ocr = True`
- `include_geometries = True`
- `include_ocr_text = True`
- `ocr_only = True`
If none of the above apply (text-only request), returns `False` → step is skipped.
Raises `IX_000_004` if `ocr_only`, `include_geometries`, or `include_ocr_text` is requested but
no files were provided.
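The skip/raise routing can be sketched as a small predicate. This is a sketch only; the real step raises `IXException` with code `IX_000_004`, not a `ValueError`:

```python
def ocr_step_should_run(ocr_opts: dict, has_files: bool) -> bool:
    """Sketch of OCRStep.validate() routing (§6.1)."""
    # Flags that require OCR output to exist in the response
    wants_ocr_output = any(
        ocr_opts.get(flag, False)
        for flag in ("ocr_only", "include_geometries", "include_ocr_text")
    )
    if wants_ocr_output and not has_files:
        # Stand-in for IXException(IX_000_004)
        raise ValueError("IX_000_004: OCR output requested but no files provided")
    wants_ocr = ocr_opts.get("use_ocr", True) or wants_ocr_output
    return wants_ocr and has_files  # text-only requests skip the step
```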
### 6.2 process()
**a) Select OCR service**
```
service == "document_intelligence" → AzureDocumentIntelligence
service == "computer_vision" → AzureComputerVision
```
**b) `perform_ocr(request_ix, response_ix)`** (async, runs against all pages)
Populates `response_ix.ocr_result` with an `OCRResult` containing pages with lines and
raw bounding boxes (8-coordinate polygons, pixel coordinates).
**c) Inject page tags** (when `include_page_tags = True`)
For each OCR page a synthetic `Line` is prepended and appended:
```
<page file="{item_index}" number="{page_no}">
… original lines …
</page>
```
These tags are visible to the LLM as structural markers. The `SegmentIndex` builder
explicitly skips them.
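The tag injection can be sketched over a simplified page structure, with dicts standing in for the real `Page`/`Line` models:

```python
def inject_page_tags(pages):
    """Prepend/append synthetic <page …> marker lines per OCR page (§6.2c).
    `pages` is a list of dicts with item_index, page_no, and lines."""
    for page in pages:
        open_tag = f'<page file="{page["item_index"]}" number="{page["page_no"]}">'
        page["lines"] = [open_tag, *page["lines"], "</page>"]
    return pages

tagged = inject_page_tags(
    [{"item_index": 0, "page_no": 1, "lines": ["Total: CHF 1234.00"]}]
)
```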
**d) Build SegmentIndex** (when `include_provenance = True`)
```python
response_ix.context.segment_index = await SegmentIndex.build(
    response_ix.ocr_result,
    granularity,  # line (default) or word
    pages_metadata=response_ix.context.pages,
)
```
See §9.1 for the full SegmentIndex specification.
---
## 7. Step 3 — GenAIStep
**Purpose:** Call the LLM with the prepared context and parse its structured output into
`response_ix.ix_result`. Optionally map the LLM's segment citations to bounding boxes.
### 7.1 validate()
- Returns `False` when `ocr_only = True`.
- Raises when:
  - `use_case` is empty
  - `use_ocr = True` but no OCR text was produced
  - neither `use_ocr` nor `use_vision` nor plain text context is available
  - the model is not found in the Config Server
  - `use_vision = True` but the model does not support vision
If `include_provenance = True` but `use_ocr = False`, a warning is added (provenance will be
empty) but processing continues.
### 7.2 process()
**a) Prepare system prompt**
```python
prompt = UseCaseRequest().system_prompt # falls back to prompt_template_base
```
If `include_provenance = True`, append the segment-citation instruction (see §9.2).
**b) Build text context**
When provenance is on and a `SegmentIndex` exists:
```
[p1_l0] Line text
[p1_l1] Next line
```
Otherwise: join OCR page lines with `\n\n` between pages, then append any raw texts.
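A sketch of this assembly over plain lists (the real code reads segment-tagged text from `SegmentIndex.to_prompt_text()`; the function and argument names here are illustrative):

```python
def build_text_context(ocr_pages, extra_texts, segment_ids=None):
    """Sketch of the GenAIStep text-context assembly (§7.2b).
    ocr_pages: list of per-page line lists; segment_ids: optional parallel
    structure of IDs like 'p1_l0' when provenance is enabled."""
    if segment_ids is not None:
        # Provenance mode: every line is prefixed with its segment ID
        lines = []
        for page_lines, page_ids in zip(ocr_pages, segment_ids):
            lines += [f"[{sid}] {text}" for sid, text in zip(page_ids, page_lines)]
        body = "\n".join(lines)
    else:
        # Plain mode: pages joined with blank lines
        body = "\n\n".join("\n".join(p) for p in ocr_pages)
    return "\n\n".join([body, *extra_texts]) if extra_texts else body

tagged = build_text_context([["Invoice Number: INV-001"]], [], [["p1_l0"]])
plain = build_text_context([["A"], ["B"]], ["extra text"])
```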
**c) Build visual context** (when `use_vision = True`)
For each page:
1. Render to PIL image at `vision_scaling_factor × original_size` (capped by `render_max_pixels_per_page`).
2. Convert TIFF frames to JPEG.
3. Base64-encode.
4. Wrap as a provider-neutral image part:
```json
{"type": "image", "media_type": "image/jpeg", "data": "<base64>", "detail": "auto"}
```
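Assuming the page has already been rendered and converted to JPEG (steps 1-2), wrapping the bytes into that part is a one-liner with the stdlib:

```python
import base64

def make_image_part(jpeg_bytes: bytes, detail: str = "auto") -> dict:
    """Build the provider-neutral image content part from §7.2c."""
    return {
        "type": "image",
        "media_type": "image/jpeg",
        "data": base64.b64encode(jpeg_bytes).decode("ascii"),
        "detail": detail,
    }

# Truncated JPEG magic bytes, purely for illustration
part = make_image_part(b"\xff\xd8\xff")
```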
**d) Assemble LLM call parameters**
```python
request_kwargs = {
    "model": model_name,
    "temperature": from_model_settings,
    "messages": [
        {"role": "system", "content": prompt_string},
        {"role": "user", "content": [text_part, *image_parts]},
    ],
    # "reasoning_effort": "medium"  ← only for reasoning-capable models
}
```
**e) Determine response schema**
- When `include_provenance = False`: schema is `UseCaseResponse` directly.
- When `include_provenance = True`: a dynamic wrapper is created at runtime:
```python
ProvenanceWrappedResponse = create_model(
    "ProvenanceWrappedResponse",
    result=(UseCaseResponse, Field(...)),
    segment_citations=(List[SegmentCitation], Field(default_factory=list)),
)
```
The use-case file is **never modified**; the wrapper is ephemeral.
**f) Invoke LLM**
```python
result = await gen_ai_service.invoke(request_kwargs, response_schema, model_settings)
# result.parsed → the Pydantic model instance
# result.usage → GenAIUsage(prompt_tokens, completion_tokens)
# result.model_name → string
```
**g) Write results**
- `response_ix.ix_result.result = parsed.result.model_dump()` (provenance mode) or `parsed.model_dump()` (normal mode)
- `response_ix.ix_result.meta_data = {token_usage, model_name}`
**h) Build provenance** (when `include_provenance = True`)
```python
response_ix.provenance = ProvenanceUtils.map_segment_refs_to_provenance(
    extraction_result={"result": ix_result},
    segment_citations=parsed.segment_citations,
    segment_index=response_ix.context.segment_index,
    max_sources_per_field=opts.max_sources_per_field,
    min_confidence=opts.min_confidence,
    include_bounding_boxes=opts.include_bounding_boxes,
    source_type=opts.source_type,
)
```
See §9.4 for the provenance resolution algorithm.
### 7.3 Provider routing
```python
provider = model_settings["provider"] # "azure" or "bedrock"
if provider == "bedrock": service = AWSGenAI()
else: service = AzureOpenAI()
```
The `GenAIClient` interface is:
```python
class GenAIClient(ABC):
    def setup_gen_ai_client(self, request_ix, model_settings) -> None: ...
    async def invoke(self, request_kwargs, response_schema, model_settings) -> GenAIInvocationResult: ...
```
`AzureOpenAI` translates provider-neutral image parts to OpenAI's `image_url` content format.
It caches one `AsyncOpenAI` instance per `(base_url, api_key)` pair.
---
## 8. Step 4 — ResponseHandlerStep
**Purpose:** Shape the final payload before handing it to the transport layer.
### 8.1 validate()
Always returns `True`.
### 8.2 process()
1. **Attach OCR text** (`include_ocr_text = True`): concatenate all line texts per page,
join pages with `\n\n`, store in `ocr_result.result.text`.
2. **Strip geometries** (`include_geometries = False`, default): clear `ocr_result.result.pages`
and `ocr_result.meta_data`. The structured page data is large; callers who don't need it
should not receive it.
3. **Trim context**: delete `response_ix.context` entirely. The mutable internal context is
never serialised.
---
## 9. Provenance subsystem
### 9.1 SegmentIndex
Built from `ocr_result` after OCR completes. It assigns each non-tag OCR line a unique ID:
```
p{global_page_position}_l{line_index_within_page}
```
- `global_page_position` is 1-based and refers to the **flat pages list position**, not the
page number printed in the document. This guarantees uniqueness across multi-document requests.
- Page tag lines (`<page …>` / `</page>`) are explicitly excluded.
Internal storage:
```python
_id_to_position: Dict[str, Dict] = {
    "p1_l0": {
        "page": 1,                 # global 1-based position
        "bbox": BoundingBox(...),  # normalised 0-1 coordinates
        "text": "Invoice Number: INV-001",
        "file_index": 0,           # 0-based index into request.context.files
    },
    ...
}
_ordered_ids: List[str]  # insertion order, for deterministic prompt text
```
**Bounding box normalisation:**
```python
# 8-coordinate polygon normalised to 0-1 (divide x-coords by page.width, y-coords by page.height)
[x1/W, y1/H, x2/W, y2/H, x3/W, y3/H, x4/W, y4/H]
```
Multiply back by rendered page dimensions to obtain pixel coordinates for overlay rendering.
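A minimal sketch of the normalisation and the round trip back to pixels, as pure functions over the 8-coordinate polygon:

```python
def normalise_bbox(polygon, width, height):
    """Normalise an 8-coordinate polygon to 0-1 (§9.1):
    even indices are x-coordinates (divide by width),
    odd indices are y-coordinates (divide by height)."""
    return [
        coord / (width if i % 2 == 0 else height)
        for i, coord in enumerate(polygon)
    ]

def denormalise_bbox(polygon, render_w, render_h):
    """Multiply back by rendered page dimensions for overlay rendering."""
    return [
        coord * (render_w if i % 2 == 0 else render_h)
        for i, coord in enumerate(polygon)
    ]

norm = normalise_bbox([100, 50, 200, 50, 200, 100, 100, 100], 400, 200)
```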
**Prompt format** (`SegmentIndex.to_prompt_text()`):
```
[p1_l0] Invoice Number: INV-001
[p1_l1] Date: 2024-01-15
[p1_l2] Total: CHF 1234.00
```
Extra plain texts (from `context.texts`) are appended untagged.
**O(1) lookup:**
```python
position = segment_index.lookup_segment("p1_l0") # None if unknown
```
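The ID assignment and O(1) lookup can be sketched over a simplified page structure, with dicts and `(text, bbox)` tuples standing in for the real OCR models:

```python
def build_segment_index(ocr_pages):
    """Sketch of SegmentIndex construction (§9.1): IDs follow
    p{global_page_position}_l{line_index}; page-tag lines are skipped.
    ocr_pages: list of dicts with file_index and lines as (text, bbox) pairs."""
    id_to_position, ordered_ids = {}, []
    for pos, page in enumerate(ocr_pages, start=1):  # global 1-based position
        line_idx = 0
        for text, bbox in page["lines"]:
            if text.startswith("<page") or text == "</page>":
                continue  # tag lines never receive segment IDs
            seg_id = f"p{pos}_l{line_idx}"
            id_to_position[seg_id] = {
                "page": pos, "bbox": bbox, "text": text,
                "file_index": page["file_index"],
            }
            ordered_ids.append(seg_id)
            line_idx += 1
    return id_to_position, ordered_ids

index, ordered = build_segment_index([
    {"file_index": 0, "lines": [
        ('<page file="0" number="1">', None),
        ("Invoice Number: INV-001", [0.1] * 8),
        ("</page>", None),
    ]},
])
```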
### 9.2 SegmentCitation (LLM output model)
The LLM is instructed to populate `segment_citations` in its structured output:
```python
class SegmentCitation(BaseModel):
    field_path: str                 # e.g. "result.invoice_number"
    value_segment_ids: List[str]    # segments containing the extracted value
    context_segment_ids: List[str]  # surrounding label/anchor segments (may be empty)
```
**System prompt addition (injected by GenAIStep when provenance is on):**
```
For each extracted field, you must also populate the `segment_citations` list.
Each entry maps one field to the document segments that were its source.
Set `field_path` to the dot-separated JSON path of the field (e.g. 'result.invoice_number').
Use two separate segment ID lists:
- `value_segment_ids`: segment IDs whose text directly contains the extracted value
(e.g. ['p1_l4'] for the line containing 'INV-001').
- `context_segment_ids`: segment IDs for surrounding label or anchor text that helped you
identify the field but does not contain the value itself
(e.g. ['p1_l3'] for a label like 'Invoice Number:'). Leave empty if there is no distinct label.
Only use segment IDs that appear in the document text.
Omit fields for which you cannot identify a source segment.
```
### 9.3 ProvenanceData (response model)
```python
class ProvenanceData(BaseModel):
    fields: Dict[str, FieldProvenance]  # field_path → provenance
    quality_metrics: Dict[str, Any]     # see below
    segment_count: Optional[int]
    granularity: Optional[str]          # "line" or "word"

class FieldProvenance(BaseModel):
    field_name: str              # last segment of field_path
    field_path: str              # e.g. "result.invoice_number"
    value: Any                   # actual extracted value (resolved from ix_result.result)
    sources: List[ExtractionSource]
    confidence: Optional[float]  # None today; reserved for future

class ExtractionSource(BaseModel):
    page_number: int                     # global 1-based position
    file_index: Optional[int]            # 0-based index into request.context.files
    bounding_box: Optional[BoundingBox]  # None when include_bounding_boxes=False
    text_snippet: str                    # actual OCR text at this location
    relevance_score: float               # always 1.0 (exact segment ID match)
    segment_id: Optional[str]            # e.g. "p1_l3"; for internal use, not for rendering

class BoundingBox(BaseModel):
    coordinates: List[float]  # [x1,y1, x2,y2, x3,y3, x4,y4], all 0-1 normalised
```
**quality_metrics:**
```json
{
"fields_with_provenance": 8,
"total_fields": 10,
"coverage_rate": 0.8,
"invalid_references": 2
}
```
- `coverage_rate` = fields with at least one source ÷ total leaf fields in result
- `invalid_references` = segment IDs cited by LLM that were not found in SegmentIndex
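Both metrics can be sketched as follows. The leaf-counting rule for lists is an assumption here; the real implementation's exact leaf semantics are not confirmed by this spec:

```python
def count_leaf_fields(obj):
    """Count leaf fields in a nested result dict (lists counted per element;
    this rule is an assumption, see lead-in)."""
    if isinstance(obj, dict):
        return sum(count_leaf_fields(v) for v in obj.values())
    if isinstance(obj, list):
        return sum(count_leaf_fields(v) for v in obj)
    return 1

def quality_metrics(fields_with_sources, result, invalid_refs):
    """Build the quality_metrics block from §9.3 (key names from the spec)."""
    total = count_leaf_fields(result)
    return {
        "fields_with_provenance": fields_with_sources,
        "total_fields": total,
        "coverage_rate": fields_with_sources / total if total else 0.0,
        "invalid_references": invalid_refs,
    }

result = {
    "header": {"invoice_number": "INV-001", "date": "2024-01-15"},
    "items": [
        {"name": "A", "qty": 1, "price": 2.0},
        {"name": "B", "qty": 1, "price": 3.0},
    ],
    "total": 5.0,
    "currency": "CHF",
}
metrics = quality_metrics(8, result, invalid_refs=2)
```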
### 9.4 Provenance resolution algorithm
```python
for citation in segment_citations:
    if source_type == VALUE:
        seg_ids = citation.value_segment_ids
    else:
        seg_ids = citation.value_segment_ids + citation.context_segment_ids

    sources = []
    for seg_id in seg_ids[:max_sources_per_field]:
        pos = segment_index.lookup_segment(seg_id)
        if pos is None:
            invalid_ref_count += 1
            continue
        sources.append(ExtractionSource(
            page_number=pos["page"],
            file_index=pos["file_index"],
            bounding_box=pos["bbox"] if include_bounding_boxes else None,
            text_snippet=pos["text"],
            relevance_score=1.0,
            segment_id=seg_id,
        ))

    if not sources:
        continue  # skip fields that resolve to nothing

    # Resolve value: traverse result dict via dot-notation field_path
    # e.g. "result.insured_vehicle.vin" → result["insured_vehicle"]["vin"]
    value = resolve_nested_path(extraction_result, citation.field_path)
    provenance[citation.field_path] = FieldProvenance(
        field_name=citation.field_path.split(".")[-1],
        field_path=citation.field_path,
        value=value,
        sources=sources,
        confidence=None,
    )
```
Array notation is normalised before traversal: `"items[0].name"` → `"items.0.name"`.
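The traversal, including the bracket normalisation, can be sketched in a few lines. This `resolve_nested_path` is an illustrative reimplementation, not the project's actual helper:

```python
import re

def resolve_nested_path(data, field_path):
    """Resolve a dot-notation field path against a nested dict/list.
    Array notation is normalised first: 'items[0].name' → 'items.0.name'."""
    normalised = re.sub(r"\[(\d+)\]", r".\1", field_path)
    current = data
    for part in normalised.split("."):
        if isinstance(current, list):
            current = current[int(part)]  # numeric part indexes into a list
        else:
            current = current[part]
    return current

doc = {"result": {"items": [{"name": "Widget"}], "invoice_number": "INV-001"}}
```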
---
## 10. Use-case definition contract
A use case is a Python module (or dynamically loaded class) that defines two Pydantic models.
The Config Server serves them by name; the pipeline loads them via `config_server.use_cases.load_and_validate_use_case(use_case)`.
### UseCaseRequest
```python
class UseCaseRequest(BaseModel):
    use_case_name: str  # Human-readable name, stored in ResponseIX.use_case_name
    system_prompt: str  # System prompt sent to the LLM
    # Legacy: prompt_template_base (deprecated alias for system_prompt)
    # Legacy: kwargs_use_case (deprecated; triggers format(**kwargs))
```
### UseCaseResponse
A Pydantic model whose fields define the extraction schema. The LLM is instructed to return
JSON matching this schema (via `chat.completions.parse(response_format=UseCaseResponse)`).
**Example:**
```python
class InvoiceResponse(BaseModel):
    invoice_number: str
    invoice_date: str
    total_amount: float
    line_items: List[LineItem]

class LineItem(BaseModel):
    description: str
    quantity: int
    unit_price: float
```
The schema is passed as the `response_format` argument to the OpenAI structured-output API.
When provenance is on, it is wrapped in `ProvenanceWrappedResponse` at runtime (§7.2e).
---
## 11. LLM invocation contract
### 11.1 Provider-neutral request_kwargs
```python
request_kwargs = {
    "model": str,                       # deployment name
    "temperature": Optional[float],     # from model_settings; None for reasoning models
    "messages": [
        {"role": "system", "content": str},
        {"role": "user", "content": List[ContentPart]},
    ],
    "reasoning_effort": Optional[str],  # only present when supported by model
}

ContentPart = (
    {"type": "text", "text": str}
    | {"type": "image", "media_type": "image/jpeg", "data": str, "detail": str}
)
```
### 11.2 GenAIInvocationResult
```python
@dataclass
class GenAIInvocationResult:
    parsed: Any        # Parsed Pydantic model instance (matches response_schema)
    usage: GenAIUsage  # prompt_tokens, completion_tokens
    model_name: str
```
### 11.3 Model settings (from Config Server)
```json
{
"provider": "azure",
"temperature": 0.0,
"vision_capability": true,
"supports_reasoning_efforts": ["low", "medium", "high"],
"default_reasoning_effort": "medium"
}
```
- `temperature` is `None` for reasoning models (OpenAI ignores it and it must be omitted).
- `vision_capability` must be `true` for `use_vision = True` requests to be accepted.
- `supports_reasoning_efforts` is the allowlist; requests for unsupported values fall back to
`default_reasoning_effort` with a warning.
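The fallback rule can be sketched as follows; the function name and the warning text are illustrative, not the project's actual code:

```python
def pick_reasoning_effort(requested, model_settings, warnings):
    """Sketch of the allowlist fallback: unsupported values fall back to
    default_reasoning_effort and a warning is recorded."""
    allowed = model_settings.get("supports_reasoning_efforts", [])
    if requested is None or requested in allowed:
        return requested
    fallback = model_settings.get("default_reasoning_effort")
    warnings.append(
        f"reasoning_effort '{requested}' unsupported; using '{fallback}'"
    )
    return fallback

settings = {"supports_reasoning_efforts": ["low", "medium", "high"],
            "default_reasoning_effort": "medium"}
warnings_list = []
chosen = pick_reasoning_effort("ultra", settings, warnings_list)
```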
---
## 12. Error model
### 12.1 IXException
All domain errors are wrapped in `IXException(message_or_Error_enum)`. The pipeline catches
any exception from `validate()` or `process()`, writes `str(e)` to `response_ix.error`, and
stops.
### 12.2 Named error codes
| Code | Trigger |
|---|---|
| `IX_000_000` | `request_ix` is `None` |
| `IX_000_002` | No context (neither files nor texts) |
| `IX_000_004` | OCR required but no files provided |
| `IX_000_005` | File MIME type not supported by chosen OCR service |
| `IX_001_000` | `use_case` is empty or missing |
| `IX_001_001` | Use case could not be loaded or validated |
| `IX_003_003` | `use_vision=True` but model has no vision capability |
| `IX_004_001` | Response too large for transport (e.g. Kafka max message size) |
### 12.3 Warnings
Non-fatal issues are appended to `response_ix.warning` and logged. Examples:
- Vision requested but no renderable documents found
- Image scale capped due to pixel limit
- Provenance requested with `use_ocr=False`
- Word document contains embedded images that will be skipped
- `use_case_name` not set in use-case definition
---
## 13. Configuration (AppConfig)
All settings are loaded from environment variables (with `.env` fallback via `pydantic-settings`).
| Category | Key env vars |
|---|---|
| Azure OCR | `DOCUMENT_INTELLIGENCE_ENDPOINT`, `DOCUMENT_INTELLIGENCE_KEY`, `COMPUTER_VISION_ENDPOINT`, `COMPUTER_VISION_SUBSCRIPTION_KEY` |
| Azure OpenAI | `OPENAI_BASE_URL`, `OPENAI_API_KEY` |
| LLM defaults | `DEFAULT_LLM` (default: `gpt-4o`) |
| Config Server | `CONFIG_SERVER_BASE_URL` (default: `http://ix-config:8998/`) |
| Pipeline | `PIPELINE_WORKER_CONCURRENCY` (1-10), `PIPELINE_REQUEST_TIMEOUT_SECONDS` (default: 2700) |
| Rendering | `RENDER_MAX_PIXELS_PER_PAGE` (default: 75,000,000) |
| AWS | `S3_BUCKET`, `S3_OBJECT_PREFIX`, `AWS_REGION`, `TARGET_ROLE` |
| Kafka | `PROCESSING_TOPIC`, `KAFKA_BOOTSTRAP_SERVER`, … |
| Observability | `LOG_LEVEL`, `OTEL_EXPORTER_OTLP_ENDPOINT`, `PYROSCOPE_SERVER_ADDRESS` |
---
## 14. Observability
### 14.1 Logging
Every log call carries `ix_id` as contextual metadata. The custom logger exposes:
```python
logger.info_ix(msg, request_ix)
logger.warning_ix(msg, request_ix)
logger.error_ix(msg, request_ix)
```
If `ix_id` is not yet set on `request_ix`, the logger generates one lazily on first use.
Step timing is recorded via a `Timer` context manager; results are stored in
`response_ix.metadata.timings`.
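A minimal `Timer` sketch that appends into a timings list. The record field names (`step`, `duration_ms`) are assumptions; the spec only states that step-level timing records are stored:

```python
import time

class Timer:
    """Context manager recording a step's wall-clock duration (§14.1 sketch)."""
    def __init__(self, step_name: str, timings: list):
        self.step_name, self.timings = step_name, timings

    def __enter__(self):
        self.start = time.perf_counter()
        return self

    def __exit__(self, *exc):
        self.timings.append({
            "step": self.step_name,
            "duration_ms": (time.perf_counter() - self.start) * 1000,
        })
        return False  # never swallow step exceptions

timings: list = []
with Timer("OCRStep", timings):
    pass  # the step body would run here
```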
### 14.2 Tracing (OpenTelemetry)
- **Pipeline span** wraps the entire `_execute_pipeline()` call (attributes: `request_id`, `use_case`, `client_id`).
- **Step span** wraps each individual step.
- Errors are recorded on the span with `mark_failed(error_message)`.
- Exported via gRPC to `OTEL_EXPORTER_OTLP_ENDPOINT`.
### 14.3 Metrics (Prometheus)
| Metric | Labels |
|---|---|
| `pipeline_requests_total` | `client_id` |
| LLM token counter | `model`, `client_id`, `use_case`, `status` |
| LLM latency histogram | `model`, `client_id` |
| Extraction quality | `client_id`, `use_case`, `model`, `reasoning_effort` |
Exposed on the health port (default 9009).
---
## 15. Transport adapter contract
The pipeline is transport-agnostic. Any delivery mechanism must:
1. **Deserialise** the incoming payload into a `RequestIX` instance.
2. **Assign `ix_id`** (a random 16-character hex string) if not already present.
3. **Call** `await Pipeline().start(request_ix)` → `response_ix`.
4. **Serialise** `response_ix` (use `.model_dump(by_alias=True)` to honour `serialization_alias` on `metadata`).
5. **Deliver** the serialised response to the caller via the appropriate channel.
Optionally:
- **Persist** `request_ix` and `response_ix` to a durable store (S3, database) keyed by `ix_id`.
- **Apply a timeout**: wrap the `await pipeline.start(...)` call and write a timeout error to `response_ix.error` on expiry.
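Duties 2-3 plus the optional timeout can be sketched together; the stand-in pipeline and the error shape on timeout are illustrative:

```python
import asyncio
import secrets

async def run_with_timeout(pipeline_start, request_ix, timeout_s: float):
    """Assign an ix_id if absent, run the pipeline, and convert expiry
    into a response-level error (shape is illustrative)."""
    request_ix.setdefault("ix_id", secrets.token_hex(8))  # 16 hex chars
    try:
        return await asyncio.wait_for(pipeline_start(request_ix), timeout=timeout_s)
    except asyncio.TimeoutError:
        return {"ix_id": request_ix["ix_id"],
                "error": f"pipeline timed out after {timeout_s}s"}

async def slow_pipeline(request_ix):  # stand-in for Pipeline().start
    await asyncio.sleep(10)

result = asyncio.run(run_with_timeout(slow_pipeline, {}, timeout_s=0.01))
```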
### Kafka adapter (existing)
- Consumes from `ch-infoxtractor.processing.v2`; produces to `ch-infoxtractor.outbound.v2` and client-specific topics.
- Concurrency controlled via asyncio semaphore (`PIPELINE_WORKER_CONCURRENCY`).
- Per-partition offset commit tracking (gap-safe: only commits contiguous completed offsets).
- Graceful shutdown: drains in-flight messages before stopping.
### FastAPI adapter (example)
```python
@router.post("/extract")
async def extract(body: RequestIX) -> ResponseIX:
    body.ix_id = body.ix_id or secrets.token_hex(8)
    response = await pipeline.start(body)
    return response
```
### Database adapter (example)
```python
async def process_job(job_id: str, db: AsyncSession):
    row = await db.get(Job, job_id)
    request_ix = RequestIX.model_validate_json(row.payload)
    request_ix.ix_id = row.ix_id
    response_ix = await pipeline.start(request_ix)
    row.result = response_ix.model_dump_json(by_alias=True)
    row.status = "done" if not response_ix.error else "error"
    await db.commit()
```
---
## 16. End-to-end data flow summary
```
[Transport layer]
    ↓ RequestIX (use_case, context.files/texts, options, ix_id)

[Pipeline.start()]

[SetupStep]
    • Download files → local paths → response_ix.context.files (File objects)
    • Load use case from Config Server → response_ix.context.use_case_request/response
    • Split PDFs / load images / wrap texts → response_ix.context.pages (flat list)

[OCRStep] (skipped if no files or use_ocr=False)
    • Call Azure OCR per page → response_ix.ocr_result (pages + lines + bbox)
    • Inject <page> XML markers → lines prepended/appended in ocr_result.pages
    • Build SegmentIndex (if provenance) → response_ix.context.segment_index

[GenAIStep] (skipped if ocr_only=True)
    • Build system prompt (+ citation hint if provenance)
    • Build user content: segment-tagged text OR plain OCR text + optional base64 images
    • Determine response schema (wrapped if provenance)
    • Call LLM via GenAIClient.invoke()
    • Write ix_result.result + meta_data
    • Resolve segment citations → ProvenanceData (if provenance)

[ResponseHandlerStep] (always runs)
    • Attach flat OCR text (if include_ocr_text)
    • Strip bounding boxes (if not include_geometries)
    • Delete response_ix.context (never serialised)

[ResponseIX]
    ix_result.result ← structured extraction matching UseCaseResponse schema
    ocr_result       ← OCR details (empty pages unless include_geometries)
    provenance       ← field-level source map (None unless include_provenance)
    error / warning  ← diagnostics
    metadata         ← timings, hostname

[Transport layer]
    Serialise → deliver to caller
    Persist (optional) → keyed by ix_id
```
---
## 17. Key design decisions
| Decision | Rationale |
|---|---|
| **Two-method Step interface** (`validate` + `process`) | Keeps routing logic (should this step run?) separate from execution; steps can be reordered or replaced without touching the pipeline loop |
| **Mutable `context` on ResponseIX** | Avoids passing a growing set of intermediate values as extra arguments; the entire state is in one object; `context` is excluded from serialisation so it never leaks |
| **Single LLM call for provenance** | The schema is wrapped at runtime to add `segment_citations`; no second round-trip to the LLM is needed |
| **Segment IDs instead of fuzzy text matching** | IDs are injected into the prompt; the LLM cites them; lookup is O(1); no approximate matching, no hallucinated coordinates |
| **Global page position for segment IDs** | Ensures uniqueness across multi-document requests (two documents that both have "page 1" produce `p1_l*` and `p2_l*`, not the same IDs) |
| **Provider-neutral `request_kwargs`** | The `GenAIClient` interface accepts a dict; provider-specific translation (e.g. OpenAI `image_url` format) happens inside each adapter, not in the step |
| **Normalised bounding boxes (0-1)** | Resolution-independent; frontend multiplies by rendered page dimensions to get pixel coordinates |
| **Transport-agnostic pipeline** | Kafka, FastAPI, and DB adapters all call the same `Pipeline.start()` |
| **Config Server for use cases** | Decouples use-case authoring from deployment; allows use cases to be updated without redeploying the pipeline |