From 97aa24f478055f9793cf6db7d2d0372666f73a9c Mon Sep 17 00:00:00 2001 From: Dirk Riemann Date: Sat, 18 Apr 2026 11:14:04 +0200 Subject: [PATCH] =?UTF-8?q?feat(pipeline):=20SetupStep=20=E2=80=94=20valid?= =?UTF-8?q?ate=20+=20fetch=20+=20MIME=20+=20pages=20(spec=20=C2=A76.1)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit First pipeline step. Validates the request (IX_000_002 on empty context), loads the use-case pair from REGISTRY up front (IX_001_001 on miss, before any file is downloaded), normalises every Context.files entry to a FileRef, downloads them in parallel via asyncio.gather, byte-sniffs MIMEs (IX_000_005 for unsupported), and builds the flat pages + page_metadata list on response_ix.context. Fetcher / ingestor / MIME detector / tmp_dir / fetch_config are all injected via the constructor so unit tests stay hermetic — production wires the real ix.ingestion defaults via the app factory. 7 unit tests in tests/unit/test_setup_step.py cover validate errors, happy path (fetcher + ingestor invoked correctly, context populated, use_case_name echoed), FileRef headers pass through, unsupported MIME -> IX_000_005, unknown use case -> IX_001_001, text-only request, and the _InternalContext type assertion. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/ix/pipeline/setup_step.py | 164 +++++++++++++++++++++ tests/unit/test_setup_step.py | 260 ++++++++++++++++++++++++++++++++++ 2 files changed, 424 insertions(+) create mode 100644 src/ix/pipeline/setup_step.py create mode 100644 tests/unit/test_setup_step.py diff --git a/src/ix/pipeline/setup_step.py b/src/ix/pipeline/setup_step.py new file mode 100644 index 0000000..707f34d --- /dev/null +++ b/src/ix/pipeline/setup_step.py @@ -0,0 +1,164 @@ +"""SetupStep — request validation + file ingestion (spec §6.1). + +Runs first in the pipeline. Responsibilities: + +* Reject nonsense requests up front (``IX_000_000`` / ``IX_000_002``). +* Normalise ``Context.files`` entries: plain ``str`` → ``FileRef(url=s, headers={})``.
+* Download every file in parallel (``asyncio.gather``) via the injected + ``fetcher`` callable (default: :func:`ix.ingestion.fetch.fetch_file`). +* Byte-sniff MIMEs; gate unsupported ones via ``IX_000_005``. +* Load the use case pair from :data:`ix.use_cases.REGISTRY` + (``IX_001_001`` on miss). +* Hand the fetched files + raw texts to the injected ``ingestor`` so + ``context.pages`` + ``context.page_metadata`` are ready for the OCRStep. + +Dependency-inject the fetcher/ingestor/mime-detector so unit tests stay +hermetic — production wires the defaults via the app factory. +""" + +from __future__ import annotations + +import asyncio +from pathlib import Path +from typing import Any, Protocol + +from ix.contracts import FileRef, RequestIX, ResponseIX +from ix.contracts.response import _InternalContext +from ix.errors import IXErrorCode, IXException +from ix.ingestion import ( + DocumentIngestor, + FetchConfig, + detect_mime, + fetch_file, + require_supported, +) +from ix.pipeline.step import Step +from ix.use_cases import get_use_case + + +class _Fetcher(Protocol): + """Async callable that downloads one file to ``tmp_dir``.""" + + async def __call__( + self, file_ref: FileRef, tmp_dir: Path, cfg: FetchConfig + ) -> Path: ... + + +class _Ingestor(Protocol): + """Turns fetched files + raw texts into a flat Page list.""" + + def build_pages( + self, + files: list[tuple[Path, str]], + texts: list[str], + ) -> tuple[list[Any], list[Any]]: ... + + +class _MimeDetector(Protocol): + """Returns the canonical MIME string for a local file.""" + + def __call__(self, path: Path) -> str: ... 
+ + +class SetupStep(Step): + """First pipeline step: validate + fetch + MIME + use-case load + pages.""" + + def __init__( + self, + fetcher: _Fetcher | None = None, + ingestor: _Ingestor | None = None, + tmp_dir: Path | None = None, + fetch_config: FetchConfig | None = None, + mime_detector: _MimeDetector | None = None, + ) -> None: + self._fetcher: _Fetcher = fetcher or fetch_file + self._ingestor: _Ingestor = ingestor or DocumentIngestor() + self._tmp_dir = tmp_dir or Path("/tmp/ix") + self._fetch_config = fetch_config + self._mime_detector: _MimeDetector = mime_detector or detect_mime + + async def validate(self, request_ix: RequestIX, response_ix: ResponseIX) -> bool: + if request_ix is None: # pragma: no cover - runtime sanity; Pydantic rejects it earlier + raise IXException(IXErrorCode.IX_000_000) + ctx = request_ix.context + if not ctx.files and not ctx.texts: + raise IXException(IXErrorCode.IX_000_002) + return True + + async def process( + self, request_ix: RequestIX, response_ix: ResponseIX + ) -> ResponseIX: + # 1. Load the use-case pair — early so an unknown name fails before + # we waste time downloading files. + use_case_request_cls, use_case_response_cls = get_use_case(request_ix.use_case) + use_case_request = use_case_request_cls() + + # 2. Resolve the per-request scratch directory. ix_id is assigned + # by the transport adapter — fall back to request_id for the MVP + # unit tests that don't set ix_id. + sub = request_ix.ix_id or request_ix.request_id + request_tmp = self._tmp_dir / sub + request_tmp.mkdir(parents=True, exist_ok=True) + + # 3. Normalise file entries to FileRef. Plain strings get empty headers. + normalised: list[FileRef] = [ + FileRef(url=entry, headers={}) if isinstance(entry, str) else entry + for entry in request_ix.context.files + ] + + # 4. Download in parallel. No retry — IX_000_007 propagates. 
+ fetch_cfg = self._resolve_fetch_config() + local_paths: list[Path] = [] + if normalised: + local_paths = list( + await asyncio.gather( + *( + self._fetcher(file_ref, request_tmp, fetch_cfg) + for file_ref in normalised + ) + ) + ) + + # 5. MIME-sniff each download; reject unsupported. + files_with_mime: list[tuple[Path, str]] = [] + for path in local_paths: + mime = self._mime_detector(path) + require_supported(mime) + files_with_mime.append((path, mime)) + + # 6. Build the flat Page list + per-page metadata. + pages, page_metadata = self._ingestor.build_pages( + files=files_with_mime, + texts=list(request_ix.context.texts), + ) + + # 7. Stash everything on the internal context for downstream steps. + response_ix.context = _InternalContext( + pages=pages, + files=files_with_mime, + texts=list(request_ix.context.texts), + use_case_request=use_case_request, + use_case_response=use_case_response_cls, + segment_index=None, + page_metadata=page_metadata, + tmp_dir=request_tmp, + ) + + # 8. Echo use-case display name onto the public response. + response_ix.use_case_name = getattr(use_case_request, "use_case_name", None) + + return response_ix + + def _resolve_fetch_config(self) -> FetchConfig: + if self._fetch_config is not None: + return self._fetch_config + # Fallback defaults — used only when the caller didn't inject one. + # Real values land via ix.config in Chunk 3. 
+ return FetchConfig( + connect_timeout_s=10.0, + read_timeout_s=30.0, + max_bytes=50 * 1024 * 1024, + ) + + +__all__ = ["SetupStep"] diff --git a/tests/unit/test_setup_step.py b/tests/unit/test_setup_step.py new file mode 100644 index 0000000..3376c22 --- /dev/null +++ b/tests/unit/test_setup_step.py @@ -0,0 +1,260 @@ +"""Tests for :class:`ix.pipeline.setup_step.SetupStep` (spec §6.1).""" + +from __future__ import annotations + +from pathlib import Path + +import pytest + +from ix.contracts import ( + Context, + FileRef, + OCROptions, + Options, + ProvenanceOptions, + RequestIX, + ResponseIX, +) +from ix.contracts.response import _InternalContext +from ix.errors import IXErrorCode, IXException +from ix.ingestion import FetchConfig +from ix.pipeline.setup_step import SetupStep +from ix.segmentation import PageMetadata + + +class FakeFetcher: + """Captures FileRef + tmp_dir + cfg; returns a pre-set path per URL.""" + + def __init__(self, routes: dict[str, Path]) -> None: + self.routes = routes + self.calls: list[tuple[FileRef, Path, FetchConfig]] = [] + + async def __call__( + self, file_ref: FileRef, tmp_dir: Path, cfg: FetchConfig + ) -> Path: + self.calls.append((file_ref, tmp_dir, cfg)) + if file_ref.url not in self.routes: + raise IXException(IXErrorCode.IX_000_007, detail=file_ref.url) + return self.routes[file_ref.url] + + +class FakeIngestor: + """Returns canned pages + metas; records build_pages arguments.""" + + def __init__(self, pages_by_file: list[list]) -> None: + # Each entry corresponds to one file in the input. + self.pages_by_file = pages_by_file + self.build_calls: list[tuple[list, list[str]]] = [] + + def build_pages( + self, + files: list[tuple[Path, str]], + texts: list[str], + ) -> tuple[list, list[PageMetadata]]: + self.build_calls.append((files, texts)) + + # Flat out pages keyed by file_index. 
+ from ix.contracts import Page + + pages: list = [] + metas: list[PageMetadata] = [] + for file_index, _ in enumerate(files): + canned = self.pages_by_file[file_index] + for w, h in canned: + pages.append( + Page(page_no=len(pages) + 1, width=w, height=h, lines=[]) + ) + metas.append(PageMetadata(file_index=file_index)) + for _ in texts: + pages.append(Page(page_no=len(pages) + 1, width=0.0, height=0.0, lines=[])) + metas.append(PageMetadata(file_index=None)) + return pages, metas + + +class _AlwaysMimePdf: + """detect_mime replacement that always returns application/pdf.""" + + def __call__(self, path: Path) -> str: + return "application/pdf" + + +def _make_response() -> ResponseIX: + return ResponseIX() + + +def _make_cfg() -> FetchConfig: + return FetchConfig(connect_timeout_s=1.0, read_timeout_s=2.0, max_bytes=10_000) + + +def _make_request( + files: list[str | FileRef] | None = None, + texts: list[str] | None = None, + use_case: str = "bank_statement_header", +) -> RequestIX: + return RequestIX( + use_case=use_case, + ix_client_id="test", + request_id="r-1", + context=Context(files=files or [], texts=texts or []), + options=Options( + ocr=OCROptions(use_ocr=True), + provenance=ProvenanceOptions(include_provenance=True), + ), + ) + + +class TestValidate: + async def test_empty_context_raises_IX_000_002(self, tmp_path: Path) -> None: + step = SetupStep( + fetcher=FakeFetcher({}), + ingestor=FakeIngestor([]), + tmp_dir=tmp_path, + fetch_config=_make_cfg(), + mime_detector=_AlwaysMimePdf(), + ) + req = _make_request(files=[], texts=[]) + with pytest.raises(IXException) as ei: + await step.validate(req, _make_response()) + assert ei.value.code is IXErrorCode.IX_000_002 + + +class TestProcessHappyPath: + async def test_files_downloaded_mime_checked_use_case_loaded( + self, tmp_path: Path + ) -> None: + routes = {"http://host/a.pdf": tmp_path / "a.pdf"} + for p in routes.values(): + p.write_bytes(b"%PDF-1.4") + fetcher = FakeFetcher(routes) + ingestor = 
FakeIngestor([[(200.0, 300.0), (200.0, 300.0)]]) + step = SetupStep( + fetcher=fetcher, + ingestor=ingestor, + tmp_dir=tmp_path / "work", + fetch_config=_make_cfg(), + mime_detector=_AlwaysMimePdf(), + ) + req = _make_request(files=["http://host/a.pdf"]) + resp = _make_response() + assert await step.validate(req, resp) is True + resp = await step.process(req, resp) + + # Fetcher invoked once with the URL wrapped in a FileRef. + assert len(fetcher.calls) == 1 + assert fetcher.calls[0][0].url == "http://host/a.pdf" + + # Ingestor received [(local_path, mime)] + empty texts. + assert len(ingestor.build_calls) == 1 + files, texts = ingestor.build_calls[0] + assert files == [(routes["http://host/a.pdf"], "application/pdf")] + assert texts == [] + + # Context populated. + ctx = resp.context + assert ctx is not None + assert len(getattr(ctx, "pages", [])) == 2 + assert len(getattr(ctx, "page_metadata", [])) == 2 + assert getattr(ctx, "texts", None) == [] + assert getattr(ctx, "files", None) is not None + + # Use case echoed. 
+ assert resp.use_case_name == "Bank Statement Header" + + async def test_fileref_headers_pass_through(self, tmp_path: Path) -> None: + routes = {"http://host/with-auth.pdf": tmp_path / "f.pdf"} + for p in routes.values(): + p.write_bytes(b"%PDF-1.4") + fetcher = FakeFetcher(routes) + ingestor = FakeIngestor([[(10.0, 10.0)]]) + step = SetupStep( + fetcher=fetcher, + ingestor=ingestor, + tmp_dir=tmp_path / "work", + fetch_config=_make_cfg(), + mime_detector=_AlwaysMimePdf(), + ) + req = _make_request( + files=[FileRef(url="http://host/with-auth.pdf", headers={"Authorization": "Token z"})], + ) + await step.process(req, _make_response()) + fr = fetcher.calls[0][0] + assert fr.headers == {"Authorization": "Token z"} + + +class TestProcessErrors: + async def test_unsupported_mime_raises_IX_000_005(self, tmp_path: Path) -> None: + routes = {"http://host/a.txt": tmp_path / "a.txt"} + routes["http://host/a.txt"].write_bytes(b"hello") + fetcher = FakeFetcher(routes) + ingestor = FakeIngestor([[(10.0, 10.0)]]) + + class _TextMime: + def __call__(self, path: Path) -> str: + return "text/plain" + + step = SetupStep( + fetcher=fetcher, + ingestor=ingestor, + tmp_dir=tmp_path / "work", + fetch_config=_make_cfg(), + mime_detector=_TextMime(), + ) + req = _make_request(files=["http://host/a.txt"]) + with pytest.raises(IXException) as ei: + await step.process(req, _make_response()) + assert ei.value.code is IXErrorCode.IX_000_005 + + async def test_unknown_use_case_raises_IX_001_001(self, tmp_path: Path) -> None: + step = SetupStep( + fetcher=FakeFetcher({}), + ingestor=FakeIngestor([]), + tmp_dir=tmp_path / "work", + fetch_config=_make_cfg(), + mime_detector=_AlwaysMimePdf(), + ) + req = _make_request(files=[], texts=["hello"], use_case="nope") + # Validate passes (we have context). Process should raise IX_001_001. 
+ with pytest.raises(IXException) as ei: + await step.process(req, _make_response()) + assert ei.value.code is IXErrorCode.IX_001_001 + + +class TestTextOnly: + async def test_texts_only_loads_use_case_and_builds_text_pages( + self, tmp_path: Path + ) -> None: + fetcher = FakeFetcher({}) + ingestor = FakeIngestor([]) + step = SetupStep( + fetcher=fetcher, + ingestor=ingestor, + tmp_dir=tmp_path / "work", + fetch_config=_make_cfg(), + mime_detector=_AlwaysMimePdf(), + ) + req = _make_request(files=[], texts=["hello", "there"]) + resp = _make_response() + assert await step.validate(req, resp) is True + resp = await step.process(req, resp) + + assert fetcher.calls == [] + assert ingestor.build_calls[0][1] == ["hello", "there"] + ctx = resp.context + assert ctx is not None + assert ctx.texts == ["hello", "there"] + + +class TestInternalContextShape: + async def test_context_is_internal_context_instance(self, tmp_path: Path) -> None: + fetcher = FakeFetcher({}) + ingestor = FakeIngestor([]) + step = SetupStep( + fetcher=fetcher, + ingestor=ingestor, + tmp_dir=tmp_path / "work", + fetch_config=_make_cfg(), + mime_detector=_AlwaysMimePdf(), + ) + req = _make_request(files=[], texts=["hello"]) + resp = await step.process(req, _make_response()) + assert isinstance(resp.context, _InternalContext) -- 2.45.2