feat(ingestion): fetch_file + MIME sniff + DocumentIngestor (spec §6.1)
All checks were successful
tests / test (push) Successful in 57s
tests / test (pull_request) Successful in 1m12s

Three layered modules the SetupStep will wire together in Task 2.4.

- fetch.py: async httpx fetch with configurable timeouts + incremental
  size cap (stream=True, accumulate bytes, raise IX_000_007 when
  exceeded). file:// URLs read locally. Auth headers pass through. The
  caller injects a FetchConfig — env reads happen in ix.config (Chunk 3).
- mime.py: python-magic byte-sniff + SUPPORTED_MIMES frozenset +
  require_supported(mime) helper that raises IX_000_005.
- pages.py: DocumentIngestor.build_pages(files, texts) ->
  (list[Page], list[PageMetadata]). PDFs via PyMuPDF (hard 100 pg/PDF
  cap -> IX_000_006), images via Pillow (multi-frame TIFFs yield
  multiple Pages), texts as zero-dim Pages so GenAIStep can still cite
  them.

21 new unit tests (141 total) cover: fetch success with headers, 4xx/5xx
mapping, timeout -> IX_000_007, size cap enforced globally + per-file,
file:// happy path + missing file, MIME detection for PDF/PNG/JPEG/TIFF,
require_supported gate, PDF/TIFF/text page counts, 101-page PDF ->
IX_000_006, multi-file file_index assignment.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dirk Riemann 2026-04-18 11:12:00 +02:00
parent 2709fb8d6b
commit 290e51416f
7 changed files with 676 additions and 0 deletions

View file

@ -0,0 +1,27 @@
"""Ingestion pipeline helpers: fetch → MIME-detect → build pages.

Three modules layered bottom-up:

* :mod:`ix.ingestion.fetch` — async HTTP(S) / ``file://`` downloader with
  incremental size caps and pluggable timeouts.
* :mod:`ix.ingestion.mime` — byte-sniffing MIME detection + the
  MVP-supported MIME set.
* :mod:`ix.ingestion.pages` — :class:`DocumentIngestor`, which turns local
  files + raw texts into the flat :class:`~ix.contracts.Page` list the
  OCR step expects.
"""
from __future__ import annotations
from ix.ingestion.fetch import FetchConfig, fetch_file
from ix.ingestion.mime import SUPPORTED_MIMES, detect_mime, require_supported
from ix.ingestion.pages import DocumentIngestor
__all__ = [
"SUPPORTED_MIMES",
"DocumentIngestor",
"FetchConfig",
"detect_mime",
"fetch_file",
"require_supported",
]

144
src/ix/ingestion/fetch.py Normal file
View file

@ -0,0 +1,144 @@
"""Async file fetcher (spec §6.1).

Supports ``http(s)://`` URLs (via httpx with configurable connect/read
timeouts and an incremental size cap) and ``file://`` URLs (read from the
local filesystem; used by the E2E fixture). Auth headers on the
:class:`FileRef` pass through unchanged.

Every failure mode surfaces as :attr:`~ix.errors.IXErrorCode.IX_000_007`
with the offending URL + cause in the ``detail`` slot so the caller's log
line is grep-friendly.

Env-driven defaults live in :mod:`ix.config` (Chunk 3). The caller injects
a :class:`FetchConfig`; this module is purely mechanical.
"""
from __future__ import annotations
import urllib.parse
from dataclasses import dataclass
from pathlib import Path
import httpx
from ix.contracts import FileRef
from ix.errors import IXErrorCode, IXException
@dataclass(slots=True)
class FetchConfig:
    """Per-fetch knobs injected by the caller.

    ``connect_timeout_s`` / ``read_timeout_s`` — httpx timeouts.
    ``max_bytes`` is the pipeline-wide default cap; the per-file override
    on :attr:`~ix.contracts.FileRef.max_bytes` wins when lower.
    """

    # Seconds allowed for establishing the connection (httpx ``connect=``).
    connect_timeout_s: float
    # Seconds used as the default httpx timeout for every non-connect phase.
    read_timeout_s: float
    # Pipeline-wide download size limit in bytes.
    max_bytes: int
def _effective_cap(file_ref: FileRef, cfg: FetchConfig) -> int:
"""The smaller of the pipeline-wide cap and the per-file override."""
if file_ref.max_bytes is None:
return cfg.max_bytes
return min(cfg.max_bytes, file_ref.max_bytes)
def _safe_filename(url: str) -> str:
"""Derive a readable filename for the scratch copy from the URL."""
parsed = urllib.parse.urlparse(url)
candidate = Path(parsed.path).name or "download"
# Strip anything that would escape the tmp dir.
return candidate.replace("/", "_").replace("\\", "_")
async def _fetch_http(file_ref: FileRef, dst: Path, cfg: FetchConfig) -> None:
    """Stream an HTTP(S) download into ``dst`` under an incremental size cap.

    The response body is written chunk by chunk; as soon as the running
    total exceeds the effective cap the download is aborted. Any response
    with a status code of 300 or above is treated as a failure.

    Args:
        file_ref: Source descriptor; ``url`` and ``headers`` are sent as-is
            and ``max_bytes`` may tighten the cap.
        dst: Local path the body is written to.
        cfg: Timeouts and the pipeline-wide size cap.

    Raises:
        IXException: ``IX_000_007`` on a non-2xx status, exceeded size cap,
            timeout, malformed URL, transport error, or local write error.
            A partially written ``dst`` is removed before raising.
    """
    cap = _effective_cap(file_ref, cfg)
    timeout = httpx.Timeout(
        cfg.read_timeout_s,
        connect=cfg.connect_timeout_s,
    )
    # True only while ``dst`` holds an incomplete body written by this call.
    partial = False
    try:
        async with (
            httpx.AsyncClient(timeout=timeout) as client,
            client.stream(
                "GET",
                file_ref.url,
                headers=file_ref.headers or None,
            ) as response,
        ):
            if response.status_code >= 300:
                raise IXException(
                    IXErrorCode.IX_000_007,
                    detail=f"{file_ref.url}: HTTP {response.status_code}",
                )
            total = 0
            with dst.open("wb") as fh:
                partial = True
                async for chunk in response.aiter_bytes():
                    total += len(chunk)
                    if total > cap:
                        raise IXException(
                            IXErrorCode.IX_000_007,
                            detail=f"{file_ref.url}: size cap {cap} bytes exceeded",
                        )
                    fh.write(chunk)
            partial = False
    except IXException:
        raise
    except httpx.TimeoutException as exc:
        raise IXException(
            IXErrorCode.IX_000_007,
            detail=f"{file_ref.url}: timeout ({exc.__class__.__name__})",
        ) from exc
    except (httpx.HTTPError, httpx.InvalidURL) as exc:
        # InvalidURL is not part of the HTTPError hierarchy, so it has to be
        # named explicitly to honour the "any failure" contract.
        raise IXException(
            IXErrorCode.IX_000_007,
            detail=f"{file_ref.url}: {exc.__class__.__name__}: {exc}",
        ) from exc
    except OSError as exc:
        # Local failures (cannot open or write ``dst``) use the same code.
        raise IXException(
            IXErrorCode.IX_000_007,
            detail=f"{file_ref.url}: {exc.__class__.__name__}: {exc}",
        ) from exc
    finally:
        # Never leave a truncated download behind for a later step to read.
        if partial:
            dst.unlink(missing_ok=True)
def _fetch_file_scheme(file_ref: FileRef, dst: Path, cfg: FetchConfig) -> None:
    """Copy the local file named by a ``file://`` URL into ``dst``.

    The URL path is percent-decoded before use, so URLs produced by
    :meth:`pathlib.Path.as_uri` for names containing spaces or non-ASCII
    characters resolve to the real file.

    Args:
        file_ref: Source descriptor; only ``url`` and ``max_bytes`` are used.
        dst: Local path the bytes are written to.
        cfg: Supplies the pipeline-wide size cap.

    Raises:
        IXException: ``IX_000_007`` when the source is missing, is not a
            regular file, exceeds the effective size cap, or cannot be
            read / written.
    """
    cap = _effective_cap(file_ref, cfg)
    raw_path = urllib.parse.urlparse(file_ref.url).path
    src_path = Path(urllib.parse.unquote(raw_path))
    if not src_path.exists():
        raise IXException(
            IXErrorCode.IX_000_007,
            detail=f"{file_ref.url}: file does not exist",
        )
    if not src_path.is_file():
        # A directory passes exists() but cannot be read as bytes.
        raise IXException(
            IXErrorCode.IX_000_007,
            detail=f"{file_ref.url}: not a regular file",
        )
    try:
        size = src_path.stat().st_size
        if size > cap:
            raise IXException(
                IXErrorCode.IX_000_007,
                detail=f"{file_ref.url}: size cap {cap} bytes exceeded",
            )
        dst.write_bytes(src_path.read_bytes())
    except IXException:
        raise
    except OSError as exc:
        # Permission problems, races with deletion, unwritable dst, ...
        raise IXException(
            IXErrorCode.IX_000_007,
            detail=f"{file_ref.url}: {exc.__class__.__name__}: {exc}",
        ) from exc
async def fetch_file(file_ref: FileRef, tmp_dir: Path, cfg: FetchConfig) -> Path:
    """Materialise ``file_ref`` inside ``tmp_dir`` and return the local path.

    ``tmp_dir`` is created if needed. ``http``/``https`` URLs are
    downloaded, ``file`` URLs are copied from the local filesystem; any
    other scheme is rejected. Every fetch failure raises
    :class:`~ix.errors.IXException` with
    :attr:`~ix.errors.IXErrorCode.IX_000_007`.
    """
    tmp_dir.mkdir(parents=True, exist_ok=True)
    url = file_ref.url
    scheme = urllib.parse.urlparse(url).scheme.lower()
    dst = tmp_dir / _safe_filename(url)

    if scheme == "file":
        _fetch_file_scheme(file_ref, dst, cfg)
        return dst
    if scheme in ("http", "https"):
        await _fetch_http(file_ref, dst, cfg)
        return dst
    raise IXException(
        IXErrorCode.IX_000_007,
        detail=f"{url}: unsupported URL scheme {scheme!r}",
    )
__all__ = ["FetchConfig", "fetch_file"]

37
src/ix/ingestion/mime.py Normal file
View file

@ -0,0 +1,37 @@
"""MIME detection + supported-MIME gate (spec §6.1).

Bytes-only; URL extensions are ignored because callers (Paperless, …)
may serve ``/download`` routes without a file suffix. ``python-magic``
reads the file header and returns the canonical MIME.
"""
from __future__ import annotations
from pathlib import Path
import magic
from ix.errors import IXErrorCode, IXException
# MIME types the ingestion pipeline accepts: PDF plus three raster formats.
SUPPORTED_MIMES: frozenset[str] = frozenset(
    ("application/pdf", "image/png", "image/jpeg", "image/tiff")
)
def detect_mime(path: Path) -> str:
    """Byte-sniff ``path`` with python-magic and return its MIME string."""
    sniffed = magic.from_file(str(path), mime=True)
    return sniffed
def require_supported(mime: str) -> None:
    """Gate on :data:`SUPPORTED_MIMES`.

    Returns silently for a supported ``mime``; otherwise raises
    :class:`~ix.errors.IXException` (``IX_000_005``) carrying the rejected
    MIME string as ``detail``.
    """
    if mime in SUPPORTED_MIMES:
        return
    raise IXException(IXErrorCode.IX_000_005, detail=mime)
__all__ = ["SUPPORTED_MIMES", "detect_mime", "require_supported"]

118
src/ix/ingestion/pages.py Normal file
View file

@ -0,0 +1,118 @@
"""Turn downloaded files + raw texts into a flat :class:`Page` list (spec §6.1).

PDFs — one :class:`Page` per page via PyMuPDF, with a hard 100 pages/PDF
cap (``IX_000_006``). Images — Pillow; multi-frame TIFFs yield one Page
per frame. Texts — one zero-dimension Page each so the downstream OCR /
GenAI steps can still cite them.

A parallel list of :class:`~ix.segmentation.PageMetadata` is returned so
the pipeline (via :class:`SegmentIndex`) can resolve segment IDs back to
``file_index`` anchors.
"""
from __future__ import annotations
from pathlib import Path
import fitz # PyMuPDF
from PIL import Image, ImageSequence
from ix.contracts import Page
from ix.errors import IXErrorCode, IXException
from ix.segmentation import PageMetadata
_PDF_PAGE_CAP = 100
class DocumentIngestor:
    """Builds the flat Page list that feeds :class:`~ix.pipeline.ocr_step.OCRStep`.

    No constructor args for MVP — the 100-page cap is a spec constant. If
    this needs to be tunable later, move it to a dataclass config.
    """

    def build_pages(
        self,
        files: list[tuple[Path, str]],
        texts: list[str],
    ) -> tuple[list[Page], list[PageMetadata]]:
        """Return ``(pages, metas)`` in insertion order.

        ``files`` is a list of ``(local_path, mime_type)`` tuples; mimes
        have already been validated by :func:`ix.ingestion.mime.require_supported`.
        File-backed pages come first (in ``files`` order), followed by one
        zero-dimension page per entry in ``texts``. ``page_no`` is 1-based
        and runs across the whole result, and ``metas[i]`` describes
        ``pages[i]``.

        Raises:
            IXException: ``IX_000_006`` when a PDF exceeds the page cap,
                ``IX_000_005`` when a MIME type is not handled here.
        """
        pages: list[Page] = []
        metas: list[PageMetadata] = []
        for file_index, (path, mime) in enumerate(files):
            if mime == "application/pdf":
                self._extend_with_pdf(path, file_index, pages, metas)
            elif mime in ("image/png", "image/jpeg", "image/tiff"):
                self._extend_with_image(path, file_index, pages, metas)
            else:  # pragma: no cover - defensive; require_supported should gate upstream
                raise IXException(IXErrorCode.IX_000_005, detail=mime)
        for _ in texts:
            # Text-backed pages are zero-dim; they exist so the GenAIStep
            # can merge their content into the prompt alongside OCR.
            pages.append(
                Page(
                    page_no=len(pages) + 1,
                    width=0.0,
                    height=0.0,
                    lines=[],
                )
            )
            metas.append(PageMetadata(file_index=None))
        return pages, metas

    def _extend_with_pdf(
        self,
        path: Path,
        file_index: int,
        pages: list[Page],
        metas: list[PageMetadata],
    ) -> None:
        """Append one Page (+ metadata) per PDF page; enforce the page cap.

        The cap is checked before any page is appended, so an oversized
        PDF leaves ``pages`` / ``metas`` untouched.
        """
        # The MIME was byte-sniffed upstream and scratch copies may have no
        # (or a misleading) suffix, so tell PyMuPDF the type explicitly
        # instead of letting it infer one from the file name.
        with fitz.open(str(path), filetype="pdf") as doc:
            if doc.page_count > _PDF_PAGE_CAP:
                raise IXException(
                    IXErrorCode.IX_000_006,
                    detail=f"{path}: {doc.page_count} pages (cap {_PDF_PAGE_CAP})",
                )
            for page in doc:
                rect = page.rect
                pages.append(
                    Page(
                        page_no=len(pages) + 1,
                        width=float(rect.width),
                        height=float(rect.height),
                        lines=[],
                    )
                )
                metas.append(PageMetadata(file_index=file_index))

    def _extend_with_image(
        self,
        path: Path,
        file_index: int,
        pages: list[Page],
        metas: list[PageMetadata],
    ) -> None:
        """Append one Page (+ metadata) per image frame, sized in pixels."""
        with Image.open(path) as img:
            # Single-frame formats yield exactly one frame here; multi-frame
            # TIFFs yield one per frame.
            for frame in ImageSequence.Iterator(img):
                pages.append(
                    Page(
                        page_no=len(pages) + 1,
                        width=float(frame.width),
                        height=float(frame.height),
                        lines=[],
                    )
                )
                metas.append(PageMetadata(file_index=file_index))
__all__ = ["DocumentIngestor"]

View file

@ -0,0 +1,138 @@
"""Tests for :func:`ix.ingestion.fetch.fetch_file` (spec §6.1)."""
from __future__ import annotations
from pathlib import Path
import httpx
import pytest
from pytest_httpx import HTTPXMock
from ix.contracts import FileRef
from ix.errors import IXErrorCode, IXException
from ix.ingestion import FetchConfig, fetch_file
@pytest.fixture
def cfg() -> FetchConfig:
    """Default fetch settings for the tests: short timeouts, 1 MiB cap."""
    one_mib = 1024 * 1024
    return FetchConfig(connect_timeout_s=1.0, read_timeout_s=2.0, max_bytes=one_mib)
class TestSuccessPath:
    """Happy-path HTTP download."""

    async def test_downloads_with_auth_header_and_writes_to_tmp(
        self, tmp_path: Path, cfg: FetchConfig, httpx_mock: HTTPXMock
    ) -> None:
        """The body lands on disk and the FileRef headers are sent."""
        url = "https://paperless.local/doc/123/download"
        httpx_mock.add_response(
            url=url,
            method="GET",
            status_code=200,
            content=b"%PDF-1.4 body",
            headers={"content-type": "application/pdf"},
        )
        ref = FileRef(url=url, headers={"Authorization": "Token abc"})

        saved = await fetch_file(ref, tmp_dir=tmp_path, cfg=cfg)

        assert saved.exists()
        assert saved.read_bytes() == b"%PDF-1.4 body"
        # Confirm header went out.
        sent = httpx_mock.get_requests()
        assert len(sent) == 1
        assert sent[0].headers["Authorization"] == "Token abc"
class TestNon2xx:
    """Error statuses are mapped to IX_000_007."""

    async def test_404_raises_IX_000_007(
        self, tmp_path: Path, cfg: FetchConfig, httpx_mock: HTTPXMock
    ) -> None:
        """A 404 raises and the status code appears in the detail."""
        url = "https://host.local/missing.pdf"
        httpx_mock.add_response(url=url, status_code=404, content=b"")

        with pytest.raises(IXException) as caught:
            await fetch_file(FileRef(url=url), tmp_dir=tmp_path, cfg=cfg)

        err = caught.value
        assert err.code is IXErrorCode.IX_000_007
        assert "404" in (err.detail or "")

    async def test_500_raises_IX_000_007(
        self, tmp_path: Path, cfg: FetchConfig, httpx_mock: HTTPXMock
    ) -> None:
        """A 500 raises with the fetch error code."""
        url = "https://host.local/boom.pdf"
        httpx_mock.add_response(url=url, status_code=500, content=b"oops")

        with pytest.raises(IXException) as caught:
            await fetch_file(FileRef(url=url), tmp_dir=tmp_path, cfg=cfg)

        assert caught.value.code is IXErrorCode.IX_000_007
class TestTimeout:
    """httpx timeouts are mapped to IX_000_007."""

    async def test_timeout_raises_IX_000_007(
        self, tmp_path: Path, cfg: FetchConfig, httpx_mock: HTTPXMock
    ) -> None:
        """A ReadTimeout from the transport surfaces as an IXException."""
        url = "https://host.local/slow.pdf"
        httpx_mock.add_exception(httpx.ReadTimeout("slow"), url=url)

        with pytest.raises(IXException) as caught:
            await fetch_file(FileRef(url=url), tmp_dir=tmp_path, cfg=cfg)

        assert caught.value.code is IXErrorCode.IX_000_007
class TestOversize:
    """The size cap is enforced both globally and per file."""

    async def test_oversize_raises_IX_000_007(
        self, tmp_path: Path, httpx_mock: HTTPXMock
    ) -> None:
        """A body larger than the pipeline-wide cap is rejected."""
        url = "https://host.local/big.pdf"
        tight_cfg = FetchConfig(connect_timeout_s=1.0, read_timeout_s=2.0, max_bytes=100)
        # 500 bytes of payload; cap is 100.
        httpx_mock.add_response(url=url, status_code=200, content=b"x" * 500)

        with pytest.raises(IXException) as caught:
            await fetch_file(FileRef(url=url), tmp_dir=tmp_path, cfg=tight_cfg)

        assert caught.value.code is IXErrorCode.IX_000_007

    async def test_per_file_max_bytes_override(
        self, tmp_path: Path, httpx_mock: HTTPXMock
    ) -> None:
        """A tighter cap on the FileRef beats a generous pipeline cap."""
        url = "https://host.local/mid.pdf"
        loose_cfg = FetchConfig(
            connect_timeout_s=1.0,
            read_timeout_s=2.0,
            max_bytes=1_000_000,
        )
        httpx_mock.add_response(url=url, status_code=200, content=b"x" * 500)
        # file_ref sets a tighter cap.
        capped_ref = FileRef(url=url, max_bytes=100)

        with pytest.raises(IXException) as caught:
            await fetch_file(capped_ref, tmp_dir=tmp_path, cfg=loose_cfg)

        assert caught.value.code is IXErrorCode.IX_000_007
class TestFileUrl:
    """``file://`` URLs are copied from the local filesystem."""

    async def test_file_scheme_reads_local(
        self, tmp_path: Path, cfg: FetchConfig
    ) -> None:
        """An existing local file is copied byte-for-byte into tmp_dir."""
        payload = b"%PDF-1.4\nfile scheme content"
        source = tmp_path / "in.pdf"
        source.write_bytes(payload)

        copied = await fetch_file(
            FileRef(url=source.as_uri()), tmp_dir=tmp_path / "out", cfg=cfg
        )

        assert copied.exists()
        assert copied.read_bytes() == payload

    async def test_file_scheme_missing_raises(
        self, tmp_path: Path, cfg: FetchConfig
    ) -> None:
        """A missing local file raises IX_000_007."""
        absent = tmp_path / "nope.pdf"

        with pytest.raises(IXException) as caught:
            await fetch_file(FileRef(url=absent.as_uri()), tmp_dir=tmp_path, cfg=cfg)

        assert caught.value.code is IXErrorCode.IX_000_007

View file

@ -0,0 +1,96 @@
"""Tests for MIME sniffing (spec §6.1)."""
from __future__ import annotations
from pathlib import Path
import pytest
from ix.errors import IXErrorCode, IXException
from ix.ingestion import SUPPORTED_MIMES, detect_mime, require_supported
# Real-header fixtures. python-magic looks at bytes, not extensions, so
# these are the smallest valid-byte samples we can produce on the fly.
# Minimal PDF skeleton: header, one empty object, xref, trailer, EOF marker.
_PDF_BYTES = b"%PDF-1.4\n%\xe2\xe3\xcf\xd3\n1 0 obj\n<<>>\nendobj\nxref\n0 1\n0000000000 65535 f\ntrailer<<>>\nstartxref\n0\n%%EOF\n"
_PNG_BYTES = bytes.fromhex(
    # PNG magic + minimal IHDR + IDAT + IEND (1x1 all-black).
    "89504e470d0a1a0a"
    "0000000d49484452"
    "00000001000000010806000000"
    "1f15c4890000000d"
    "49444154789c6300010000000500010d0a2db400000000"
    "49454e44ae426082"
)
# Hand-assembled JPEG: starts with the SOI marker (FF D8) and ends with the
# EOI marker (FF D9).
_JPEG_BYTES = (
    b"\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00\x00\x01\x00\x01\x00\x00"
    b"\xff\xdb\x00C\x00" + b"\x08" * 64
    + b"\xff\xc0\x00\x0b\x08\x00\x01\x00\x01\x01\x01\x11\x00"
    + b"\xff\xc4\x00\x1f\x00\x00\x01\x05\x01\x01\x01\x01\x01\x01\x00\x00"
    + b"\x00\x00\x00\x00\x00\x00\x00\x01\x02\x03\x04\x05\x06\x07\x08\t\n\x0b"
    + b"\xff\xda\x00\x08\x01\x01\x00\x00?\x00\xfb\xff\xd9"
)
def _make_tiff_bytes() -> bytes:
    """Encode a 2x2 black greyscale image as TIFF, in memory, via Pillow."""
    # Tiny valid TIFF via PIL.
    from io import BytesIO

    from PIL import Image

    sink = BytesIO()
    tiny = Image.new("L", (2, 2), color=0)
    tiny.save(sink, format="TIFF")
    return sink.getvalue()


# Built once at import time; shared by every test that needs a TIFF sample.
_TIFF_BYTES = _make_tiff_bytes()
@pytest.fixture
def fixtures_dir(tmp_path: Path) -> Path:
    """Create ``tmp_path/fixtures`` holding one sample file per format."""
    root = tmp_path / "fixtures"
    root.mkdir()
    samples = {
        "sample.pdf": _PDF_BYTES,
        "sample.png": _PNG_BYTES,
        "sample.jpg": _JPEG_BYTES,
        "sample.tif": _TIFF_BYTES,
        "sample.txt": b"this is plain text, no magic bytes\n",
    }
    for name, payload in samples.items():
        (root / name).write_bytes(payload)
    return root
class TestDetectMime:
    """``detect_mime`` identifies each sample from its bytes."""

    def test_pdf(self, fixtures_dir: Path) -> None:
        sniffed = detect_mime(fixtures_dir / "sample.pdf")
        assert sniffed == "application/pdf"

    def test_png(self, fixtures_dir: Path) -> None:
        sniffed = detect_mime(fixtures_dir / "sample.png")
        assert sniffed == "image/png"

    def test_jpeg(self, fixtures_dir: Path) -> None:
        sniffed = detect_mime(fixtures_dir / "sample.jpg")
        assert sniffed == "image/jpeg"

    def test_tiff(self, fixtures_dir: Path) -> None:
        sniffed = detect_mime(fixtures_dir / "sample.tif")
        assert sniffed == "image/tiff"
class TestSupportedSet:
    """Pins the exact contents of ``SUPPORTED_MIMES``."""

    def test_supported_mimes_contents(self) -> None:
        expected = {
            "application/pdf",
            "image/png",
            "image/jpeg",
            "image/tiff",
        }
        assert expected == set(SUPPORTED_MIMES)
class TestRequireSupported:
    """``require_supported`` passes supported MIMEs and rejects the rest."""

    def test_allows_supported(self) -> None:
        for mime in SUPPORTED_MIMES:
            require_supported(mime)  # no raise

    def test_rejects_unsupported(self) -> None:
        with pytest.raises(IXException) as caught:
            require_supported("text/plain")

        err = caught.value
        assert err.code is IXErrorCode.IX_000_005
        assert "text/plain" in (err.detail or "")

View file

@ -0,0 +1,116 @@
"""Tests for DocumentIngestor.build_pages (spec §6.1)."""
from __future__ import annotations
from io import BytesIO
from pathlib import Path
import pytest
from ix.errors import IXErrorCode, IXException
from ix.ingestion import DocumentIngestor
def _make_pdf_bytes(n_pages: int) -> bytes:
    """Build an in-memory PDF with ``n_pages`` 200x300 pages via PyMuPDF.

    Each page carries the text ``page <n>`` (1-based).
    """
    import fitz

    pdf = fitz.open()
    for index in range(n_pages):
        sheet = pdf.new_page(width=200, height=300)
        sheet.insert_text((10, 20), f"page {index+1}")
    payload = pdf.tobytes()
    pdf.close()
    return payload
def _make_multi_frame_tiff_bytes(n_frames: int) -> bytes:
    """Encode ``n_frames`` 10x10 greyscale frames as one multi-frame TIFF."""
    from PIL import Image

    stack = []
    for shade_step in range(n_frames):
        stack.append(Image.new("L", (10, 10), color=shade_step * 30))
    sink = BytesIO()
    # The first frame is the container; the rest are appended to it.
    stack[0].save(sink, format="TIFF", save_all=True, append_images=stack[1:])
    return sink.getvalue()
class TestPdf:
    """PDF ingestion: one Page per PDF page, capped at 100 pages."""

    def test_three_page_pdf_yields_three_pages(self, tmp_path: Path) -> None:
        """Three PDF pages become three numbered Pages from file 0."""
        pdf_path = tmp_path / "doc.pdf"
        pdf_path.write_bytes(_make_pdf_bytes(3))

        pages, metas = DocumentIngestor().build_pages(
            files=[(pdf_path, "application/pdf")], texts=[]
        )

        assert len(pages) == 3
        for expected_no, page in enumerate(pages, start=1):
            assert page.page_no == expected_no
            assert page.width > 0
            assert page.height > 0
        assert len(metas) == 3
        for meta in metas:
            assert meta.file_index == 0

    def test_page_count_cap_raises_IX_000_006(self, tmp_path: Path) -> None:
        """A 101-page PDF is rejected with IX_000_006."""
        pdf_path = tmp_path / "toomany.pdf"
        pdf_path.write_bytes(_make_pdf_bytes(101))
        ingestor = DocumentIngestor()

        with pytest.raises(IXException) as caught:
            ingestor.build_pages(files=[(pdf_path, "application/pdf")], texts=[])

        assert caught.value.code is IXErrorCode.IX_000_006
class TestImages:
    """Image ingestion: one Page per frame, sized in pixels."""

    def test_single_frame_png(self, tmp_path: Path) -> None:
        """A 50x80 PNG yields exactly one 50x80 Page from file 0."""
        from PIL import Image

        png_path = tmp_path / "img.png"
        Image.new("RGB", (50, 80), color="white").save(png_path, format="PNG")

        pages, metas = DocumentIngestor().build_pages(
            files=[(png_path, "image/png")], texts=[]
        )

        assert len(pages) == 1
        only = pages[0]
        assert only.width == 50
        assert only.height == 80
        assert metas[0].file_index == 0

    def test_multi_frame_tiff_yields_multiple_pages(self, tmp_path: Path) -> None:
        """A two-frame TIFF yields two 10x10 Pages."""
        tiff_path = tmp_path / "multi.tif"
        tiff_path.write_bytes(_make_multi_frame_tiff_bytes(2))

        pages, metas = DocumentIngestor().build_pages(
            files=[(tiff_path, "image/tiff")], texts=[]
        )

        assert len(pages) == 2
        for page in pages:
            assert page.width == 10
            assert page.height == 10
        # Both frames share the same file_index.
        assert {meta.file_index for meta in metas} == {0}
class TestTexts:
    """Raw texts become Pages without a source file."""

    def test_texts_become_pages(self) -> None:
        """Two texts yield pages 1 and 2, each with ``file_index`` None."""
        pages, metas = DocumentIngestor().build_pages(
            files=[], texts=["hello", "world"]
        )

        assert len(pages) == 2
        first, second = pages
        assert first.page_no == 1
        assert second.page_no == 2
        # Text-backed pages have no file_index source.
        assert metas[0].file_index is None
        assert metas[1].file_index is None
class TestFileIndexes:
    """``file_index`` follows the position of each file in the input list."""

    def test_multi_file_indexes_are_contiguous(self, tmp_path: Path) -> None:
        """A 2-page PDF then a 1-page PDF map to file indexes 0, 0, 1."""
        first_pdf = tmp_path / "a.pdf"
        second_pdf = tmp_path / "b.pdf"
        first_pdf.write_bytes(_make_pdf_bytes(2))
        second_pdf.write_bytes(_make_pdf_bytes(1))

        pages, metas = DocumentIngestor().build_pages(
            files=[(first_pdf, "application/pdf"), (second_pdf, "application/pdf")],
            texts=[],
        )

        assert len(pages) == 3
        # First two pages from file 0, last from file 1.
        assert metas[0].file_index == 0
        assert metas[1].file_index == 0
        assert metas[2].file_index == 1