infoxtractor/docs/spec-core-pipeline.md
Dirk Riemann 124403252d Initial design: on-prem LLM extraction microservice MVP
Establishes ix as an async, on-prem, LLM-powered structured extraction
microservice. Full reference spec stays in docs/spec-core-pipeline.md;
MVP spec (strict subset — Ollama only, Surya OCR, REST + Postgres-queue
transports in parallel, in-repo use cases, provenance-based reliability
signals) lives at docs/superpowers/specs/2026-04-18-ix-mvp-design.md.

First use case: bank_statement_header (feeds mammon's needs_parser flow).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 10:23:17 +02:00


Core Pipeline Specification — InfoXtractor

1. What the system does

InfoXtractor is an async, LLM-powered document extraction pipeline. Given one or more documents (PDFs, images) or plain text and a use case that defines what to extract, it returns a structured JSON result whose shape matches the use-case schema.

Optional capabilities that are composed at runtime:

| Capability | Trigger | What it adds |
| --- | --- | --- |
| OCR | options.ocr.use_ocr = true (default) | Converts document pages to text via cloud OCR before passing to the LLM |
| Vision | options.gen_ai.use_vision = true | Also passes page images to the LLM as base64 |
| Provenance | options.provenance.include_provenance = true | Each extracted field is annotated with the page number, bounding box, and text snippet it came from |
| OCR-only | options.ocr.ocr_only = true | Returns OCR result without calling the LLM |

2. Top-level data contracts

2.1 RequestIX — pipeline input

class RequestIX:
    use_case: str            # Name, URL, or Base64-encoded use-case definition
    ix_client_id: str        # Client identifier (used for auth + metrics)
    request_id: str          # Correlation key set by the caller; echoed in response
    ix_id: Optional[str]     # Internal tracking ID; set by the transport layer, not the caller
    context: Context         # What to process
    options: Options         # How to process it
    version: Optional[int]   # Schema version (default 2)

class Context:
    files: List[str]   # URLs to PDFs or images (S3, HTTP)
    texts: List[str]   # Raw text strings (treated as pages)

class Options:
    ocr: OCROptions
    gen_ai: GenAIOptions
    provenance: ProvenanceOptions

OCROptions

| Field | Type | Default | Meaning |
| --- | --- | --- | --- |
| use_ocr | bool | True | Run cloud OCR on files |
| ocr_only | bool | False | Stop after OCR; skip LLM |
| service | enum | document_intelligence | Which OCR backend (document_intelligence / computer_vision) |
| include_geometries | bool | False | Include raw bounding boxes in ocr_result |
| include_ocr_text | bool | False | Attach flat OCR text to ocr_result.result.text |
| include_page_tags | bool | True | Inject <page file="…" number="…"> XML markers into OCR lines |
| computer_vision_scaling_factor | int 1-10 | 1 | Scale multiplier for CV-based OCR |

GenAIOptions

| Field | Type | Default | Meaning |
| --- | --- | --- | --- |
| gen_ai_model_name | str | gpt-4o (from config) | LLM deployment name |
| use_vision | bool | False | Include page images in the LLM message |
| vision_scaling_factor | int 1-10 | 1 | Scale factor for rendered images |
| vision_detail | enum | auto | auto / low / high (OpenAI vision detail) |
| reasoning_effort | Optional[str] | None | Passed to reasoning models (e.g. medium, high) |

ProvenanceOptions

| Field | Type | Default | Meaning |
| --- | --- | --- | --- |
| include_provenance | bool | False | Enable provenance tracking |
| granularity | enum | line | line (the only value supported today) or word |
| include_bounding_boxes | bool | True | Attach normalised coordinates to each source |
| max_sources_per_field | int ≥1 | 10 | Cap on sources returned per field |
| source_type | enum | value_and_context | value: value segments only; value_and_context: value + label segments |
| min_confidence | float 0-1 | 0.0 | Minimum per-field confidence (currently a no-op; hook for future) |
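
As an illustration of how the three option groups combine, the following sketch builds a request payload that enables OCR text and line-level provenance. It assumes the wire format uses the attribute names from RequestIX unchanged and that omitted options fall back to the defaults in the tables above; the client ID, request ID, and file URL are hypothetical placeholders.

```python
import json

# Hypothetical request payload; identifiers and the URL are placeholders.
request_payload = {
    "use_case": "bank_statement_header",
    "ix_client_id": "demo-client",
    "request_id": "req-0001",
    "context": {
        "files": ["https://example.com/statement.pdf"],
        "texts": [],
    },
    "options": {
        "ocr": {"use_ocr": True, "include_ocr_text": True},
        "gen_ai": {"use_vision": False},
        "provenance": {
            "include_provenance": True,
            "granularity": "line",
            "source_type": "value_and_context",
            "max_sources_per_field": 3,
        },
    },
}

# The payload is plain JSON-serialisable data, as a transport would send it.
encoded = json.dumps(request_payload, indent=2)
```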

2.2 ResponseIX — pipeline output

class ResponseIX:
    use_case: Optional[str]       # Echoed from RequestIX.use_case
    use_case_name: Optional[str]  # Clean name from use-case definition
    ix_client_id: Optional[str]
    request_id: Optional[str]
    ix_id: Optional[str]
    error: Optional[str]          # Non-None means processing failed; pipeline stopped
    warning: List[str]            # Non-fatal issues encountered during processing
    ix_result: IXResult           # LLM extraction result
    ocr_result: OCRResult         # OCR output (may be empty if OCR skipped)
    provenance: Optional[ProvenanceData]  # Field-level source tracking
    metadata: Metadata            # Timings, hostname

class IXResult:
    result: dict       # Matches the use-case response schema
    result_confidence: dict  # Reserved; currently empty
    meta_data: dict    # token_usage {prompt_tokens, completion_tokens, total_tokens}, model_name

class OCRResult:
    result: OCRDetails      # pages with lines + bounding boxes
    meta_data: dict

class OCRDetails:
    text: Optional[str]    # Flat text (populated only if include_ocr_text=True)
    pages: List[Page]      # Structured OCR pages

class Page:
    page_no: int
    width: float           # Page width in points
    height: float          # Page height in points
    angle: float           # Skew angle
    unit: Optional[str]
    lines: List[Line]

class Line:
    text: Optional[str]
    bounding_box: List[float]  # 8-coordinate polygon [x1,y1, x2,y2, x3,y3, x4,y4] (raw pixels)
    words: Words

class Metadata:
    timings: List[Dict]          # Step-level timing records
    processed_by: Optional[str]  # Hostname
    use_case_truncated: Optional[bool]

ResponseIX.context is an internal-only mutable object that accumulates data as it passes through pipeline steps. It is excluded from serialisation (exclude=True) and must be stripped before the response is sent to any external consumer.


3. The Step interface

Every pipeline stage implements a two-method async interface:

class Step(ABC):
    @abstractmethod
    async def validate(self, request_ix: RequestIX, response_ix: ResponseIX) -> bool:
        """
        Returns True  → run process()
        Returns False → silently skip this step
        Raises        → set ResponseIX.error and abort the pipeline
        """

    @abstractmethod
    async def process(self, request_ix: RequestIX, response_ix: ResponseIX) -> ResponseIX:
        """Execute the step; mutate response_ix; return it."""

Key invariants:

  • Both validate and process receive the same request_ix and response_ix objects.
  • process receives the response_ix as enriched by all previous steps (via context).
  • If validate raises, the error string is written to response_ix.error and no further steps run.
  • If process raises, same: error is captured and pipeline stops.
  • A step that returns False from validate leaves response_ix unchanged and the pipeline continues.

4. Pipeline orchestration (Pipeline)

class Pipeline:
    steps = [SetupStep(), OCRStep(), GenAIStep(), ResponseHandlerStep()]
    debug_mode: bool  # Enables memory-usage logging and GC inspection

    async def start(self, request_ix: RequestIX) -> ResponseIX:
        # Wraps _execute_pipeline in an observability span
        ...

    async def _execute_pipeline(self, request_ix, obs) -> ResponseIX:
        response_ix = ResponseIX()
        for step in self.steps:
            # wrapped in a step-level observability span + Timer
            success, response_ix = await self._execute_step(step, request_ix, response_ix)
            if not success:
                return response_ix   # abort on first error
        return response_ix

    async def _execute_step(self, step, request_ix, response_ix) -> tuple[bool, ResponseIX]:
        try:
            valid = await step.validate(request_ix, response_ix)
            if valid:
                response_ix = await step.process(request_ix, response_ix)
            return True, response_ix
        except Exception as exc:
            # Any exception from validate() or process() aborts the pipeline
            response_ix.error = str(exc)
            return False, response_ix

The pipeline itself holds no I/O references — it does not know about Kafka, HTTP, or databases. The transport layer instantiates Pipeline, calls await pipeline.start(request_ix), and handles routing the returned ResponseIX.
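
The abort-on-first-error behaviour can be demonstrated with a small stand-alone sketch. The `ResponseIX` stand-in, its `trace` field, and the three step classes are hypothetical and exist only for this illustration; the loop mirrors the orchestration described above but is not the actual implementation.

```python
import asyncio
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class ResponseIX:  # stand-in with only the fields this demo needs
    error: Optional[str] = None
    trace: List[str] = field(default_factory=list)  # hypothetical, demo only


class OkStep:
    async def validate(self, request_ix, response_ix) -> bool:
        return True

    async def process(self, request_ix, response_ix) -> ResponseIX:
        response_ix.trace.append(type(self).__name__)
        return response_ix


class FailingStep(OkStep):
    async def process(self, request_ix, response_ix) -> ResponseIX:
        raise ValueError("boom")


class LaterStep(OkStep):
    pass


async def run_pipeline(steps, request_ix) -> ResponseIX:
    response_ix = ResponseIX()
    for step in steps:
        try:
            if await step.validate(request_ix, response_ix):
                response_ix = await step.process(request_ix, response_ix)
        except Exception as exc:
            response_ix.error = str(exc)  # capture the error and stop
            return response_ix
    return response_ix


result = asyncio.run(run_pipeline([OkStep(), FailingStep(), LaterStep()], None))
```

Here `LaterStep` never runs: `result.error` holds the message from `FailingStep` and `result.trace` records only the first step.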


5. Step 1 — SetupStep

Purpose: Validate the request, fetch external files, load the use-case definition, and convert all inputs into an internal pages representation for downstream steps.

5.1 validate()

Raises if:

  • request_ix is None
  • Both context.files and context.texts are empty

5.2 process()

a) Copy text context

response_ix.context.texts = list(request_ix.context.texts)

b) Download files (parallel, async)

Each URL in request_ix.context.files is downloaded to a local temp path (/tmp/ix/). The result is a list of File(file_path, mime_type) objects stored in response_ix.context.files.

Supported MIME types depend on the chosen OCR service; unsupported files raise immediately.

c) Load use case

The use case is fetched from the Config Server (see §10) by name, URL, or Base64. On success, the two classes UseCaseRequest and UseCaseResponse are stored in response_ix.context.use_case_request and response_ix.context.use_case_response.

use_case_name is extracted from UseCaseRequest().use_case_name and stored in response_ix.use_case_name for downstream steps and the final response.

d) Build pages (parallel per document)

The ingestor registers every file and classifies its MIME type. Rendering is done per document:

  • PDF (via PyMuPDF): one Page object per PDF page, holding file_path, item_index (0-based index into request_ix.context.files), page_no, width, height.
  • Image (via PIL): one Page per frame. Multi-frame TIFFs produce multiple pages.
  • Text: one Page per entry in context.texts.

Maximum pages per PDF: 100 (hard cap; raises on violation).

All pages are concatenated into a flat list stored in response_ix.context.pages.

A DocumentIngestor instance is stored in response_ix.context.ingestion so OCRStep can make format-specific decisions (e.g. skip Word-embedded images with a warning).
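
The flattening of per-document pages into one list can be sketched as follows. This is a simplified illustration, not the ingestor itself: `PageRef` and `build_pages` are hypothetical names, page counts stand in for actual PDF rendering, and it assumes that `page_no` is 1-based, that text pages follow file pages in the flat list, and that text pages carry no `item_index`.

```python
from dataclasses import dataclass
from typing import List, Optional

MAX_PAGES_PER_PDF = 100  # hard cap stated in the spec


@dataclass
class PageRef:  # hypothetical stand-in for the internal Page object
    item_index: Optional[int]  # 0-based index into context.files; None for texts (assumption)
    page_no: int               # page number within its own document (assumed 1-based)
    text: Optional[str] = None


def build_pages(page_counts: List[int], texts: List[str]) -> List[PageRef]:
    """Flatten all documents into one list; page_counts[i] is the page count of file i."""
    pages: List[PageRef] = []
    for item_index, count in enumerate(page_counts):
        if count > MAX_PAGES_PER_PDF:
            raise ValueError(
                f"file {item_index} has {count} pages (max {MAX_PAGES_PER_PDF})"
            )
        pages.extend(PageRef(item_index, n) for n in range(1, count + 1))
    # One page per raw text entry, appended after the file pages (assumption).
    pages.extend(PageRef(None, 1, text=t) for t in texts)
    return pages


pages = build_pages([2, 1], ["free text"])
```

The position of an entry in `pages` (plus one) is what the provenance subsystem later calls the global page position.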


6. Step 2 — OCRStep

Purpose: Convert document pages to structured text with bounding boxes. Optionally build a SegmentIndex for provenance tracking.

6.1 validate()

Returns True when any of these flags are set AND response_ix.context.files is non-empty:

  • use_ocr = True
  • include_geometries = True
  • include_ocr_text = True
  • ocr_only = True

If none of the above apply (text-only request), returns False → step is skipped.

Raises IX_000_004 if ocr_only, include_geometries, or include_ocr_text is requested but no files were provided.
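
The run/skip/raise decision above can be condensed into one function. This is a sketch under the rules stated in this section; `IXError` is a hypothetical stand-in for IXException and `ocr_step_should_run` is an illustrative name.

```python
from dataclasses import dataclass


@dataclass
class OCROptions:  # defaults copied from the OCROptions table
    use_ocr: bool = True
    ocr_only: bool = False
    include_geometries: bool = False
    include_ocr_text: bool = False


class IXError(Exception):  # hypothetical stand-in for IXException
    pass


def ocr_step_should_run(opts: OCROptions, has_files: bool) -> bool:
    # These three flags make no sense without files, so they raise.
    needs_files = opts.ocr_only or opts.include_geometries or opts.include_ocr_text
    if needs_files and not has_files:
        raise IXError("IX_000_004")
    # Otherwise the step runs only if some OCR flag is set and files exist.
    any_flag = opts.use_ocr or needs_files
    return any_flag and has_files
```

With default options, a text-only request (no files) yields False and the step is skipped rather than failing.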

6.2 process()

a) Select OCR service

service == "document_intelligence" → AzureDocumentIntelligence
service == "computer_vision"       → AzureComputerVision

b) perform_ocr(request_ix, response_ix) (async, runs against all pages)

Populates response_ix.ocr_result with an OCRResult containing pages with lines and raw bounding boxes (8-coordinate polygons, pixel coordinates).

c) Inject page tags (when include_page_tags = True)

For each OCR page a synthetic Line is prepended and appended:

<page file="{item_index}" number="{page_no}">
… original lines …
</page>

These tags are visible to the LLM as structural markers. The SegmentIndex builder explicitly skips them.
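
A minimal sketch of the tag injection on plain strings, together with the kind of check an index builder could use to skip the synthetic lines. Both function names are hypothetical, and the prefix test in `is_page_tag` is an assumed heuristic rather than the documented detection rule.

```python
from typing import List


def wrap_with_page_tags(lines: List[str], item_index: int, page_no: int) -> List[str]:
    """Prepend the opening tag and append the closing tag around a page's lines."""
    opening = f'<page file="{item_index}" number="{page_no}">'
    return [opening, *lines, "</page>"]


def is_page_tag(text: str) -> bool:
    """Assumed heuristic for recognising the synthetic tag lines."""
    stripped = text.strip()
    return stripped.startswith("<page ") or stripped == "</page>"


tagged = wrap_with_page_tags(["Invoice Number: INV-001"], item_index=0, page_no=1)
content_only = [line for line in tagged if not is_page_tag(line)]
```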

d) Build SegmentIndex (when include_provenance = True)

response_ix.context.segment_index = await SegmentIndex.build(
    response_ix.ocr_result,
    granularity,           # line (default) or word
    pages_metadata=response_ix.context.pages,
)

See §9.1 for the full SegmentIndex specification.


7. Step 3 — GenAIStep

Purpose: Call the LLM with the prepared context and parse its structured output into response_ix.ix_result. Optionally map the LLM's segment citations to bounding boxes.

7.1 validate()

  • Returns False when ocr_only = True.
  • Raises when:
    • use_case is empty
    • use_ocr = True but no OCR text was produced
    • neither use_ocr nor use_vision nor plain text context is available
    • model is not found in Config Server
    • use_vision = True but the model does not support vision

If include_provenance = True but use_ocr = False, a warning is added (provenance will be empty) but processing continues.

7.2 process()

a) Prepare system prompt

prompt = UseCaseRequest().system_prompt  # falls back to prompt_template_base

If include_provenance = True, append the segment-citation instruction (see §9.2).

b) Build text context

When provenance is on and a SegmentIndex exists:

[p1_l0] Line text
[p1_l1] Next line
…

Otherwise: join OCR page lines with \n\n between pages, then append any raw texts.
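
The non-provenance branch might look like the sketch below. The spec fixes only the `\n\n` separator between pages; joining lines within a page with a single newline and appending raw texts with the same page separator are assumptions, and `build_text_context` is a hypothetical name.

```python
from typing import List


def build_text_context(pages: List[List[str]], texts: List[str]) -> str:
    """pages holds the OCR line texts of each page; texts are the raw context strings."""
    page_blocks = ["\n".join(lines) for lines in pages]  # assumed in-page separator
    return "\n\n".join(page_blocks + list(texts))


context_text = build_text_context([["a", "b"], ["c"]], ["raw text"])
```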

c) Build visual context (when use_vision = True)

For each page:

  1. Render to PIL image at vision_scaling_factor × original_size (capped by render_max_pixels_per_page).
  2. Convert TIFF frames to JPEG.
  3. Base64-encode.
  4. Wrap as a provider-neutral image part:
{"type": "image", "media_type": "image/jpeg", "data": "<base64>", "detail": "auto"}
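
The scaling cap and the wrapping step can be sketched without an imaging library. How the pixel cap is applied is not specified here, so `capped_scale` assumes a uniform scale reduced until width × height fits under the limit; both function names are hypothetical and the JPEG bytes are taken as already rendered.

```python
import base64
import math


def capped_scale(width: float, height: float, factor: float, max_pixels: int) -> float:
    """Largest scale <= factor whose rendered area stays within max_pixels (assumed rule)."""
    limit = math.sqrt(max_pixels / (width * height))
    return min(float(factor), limit)


def to_image_part(jpeg_bytes: bytes, detail: str = "auto") -> dict:
    """Wrap already-encoded JPEG bytes as a provider-neutral image part."""
    return {
        "type": "image",
        "media_type": "image/jpeg",
        "data": base64.b64encode(jpeg_bytes).decode("ascii"),
        "detail": detail,
    }


scale_small = capped_scale(1000, 1000, factor=2, max_pixels=75_000_000)
scale_large = capped_scale(10_000, 10_000, factor=2, max_pixels=75_000_000)
part = to_image_part(b"abc")
```

For the small page the requested factor is used as-is; for the large page the scale drops below 1 so the render stays within the limit, which is the situation in which a warning about a capped image scale would be expected.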

d) Assemble LLM call parameters

request_kwargs = {
    "model": model_name,
    "temperature": from_model_settings,
    "messages": [
        {"role": "system", "content": prompt_string},
        {"role": "user",   "content": [text_part, *image_parts]},
    ],
    # "reasoning_effort": "medium"  ← only for reasoning-capable models
}

e) Determine response schema

  • When include_provenance = False: schema is UseCaseResponse directly.
  • When include_provenance = True: a dynamic wrapper is created at runtime:
ProvenanceWrappedResponse = create_model(
    "ProvenanceWrappedResponse",
    result=(UseCaseResponse, Field(...)),
    segment_citations=(List[SegmentCitation], Field(default_factory=list)),
)

The use-case file is never modified; the wrapper is ephemeral.

f) Invoke LLM

result = await gen_ai_service.invoke(request_kwargs, response_schema, model_settings)
# result.parsed → the Pydantic model instance
# result.usage  → GenAIUsage(prompt_tokens, completion_tokens)
# result.model_name → string

g) Write results

  • response_ix.ix_result.result = parsed.result.model_dump() (provenance mode) or parsed.model_dump() (normal mode)
  • response_ix.ix_result.meta_data = {token_usage, model_name}

h) Build provenance (when include_provenance = True)

response_ix.provenance = ProvenanceUtils.map_segment_refs_to_provenance(
    extraction_result={"result": ix_result},
    segment_citations=parsed.segment_citations,
    segment_index=response_ix.context.segment_index,
    max_sources_per_field=opts.max_sources_per_field,
    min_confidence=opts.min_confidence,
    include_bounding_boxes=opts.include_bounding_boxes,
    source_type=opts.source_type,
)

See §9.4 for the provenance resolution algorithm.

7.3 Provider routing

provider = model_settings["provider"]   # "azure" or "bedrock"
if provider == "bedrock":
    service = AWSGenAI()
else:
    service = AzureOpenAI()

The GenAIClient interface is:

class GenAIClient(ABC):
    def setup_gen_ai_client(self, request_ix, model_settings) -> None: ...
    async def invoke(self, request_kwargs, response_schema, model_settings) -> GenAIInvocationResult: ...

AzureOpenAI translates provider-neutral image parts to OpenAI's image_url content format. It caches one AsyncOpenAI instance per (base_url, api_key) pair.
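
The translation from a provider-neutral part to OpenAI's chat content format could be sketched as below. `to_openai_part` is a hypothetical helper, not the adapter's actual method; the target shape (an `image_url` part carrying a data URL and a `detail` hint) follows the OpenAI Chat Completions format.

```python
def to_openai_part(part: dict) -> dict:
    """Translate one provider-neutral content part into an OpenAI chat content part."""
    if part["type"] == "text":
        return {"type": "text", "text": part["text"]}
    if part["type"] == "image":
        data_url = f"data:{part['media_type']};base64,{part['data']}"
        return {
            "type": "image_url",
            "image_url": {"url": data_url, "detail": part.get("detail", "auto")},
        }
    raise ValueError(f"unsupported content part type: {part['type']!r}")


openai_part = to_openai_part(
    {"type": "image", "media_type": "image/jpeg", "data": "YWJj", "detail": "low"}
)
```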


8. Step 4 — ResponseHandlerStep

Purpose: Shape the final payload before handing it to the transport layer.

8.1 validate()

Always returns True.

8.2 process()

  1. Attach OCR text (include_ocr_text = True): concatenate all line texts per page, join pages with \n\n, store in ocr_result.result.text.

  2. Strip geometries (include_geometries = False, default): clear ocr_result.result.pages and ocr_result.meta_data. The structured page data is large; callers who don't need it should not receive it.

  3. Trim context: delete response_ix.context entirely. The mutable internal context is never serialised.


9. Provenance subsystem

9.1 SegmentIndex

Built from ocr_result after OCR completes. It assigns each non-tag OCR line a unique ID:

p{global_page_position}_l{line_index_within_page}
  • global_page_position is 1-based and refers to the flat pages list position, not the page number printed in the document. This guarantees uniqueness across multi-document requests.
  • Page tag lines (<page …> / </page>) are explicitly excluded.

Internal storage:

_id_to_position: Dict[str, Dict] = {
    "p1_l0": {
        "page": 1,                  # global 1-based position
        "bbox": BoundingBox(...),   # normalised 0-1 coordinates
        "text": "Invoice Number: INV-001",
        "file_index": 0,            # 0-based index into request.context.files
    },
    ...
}
_ordered_ids: List[str]   # insertion order, for deterministic prompt text

Bounding box normalisation:

# 8-coordinate polygon normalised to 0-1 (divide x-coords by page.width, y-coords by page.height)
[x1/W, y1/H, x2/W, y2/H, x3/W, y3/H, x4/W, y4/H]

Multiply back by rendered page dimensions to obtain pixel coordinates for overlay rendering.
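
Both directions of the conversion are short enough to show in full. The function names are hypothetical; the arithmetic follows the rule above (x-coordinates sit at even indices, y-coordinates at odd indices).

```python
from typing import List


def normalise_polygon(polygon: List[float], page_width: float, page_height: float) -> List[float]:
    """Divide x-coordinates by the page width and y-coordinates by the page height."""
    return [
        v / (page_width if i % 2 == 0 else page_height)
        for i, v in enumerate(polygon)
    ]


def to_pixels(normalised: List[float], rendered_width: float, rendered_height: float) -> List[float]:
    """Scale a normalised polygon back to the pixel space of a rendered page."""
    return [
        v * (rendered_width if i % 2 == 0 else rendered_height)
        for i, v in enumerate(normalised)
    ]


norm = normalise_polygon([0, 0, 100, 0, 100, 50, 0, 50], page_width=200, page_height=100)
overlay = to_pixels(norm, rendered_width=400, rendered_height=200)
```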

Prompt format (SegmentIndex.to_prompt_text()):

[p1_l0] Invoice Number: INV-001
[p1_l1] Date: 2024-01-15
[p1_l2] Total: CHF 1234.00
…

Extra plain texts (from context.texts) are appended untagged.

O(1) lookup:

position = segment_index.lookup_segment("p1_l0")  # None if unknown
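
A reduced sketch of the index shows how ID assignment, tag skipping, prompt rendering, and lookup fit together. `MiniSegmentIndex` is a hypothetical simplification: it works on plain strings, omits bounding boxes and `file_index`, and assumes the per-page line counter counts only non-tag lines.

```python
from typing import Dict, List, Optional


class MiniSegmentIndex:  # hypothetical, simplified stand-in for SegmentIndex
    def __init__(self) -> None:
        self._id_to_position: Dict[str, dict] = {}
        self._ordered_ids: List[str] = []

    @classmethod
    def build(cls, pages: List[List[str]]) -> "MiniSegmentIndex":
        index = cls()
        for page_pos, lines in enumerate(pages, start=1):  # global 1-based position
            line_no = 0
            for text in lines:
                if text.startswith("<page ") or text == "</page>":
                    continue  # synthetic page tags never get an ID
                seg_id = f"p{page_pos}_l{line_no}"
                index._id_to_position[seg_id] = {"page": page_pos, "text": text}
                index._ordered_ids.append(seg_id)
                line_no += 1
        return index

    def lookup_segment(self, seg_id: str) -> Optional[dict]:
        return self._id_to_position.get(seg_id)  # dict lookup, None if unknown

    def to_prompt_text(self) -> str:
        return "\n".join(
            f"[{sid}] {self._id_to_position[sid]['text']}" for sid in self._ordered_ids
        )


idx = MiniSegmentIndex.build([
    ['<page file="0" number="1">', "Invoice Number: INV-001", "</page>"],
    ["Total: CHF 1234.00"],
])
```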

9.2 SegmentCitation (LLM output model)

The LLM is instructed to populate segment_citations in its structured output:

class SegmentCitation(BaseModel):
    field_path: str             # e.g. "result.invoice_number"
    value_segment_ids: List[str]   # segments containing the extracted value
    context_segment_ids: List[str] # surrounding label/anchor segments (may be empty)

System prompt addition (injected by GenAIStep when provenance is on):

For each extracted field, you must also populate the `segment_citations` list.
Each entry maps one field to the document segments that were its source.
Set `field_path` to the dot-separated JSON path of the field (e.g. 'result.invoice_number').
Use two separate segment ID lists:
- `value_segment_ids`: segment IDs whose text directly contains the extracted value
  (e.g. ['p1_l4'] for the line containing 'INV-001').
- `context_segment_ids`: segment IDs for surrounding label or anchor text that helped you
  identify the field but does not contain the value itself
  (e.g. ['p1_l3'] for a label like 'Invoice Number:'). Leave empty if there is no distinct label.
Only use segment IDs that appear in the document text.
Omit fields for which you cannot identify a source segment.

9.3 ProvenanceData (response model)

class ProvenanceData(BaseModel):
    fields: Dict[str, FieldProvenance]   # field_path → provenance
    quality_metrics: Dict[str, Any]      # see below
    segment_count: Optional[int]
    granularity: Optional[str]           # "line" or "word"

class FieldProvenance(BaseModel):
    field_name: str          # last segment of field_path
    field_path: str          # e.g. "result.invoice_number"
    value: Any               # actual extracted value (resolved from ix_result.result)
    sources: List[ExtractionSource]
    confidence: Optional[float]   # None today; reserved for future

class ExtractionSource(BaseModel):
    page_number: int          # global 1-based position
    file_index: Optional[int] # 0-based index into request.context.files
    bounding_box: Optional[BoundingBox]   # None when include_bounding_boxes=False
    text_snippet: str         # actual OCR text at this location
    relevance_score: float    # always 1.0 (exact segment ID match)
    segment_id: Optional[str] # e.g. "p1_l3"; for internal use, not for rendering

class BoundingBox(BaseModel):
    coordinates: List[float]  # [x1,y1, x2,y2, x3,y3, x4,y4], all 0-1 normalised

quality_metrics:

{
  "fields_with_provenance": 8,
  "total_fields": 10,
  "coverage_rate": 0.8,
  "invalid_references": 2
}
  • coverage_rate = fields with at least one source ÷ total leaf fields in result
  • invalid_references = segment IDs cited by LLM that were not found in SegmentIndex
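
One way to compute these metrics is sketched below. The helper names are hypothetical, and counting every scalar inside nested dicts and lists as one leaf field is an assumption about what "total leaf fields" means.

```python
from typing import Any, Dict, List


def count_leaf_fields(value: Any) -> int:
    """Count scalar leaves in a nested result (assumed definition of a leaf field)."""
    if isinstance(value, dict):
        return sum(count_leaf_fields(v) for v in value.values())
    if isinstance(value, list):
        return sum(count_leaf_fields(v) for v in value)
    return 1


def quality_metrics(result: dict, fields_with_sources: List[str],
                    invalid_references: int) -> Dict[str, Any]:
    total = count_leaf_fields(result)
    covered = len(fields_with_sources)
    return {
        "fields_with_provenance": covered,
        "total_fields": total,
        "coverage_rate": covered / total if total else 0.0,
        "invalid_references": invalid_references,
    }


metrics = quality_metrics(
    {"a": 1, "b": {"c": 2, "d": 3}, "e": [{"f": 4}]},
    fields_with_sources=["result.a", "result.b.c"],
    invalid_references=1,
)
```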

9.4 Provenance resolution algorithm

provenance = {}
invalid_ref_count = 0

for citation in segment_citations:
    if source_type == VALUE:
        seg_ids = citation.value_segment_ids
    else:
        seg_ids = citation.value_segment_ids + citation.context_segment_ids

    sources = []
    for seg_id in seg_ids[:max_sources_per_field]:
        pos = segment_index.lookup_segment(seg_id)
        if pos is None:
            invalid_ref_count += 1
            continue
        sources.append(ExtractionSource(
            page_number=pos["page"],
            file_index=pos["file_index"],
            bounding_box=pos["bbox"] if include_bounding_boxes else None,
            text_snippet=pos["text"],
            relevance_score=1.0,
            segment_id=seg_id,
        ))

    if not sources:
        continue  # skip fields that resolve to nothing

    # Resolve value: traverse result dict via dot-notation field_path
    # e.g. "result.insured_vehicle.vin" → result["insured_vehicle"]["vin"]
    value = resolve_nested_path(extraction_result, citation.field_path)

    provenance[citation.field_path] = FieldProvenance(
        field_name=citation.field_path.split(".")[-1],
        field_path=citation.field_path,
        value=value,
        sources=sources,
        confidence=None,
    )

Array notation is normalised before traversal: "items[0].name" → "items.0.name".
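
A possible shape for the path normalisation and traversal is sketched below. `normalise_path` is a hypothetical helper, and this `resolve_nested_path` is an illustrative re-implementation, not the project's function; returning None for a path that does not resolve is an assumption.

```python
import re
from typing import Any


def normalise_path(path: str) -> str:
    """Rewrite bracket indices into dot notation: items[0].name -> items.0.name."""
    return re.sub(r"\[(\d+)\]", r".\1", path)


def resolve_nested_path(data: Any, path: str) -> Any:
    """Walk dicts by key and lists by numeric index; None if the path breaks (assumption)."""
    current = data
    for key in normalise_path(path).split("."):
        if isinstance(current, list):
            if not key.isdigit() or int(key) >= len(current):
                return None
            current = current[int(key)]
        elif isinstance(current, dict):
            if key not in current:
                return None
            current = current[key]
        else:
            return None
    return current


extraction_result = {"result": {"insured_vehicle": {"vin": "X1"}, "items": [{"name": "n0"}]}}
vin = resolve_nested_path(extraction_result, "result.insured_vehicle.vin")
name = resolve_nested_path(extraction_result, "result.items[0].name")
```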


10. Use-case definition contract

A use case is a Python module (or dynamically loaded class) that defines two Pydantic models. The Config Server serves them by name; the pipeline loads them via config_server.use_cases.load_and_validate_use_case(use_case).

UseCaseRequest

class UseCaseRequest(BaseModel):
    use_case_name: str          # Human-readable name, stored in ResponseIX.use_case_name
    system_prompt: str          # System prompt sent to the LLM
    # Legacy: prompt_template_base (deprecated alias for system_prompt)
    # Legacy: kwargs_use_case (deprecated; triggers format(**kwargs))

UseCaseResponse

A Pydantic model whose fields define the extraction schema. The LLM is instructed to return JSON matching this schema (via chat.completions.parse(response_format=UseCaseResponse)).

Example:

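from typing import List

from pydantic import BaseModel

# LineItem is defined first so that InvoiceResponse can reference it
class LineItem(BaseModel):
    description: str
    quantity: int
    unit_price: float

class InvoiceResponse(BaseModel):
    invoice_number: str
    invoice_date: str
    total_amount: float
    line_items: List[LineItem]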

The schema is passed as the response_format argument to the OpenAI structured-output API. When provenance is on, it is wrapped in ProvenanceWrappedResponse at runtime (§7.2e).


11. LLM invocation contract

11.1 Provider-neutral request_kwargs

request_kwargs = {
    "model":       str,                    # deployment name
    "temperature": Optional[float],        # from model_settings; None for reasoning models
    "messages": [
        {"role": "system", "content": str},
        {"role": "user",   "content": List[ContentPart]},
    ],
    "reasoning_effort": Optional[str],     # only present when supported by model
}

ContentPart = (
    {"type": "text",  "text": str}
    | {"type": "image", "media_type": "image/jpeg", "data": str, "detail": str}
)

11.2 GenAIInvocationResult

@dataclass
class GenAIInvocationResult:
    parsed:     Any           # Parsed Pydantic model instance (matches response_schema)
    usage:      GenAIUsage    # prompt_tokens, completion_tokens
    model_name: str

11.3 Model settings (from Config Server)

{
  "provider": "azure",
  "temperature": 0.0,
  "vision_capability": true,
  "supports_reasoning_efforts": ["low", "medium", "high"],
  "default_reasoning_effort": "medium"
}
  • temperature is None for reasoning models: OpenAI does not use it for these models, so the parameter must be omitted from the request.
  • vision_capability must be true for use_vision = True requests to be accepted.
  • supports_reasoning_efforts is the allowlist; requests for unsupported values fall back to default_reasoning_effort with a warning.
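
The allowlist fallback can be sketched as a small selection function. `pick_reasoning_effort` is a hypothetical name, the warning text is illustrative, and two behaviours are assumptions: a request without `reasoning_effort` receives the model default, and models without an allowlist get no `reasoning_effort` at all.

```python
from typing import List, Optional


def pick_reasoning_effort(requested: Optional[str], model_settings: dict,
                          warnings: List[str]) -> Optional[str]:
    allowed = model_settings.get("supports_reasoning_efforts") or []
    if not allowed:
        return None  # no allowlist: leave reasoning_effort out of request_kwargs
    default = model_settings.get("default_reasoning_effort")
    if requested is None or requested in allowed:
        return requested or default  # None falls back to the default (assumption)
    warnings.append(f"reasoning_effort '{requested}' not supported; using '{default}'")
    return default


settings = {
    "provider": "azure",
    "supports_reasoning_efforts": ["low", "medium", "high"],
    "default_reasoning_effort": "medium",
}
collected: List[str] = []
accepted = pick_reasoning_effort("high", settings, collected)
fallback = pick_reasoning_effort("extreme", settings, collected)
```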

12. Error model

12.1 IXException

All domain errors are wrapped in IXException(message_or_Error_enum). The pipeline catches any exception from validate() or process(), writes str(e) to response_ix.error, and stops.

12.2 Named error codes

| Code | Trigger |
| --- | --- |
| IX_000_000 | request_ix is None |
| IX_000_002 | No context (neither files nor texts) |
| IX_000_004 | OCR required but no files provided |
| IX_000_005 | File MIME type not supported by chosen OCR service |
| IX_001_000 | use_case is empty or missing |
| IX_001_001 | Use case could not be loaded or validated |
| IX_003_003 | use_vision=True but model has no vision capability |
| IX_004_001 | Response too large for transport (e.g. Kafka max message size) |

12.3 Warnings

Non-fatal issues are appended to response_ix.warning and logged. Examples:

  • Vision requested but no renderable documents found
  • Image scale capped due to pixel limit
  • Provenance requested with use_ocr=False
  • Word document contains embedded images that will be skipped
  • use_case_name not set in use-case definition

13. Configuration (AppConfig)

All settings are loaded from environment variables (with .env fallback via pydantic-settings).

| Category | Key env vars |
| --- | --- |
| Azure OCR | DOCUMENT_INTELLIGENCE_ENDPOINT, DOCUMENT_INTELLIGENCE_KEY, COMPUTER_VISION_ENDPOINT, COMPUTER_VISION_SUBSCRIPTION_KEY |
| Azure OpenAI | OPENAI_BASE_URL, OPENAI_API_KEY |
| LLM defaults | DEFAULT_LLM (default: gpt-4o) |
| Config Server | CONFIG_SERVER_BASE_URL (default: http://ix-config:8998/) |
| Pipeline | PIPELINE_WORKER_CONCURRENCY (1-10), PIPELINE_REQUEST_TIMEOUT_SECONDS (default: 2700) |
| Rendering | RENDER_MAX_PIXELS_PER_PAGE (default: 75,000,000) |
| AWS | S3_BUCKET, S3_OBJECT_PREFIX, AWS_REGION, TARGET_ROLE |
| Kafka | PROCESSING_TOPIC, KAFKA_BOOTSTRAP_SERVER, … |
| Observability | LOG_LEVEL, OTEL_EXPORTER_OTLP_ENDPOINT, PYROSCOPE_SERVER_ADDRESS |

14. Observability

14.1 Logging

Every log call carries ix_id as contextual metadata. The custom logger exposes:

logger.info_ix(msg, request_ix)
logger.warning_ix(msg, request_ix)
logger.error_ix(msg, request_ix)

If ix_id is not yet set on request_ix, the logger generates one lazily on first use.

Step timing is recorded via a Timer context manager; results are stored in response_ix.metadata.timings.
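
A Timer of this kind can be sketched with `contextlib`. The function name `timer` and the keys of the timing record (`step`, `elapsed_seconds`) are assumptions; the spec only states that step-level records end up in a list.

```python
import time
from contextlib import contextmanager
from typing import Dict, Iterator, List


@contextmanager
def timer(step_name: str, timings: List[Dict]) -> Iterator[None]:
    """Append one timing record when the block exits, even if it raised."""
    start = time.perf_counter()
    try:
        yield
    finally:
        timings.append({
            "step": step_name,  # hypothetical record keys
            "elapsed_seconds": time.perf_counter() - start,
        })


timings: List[Dict] = []
with timer("SetupStep", timings):
    sum(range(1000))  # placeholder for real step work
```

Recording in `finally` means a failing step still leaves a timing record behind, which helps when diagnosing slow failures.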

14.2 Tracing (OpenTelemetry)

  • Pipeline span wraps the entire _execute_pipeline() call (attributes: request_id, use_case, client_id).
  • Step span wraps each individual step.
  • Errors are recorded on the span with mark_failed(error_message).
  • Exported via gRPC to OTEL_EXPORTER_OTLP_ENDPOINT.

14.3 Metrics (Prometheus)

| Metric | Labels |
| --- | --- |
| pipeline_requests_total | client_id |
| LLM token counter | model, client_id, use_case, status |
| LLM latency histogram | model, client_id |
| Extraction quality | client_id, use_case, model, reasoning_effort |

Exposed on the health port (default 9009).


15. Transport adapter contract

The pipeline is transport-agnostic. Any delivery mechanism must:

  1. Deserialise the incoming payload into a RequestIX instance.
  2. Assign ix_id (a random 16-character hex string) if not already present.
  3. Call await Pipeline().start(request_ix) → response_ix.
  4. Serialise response_ix (use .model_dump(by_alias=True) to honour serialization_alias on metadata).
  5. Deliver the serialised response to the caller via the appropriate channel.

Optionally:

  • Persist request_ix and response_ix to a durable store (S3, database) keyed by ix_id.
  • Apply a timeout: wrap the await pipeline.start(...) call and write a timeout error to response_ix.error on expiry.
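
The optional timeout can be sketched with `asyncio.wait_for`. The `ResponseIX` stand-in, `run_with_timeout`, `slow_pipeline`, and the error message wording are all hypothetical; a real adapter would pass `pipeline.start` as the coroutine function.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional


@dataclass
class ResponseIX:  # stand-in with only the field this demo needs
    error: Optional[str] = None


async def run_with_timeout(start: Callable[[object], Awaitable[ResponseIX]],
                           request_ix: object, timeout_seconds: float) -> ResponseIX:
    try:
        return await asyncio.wait_for(start(request_ix), timeout=timeout_seconds)
    except asyncio.TimeoutError:
        # The pipeline task is cancelled; report the timeout as a pipeline error.
        return ResponseIX(error=f"pipeline timed out after {timeout_seconds}s")


async def slow_pipeline(request_ix: object) -> ResponseIX:  # stand-in for pipeline.start
    await asyncio.sleep(1)
    return ResponseIX()


timed_out = asyncio.run(run_with_timeout(slow_pipeline, None, 0.01))
```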

Kafka adapter (existing)

  • Consumes from ch-infoxtractor.processing.v2; produces to ch-infoxtractor.outbound.v2 and client-specific topics.
  • Concurrency controlled via asyncio semaphore (PIPELINE_WORKER_CONCURRENCY).
  • Per-partition offset commit tracking (gap-safe: only commits contiguous completed offsets).
  • Graceful shutdown: drains in-flight messages before stopping.
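
Gap-safe commit tracking can be illustrated with a small per-partition tracker. `PartitionOffsetTracker` is a hypothetical sketch, not the adapter's code; it follows the Kafka convention that the committed offset is the next offset to be consumed.

```python
from typing import Set


class PartitionOffsetTracker:  # hypothetical, one instance per partition
    def __init__(self, next_offset: int) -> None:
        self._next = next_offset     # lowest offset not yet known to be complete
        self._done: Set[int] = set()  # completed offsets beyond a gap

    def mark_done(self, offset: int) -> None:
        """Record a completed message and advance past any contiguous run."""
        self._done.add(offset)
        while self._next in self._done:
            self._done.remove(self._next)
            self._next += 1

    def committable(self) -> int:
        """Offset that is safe to commit: everything below it has completed."""
        return self._next


tracker = PartitionOffsetTracker(next_offset=5)
tracker.mark_done(7)                 # 5 and 6 still in flight: nothing new to commit
after_gap = tracker.committable()
tracker.mark_done(5)
tracker.mark_done(6)                 # gap closed: 5, 6, 7 are all complete
after_fill = tracker.committable()
```

Because messages are processed concurrently, they can finish out of order; committing only the contiguous prefix means a crash never skips an unprocessed message.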

FastAPI adapter (example)

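import secrets
from fastapi import APIRouter

router = APIRouter()

@router.post("/extract")
async def extract(body: RequestIX) -> ResponseIX:
    body.ix_id = body.ix_id or secrets.token_hex(8)  # 8 random bytes = 16 hex characters
    response = await pipeline.start(body)
    return response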

Database adapter (example)

async def process_job(job_id: str, db: AsyncSession):
    row = await db.get(Job, job_id)
    request_ix = RequestIX.model_validate_json(row.payload)
    request_ix.ix_id = row.ix_id
    response_ix = await pipeline.start(request_ix)
    row.result = response_ix.model_dump_json(by_alias=True)
    row.status = "done" if not response_ix.error else "error"
    await db.commit()

16. End-to-end data flow summary

[Transport layer]
    ↓ RequestIX (use_case, context.files/texts, options, ix_id)
    ↓
[Pipeline.start()]
    ↓
[SetupStep]
    • Download files → local paths         → response_ix.context.files (File objects)
    • Load use case from Config Server      → response_ix.context.use_case_request/response
    • Split PDFs / load images / wrap texts → response_ix.context.pages (flat list)
    ↓
[OCRStep]  (skipped if no files or no OCR option is enabled)
    • Call Azure OCR per page               → response_ix.ocr_result (pages + lines + bbox)
    • Inject <page> XML markers             → lines prepended/appended in ocr_result.pages
    • Build SegmentIndex (if provenance)    → response_ix.context.segment_index
    ↓
[GenAIStep]  (skipped if ocr_only=True)
    • Build system prompt (+ citation hint if provenance)
    • Build user content: segment-tagged text OR plain OCR text + optional base64 images
    • Determine response schema (wrapped if provenance)
    • Call LLM via GenAIClient.invoke()
    • Write ix_result.result + meta_data
    • Resolve segment citations → ProvenanceData (if provenance)
    ↓
[ResponseHandlerStep]  (always runs)
    • Attach flat OCR text (if include_ocr_text)
    • Strip bounding boxes (if not include_geometries)
    • Delete response_ix.context (never serialised)
    ↓
[ResponseIX]
    ix_result.result     ← structured extraction matching UseCaseResponse schema
    ocr_result           ← OCR details (empty pages unless include_geometries)
    provenance           ← field-level source map (None unless include_provenance)
    error / warning      ← diagnostics
    metadata             ← timings, hostname
    ↓
[Transport layer]
    Serialise → deliver to caller
    Persist (optional) → keyed by ix_id

17. Key design decisions

| Decision | Rationale |
| --- | --- |
| Two-method Step interface (validate + process) | Keeps routing logic (should this step run?) separate from execution; steps can be reordered or replaced without touching the pipeline loop |
| Mutable context on ResponseIX | Avoids passing a growing set of intermediate values as extra arguments; the entire state is in one object; context is excluded from serialisation so it never leaks |
| Single LLM call for provenance | The schema is wrapped at runtime to add segment_citations; no second round-trip to the LLM is needed |
| Segment IDs instead of fuzzy text matching | IDs are injected into the prompt; the LLM cites them; lookup is O(1); no approximate matching, no hallucinated coordinates |
| Global page position for segment IDs | Ensures uniqueness across multi-document requests (two documents that both have "page 1" produce p1_l* and p2_l*, not the same IDs) |
| Provider-neutral request_kwargs | The GenAIClient interface accepts a dict; provider-specific translation (e.g. OpenAI image_url format) happens inside each adapter, not in the step |
| Normalised bounding boxes (0-1) | Resolution-independent; frontend multiplies by rendered page dimensions to get pixel coordinates |
| Transport-agnostic pipeline | Kafka, FastAPI, and DB adapters all call the same Pipeline.start() |
| Config Server for use cases | Decouples use-case authoring from deployment; allows use cases to be updated without redeploying the pipeline |