First pipeline step. Validates the request (IX_000_002 on empty context), normalises every Context.files entry to a FileRef, downloads them in parallel via asyncio.gather, byte-sniffs MIMEs (IX_000_005 for unsupported), loads the use-case pair from REGISTRY (IX_001_001 on miss), and builds the flat pages + page_metadata list on response_ix.context. Fetcher / ingestor / MIME detector / tmp_dir / fetch_config all inject via the constructor so unit tests stay hermetic — production wires the real ix.ingestion defaults via the app factory. 7 unit tests in tests/unit/test_setup_step.py cover validate errors, happy path (fetcher + ingestor invoked correctly, context populated, use_case_name echoed), FileRef headers pass through, unsupported MIME -> IX_000_005, unknown use case -> IX_001_001, text-only request, and the _InternalContext type assertion. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
260 lines
8.7 KiB
Python
260 lines
8.7 KiB
Python
"""Tests for :class:`ix.pipeline.setup_step.SetupStep` (spec §6.1)."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
from ix.contracts import (
|
|
Context,
|
|
FileRef,
|
|
OCROptions,
|
|
Options,
|
|
ProvenanceOptions,
|
|
RequestIX,
|
|
ResponseIX,
|
|
)
|
|
from ix.contracts.response import _InternalContext
|
|
from ix.errors import IXErrorCode, IXException
|
|
from ix.ingestion import FetchConfig
|
|
from ix.pipeline.setup_step import SetupStep
|
|
from ix.segmentation import PageMetadata
|
|
|
|
|
|
class FakeFetcher:
    """Fetcher test double: logs each invocation and serves canned paths.

    ``routes`` maps a URL to the local path handed back for it. Every call is
    appended to ``calls`` as ``(file_ref, tmp_dir, cfg)`` so a test can inspect
    exactly what was passed in.
    """

    def __init__(self, routes: dict[str, Path]) -> None:
        self.routes = routes
        self.calls: list[tuple[FileRef, Path, FetchConfig]] = []

    async def __call__(
        self, file_ref: FileRef, tmp_dir: Path, cfg: FetchConfig
    ) -> Path:
        """Record the call, then return the routed path for ``file_ref.url``.

        Raises:
            IXException: with code ``IX_000_007`` when the URL has no route.
        """
        # Log first so that failed lookups still show up in ``calls``.
        self.calls.append((file_ref, tmp_dir, cfg))
        if file_ref.url in self.routes:
            return self.routes[file_ref.url]
        raise IXException(IXErrorCode.IX_000_007, detail=file_ref.url)
|
|
|
|
|
|
class FakeIngestor:
    """Ingestor test double that fabricates pages from canned dimensions.

    ``pages_by_file[i]`` holds the ``(width, height)`` pairs to emit for the
    i-th input file. Each ``build_pages`` call is logged in ``build_calls`` as
    ``(files, texts)``.
    """

    def __init__(self, pages_by_file: list[list]) -> None:
        # Positional: entry i describes input file i.
        self.pages_by_file = pages_by_file
        self.build_calls: list[tuple[list, list[str]]] = []

    def build_pages(
        self,
        files: list[tuple[Path, str]],
        texts: list[str],
    ) -> tuple[list, list[PageMetadata]]:
        """Return a flat page list and a parallel metadata list.

        File pages come first (tagged with their file index), followed by one
        zero-sized page per text entry (tagged with ``file_index=None``).
        """
        self.build_calls.append((files, texts))

        from ix.contracts import Page

        flat_pages: list = []
        flat_metas: list[PageMetadata] = []

        def emit(width, height, file_index) -> None:
            # page_no is 1-based and keeps counting across files and texts.
            flat_pages.append(
                Page(page_no=len(flat_pages) + 1, width=width, height=height, lines=[])
            )
            flat_metas.append(PageMetadata(file_index=file_index))

        for position, _file in enumerate(files):
            for width, height in self.pages_by_file[position]:
                emit(width, height, position)

        for _text in texts:
            emit(0.0, 0.0, None)

        return flat_pages, flat_metas
|
|
|
|
|
|
class _AlwaysMimePdf:
|
|
"""detect_mime replacement that always returns application/pdf."""
|
|
|
|
def __call__(self, path: Path) -> str:
|
|
return "application/pdf"
|
|
|
|
|
|
def _make_response() -> ResponseIX:
    """Return a fresh, default-constructed ``ResponseIX``."""
    blank = ResponseIX()
    return blank
|
|
|
|
|
|
def _make_cfg() -> FetchConfig:
    """Return the small fixed ``FetchConfig`` shared by these tests."""
    limits = {
        "connect_timeout_s": 1.0,
        "read_timeout_s": 2.0,
        "max_bytes": 10_000,
    }
    return FetchConfig(**limits)
|
|
|
|
|
|
def _make_request(
    files: list[str | FileRef] | None = None,
    texts: list[str] | None = None,
    use_case: str = "bank_statement_header",
) -> RequestIX:
    """Build a ``RequestIX`` with fixed client/request ids for the tests.

    ``files`` and ``texts`` fall back to empty lists when omitted (or empty);
    OCR and provenance are both switched on.
    """
    request_context = Context(
        files=files if files else [],
        texts=texts if texts else [],
    )
    request_options = Options(
        ocr=OCROptions(use_ocr=True),
        provenance=ProvenanceOptions(include_provenance=True),
    )
    return RequestIX(
        use_case=use_case,
        ix_client_id="test",
        request_id="r-1",
        context=request_context,
        options=request_options,
    )
|
|
|
|
|
|
class TestValidate:
    """Checks on ``SetupStep.validate``."""

    async def test_empty_context_raises_IX_000_002(self, tmp_path: Path) -> None:
        """A request with no files and no texts must fail with IX_000_002."""
        empty_request = _make_request(files=[], texts=[])
        setup = SetupStep(
            fetcher=FakeFetcher({}),
            ingestor=FakeIngestor([]),
            tmp_dir=tmp_path,
            fetch_config=_make_cfg(),
            mime_detector=_AlwaysMimePdf(),
        )

        with pytest.raises(IXException) as caught:
            await setup.validate(empty_request, _make_response())

        assert caught.value.code is IXErrorCode.IX_000_002
|
|
|
|
|
|
class TestProcessHappyPath:
    """Successful ``SetupStep.process`` runs with file inputs."""

    async def test_files_downloaded_mime_checked_use_case_loaded(
        self, tmp_path: Path
    ) -> None:
        """One URL is fetched, handed to the ingestor, and lands on the context."""
        url = "http://host/a.pdf"
        local_pdf = tmp_path / "a.pdf"
        routes = {url: local_pdf}
        local_pdf.write_bytes(b"%PDF-1.4")

        fake_fetcher = FakeFetcher(routes)
        # Two canned pages for the single input file.
        fake_ingestor = FakeIngestor([[(200.0, 300.0), (200.0, 300.0)]])
        setup = SetupStep(
            fetcher=fake_fetcher,
            ingestor=fake_ingestor,
            tmp_dir=tmp_path / "work",
            fetch_config=_make_cfg(),
            mime_detector=_AlwaysMimePdf(),
        )
        request = _make_request(files=[url])
        response = _make_response()

        assert await setup.validate(request, response) is True
        response = await setup.process(request, response)

        # The plain URL string reached the fetcher wrapped in a FileRef, once.
        assert len(fake_fetcher.calls) == 1
        fetched_ref = fake_fetcher.calls[0][0]
        assert fetched_ref.url == url

        # The ingestor saw [(local_path, mime)] and no texts.
        assert len(fake_ingestor.build_calls) == 1
        seen_files, seen_texts = fake_ingestor.build_calls[0]
        assert seen_files == [(local_pdf, "application/pdf")]
        assert seen_texts == []

        # The response context carries the pages, metadata, texts and files.
        context = response.context
        assert context is not None
        assert len(getattr(context, "pages", [])) == 2
        assert len(getattr(context, "page_metadata", [])) == 2
        assert getattr(context, "texts", None) == []
        assert getattr(context, "files", None) is not None

        # The use case's display name is echoed on the response.
        assert response.use_case_name == "Bank Statement Header"

    async def test_fileref_headers_pass_through(self, tmp_path: Path) -> None:
        """Headers on an explicit FileRef arrive at the fetcher unchanged."""
        url = "http://host/with-auth.pdf"
        local_pdf = tmp_path / "f.pdf"
        routes = {url: local_pdf}
        local_pdf.write_bytes(b"%PDF-1.4")

        fake_fetcher = FakeFetcher(routes)
        fake_ingestor = FakeIngestor([[(10.0, 10.0)]])
        setup = SetupStep(
            fetcher=fake_fetcher,
            ingestor=fake_ingestor,
            tmp_dir=tmp_path / "work",
            fetch_config=_make_cfg(),
            mime_detector=_AlwaysMimePdf(),
        )
        authed_ref = FileRef(url=url, headers={"Authorization": "Token z"})
        request = _make_request(files=[authed_ref])

        await setup.process(request, _make_response())

        fetched_ref = fake_fetcher.calls[0][0]
        assert fetched_ref.headers == {"Authorization": "Token z"}
|
|
|
|
|
|
class TestProcessErrors:
    """Failure paths of ``SetupStep.process``."""

    async def test_unsupported_mime_raises_IX_000_005(self, tmp_path: Path) -> None:
        """A file detected as text/plain is rejected with IX_000_005."""

        class _TextMime:
            # Detector stub that reports every path as plain text.
            def __call__(self, path: Path) -> str:
                return "text/plain"

        url = "http://host/a.txt"
        local_txt = tmp_path / "a.txt"
        routes = {url: local_txt}
        local_txt.write_bytes(b"hello")

        setup = SetupStep(
            fetcher=FakeFetcher(routes),
            ingestor=FakeIngestor([[(10.0, 10.0)]]),
            tmp_dir=tmp_path / "work",
            fetch_config=_make_cfg(),
            mime_detector=_TextMime(),
        )
        request = _make_request(files=[url])

        with pytest.raises(IXException) as caught:
            await setup.process(request, _make_response())

        assert caught.value.code is IXErrorCode.IX_000_005

    async def test_unknown_use_case_raises_IX_001_001(self, tmp_path: Path) -> None:
        """An unregistered use-case name makes process fail with IX_001_001."""
        setup = SetupStep(
            fetcher=FakeFetcher({}),
            ingestor=FakeIngestor([]),
            tmp_dir=tmp_path / "work",
            fetch_config=_make_cfg(),
            mime_detector=_AlwaysMimePdf(),
        )
        # The context is non-empty (one text), so only the use-case lookup
        # is expected to fail here.
        request = _make_request(files=[], texts=["hello"], use_case="nope")

        with pytest.raises(IXException) as caught:
            await setup.process(request, _make_response())

        assert caught.value.code is IXErrorCode.IX_001_001
|
|
|
|
|
|
class TestTextOnly:
    """Requests that carry texts but no files."""

    async def test_texts_only_loads_use_case_and_builds_text_pages(
        self, tmp_path: Path
    ) -> None:
        """Texts go straight to the ingestor; the fetcher is never called."""
        given_texts = ["hello", "there"]
        fake_fetcher = FakeFetcher({})
        fake_ingestor = FakeIngestor([])
        setup = SetupStep(
            fetcher=fake_fetcher,
            ingestor=fake_ingestor,
            tmp_dir=tmp_path / "work",
            fetch_config=_make_cfg(),
            mime_detector=_AlwaysMimePdf(),
        )
        request = _make_request(files=[], texts=["hello", "there"])
        response = _make_response()

        assert await setup.validate(request, response) is True
        response = await setup.process(request, response)

        # No files means nothing to download.
        assert fake_fetcher.calls == []
        _seen_files, seen_texts = fake_ingestor.build_calls[0]
        assert seen_texts == given_texts

        context = response.context
        assert context is not None
        assert context.texts == given_texts
|
|
|
|
|
|
class TestInternalContextShape:
    """Type of the context object that ``process`` attaches to the response."""

    async def test_context_is_internal_context_instance(self, tmp_path: Path) -> None:
        """After process, ``response.context`` is an ``_InternalContext``."""
        setup = SetupStep(
            fetcher=FakeFetcher({}),
            ingestor=FakeIngestor([]),
            tmp_dir=tmp_path / "work",
            fetch_config=_make_cfg(),
            mime_detector=_AlwaysMimePdf(),
        )
        request = _make_request(files=[], texts=["hello"])

        response = await setup.process(request, _make_response())

        assert isinstance(response.context, _InternalContext)
|