feat(pipeline): ResponseHandlerStep — shape-up final payload (spec §8)
All checks were successful
tests / test (push) Successful in 1m0s
tests / test (pull_request) Successful in 1m2s

Final pipeline step. Three mechanical transforms:

1. include_ocr_text -> concatenate non-tag line texts, pages joined
   with \n\n, write to ocr_result.result.text.
2. include_geometries=False (default) -> strip ocr_result.result.pages
   + ocr_result.meta_data. Geometries are heavy; callers opt in.
3. Delete response.context so the internal accumulator never leaks to
   the caller (belt-and-braces; Field(exclude=True) already does this).

validate() always returns True per spec.

7 unit tests in tests/unit/test_response_handler_step.py cover all
three branches + context-not-in-model_dump check.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dirk Riemann 2026-04-18 11:21:36 +02:00
parent 83c1996702
commit 565d8d0676
2 changed files with 203 additions and 0 deletions

View file

@ -0,0 +1,67 @@
"""ResponseHandlerStep — final shape-up before the caller sees the payload (spec §8).
Does three purely mechanical things:
1. When ``include_ocr_text`` is set, concatenate every non-tag line text
into ``ocr_result.result.text`` (pages joined with blank line).
2. When ``include_geometries`` is **not** set (the default), strip
``ocr_result.result.pages`` and ``ocr_result.meta_data`` — geometries
are heavyweight; callers opt in.
3. Clear ``response_ix.context`` (belt-and-braces — ``Field(exclude=True)``
already keeps it out of ``model_dump`` output).
:meth:`validate` always returns True per spec.
"""
from __future__ import annotations
import re
from ix.contracts import RequestIX, ResponseIX
from ix.pipeline.step import Step
# Matches the start of an opening or closing ``<page ...>`` marker line,
# tolerating whitespace around ``<`` and ``/`` and any letter case.
_PAGE_TAG_RE = re.compile(r"^\s*<\s*/?\s*page\b", re.IGNORECASE)


def _is_page_tag(text: str | None) -> bool:
    """Tell whether an OCR line's text is a ``<page>`` / ``</page>`` marker.

    ``None`` and the empty string are never markers.
    """
    return bool(text) and _PAGE_TAG_RE.match(text) is not None
class ResponseHandlerStep(Step):
    """Final shape-up step applied to the response before it is returned.

    :meth:`process` performs three transforms, in this order:

    1. If ``options.ocr.include_ocr_text`` is set, flatten the OCR lines
       into ``ocr_result.result.text``.
    2. Unless ``options.ocr.include_geometries`` is set, empty
       ``ocr_result.result.pages`` and ``ocr_result.meta_data``.
    3. Set ``response_ix.context`` to ``None``.
    """

    async def validate(self, request_ix: RequestIX, response_ix: ResponseIX) -> bool:
        """Return ``True`` unconditionally; neither argument is inspected."""
        return True

    async def process(
        self, request_ix: RequestIX, response_ix: ResponseIX
    ) -> ResponseIX:
        """Shape ``response_ix`` according to the request's OCR options.

        Args:
            request_ix: The request; only ``options.ocr`` is read.
            response_ix: The response to shape. It is mutated in place.

        Returns:
            The same ``response_ix`` object that was passed in.
        """
        ocr_opts = request_ix.options.ocr

        # 1. Attach flat OCR text if requested. This has to happen before
        #    step 2, which empties the very pages this step reads from.
        if ocr_opts.include_ocr_text:
            page_texts = [
                "\n".join(
                    line.text or ""
                    for line in page.lines
                    if not _is_page_tag(line.text)
                )
                for page in response_ix.ocr_result.result.pages
            ]
            flat_text = "\n\n".join(page_texts)
            # A plain ``flat_text or None`` is not enough: two or more
            # text-free pages join to "\n\n", which is truthy. Report None
            # whenever the result holds nothing but whitespace; otherwise
            # the text is stored unmodified.
            response_ix.ocr_result.result.text = (
                flat_text if flat_text.strip() else None
            )

        # 2. Strip geometries unless explicitly retained.
        if not ocr_opts.include_geometries:
            response_ix.ocr_result.result.pages = []
            response_ix.ocr_result.meta_data = {}

        # 3. Drop the internal context — already Field(exclude=True),
        #    this is defense in depth.
        response_ix.context = None
        return response_ix
# Public API of this module: only the step class is exported.
__all__ = ["ResponseHandlerStep"]

View file

@ -0,0 +1,136 @@
"""Tests for :class:`ix.pipeline.response_handler_step.ResponseHandlerStep` (spec §8)."""
from __future__ import annotations
from ix.contracts import (
Context,
Line,
OCRDetails,
OCROptions,
OCRResult,
Options,
Page,
RequestIX,
ResponseIX,
)
from ix.contracts.response import _InternalContext
from ix.pipeline.response_handler_step import ResponseHandlerStep
def _make_request(
    *,
    include_geometries: bool = False,
    include_ocr_text: bool = False,
) -> RequestIX:
    """Build a request with an empty context and the given OCR flags."""
    ocr_options = OCROptions(
        include_geometries=include_geometries,
        include_ocr_text=include_ocr_text,
    )
    empty_context = Context(files=[], texts=[])
    return RequestIX(
        use_case="bank_statement_header",
        ix_client_id="test",
        request_id="r-1",
        context=empty_context,
        options=Options(ocr=ocr_options),
    )
def _populated_response() -> ResponseIX:
    """Build a two-page OCR response with metadata and an internal context.

    Page 1 wraps two text lines in ``<page>`` / ``</page>`` marker lines;
    page 2 holds a single text line.
    """
    first_page = Page(
        page_no=1,
        width=100.0,
        height=200.0,
        lines=[
            Line(text='<page file="0" number="1">', bounding_box=[]),
            Line(text="hello", bounding_box=[0, 0, 1, 0, 1, 1, 0, 1]),
            Line(text="world", bounding_box=[0, 2, 1, 2, 1, 3, 0, 3]),
            Line(text="</page>", bounding_box=[]),
        ],
    )
    second_page = Page(
        page_no=2,
        width=100.0,
        height=200.0,
        lines=[Line(text="p2 line", bounding_box=[0, 0, 1, 0, 1, 1, 0, 1])],
    )
    details = OCRDetails(text=None, pages=[first_page, second_page])
    response = ResponseIX(
        ocr_result=OCRResult(result=details, meta_data={"adapter": "fake"})
    )
    # The context is attached after construction, as in the original fixture.
    response.context = _InternalContext()
    return response
class TestValidateAlwaysTrue:
    """``validate`` accepts every request/response pair."""

    async def test_validate_always_true(self) -> None:
        outcome = await ResponseHandlerStep().validate(
            _make_request(), _populated_response()
        )
        assert outcome is True
class TestAttachOcrText:
    """Behaviour of the ``include_ocr_text`` option."""

    async def test_include_ocr_text_concatenates_lines(self) -> None:
        step = ResponseHandlerStep()
        req = _make_request(include_ocr_text=True, include_geometries=True)
        resp = await step.process(req, _populated_response())
        # Exact match pins the whole contract at once: page-tag lines are
        # dropped, lines within a page are joined with "\n", and pages are
        # joined with "\n\n". Substring checks would not catch a wrong
        # page separator.
        assert resp.ocr_result.result.text == "hello\nworld\n\np2 line"

    async def test_include_ocr_text_false_leaves_text_alone(self) -> None:
        step = ResponseHandlerStep()
        req = _make_request(include_ocr_text=False, include_geometries=True)
        resp = _populated_response()
        # Use a sentinel rather than None so that "left untouched" can be
        # told apart from "recomputed" and from "reset to None".
        resp.ocr_result.result.text = "pre-existing text"
        resp = await step.process(req, resp)
        assert resp.ocr_result.result.text == "pre-existing text"
class TestStripGeometries:
    """Behaviour of the ``include_geometries`` option."""

    async def test_strips_pages_and_meta_when_off(self) -> None:
        shaped = await ResponseHandlerStep().process(
            _make_request(include_geometries=False), _populated_response()
        )
        ocr = shaped.ocr_result
        assert ocr.result.pages == []
        assert ocr.meta_data == {}

    async def test_keeps_pages_when_on(self) -> None:
        handler = ResponseHandlerStep()
        request = _make_request(include_geometries=True)
        response = _populated_response()
        # Snapshot the page numbers before the step runs.
        expected_page_nos = [
            page.page_no for page in response.ocr_result.result.pages
        ]
        response = await handler.process(request, response)
        actual_page_nos = [
            page.page_no for page in response.ocr_result.result.pages
        ]
        assert actual_page_nos == expected_page_nos
        assert response.ocr_result.meta_data == {"adapter": "fake"}
class TestContextDeletion:
    """The internal context must not survive the final step."""

    async def test_context_removed(self) -> None:
        shaped = await ResponseHandlerStep().process(
            _make_request(), _populated_response()
        )
        assert shaped.context is None

    async def test_context_not_in_model_dump(self) -> None:
        shaped = await ResponseHandlerStep().process(
            _make_request(), _populated_response()
        )
        dumped = shaped.model_dump()
        assert "context" not in dumped