"""Maps LLM-emitted :class:`SegmentCitation` lists to :class:`ProvenanceData`. Implements spec §9.4. The algorithm is deliberately small: 1. For each citation, pick the seg-id list (``value`` vs. ``value_and_context``). 2. Cap at ``max_sources_per_field``. 3. Resolve each ID via :meth:`SegmentIndex.lookup_segment`; count misses. 4. Resolve the field's value by dot-path traversal of the extraction result. 5. Build a :class:`FieldProvenance`. Skip fields that resolved to zero sources. No verification / normalisation happens here — this module's sole job is structural assembly. :mod:`ix.provenance.verify` does the reliability pass downstream. """ from __future__ import annotations import re from typing import Any, Literal from ix.contracts.provenance import ( ExtractionSource, FieldProvenance, ProvenanceData, SegmentCitation, ) from ix.segmentation import SegmentIndex SourceType = Literal["value", "value_and_context"] _BRACKET_RE = re.compile(r"\[(\d+)\]") def resolve_nested_path(data: Any, path: str) -> Any: """Resolve a dot-path into ``data`` with ``[N]`` array notation normalised. ``"result.items[0].name"`` → walks ``data["result"]["items"][0]["name"]``. Returns ``None`` at any missing-key / index-out-of-range step so callers can fall back to recording the field with a null value. """ normalised = _BRACKET_RE.sub(r".\1", path) cur: Any = data for part in normalised.split("."): if cur is None: return None if part.isdigit() and isinstance(cur, list): i = int(part) if i < 0 or i >= len(cur): return None cur = cur[i] elif isinstance(cur, dict): cur = cur.get(part) else: return None return cur def _segment_ids_for_citation( citation: SegmentCitation, source_type: SourceType, ) -> list[str]: if source_type == "value": return list(citation.value_segment_ids) # value_and_context return list(citation.value_segment_ids) + list(citation.context_segment_ids) def map_segment_refs_to_provenance( extraction_result: dict[str, Any], segment_citations: list[SegmentCitation], segment_index: SegmentIndex, max_sources_per_field: int, min_confidence: float, # reserved (no-op for MVP) include_bounding_boxes: bool, source_type: SourceType, ) -> ProvenanceData: """Build a :class:`ProvenanceData` from LLM citations and a SegmentIndex.""" # min_confidence is reserved for future use (see spec §2 provenance options). _ = min_confidence fields: dict[str, FieldProvenance] = {} invalid_references = 0 for citation in segment_citations: seg_ids = _segment_ids_for_citation(citation, source_type)[:max_sources_per_field] sources: list[ExtractionSource] = [] for seg_id in seg_ids: pos = segment_index.lookup_segment(seg_id) if pos is None: invalid_references += 1 continue sources.append( ExtractionSource( page_number=pos["page"], file_index=pos.get("file_index"), bounding_box=pos["bbox"] if include_bounding_boxes else None, text_snippet=pos["text"], relevance_score=1.0, segment_id=seg_id, ) ) if not sources: continue value = resolve_nested_path(extraction_result, citation.field_path) fields[citation.field_path] = FieldProvenance( field_name=citation.field_path.split(".")[-1], field_path=citation.field_path, value=value, sources=sources, confidence=None, ) total_fields_in_result = _count_leaf_fields(extraction_result.get("result", {})) coverage_rate: float | None = None if total_fields_in_result > 0: coverage_rate = len(fields) / total_fields_in_result return ProvenanceData( fields=fields, quality_metrics={ "fields_with_provenance": len(fields), "total_fields": total_fields_in_result or None, "coverage_rate": coverage_rate, "invalid_references": invalid_references, }, segment_count=len(segment_index._ordered_ids), granularity=segment_index.granularity, ) def _count_leaf_fields(data: Any) -> int: """Count non-container leaves (str/int/float/Decimal/date/bool/None) recursively.""" if data is None: return 1 if isinstance(data, dict): if not data: return 0 return sum(_count_leaf_fields(v) for v in data.values()) if isinstance(data, list): if not data: return 0 return sum(_count_leaf_fields(v) for v in data) return 1