feat(pipeline): ResponseHandlerStep (spec §8) #16
2 changed files with 203 additions and 0 deletions
67
src/ix/pipeline/response_handler_step.py
Normal file
67
src/ix/pipeline/response_handler_step.py
Normal file
|
|
@ -0,0 +1,67 @@
|
|||
"""ResponseHandlerStep — final shape-up before the caller sees the payload (spec §8).
|
||||
|
||||
Does three purely mechanical things:

1. When ``include_ocr_text`` is set, concatenate the text of every line that
   is not a ``<page>`` tag into ``ocr_result.result.text`` (lines joined with
   a newline, pages separated by a blank line).
2. When ``include_geometries`` is **not** set (the default), strip
   ``ocr_result.result.pages`` and ``ocr_result.meta_data`` — geometries
   are heavyweight; callers opt in.
3. Clear ``response_ix.context`` (belt-and-braces — ``Field(exclude=True)``
   already keeps it out of ``model_dump`` output).

:meth:`validate` always returns True per spec.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
|
||||
from ix.contracts import RequestIX, ResponseIX
|
||||
from ix.pipeline.step import Step
|
||||
|
||||
_PAGE_TAG_RE = re.compile(r"^\s*<\s*/?\s*page\b", re.IGNORECASE)
|
||||
|
||||
|
||||
def _is_page_tag(text: str | None) -> bool:
|
||||
if not text:
|
||||
return False
|
||||
return bool(_PAGE_TAG_RE.match(text))
|
||||
|
||||
|
||||
class ResponseHandlerStep(Step):
    """Final shape-up step: trims the response to what the caller asked for."""

    async def validate(self, request_ix: RequestIX, response_ix: ResponseIX) -> bool:
        """Return ``True`` unconditionally; neither argument is inspected."""
        return True

    async def process(
        self, request_ix: RequestIX, response_ix: ResponseIX
    ) -> ResponseIX:
        """Apply the OCR output options to *response_ix* and return it.

        The response is modified in place:

        * ``options.ocr.include_ocr_text`` set: ``ocr_result.result.text``
          becomes the non-page-tag line texts, joined with ``"\\n"`` within a
          page and ``"\\n\\n"`` between pages (``None`` if that is empty).
        * ``options.ocr.include_geometries`` not set: ``ocr_result.result.pages``
          is replaced by ``[]`` and ``ocr_result.meta_data`` by ``{}``.
        * ``context`` is always set to ``None``.

        Args:
            request_ix: Request whose ``options.ocr`` flags drive the step.
            response_ix: Response to reshape.

        Returns:
            The same ``response_ix`` object that was passed in.
        """
        ocr_opts = request_ix.options.ocr

        # 1. Attach flat OCR text if requested.  This has to run before the
        #    pages are stripped below, because the text is built from them.
        if ocr_opts.include_ocr_text:
            page_texts: list[str] = []
            for page in response_ix.ocr_result.result.pages:
                line_texts = [
                    line.text or ""  # a line without text contributes an empty row
                    for line in page.lines
                    if not _is_page_tag(line.text)
                ]
                page_texts.append("\n".join(line_texts))
            # An empty join result is stored as None rather than "".
            response_ix.ocr_result.result.text = "\n\n".join(page_texts) or None

        # 2. Strip geometries unless explicitly retained.
        if not ocr_opts.include_geometries:
            response_ix.ocr_result.result.pages = []
            response_ix.ocr_result.meta_data = {}

        # 3. Drop the internal context — already Field(exclude=True),
        #    this is defense in depth.
        response_ix.context = None

        return response_ix
|
||||
|
||||
|
||||
# Public API of this module: only the step class is exported.
__all__ = ["ResponseHandlerStep"]
|
||||
136
tests/unit/test_response_handler_step.py
Normal file
136
tests/unit/test_response_handler_step.py
Normal file
|
|
@ -0,0 +1,136 @@
|
|||
"""Tests for :class:`ix.pipeline.response_handler_step.ResponseHandlerStep` (spec §8)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from ix.contracts import (
|
||||
Context,
|
||||
Line,
|
||||
OCRDetails,
|
||||
OCROptions,
|
||||
OCRResult,
|
||||
Options,
|
||||
Page,
|
||||
RequestIX,
|
||||
ResponseIX,
|
||||
)
|
||||
from ix.contracts.response import _InternalContext
|
||||
from ix.pipeline.response_handler_step import ResponseHandlerStep
|
||||
|
||||
|
||||
def _make_request(
    *,
    include_geometries: bool = False,
    include_ocr_text: bool = False,
) -> RequestIX:
    """Build a minimal request whose OCR options carry the two given flags.

    The request has an empty context (no files, no texts); both flags
    default to ``False``.
    """
    ocr_options = OCROptions(
        include_geometries=include_geometries,
        include_ocr_text=include_ocr_text,
    )
    return RequestIX(
        use_case="bank_statement_header",
        ix_client_id="test",
        request_id="r-1",
        context=Context(files=[], texts=[]),
        options=Options(ocr=ocr_options),
    )
|
||||
|
||||
|
||||
def _populated_response() -> ResponseIX:
    """Build a two-page response with page-tag lines, metadata and a context.

    Page 1 wraps two text lines ("hello", "world") in an opening and a
    closing page-tag line; page 2 holds a single text line ("p2 line").
    ``result.text`` starts out as ``None``.
    """
    first_page = Page(
        page_no=1,
        width=100.0,
        height=200.0,
        lines=[
            Line(text='<page file="0" number="1">', bounding_box=[]),
            Line(text="hello", bounding_box=[0, 0, 1, 0, 1, 1, 0, 1]),
            Line(text="world", bounding_box=[0, 2, 1, 2, 1, 3, 0, 3]),
            Line(text="</page>", bounding_box=[]),
        ],
    )
    second_page = Page(
        page_no=2,
        width=100.0,
        height=200.0,
        lines=[
            Line(text="p2 line", bounding_box=[0, 0, 1, 0, 1, 1, 0, 1]),
        ],
    )
    resp = ResponseIX(
        ocr_result=OCRResult(
            result=OCRDetails(
                text=None,
                pages=[first_page, second_page],
            ),
            meta_data={"adapter": "fake"},
        )
    )
    # Give the response a context object so the tests can observe its removal.
    resp.context = _InternalContext()
    return resp
|
||||
|
||||
|
||||
class TestValidateAlwaysTrue:
    """``validate`` reports True for a populated response."""

    async def test_validate_always_true(self) -> None:
        step = ResponseHandlerStep()
        req = _make_request()
        outcome = await step.validate(req, _populated_response())
        assert outcome is True
|
||||
|
||||
|
||||
class TestAttachOcrText:
    """Flat OCR text is attached only when ``include_ocr_text`` is set."""

    async def test_include_ocr_text_concatenates_lines(self) -> None:
        step = ResponseHandlerStep()
        req = _make_request(include_ocr_text=True, include_geometries=True)
        resp = _populated_response()
        resp = await step.process(req, resp)
        # Page tag lines excluded; real lines joined within page with \n,
        # pages with \n\n.  Comparing the whole string pins both separators
        # and the page order, which substring checks would not.
        text = resp.ocr_result.result.text
        assert text == "hello\nworld\n\np2 line"

    async def test_include_ocr_text_false_leaves_text_alone(self) -> None:
        step = ResponseHandlerStep()
        req = _make_request(include_ocr_text=False, include_geometries=True)
        # Check an unset text and a pre-existing one: with only ``None`` the
        # test could not tell "left alone" apart from "cleared".
        for initial in (None, "pre-existing"):
            resp = _populated_response()
            resp.ocr_result.result.text = initial
            resp = await step.process(req, resp)
            assert resp.ocr_result.result.text == initial
|
||||
|
||||
|
||||
class TestStripGeometries:
    """Pages and metadata are kept only when ``include_geometries`` is set."""

    async def test_strips_pages_and_meta_when_off(self) -> None:
        step = ResponseHandlerStep()
        req = _make_request(include_geometries=False)
        resp = await step.process(req, _populated_response())
        assert resp.ocr_result.result.pages == []
        assert resp.ocr_result.meta_data == {}

    async def test_keeps_pages_when_on(self) -> None:
        step = ResponseHandlerStep()
        req = _make_request(include_geometries=True)
        resp = _populated_response()
        # Snapshot the page numbers first; process() works on the same object.
        pages_before = [p.page_no for p in resp.ocr_result.result.pages]
        resp = await step.process(req, resp)
        pages_after = [p.page_no for p in resp.ocr_result.result.pages]
        assert pages_after == pages_before
        assert resp.ocr_result.meta_data == {"adapter": "fake"}
|
||||
|
||||
|
||||
class TestContextDeletion:
    """The internal context is cleared and never appears in the dump."""

    async def test_context_removed(self) -> None:
        step = ResponseHandlerStep()
        req = _make_request()
        resp = await step.process(req, _populated_response())
        assert resp.context is None

    async def test_context_not_in_model_dump(self) -> None:
        step = ResponseHandlerStep()
        req = _make_request()
        resp = await step.process(req, _populated_response())
        dump = resp.model_dump()
        assert "context" not in dump
|
||||
Loading…
Reference in a new issue