From ebefee41841eacbe689eebc147e932c3f9204818 Mon Sep 17 00:00:00 2001 From: Dirk Riemann Date: Sat, 18 Apr 2026 12:09:11 +0200 Subject: [PATCH] =?UTF-8?q?feat(app):=20production=20wiring=20=E2=80=94=20?= =?UTF-8?q?factories,=20pipeline,=20/healthz=20real=20probes?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Task 4.3 closes the loop on Chunk 4: the FastAPI lifespan now selects fake vs real clients via IX_TEST_MODE (new AppConfig field), wires /healthz probes to the live selfcheck() on OllamaClient / SuryaOCRClient, and spawns the worker with a production Pipeline factory that builds SetupStep -> OCRStep -> GenAIStep -> ReliabilityStep -> ResponseHandlerStep over the injected clients. Factories: - make_genai_client(cfg) -> FakeGenAIClient | OllamaClient - make_ocr_client(cfg) -> FakeOCRClient | SuryaOCRClient (spec §6.2) Probes run the async selfcheck on a fresh event loop in a short-lived thread so they're safe to call from either sync callers or a live FastAPI handler without stalling the request loop. Drops the worker-loop spawn_worker_task stub — the app module owns the production spawn directly. Tests: +11 unit tests (5 factories + 6 app-wiring / probe adapter / pipeline build). Full suite: 236 passed. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/ix/app.py | 184 +++++++++++++++++++++++++++++++--- src/ix/config.py | 8 ++ src/ix/genai/__init__.py | 31 +++++- src/ix/ocr/__init__.py | 31 +++++- src/ix/worker/loop.py | 16 --- tests/unit/test_app_wiring.py | 104 +++++++++++++++++++ tests/unit/test_factories.py | 60 +++++++++++ 7 files changed, 394 insertions(+), 40 deletions(-) create mode 100644 tests/unit/test_app_wiring.py create mode 100644 tests/unit/test_factories.py diff --git a/src/ix/app.py b/src/ix/app.py index ab7a3d6..42aa06a 100644 --- a/src/ix/app.py +++ b/src/ix/app.py @@ -5,21 +5,132 @@ worker loop (Task 3.5) and the pg_queue listener (Task 3.6). 
Tests that don't care about the worker call ``create_app(spawn_worker=False)`` so the lifespan returns cleanly. -The factory is parameterised (``spawn_worker``) instead of env-gated because -pytest runs multiple app instances per session and we want the decision local -to each call, not inferred from ``IX_*`` variables. The listener is also -gated on ``spawn_worker`` — the listener is only useful when a worker is -draining the queue, so the two share one flag. +Task 4.3 fills in the production wiring: + +* Factories (``make_genai_client`` / ``make_ocr_client``) pick between + fakes (``IX_TEST_MODE=fake``) and real Ollama/Surya clients. +* ``/healthz`` probes call ``selfcheck()`` on the active clients. In + ``fake`` mode they always report ok. +* The worker's :class:`Pipeline` is built once per spawn with the real + chain of Steps; each call to the injected ``pipeline_factory`` returns + a fresh Pipeline so per-request state stays isolated. """ from __future__ import annotations -from collections.abc import AsyncIterator +import asyncio +from collections.abc import AsyncIterator, Callable from contextlib import asynccontextmanager, suppress +from typing import Literal from fastapi import FastAPI +from ix.adapters.rest.routes import Probes, get_probes from ix.adapters.rest.routes import router as rest_router +from ix.config import AppConfig, get_config +from ix.genai import make_genai_client +from ix.genai.client import GenAIClient +from ix.ocr import make_ocr_client +from ix.ocr.client import OCRClient +from ix.pipeline.genai_step import GenAIStep +from ix.pipeline.ocr_step import OCRStep +from ix.pipeline.pipeline import Pipeline +from ix.pipeline.reliability_step import ReliabilityStep +from ix.pipeline.response_handler_step import ResponseHandlerStep +from ix.pipeline.setup_step import SetupStep + + +def build_pipeline( + genai: GenAIClient, ocr: OCRClient, cfg: AppConfig +) -> Pipeline: + """Assemble the production :class:`Pipeline` with injected clients. 
+ + Kept as a module-level helper so tests that want to exercise the + production wiring (without running the worker) can call it directly. + """ + + from pathlib import Path + + from ix.ingestion import FetchConfig + + return Pipeline( + steps=[ + SetupStep( + tmp_dir=Path(cfg.tmp_dir), + fetch_config=FetchConfig( + connect_timeout_s=float(cfg.file_connect_timeout_seconds), + read_timeout_s=float(cfg.file_read_timeout_seconds), + max_bytes=cfg.file_max_bytes, + ), + ), + OCRStep(ocr_client=ocr), + GenAIStep(genai_client=genai), + ReliabilityStep(), + ResponseHandlerStep(), + ] + ) + + +def _make_ollama_probe( + genai: GenAIClient, cfg: AppConfig +) -> Callable[[], Literal["ok", "degraded", "fail"]]: + """Adapter: async ``selfcheck`` → sync callable the route expects. + + Always drives the coroutine on a throwaway event loop in a separate + thread. This keeps the behavior identical whether the caller holds an + event loop (FastAPI request) or doesn't (a CLI tool), and avoids the + ``asyncio.run`` vs. already-running-loop footgun. + """ + + def probe() -> Literal["ok", "degraded", "fail"]: + if not hasattr(genai, "selfcheck"): + return "ok" # fake client — nothing to probe. + return _run_async_sync( + lambda: genai.selfcheck(expected_model=cfg.default_model), # type: ignore[attr-defined] + fallback="fail", + ) + + return probe + + +def _make_ocr_probe(ocr: OCRClient) -> Callable[[], Literal["ok", "fail"]]: + def probe() -> Literal["ok", "fail"]: + if not hasattr(ocr, "selfcheck"): + return "ok" # fake — nothing to probe. + return _run_async_sync( + lambda: ocr.selfcheck(), # type: ignore[attr-defined] + fallback="fail", + ) + + return probe + + +def _run_async_sync(make_coro, *, fallback: str) -> str: # type: ignore[no-untyped-def] + """Run ``make_coro()`` on a fresh loop in a thread; return its result. + + The thread owns its own event loop so the caller's loop (if any) keeps + running. Any exception collapses to ``fallback``. 
+ """ + + import threading + + result: dict[str, object] = {} + + def _runner() -> None: + loop = asyncio.new_event_loop() + try: + result["value"] = loop.run_until_complete(make_coro()) + except Exception as exc: # any error collapses to fallback + result["error"] = exc + finally: + loop.close() + + t = threading.Thread(target=_runner) + t.start() + t.join() + if "error" in result or "value" not in result: + return fallback + return str(result["value"]) def create_app(*, spawn_worker: bool = True) -> FastAPI: @@ -36,21 +147,35 @@ def create_app(*, spawn_worker: bool = True) -> FastAPI: @asynccontextmanager async def lifespan(_app: FastAPI) -> AsyncIterator[None]: + cfg = get_config() + + # Build the clients once per process. The worker's pipeline + # factory closes over these so every job runs through the same + # Ollama/Surya instance (Surya's predictors are heavy; re-loading + # them per job would be catastrophic). + genai_client = make_genai_client(cfg) + ocr_client = make_ocr_client(cfg) + + # Override the route-level probe DI so /healthz reflects the + # actual clients. Tests that want canned probes can still override + # ``get_probes`` at the TestClient layer. + _app.dependency_overrides.setdefault( + get_probes, + lambda: Probes( + ollama=_make_ollama_probe(genai_client, cfg), + ocr=_make_ocr_probe(ocr_client), + ), + ) + worker_task = None listener = None if spawn_worker: - # Pipeline factory + listener wiring live in Chunk 4's - # production entrypoint; keeping this path best-effort lets the - # lifespan still start even on a box where Ollama/Surya aren't - # available (the listener just gives us a passive 10 s poll). 
try: from ix.adapters.pg_queue.listener import ( PgQueueListener, asyncpg_dsn_from_sqlalchemy_url, ) - from ix.config import get_config - cfg = get_config() listener = PgQueueListener( dsn=asyncpg_dsn_from_sqlalchemy_url(cfg.postgres_url) ) @@ -59,10 +184,10 @@ def create_app(*, spawn_worker: bool = True) -> FastAPI: listener = None try: - from ix.worker.loop import spawn_worker_task - - worker_task = await spawn_worker_task(_app) - except ImportError: + worker_task = await _spawn_production_worker( + cfg, genai_client, ocr_client, listener + ) + except Exception: worker_task = None try: yield @@ -78,3 +203,30 @@ def create_app(*, spawn_worker: bool = True) -> FastAPI: app = FastAPI(lifespan=lifespan, title="infoxtractor", version="0.1.0") app.include_router(rest_router) return app + + +async def _spawn_production_worker( + cfg: AppConfig, + genai: GenAIClient, + ocr: OCRClient, + listener, # type: ignore[no-untyped-def] +) -> asyncio.Task[None]: + """Spawn the background worker with a production pipeline factory.""" + + from ix.store.engine import get_session_factory + from ix.worker.loop import Worker + + def pipeline_factory() -> Pipeline: + return build_pipeline(genai, ocr, cfg) + + worker = Worker( + session_factory=get_session_factory(), + pipeline_factory=pipeline_factory, + poll_interval_seconds=10.0, + max_running_seconds=2 * cfg.pipeline_request_timeout_seconds, + callback_timeout_seconds=cfg.callback_timeout_seconds, + wait_for_work=listener.wait_for_work if listener is not None else None, + ) + + stop = asyncio.Event() + return asyncio.create_task(worker.run(stop)) diff --git a/src/ix/config.py b/src/ix/config.py index 8cfe322..9740633 100644 --- a/src/ix/config.py +++ b/src/ix/config.py @@ -12,6 +12,7 @@ re-read after ``monkeypatch.setenv``. Production code never clears the cache. 
from __future__ import annotations from functools import lru_cache +from typing import Literal from pydantic_settings import BaseSettings, SettingsConfigDict @@ -62,6 +63,13 @@ class AppConfig(BaseSettings): # --- Observability --- log_level: str = "INFO" + # --- Test / wiring mode --- + # ``fake``: factories return FakeGenAIClient / FakeOCRClient and + # ``/healthz`` probes report ok. CI sets this so the Forgejo runner + # doesn't need access to Ollama or GPU-backed Surya. ``None`` (default) + # means production wiring: real OllamaClient + SuryaOCRClient. + test_mode: Literal["fake"] | None = None + @lru_cache(maxsize=1) def get_config() -> AppConfig: diff --git a/src/ix/genai/__init__.py b/src/ix/genai/__init__.py index b73c434..f524daf 100644 --- a/src/ix/genai/__init__.py +++ b/src/ix/genai/__init__.py @@ -1,18 +1,43 @@ """GenAI subsystem: protocol + fake client + invocation-result dataclasses. -Real backends (Ollama, etc.) plug in behind :class:`GenAIClient`. The MVP -ships only :class:`FakeGenAIClient` from this package; the real Ollama -client lands in Chunk 4. +Real backends (Ollama, …) plug in behind :class:`GenAIClient`. The factory +:func:`make_genai_client` picks between :class:`FakeGenAIClient` (for CI +/ hermetic tests via ``IX_TEST_MODE=fake``) and :class:`OllamaClient` +(production). Tests that want a real Ollama client anyway can call the +constructor directly. """ from __future__ import annotations +from ix.config import AppConfig from ix.genai.client import GenAIClient, GenAIInvocationResult, GenAIUsage from ix.genai.fake import FakeGenAIClient +from ix.genai.ollama_client import OllamaClient + + +def make_genai_client(cfg: AppConfig) -> GenAIClient: + """Return the :class:`GenAIClient` configured for the current run. + + When ``cfg.test_mode == "fake"`` the fake is returned; the pipeline + callers are expected to override the injected client via DI if they + want a non-default canned response. 
Otherwise a live + :class:`OllamaClient` bound to ``cfg.ollama_url`` and the per-call + timeout is returned. + """ + + if cfg.test_mode == "fake": + return FakeGenAIClient(parsed=None) + return OllamaClient( + base_url=cfg.ollama_url, + per_call_timeout_s=float(cfg.genai_call_timeout_seconds), + ) + __all__ = [ "FakeGenAIClient", "GenAIClient", "GenAIInvocationResult", "GenAIUsage", + "OllamaClient", + "make_genai_client", ] diff --git a/src/ix/ocr/__init__.py b/src/ix/ocr/__init__.py index 089a48a..016b924 100644 --- a/src/ix/ocr/__init__.py +++ b/src/ix/ocr/__init__.py @@ -1,13 +1,34 @@ -"""OCR subsystem: protocol + fake client. +"""OCR subsystem: protocol + fake + real Surya client + factory. -Real engines (Surya, Azure DI, …) plug in behind :class:`OCRClient`. The -MVP ships only :class:`FakeOCRClient` from this package; the real Surya -client lands in Chunk 4. +Real engines (Surya today, Azure DI / AWS Textract … deferred) plug in +behind :class:`OCRClient`. The factory :func:`make_ocr_client` picks +between :class:`FakeOCRClient` (when ``IX_TEST_MODE=fake``) and +:class:`SuryaOCRClient` (production). Unknown engine names raise so a +typo'd ``IX_OCR_ENGINE`` surfaces at startup, not later. 
""" from __future__ import annotations +from ix.config import AppConfig +from ix.contracts.response import OCRDetails, OCRResult from ix.ocr.client import OCRClient from ix.ocr.fake import FakeOCRClient +from ix.ocr.surya_client import SuryaOCRClient -__all__ = ["FakeOCRClient", "OCRClient"] + +def make_ocr_client(cfg: AppConfig) -> OCRClient: + """Return the :class:`OCRClient` configured for the current run.""" + + if cfg.test_mode == "fake": + return FakeOCRClient(canned=OCRResult(result=OCRDetails())) + if cfg.ocr_engine == "surya": + return SuryaOCRClient() + raise ValueError(f"Unknown ocr_engine: {cfg.ocr_engine!r}") + + +__all__ = [ + "FakeOCRClient", + "OCRClient", + "SuryaOCRClient", + "make_ocr_client", +] diff --git a/src/ix/worker/loop.py b/src/ix/worker/loop.py index 62c07ff..15eaa7f 100644 --- a/src/ix/worker/loop.py +++ b/src/ix/worker/loop.py @@ -28,8 +28,6 @@ from collections.abc import Callable from datetime import UTC, datetime from typing import TYPE_CHECKING -from fastapi import FastAPI - from ix.contracts.response import ResponseIX from ix.errors import IXErrorCode, IXException from ix.pipeline.pipeline import Pipeline @@ -179,17 +177,3 @@ class Worker: await session.commit() -async def spawn_worker_task(app: FastAPI): # type: ignore[no-untyped-def] - """Hook called from the FastAPI lifespan (Task 3.4). - - This module-level async function is here so ``ix.app`` can import it - lazily without the app factory depending on the worker at import time. - Production wiring (Chunk 4) constructs a real Pipeline; for now we - build a no-op pipeline so the import chain completes. Tests that need - the worker wire their own Worker explicitly. - """ - - # NOTE: the real spawn is done by explicit test fixtures / a production - # wiring layer in Chunk 4. We return None so the lifespan's cleanup - # branch is a no-op; the app still runs REST fine without a worker. 
- return None diff --git a/tests/unit/test_app_wiring.py b/tests/unit/test_app_wiring.py new file mode 100644 index 0000000..83c9105 --- /dev/null +++ b/tests/unit/test_app_wiring.py @@ -0,0 +1,104 @@ +"""Tests for ``ix.app`` lifespan / probe wiring (Task 4.3). + +The lifespan selects fake clients when ``IX_TEST_MODE=fake`` and exposes +their probes via the route DI hook. These tests exercise the probe +adapter in isolation — no DB, no real Ollama/Surya. +""" + +from __future__ import annotations + +from typing import Literal + +from ix.app import _make_ocr_probe, _make_ollama_probe, build_pipeline +from ix.config import AppConfig +from ix.genai.fake import FakeGenAIClient +from ix.ocr.fake import FakeOCRClient +from ix.pipeline.genai_step import GenAIStep +from ix.pipeline.ocr_step import OCRStep +from ix.pipeline.pipeline import Pipeline +from ix.pipeline.reliability_step import ReliabilityStep +from ix.pipeline.response_handler_step import ResponseHandlerStep +from ix.pipeline.setup_step import SetupStep + + +def _cfg(**overrides: object) -> AppConfig: + return AppConfig(_env_file=None, **overrides) # type: ignore[call-arg] + + +class _SelfcheckOllamaClient: + async def invoke(self, *a: object, **kw: object) -> object: + raise NotImplementedError + + async def selfcheck( + self, expected_model: str + ) -> Literal["ok", "degraded", "fail"]: + self.called_with = expected_model + return "ok" + + +class _SelfcheckOCRClient: + async def ocr(self, *a: object, **kw: object) -> object: + raise NotImplementedError + + async def selfcheck(self) -> Literal["ok", "fail"]: + return "ok" + + +class _BrokenSelfcheckOllama: + async def invoke(self, *a: object, **kw: object) -> object: + raise NotImplementedError + + async def selfcheck( + self, expected_model: str + ) -> Literal["ok", "degraded", "fail"]: + raise RuntimeError("boom") + + +class TestOllamaProbe: + def test_fake_client_without_selfcheck_reports_ok(self) -> None: + cfg = _cfg(test_mode="fake", 
default_model="gpt-oss:20b") + probe = _make_ollama_probe(FakeGenAIClient(parsed=None), cfg) + assert probe() == "ok" + + def test_real_selfcheck_returns_its_verdict(self) -> None: + cfg = _cfg(default_model="gpt-oss:20b") + client = _SelfcheckOllamaClient() + probe = _make_ollama_probe(client, cfg) # type: ignore[arg-type] + assert probe() == "ok" + assert client.called_with == "gpt-oss:20b" + + def test_selfcheck_exception_falls_back_to_fail(self) -> None: + cfg = _cfg(default_model="gpt-oss:20b") + probe = _make_ollama_probe(_BrokenSelfcheckOllama(), cfg) # type: ignore[arg-type] + assert probe() == "fail" + + +class TestOCRProbe: + def test_fake_client_without_selfcheck_reports_ok(self) -> None: + from ix.contracts.response import OCRDetails, OCRResult + + probe = _make_ocr_probe(FakeOCRClient(canned=OCRResult(result=OCRDetails()))) + assert probe() == "ok" + + def test_real_selfcheck_returns_its_verdict(self) -> None: + probe = _make_ocr_probe(_SelfcheckOCRClient()) # type: ignore[arg-type] + assert probe() == "ok" + + +class TestBuildPipeline: + def test_assembles_all_five_steps_in_order(self) -> None: + from ix.contracts.response import OCRDetails, OCRResult + + genai = FakeGenAIClient(parsed=None) + ocr = FakeOCRClient(canned=OCRResult(result=OCRDetails())) + cfg = _cfg(test_mode="fake") + pipeline = build_pipeline(genai, ocr, cfg) + assert isinstance(pipeline, Pipeline) + steps = pipeline._steps # type: ignore[attr-defined] + assert [type(s) for s in steps] == [ + SetupStep, + OCRStep, + GenAIStep, + ReliabilityStep, + ResponseHandlerStep, + ] diff --git a/tests/unit/test_factories.py b/tests/unit/test_factories.py new file mode 100644 index 0000000..b7cafd6 --- /dev/null +++ b/tests/unit/test_factories.py @@ -0,0 +1,60 @@ +"""Tests for the GenAI + OCR factories (Task 4.3). + +The factories pick between fake and real clients based on +``IX_TEST_MODE``. 
CI runs with ``IX_TEST_MODE=fake``, production runs +without — so the selection knob is the one lever between hermetic CI and +real clients. +""" + +from __future__ import annotations + +from ix.config import AppConfig +from ix.genai import make_genai_client +from ix.genai.fake import FakeGenAIClient +from ix.genai.ollama_client import OllamaClient +from ix.ocr import make_ocr_client +from ix.ocr.fake import FakeOCRClient +from ix.ocr.surya_client import SuryaOCRClient + + +def _cfg(**overrides: object) -> AppConfig: + """Build an AppConfig without loading the repo's .env.example.""" + return AppConfig(_env_file=None, **overrides) # type: ignore[call-arg] + + +class TestGenAIFactory: + def test_fake_mode_returns_fake(self) -> None: + cfg = _cfg(test_mode="fake") + client = make_genai_client(cfg) + assert isinstance(client, FakeGenAIClient) + + def test_production_returns_ollama_with_configured_url(self) -> None: + cfg = _cfg( + test_mode=None, + ollama_url="http://ollama.host:11434", + genai_call_timeout_seconds=42, + ) + client = make_genai_client(cfg) + assert isinstance(client, OllamaClient) + # Inspect the private attrs for binding correctness. + assert client._base_url == "http://ollama.host:11434" + assert client._per_call_timeout_s == 42 + + +class TestOCRFactory: + def test_fake_mode_returns_fake(self) -> None: + cfg = _cfg(test_mode="fake") + client = make_ocr_client(cfg) + assert isinstance(client, FakeOCRClient) + + def test_production_surya_returns_surya(self) -> None: + cfg = _cfg(test_mode=None, ocr_engine="surya") + client = make_ocr_client(cfg) + assert isinstance(client, SuryaOCRClient) + + def test_unknown_engine_raises(self) -> None: + cfg = _cfg(test_mode=None, ocr_engine="tesseract") + import pytest + + with pytest.raises(ValueError, match="ocr_engine"): + make_ocr_client(cfg)