Core Pipeline Specification — InfoXtractor
1. What the system does
InfoXtractor is an async, LLM-powered document extraction pipeline. Given one or more documents (PDFs, images) or plain text and a use case that defines what to extract, it returns a structured JSON result whose shape matches the use-case schema.
Optional capabilities that are composed at runtime:
| Capability | Trigger | What it adds |
|---|---|---|
| OCR | `options.ocr.use_ocr = true` (default) | Converts document pages to text via cloud OCR before passing to the LLM |
| Vision | `options.gen_ai.use_vision = true` | Also passes page images to the LLM as base64 |
| Provenance | `options.provenance.include_provenance = true` | Each extracted field is annotated with the page number, bounding box, and text snippet it came from |
| OCR-only | `options.ocr.ocr_only = true` | Returns the OCR result without calling the LLM |
2. Top-level data contracts
2.1 RequestIX — pipeline input
class RequestIX:
use_case: str # Name, URL, or Base64-encoded use-case definition
ix_client_id: str # Client identifier (used for auth + metrics)
request_id: str # Correlation key set by the caller; echoed in response
ix_id: Optional[str] # Internal tracking ID; set by the transport layer, not the caller
context: Context # What to process
options: Options # How to process it
version: Optional[int] # Schema version (default 2)
class Context:
files: List[str] # URLs to PDFs or images (S3, HTTP)
texts: List[str] # Raw text strings (treated as pages)
class Options:
ocr: OCROptions
gen_ai: GenAIOptions
provenance: ProvenanceOptions
OCROptions
| Field | Type | Default | Meaning |
|---|---|---|---|
| `use_ocr` | bool | `True` | Run cloud OCR on files |
| `ocr_only` | bool | `False` | Stop after OCR; skip the LLM |
| `service` | enum | `document_intelligence` | Which OCR backend (`document_intelligence` / `computer_vision`) |
| `include_geometries` | bool | `False` | Include raw bounding boxes in `ocr_result` |
| `include_ocr_text` | bool | `False` | Attach flat OCR text to `ocr_result.result.text` |
| `include_page_tags` | bool | `True` | Inject `<page file="…" number="…">` XML markers into OCR lines |
| `computer_vision_scaling_factor` | int 1–10 | 1 | Scale multiplier for CV-based OCR |
GenAIOptions
| Field | Type | Default | Meaning |
|---|---|---|---|
| `gen_ai_model_name` | str | `gpt-4o` (from config) | LLM deployment name |
| `use_vision` | bool | `False` | Include page images in the LLM message |
| `vision_scaling_factor` | int 1–10 | 1 | Scale factor for rendered images |
| `vision_detail` | enum | `auto` | `auto` / `low` / `high` (OpenAI vision detail) |
| `reasoning_effort` | Optional[str] | `None` | Passed to reasoning models (e.g. `medium`, `high`) |
ProvenanceOptions
| Field | Type | Default | Meaning |
|---|---|---|---|
| `include_provenance` | bool | `False` | Enable provenance tracking |
| `granularity` | enum | `line` | `line` (only supported today) or `word` |
| `include_bounding_boxes` | bool | `True` | Attach normalised coordinates to each source |
| `max_sources_per_field` | int ≥ 1 | 10 | Cap on sources returned per field |
| `source_type` | enum | `value_and_context` | `value`: value segments only; `value_and_context`: value + label segments |
| `min_confidence` | float 0–1 | 0.0 | Minimum per-field confidence (currently a no-op; hook for future use) |
2.2 ResponseIX — pipeline output
class ResponseIX:
use_case: Optional[str] # Echoed from RequestIX.use_case
use_case_name: Optional[str] # Clean name from use-case definition
ix_client_id: Optional[str]
request_id: Optional[str]
ix_id: Optional[str]
error: Optional[str] # Non-None means processing failed; pipeline stopped
warning: List[str] # Non-fatal issues encountered during processing
ix_result: IXResult # LLM extraction result
ocr_result: OCRResult # OCR output (may be empty if OCR skipped)
provenance: Optional[ProvenanceData] # Field-level source tracking
metadata: Metadata # Timings, hostname
class IXResult:
result: dict # Matches the use-case response schema
result_confidence: dict # Reserved; currently empty
meta_data: dict # token_usage {prompt_tokens, completion_tokens, total_tokens}, model_name
class OCRResult:
result: OCRDetails # pages with lines + bounding boxes
meta_data: dict
class OCRDetails:
text: Optional[str] # Flat text (populated only if include_ocr_text=True)
pages: List[Page] # Structured OCR pages
class Page:
page_no: int
width: float # Page width in points
height: float # Page height in points
angle: float # Skew angle
unit: Optional[str]
lines: List[Line]
class Line:
text: Optional[str]
bounding_box: List[float] # 8-coordinate polygon [x1,y1, x2,y2, x3,y3, x4,y4] (raw pixels)
words: Words
class Metadata:
timings: List[Dict] # Step-level timing records
processed_by: Optional[str] # Hostname
use_case_truncated: Optional[bool]
ResponseIX.context is an internal-only mutable object that accumulates data as it passes
through pipeline steps. It is excluded from serialisation (exclude=True) and must be
stripped before the response is sent to any external consumer.
3. The Step interface
Every pipeline stage implements a two-method async interface:
class Step(ABC):
@abstractmethod
async def validate(self, request_ix: RequestIX, response_ix: ResponseIX) -> bool:
"""
Returns True → run process()
Returns False → silently skip this step
Raises → set ResponseIX.error and abort the pipeline
"""
@abstractmethod
async def process(self, request_ix: RequestIX, response_ix: ResponseIX) -> ResponseIX:
"""Execute the step; mutate response_ix; return it."""
Key invariants:
- Both `validate` and `process` receive the same `request_ix` and `response_ix` objects. `process` receives the `response_ix` as enriched by all previous steps (via `context`).
- If `validate` raises, the error string is written to `response_ix.error` and no further steps run.
- If `process` raises, the same applies: the error is captured and the pipeline stops.
- A step that returns `False` from `validate` leaves `response_ix` unchanged and the pipeline continues.
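A minimal step obeying these invariants can be sketched as follows. The request/response types here are stand-ins (`SimpleNamespace`), since the real Pydantic models live in the service:

```python
import asyncio
from abc import ABC, abstractmethod
from types import SimpleNamespace

class Step(ABC):
    @abstractmethod
    async def validate(self, request_ix, response_ix) -> bool: ...
    @abstractmethod
    async def process(self, request_ix, response_ix): ...

class EchoTextStep(Step):
    """Copies the request texts into the response context (stand-in logic)."""
    async def validate(self, request_ix, response_ix) -> bool:
        # Returning False skips the step silently; raising would abort the pipeline.
        return bool(request_ix.context.texts)

    async def process(self, request_ix, response_ix):
        response_ix.context.texts = list(request_ix.context.texts)
        return response_ix

request_ix = SimpleNamespace(context=SimpleNamespace(texts=["hello"]))
response_ix = SimpleNamespace(context=SimpleNamespace(texts=[]), error=None)

async def run():
    step = EchoTextStep()
    if await step.validate(request_ix, response_ix):
        await step.process(request_ix, response_ix)
    return response_ix.context.texts

print(asyncio.run(run()))  # → ['hello']
```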
4. Pipeline orchestration (Pipeline)
class Pipeline:
steps = [SetupStep(), OCRStep(), GenAIStep(), ResponseHandlerStep()]
debug_mode: bool # Enables memory-usage logging and GC inspection
async def start(self, request_ix: RequestIX) -> ResponseIX:
# Wraps _execute_pipeline in an observability span
...
async def _execute_pipeline(self, request_ix, obs) -> ResponseIX:
response_ix = ResponseIX()
for step in self.steps:
# wrapped in a step-level observability span + Timer
success, response_ix = await self._execute_step(step, request_ix, response_ix)
if not success:
return response_ix # abort on first error
return response_ix
async def _execute_step(self, step, request_ix, response_ix) -> tuple[bool, ResponseIX]:
valid = await step.validate(request_ix, response_ix)
if valid:
response_ix = await step.process(request_ix, response_ix)
return True, response_ix
# exceptions caught → response_ix.error set → return False
The pipeline itself holds no I/O references — it does not know about Kafka, HTTP, or
databases. The transport layer instantiates Pipeline, calls await pipeline.start(request_ix),
and handles routing the returned ResponseIX.
5. Step 1 — SetupStep
Purpose: Validate the request, fetch external files, load the use-case definition, and
convert all inputs into an internal pages representation for downstream steps.
5.1 validate()
Raises if:
- `request_ix` is `None`
- Both `context.files` and `context.texts` are empty
5.2 process()
a) Copy text context
response_ix.context.texts = list(request_ix.context.texts)
b) Download files (parallel, async)
Each URL in request_ix.context.files is downloaded to a local temp path (/tmp/ix/).
The result is a list of File(file_path, mime_type) objects stored in
response_ix.context.files.
Supported MIME types depend on the chosen OCR service; unsupported files raise immediately.
c) Load use case
The use case is fetched from the Config Server (see §10) by name, URL, or Base64.
On success, the two classes UseCaseRequest and UseCaseResponse are stored in
response_ix.context.use_case_request and response_ix.context.use_case_response.
use_case_name is extracted from UseCaseRequest().use_case_name and stored in
response_ix.use_case_name for downstream steps and the final response.
d) Build pages (parallel per document)
The ingestor registers every file and classifies its MIME type. Rendering is done per document:
- PDF (via PyMuPDF): one `Page` object per PDF page, holding `file_path`, `item_index` (0-based index into `request_ix.context.files`), `page_no`, `width`, `height`.
- Image (via PIL): one `Page` per frame. Multi-frame TIFFs produce multiple pages.
- Text: one `Page` per entry in `context.texts`.
Maximum pages per PDF: 100 (hard cap; raises on violation).
All pages are concatenated into a flat list stored in response_ix.context.pages.
A DocumentIngestor instance is stored in response_ix.context.ingestion so OCRStep can make format-specific decisions (e.g. skip Word-embedded images with a warning).
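The per-PDF page building and the 100-page cap can be sketched like this. The `Page` dataclass is a simplified stand-in for the internal model, and `page_sizes` stands in for what PyMuPDF would report:

```python
from dataclasses import dataclass

MAX_PAGES_PER_PDF = 100  # hard cap from the spec

@dataclass
class Page:                # simplified stand-in for the internal Page model
    file_path: str
    item_index: int        # 0-based index into request.context.files
    page_no: int
    width: float
    height: float

def build_pdf_pages(file_path: str, item_index: int,
                    page_sizes: list[tuple[float, float]]) -> list[Page]:
    """Build one Page per PDF page; raise when the per-PDF cap is exceeded."""
    if len(page_sizes) > MAX_PAGES_PER_PDF:
        raise ValueError(
            f"PDF has {len(page_sizes)} pages; maximum is {MAX_PAGES_PER_PDF}")
    return [
        Page(file_path, item_index, page_no, w, h)
        for page_no, (w, h) in enumerate(page_sizes, start=1)
    ]

pages = build_pdf_pages("/tmp/ix/doc.pdf", 0, [(595.0, 842.0)] * 3)
```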
6. Step 2 — OCRStep
Purpose: Convert document pages to structured text with bounding boxes. Optionally build
a SegmentIndex for provenance tracking.
6.1 validate()
Returns True when any of these flags are set AND response_ix.context.files is non-empty:
- `use_ocr = True`
- `include_geometries = True`
- `include_ocr_text = True`
- `ocr_only = True`
If none of the above apply (text-only request), returns False → step is skipped.
Raises IX_000_004 if ocr_only, include_geometries, or include_ocr_text is requested but
no files were provided.
6.2 process()
a) Select OCR service
service == "document_intelligence" → AzureDocumentIntelligence
service == "computer_vision" → AzureComputerVision
b) perform_ocr(request_ix, response_ix) (async, runs against all pages)
Populates response_ix.ocr_result with an OCRResult containing pages with lines and
raw bounding boxes (8-coordinate polygons, pixel coordinates).
c) Inject page tags (when include_page_tags = True)
For each OCR page a synthetic Line is prepended and appended:
<page file="{item_index}" number="{page_no}">
… original lines …
</page>
These tags are visible to the LLM as structural markers. The SegmentIndex builder
explicitly skips them.
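The tag injection can be sketched as below. This operates on plain strings for brevity; the real implementation wraps synthetic `Line` objects:

```python
def inject_page_tags(lines: list[str], item_index: int, page_no: int) -> list[str]:
    """Prepend/append the synthetic <page> markers around one OCR page's lines.
    Sketch only: the real code prepends/appends Line objects, not strings."""
    open_tag = f'<page file="{item_index}" number="{page_no}">'
    return [open_tag, *lines, "</page>"]

tagged = inject_page_tags(["Invoice Number: INV-001", "Total: CHF 1234.00"], 0, 1)
```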
d) Build SegmentIndex (when include_provenance = True)
response_ix.context.segment_index = await SegmentIndex.build(
response_ix.ocr_result,
granularity, # line (default) or word
pages_metadata=response_ix.context.pages,
)
See §9.1 for the full SegmentIndex specification.
7. Step 3 — GenAIStep
Purpose: Call the LLM with the prepared context and parse its structured output into
response_ix.ix_result. Optionally map the LLM's segment citations to bounding boxes.
7.1 validate()
- Returns `False` when `ocr_only = True`.
- Raises when:
  - `use_case` is empty
  - `use_ocr = True` but no OCR text was produced
  - neither `use_ocr` nor `use_vision` nor plain text context is available
  - the model is not found in the Config Server
  - `use_vision = True` but the model does not support vision
If include_provenance = True but use_ocr = False, a warning is added (provenance will be
empty) but processing continues.
7.2 process()
a) Prepare system prompt
prompt = UseCaseRequest().system_prompt # falls back to prompt_template_base
If include_provenance = True, append the segment-citation instruction (see §9.2).
b) Build text context
When provenance is on and a SegmentIndex exists:
[p1_l0] Line text
[p1_l1] Next line
…
Otherwise: join OCR page lines with \n\n between pages, then append any raw texts.
c) Build visual context (when use_vision = True)
For each page:
- Render to a PIL image at `vision_scaling_factor × original_size` (capped by `render_max_pixels_per_page`).
- Convert TIFF frames to JPEG.
- Base64-encode.
- Wrap as a provider-neutral image part:
{"type": "image", "media_type": "image/jpeg", "data": "<base64>", "detail": "auto"}
d) Assemble LLM call parameters
request_kwargs = {
"model": model_name,
"temperature": from_model_settings,
"messages": [
{"role": "system", "content": prompt_string},
{"role": "user", "content": [text_part, *image_parts]},
],
# "reasoning_effort": "medium" ← only for reasoning-capable models
}
e) Determine response schema
- When `include_provenance = False`: the schema is `UseCaseResponse` directly.
- When `include_provenance = True`: a dynamic wrapper is created at runtime:
ProvenanceWrappedResponse = create_model(
"ProvenanceWrappedResponse",
result=(UseCaseResponse, Field(...)),
segment_citations=(List[SegmentCitation], Field(default_factory=list)),
)
The use-case file is never modified; the wrapper is ephemeral.
f) Invoke LLM
result = await gen_ai_service.invoke(request_kwargs, response_schema, model_settings)
# result.parsed → the Pydantic model instance
# result.usage → GenAIUsage(prompt_tokens, completion_tokens)
# result.model_name → string
g) Write results
- `response_ix.ix_result.result` = `parsed.result.model_dump()` (provenance mode) or `parsed.model_dump()` (normal mode)
- `response_ix.ix_result.meta_data` = `{token_usage, model_name}`
h) Build provenance (when include_provenance = True)
response_ix.provenance = ProvenanceUtils.map_segment_refs_to_provenance(
extraction_result={"result": ix_result},
segment_citations=parsed.segment_citations,
segment_index=response_ix.context.segment_index,
max_sources_per_field=opts.max_sources_per_field,
min_confidence=opts.min_confidence,
include_bounding_boxes=opts.include_bounding_boxes,
source_type=opts.source_type,
)
See §9.4 for the provenance resolution algorithm.
7.3 Provider routing
provider = model_settings["provider"] # "azure" or "bedrock"
if provider == "bedrock": service = AWSGenAI()
else: service = AzureOpenAI()
The GenAIClient interface is:
class GenAIClient(ABC):
def setup_gen_ai_client(self, request_ix, model_settings) -> None: ...
async def invoke(self, request_kwargs, response_schema, model_settings) -> GenAIInvocationResult: ...
AzureOpenAI translates provider-neutral image parts to OpenAI's image_url content format.
It caches one AsyncOpenAI instance per (base_url, api_key) pair.
8. Step 4 — ResponseHandlerStep
Purpose: Shape the final payload before handing it to the transport layer.
8.1 validate()
Always returns True.
8.2 process()
- Attach OCR text (`include_ocr_text = True`): concatenate all line texts per page, join pages with `\n\n`, store in `ocr_result.result.text`.
- Strip geometries (`include_geometries = False`, default): clear `ocr_result.result.pages` and `ocr_result.meta_data`. The structured page data is large; callers who don't need it should not receive it.
- Trim context: delete `response_ix.context` entirely. The mutable internal context is never serialised.
9. Provenance subsystem
9.1 SegmentIndex
Built from ocr_result after OCR completes. It assigns each non-tag OCR line a unique ID:
p{global_page_position}_l{line_index_within_page}
- `global_page_position` is 1-based and refers to the position in the flat pages list, not the page number printed in the document. This guarantees uniqueness across multi-document requests.
- Page tag lines (`<page …>` / `</page>`) are explicitly excluded.
Internal storage:
_id_to_position: Dict[str, Dict] = {
"p1_l0": {
"page": 1, # global 1-based position
"bbox": BoundingBox(...), # normalised 0-1 coordinates
"text": "Invoice Number: INV-001",
"file_index": 0, # 0-based index into request.context.files
},
...
}
_ordered_ids: List[str] # insertion order, for deterministic prompt text
Bounding box normalisation:
# 8-coordinate polygon normalised to 0-1 (divide x-coords by page.width, y-coords by page.height)
[x1/W, y1/H, x2/W, y2/H, x3/W, y3/H, x4/W, y4/H]
Multiply back by rendered page dimensions to obtain pixel coordinates for overlay rendering.
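The normalisation and its inverse can be sketched as a pair of functions (x-coordinates sit at even indices of the 8-value polygon, y-coordinates at odd indices):

```python
def normalise_bbox(polygon: list[float], width: float, height: float) -> list[float]:
    """Divide x-coordinates by page width and y-coordinates by page height."""
    return [v / (width if i % 2 == 0 else height) for i, v in enumerate(polygon)]

def to_pixels(normalised: list[float], render_w: float, render_h: float) -> list[float]:
    """Multiply back by the rendered page dimensions for overlay drawing."""
    return [v * (render_w if i % 2 == 0 else render_h) for i, v in enumerate(normalised)]

raw = [100, 50, 300, 50, 300, 80, 100, 80]   # 8-coordinate polygon, raw pixels
norm = normalise_bbox(raw, width=1000, height=800)
```

Because the stored coordinates are resolution-independent, the round trip through `to_pixels` recovers the original polygon for any rendering scale that matches the page's aspect ratio.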
Prompt format (SegmentIndex.to_prompt_text()):
[p1_l0] Invoice Number: INV-001
[p1_l1] Date: 2024-01-15
[p1_l2] Total: CHF 1234.00
…
Extra plain texts (from context.texts) are appended untagged.
O(1) lookup:
position = segment_index.lookup_segment("p1_l0") # None if unknown
9.2 SegmentCitation (LLM output model)
The LLM is instructed to populate segment_citations in its structured output:
class SegmentCitation(BaseModel):
field_path: str # e.g. "result.invoice_number"
value_segment_ids: List[str] # segments containing the extracted value
context_segment_ids: List[str] # surrounding label/anchor segments (may be empty)
System prompt addition (injected by GenAIStep when provenance is on):
For each extracted field, you must also populate the `segment_citations` list.
Each entry maps one field to the document segments that were its source.
Set `field_path` to the dot-separated JSON path of the field (e.g. 'result.invoice_number').
Use two separate segment ID lists:
- `value_segment_ids`: segment IDs whose text directly contains the extracted value
(e.g. ['p1_l4'] for the line containing 'INV-001').
- `context_segment_ids`: segment IDs for surrounding label or anchor text that helped you
identify the field but does not contain the value itself
(e.g. ['p1_l3'] for a label like 'Invoice Number:'). Leave empty if there is no distinct label.
Only use segment IDs that appear in the document text.
Omit fields for which you cannot identify a source segment.
9.3 ProvenanceData (response model)
class ProvenanceData(BaseModel):
fields: Dict[str, FieldProvenance] # field_path → provenance
quality_metrics: Dict[str, Any] # see below
segment_count: Optional[int]
granularity: Optional[str] # "line" or "word"
class FieldProvenance(BaseModel):
field_name: str # last segment of field_path
field_path: str # e.g. "result.invoice_number"
value: Any # actual extracted value (resolved from ix_result.result)
sources: List[ExtractionSource]
confidence: Optional[float] # None today; reserved for future
class ExtractionSource(BaseModel):
page_number: int # global 1-based position
file_index: Optional[int] # 0-based index into request.context.files
bounding_box: Optional[BoundingBox] # None when include_bounding_boxes=False
text_snippet: str # actual OCR text at this location
relevance_score: float # always 1.0 (exact segment ID match)
segment_id: Optional[str] # e.g. "p1_l3"; for internal use, not for rendering
class BoundingBox(BaseModel):
coordinates: List[float] # [x1,y1, x2,y2, x3,y3, x4,y4], all 0-1 normalised
quality_metrics:
{
"fields_with_provenance": 8,
"total_fields": 10,
"coverage_rate": 0.8,
"invalid_references": 2
}
- `coverage_rate` = fields with at least one source ÷ total leaf fields in the result
- `invalid_references` = segment IDs cited by the LLM that were not found in the SegmentIndex
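These metrics reduce to a few lines once the counts are known; a sketch (the helper name is invented):

```python
def quality_metrics(total_fields: int, fields_with_sources: int,
                    invalid_refs: int) -> dict:
    """Compute the provenance quality metrics described above (sketch)."""
    return {
        "fields_with_provenance": fields_with_sources,
        "total_fields": total_fields,
        "coverage_rate": fields_with_sources / total_fields if total_fields else 0.0,
        "invalid_references": invalid_refs,
    }

metrics = quality_metrics(total_fields=10, fields_with_sources=8, invalid_refs=2)
```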
9.4 Provenance resolution algorithm
for citation in segment_citations:
if source_type == VALUE:
seg_ids = citation.value_segment_ids
else:
seg_ids = citation.value_segment_ids + citation.context_segment_ids
sources = []
for seg_id in seg_ids[:max_sources_per_field]:
pos = segment_index.lookup_segment(seg_id)
if pos is None:
invalid_ref_count += 1
continue
sources.append(ExtractionSource(
page_number=pos["page"],
file_index=pos["file_index"],
bounding_box=pos["bbox"] if include_bounding_boxes else None,
text_snippet=pos["text"],
relevance_score=1.0,
segment_id=seg_id,
))
if not sources:
continue # skip fields that resolve to nothing
# Resolve value: traverse result dict via dot-notation field_path
# e.g. "result.insured_vehicle.vin" → result["insured_vehicle"]["vin"]
value = resolve_nested_path(extraction_result, citation.field_path)
provenance[citation.field_path] = FieldProvenance(
field_name=citation.field_path.split(".")[-1],
field_path=citation.field_path,
value=value,
sources=sources,
confidence=None,
)
Array notation is normalised before traversal: "items[0].name" → "items.0.name".
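A self-contained sketch of that traversal, including the array-notation normalisation (the real helper may differ in details such as error handling):

```python
import re

def resolve_nested_path(data, field_path: str):
    """Traverse a nested result structure via a dot-separated path.
    Array notation is normalised first: "items[0].name" → "items.0.name"."""
    path = re.sub(r"\[(\d+)\]", r".\1", field_path)
    current = data
    for part in path.split("."):
        if isinstance(current, list):
            current = current[int(part)]
        elif isinstance(current, dict):
            current = current.get(part)
        else:
            return None
        if current is None:
            return None
    return current

result = {"result": {"items": [{"name": "Widget"}], "invoice_number": "INV-001"}}
```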
10. Use-case definition contract
A use case is a Python module (or dynamically loaded class) that defines two Pydantic models.
The Config Server serves them by name; the pipeline loads them via config_server.use_cases.load_and_validate_use_case(use_case).
UseCaseRequest
class UseCaseRequest(BaseModel):
use_case_name: str # Human-readable name, stored in ResponseIX.use_case_name
system_prompt: str # System prompt sent to the LLM
# Legacy: prompt_template_base (deprecated alias for system_prompt)
# Legacy: kwargs_use_case (deprecated; triggers format(**kwargs))
UseCaseResponse
A Pydantic model whose fields define the extraction schema. The LLM is instructed to return
JSON matching this schema (via chat.completions.parse(response_format=UseCaseResponse)).
Example:
class InvoiceResponse(BaseModel):
invoice_number: str
invoice_date: str
total_amount: float
line_items: List[LineItem]
class LineItem(BaseModel):
description: str
quantity: int
unit_price: float
The schema is passed as the response_format argument to the OpenAI structured-output API.
When provenance is on, it is wrapped in ProvenanceWrappedResponse at runtime (§7.2e).
11. LLM invocation contract
11.1 Provider-neutral request_kwargs
request_kwargs = {
"model": str, # deployment name
"temperature": Optional[float], # from model_settings; None for reasoning models
"messages": [
{"role": "system", "content": str},
{"role": "user", "content": List[ContentPart]},
],
"reasoning_effort": Optional[str], # only present when supported by model
}
ContentPart = (
{"type": "text", "text": str}
| {"type": "image", "media_type": "image/jpeg", "data": str, "detail": str}
)
11.2 GenAIInvocationResult
@dataclass
class GenAIInvocationResult:
parsed: Any # Parsed Pydantic model instance (matches response_schema)
usage: GenAIUsage # prompt_tokens, completion_tokens
model_name: str
11.3 Model settings (from Config Server)
{
"provider": "azure",
"temperature": 0.0,
"vision_capability": true,
"supports_reasoning_efforts": ["low", "medium", "high"],
"default_reasoning_effort": "medium"
}
- `temperature` is `None` for reasoning models (OpenAI ignores it and it must be omitted).
- `vision_capability` must be `true` for `use_vision = True` requests to be accepted.
- `supports_reasoning_efforts` is the allowlist; requests for unsupported values fall back to `default_reasoning_effort` with a warning.
12. Error model
12.1 IXException
All domain errors are wrapped in IXException(message_or_Error_enum). The pipeline catches
any exception from validate() or process(), writes str(e) to response_ix.error, and
stops.
12.2 Named error codes
| Code | Trigger |
|---|---|
| `IX_000_000` | `request_ix` is `None` |
| `IX_000_002` | No context (neither files nor texts) |
| `IX_000_004` | OCR required but no files provided |
| `IX_000_005` | File MIME type not supported by the chosen OCR service |
| `IX_001_000` | `use_case` is empty or missing |
| `IX_001_001` | Use case could not be loaded or validated |
| `IX_003_003` | `use_vision=True` but the model has no vision capability |
| `IX_004_001` | Response too large for transport (e.g. Kafka max message size) |
12.3 Warnings
Non-fatal issues are appended to response_ix.warning and logged. Examples:
- Vision requested but no renderable documents found
- Image scale capped due to pixel limit
- Provenance requested with `use_ocr=False`
- Word document contains embedded images that will be skipped
- `use_case_name` not set in the use-case definition
13. Configuration (AppConfig)
All settings are loaded from environment variables (with .env fallback via pydantic-settings).
| Category | Key env vars |
|---|---|
| Azure OCR | DOCUMENT_INTELLIGENCE_ENDPOINT, DOCUMENT_INTELLIGENCE_KEY, COMPUTER_VISION_ENDPOINT, COMPUTER_VISION_SUBSCRIPTION_KEY |
| Azure OpenAI | OPENAI_BASE_URL, OPENAI_API_KEY |
| LLM defaults | DEFAULT_LLM (default: gpt-4o) |
| Config Server | CONFIG_SERVER_BASE_URL (default: http://ix-config:8998/) |
| Pipeline | PIPELINE_WORKER_CONCURRENCY (1–10), PIPELINE_REQUEST_TIMEOUT_SECONDS (default: 2700) |
| Rendering | RENDER_MAX_PIXELS_PER_PAGE (default: 75,000,000) |
| AWS | S3_BUCKET, S3_OBJECT_PREFIX, AWS_REGION, TARGET_ROLE |
| Kafka | PROCESSING_TOPIC, KAFKA_BOOTSTRAP_SERVER, … |
| Observability | LOG_LEVEL, OTEL_EXPORTER_OTLP_ENDPOINT, PYROSCOPE_SERVER_ADDRESS |
14. Observability
14.1 Logging
Every log call carries ix_id as contextual metadata. The custom logger exposes:
logger.info_ix(msg, request_ix)
logger.warning_ix(msg, request_ix)
logger.error_ix(msg, request_ix)
If ix_id is not yet set on request_ix, the logger generates one lazily on first use.
Step timing is recorded via a Timer context manager; results are stored in
response_ix.metadata.timings.
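A context-manager sketch of such a step timer, appending records shaped like `metadata.timings` entries (the field names here are illustrative):

```python
import time
from contextlib import contextmanager

@contextmanager
def step_timer(step_name: str, timings: list):
    """Record one step-level timing entry on exit (sketch of the Timer
    context manager described above; record shape is illustrative)."""
    start = time.perf_counter()
    try:
        yield
    finally:
        timings.append({
            "step": step_name,
            "duration_s": round(time.perf_counter() - start, 4),
        })

timings: list = []
with step_timer("OCRStep", timings):
    time.sleep(0.01)  # stand-in for the step's work
```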
14.2 Tracing (OpenTelemetry)
- Pipeline span wraps the entire
_execute_pipeline()call (attributes:request_id,use_case,client_id). - Step span wraps each individual step.
- Errors are recorded on the span with
mark_failed(error_message). - Exported via gRPC to
OTEL_EXPORTER_OTLP_ENDPOINT.
14.3 Metrics (Prometheus)
| Metric | Labels |
|---|---|
| `pipeline_requests_total` | client_id |
| LLM token counter | model, client_id, use_case, status |
| LLM latency histogram | model, client_id |
| Extraction quality | client_id, use_case, model, reasoning_effort |
Exposed on the health port (default 9009).
15. Transport adapter contract
The pipeline is transport-agnostic. Any delivery mechanism must:
- Deserialise the incoming payload into a `RequestIX` instance.
- Assign `ix_id` (a random 16-character hex string) if not already present.
- Call `await Pipeline().start(request_ix)` → `response_ix`.
- Serialise `response_ix` (use `.model_dump(by_alias=True)` to honour the `serialization_alias` on `metadata`).
- Deliver the serialised response to the caller via the appropriate channel.
Optionally:
- Persist `request_ix` and `response_ix` to a durable store (S3, database) keyed by `ix_id`.
- Apply a timeout: wrap the `await pipeline.start(...)` call and write a timeout error to `response_ix.error` on expiry.
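The timeout wrapper can be sketched with `asyncio.wait_for` (stand-in objects; the real adapter would use `PIPELINE_REQUEST_TIMEOUT_SECONDS`):

```python
import asyncio

async def start_with_timeout(pipeline_start, request_ix, response_ix, timeout_s: float):
    """Wrap pipeline.start(); on expiry, write the error to response_ix.error
    instead of raising (sketch with stand-in objects)."""
    try:
        return await asyncio.wait_for(pipeline_start(request_ix), timeout=timeout_s)
    except asyncio.TimeoutError:
        response_ix.error = f"pipeline timed out after {timeout_s}s"
        return response_ix

class _Resp:                 # stand-in for ResponseIX
    error = None

async def _slow_start(request_ix):
    await asyncio.sleep(10)  # stand-in for a pipeline run that never finishes in time

resp = _Resp()
result = asyncio.run(start_with_timeout(_slow_start, None, resp, timeout_s=0.05))
```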
Kafka adapter (existing)
- Consumes from `ch-infoxtractor.processing.v2`; produces to `ch-infoxtractor.outbound.v2` and client-specific topics.
- Concurrency controlled via an asyncio semaphore (`PIPELINE_WORKER_CONCURRENCY`).
PIPELINE_WORKER_CONCURRENCY). - Per-partition offset commit tracking (gap-safe: only commits contiguous completed offsets).
- Graceful shutdown: drains in-flight messages before stopping.
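The gap-safe commit rule can be illustrated with a small helper (hypothetical; the real adapter tracks this per partition): the commit point only advances across a contiguous run of completed offsets, so an in-flight message is never skipped past.

```python
def contiguous_commit_offset(last_committed: int, completed: set[int]) -> int:
    """Advance the commit point only across contiguous completed offsets.
    E.g. with offsets 5 and 7 done but 6 still in flight, commit stays at 5."""
    offset = last_committed
    while offset + 1 in completed:
        offset += 1
    return offset

done = {5, 7}                                 # offset 6 is still in flight
commit = contiguous_commit_offset(4, done)    # → 5
```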
FastAPI adapter (example)
import secrets

@router.post("/extract")
async def extract(body: RequestIX) -> ResponseIX:
    body.ix_id = body.ix_id or secrets.token_hex(8)  # 16 hex characters
    return await pipeline.start(body)
Database adapter (example)
async def process_job(job_id: str, db: AsyncSession):
row = await db.get(Job, job_id)
request_ix = RequestIX.model_validate_json(row.payload)
request_ix.ix_id = row.ix_id
response_ix = await pipeline.start(request_ix)
row.result = response_ix.model_dump_json(by_alias=True)
row.status = "done" if not response_ix.error else "error"
await db.commit()
16. End-to-end data flow summary
[Transport layer]
↓ RequestIX (use_case, context.files/texts, options, ix_id)
↓
[Pipeline.start()]
↓
[SetupStep]
• Download files → local paths → response_ix.context.files (File objects)
• Load use case from Config Server → response_ix.context.use_case_request/response
• Split PDFs / load images / wrap texts → response_ix.context.pages (flat list)
↓
[OCRStep] (skipped if no files or use_ocr=False)
• Call Azure OCR per page → response_ix.ocr_result (pages + lines + bbox)
• Inject <page> XML markers → lines prepended/appended in ocr_result.pages
• Build SegmentIndex (if provenance) → response_ix.context.segment_index
↓
[GenAIStep] (skipped if ocr_only=True)
• Build system prompt (+ citation hint if provenance)
• Build user content: segment-tagged text OR plain OCR text + optional base64 images
• Determine response schema (wrapped if provenance)
• Call LLM via GenAIClient.invoke()
• Write ix_result.result + meta_data
• Resolve segment citations → ProvenanceData (if provenance)
↓
[ResponseHandlerStep] (always runs)
• Attach flat OCR text (if include_ocr_text)
• Strip bounding boxes (if not include_geometries)
• Delete response_ix.context (never serialised)
↓
[ResponseIX]
ix_result.result ← structured extraction matching UseCaseResponse schema
ocr_result ← OCR details (empty pages unless include_geometries)
provenance ← field-level source map (None unless include_provenance)
error / warning ← diagnostics
metadata ← timings, hostname
↓
[Transport layer]
Serialise → deliver to caller
Persist (optional) → keyed by ix_id
17. Key design decisions
| Decision | Rationale |
|---|---|
| Two-method `Step` interface (`validate` + `process`) | Keeps routing logic (should this step run?) separate from execution; steps can be reordered or replaced without touching the pipeline loop |
| Mutable `context` on `ResponseIX` | Avoids passing a growing set of intermediate values as extra arguments; the entire state lives in one object; `context` is excluded from serialisation so it never leaks |
| Single LLM call for provenance | The schema is wrapped at runtime to add `segment_citations`; no second round-trip to the LLM is needed |
| Segment IDs instead of fuzzy text matching | IDs are injected into the prompt; the LLM cites them; lookup is O(1); no approximate matching, no hallucinated coordinates |
| Global page position for segment IDs | Ensures uniqueness across multi-document requests (two documents that both have "page 1" produce `p1_l*` and `p2_l*`, not the same IDs) |
| Provider-neutral `request_kwargs` | The `GenAIClient` interface accepts a dict; provider-specific translation (e.g. OpenAI `image_url` format) happens inside each adapter, not in the step |
| Normalised bounding boxes (0–1) | Resolution-independent; the frontend multiplies by rendered page dimensions to get pixel coordinates |
| Transport-agnostic pipeline | Kafka, FastAPI, and DB adapters all call the same `Pipeline.start()` |
| Config Server for use cases | Decouples use-case authoring from deployment; use cases can be updated without redeploying the pipeline |