infoxtractor/src/ix/pipeline/pipeline.py
Dirk Riemann dcd1bc764a
All checks were successful
tests / test (push) Successful in 56s
tests / test (pull_request) Successful in 1m7s
feat(pipeline): Step ABC + Pipeline runner + Timer (spec §3, §4)
Adds the transport-agnostic pipeline orchestrator. Each step implements
async validate + process; the runner wraps both in a Timer, writes
per-step entries to response.metadata.timings, and aborts on the first
IXException by writing response.error.

- Step exposes a step_name property (defaults to class name) so tests and
  logs label steps consistently.
- Timer is a plain context manager that appends one {step, elapsed_seconds}
  entry on exit regardless of whether the body raised, so the timeline
  stays reconstructable for failed steps.
- 9 unit tests cover ordering, skip-on-false, IXException in validate vs.
  process, timings populated for every executed step, and shared-response
  mutation across steps. Non-IX exceptions propagate.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 11:06:46 +02:00

108 lines
3.6 KiB
Python

"""Pipeline runner + Timer context manager (spec §4).
The runner threads a fresh :class:`~ix.contracts.ResponseIX` through every
registered :class:`Step`, records per-step elapsed seconds in
``response.metadata.timings`` (always — even for validated-out-or-raised
steps, so the timeline is reconstructable from logs), and aborts on the
first :class:`~ix.errors.IXException` by writing ``response.error`` and
stopping the loop. Non-IX exceptions propagate — the job-store layer decides
whether to swallow or surface them.
"""
from __future__ import annotations
import time
from types import TracebackType
from typing import Any
from ix.contracts import Metadata, RequestIX, ResponseIX
from ix.errors import IXException
from ix.pipeline.step import Step
class Timer:
    """Time a ``with`` body and record the result in a caller-owned list.

    Example::

        timings: list[dict[str, Any]] = []
        with Timer("setup", timings):
            ...  # work
        # timings == [{"step": "setup", "elapsed_seconds": 0.003}]

    Exactly one ``{"step": ..., "elapsed_seconds": ...}`` dict is appended
    to ``sink`` each time the block is left, whether it finished normally or
    raised. The manager never suppresses the exception, so a failing body
    still shows up in the timeline and the error still reaches the caller.
    """

    def __init__(self, step_name: str, sink: list[dict[str, Any]]) -> None:
        """Remember the label to report and the list to report into."""
        self._step_name = step_name
        self._sink = sink
        # Placeholder until ``__enter__`` stamps the real start time.
        self._start: float = 0.0

    def __enter__(self) -> Timer:
        """Stamp the start time and hand back the timer itself."""
        self._start = time.perf_counter()
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        tb: TracebackType | None,
    ) -> None:
        """Append the timing entry; returning ``None`` lets errors propagate."""
        stopped_at = time.perf_counter()
        entry = {
            "step": self._step_name,
            "elapsed_seconds": stopped_at - self._start,
        }
        self._sink.append(entry)
class Pipeline:
    """Runs a fixed ordered list of :class:`Step` instances against one request.

    The pipeline keeps no per-request state of its own — it only stores the
    step list — so constructing it once at app-startup and calling
    :meth:`start` repeatedly is the intended usage pattern. Per-request state
    lives on the :class:`~ix.contracts.ResponseIX` that :meth:`start` creates
    and threads through every step.
    """

    def __init__(self, steps: list[Step]) -> None:
        """Store a private copy of ``steps`` in execution order.

        Copying means later mutation of the caller's list cannot change
        which steps this pipeline runs.
        """
        self._steps = list(steps)

    async def start(self, request_ix: RequestIX) -> ResponseIX:
        """Execute every step; return the populated :class:`ResponseIX`.

        Flow:

        1. Instantiate a fresh ``ResponseIX`` seeded with the request's
           correlation ids and an empty ``Metadata``.
        2. For each step: time the call, run ``validate`` then (iff it
           returned a truthy value) ``process``, and adopt whatever
           ``process`` returns as the current response.
        3. Append the step's timing entry to the *current* response's
           ``metadata.timings`` — also when the step was skipped or raised.
        4. If either hook raises :class:`~ix.errors.IXException`, write
           ``str(exc)`` to ``response.error`` and stop. Non-IX exceptions
           propagate to the caller (after the timing entry is recorded).

        Args:
            request_ix: The request passed unchanged to every step.

        Returns:
            The response as left by the last executed step, or the response
            carrying ``error`` if a step raised ``IXException``.
        """
        response_ix = ResponseIX(
            use_case=request_ix.use_case,
            ix_client_id=request_ix.ix_client_id,
            request_id=request_ix.request_id,
            ix_id=request_ix.ix_id,
            metadata=Metadata(),
        )
        for step in self._steps:
            # Time into a scratch list rather than straight into
            # ``response_ix.metadata.timings``: ``process`` may hand back a
            # different response object, and binding the sink up front would
            # record this step's timing on the object that was just replaced.
            step_timings: list[dict[str, Any]] = []
            try:
                with Timer(step.step_name, step_timings):
                    if await step.validate(request_ix, response_ix):
                        response_ix = await step.process(request_ix, response_ix)
            except IXException as exc:
                # ``response_ix`` is still the pre-step object here: the
                # rebinding above only happens once ``process`` has returned.
                response_ix.error = str(exc)
                return response_ix
            finally:
                # Runs on success, skip, IXException and foreign exceptions
                # alike, so every executed step leaves exactly one entry on
                # whichever response is current.
                response_ix.metadata.timings.extend(step_timings)
        return response_ix
# Names exported by ``from ... import *``: the runner and its timing helper.
__all__ = ["Pipeline", "Timer"]