Compare commits
2 commits
d801038c74
...
632acdcd26
| Author | SHA1 | Date | |
|---|---|---|---|
| 632acdcd26 | |||
| 97aa24f478 |
2 changed files with 424 additions and 0 deletions
164
src/ix/pipeline/setup_step.py
Normal file
164
src/ix/pipeline/setup_step.py
Normal file
|
|
@ -0,0 +1,164 @@
|
|||
"""SetupStep — request validation + file ingestion (spec §6.1).
|
||||
|
||||
Runs first in the pipeline. Responsibilities:
|
||||
|
||||
* Reject nonsense requests up front (``IX_000_000`` / ``IX_000_002``).
|
||||
* Normalise ``Context.files`` entries: plain ``str`` → ``FileRef(url=s, headers={})``.
|
||||
* Download every file in parallel (``asyncio.gather``) via the injected
|
||||
``fetcher`` callable (default: :func:`ix.ingestion.fetch.fetch_file`).
|
||||
* Byte-sniff MIMEs; gate unsupported ones via ``IX_000_005``.
|
||||
* Load the use case pair from :data:`ix.use_cases.REGISTRY`
|
||||
(``IX_001_001`` on miss).
|
||||
* Hand the fetched files + raw texts to the injected ``ingestor`` so
|
||||
``context.pages`` + ``context.page_metadata`` are ready for the OCRStep.
|
||||
|
||||
Dependency-inject the fetcher/ingestor/mime-detector so unit tests stay
|
||||
hermetic — production wires the defaults via the app factory.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from pathlib import Path
|
||||
from typing import Any, Protocol
|
||||
|
||||
from ix.contracts import FileRef, RequestIX, ResponseIX
|
||||
from ix.contracts.response import _InternalContext
|
||||
from ix.errors import IXErrorCode, IXException
|
||||
from ix.ingestion import (
|
||||
DocumentIngestor,
|
||||
FetchConfig,
|
||||
detect_mime,
|
||||
fetch_file,
|
||||
require_supported,
|
||||
)
|
||||
from ix.pipeline.step import Step
|
||||
from ix.use_cases import get_use_case
|
||||
|
||||
|
||||
class _Fetcher(Protocol):
|
||||
"""Async callable that downloads one file to ``tmp_dir``."""
|
||||
|
||||
async def __call__(
|
||||
self, file_ref: FileRef, tmp_dir: Path, cfg: FetchConfig
|
||||
) -> Path: ...
|
||||
|
||||
|
||||
class _Ingestor(Protocol):
|
||||
"""Turns fetched files + raw texts into a flat Page list."""
|
||||
|
||||
def build_pages(
|
||||
self,
|
||||
files: list[tuple[Path, str]],
|
||||
texts: list[str],
|
||||
) -> tuple[list[Any], list[Any]]: ...
|
||||
|
||||
|
||||
class _MimeDetector(Protocol):
|
||||
"""Returns the canonical MIME string for a local file."""
|
||||
|
||||
def __call__(self, path: Path) -> str: ...
|
||||
|
||||
|
||||
class SetupStep(Step):
|
||||
"""First pipeline step: validate + fetch + MIME + use-case load + pages."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
fetcher: _Fetcher | None = None,
|
||||
ingestor: _Ingestor | None = None,
|
||||
tmp_dir: Path | None = None,
|
||||
fetch_config: FetchConfig | None = None,
|
||||
mime_detector: _MimeDetector | None = None,
|
||||
) -> None:
|
||||
self._fetcher: _Fetcher = fetcher or fetch_file
|
||||
self._ingestor: _Ingestor = ingestor or DocumentIngestor()
|
||||
self._tmp_dir = tmp_dir or Path("/tmp/ix")
|
||||
self._fetch_config = fetch_config
|
||||
self._mime_detector: _MimeDetector = mime_detector or detect_mime
|
||||
|
||||
async def validate(self, request_ix: RequestIX, response_ix: ResponseIX) -> bool:
|
||||
if request_ix is None: # pragma: no cover - runtime sanity; Pydantic rejects it earlier
|
||||
raise IXException(IXErrorCode.IX_000_000)
|
||||
ctx = request_ix.context
|
||||
if not ctx.files and not ctx.texts:
|
||||
raise IXException(IXErrorCode.IX_000_002)
|
||||
return True
|
||||
|
||||
async def process(
|
||||
self, request_ix: RequestIX, response_ix: ResponseIX
|
||||
) -> ResponseIX:
|
||||
# 1. Load the use-case pair — early so an unknown name fails before
|
||||
# we waste time downloading files.
|
||||
use_case_request_cls, use_case_response_cls = get_use_case(request_ix.use_case)
|
||||
use_case_request = use_case_request_cls()
|
||||
|
||||
# 2. Resolve the per-request scratch directory. ix_id is assigned
|
||||
# by the transport adapter — fall back to request_id for the MVP
|
||||
# unit tests that don't set ix_id.
|
||||
sub = request_ix.ix_id or request_ix.request_id
|
||||
request_tmp = self._tmp_dir / sub
|
||||
request_tmp.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# 3. Normalise file entries to FileRef. Plain strings get empty headers.
|
||||
normalised: list[FileRef] = [
|
||||
FileRef(url=entry, headers={}) if isinstance(entry, str) else entry
|
||||
for entry in request_ix.context.files
|
||||
]
|
||||
|
||||
# 4. Download in parallel. No retry — IX_000_007 propagates.
|
||||
fetch_cfg = self._resolve_fetch_config()
|
||||
local_paths: list[Path] = []
|
||||
if normalised:
|
||||
local_paths = list(
|
||||
await asyncio.gather(
|
||||
*(
|
||||
self._fetcher(file_ref, request_tmp, fetch_cfg)
|
||||
for file_ref in normalised
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
# 5. MIME-sniff each download; reject unsupported.
|
||||
files_with_mime: list[tuple[Path, str]] = []
|
||||
for path in local_paths:
|
||||
mime = self._mime_detector(path)
|
||||
require_supported(mime)
|
||||
files_with_mime.append((path, mime))
|
||||
|
||||
# 6. Build the flat Page list + per-page metadata.
|
||||
pages, page_metadata = self._ingestor.build_pages(
|
||||
files=files_with_mime,
|
||||
texts=list(request_ix.context.texts),
|
||||
)
|
||||
|
||||
# 7. Stash everything on the internal context for downstream steps.
|
||||
response_ix.context = _InternalContext(
|
||||
pages=pages,
|
||||
files=files_with_mime,
|
||||
texts=list(request_ix.context.texts),
|
||||
use_case_request=use_case_request,
|
||||
use_case_response=use_case_response_cls,
|
||||
segment_index=None,
|
||||
page_metadata=page_metadata,
|
||||
tmp_dir=request_tmp,
|
||||
)
|
||||
|
||||
# 8. Echo use-case display name onto the public response.
|
||||
response_ix.use_case_name = getattr(use_case_request, "use_case_name", None)
|
||||
|
||||
return response_ix
|
||||
|
||||
def _resolve_fetch_config(self) -> FetchConfig:
|
||||
if self._fetch_config is not None:
|
||||
return self._fetch_config
|
||||
# Fallback defaults — used only when the caller didn't inject one.
|
||||
# Real values land via ix.config in Chunk 3.
|
||||
return FetchConfig(
|
||||
connect_timeout_s=10.0,
|
||||
read_timeout_s=30.0,
|
||||
max_bytes=50 * 1024 * 1024,
|
||||
)
|
||||
|
||||
|
||||
__all__ = ["SetupStep"]
|
||||
260
tests/unit/test_setup_step.py
Normal file
260
tests/unit/test_setup_step.py
Normal file
|
|
@ -0,0 +1,260 @@
|
|||
"""Tests for :class:`ix.pipeline.setup_step.SetupStep` (spec §6.1)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from ix.contracts import (
|
||||
Context,
|
||||
FileRef,
|
||||
OCROptions,
|
||||
Options,
|
||||
ProvenanceOptions,
|
||||
RequestIX,
|
||||
ResponseIX,
|
||||
)
|
||||
from ix.contracts.response import _InternalContext
|
||||
from ix.errors import IXErrorCode, IXException
|
||||
from ix.ingestion import FetchConfig
|
||||
from ix.pipeline.setup_step import SetupStep
|
||||
from ix.segmentation import PageMetadata
|
||||
|
||||
|
||||
class FakeFetcher:
|
||||
"""Captures FileRef + tmp_dir + cfg; returns a pre-set path per URL."""
|
||||
|
||||
def __init__(self, routes: dict[str, Path]) -> None:
|
||||
self.routes = routes
|
||||
self.calls: list[tuple[FileRef, Path, FetchConfig]] = []
|
||||
|
||||
async def __call__(
|
||||
self, file_ref: FileRef, tmp_dir: Path, cfg: FetchConfig
|
||||
) -> Path:
|
||||
self.calls.append((file_ref, tmp_dir, cfg))
|
||||
if file_ref.url not in self.routes:
|
||||
raise IXException(IXErrorCode.IX_000_007, detail=file_ref.url)
|
||||
return self.routes[file_ref.url]
|
||||
|
||||
|
||||
class FakeIngestor:
|
||||
"""Returns canned pages + metas; records build_pages arguments."""
|
||||
|
||||
def __init__(self, pages_by_file: list[list]) -> None:
|
||||
# Each entry corresponds to one file in the input.
|
||||
self.pages_by_file = pages_by_file
|
||||
self.build_calls: list[tuple[list, list[str]]] = []
|
||||
|
||||
def build_pages(
|
||||
self,
|
||||
files: list[tuple[Path, str]],
|
||||
texts: list[str],
|
||||
) -> tuple[list, list[PageMetadata]]:
|
||||
self.build_calls.append((files, texts))
|
||||
|
||||
# Flat out pages keyed by file_index.
|
||||
from ix.contracts import Page
|
||||
|
||||
pages: list = []
|
||||
metas: list[PageMetadata] = []
|
||||
for file_index, _ in enumerate(files):
|
||||
canned = self.pages_by_file[file_index]
|
||||
for w, h in canned:
|
||||
pages.append(
|
||||
Page(page_no=len(pages) + 1, width=w, height=h, lines=[])
|
||||
)
|
||||
metas.append(PageMetadata(file_index=file_index))
|
||||
for _ in texts:
|
||||
pages.append(Page(page_no=len(pages) + 1, width=0.0, height=0.0, lines=[]))
|
||||
metas.append(PageMetadata(file_index=None))
|
||||
return pages, metas
|
||||
|
||||
|
||||
class _AlwaysMimePdf:
|
||||
"""detect_mime replacement that always returns application/pdf."""
|
||||
|
||||
def __call__(self, path: Path) -> str:
|
||||
return "application/pdf"
|
||||
|
||||
|
||||
def _make_response() -> ResponseIX:
|
||||
return ResponseIX()
|
||||
|
||||
|
||||
def _make_cfg() -> FetchConfig:
|
||||
return FetchConfig(connect_timeout_s=1.0, read_timeout_s=2.0, max_bytes=10_000)
|
||||
|
||||
|
||||
def _make_request(
|
||||
files: list[str | FileRef] | None = None,
|
||||
texts: list[str] | None = None,
|
||||
use_case: str = "bank_statement_header",
|
||||
) -> RequestIX:
|
||||
return RequestIX(
|
||||
use_case=use_case,
|
||||
ix_client_id="test",
|
||||
request_id="r-1",
|
||||
context=Context(files=files or [], texts=texts or []),
|
||||
options=Options(
|
||||
ocr=OCROptions(use_ocr=True),
|
||||
provenance=ProvenanceOptions(include_provenance=True),
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
class TestValidate:
|
||||
async def test_empty_context_raises_IX_000_002(self, tmp_path: Path) -> None:
|
||||
step = SetupStep(
|
||||
fetcher=FakeFetcher({}),
|
||||
ingestor=FakeIngestor([]),
|
||||
tmp_dir=tmp_path,
|
||||
fetch_config=_make_cfg(),
|
||||
mime_detector=_AlwaysMimePdf(),
|
||||
)
|
||||
req = _make_request(files=[], texts=[])
|
||||
with pytest.raises(IXException) as ei:
|
||||
await step.validate(req, _make_response())
|
||||
assert ei.value.code is IXErrorCode.IX_000_002
|
||||
|
||||
|
||||
class TestProcessHappyPath:
|
||||
async def test_files_downloaded_mime_checked_use_case_loaded(
|
||||
self, tmp_path: Path
|
||||
) -> None:
|
||||
routes = {"http://host/a.pdf": tmp_path / "a.pdf"}
|
||||
for p in routes.values():
|
||||
p.write_bytes(b"%PDF-1.4")
|
||||
fetcher = FakeFetcher(routes)
|
||||
ingestor = FakeIngestor([[(200.0, 300.0), (200.0, 300.0)]])
|
||||
step = SetupStep(
|
||||
fetcher=fetcher,
|
||||
ingestor=ingestor,
|
||||
tmp_dir=tmp_path / "work",
|
||||
fetch_config=_make_cfg(),
|
||||
mime_detector=_AlwaysMimePdf(),
|
||||
)
|
||||
req = _make_request(files=["http://host/a.pdf"])
|
||||
resp = _make_response()
|
||||
assert await step.validate(req, resp) is True
|
||||
resp = await step.process(req, resp)
|
||||
|
||||
# Fetcher invoked once with the URL wrapped in a FileRef.
|
||||
assert len(fetcher.calls) == 1
|
||||
assert fetcher.calls[0][0].url == "http://host/a.pdf"
|
||||
|
||||
# Ingestor received [(local_path, mime)] + empty texts.
|
||||
assert len(ingestor.build_calls) == 1
|
||||
files, texts = ingestor.build_calls[0]
|
||||
assert files == [(routes["http://host/a.pdf"], "application/pdf")]
|
||||
assert texts == []
|
||||
|
||||
# Context populated.
|
||||
ctx = resp.context
|
||||
assert ctx is not None
|
||||
assert len(getattr(ctx, "pages", [])) == 2
|
||||
assert len(getattr(ctx, "page_metadata", [])) == 2
|
||||
assert getattr(ctx, "texts", None) == []
|
||||
assert getattr(ctx, "files", None) is not None
|
||||
|
||||
# Use case echoed.
|
||||
assert resp.use_case_name == "Bank Statement Header"
|
||||
|
||||
async def test_fileref_headers_pass_through(self, tmp_path: Path) -> None:
|
||||
routes = {"http://host/with-auth.pdf": tmp_path / "f.pdf"}
|
||||
for p in routes.values():
|
||||
p.write_bytes(b"%PDF-1.4")
|
||||
fetcher = FakeFetcher(routes)
|
||||
ingestor = FakeIngestor([[(10.0, 10.0)]])
|
||||
step = SetupStep(
|
||||
fetcher=fetcher,
|
||||
ingestor=ingestor,
|
||||
tmp_dir=tmp_path / "work",
|
||||
fetch_config=_make_cfg(),
|
||||
mime_detector=_AlwaysMimePdf(),
|
||||
)
|
||||
req = _make_request(
|
||||
files=[FileRef(url="http://host/with-auth.pdf", headers={"Authorization": "Token z"})],
|
||||
)
|
||||
await step.process(req, _make_response())
|
||||
fr = fetcher.calls[0][0]
|
||||
assert fr.headers == {"Authorization": "Token z"}
|
||||
|
||||
|
||||
class TestProcessErrors:
|
||||
async def test_unsupported_mime_raises_IX_000_005(self, tmp_path: Path) -> None:
|
||||
routes = {"http://host/a.txt": tmp_path / "a.txt"}
|
||||
routes["http://host/a.txt"].write_bytes(b"hello")
|
||||
fetcher = FakeFetcher(routes)
|
||||
ingestor = FakeIngestor([[(10.0, 10.0)]])
|
||||
|
||||
class _TextMime:
|
||||
def __call__(self, path: Path) -> str:
|
||||
return "text/plain"
|
||||
|
||||
step = SetupStep(
|
||||
fetcher=fetcher,
|
||||
ingestor=ingestor,
|
||||
tmp_dir=tmp_path / "work",
|
||||
fetch_config=_make_cfg(),
|
||||
mime_detector=_TextMime(),
|
||||
)
|
||||
req = _make_request(files=["http://host/a.txt"])
|
||||
with pytest.raises(IXException) as ei:
|
||||
await step.process(req, _make_response())
|
||||
assert ei.value.code is IXErrorCode.IX_000_005
|
||||
|
||||
async def test_unknown_use_case_raises_IX_001_001(self, tmp_path: Path) -> None:
|
||||
step = SetupStep(
|
||||
fetcher=FakeFetcher({}),
|
||||
ingestor=FakeIngestor([]),
|
||||
tmp_dir=tmp_path / "work",
|
||||
fetch_config=_make_cfg(),
|
||||
mime_detector=_AlwaysMimePdf(),
|
||||
)
|
||||
req = _make_request(files=[], texts=["hello"], use_case="nope")
|
||||
# Validate passes (we have context). Process should raise IX_001_001.
|
||||
with pytest.raises(IXException) as ei:
|
||||
await step.process(req, _make_response())
|
||||
assert ei.value.code is IXErrorCode.IX_001_001
|
||||
|
||||
|
||||
class TestTextOnly:
|
||||
async def test_texts_only_loads_use_case_and_builds_text_pages(
|
||||
self, tmp_path: Path
|
||||
) -> None:
|
||||
fetcher = FakeFetcher({})
|
||||
ingestor = FakeIngestor([])
|
||||
step = SetupStep(
|
||||
fetcher=fetcher,
|
||||
ingestor=ingestor,
|
||||
tmp_dir=tmp_path / "work",
|
||||
fetch_config=_make_cfg(),
|
||||
mime_detector=_AlwaysMimePdf(),
|
||||
)
|
||||
req = _make_request(files=[], texts=["hello", "there"])
|
||||
resp = _make_response()
|
||||
assert await step.validate(req, resp) is True
|
||||
resp = await step.process(req, resp)
|
||||
|
||||
assert fetcher.calls == []
|
||||
assert ingestor.build_calls[0][1] == ["hello", "there"]
|
||||
ctx = resp.context
|
||||
assert ctx is not None
|
||||
assert ctx.texts == ["hello", "there"]
|
||||
|
||||
|
||||
class TestInternalContextShape:
|
||||
async def test_context_is_internal_context_instance(self, tmp_path: Path) -> None:
|
||||
fetcher = FakeFetcher({})
|
||||
ingestor = FakeIngestor([])
|
||||
step = SetupStep(
|
||||
fetcher=fetcher,
|
||||
ingestor=ingestor,
|
||||
tmp_dir=tmp_path / "work",
|
||||
fetch_config=_make_cfg(),
|
||||
mime_detector=_AlwaysMimePdf(),
|
||||
)
|
||||
req = _make_request(files=[], texts=["hello"])
|
||||
resp = await step.process(req, _make_response())
|
||||
assert isinstance(resp.context, _InternalContext)
|
||||
Loading…
Reference in a new issue