Compare commits

...

2 commits

Author SHA1 Message Date
230068e484 feat(contracts): ResponseIX + Provenance + Job (spec §3, §9.3) (#4)
Some checks are pending
tests / test (push) Waiting to run
Lands the outgoing-response data contracts.
2026-04-18 08:50:37 +00:00
02db3b05cc feat(contracts): ResponseIX + Provenance + Job envelope (spec §3, §9.3)
All checks were successful
tests / test (push) Successful in 1m2s
tests / test (pull_request) Successful in 1m0s
Completes the data-contract layer. Highlights:

- `ResponseIX.context` is an internal mutable accumulator used by pipeline
  steps (pages, files, texts, use_case classes, segment index). It MUST NOT
  leak into the serialised response, so we mark the field with
  `Field(exclude=True)` and carry the shape in a small `_InternalContext`
  sub-model with `extra="allow"` so steps can stash arbitrary state without
  schema churn. Tested: `model_dump()` and `model_dump_json()` both drop it.

- `FieldProvenance` gains `provenance_verified: bool | None` and
  `text_agreement: bool | None` — the two MVP reliability flags written by
  the new ReliabilityStep. Both default None so rows predating the
  ReliabilityStep (empty LLM output, cloud-import replay) parse cleanly.

- `quality_metrics` stays a free-form `dict[str, Any]` — the MVP adds
  `verified_fields` and `text_agreement_fields` counters without carving
  them into the schema, which keeps future metric additions free.

- `Job.status` and `Job.callback_status` are `Literal[...]` so Pydantic
  rejects unknown states at the edge. Invariant
  (`status='done' iff response.error is None`) stays worker-enforced —
  callers sometimes hydrate in-flight rows and we do not want validation
  to reject them.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 10:50:22 +02:00
5 changed files with 513 additions and 0 deletions

View file

@ -6,6 +6,14 @@ Re-exports the public symbols from sibling modules so call sites can write
from __future__ import annotations from __future__ import annotations
from ix.contracts.job import CallbackStatus, Job, JobStatus
from ix.contracts.provenance import (
BoundingBox,
ExtractionSource,
FieldProvenance,
ProvenanceData,
SegmentCitation,
)
from ix.contracts.request import ( from ix.contracts.request import (
Context, Context,
FileRef, FileRef,
@ -15,13 +23,37 @@ from ix.contracts.request import (
ProvenanceOptions, ProvenanceOptions,
RequestIX, RequestIX,
) )
from ix.contracts.response import (
IXResult,
Line,
Metadata,
OCRDetails,
OCRResult,
Page,
ResponseIX,
)
__all__ = [ __all__ = [
"BoundingBox",
"CallbackStatus",
"Context", "Context",
"ExtractionSource",
"FieldProvenance",
"FileRef", "FileRef",
"GenAIOptions", "GenAIOptions",
"IXResult",
"Job",
"JobStatus",
"Line",
"Metadata",
"OCRDetails",
"OCROptions", "OCROptions",
"OCRResult",
"Options", "Options",
"Page",
"ProvenanceData",
"ProvenanceOptions", "ProvenanceOptions",
"RequestIX", "RequestIX",
"ResponseIX",
"SegmentCitation",
] ]

46
src/ix/contracts/job.py Normal file
View file

@ -0,0 +1,46 @@
"""Job envelope stored in ``ix_jobs`` and returned by REST.
Mirrors spec §3 ("Job envelope") and §4 ("Job store"). The lifecycle
enum is a ``Literal`` so Pydantic rejects unknown values at parse time.
``callback_status`` is nullable until the worker attempts delivery (or
skips delivery when there's no ``callback_url``).
"""
from __future__ import annotations
from datetime import datetime
from typing import Literal
from uuid import UUID
from pydantic import BaseModel, ConfigDict
from ix.contracts.request import RequestIX
from ix.contracts.response import ResponseIX
# Lifecycle states a job may hold; Pydantic rejects any other value at parse time.
JobStatus = Literal["pending", "running", "done", "error"]
# Callback delivery states; ``Job.callback_status`` stays ``None`` until the worker
# attempts delivery (or skips it when there is no ``callback_url``).
CallbackStatus = Literal["pending", "delivered", "failed"]
class Job(BaseModel):
    """Row of ``ix_jobs`` plus its request/response bodies.

    The invariant ``status='done' iff response.error is None`` is enforced by
    the worker, not here — callers occasionally hydrate a stale or in-flight
    row and we don't want the Pydantic validator to reject it.
    """

    # Unknown keys are a validation error rather than being silently kept.
    model_config = ConfigDict(extra="forbid")

    job_id: UUID
    ix_id: str
    client_id: str
    request_id: str
    # Constrained to the ``JobStatus`` literals; anything else fails validation.
    status: JobStatus
    request: RequestIX
    # Optional; defaults to None when no response body is supplied.
    response: ResponseIX | None = None
    callback_url: str | None = None
    # None until delivery has been attempted or skipped.
    callback_status: CallbackStatus | None = None
    attempts: int = 0
    created_at: datetime
    started_at: datetime | None = None
    finished_at: datetime | None = None

View file

@ -0,0 +1,89 @@
"""Provenance contracts — per-field segment citations + reliability flags.
These models represent *outputs* attached to :class:`~ix.contracts.response.ResponseIX`.
The MVP adds two new fields to :class:`FieldProvenance` beyond the reference
spec: ``provenance_verified`` and ``text_agreement``. Both are written by the
new :class:`ReliabilityStep` and are the primary reliability signals that
callers (mammon first) use to decide trust.
"""
from __future__ import annotations
from typing import Any
from pydantic import BaseModel, ConfigDict, Field
class BoundingBox(BaseModel):
    """Eight-coordinate polygon, normalised to 0-1 against page dimensions.

    ``coordinates`` order: ``[x1, y1, x2, y2, x3, y3, x4, y4]`` — top-left,
    top-right, bottom-right, bottom-left (same as Surya's polygon output).
    """

    model_config = ConfigDict(extra="forbid")

    # NOTE(review): typed as a plain ``list[float]``; neither the length of
    # eight nor the 0-1 range described above is validated by this model.
    coordinates: list[float]
class SegmentCitation(BaseModel):
    """LLM-emitted citation: one field → the segments it came from.

    This is part of the dynamic ``ProvenanceWrappedResponse`` the GenAIStep
    asks the model to return when provenance is on (see spec §9.2).
    """

    model_config = ConfigDict(extra="forbid")

    # Required; the two ID lists below default to empty when omitted.
    field_path: str
    value_segment_ids: list[str] = Field(default_factory=list)
    context_segment_ids: list[str] = Field(default_factory=list)
class ExtractionSource(BaseModel):
    """One resolved source for a field — maps a segment ID to its on-page anchor."""

    model_config = ConfigDict(extra="forbid")

    # ``page_number`` and ``text_snippet`` are required; everything else is optional.
    page_number: int
    file_index: int | None = None
    bounding_box: BoundingBox | None = None
    text_snippet: str
    # Defaults to 1.0 when the producer supplies no score.
    relevance_score: float = 1.0
    segment_id: str | None = None
class FieldProvenance(BaseModel):
    """Per-field provenance + MVP reliability flags.

    ``provenance_verified``: True when at least one cited segment's
    ``text_snippet`` agrees with the extracted value after normalisation;
    False when every cite disagrees; None when the field type makes the check
    meaningless (``Literal``, ``None``/unset).

    ``text_agreement``: True when the value also appears in
    ``RequestIX.context.texts`` after normalisation; False when the texts
    disagree; None when no texts were provided, or when the short-value skip
    rule applies, or when the type is ``Literal``/``None``.
    """

    model_config = ConfigDict(extra="forbid")

    field_name: str
    field_path: str
    # Untyped on purpose: the extracted value may be of any type.
    value: Any = None
    sources: list[ExtractionSource] = Field(default_factory=list)
    confidence: float | None = None
    # Both reliability flags are tri-state and default to None (not evaluated).
    provenance_verified: bool | None = None
    text_agreement: bool | None = None
class ProvenanceData(BaseModel):
    """Aggregate provenance payload attached to :attr:`ResponseIX.provenance`."""

    model_config = ConfigDict(extra="forbid")

    # Per-field provenance entries, keyed by string.
    fields: dict[str, FieldProvenance] = Field(default_factory=dict)
    # Free-form metrics dict; keys are not constrained by the schema.
    quality_metrics: dict[str, Any] = Field(default_factory=dict)
    segment_count: int | None = None
    granularity: str | None = None

View file

@ -0,0 +1,124 @@
"""Outgoing response contracts — :class:`ResponseIX` + nested result structures.
Mirrors MVP spec §3 / §9.3. The only subtle piece is ``ResponseIX.context``:
it is an *internal* mutable accumulator used by pipeline steps (pages, files,
texts, use-case classes, segment index) and MUST NOT be serialised to the
caller. We enforce this with ``Field(exclude=True)`` on an opaque
:class:`_InternalContext` sub-model.
Strictness note: unlike :class:`RequestIX`, ResponseIX tolerates extra keys
via ``extra="allow"`` on the internal-context carrier so worker-side code can
stash arbitrary step-level state without growing the schema. Public
response fields still use ``extra="forbid"``.
"""
from __future__ import annotations
from typing import Any
from pydantic import BaseModel, ConfigDict, Field
from ix.contracts.provenance import ProvenanceData
class Line(BaseModel):
    """One OCR line with its raw 8-coord polygon."""

    model_config = ConfigDict(extra="forbid")

    text: str | None = None
    # Stored as a flat list of floats (not a ``BoundingBox``); empty by default.
    bounding_box: list[float] = Field(default_factory=list)
class Page(BaseModel):
    """One OCR page. ``width``/``height`` are in points (pixels for raster images)."""

    model_config = ConfigDict(extra="forbid")

    # ``page_no``, ``width`` and ``height`` are required; the rest have defaults.
    page_no: int
    width: float
    height: float
    angle: float = 0.0
    unit: str | None = None
    lines: list[Line] = Field(default_factory=list)
class OCRDetails(BaseModel):
    """OCR structural output: optional text plus a list of pages."""

    model_config = ConfigDict(extra="forbid")

    text: str | None = None
    pages: list[Page] = Field(default_factory=list)
class OCRResult(BaseModel):
    """Wraps :class:`OCRDetails` with arbitrary adapter metadata."""

    model_config = ConfigDict(extra="forbid")

    # Defaults to an empty ``OCRDetails`` so ``OCRResult()`` is always valid.
    result: OCRDetails = Field(default_factory=OCRDetails)
    meta_data: dict[str, Any] = Field(default_factory=dict)
class IXResult(BaseModel):
    """LLM extraction payload + usage/model metadata."""

    model_config = ConfigDict(extra="forbid")

    # All three are free-form dicts that default to empty.
    result: dict[str, Any] = Field(default_factory=dict)
    result_confidence: dict[str, Any] = Field(default_factory=dict)
    meta_data: dict[str, Any] = Field(default_factory=dict)
class Metadata(BaseModel):
    """Pipeline-level telemetry — populated by the orchestrator."""

    model_config = ConfigDict(extra="forbid")

    # List of free-form timing records; the record shape is not fixed by the schema.
    timings: list[dict[str, Any]] = Field(default_factory=list)
    processed_by: str | None = None
    use_case_truncated: bool | None = None
class _InternalContext(BaseModel):
    """Internal mutable accumulator — NEVER serialised.

    Holds per-request state the pipeline steps pass to each other: downloaded
    file handles, flat page list, extracted text scratchpad, the loaded
    use-case ``Request`` / response-schema classes, and the built segment
    index. Kept ``extra="allow"`` so adapters/steps can stash arbitrary state
    without churning this contract. Always excluded from ``model_dump``.
    """

    # ``extra="allow"`` accepts undeclared keys; ``arbitrary_types_allowed``
    # permits field values that are not Pydantic-native types.
    model_config = ConfigDict(extra="allow", arbitrary_types_allowed=True)

    pages: list[Any] = Field(default_factory=list)
    files: list[Any] = Field(default_factory=list)
    texts: list[str] = Field(default_factory=list)
    # Left as ``Any``: the concrete types are not declared in this module.
    use_case_request: Any = None
    use_case_response: Any = None
    segment_index: Any = None
class ResponseIX(BaseModel):
    """Top-level response shape returned through the job store.

    ``context`` is internal-only — ``Field(exclude=True)`` keeps it out of
    serialised JSON. Callers see the fields ``use_case`` through ``metadata``
    and nothing else.
    """

    model_config = ConfigDict(extra="forbid", arbitrary_types_allowed=True)

    use_case: str | None = None
    use_case_name: str | None = None
    ix_client_id: str | None = None
    request_id: str | None = None
    ix_id: str | None = None
    error: str | None = None
    warning: list[str] = Field(default_factory=list)
    # Nested results default to empty instances, so ``ResponseIX()`` is valid.
    ix_result: IXResult = Field(default_factory=IXResult)
    ocr_result: OCRResult = Field(default_factory=OCRResult)
    provenance: ProvenanceData | None = None
    metadata: Metadata = Field(default_factory=Metadata)
    # ``exclude=True`` omits this field from ``model_dump()`` / ``model_dump_json()``.
    context: _InternalContext | None = Field(default=None, exclude=True)

View file

@ -3,18 +3,33 @@
from __future__ import annotations from __future__ import annotations
import json import json
from datetime import UTC, datetime
from uuid import uuid4
import pytest import pytest
from pydantic import ValidationError from pydantic import ValidationError
from ix.contracts import ( from ix.contracts import (
BoundingBox,
Context, Context,
ExtractionSource,
FieldProvenance,
FileRef, FileRef,
GenAIOptions, GenAIOptions,
IXResult,
Job,
Line,
Metadata,
OCRDetails,
OCROptions, OCROptions,
OCRResult,
Options, Options,
Page,
ProvenanceData,
ProvenanceOptions, ProvenanceOptions,
RequestIX, RequestIX,
ResponseIX,
SegmentCitation,
) )
@ -166,3 +181,210 @@ class TestRequestIX:
def test_missing_required_fields(self) -> None: def test_missing_required_fields(self) -> None:
with pytest.raises(ValidationError): with pytest.raises(ValidationError):
RequestIX.model_validate({"use_case": "x"}) RequestIX.model_validate({"use_case": "x"})
class TestOCRResult:
    """Defaults and nested serialisation of the OCR result containers."""

    def test_minimal_defaults(self) -> None:
        # A bare OCRResult carries empty details and empty adapter metadata.
        empty = OCRResult()
        details = empty.result
        assert details.text is None
        assert details.pages == []
        assert empty.meta_data == {}

    def test_full_page_roundtrip(self) -> None:
        # Build the structure bottom-up: line -> page -> details -> result.
        only_line = Line(text="hello", bounding_box=[0, 0, 10, 0, 10, 20, 0, 20])
        first_page = Page(page_no=1, width=612.0, height=792.0, lines=[only_line])
        wrapped = OCRResult(result=OCRDetails(text="hello", pages=[first_page]))

        dumped_line = wrapped.model_dump()["result"]["pages"][0]["lines"][0]

        assert dumped_line["text"] == "hello"
        assert dumped_line["bounding_box"] == [0, 0, 10, 0, 10, 20, 0, 20]
class TestProvenance:
    """Provenance models: reliability flags, free-form metrics, citations."""

    def test_field_provenance_new_flags(self) -> None:
        # `provenance_verified` and `text_agreement` are the MVP additions on
        # top of the reference spec; each is a nullable bool.
        anchor = BoundingBox(coordinates=[0.1, 0.1, 0.9, 0.1, 0.9, 0.2, 0.1, 0.2])
        cited = ExtractionSource(
            page_number=1,
            file_index=0,
            bounding_box=anchor,
            text_snippet="UBS AG",
            segment_id="p1_l0",
        )
        provenance = FieldProvenance(
            field_name="bank_name",
            field_path="result.bank_name",
            value="UBS AG",
            sources=[cited],
            provenance_verified=True,
            text_agreement=None,
        )
        assert provenance.provenance_verified is True
        assert provenance.text_agreement is None

    def test_field_provenance_flags_default_to_none(self) -> None:
        bare = FieldProvenance(field_name="x", field_path="result.x")
        assert bare.provenance_verified is None
        assert bare.text_agreement is None

    def test_quality_metrics_accepts_all_keys(self) -> None:
        # quality_metrics is schema-free, so every MVP-listed key must survive
        # a dump/validate round trip unchanged.
        metrics = {
            "fields_with_provenance": 8,
            "total_fields": 10,
            "coverage_rate": 0.8,
            "invalid_references": 2,
            "verified_fields": 6,
            "text_agreement_fields": 5,
        }
        original = ProvenanceData(fields={}, quality_metrics=metrics)
        restored = ProvenanceData.model_validate(original.model_dump())

        expected = {
            "verified_fields": 6,
            "text_agreement_fields": 5,
            "coverage_rate": 0.8,
        }
        for key, want in expected.items():
            assert restored.quality_metrics[key] == want

    def test_segment_citation_basic(self) -> None:
        citation = SegmentCitation(
            field_path="result.invoice_number",
            value_segment_ids=["p1_l4"],
            context_segment_ids=["p1_l3"],
        )
        assert citation.value_segment_ids == ["p1_l4"]
class TestResponseIX:
    """ResponseIX defaults, internal-context exclusion, and public round trip."""

    def test_defaults(self) -> None:
        response = ResponseIX()
        # Optional scalars start unset ...
        assert response.error is None
        assert response.warning == []
        # ... while nested result containers are always materialised.
        assert isinstance(response.ix_result, IXResult)
        assert isinstance(response.ocr_result, OCRResult)
        assert isinstance(response.metadata, Metadata)
        assert response.provenance is None
        assert response.context is None

    def test_context_excluded_from_dump(self) -> None:
        # `context` is internal state and must never reach a serialised form.
        # The carrier model is private, hence the local import.
        from ix.contracts.response import _InternalContext

        response = ResponseIX()
        response.context = _InternalContext(texts=["scratch"])

        assert "context" not in response.model_dump()

        as_json = response.model_dump_json()
        assert "context" not in as_json
        # "texts" exists only inside the context, so it must be gone as well.
        assert '"texts"' not in as_json

    def test_full_roundtrip_preserves_public_shape(self) -> None:
        bank_name = FieldProvenance(
            field_name="bank_name",
            field_path="result.bank_name",
            value="UBS",
            provenance_verified=True,
            text_agreement=True,
        )
        provenance = ProvenanceData(
            fields={"result.bank_name": bank_name},
            quality_metrics={"verified_fields": 1, "text_agreement_fields": 1},
        )
        response = ResponseIX(
            use_case="bank_statement_header",
            use_case_name="Bank Statement Header",
            ix_client_id="mammon",
            request_id="req-1",
            ix_id="abc123def4567890",
            ix_result=IXResult(result={"bank_name": "UBS"}),
            ocr_result=OCRResult(result=OCRDetails(text="UBS", pages=[])),
            provenance=provenance,
            metadata=Metadata(timings=[{"step": "SetupStep", "seconds": 0.01}]),
        )

        restored = ResponseIX.model_validate(response.model_dump())

        assert restored.provenance is not None
        restored_field = restored.provenance.fields["result.bank_name"]
        assert restored_field.provenance_verified is True
        assert restored.metadata.timings[0]["step"] == "SetupStep"
class TestJob:
    """Job envelope: construction, status validation, terminal serialisation."""

    @staticmethod
    def _bank_statement_request() -> RequestIX:
        """Build the minimal valid request shared by every test in this class."""
        return RequestIX(
            use_case="bank_statement_header",
            ix_client_id="mammon",
            request_id="r1",
            context=Context(files=["file:///x.pdf"]),
        )

    def test_basic_construction(self) -> None:
        pending = Job(
            job_id=uuid4(),
            ix_id="abcd1234abcd1234",
            client_id="mammon",
            request_id="r1",
            status="pending",
            request=self._bank_statement_request(),
            created_at=datetime.now(UTC),
        )
        assert pending.status == "pending"
        assert pending.callback_status is None
        assert pending.attempts == 0

    def test_invalid_status_rejected(self) -> None:
        # Build the request outside the `raises` block so that only the Job
        # constructor can satisfy the expectation.
        request = self._bank_statement_request()
        with pytest.raises(ValidationError):
            Job(
                job_id=uuid4(),
                ix_id="abcd",
                client_id="mammon",
                request_id="r1",
                status="weird",  # not in the Literal
                request=request,
                created_at=datetime.now(UTC),
            )

    def test_full_terminal_done(self) -> None:
        request = self._bank_statement_request()
        response = ResponseIX(use_case="bank_statement_header")
        finished = Job(
            job_id=uuid4(),
            ix_id="abcd1234abcd1234",
            client_id="mammon",
            request_id="r1",
            status="done",
            request=request,
            response=response,
            callback_url="https://cb",
            callback_status="delivered",
            attempts=1,
            created_at=datetime.now(UTC),
            started_at=datetime.now(UTC),
            finished_at=datetime.now(UTC),
        )
        serialised = finished.model_dump()
        # The internal context must not leak into the serialised job.
        assert "context" not in serialised["response"]