From dcd1bc764ad25b29801bc8073dfa2d049c0e15a7 Mon Sep 17 00:00:00 2001
From: Dirk Riemann
Date: Sat, 18 Apr 2026 11:06:46 +0200
Subject: [PATCH] =?UTF-8?q?feat(pipeline):=20Step=20ABC=20+=20Pipeline=20r?=
 =?UTF-8?q?unner=20+=20Timer=20(spec=20=C2=A73,=20=C2=A74)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Adds the transport-agnostic pipeline orchestrator. Each step implements
async validate + process; the runner wraps both in a Timer, writes
per-step entries to response.metadata.timings, and aborts on the first
IXException by writing response.error.

- Step exposes a step_name property (defaults to class name) so tests and
  logs label steps consistently.
- Timer is a plain context manager that appends one
  {step, elapsed_seconds} entry on exit regardless of whether the body
  raised, so the timeline stays reconstructable for failed steps.
- 9 unit tests cover ordering, skip-on-false, IXException in validate vs.
  process, timings populated for every executed step, and shared-response
  mutation across steps. Non-IX exceptions propagate.

Co-Authored-By: Claude Opus 4.7 (1M context)
---
 src/ix/pipeline/__init__.py |  18 ++++
 src/ix/pipeline/pipeline.py | 108 +++++++++++++++++++
 src/ix/pipeline/step.py     |  58 ++++++++++
 tests/unit/test_pipeline.py | 208 ++++++++++++++++++++++++++++++++++++
 4 files changed, 392 insertions(+)
 create mode 100644 src/ix/pipeline/__init__.py
 create mode 100644 src/ix/pipeline/pipeline.py
 create mode 100644 src/ix/pipeline/step.py
 create mode 100644 tests/unit/test_pipeline.py

diff --git a/src/ix/pipeline/__init__.py b/src/ix/pipeline/__init__.py
new file mode 100644
index 0000000..fb5f4e6
--- /dev/null
+++ b/src/ix/pipeline/__init__.py
@@ -0,0 +1,18 @@
+"""Pipeline orchestration: Step ABC, Pipeline runner, per-step Timer.
+
+Concrete steps (Setup / OCR / GenAI / Reliability / ResponseHandler) live in
+sibling modules and are wired into a :class:`Pipeline` by the application
+factory. The pipeline core is transport-agnostic: it takes a
+:class:`~ix.contracts.RequestIX`, threads a shared
+:class:`~ix.contracts.ResponseIX` through every step, and returns the
+populated response. Timings land in ``response.metadata.timings``; the first
+:class:`~ix.errors.IXException` raised aborts the pipeline and is written to
+``response.error``.
+"""
+
+from __future__ import annotations
+
+from ix.pipeline.pipeline import Pipeline, Timer
+from ix.pipeline.step import Step
+
+__all__ = ["Pipeline", "Step", "Timer"]
diff --git a/src/ix/pipeline/pipeline.py b/src/ix/pipeline/pipeline.py
new file mode 100644
index 0000000..1fcf3b4
--- /dev/null
+++ b/src/ix/pipeline/pipeline.py
@@ -0,0 +1,108 @@
+"""Pipeline runner + Timer context manager (spec §4).
+
+The runner threads a fresh :class:`~ix.contracts.ResponseIX` through every
+registered :class:`Step`, records per-step elapsed seconds in
+``response.metadata.timings`` (always — even for validated-out-or-raised
+steps, so the timeline is reconstructable from logs), and aborts on the
+first :class:`~ix.errors.IXException` by writing ``response.error`` and
+stopping the loop. Non-IX exceptions propagate — the job-store layer decides
+whether to swallow or surface them.
+"""
+
+from __future__ import annotations
+
+import time
+from types import TracebackType
+from typing import Any
+
+from ix.contracts import Metadata, RequestIX, ResponseIX
+from ix.errors import IXException
+from ix.pipeline.step import Step
+
+
+class Timer:
+    """Context manager that appends one timing entry to a list.
+
+    Example::
+
+        timings: list[dict[str, Any]] = []
+        with Timer("setup", timings):
+            ...  # work
+        # timings == [{"step": "setup", "elapsed_seconds": 0.003}]
+
+    The entry is appended on ``__exit__`` regardless of whether the body
+    raised — the timeline stays accurate even for failed steps.
+    """
+
+    def __init__(self, step_name: str, sink: list[dict[str, Any]]) -> None:
+        self._step_name = step_name
+        self._sink = sink
+        self._start: float = 0.0
+
+    def __enter__(self) -> Timer:
+        self._start = time.perf_counter()
+        return self
+
+    def __exit__(
+        self,
+        exc_type: type[BaseException] | None,
+        exc: BaseException | None,
+        tb: TracebackType | None,
+    ) -> None:  # None return: an exception from the body is never suppressed
+        elapsed = time.perf_counter() - self._start
+        self._sink.append({"step": self._step_name, "elapsed_seconds": elapsed})
+
+
+class Pipeline:
+    """Runs a fixed ordered list of :class:`Step` instances against one request.
+
+    The pipeline is stateless — constructing once at app-startup and calling
+    :meth:`start` repeatedly is the intended usage pattern. Per-request state
+    lives on the :class:`~ix.contracts.ResponseIX` the pipeline creates and
+    threads through every step.
+    """
+
+    def __init__(self, steps: list[Step]) -> None:
+        self._steps = list(steps)  # shallow copy of the caller's list
+
+    async def start(self, request_ix: RequestIX) -> ResponseIX:
+        """Execute every step; return the populated :class:`ResponseIX`.
+
+        Flow:
+
+        1. Instantiate a fresh ``ResponseIX`` seeded with request correlation
+           ids.
+        2. For each step: time the call, run ``validate`` then (iff True)
+           ``process``. Append the timing entry. If either hook raises
+           :class:`~ix.errors.IXException`, write ``response.error`` and
+           stop. Non-IX exceptions propagate.
+        """
+        response_ix = ResponseIX(
+            use_case=request_ix.use_case,
+            ix_client_id=request_ix.ix_client_id,
+            request_id=request_ix.request_id,
+            ix_id=request_ix.ix_id,
+            metadata=Metadata(),
+        )
+
+        for step in self._steps:
+            with Timer(step.step_name, response_ix.metadata.timings):
+                try:
+                    should_run = await step.validate(request_ix, response_ix)
+                except IXException as exc:
+                    response_ix.error = str(exc)
+                    return response_ix  # Timer still appends on the way out
+
+                if not should_run:
+                    continue  # the with-block exit still records this step's timing
+
+                try:
+                    response_ix = await step.process(request_ix, response_ix)
+                except IXException as exc:
+                    response_ix.error = str(exc)
+                    return response_ix  # Timer still appends on the way out
+
+        return response_ix
+
+
+__all__ = ["Pipeline", "Timer"]
diff --git a/src/ix/pipeline/step.py b/src/ix/pipeline/step.py
new file mode 100644
index 0000000..45ed24c
--- /dev/null
+++ b/src/ix/pipeline/step.py
@@ -0,0 +1,58 @@
+"""Step ABC — the pipeline-step contract (spec §3).
+
+Every pipeline step implements two async hooks:
+
+* :meth:`Step.validate` — returns ``True`` when the step should run for this
+  request, ``False`` when it should be silently skipped. May raise
+  :class:`~ix.errors.IXException` to abort the pipeline with an error code.
+* :meth:`Step.process` — does the work; mutates the shared ``response_ix``
+  and returns it. May raise :class:`~ix.errors.IXException` to abort.
+
+Both hooks are async so steps that need I/O (file download, OCR, LLM) can
+cooperate with the asyncio event loop without sync-async conversion dances.
+"""
+
+from __future__ import annotations
+
+from abc import ABC, abstractmethod
+
+from ix.contracts import RequestIX, ResponseIX
+
+
+class Step(ABC):
+    """Abstract base for every pipeline step.
+
+    Subclasses override both hooks. The pipeline runner guarantees
+    ``validate`` is called before ``process`` for a given step, and that
+    ``process`` runs iff ``validate`` returned ``True``.
+
+    The :attr:`step_name` property controls the label written to
+    ``metadata.timings``. Defaults to the class name so production steps
+    (``SetupStep``, ``OCRStep``, …) log under their own name; test doubles
+    override it with the value under test.
+    """
+
+    @property
+    def step_name(self) -> str:
+        """Label used in ``metadata.timings``. Default: class name."""
+        return type(self).__name__
+
+    @abstractmethod
+    async def validate(self, request_ix: RequestIX, response_ix: ResponseIX) -> bool:
+        """Return ``True`` to run :meth:`process`, ``False`` to skip silently.
+
+        Raise :class:`~ix.errors.IXException` to abort the pipeline with an
+        error code on ``response_ix.error``.
+        """
+
+    @abstractmethod
+    async def process(
+        self, request_ix: RequestIX, response_ix: ResponseIX
+    ) -> ResponseIX:
+        """Run the step; return the (same, mutated) ``response_ix``.
+
+        Raising :class:`~ix.errors.IXException` aborts the pipeline.
+        """
+
+
+__all__ = ["Step"]
diff --git a/tests/unit/test_pipeline.py b/tests/unit/test_pipeline.py
new file mode 100644
index 0000000..6767e5b
--- /dev/null
+++ b/tests/unit/test_pipeline.py
@@ -0,0 +1,208 @@
+"""Tests for the Pipeline orchestrator + Step ABC + Timer (spec §3, §4)."""
+
+from __future__ import annotations
+
+from typing import Any
+
+import pytest
+
+from ix.contracts import Context, RequestIX, ResponseIX
+from ix.errors import IXErrorCode, IXException
+from ix.pipeline import Pipeline, Step
+
+
+def _make_request() -> RequestIX:
+    return RequestIX(
+        use_case="bank_statement_header",
+        ix_client_id="test",
+        request_id="r-1",
+        context=Context(files=[], texts=["hello"]),
+    )
+
+
+class StubStep(Step):
+    """Hand-written Step double. Records call order on a shared list."""
+
+    def __init__(
+        self,
+        name: str,
+        *,
+        calls: list[str] | None = None,
+        validate_returns: bool = True,
+        validate_raises: IXException | None = None,
+        process_raises: IXException | BaseException | None = None,
+        mutate: Any = None,
+    ) -> None:
+        self.name = name
+        self._calls = calls if calls is not None else []
+        self._validate_returns = validate_returns
+        self._validate_raises = validate_raises
+        self._process_raises = process_raises
+        self._mutate = mutate
+
+    @property
+    def step_name(self) -> str:  # type: ignore[override]
+        return self.name
+
+    async def validate(self, request_ix: RequestIX, response_ix: ResponseIX) -> bool:
+        self._calls.append(f"{self.name}.validate")
+        if self._validate_raises is not None:
+            raise self._validate_raises
+        return self._validate_returns
+
+    async def process(
+        self, request_ix: RequestIX, response_ix: ResponseIX
+    ) -> ResponseIX:
+        self._calls.append(f"{self.name}.process")
+        if self._process_raises is not None:
+            raise self._process_raises
+        if self._mutate is not None:
+            self._mutate(response_ix)
+        return response_ix
+
+
+class TestOrdering:
+    async def test_steps_run_in_registered_order(self) -> None:
+        calls: list[str] = []
+        pipeline = Pipeline(
+            steps=[
+                StubStep("a", calls=calls),
+                StubStep("b", calls=calls),
+                StubStep("c", calls=calls),
+            ]
+        )
+        await pipeline.start(_make_request())
+        assert calls == [
+            "a.validate",
+            "a.process",
+            "b.validate",
+            "b.process",
+            "c.validate",
+            "c.process",
+        ]
+
+
+class TestSkipOnFalse:
+    async def test_validate_false_skips_process(self) -> None:
+        calls: list[str] = []
+        pipeline = Pipeline(
+            steps=[
+                StubStep("a", calls=calls, validate_returns=False),
+                StubStep("b", calls=calls),
+            ]
+        )
+        response = await pipeline.start(_make_request())
+        assert calls == ["a.validate", "b.validate", "b.process"]
+        assert response.error is None
+
+
+class TestErrorFromValidate:
+    async def test_validate_raising_ix_sets_error_and_aborts(self) -> None:
+        calls: list[str] = []
+        err = IXException(IXErrorCode.IX_000_002, detail="nothing")
+        pipeline = Pipeline(
+            steps=[
+                StubStep("a", calls=calls, validate_raises=err),
+                StubStep("b", calls=calls),
+            ]
+        )
+        response = await pipeline.start(_make_request())
+        assert calls == ["a.validate"]
+        assert response.error is not None
+        assert response.error.startswith("IX_000_002")
+        assert "nothing" in response.error
+
+
+class TestErrorFromProcess:
+    async def test_process_raising_ix_sets_error_and_aborts(self) -> None:
+        calls: list[str] = []
+        err = IXException(IXErrorCode.IX_002_001, detail="bad parse")
+        pipeline = Pipeline(
+            steps=[
+                StubStep("a", calls=calls, process_raises=err),
+                StubStep("b", calls=calls),
+            ]
+        )
+        response = await pipeline.start(_make_request())
+        assert calls == ["a.validate", "a.process"]
+        assert response.error is not None
+        assert response.error.startswith("IX_002_001")
+        assert "bad parse" in response.error
+
+
+class TestTimings:
+    async def test_timings_populated_for_every_executed_step(self) -> None:
+        calls: list[str] = []
+        pipeline = Pipeline(
+            steps=[
+                StubStep("alpha", calls=calls),
+                StubStep("beta", calls=calls),
+            ]
+        )
+        response = await pipeline.start(_make_request())
+        # One timing per executed step.
+        names = [t["step"] for t in response.metadata.timings]
+        assert names == ["alpha", "beta"]
+        for entry in response.metadata.timings:
+            assert isinstance(entry["elapsed_seconds"], float)
+            assert entry["elapsed_seconds"] >= 0.0
+
+    async def test_timing_recorded_even_when_validate_false(self) -> None:
+        calls: list[str] = []
+        pipeline = Pipeline(
+            steps=[
+                StubStep("skipper", calls=calls, validate_returns=False),
+                StubStep("runner", calls=calls),
+            ]
+        )
+        response = await pipeline.start(_make_request())
+        names = [t["step"] for t in response.metadata.timings]
+        assert names == ["skipper", "runner"]
+
+    async def test_timing_recorded_when_validate_raises(self) -> None:
+        calls: list[str] = []
+        err = IXException(IXErrorCode.IX_001_001, detail="missing")
+        pipeline = Pipeline(
+            steps=[
+                StubStep("boom", calls=calls, validate_raises=err),
+                StubStep("unreached", calls=calls),
+            ]
+        )
+        response = await pipeline.start(_make_request())
+        names = [t["step"] for t in response.metadata.timings]
+        assert names == ["boom"]
+
+
+class TestNonIXExceptionStillSurfaces:
+    async def test_generic_exception_in_process_aborts(self) -> None:
+        calls: list[str] = []
+        pipeline = Pipeline(
+            steps=[
+                StubStep("a", calls=calls, process_raises=RuntimeError("kaboom")),
+                StubStep("b", calls=calls),
+            ]
+        )
+        with pytest.raises(RuntimeError):
+            await pipeline.start(_make_request())
+        assert calls == ["a.validate", "a.process"]
+
+
+class TestStepMutation:
+    async def test_response_ix_shared_across_steps(self) -> None:
+        def mutate_a(resp: ResponseIX) -> None:
+            resp.use_case = "set-by-a"
+
+        def mutate_b(resp: ResponseIX) -> None:
+            # b sees what a wrote.
+            assert resp.use_case == "set-by-a"
+            resp.use_case_name = "set-by-b"
+
+        pipeline = Pipeline(
+            steps=[
+                StubStep("a", mutate=mutate_a),
+                StubStep("b", mutate=mutate_b),
+            ]
+        )
+        response = await pipeline.start(_make_request())
+        assert response.use_case == "set-by-a"
+        assert response.use_case_name == "set-by-b"