Merge pull request 'feat(pipeline): SetupStep (spec §6.1)' (#12) from feat/step-setup into main
Some checks are pending
tests / test (push) Waiting to run

This commit is contained in:
goldstein 2026-04-18 09:14:19 +00:00
commit 632acdcd26
2 changed files with 424 additions and 0 deletions

View file

@ -0,0 +1,164 @@
"""SetupStep — request validation + file ingestion (spec §6.1).
Runs first in the pipeline. Responsibilities:
* Reject nonsense requests up front (``IX_000_000`` / ``IX_000_002``).
* Normalise ``Context.files`` entries: plain ``str`` ``FileRef(url=s, headers={})``.
* Download every file in parallel (``asyncio.gather``) via the injected
``fetcher`` callable (default: :func:`ix.ingestion.fetch.fetch_file`).
* Byte-sniff MIMEs; gate unsupported ones via ``IX_000_005``.
* Load the use case pair from :data:`ix.use_cases.REGISTRY`
(``IX_001_001`` on miss).
* Hand the fetched files + raw texts to the injected ``ingestor`` so
``context.pages`` + ``context.page_metadata`` are ready for the OCRStep.
Dependency-inject the fetcher/ingestor/mime-detector so unit tests stay
hermetic production wires the defaults via the app factory.
"""
from __future__ import annotations
import asyncio
from pathlib import Path
from typing import Any, Protocol
from ix.contracts import FileRef, RequestIX, ResponseIX
from ix.contracts.response import _InternalContext
from ix.errors import IXErrorCode, IXException
from ix.ingestion import (
DocumentIngestor,
FetchConfig,
detect_mime,
fetch_file,
require_supported,
)
from ix.pipeline.step import Step
from ix.use_cases import get_use_case
class _Fetcher(Protocol):
"""Async callable that downloads one file to ``tmp_dir``."""
async def __call__(
self, file_ref: FileRef, tmp_dir: Path, cfg: FetchConfig
) -> Path: ...
class _Ingestor(Protocol):
"""Turns fetched files + raw texts into a flat Page list."""
def build_pages(
self,
files: list[tuple[Path, str]],
texts: list[str],
) -> tuple[list[Any], list[Any]]: ...
class _MimeDetector(Protocol):
"""Returns the canonical MIME string for a local file."""
def __call__(self, path: Path) -> str: ...
class SetupStep(Step):
"""First pipeline step: validate + fetch + MIME + use-case load + pages."""
def __init__(
self,
fetcher: _Fetcher | None = None,
ingestor: _Ingestor | None = None,
tmp_dir: Path | None = None,
fetch_config: FetchConfig | None = None,
mime_detector: _MimeDetector | None = None,
) -> None:
self._fetcher: _Fetcher = fetcher or fetch_file
self._ingestor: _Ingestor = ingestor or DocumentIngestor()
self._tmp_dir = tmp_dir or Path("/tmp/ix")
self._fetch_config = fetch_config
self._mime_detector: _MimeDetector = mime_detector or detect_mime
async def validate(self, request_ix: RequestIX, response_ix: ResponseIX) -> bool:
if request_ix is None: # pragma: no cover - runtime sanity; Pydantic rejects it earlier
raise IXException(IXErrorCode.IX_000_000)
ctx = request_ix.context
if not ctx.files and not ctx.texts:
raise IXException(IXErrorCode.IX_000_002)
return True
async def process(
self, request_ix: RequestIX, response_ix: ResponseIX
) -> ResponseIX:
# 1. Load the use-case pair — early so an unknown name fails before
# we waste time downloading files.
use_case_request_cls, use_case_response_cls = get_use_case(request_ix.use_case)
use_case_request = use_case_request_cls()
# 2. Resolve the per-request scratch directory. ix_id is assigned
# by the transport adapter — fall back to request_id for the MVP
# unit tests that don't set ix_id.
sub = request_ix.ix_id or request_ix.request_id
request_tmp = self._tmp_dir / sub
request_tmp.mkdir(parents=True, exist_ok=True)
# 3. Normalise file entries to FileRef. Plain strings get empty headers.
normalised: list[FileRef] = [
FileRef(url=entry, headers={}) if isinstance(entry, str) else entry
for entry in request_ix.context.files
]
# 4. Download in parallel. No retry — IX_000_007 propagates.
fetch_cfg = self._resolve_fetch_config()
local_paths: list[Path] = []
if normalised:
local_paths = list(
await asyncio.gather(
*(
self._fetcher(file_ref, request_tmp, fetch_cfg)
for file_ref in normalised
)
)
)
# 5. MIME-sniff each download; reject unsupported.
files_with_mime: list[tuple[Path, str]] = []
for path in local_paths:
mime = self._mime_detector(path)
require_supported(mime)
files_with_mime.append((path, mime))
# 6. Build the flat Page list + per-page metadata.
pages, page_metadata = self._ingestor.build_pages(
files=files_with_mime,
texts=list(request_ix.context.texts),
)
# 7. Stash everything on the internal context for downstream steps.
response_ix.context = _InternalContext(
pages=pages,
files=files_with_mime,
texts=list(request_ix.context.texts),
use_case_request=use_case_request,
use_case_response=use_case_response_cls,
segment_index=None,
page_metadata=page_metadata,
tmp_dir=request_tmp,
)
# 8. Echo use-case display name onto the public response.
response_ix.use_case_name = getattr(use_case_request, "use_case_name", None)
return response_ix
def _resolve_fetch_config(self) -> FetchConfig:
if self._fetch_config is not None:
return self._fetch_config
# Fallback defaults — used only when the caller didn't inject one.
# Real values land via ix.config in Chunk 3.
return FetchConfig(
connect_timeout_s=10.0,
read_timeout_s=30.0,
max_bytes=50 * 1024 * 1024,
)
__all__ = ["SetupStep"]

View file

@ -0,0 +1,260 @@
"""Tests for :class:`ix.pipeline.setup_step.SetupStep` (spec §6.1)."""
from __future__ import annotations
from pathlib import Path
import pytest
from ix.contracts import (
Context,
FileRef,
OCROptions,
Options,
ProvenanceOptions,
RequestIX,
ResponseIX,
)
from ix.contracts.response import _InternalContext
from ix.errors import IXErrorCode, IXException
from ix.ingestion import FetchConfig
from ix.pipeline.setup_step import SetupStep
from ix.segmentation import PageMetadata
class FakeFetcher:
"""Captures FileRef + tmp_dir + cfg; returns a pre-set path per URL."""
def __init__(self, routes: dict[str, Path]) -> None:
self.routes = routes
self.calls: list[tuple[FileRef, Path, FetchConfig]] = []
async def __call__(
self, file_ref: FileRef, tmp_dir: Path, cfg: FetchConfig
) -> Path:
self.calls.append((file_ref, tmp_dir, cfg))
if file_ref.url not in self.routes:
raise IXException(IXErrorCode.IX_000_007, detail=file_ref.url)
return self.routes[file_ref.url]
class FakeIngestor:
"""Returns canned pages + metas; records build_pages arguments."""
def __init__(self, pages_by_file: list[list]) -> None:
# Each entry corresponds to one file in the input.
self.pages_by_file = pages_by_file
self.build_calls: list[tuple[list, list[str]]] = []
def build_pages(
self,
files: list[tuple[Path, str]],
texts: list[str],
) -> tuple[list, list[PageMetadata]]:
self.build_calls.append((files, texts))
# Flat out pages keyed by file_index.
from ix.contracts import Page
pages: list = []
metas: list[PageMetadata] = []
for file_index, _ in enumerate(files):
canned = self.pages_by_file[file_index]
for w, h in canned:
pages.append(
Page(page_no=len(pages) + 1, width=w, height=h, lines=[])
)
metas.append(PageMetadata(file_index=file_index))
for _ in texts:
pages.append(Page(page_no=len(pages) + 1, width=0.0, height=0.0, lines=[]))
metas.append(PageMetadata(file_index=None))
return pages, metas
class _AlwaysMimePdf:
"""detect_mime replacement that always returns application/pdf."""
def __call__(self, path: Path) -> str:
return "application/pdf"
def _make_response() -> ResponseIX:
return ResponseIX()
def _make_cfg() -> FetchConfig:
return FetchConfig(connect_timeout_s=1.0, read_timeout_s=2.0, max_bytes=10_000)
def _make_request(
files: list[str | FileRef] | None = None,
texts: list[str] | None = None,
use_case: str = "bank_statement_header",
) -> RequestIX:
return RequestIX(
use_case=use_case,
ix_client_id="test",
request_id="r-1",
context=Context(files=files or [], texts=texts or []),
options=Options(
ocr=OCROptions(use_ocr=True),
provenance=ProvenanceOptions(include_provenance=True),
),
)
class TestValidate:
async def test_empty_context_raises_IX_000_002(self, tmp_path: Path) -> None:
step = SetupStep(
fetcher=FakeFetcher({}),
ingestor=FakeIngestor([]),
tmp_dir=tmp_path,
fetch_config=_make_cfg(),
mime_detector=_AlwaysMimePdf(),
)
req = _make_request(files=[], texts=[])
with pytest.raises(IXException) as ei:
await step.validate(req, _make_response())
assert ei.value.code is IXErrorCode.IX_000_002
class TestProcessHappyPath:
async def test_files_downloaded_mime_checked_use_case_loaded(
self, tmp_path: Path
) -> None:
routes = {"http://host/a.pdf": tmp_path / "a.pdf"}
for p in routes.values():
p.write_bytes(b"%PDF-1.4")
fetcher = FakeFetcher(routes)
ingestor = FakeIngestor([[(200.0, 300.0), (200.0, 300.0)]])
step = SetupStep(
fetcher=fetcher,
ingestor=ingestor,
tmp_dir=tmp_path / "work",
fetch_config=_make_cfg(),
mime_detector=_AlwaysMimePdf(),
)
req = _make_request(files=["http://host/a.pdf"])
resp = _make_response()
assert await step.validate(req, resp) is True
resp = await step.process(req, resp)
# Fetcher invoked once with the URL wrapped in a FileRef.
assert len(fetcher.calls) == 1
assert fetcher.calls[0][0].url == "http://host/a.pdf"
# Ingestor received [(local_path, mime)] + empty texts.
assert len(ingestor.build_calls) == 1
files, texts = ingestor.build_calls[0]
assert files == [(routes["http://host/a.pdf"], "application/pdf")]
assert texts == []
# Context populated.
ctx = resp.context
assert ctx is not None
assert len(getattr(ctx, "pages", [])) == 2
assert len(getattr(ctx, "page_metadata", [])) == 2
assert getattr(ctx, "texts", None) == []
assert getattr(ctx, "files", None) is not None
# Use case echoed.
assert resp.use_case_name == "Bank Statement Header"
async def test_fileref_headers_pass_through(self, tmp_path: Path) -> None:
routes = {"http://host/with-auth.pdf": tmp_path / "f.pdf"}
for p in routes.values():
p.write_bytes(b"%PDF-1.4")
fetcher = FakeFetcher(routes)
ingestor = FakeIngestor([[(10.0, 10.0)]])
step = SetupStep(
fetcher=fetcher,
ingestor=ingestor,
tmp_dir=tmp_path / "work",
fetch_config=_make_cfg(),
mime_detector=_AlwaysMimePdf(),
)
req = _make_request(
files=[FileRef(url="http://host/with-auth.pdf", headers={"Authorization": "Token z"})],
)
await step.process(req, _make_response())
fr = fetcher.calls[0][0]
assert fr.headers == {"Authorization": "Token z"}
class TestProcessErrors:
async def test_unsupported_mime_raises_IX_000_005(self, tmp_path: Path) -> None:
routes = {"http://host/a.txt": tmp_path / "a.txt"}
routes["http://host/a.txt"].write_bytes(b"hello")
fetcher = FakeFetcher(routes)
ingestor = FakeIngestor([[(10.0, 10.0)]])
class _TextMime:
def __call__(self, path: Path) -> str:
return "text/plain"
step = SetupStep(
fetcher=fetcher,
ingestor=ingestor,
tmp_dir=tmp_path / "work",
fetch_config=_make_cfg(),
mime_detector=_TextMime(),
)
req = _make_request(files=["http://host/a.txt"])
with pytest.raises(IXException) as ei:
await step.process(req, _make_response())
assert ei.value.code is IXErrorCode.IX_000_005
async def test_unknown_use_case_raises_IX_001_001(self, tmp_path: Path) -> None:
step = SetupStep(
fetcher=FakeFetcher({}),
ingestor=FakeIngestor([]),
tmp_dir=tmp_path / "work",
fetch_config=_make_cfg(),
mime_detector=_AlwaysMimePdf(),
)
req = _make_request(files=[], texts=["hello"], use_case="nope")
# Validate passes (we have context). Process should raise IX_001_001.
with pytest.raises(IXException) as ei:
await step.process(req, _make_response())
assert ei.value.code is IXErrorCode.IX_001_001
class TestTextOnly:
async def test_texts_only_loads_use_case_and_builds_text_pages(
self, tmp_path: Path
) -> None:
fetcher = FakeFetcher({})
ingestor = FakeIngestor([])
step = SetupStep(
fetcher=fetcher,
ingestor=ingestor,
tmp_dir=tmp_path / "work",
fetch_config=_make_cfg(),
mime_detector=_AlwaysMimePdf(),
)
req = _make_request(files=[], texts=["hello", "there"])
resp = _make_response()
assert await step.validate(req, resp) is True
resp = await step.process(req, resp)
assert fetcher.calls == []
assert ingestor.build_calls[0][1] == ["hello", "there"]
ctx = resp.context
assert ctx is not None
assert ctx.texts == ["hello", "there"]
class TestInternalContextShape:
async def test_context_is_internal_context_instance(self, tmp_path: Path) -> None:
fetcher = FakeFetcher({})
ingestor = FakeIngestor([])
step = SetupStep(
fetcher=fetcher,
ingestor=ingestor,
tmp_dir=tmp_path / "work",
fetch_config=_make_cfg(),
mime_detector=_AlwaysMimePdf(),
)
req = _make_request(files=[], texts=["hello"])
resp = await step.process(req, _make_response())
assert isinstance(resp.context, _InternalContext)