infoxtractor/src/ix/pipeline/setup_step.py
Dirk Riemann 97aa24f478
All checks were successful
tests / test (push) Successful in 1m13s
tests / test (pull_request) Successful in 1m19s
feat(pipeline): SetupStep — validate + fetch + MIME + pages (spec §6.1)
First pipeline step. Validates the request (IX_000_002 on empty context),
normalises every Context.files entry to a FileRef, downloads them in
parallel via asyncio.gather, byte-sniffs MIMEs (IX_000_005 for
unsupported), loads the use-case pair from REGISTRY (IX_001_001 on
miss), and builds the flat pages + page_metadata list on
response_ix.context.

Fetcher / ingestor / MIME detector / tmp_dir / fetch_config all inject
via the constructor so unit tests stay hermetic — production wires the
real ix.ingestion defaults via the app factory.

7 unit tests in tests/unit/test_setup_step.py cover validate errors,
happy path (fetcher + ingestor invoked correctly, context populated,
use_case_name echoed), FileRef headers pass through, unsupported MIME
-> IX_000_005, unknown use case -> IX_001_001, text-only request, and
the _InternalContext type assertion.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 11:14:04 +02:00

164 lines
5.8 KiB
Python

"""SetupStep — request validation + file ingestion (spec §6.1).
Runs first in the pipeline. Responsibilities:
* Reject nonsense requests up front (``IX_000_000`` / ``IX_000_002``).
* Normalise ``Context.files`` entries: plain ``str`` → ``FileRef(url=s, headers={})``.
* Download every file in parallel (``asyncio.gather``) via the injected
``fetcher`` callable (default: :func:`ix.ingestion.fetch.fetch_file`).
* Byte-sniff MIMEs; gate unsupported ones via ``IX_000_005``.
* Load the use case pair from :data:`ix.use_cases.REGISTRY`
(``IX_001_001`` on miss). This happens before any download so an
unknown use case fails fast.
* Hand the fetched files + raw texts to the injected ``ingestor`` so
``context.pages`` + ``context.page_metadata`` are ready for the OCRStep.
Dependency-inject the fetcher/ingestor/mime-detector so unit tests stay
hermetic — production wires the defaults via the app factory.
"""
from __future__ import annotations
import asyncio
from pathlib import Path
from typing import Any, Protocol
from ix.contracts import FileRef, RequestIX, ResponseIX
from ix.contracts.response import _InternalContext
from ix.errors import IXErrorCode, IXException
from ix.ingestion import (
DocumentIngestor,
FetchConfig,
detect_mime,
fetch_file,
require_supported,
)
from ix.pipeline.step import Step
from ix.use_cases import get_use_case
class _Fetcher(Protocol):
    """Async callable that downloads one file to ``tmp_dir``.

    Structural type for the ``fetcher`` dependency of ``SetupStep``, which
    awaits it once per ``FileRef`` and treats the returned ``Path`` as the
    local copy of that file.
    """

    # Invoked positionally by SetupStep as ``fetcher(file_ref, tmp_dir, cfg)``.
    async def __call__(
        self, file_ref: FileRef, tmp_dir: Path, cfg: FetchConfig
    ) -> Path: ...
class _Ingestor(Protocol):
    """Turns fetched files + raw texts into a flat Page list.

    Structural type for the ``ingestor`` dependency of ``SetupStep``.
    """

    # ``files`` pairs each local path with its detected MIME string; ``texts``
    # are the raw text entries of the request. SetupStep stores the two
    # returned lists as ``pages`` and ``page_metadata`` respectively, and
    # calls this method with keyword arguments (``files=...``, ``texts=...``).
    def build_pages(
        self,
        files: list[tuple[Path, str]],
        texts: list[str],
    ) -> tuple[list[Any], list[Any]]: ...
class _MimeDetector(Protocol):
    """Returns the canonical MIME string for a local file.

    Structural type for the ``mime_detector`` dependency of ``SetupStep``,
    which calls it once per downloaded path and passes the result on to
    ``require_supported``.
    """

    # Synchronous: SetupStep calls this without ``await``.
    def __call__(self, path: Path) -> str: ...
class SetupStep(Step):
    """First pipeline step: validate + fetch + MIME + use-case load + pages.

    Every collaborator is injected through the constructor so the step can be
    exercised hermetically; any argument left as ``None`` falls back to the
    corresponding ``ix.ingestion`` default.
    """

    def __init__(
        self,
        fetcher: _Fetcher | None = None,
        ingestor: _Ingestor | None = None,
        tmp_dir: Path | None = None,
        fetch_config: FetchConfig | None = None,
        mime_detector: _MimeDetector | None = None,
    ) -> None:
        """Wire the step's collaborators.

        Args:
            fetcher: Async downloader; defaults to ``fetch_file``.
            ingestor: Page builder; defaults to a fresh ``DocumentIngestor``.
            tmp_dir: Root under which one scratch directory per request is
                created; defaults to ``/tmp/ix``.
            fetch_config: Download limits; when ``None`` the built-in
                fallback from ``_resolve_fetch_config`` is used.
            mime_detector: MIME sniffer; defaults to ``detect_mime``.
        """
        # Compare against None explicitly: a falsy-but-valid injected object
        # (e.g. a stub defining ``__len__``) must not be silently swapped for
        # the production default.
        self._fetcher: _Fetcher = fetcher if fetcher is not None else fetch_file
        self._ingestor: _Ingestor = (
            ingestor if ingestor is not None else DocumentIngestor()
        )
        self._tmp_dir = tmp_dir if tmp_dir is not None else Path("/tmp/ix")
        self._fetch_config = fetch_config
        self._mime_detector: _MimeDetector = (
            mime_detector if mime_detector is not None else detect_mime
        )

    async def validate(self, request_ix: RequestIX, response_ix: ResponseIX) -> bool:
        """Reject requests that carry nothing to process.

        Returns:
            ``True`` when the request has at least one file or text.

        Raises:
            IXException: ``IX_000_000`` if ``request_ix`` is ``None``;
                ``IX_000_002`` if the context has neither files nor texts.
        """
        if request_ix is None:  # pragma: no cover - runtime sanity; Pydantic rejects it earlier
            raise IXException(IXErrorCode.IX_000_000)
        ctx = request_ix.context
        if not ctx.files and not ctx.texts:
            raise IXException(IXErrorCode.IX_000_002)
        return True

    async def process(
        self, request_ix: RequestIX, response_ix: ResponseIX
    ) -> ResponseIX:
        """Fetch and ingest the request's inputs and populate ``response_ix``.

        Sets ``response_ix.context`` to a new ``_InternalContext`` and echoes
        the use case's ``use_case_name`` (or ``None``) onto the response.

        Returns:
            The same ``response_ix`` instance that was passed in.

        Raises:
            Whatever ``get_use_case``, the fetcher, ``require_supported`` or
            the ingestor raise; nothing is caught or retried here.
        """
        # 1. Load the use-case pair — early so an unknown name fails before
        #    we waste time downloading files.
        use_case_request_cls, use_case_response_cls = get_use_case(request_ix.use_case)
        use_case_request = use_case_request_cls()

        # 2. Resolve the per-request scratch directory. ix_id is assigned
        #    by the transport adapter — fall back to request_id for the MVP
        #    unit tests that don't set ix_id.
        # NOTE(review): ``sub`` is used as a path component without
        # validation; if it can contain ``..`` or be absolute, the scratch
        # directory escapes ``self._tmp_dir`` — confirm the ids are
        # constrained upstream.
        sub = request_ix.ix_id or request_ix.request_id
        request_tmp = self._tmp_dir / sub
        request_tmp.mkdir(parents=True, exist_ok=True)

        # 3. Normalise file entries to FileRef. Plain strings get empty headers.
        normalised: list[FileRef] = [
            FileRef(url=entry, headers={}) if isinstance(entry, str) else entry
            for entry in request_ix.context.files
        ]

        # 4. Download in parallel. No retry — IX_000_007 propagates.
        fetch_cfg = self._resolve_fetch_config()
        local_paths = await self._fetch_all(normalised, request_tmp, fetch_cfg)

        # 5. MIME-sniff each download; reject unsupported.
        files_with_mime: list[tuple[Path, str]] = []
        for path in local_paths:
            mime = self._mime_detector(path)
            require_supported(mime)
            files_with_mime.append((path, mime))

        # 6. Build the flat Page list + per-page metadata.
        pages, page_metadata = self._ingestor.build_pages(
            files=files_with_mime,
            texts=list(request_ix.context.texts),
        )

        # 7. Stash everything on the internal context for downstream steps.
        #    ``texts`` is copied again so the context does not share a list
        #    object with the one handed to the ingestor.
        response_ix.context = _InternalContext(
            pages=pages,
            files=files_with_mime,
            texts=list(request_ix.context.texts),
            use_case_request=use_case_request,
            use_case_response=use_case_response_cls,
            segment_index=None,
            page_metadata=page_metadata,
            tmp_dir=request_tmp,
        )

        # 8. Echo use-case display name onto the public response.
        response_ix.use_case_name = getattr(use_case_request, "use_case_name", None)
        return response_ix

    async def _fetch_all(
        self,
        file_refs: list[FileRef],
        request_tmp: Path,
        fetch_cfg: FetchConfig,
    ) -> list[Path]:
        """Download every file concurrently; return paths in input order.

        A bare ``asyncio.gather`` leaves the remaining downloads running when
        one of them fails. Here the siblings are cancelled and drained before
        the first error is re-raised, so no fetch keeps writing into
        ``request_tmp`` after this step has already failed.
        """
        if not file_refs:
            return []
        # ensure_future (what gather itself uses) accepts any awaitable the
        # injected fetcher may return, not only coroutines.
        tasks = [
            asyncio.ensure_future(self._fetcher(file_ref, request_tmp, fetch_cfg))
            for file_ref in file_refs
        ]
        try:
            return list(await asyncio.gather(*tasks))
        except BaseException:
            for task in tasks:
                task.cancel()
            # Wait for the cancellations to settle; return_exceptions=True
            # swallows their outcomes so the original error is the one raised.
            await asyncio.gather(*tasks, return_exceptions=True)
            raise

    def _resolve_fetch_config(self) -> FetchConfig:
        """Return the injected ``FetchConfig`` or the built-in fallback."""
        if self._fetch_config is not None:
            return self._fetch_config
        # Fallback defaults — used only when the caller didn't inject one.
        # Real values land via ix.config in Chunk 3.
        return FetchConfig(
            connect_timeout_s=10.0,
            read_timeout_s=30.0,
            max_bytes=50 * 1024 * 1024,
        )
# Only the step itself is exported; the underscore-prefixed Protocol types
# in this module are internal typing aids.
__all__ = ["SetupStep"]