# Core Pipeline Specification — InfoXtractor

## 1. What the system does

InfoXtractor is an **async, LLM-powered document extraction pipeline**. Given one or more documents (PDFs, images) or plain text and a *use case* that defines what to extract, it returns a structured JSON result whose shape matches the use-case schema.

Optional capabilities that are composed at runtime:

| Capability | Trigger | What it adds |
|---|---|---|
| OCR | `options.ocr.use_ocr = true` (default) | Converts document pages to text via cloud OCR before passing to the LLM |
| Vision | `options.gen_ai.use_vision = true` | Also passes page images to the LLM as base64 |
| Provenance | `options.provenance.include_provenance = true` | Each extracted field is annotated with the page number, bounding box, and text snippet it came from |
| OCR-only | `options.ocr.ocr_only = true` | Returns the OCR result without calling the LLM |

---

## 2. Top-level data contracts

### 2.1 RequestIX — pipeline input

```python
class RequestIX:
    use_case: str            # Name, URL, or Base64-encoded use-case definition
    ix_client_id: str        # Client identifier (used for auth + metrics)
    request_id: str          # Correlation key set by the caller; echoed in response
    ix_id: Optional[str]     # Internal tracking ID; set by the transport layer, not the caller
    context: Context         # What to process
    options: Options         # How to process it
    version: Optional[int]   # Schema version (default 2)

class Context:
    files: List[str]         # URLs to PDFs or images (S3, HTTP)
    texts: List[str]         # Raw text strings (treated as pages)

class Options:
    ocr: OCROptions
    gen_ai: GenAIOptions
    provenance: ProvenanceOptions
```

**OCROptions**

| Field | Type | Default | Meaning |
|---|---|---|---|
| `use_ocr` | bool | `True` | Run cloud OCR on files |
| `ocr_only` | bool | `False` | Stop after OCR; skip LLM |
| `service` | enum | `document_intelligence` | Which OCR backend (`document_intelligence` / `computer_vision`) |
| `include_geometries` | bool | `False` | Include raw bounding boxes in `ocr_result` |
| `include_ocr_text` | bool | `False` | Attach flat OCR text to `ocr_result.result.text` |
| `include_page_tags` | bool | `True` | Inject page-delimiter XML markers into OCR lines |
| `computer_vision_scaling_factor` | int 1–10 | `1` | Scale multiplier for CV-based OCR |

**GenAIOptions**

| Field | Type | Default | Meaning |
|---|---|---|---|
| `gen_ai_model_name` | str | `gpt-4o` (from config) | LLM deployment name |
| `use_vision` | bool | `False` | Include page images in the LLM message |
| `vision_scaling_factor` | int 1–10 | `1` | Scale factor for rendered images |
| `vision_detail` | enum | `auto` | `auto` / `low` / `high` (OpenAI vision detail) |
| `reasoning_effort` | Optional[str] | `None` | Passed to reasoning models (e.g. `medium`, `high`) |

**ProvenanceOptions**

| Field | Type | Default | Meaning |
|---|---|---|---|
| `include_provenance` | bool | `False` | Enable provenance tracking |
| `granularity` | enum | `line` | `line` (only supported today) or `word` |
| `include_bounding_boxes` | bool | `True` | Attach normalised coordinates to each source |
| `max_sources_per_field` | int ≥1 | `10` | Cap on sources returned per field |
| `source_type` | enum | `value_and_context` | `value`: value segments only; `value_and_context`: value + label segments |
| `min_confidence` | float 0–1 | `0.0` | Minimum per-field confidence (currently a no-op; hook for future) |

---

### 2.2 ResponseIX — pipeline output

```python
class ResponseIX:
    use_case: Optional[str]              # Echoed from RequestIX.use_case
    use_case_name: Optional[str]         # Clean name from use-case definition
    ix_client_id: Optional[str]
    request_id: Optional[str]
    ix_id: Optional[str]
    error: Optional[str]                 # Non-None means processing failed; pipeline stopped
    warning: List[str]                   # Non-fatal issues encountered during processing
    ix_result: IXResult                  # LLM extraction result
    ocr_result: OCRResult                # OCR output (may be empty if OCR skipped)
    provenance: Optional[ProvenanceData] # Field-level source tracking
    metadata: Metadata                   # Timings, hostname

class IXResult:
    result: dict             # Matches the use-case response schema
    result_confidence: dict  # Reserved; currently empty
    meta_data: dict          # token_usage {prompt_tokens, completion_tokens, total_tokens}, model_name

class OCRResult:
    result: OCRDetails       # pages with lines + bounding boxes
    meta_data: dict

class OCRDetails:
    text: Optional[str]      # Flat text (populated only if include_ocr_text=True)
    pages: List[Page]        # Structured OCR pages

class Page:
    page_no: int
    width: float             # Page width in points
    height: float            # Page height in points
    angle: float             # Skew angle
    unit: Optional[str]
    lines: List[Line]

class Line:
    text: Optional[str]
    bounding_box: List[float]  # 8-coordinate polygon [x1,y1, x2,y2, x3,y3, x4,y4] (raw pixels)
    words: Words

class Metadata:
    timings: List[Dict]          # Step-level timing records
    processed_by: Optional[str]  # Hostname
    use_case_truncated: Optional[bool]
```

**`ResponseIX.context`** is an internal-only mutable object that accumulates data as it passes through pipeline steps. It is **excluded from serialisation** (`exclude=True`) and must be stripped before the response is sent to any external consumer.

---

## 3. The Step interface

Every pipeline stage implements a two-method async interface:

```python
class Step(ABC):
    @abstractmethod
    async def validate(self, request_ix: RequestIX, response_ix: ResponseIX) -> bool:
        """
        Returns True  → run process()
        Returns False → silently skip this step
        Raises        → set ResponseIX.error and abort the pipeline
        """

    @abstractmethod
    async def process(self, request_ix: RequestIX, response_ix: ResponseIX) -> ResponseIX:
        """Execute the step; mutate response_ix; return it."""
```

**Key invariants:**

- Both `validate` and `process` receive the **same** `request_ix` and `response_ix` objects.
- `process` receives the `response_ix` as enriched by all previous steps (via `context`).
- If `validate` raises, the error string is written to `response_ix.error` and no further steps run.
- If `process` raises, the same applies: the error is captured and the pipeline stops.
- A step that returns `False` from `validate` leaves `response_ix` unchanged and the pipeline continues.

---

## 4. Pipeline orchestration (`Pipeline`)

```python
class Pipeline:
    steps = [SetupStep(), OCRStep(), GenAIStep(), ResponseHandlerStep()]
    debug_mode: bool  # Enables memory-usage logging and GC inspection

    async def start(self, request_ix: RequestIX) -> ResponseIX:
        # Wraps _execute_pipeline in an observability span
        ...

    async def _execute_pipeline(self, request_ix, obs) -> ResponseIX:
        response_ix = ResponseIX()
        for step in self.steps:
            # wrapped in a step-level observability span + Timer
            success, response_ix = await self._execute_step(step, request_ix, response_ix)
            if not success:
                return response_ix  # abort on first error
        return response_ix

    async def _execute_step(self, step, request_ix, response_ix) -> tuple[bool, ResponseIX]:
        valid = await step.validate(request_ix, response_ix)
        if valid:
            response_ix = await step.process(request_ix, response_ix)
        return True, response_ix
        # exceptions caught → response_ix.error set → return False
```

The pipeline itself holds **no I/O references** — it does not know about Kafka, HTTP, or databases. The transport layer instantiates `Pipeline`, calls `await pipeline.start(request_ix)`, and handles routing the returned `ResponseIX`.

---

## 5. Step 1 — SetupStep

**Purpose:** Validate the request, fetch external files, load the use-case definition, and convert all inputs into an internal `pages` representation for downstream steps.

### 5.1 validate()

Raises if:

- `request_ix` is `None`
- Both `context.files` and `context.texts` are empty

### 5.2 process()

**a) Copy text context**

```python
response_ix.context.texts = list(request_ix.context.texts)
```

**b) Download files** (parallel, async)

Each URL in `request_ix.context.files` is downloaded to a local temp path (`/tmp/ix/`). The result is a list of `File(file_path, mime_type)` objects stored in `response_ix.context.files`.
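The download fan-out can be sketched with `asyncio.gather`; the `File` shape is taken from the description above, while `download_file` is a stub standing in for the real HTTP client (an assumption for illustration):

```python
import asyncio
from dataclasses import dataclass
from typing import List

@dataclass
class File:
    file_path: str
    mime_type: str

async def download_file(url: str) -> File:
    # Stub for the real async HTTP download; the pipeline writes each
    # file under /tmp/ix/ and records its MIME type.
    await asyncio.sleep(0)
    name = url.rsplit("/", 1)[-1]
    return File(file_path=f"/tmp/ix/{name}", mime_type="application/pdf")

async def download_all(urls: List[str]) -> List[File]:
    # gather() runs all downloads concurrently and preserves input order,
    # so files[i] corresponds to urls[i] (item_index relies on this).
    return await asyncio.gather(*(download_file(u) for u in urls))

files = asyncio.run(download_all([
    "https://example.com/a.pdf",
    "https://example.com/b.pdf",
]))
```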
Supported MIME types depend on the chosen OCR service; unsupported files raise immediately.

**c) Load use case**

The use case is fetched from the Config Server (see §10) by name, URL, or Base64. On success, the two classes `UseCaseRequest` and `UseCaseResponse` are stored in `response_ix.context.use_case_request` and `response_ix.context.use_case_response`. `use_case_name` is extracted from `UseCaseRequest().use_case_name` and stored in `response_ix.use_case_name` for downstream steps and the final response.

**d) Build pages** (parallel per document)

The ingestor registers every file and classifies its MIME type. Rendering is done per document:

- **PDF** (via PyMuPDF): one `Page` object per PDF page, holding `file_path`, `item_index` (0-based index into `request_ix.context.files`), `page_no`, `width`, `height`.
- **Image** (via PIL): one `Page` per frame. Multi-frame TIFFs produce multiple pages.
- **Text**: one `Page` per entry in `context.texts`.

Maximum pages per PDF: 100 (hard cap; raises on violation).

All pages are concatenated into a flat list stored in `response_ix.context.pages`. A `DocumentIngestor` instance is stored in `response_ix.context.ingestion` so OCRStep can make format-specific decisions (e.g. skip Word-embedded images with a warning).

---

## 6. Step 2 — OCRStep

**Purpose:** Convert document pages to structured text with bounding boxes. Optionally build a `SegmentIndex` for provenance tracking.

### 6.1 validate()

Returns `True` when **any** of these flags are set AND `response_ix.context.files` is non-empty:

- `use_ocr = True`
- `include_geometries = True`
- `include_ocr_text = True`
- `ocr_only = True`

If none of the above apply (text-only request), returns `False` → step is skipped.

Raises `IX_000_004` if `ocr_only`, `include_geometries`, or `include_ocr_text` is requested but no files were provided.
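The validate() gate above can be condensed into a sketch; `IXException` stands in for the pipeline's real domain exception, and the function name is illustrative:

```python
class IXException(Exception):
    """Stand-in for the pipeline's real domain exception."""

def ocr_step_should_run(use_ocr: bool, ocr_only: bool, include_geometries: bool,
                        include_ocr_text: bool, has_files: bool) -> bool:
    # True → run process(); False → skip silently; raise → abort pipeline.
    if not (use_ocr or ocr_only or include_geometries or include_ocr_text):
        return False  # text-only request: step is skipped
    if not has_files:
        if ocr_only or include_geometries or include_ocr_text:
            raise IXException("IX_000_004: OCR output requested but no files provided")
        return False  # use_ocr=True with nothing to OCR
    return True
```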
### 6.2 process()

**a) Select OCR service**

```
service == "document_intelligence" → AzureDocumentIntelligence
service == "computer_vision"       → AzureComputerVision
```

**b) `perform_ocr(request_ix, response_ix)`** (async, runs against all pages)

Populates `response_ix.ocr_result` with an `OCRResult` containing pages with lines and raw bounding boxes (8-coordinate polygons, pixel coordinates).

**c) Inject page tags** (when `include_page_tags = True`)

For each OCR page a synthetic `Line` is prepended and appended:

```
<page-open marker>
… original lines …
<page-close marker>
```

These tags are visible to the LLM as structural markers. The `SegmentIndex` builder explicitly skips them.

**d) Build SegmentIndex** (when `include_provenance = True`)

```python
response_ix.context.segment_index = await SegmentIndex.build(
    response_ix.ocr_result,
    granularity,  # line (default) or word
    pages_metadata=response_ix.context.pages,
)
```

See §9 for the full SegmentIndex specification.

---

## 7. Step 3 — GenAIStep

**Purpose:** Call the LLM with the prepared context and parse its structured output into `response_ix.ix_result`. Optionally map the LLM's segment citations to bounding boxes.

### 7.1 validate()

- Returns `False` when `ocr_only = True`.
- Raises when:
  - `use_case` is empty
  - `use_ocr = True` but no OCR text was produced
  - neither `use_ocr` nor `use_vision` nor plain text context is available
  - model is not found in Config Server
  - `use_vision = True` but the model does not support vision

If `include_provenance = True` but `use_ocr = False`, a warning is added (provenance will be empty) but processing continues.

### 7.2 process()

**a) Prepare system prompt**

```python
prompt = UseCaseRequest().system_prompt  # falls back to prompt_template_base
```

If `include_provenance = True`, append the segment-citation instruction (see §9.2).
**b) Build text context**

When provenance is on and a `SegmentIndex` exists:

```
[p1_l0] Line text
[p1_l1] Next line
…
```

Otherwise: join OCR page lines with `\n\n` between pages, then append any raw texts.

**c) Build visual context** (when `use_vision = True`)

For each page:

1. Render to a PIL image at `vision_scaling_factor × original_size` (capped by `render_max_pixels_per_page`).
2. Convert TIFF frames to JPEG.
3. Base64-encode.
4. Wrap as a provider-neutral image part:

```json
{"type": "image", "media_type": "image/jpeg", "data": "<base64>", "detail": "auto"}
```

**d) Assemble LLM call parameters**

```python
request_kwargs = {
    "model": model_name,
    "temperature": from_model_settings,
    "messages": [
        {"role": "system", "content": prompt_string},
        {"role": "user", "content": [text_part, *image_parts]},
    ],
    # "reasoning_effort": "medium"  ← only for reasoning-capable models
}
```

**e) Determine response schema**

- When `include_provenance = False`: the schema is `UseCaseResponse` directly.
- When `include_provenance = True`: a dynamic wrapper is created at runtime:

```python
ProvenanceWrappedResponse = create_model(
    "ProvenanceWrappedResponse",
    result=(UseCaseResponse, Field(...)),
    segment_citations=(List[SegmentCitation], Field(default_factory=list)),
)
```

The use-case file is **never modified**; the wrapper is ephemeral.
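The runtime wrapping can be demonstrated with a standalone snippet; `InvoiceResponse` is a stand-in for the dynamically loaded `UseCaseResponse` (requires pydantic v2):

```python
from typing import List
from pydantic import BaseModel, Field, create_model

class SegmentCitation(BaseModel):
    field_path: str
    value_segment_ids: List[str]
    context_segment_ids: List[str] = []

class InvoiceResponse(BaseModel):
    # Stand-in for the dynamically loaded UseCaseResponse
    invoice_number: str

# Ephemeral wrapper: the use-case model itself is never touched.
ProvenanceWrappedResponse = create_model(
    "ProvenanceWrappedResponse",
    result=(InvoiceResponse, Field(...)),
    segment_citations=(List[SegmentCitation], Field(default_factory=list)),
)

wrapped = ProvenanceWrappedResponse.model_validate({
    "result": {"invoice_number": "INV-001"},
    "segment_citations": [
        {"field_path": "result.invoice_number", "value_segment_ids": ["p1_l4"]}
    ],
})
```

Because `segment_citations` defaults to an empty list, the wrapper also validates plain results when the LLM omits citations.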
**f) Invoke LLM**

```python
result = await gen_ai_service.invoke(request_kwargs, response_schema, model_settings)
# result.parsed     → the Pydantic model instance
# result.usage      → GenAIUsage(prompt_tokens, completion_tokens)
# result.model_name → string
```

**g) Write results**

- `response_ix.ix_result.result = parsed.result.model_dump()` (provenance mode) or `parsed.model_dump()` (normal mode)
- `response_ix.ix_result.meta_data = {token_usage, model_name}`

**h) Build provenance** (when `include_provenance = True`)

```python
response_ix.provenance = ProvenanceUtils.map_segment_refs_to_provenance(
    extraction_result={"result": ix_result},
    segment_citations=parsed.segment_citations,
    segment_index=response_ix.context.segment_index,
    max_sources_per_field=opts.max_sources_per_field,
    min_confidence=opts.min_confidence,
    include_bounding_boxes=opts.include_bounding_boxes,
    source_type=opts.source_type,
)
```

See §9.4 for the provenance resolution algorithm.

### 7.3 Provider routing

```python
provider = model_settings["provider"]  # "azure" or "bedrock"
if provider == "bedrock":
    service = AWSGenAI()
else:
    service = AzureOpenAI()
```

The `GenAIClient` interface is:

```python
class GenAIClient(ABC):
    def setup_gen_ai_client(self, request_ix, model_settings) -> None: ...
    async def invoke(self, request_kwargs, response_schema, model_settings) -> GenAIInvocationResult: ...
```

`AzureOpenAI` translates provider-neutral image parts to OpenAI's `image_url` content format. It caches one `AsyncOpenAI` instance per `(base_url, api_key)` pair.

---

## 8. Step 4 — ResponseHandlerStep

**Purpose:** Shape the final payload before handing it to the transport layer.

### 8.1 validate()

Always returns `True`.

### 8.2 process()

1. **Attach OCR text** (`include_ocr_text = True`): concatenate all line texts per page, join pages with `\n\n`, store in `ocr_result.result.text`.
2. **Strip geometries** (`include_geometries = False`, default): clear `ocr_result.result.pages` and `ocr_result.meta_data`.
The structured page data is large; callers who don't need it should not receive it.
3. **Trim context**: delete `response_ix.context` entirely. The mutable internal context is never serialised.

---

## 9. Provenance subsystem

### 9.1 SegmentIndex

Built from `ocr_result` after OCR completes. It assigns each non-tag OCR line a unique ID:

```
p{global_page_position}_l{line_index_within_page}
```

- `global_page_position` is 1-based and refers to the **flat pages list position**, not the page number printed in the document. This guarantees uniqueness across multi-document requests.
- Page open/close tag lines are explicitly excluded.

Internal storage:

```python
_id_to_position: Dict[str, Dict] = {
    "p1_l0": {
        "page": 1,                 # global 1-based position
        "bbox": BoundingBox(...),  # normalised 0-1 coordinates
        "text": "Invoice Number: INV-001",
        "file_index": 0,           # 0-based index into request.context.files
    },
    ...
}
_ordered_ids: List[str]  # insertion order, for deterministic prompt text
```

**Bounding box normalisation:**

```python
# 8-coordinate polygon normalised to 0-1
# (divide x-coords by page.width, y-coords by page.height)
[x1/W, y1/H, x2/W, y2/H, x3/W, y3/H, x4/W, y4/H]
```

Multiply back by rendered page dimensions to obtain pixel coordinates for overlay rendering.

**Prompt format** (`SegmentIndex.to_prompt_text()`):

```
[p1_l0] Invoice Number: INV-001
[p1_l1] Date: 2024-01-15
[p1_l2] Total: CHF 1234.00
…
```

Extra plain texts (from `context.texts`) are appended untagged.

**O(1) lookup:**

```python
position = segment_index.lookup_segment("p1_l0")  # None if unknown
```

### 9.2 SegmentCitation (LLM output model)

The LLM is instructed to populate `segment_citations` in its structured output:

```python
class SegmentCitation(BaseModel):
    field_path: str                 # e.g. "result.invoice_number"
    value_segment_ids: List[str]    # segments containing the extracted value
    context_segment_ids: List[str]  # surrounding label/anchor segments (may be empty)
```

**System prompt addition (injected by GenAIStep when provenance is on):**

```
For each extracted field, you must also populate the `segment_citations` list.
Each entry maps one field to the document segments that were its source.
Set `field_path` to the dot-separated JSON path of the field
(e.g. 'result.invoice_number'). Use two separate segment ID lists:
- `value_segment_ids`: segment IDs whose text directly contains the extracted
  value (e.g. ['p1_l4'] for the line containing 'INV-001').
- `context_segment_ids`: segment IDs for surrounding label or anchor text that
  helped you identify the field but does not contain the value itself
  (e.g. ['p1_l3'] for a label like 'Invoice Number:'). Leave empty if there is
  no distinct label.
Only use segment IDs that appear in the document text. Omit fields for which
you cannot identify a source segment.
```

### 9.3 ProvenanceData (response model)

```python
class ProvenanceData(BaseModel):
    fields: Dict[str, FieldProvenance]  # field_path → provenance
    quality_metrics: Dict[str, Any]     # see below
    segment_count: Optional[int]
    granularity: Optional[str]          # "line" or "word"

class FieldProvenance(BaseModel):
    field_name: str                # last segment of field_path
    field_path: str                # e.g. "result.invoice_number"
    value: Any                     # actual extracted value (resolved from ix_result.result)
    sources: List[ExtractionSource]
    confidence: Optional[float]    # None today; reserved for future

class ExtractionSource(BaseModel):
    page_number: int                     # global 1-based position
    file_index: Optional[int]            # 0-based index into request.context.files
    bounding_box: Optional[BoundingBox]  # None when include_bounding_boxes=False
    text_snippet: str                    # actual OCR text at this location
    relevance_score: float               # always 1.0 (exact segment ID match)
    segment_id: Optional[str]            # e.g. "p1_l3"; for internal use, not for rendering

class BoundingBox(BaseModel):
    coordinates: List[float]  # [x1,y1, x2,y2, x3,y3, x4,y4], all 0-1 normalised
```

**quality_metrics:**

```json
{
  "fields_with_provenance": 8,
  "total_fields": 10,
  "coverage_rate": 0.8,
  "invalid_references": 2
}
```

- `coverage_rate` = fields with at least one source ÷ total leaf fields in result
- `invalid_references` = segment IDs cited by the LLM that were not found in the SegmentIndex

### 9.4 Provenance resolution algorithm

```python
for citation in segment_citations:
    if source_type == VALUE:
        seg_ids = citation.value_segment_ids
    else:
        seg_ids = citation.value_segment_ids + citation.context_segment_ids

    sources = []
    for seg_id in seg_ids[:max_sources_per_field]:
        pos = segment_index.lookup_segment(seg_id)
        if pos is None:
            invalid_ref_count += 1
            continue
        sources.append(ExtractionSource(
            page_number=pos["page"],
            file_index=pos["file_index"],
            bounding_box=pos["bbox"] if include_bounding_boxes else None,
            text_snippet=pos["text"],
            relevance_score=1.0,
            segment_id=seg_id,
        ))

    if not sources:
        continue  # skip fields that resolve to nothing

    # Resolve value: traverse result dict via dot-notation field_path
    # e.g. "result.insured_vehicle.vin" → result["insured_vehicle"]["vin"]
    value = resolve_nested_path(extraction_result, citation.field_path)

    provenance[citation.field_path] = FieldProvenance(
        field_name=citation.field_path.split(".")[-1],
        field_path=citation.field_path,
        value=value,
        sources=sources,
        confidence=None,
    )
```

Array notation is normalised before traversal: `"items[0].name"` → `"items.0.name"`.

---

## 10. Use-case definition contract

A use case is a Python module (or dynamically loaded class) that defines two Pydantic models. The Config Server serves them by name; the pipeline loads them via `config_server.use_cases.load_and_validate_use_case(use_case)`.
### UseCaseRequest

```python
class UseCaseRequest(BaseModel):
    use_case_name: str  # Human-readable name, stored in ResponseIX.use_case_name
    system_prompt: str  # System prompt sent to the LLM
    # Legacy: prompt_template_base (deprecated alias for system_prompt)
    # Legacy: kwargs_use_case (deprecated; triggers format(**kwargs))
```

### UseCaseResponse

A Pydantic model whose fields define the extraction schema. The LLM is instructed to return JSON matching this schema (via `chat.completions.parse(response_format=UseCaseResponse)`).

**Example:**

```python
class InvoiceResponse(BaseModel):
    invoice_number: str
    invoice_date: str
    total_amount: float
    line_items: List[LineItem]

class LineItem(BaseModel):
    description: str
    quantity: int
    unit_price: float
```

The schema is passed as the `response_format` argument to the OpenAI structured-output API. When provenance is on, it is wrapped in `ProvenanceWrappedResponse` at runtime (§7.2e).

---

## 11. LLM invocation contract

### 11.1 Provider-neutral request_kwargs

```python
request_kwargs = {
    "model": str,                       # deployment name
    "temperature": Optional[float],     # from model_settings; None for reasoning models
    "messages": [
        {"role": "system", "content": str},
        {"role": "user", "content": List[ContentPart]},
    ],
    "reasoning_effort": Optional[str],  # only present when supported by model
}

ContentPart = (
    {"type": "text", "text": str}
    | {"type": "image", "media_type": "image/jpeg", "data": str, "detail": str}
)
```

### 11.2 GenAIInvocationResult

```python
@dataclass
class GenAIInvocationResult:
    parsed: Any        # Parsed Pydantic model instance (matches response_schema)
    usage: GenAIUsage  # prompt_tokens, completion_tokens
    model_name: str
```

### 11.3 Model settings (from Config Server)

```json
{
  "provider": "azure",
  "temperature": 0.0,
  "vision_capability": true,
  "supports_reasoning_efforts": ["low", "medium", "high"],
  "default_reasoning_effort": "medium"
}
```

- `temperature` is `None` for reasoning models (OpenAI ignores it and it must be omitted).
- `vision_capability` must be `true` for `use_vision = True` requests to be accepted.
- `supports_reasoning_efforts` is the allowlist; requests for unsupported values fall back to `default_reasoning_effort` with a warning.

---

## 12. Error model

### 12.1 IXException

All domain errors are wrapped in `IXException(message_or_Error_enum)`. The pipeline catches any exception from `validate()` or `process()`, writes `str(e)` to `response_ix.error`, and stops.

### 12.2 Named error codes

| Code | Trigger |
|---|---|
| `IX_000_000` | `request_ix` is `None` |
| `IX_000_002` | No context (neither files nor texts) |
| `IX_000_004` | OCR required but no files provided |
| `IX_000_005` | File MIME type not supported by chosen OCR service |
| `IX_001_000` | `use_case` is empty or missing |
| `IX_001_001` | Use case could not be loaded or validated |
| `IX_003_003` | `use_vision=True` but model has no vision capability |
| `IX_004_001` | Response too large for transport (e.g. Kafka max message size) |

### 12.3 Warnings

Non-fatal issues are appended to `response_ix.warning` and logged. Examples:

- Vision requested but no renderable documents found
- Image scale capped due to pixel limit
- Provenance requested with `use_ocr=False`
- Word document contains embedded images that will be skipped
- `use_case_name` not set in use-case definition

---

## 13. Configuration (AppConfig)

All settings are loaded from environment variables (with `.env` fallback via `pydantic-settings`).
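An illustrative stdlib-only sketch of the loading pattern; the real AppConfig derives from pydantic-settings' `BaseSettings` (which adds `.env` fallback and type validation), and the field names here are assumptions, with the documented defaults:

```python
import os
from dataclasses import dataclass, field

def _env(name: str, default: str) -> str:
    return os.getenv(name, default)

@dataclass
class AppConfig:
    # Illustrative subset only; field names are assumed, defaults are documented.
    default_llm: str = field(
        default_factory=lambda: _env("DEFAULT_LLM", "gpt-4o"))
    config_server_base_url: str = field(
        default_factory=lambda: _env("CONFIG_SERVER_BASE_URL", "http://ix-config:8998/"))
    request_timeout_seconds: int = field(
        default_factory=lambda: int(_env("PIPELINE_REQUEST_TIMEOUT_SECONDS", "2700")))
    render_max_pixels_per_page: int = field(
        default_factory=lambda: int(_env("RENDER_MAX_PIXELS_PER_PAGE", "75000000")))

config = AppConfig()
```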
| Category | Key env vars |
|---|---|
| Azure OCR | `DOCUMENT_INTELLIGENCE_ENDPOINT`, `DOCUMENT_INTELLIGENCE_KEY`, `COMPUTER_VISION_ENDPOINT`, `COMPUTER_VISION_SUBSCRIPTION_KEY` |
| Azure OpenAI | `OPENAI_BASE_URL`, `OPENAI_API_KEY` |
| LLM defaults | `DEFAULT_LLM` (default: `gpt-4o`) |
| Config Server | `CONFIG_SERVER_BASE_URL` (default: `http://ix-config:8998/`) |
| Pipeline | `PIPELINE_WORKER_CONCURRENCY` (1–10), `PIPELINE_REQUEST_TIMEOUT_SECONDS` (default: 2700) |
| Rendering | `RENDER_MAX_PIXELS_PER_PAGE` (default: 75,000,000) |
| AWS | `S3_BUCKET`, `S3_OBJECT_PREFIX`, `AWS_REGION`, `TARGET_ROLE` |
| Kafka | `PROCESSING_TOPIC`, `KAFKA_BOOTSTRAP_SERVER`, … |
| Observability | `LOG_LEVEL`, `OTEL_EXPORTER_OTLP_ENDPOINT`, `PYROSCOPE_SERVER_ADDRESS` |

---

## 14. Observability

### 14.1 Logging

Every log call carries `ix_id` as contextual metadata. The custom logger exposes:

```python
logger.info_ix(msg, request_ix)
logger.warning_ix(msg, request_ix)
logger.error_ix(msg, request_ix)
```

If `ix_id` is not yet set on `request_ix`, the logger generates one lazily on first use.

Step timing is recorded via a `Timer` context manager; results are stored in `response_ix.metadata.timings`.

### 14.2 Tracing (OpenTelemetry)

- **Pipeline span** wraps the entire `_execute_pipeline()` call (attributes: `request_id`, `use_case`, `client_id`).
- **Step span** wraps each individual step.
- Errors are recorded on the span with `mark_failed(error_message)`.
- Exported via gRPC to `OTEL_EXPORTER_OTLP_ENDPOINT`.

### 14.3 Metrics (Prometheus)

| Metric | Labels |
|---|---|
| `pipeline_requests_total` | `client_id` |
| LLM token counter | `model`, `client_id`, `use_case`, `status` |
| LLM latency histogram | `model`, `client_id` |
| Extraction quality | `client_id`, `use_case`, `model`, `reasoning_effort` |

Exposed on the health port (default 9009).

---

## 15. Transport adapter contract

The pipeline is transport-agnostic. Any delivery mechanism must:

1. **Deserialise** the incoming payload into a `RequestIX` instance.
2. **Assign `ix_id`** (a random 16-character hex string) if not already present.
3. **Call** `await Pipeline().start(request_ix)` → `response_ix`.
4. **Serialise** `response_ix` (use `.model_dump(by_alias=True)` to honour `serialization_alias` on `metadata`).
5. **Deliver** the serialised response to the caller via the appropriate channel.

Optionally:

- **Persist** `request_ix` and `response_ix` to a durable store (S3, database) keyed by `ix_id`.
- **Apply a timeout**: wrap the `await pipeline.start(...)` call and write a timeout error to `response_ix.error` on expiry.

### Kafka adapter (existing)

- Consumes from `ch-infoxtractor.processing.v2`; produces to `ch-infoxtractor.outbound.v2` and client-specific topics.
- Concurrency controlled via asyncio semaphore (`PIPELINE_WORKER_CONCURRENCY`).
- Per-partition offset commit tracking (gap-safe: only commits contiguous completed offsets).
- Graceful shutdown: drains in-flight messages before stopping.

### FastAPI adapter (example)

```python
import secrets

@router.post("/extract")
async def extract(body: RequestIX) -> ResponseIX:
    body.ix_id = body.ix_id or secrets.token_hex(8)
    return await pipeline.start(body)
```

### Database adapter (example)

```python
async def process_job(job_id: str, db: AsyncSession):
    row = await db.get(Job, job_id)
    request_ix = RequestIX.model_validate_json(row.payload)
    request_ix.ix_id = row.ix_id
    response_ix = await pipeline.start(request_ix)
    row.result = response_ix.model_dump_json(by_alias=True)
    row.status = "done" if not response_ix.error else "error"
    await db.commit()
```

---

## 16. End-to-end data flow summary

```
[Transport layer]
    ↓ RequestIX (use_case, context.files/texts, options, ix_id)
    ↓
[Pipeline.start()]
    ↓
[SetupStep]
    • Download files → local paths → response_ix.context.files (File objects)
    • Load use case from Config Server → response_ix.context.use_case_request/response
    • Split PDFs / load images / wrap texts → response_ix.context.pages (flat list)
    ↓
[OCRStep]  (skipped if no files or use_ocr=False)
    • Call Azure OCR per page → response_ix.ocr_result (pages + lines + bbox)
    • Inject XML markers → lines prepended/appended in ocr_result.pages
    • Build SegmentIndex (if provenance) → response_ix.context.segment_index
    ↓
[GenAIStep]  (skipped if ocr_only=True)
    • Build system prompt (+ citation hint if provenance)
    • Build user content: segment-tagged text OR plain OCR text + optional base64 images
    • Determine response schema (wrapped if provenance)
    • Call LLM via GenAIClient.invoke()
    • Write ix_result.result + meta_data
    • Resolve segment citations → ProvenanceData (if provenance)
    ↓
[ResponseHandlerStep]  (always runs)
    • Attach flat OCR text (if include_ocr_text)
    • Strip bounding boxes (if not include_geometries)
    • Delete response_ix.context (never serialised)
    ↓
[ResponseIX]
    ix_result.result  ← structured extraction matching UseCaseResponse schema
    ocr_result        ← OCR details (empty pages unless include_geometries)
    provenance        ← field-level source map (None unless include_provenance)
    error / warning   ← diagnostics
    metadata          ← timings, hostname
    ↓
[Transport layer]
    Serialise → deliver to caller
    Persist (optional) → keyed by ix_id
```

---

## 17. Key design decisions

| Decision | Rationale |
|---|---|
| **Two-method Step interface** (`validate` + `process`) | Keeps routing logic (should this step run?) separate from execution; steps can be reordered or replaced without touching the pipeline loop |
| **Mutable `context` on ResponseIX** | Avoids passing a growing set of intermediate values as extra arguments; the entire state is in one object; `context` is excluded from serialisation so it never leaks |
| **Single LLM call for provenance** | The schema is wrapped at runtime to add `segment_citations`; no second round-trip to the LLM is needed |
| **Segment IDs instead of fuzzy text matching** | IDs are injected into the prompt; the LLM cites them; lookup is O(1); no approximate matching, no hallucinated coordinates |
| **Global page position for segment IDs** | Ensures uniqueness across multi-document requests (two documents that both have "page 1" produce `p1_l*` and `p2_l*`, not the same IDs) |
| **Provider-neutral `request_kwargs`** | The `GenAIClient` interface accepts a dict; provider-specific translation (e.g. OpenAI `image_url` format) happens inside each adapter, not in the step |
| **Normalised bounding boxes (0–1)** | Resolution-independent; frontend multiplies by rendered page dimensions to get pixel coordinates |
| **Transport-agnostic pipeline** | Kafka, FastAPI, and DB adapters all call the same `Pipeline.start()` |
| **Config Server for use cases** | Decouples use-case authoring from deployment; allows use cases to be updated without redeploying the pipeline |
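The two-method Step contract (§3) and the abort-on-first-error loop (§4) can be exercised end to end with toy steps; all class names below are illustrative stand-ins, not the real pipeline classes:

```python
import asyncio
from abc import ABC, abstractmethod

class Response:
    """Toy stand-in for ResponseIX: an error slot plus an execution trace."""
    def __init__(self):
        self.error = None
        self.trace = []

class Step(ABC):
    @abstractmethod
    async def validate(self, request, response) -> bool: ...
    @abstractmethod
    async def process(self, request, response): ...

class RecordStep(Step):
    def __init__(self, name, run=True):
        self.name, self.run = name, run
    async def validate(self, request, response):
        return self.run           # False → step is skipped silently
    async def process(self, request, response):
        response.trace.append(self.name)
        return response

class FailingStep(Step):
    async def validate(self, request, response):
        raise RuntimeError("boom")  # any raise → error captured, pipeline stops
    async def process(self, request, response):
        return response

class Pipeline:
    def __init__(self, steps):
        self.steps = steps
    async def start(self, request):
        response = Response()
        for step in self.steps:
            try:
                if await step.validate(request, response):
                    response = await step.process(request, response)
            except Exception as e:
                response.error = str(e)
                return response   # abort on first error
        return response

ok = asyncio.run(Pipeline([RecordStep("setup"), RecordStep("ocr", run=False),
                           RecordStep("genai")]).start(request=None))
failed = asyncio.run(Pipeline([RecordStep("setup"), FailingStep(),
                               RecordStep("never")]).start(request=None))
```

The first run skips the step whose `validate` returned `False` and continues; the second stops at the raising step, leaving the later step unexecuted.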