feat(pipeline): Step ABC + Pipeline runner + Timer #9
4 changed files with 392 additions and 0 deletions
18
src/ix/pipeline/__init__.py
Normal file
18
src/ix/pipeline/__init__.py
Normal file
|
|
@ -0,0 +1,18 @@
|
|||
"""Pipeline orchestration: Step ABC, Pipeline runner, per-step Timer.
|
||||
|
||||
Concrete steps (Setup / OCR / GenAI / Reliability / ResponseHandler) live in
|
||||
sibling modules and are wired into a :class:`Pipeline` by the application
|
||||
factory. The pipeline core is transport-agnostic: it takes a
|
||||
:class:`~ix.contracts.RequestIX`, threads a shared
|
||||
:class:`~ix.contracts.ResponseIX` through every step, and returns the
|
||||
populated response. Timings land in ``response.metadata.timings``; the first
|
||||
:class:`~ix.errors.IXException` raised aborts the pipeline and is written to
|
||||
``response.error``.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from ix.pipeline.pipeline import Pipeline, Timer
|
||||
from ix.pipeline.step import Step
|
||||
|
||||
__all__ = ["Pipeline", "Step", "Timer"]
|
||||
108
src/ix/pipeline/pipeline.py
Normal file
108
src/ix/pipeline/pipeline.py
Normal file
|
|
@ -0,0 +1,108 @@
|
|||
"""Pipeline runner + Timer context manager (spec §4).
|
||||
|
||||
The runner threads a fresh :class:`~ix.contracts.ResponseIX` through every
|
||||
registered :class:`Step`, records per-step elapsed seconds in
|
||||
``response.metadata.timings`` (always — even for skipped or raising
|
||||
steps, so the timeline is reconstructable from logs), and aborts on the
|
||||
first :class:`~ix.errors.IXException` by writing ``response.error`` and
|
||||
stopping the loop. Non-IX exceptions propagate — the job-store layer decides
|
||||
whether to swallow or surface them.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
from types import TracebackType
|
||||
from typing import Any
|
||||
|
||||
from ix.contracts import Metadata, RequestIX, ResponseIX
|
||||
from ix.errors import IXException
|
||||
from ix.pipeline.step import Step
|
||||
|
||||
|
||||
class Timer:
    """Time a ``with`` block and log the result into a caller-supplied list.

    When the block is left, one ``{"step": ..., "elapsed_seconds": ...}``
    dict is appended to ``sink``. That happens whether the body finished
    normally or raised, and the exception (if any) is never suppressed, so
    failed steps still appear in the timeline.

    Example::

        timings: list[dict[str, Any]] = []
        with Timer("setup", timings):
            ...  # work
        # timings == [{"step": "setup", "elapsed_seconds": 0.003}]
    """

    def __init__(self, step_name: str, sink: list[dict[str, Any]]) -> None:
        """Remember the label to record and the list that receives the entry."""
        self._sink = sink
        self._step_name = step_name
        # Placeholder only; ``__enter__`` stamps the real start time.
        self._start: float = 0.0

    def __enter__(self) -> Timer:
        """Stamp the start time and hand back the timer itself."""
        self._start = time.perf_counter()
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        tb: TracebackType | None,
    ) -> None:
        """Append the timing entry; return ``None`` so exceptions propagate."""
        stopped_at = time.perf_counter()
        entry: dict[str, Any] = {
            "step": self._step_name,
            "elapsed_seconds": stopped_at - self._start,
        }
        self._sink.append(entry)
|
||||
|
||||
|
||||
class Pipeline:
    """Drive one request through a fixed, ordered sequence of :class:`Step` objects.

    The only thing a ``Pipeline`` keeps is its step list, so a single
    instance can be built at application start-up and :meth:`start` called
    once per request. Everything request-specific is carried by the
    :class:`~ix.contracts.ResponseIX` that :meth:`start` creates and passes
    from step to step.
    """

    def __init__(self, steps: list[Step]) -> None:
        """Store a private copy of ``steps`` in execution order."""
        # Copied so later mutation of the caller's list cannot alter the run order.
        self._steps = list(steps)

    async def start(self, request_ix: RequestIX) -> ResponseIX:
        """Run the steps in order and return the resulting :class:`ResponseIX`.

        A new ``ResponseIX`` is created carrying the request's correlation
        fields (``use_case``, ``ix_client_id``, ``request_id``, ``ix_id``)
        and an empty ``Metadata``. Each step is then handled inside a
        :class:`Timer`, so one timing entry is appended per visited step:

        * ``validate`` is awaited first; ``process`` is awaited only when
          ``validate`` returned a truthy value.
        * The first :class:`~ix.errors.IXException` raised by either hook is
          stored as ``str(exc)`` on ``response.error`` and no further steps
          run.
        * Any other exception is not caught here and propagates to the
          caller.
        """
        response_ix = ResponseIX(
            use_case=request_ix.use_case,
            ix_client_id=request_ix.ix_client_id,
            request_id=request_ix.request_id,
            ix_id=request_ix.ix_id,
            metadata=Metadata(),
        )

        for step in self._steps:
            # Leaving the ``with`` body by any route (fall-through, ``break``
            # or a propagating exception) triggers the Timer's ``__exit__``.
            with Timer(step.step_name, response_ix.metadata.timings):
                try:
                    if await step.validate(request_ix, response_ix):
                        response_ix = await step.process(request_ix, response_ix)
                except IXException as exc:
                    response_ix.error = str(exc)
                    break

        return response_ix
|
||||
|
||||
|
||||
__all__ = ["Pipeline", "Timer"]
|
||||
58
src/ix/pipeline/step.py
Normal file
58
src/ix/pipeline/step.py
Normal file
|
|
@ -0,0 +1,58 @@
|
|||
"""Step ABC — the pipeline-step contract (spec §3).
|
||||
|
||||
Every pipeline step implements two async hooks:
|
||||
|
||||
* :meth:`Step.validate` — returns ``True`` when the step should run for this
|
||||
request, ``False`` when it should be silently skipped. May raise
|
||||
:class:`~ix.errors.IXException` to abort the pipeline with an error code.
|
||||
* :meth:`Step.process` — does the work; mutates the shared ``response_ix``
|
||||
and returns it. May raise :class:`~ix.errors.IXException` to abort.
|
||||
|
||||
Both hooks are async so steps that need I/O (file download, OCR, LLM) can
|
||||
cooperate with the asyncio event loop without sync-async conversion dances.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
from ix.contracts import RequestIX, ResponseIX
|
||||
|
||||
|
||||
class Step(ABC):
    """Contract that every pipeline step fulfils.

    A concrete step supplies two coroutines: :meth:`validate`, which decides
    whether the step applies to the current request, and :meth:`process`,
    which does the work. The pipeline runner always awaits ``validate``
    first and awaits ``process`` only when ``validate`` returned ``True``.

    :attr:`step_name` is the label the runner records in
    ``metadata.timings``. It falls back to the class name, so production
    steps (``SetupStep``, ``OCRStep``, …) are logged under their own name;
    test doubles may override it to choose the label being asserted on.
    """

    @property
    def step_name(self) -> str:
        """Timing label for this step; the concrete class name by default."""
        cls = type(self)
        return cls.__name__

    @abstractmethod
    async def validate(self, request_ix: RequestIX, response_ix: ResponseIX) -> bool:
        """Decide whether :meth:`process` should run for this request.

        Return ``True`` to run the step or ``False`` to skip it silently.
        Raise :class:`~ix.errors.IXException` to abort the whole pipeline;
        the runner records the error on ``response_ix.error``.
        """

    @abstractmethod
    async def process(
        self, request_ix: RequestIX, response_ix: ResponseIX
    ) -> ResponseIX:
        """Do the step's work and return the (same, mutated) ``response_ix``.

        Raise :class:`~ix.errors.IXException` to abort the pipeline.
        """
|
||||
|
||||
|
||||
__all__ = ["Step"]
|
||||
208
tests/unit/test_pipeline.py
Normal file
208
tests/unit/test_pipeline.py
Normal file
|
|
@ -0,0 +1,208 @@
|
|||
"""Tests for the Pipeline orchestrator + Step ABC + Timer (spec §3, §4)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
import pytest
|
||||
|
||||
from ix.contracts import Context, RequestIX, ResponseIX
|
||||
from ix.errors import IXErrorCode, IXException
|
||||
from ix.pipeline import Pipeline, Step
|
||||
|
||||
|
||||
def _make_request() -> RequestIX:
    """Build the small text-only request that every test in this module sends."""
    context = Context(files=[], texts=["hello"])
    return RequestIX(
        use_case="bank_statement_header",
        ix_client_id="test",
        request_id="r-1",
        context=context,
    )
|
||||
|
||||
|
||||
class StubStep(Step):
    """Configurable fake :class:`Step` for driving the pipeline in tests.

    Each hook appends ``"<name>.validate"`` or ``"<name>.process"`` to the
    ``calls`` list, so several stubs sharing one list expose the overall call
    order. Keyword options choose what ``validate`` returns, which exception
    either hook raises, and an optional callback that ``process`` applies to
    the response.
    """

    def __init__(
        self,
        name: str,
        *,
        calls: list[str] | None = None,
        validate_returns: bool = True,
        validate_raises: IXException | None = None,
        process_raises: IXException | BaseException | None = None,
        mutate: Any = None,
    ) -> None:
        self.name = name
        # Checked against ``None`` (not truthiness) so an empty list passed by
        # the test remains the shared recording list.
        self._calls = [] if calls is None else calls
        self._validate_returns = validate_returns
        self._validate_raises = validate_raises
        self._process_raises = process_raises
        self._mutate = mutate

    @property
    def step_name(self) -> str:  # type: ignore[override]
        """Report the configured name instead of the class name."""
        return self.name

    async def validate(self, request_ix: RequestIX, response_ix: ResponseIX) -> bool:
        """Record the call, then raise or return as configured."""
        self._calls.append(f"{self.name}.validate")
        failure = self._validate_raises
        if failure is not None:
            raise failure
        return self._validate_returns

    async def process(
        self, request_ix: RequestIX, response_ix: ResponseIX
    ) -> ResponseIX:
        """Record the call, raise if configured, else apply ``mutate`` and return."""
        self._calls.append(f"{self.name}.process")
        failure = self._process_raises
        if failure is not None:
            raise failure
        callback = self._mutate
        if callback is not None:
            callback(response_ix)
        return response_ix
|
||||
|
||||
|
||||
class TestOrdering:
    """Steps execute in registration order, ``validate`` before ``process``."""

    async def test_steps_run_in_registered_order(self) -> None:
        calls: list[str] = []
        labels = ["a", "b", "c"]
        pipeline = Pipeline(steps=[StubStep(label, calls=calls) for label in labels])

        await pipeline.start(_make_request())

        # Every step contributes its validate call immediately followed by
        # its process call, in the order the steps were registered.
        expected: list[str] = []
        for label in labels:
            expected += [f"{label}.validate", f"{label}.process"]
        assert calls == expected
|
||||
|
||||
|
||||
class TestSkipOnFalse:
    """A step whose ``validate`` returns ``False`` is skipped without error."""

    async def test_validate_false_skips_process(self) -> None:
        calls: list[str] = []
        skipped = StubStep("a", calls=calls, validate_returns=False)
        executed = StubStep("b", calls=calls)

        response = await Pipeline(steps=[skipped, executed]).start(_make_request())

        # "a.process" must be absent; the following step still runs in full.
        assert calls == ["a.validate", "b.validate", "b.process"]
        assert response.error is None
|
||||
|
||||
|
||||
class TestErrorFromValidate:
    """An ``IXException`` from ``validate`` is recorded and stops the run."""

    async def test_validate_raising_ix_sets_error_and_aborts(self) -> None:
        calls: list[str] = []
        failing = StubStep(
            "a",
            calls=calls,
            validate_raises=IXException(IXErrorCode.IX_000_002, detail="nothing"),
        )
        never_reached = StubStep("b", calls=calls)

        response = await Pipeline(steps=[failing, never_reached]).start(_make_request())

        # Neither a.process nor anything from step "b" may have been called.
        assert calls == ["a.validate"]
        error = response.error
        assert error is not None
        assert error.startswith("IX_000_002")
        assert "nothing" in error
|
||||
|
||||
|
||||
class TestErrorFromProcess:
    """An ``IXException`` from ``process`` is recorded and stops the run."""

    async def test_process_raising_ix_sets_error_and_aborts(self) -> None:
        calls: list[str] = []
        failing = StubStep(
            "a",
            calls=calls,
            process_raises=IXException(IXErrorCode.IX_002_001, detail="bad parse"),
        )
        never_reached = StubStep("b", calls=calls)

        response = await Pipeline(steps=[failing, never_reached]).start(_make_request())

        # Step "a" ran both hooks; step "b" was never touched.
        assert calls == ["a.validate", "a.process"]
        error = response.error
        assert error is not None
        assert error.startswith("IX_002_001")
        assert "bad parse" in error
|
||||
|
||||
|
||||
class TestTimings:
    """``metadata.timings`` gets one entry per step the runner visited.

    Covers the normal path, a step skipped by ``validate``, and a step that
    aborts the pipeline from either hook.
    """

    async def test_timings_populated_for_every_executed_step(self) -> None:
        pipeline = Pipeline(steps=[StubStep("alpha"), StubStep("beta")])
        response = await pipeline.start(_make_request())
        # One timing per executed step.
        names = [t["step"] for t in response.metadata.timings]
        assert names == ["alpha", "beta"]
        for entry in response.metadata.timings:
            assert isinstance(entry["elapsed_seconds"], float)
            assert entry["elapsed_seconds"] >= 0.0

    async def test_timing_recorded_even_when_validate_false(self) -> None:
        pipeline = Pipeline(
            steps=[
                StubStep("skipper", validate_returns=False),
                StubStep("runner"),
            ]
        )
        response = await pipeline.start(_make_request())
        names = [t["step"] for t in response.metadata.timings]
        assert names == ["skipper", "runner"]

    async def test_timing_recorded_when_validate_raises(self) -> None:
        err = IXException(IXErrorCode.IX_001_001, detail="missing")
        pipeline = Pipeline(
            steps=[
                StubStep("boom", validate_raises=err),
                StubStep("unreached"),
            ]
        )
        response = await pipeline.start(_make_request())
        names = [t["step"] for t in response.metadata.timings]
        assert names == ["boom"]

    async def test_timing_recorded_when_process_raises(self) -> None:
        # Counterpart of the validate case: an abort from ``process`` must
        # also leave a timing entry for the failing step and none after it.
        err = IXException(IXErrorCode.IX_002_001, detail="bad parse")
        pipeline = Pipeline(
            steps=[
                StubStep("boom", process_raises=err),
                StubStep("unreached"),
            ]
        )
        response = await pipeline.start(_make_request())
        names = [t["step"] for t in response.metadata.timings]
        assert names == ["boom"]
        assert response.error is not None
|
||||
|
||||
|
||||
class TestNonIXExceptionStillSurfaces:
    """Exceptions other than ``IXException`` are not swallowed by the runner."""

    async def test_generic_exception_in_process_aborts(self) -> None:
        calls: list[str] = []
        exploding = StubStep("a", calls=calls, process_raises=RuntimeError("kaboom"))
        never_reached = StubStep("b", calls=calls)
        pipeline = Pipeline(steps=[exploding, never_reached])

        with pytest.raises(RuntimeError):
            await pipeline.start(_make_request())

        # The error escaped from a.process, so step "b" was never started.
        assert calls == ["a.validate", "a.process"]
|
||||
|
||||
|
||||
class TestStepMutation:
    """Changes one step makes to the response are visible to later steps."""

    async def test_response_ix_shared_across_steps(self) -> None:
        def write_use_case(resp: ResponseIX) -> None:
            resp.use_case = "set-by-a"

        def check_then_write_name(resp: ResponseIX) -> None:
            # Step b must observe the value step a stored before adding its own.
            assert resp.use_case == "set-by-a"
            resp.use_case_name = "set-by-b"

        first = StubStep("a", mutate=write_use_case)
        second = StubStep("b", mutate=check_then_write_name)

        response = await Pipeline(steps=[first, second]).start(_make_request())

        assert response.use_case == "set-by-a"
        assert response.use_case_name == "set-by-b"
|
||||
Loading…
Reference in a new issue