feat(pipeline): Step ABC + Pipeline runner + Timer (spec §3, §4)
Adds the transport-agnostic pipeline orchestrator. Each step implements
async validate + process; the runner wraps both in a Timer, writes
per-step entries to response.metadata.timings, and aborts on the first
IXException by writing response.error.
- Step exposes a step_name property (defaults to class name) so tests and
logs label steps consistently.
- Timer is a plain context manager that appends one {step, elapsed_seconds}
entry on exit regardless of whether the body raised, so the timeline
stays reconstructable for failed steps.
- 9 unit tests cover ordering, skip-on-false, IXException in validate vs.
process, timings populated for every executed step, and shared-response
mutation across steps. Non-IX exceptions propagate.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
b397a80c0b
commit
dcd1bc764a
4 changed files with 392 additions and 0 deletions
18
src/ix/pipeline/__init__.py
Normal file
18
src/ix/pipeline/__init__.py
Normal file
|
|
@ -0,0 +1,18 @@
|
||||||
|
"""Pipeline orchestration: Step ABC, Pipeline runner, per-step Timer.
|
||||||
|
|
||||||
|
Concrete steps (Setup / OCR / GenAI / Reliability / ResponseHandler) live in
|
||||||
|
sibling modules and are wired into a :class:`Pipeline` by the application
|
||||||
|
factory. The pipeline core is transport-agnostic: it takes a
|
||||||
|
:class:`~ix.contracts.RequestIX`, threads a shared
|
||||||
|
:class:`~ix.contracts.ResponseIX` through every step, and returns the
|
||||||
|
populated response. Timings land in ``response.metadata.timings``; the first
|
||||||
|
:class:`~ix.errors.IXException` raised aborts the pipeline and is written to
|
||||||
|
``response.error``.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from ix.pipeline.pipeline import Pipeline, Timer
|
||||||
|
from ix.pipeline.step import Step
|
||||||
|
|
||||||
|
__all__ = ["Pipeline", "Step", "Timer"]
|
||||||
108
src/ix/pipeline/pipeline.py
Normal file
108
src/ix/pipeline/pipeline.py
Normal file
|
|
@ -0,0 +1,108 @@
|
||||||
|
"""Pipeline runner + Timer context manager (spec §4).
|
||||||
|
|
||||||
|
The runner threads a fresh :class:`~ix.contracts.ResponseIX` through every
|
||||||
|
registered :class:`Step`, records per-step elapsed seconds in
|
||||||
|
``response.metadata.timings`` (always — even for validated-out-or-raised
|
||||||
|
steps, so the timeline is reconstructable from logs), and aborts on the
|
||||||
|
first :class:`~ix.errors.IXException` by writing ``response.error`` and
|
||||||
|
stopping the loop. Non-IX exceptions propagate — the job-store layer decides
|
||||||
|
whether to swallow or surface them.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import time
|
||||||
|
from types import TracebackType
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from ix.contracts import Metadata, RequestIX, ResponseIX
|
||||||
|
from ix.errors import IXException
|
||||||
|
from ix.pipeline.step import Step
|
||||||
|
|
||||||
|
|
||||||
|
class Timer:
|
||||||
|
"""Context manager that appends one timing entry to a list.
|
||||||
|
|
||||||
|
Example::
|
||||||
|
|
||||||
|
timings: list[dict[str, Any]] = []
|
||||||
|
with Timer("setup", timings):
|
||||||
|
... # work
|
||||||
|
# timings == [{"step": "setup", "elapsed_seconds": 0.003}]
|
||||||
|
|
||||||
|
The entry is appended on ``__exit__`` regardless of whether the body
|
||||||
|
raised — the timeline stays accurate even for failed steps.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, step_name: str, sink: list[dict[str, Any]]) -> None:
|
||||||
|
self._step_name = step_name
|
||||||
|
self._sink = sink
|
||||||
|
self._start: float = 0.0
|
||||||
|
|
||||||
|
def __enter__(self) -> Timer:
|
||||||
|
self._start = time.perf_counter()
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __exit__(
|
||||||
|
self,
|
||||||
|
exc_type: type[BaseException] | None,
|
||||||
|
exc: BaseException | None,
|
||||||
|
tb: TracebackType | None,
|
||||||
|
) -> None:
|
||||||
|
elapsed = time.perf_counter() - self._start
|
||||||
|
self._sink.append({"step": self._step_name, "elapsed_seconds": elapsed})
|
||||||
|
|
||||||
|
|
||||||
|
class Pipeline:
|
||||||
|
"""Runs a fixed ordered list of :class:`Step` instances against one request.
|
||||||
|
|
||||||
|
The pipeline is stateless — constructing once at app-startup and calling
|
||||||
|
:meth:`start` repeatedly is the intended usage pattern. Per-request state
|
||||||
|
lives on the :class:`~ix.contracts.ResponseIX` the pipeline creates and
|
||||||
|
threads through every step.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, steps: list[Step]) -> None:
|
||||||
|
self._steps = list(steps)
|
||||||
|
|
||||||
|
async def start(self, request_ix: RequestIX) -> ResponseIX:
|
||||||
|
"""Execute every step; return the populated :class:`ResponseIX`.
|
||||||
|
|
||||||
|
Flow:
|
||||||
|
|
||||||
|
1. Instantiate a fresh ``ResponseIX`` seeded with request correlation
|
||||||
|
ids.
|
||||||
|
2. For each step: time the call, run ``validate`` then (iff True)
|
||||||
|
``process``. Append the timing entry. If either hook raises
|
||||||
|
:class:`~ix.errors.IXException`, write ``response.error`` and
|
||||||
|
stop. Non-IX exceptions propagate.
|
||||||
|
"""
|
||||||
|
response_ix = ResponseIX(
|
||||||
|
use_case=request_ix.use_case,
|
||||||
|
ix_client_id=request_ix.ix_client_id,
|
||||||
|
request_id=request_ix.request_id,
|
||||||
|
ix_id=request_ix.ix_id,
|
||||||
|
metadata=Metadata(),
|
||||||
|
)
|
||||||
|
|
||||||
|
for step in self._steps:
|
||||||
|
with Timer(step.step_name, response_ix.metadata.timings):
|
||||||
|
try:
|
||||||
|
should_run = await step.validate(request_ix, response_ix)
|
||||||
|
except IXException as exc:
|
||||||
|
response_ix.error = str(exc)
|
||||||
|
return response_ix
|
||||||
|
|
||||||
|
if not should_run:
|
||||||
|
continue
|
||||||
|
|
||||||
|
try:
|
||||||
|
response_ix = await step.process(request_ix, response_ix)
|
||||||
|
except IXException as exc:
|
||||||
|
response_ix.error = str(exc)
|
||||||
|
return response_ix
|
||||||
|
|
||||||
|
return response_ix
|
||||||
|
|
||||||
|
|
||||||
|
__all__ = ["Pipeline", "Timer"]
|
||||||
58
src/ix/pipeline/step.py
Normal file
58
src/ix/pipeline/step.py
Normal file
|
|
@ -0,0 +1,58 @@
|
||||||
|
"""Step ABC — the pipeline-step contract (spec §3).
|
||||||
|
|
||||||
|
Every pipeline step implements two async hooks:
|
||||||
|
|
||||||
|
* :meth:`Step.validate` — returns ``True`` when the step should run for this
|
||||||
|
request, ``False`` when it should be silently skipped. May raise
|
||||||
|
:class:`~ix.errors.IXException` to abort the pipeline with an error code.
|
||||||
|
* :meth:`Step.process` — does the work; mutates the shared ``response_ix``
|
||||||
|
and returns it. May raise :class:`~ix.errors.IXException` to abort.
|
||||||
|
|
||||||
|
Both hooks are async so steps that need I/O (file download, OCR, LLM) can
|
||||||
|
cooperate with the asyncio event loop without sync-async conversion dances.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from abc import ABC, abstractmethod
|
||||||
|
|
||||||
|
from ix.contracts import RequestIX, ResponseIX
|
||||||
|
|
||||||
|
|
||||||
|
class Step(ABC):
|
||||||
|
"""Abstract base for every pipeline step.
|
||||||
|
|
||||||
|
Subclasses override both hooks. The pipeline runner guarantees
|
||||||
|
``validate`` is called before ``process`` for a given step, and that
|
||||||
|
``process`` runs iff ``validate`` returned ``True``.
|
||||||
|
|
||||||
|
The :attr:`step_name` property controls the label written to
|
||||||
|
``metadata.timings``. Defaults to the class name so production steps
|
||||||
|
(``SetupStep``, ``OCRStep``, …) log under their own name; test doubles
|
||||||
|
override it with the value under test.
|
||||||
|
"""
|
||||||
|
|
||||||
|
@property
|
||||||
|
def step_name(self) -> str:
|
||||||
|
"""Label used in ``metadata.timings``. Default: class name."""
|
||||||
|
return type(self).__name__
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
async def validate(self, request_ix: RequestIX, response_ix: ResponseIX) -> bool:
|
||||||
|
"""Return ``True`` to run :meth:`process`, ``False`` to skip silently.
|
||||||
|
|
||||||
|
Raise :class:`~ix.errors.IXException` to abort the pipeline with an
|
||||||
|
error code on ``response_ix.error``.
|
||||||
|
"""
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
async def process(
|
||||||
|
self, request_ix: RequestIX, response_ix: ResponseIX
|
||||||
|
) -> ResponseIX:
|
||||||
|
"""Run the step; return the (same, mutated) ``response_ix``.
|
||||||
|
|
||||||
|
Raising :class:`~ix.errors.IXException` aborts the pipeline.
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
__all__ = ["Step"]
|
||||||
208
tests/unit/test_pipeline.py
Normal file
208
tests/unit/test_pipeline.py
Normal file
|
|
@ -0,0 +1,208 @@
|
||||||
|
"""Tests for the Pipeline orchestrator + Step ABC + Timer (spec §3, §4)."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from ix.contracts import Context, RequestIX, ResponseIX
|
||||||
|
from ix.errors import IXErrorCode, IXException
|
||||||
|
from ix.pipeline import Pipeline, Step
|
||||||
|
|
||||||
|
|
||||||
|
def _make_request() -> RequestIX:
|
||||||
|
return RequestIX(
|
||||||
|
use_case="bank_statement_header",
|
||||||
|
ix_client_id="test",
|
||||||
|
request_id="r-1",
|
||||||
|
context=Context(files=[], texts=["hello"]),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class StubStep(Step):
|
||||||
|
"""Hand-written Step double. Records call order on a shared list."""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
name: str,
|
||||||
|
*,
|
||||||
|
calls: list[str] | None = None,
|
||||||
|
validate_returns: bool = True,
|
||||||
|
validate_raises: IXException | None = None,
|
||||||
|
process_raises: IXException | BaseException | None = None,
|
||||||
|
mutate: Any = None,
|
||||||
|
) -> None:
|
||||||
|
self.name = name
|
||||||
|
self._calls = calls if calls is not None else []
|
||||||
|
self._validate_returns = validate_returns
|
||||||
|
self._validate_raises = validate_raises
|
||||||
|
self._process_raises = process_raises
|
||||||
|
self._mutate = mutate
|
||||||
|
|
||||||
|
@property
|
||||||
|
def step_name(self) -> str: # type: ignore[override]
|
||||||
|
return self.name
|
||||||
|
|
||||||
|
async def validate(self, request_ix: RequestIX, response_ix: ResponseIX) -> bool:
|
||||||
|
self._calls.append(f"{self.name}.validate")
|
||||||
|
if self._validate_raises is not None:
|
||||||
|
raise self._validate_raises
|
||||||
|
return self._validate_returns
|
||||||
|
|
||||||
|
async def process(
|
||||||
|
self, request_ix: RequestIX, response_ix: ResponseIX
|
||||||
|
) -> ResponseIX:
|
||||||
|
self._calls.append(f"{self.name}.process")
|
||||||
|
if self._process_raises is not None:
|
||||||
|
raise self._process_raises
|
||||||
|
if self._mutate is not None:
|
||||||
|
self._mutate(response_ix)
|
||||||
|
return response_ix
|
||||||
|
|
||||||
|
|
||||||
|
class TestOrdering:
|
||||||
|
async def test_steps_run_in_registered_order(self) -> None:
|
||||||
|
calls: list[str] = []
|
||||||
|
pipeline = Pipeline(
|
||||||
|
steps=[
|
||||||
|
StubStep("a", calls=calls),
|
||||||
|
StubStep("b", calls=calls),
|
||||||
|
StubStep("c", calls=calls),
|
||||||
|
]
|
||||||
|
)
|
||||||
|
await pipeline.start(_make_request())
|
||||||
|
assert calls == [
|
||||||
|
"a.validate",
|
||||||
|
"a.process",
|
||||||
|
"b.validate",
|
||||||
|
"b.process",
|
||||||
|
"c.validate",
|
||||||
|
"c.process",
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
class TestSkipOnFalse:
|
||||||
|
async def test_validate_false_skips_process(self) -> None:
|
||||||
|
calls: list[str] = []
|
||||||
|
pipeline = Pipeline(
|
||||||
|
steps=[
|
||||||
|
StubStep("a", calls=calls, validate_returns=False),
|
||||||
|
StubStep("b", calls=calls),
|
||||||
|
]
|
||||||
|
)
|
||||||
|
response = await pipeline.start(_make_request())
|
||||||
|
assert calls == ["a.validate", "b.validate", "b.process"]
|
||||||
|
assert response.error is None
|
||||||
|
|
||||||
|
|
||||||
|
class TestErrorFromValidate:
|
||||||
|
async def test_validate_raising_ix_sets_error_and_aborts(self) -> None:
|
||||||
|
calls: list[str] = []
|
||||||
|
err = IXException(IXErrorCode.IX_000_002, detail="nothing")
|
||||||
|
pipeline = Pipeline(
|
||||||
|
steps=[
|
||||||
|
StubStep("a", calls=calls, validate_raises=err),
|
||||||
|
StubStep("b", calls=calls),
|
||||||
|
]
|
||||||
|
)
|
||||||
|
response = await pipeline.start(_make_request())
|
||||||
|
assert calls == ["a.validate"]
|
||||||
|
assert response.error is not None
|
||||||
|
assert response.error.startswith("IX_000_002")
|
||||||
|
assert "nothing" in response.error
|
||||||
|
|
||||||
|
|
||||||
|
class TestErrorFromProcess:
|
||||||
|
async def test_process_raising_ix_sets_error_and_aborts(self) -> None:
|
||||||
|
calls: list[str] = []
|
||||||
|
err = IXException(IXErrorCode.IX_002_001, detail="bad parse")
|
||||||
|
pipeline = Pipeline(
|
||||||
|
steps=[
|
||||||
|
StubStep("a", calls=calls, process_raises=err),
|
||||||
|
StubStep("b", calls=calls),
|
||||||
|
]
|
||||||
|
)
|
||||||
|
response = await pipeline.start(_make_request())
|
||||||
|
assert calls == ["a.validate", "a.process"]
|
||||||
|
assert response.error is not None
|
||||||
|
assert response.error.startswith("IX_002_001")
|
||||||
|
assert "bad parse" in response.error
|
||||||
|
|
||||||
|
|
||||||
|
class TestTimings:
|
||||||
|
async def test_timings_populated_for_every_executed_step(self) -> None:
|
||||||
|
calls: list[str] = []
|
||||||
|
pipeline = Pipeline(
|
||||||
|
steps=[
|
||||||
|
StubStep("alpha", calls=calls),
|
||||||
|
StubStep("beta", calls=calls),
|
||||||
|
]
|
||||||
|
)
|
||||||
|
response = await pipeline.start(_make_request())
|
||||||
|
# One timing per executed step.
|
||||||
|
names = [t["step"] for t in response.metadata.timings]
|
||||||
|
assert names == ["alpha", "beta"]
|
||||||
|
for entry in response.metadata.timings:
|
||||||
|
assert isinstance(entry["elapsed_seconds"], float)
|
||||||
|
assert entry["elapsed_seconds"] >= 0.0
|
||||||
|
|
||||||
|
async def test_timing_recorded_even_when_validate_false(self) -> None:
|
||||||
|
calls: list[str] = []
|
||||||
|
pipeline = Pipeline(
|
||||||
|
steps=[
|
||||||
|
StubStep("skipper", calls=calls, validate_returns=False),
|
||||||
|
StubStep("runner", calls=calls),
|
||||||
|
]
|
||||||
|
)
|
||||||
|
response = await pipeline.start(_make_request())
|
||||||
|
names = [t["step"] for t in response.metadata.timings]
|
||||||
|
assert names == ["skipper", "runner"]
|
||||||
|
|
||||||
|
async def test_timing_recorded_when_validate_raises(self) -> None:
|
||||||
|
calls: list[str] = []
|
||||||
|
err = IXException(IXErrorCode.IX_001_001, detail="missing")
|
||||||
|
pipeline = Pipeline(
|
||||||
|
steps=[
|
||||||
|
StubStep("boom", calls=calls, validate_raises=err),
|
||||||
|
StubStep("unreached", calls=calls),
|
||||||
|
]
|
||||||
|
)
|
||||||
|
response = await pipeline.start(_make_request())
|
||||||
|
names = [t["step"] for t in response.metadata.timings]
|
||||||
|
assert names == ["boom"]
|
||||||
|
|
||||||
|
|
||||||
|
class TestNonIXExceptionStillSurfaces:
|
||||||
|
async def test_generic_exception_in_process_aborts(self) -> None:
|
||||||
|
calls: list[str] = []
|
||||||
|
pipeline = Pipeline(
|
||||||
|
steps=[
|
||||||
|
StubStep("a", calls=calls, process_raises=RuntimeError("kaboom")),
|
||||||
|
StubStep("b", calls=calls),
|
||||||
|
]
|
||||||
|
)
|
||||||
|
with pytest.raises(RuntimeError):
|
||||||
|
await pipeline.start(_make_request())
|
||||||
|
assert calls == ["a.validate", "a.process"]
|
||||||
|
|
||||||
|
|
||||||
|
class TestStepMutation:
|
||||||
|
async def test_response_ix_shared_across_steps(self) -> None:
|
||||||
|
def mutate_a(resp: ResponseIX) -> None:
|
||||||
|
resp.use_case = "set-by-a"
|
||||||
|
|
||||||
|
def mutate_b(resp: ResponseIX) -> None:
|
||||||
|
# b sees what a wrote.
|
||||||
|
assert resp.use_case == "set-by-a"
|
||||||
|
resp.use_case_name = "set-by-b"
|
||||||
|
|
||||||
|
pipeline = Pipeline(
|
||||||
|
steps=[
|
||||||
|
StubStep("a", mutate=mutate_a),
|
||||||
|
StubStep("b", mutate=mutate_b),
|
||||||
|
]
|
||||||
|
)
|
||||||
|
response = await pipeline.start(_make_request())
|
||||||
|
assert response.use_case == "set-by-a"
|
||||||
|
assert response.use_case_name == "set-by-b"
|
||||||
Loading…
Reference in a new issue