"""FastAPI app factory + lifespan. ``create_app()`` wires the REST router on top of a lifespan that spawns the worker loop (Task 3.5) and the pg_queue listener (Task 3.6). Tests that don't care about the worker call ``create_app(spawn_worker=False)`` so the lifespan returns cleanly. Task 4.3 fills in the production wiring: * Factories (``make_genai_client`` / ``make_ocr_client``) pick between fakes (``IX_TEST_MODE=fake``) and real Ollama/Surya clients. * ``/healthz`` probes call ``selfcheck()`` on the active clients. In ``fake`` mode they always report ok. * The worker's :class:`Pipeline` is built once per spawn with the real chain of Steps; each call to the injected ``pipeline_factory`` returns a fresh Pipeline so per-request state stays isolated. """ from __future__ import annotations import asyncio from collections.abc import AsyncIterator, Callable from contextlib import asynccontextmanager, suppress from typing import Literal from fastapi import FastAPI from ix.adapters.rest.routes import Probes, get_probes from ix.adapters.rest.routes import router as rest_router from ix.config import AppConfig, get_config from ix.genai import make_genai_client from ix.genai.client import GenAIClient from ix.ocr import make_ocr_client from ix.ocr.client import OCRClient from ix.pipeline.genai_step import GenAIStep from ix.pipeline.ocr_step import OCRStep from ix.pipeline.pipeline import Pipeline from ix.pipeline.reliability_step import ReliabilityStep from ix.pipeline.response_handler_step import ResponseHandlerStep from ix.pipeline.setup_step import SetupStep def build_pipeline( genai: GenAIClient, ocr: OCRClient, cfg: AppConfig ) -> Pipeline: """Assemble the production :class:`Pipeline` with injected clients. Kept as a module-level helper so tests that want to exercise the production wiring (without running the worker) can call it directly. """ from pathlib import Path from ix.ingestion import FetchConfig return Pipeline( steps=[ SetupStep( tmp_dir=Path(cfg.tmp_dir), fetch_config=FetchConfig( connect_timeout_s=float(cfg.file_connect_timeout_seconds), read_timeout_s=float(cfg.file_read_timeout_seconds), max_bytes=cfg.file_max_bytes, ), ), OCRStep(ocr_client=ocr), GenAIStep(genai_client=genai), ReliabilityStep(), ResponseHandlerStep(), ] ) def _make_ollama_probe( genai: GenAIClient, cfg: AppConfig ) -> Callable[[], Literal["ok", "degraded", "fail"]]: """Adapter: async ``selfcheck`` → sync callable the route expects. Always drives the coroutine on a throwaway event loop in a separate thread. This keeps the behavior identical whether the caller holds an event loop (FastAPI request) or doesn't (a CLI tool), and avoids the ``asyncio.run`` vs. already-running-loop footgun. """ def probe() -> Literal["ok", "degraded", "fail"]: if not hasattr(genai, "selfcheck"): return "ok" # fake client — nothing to probe. return _run_async_sync( lambda: genai.selfcheck(expected_model=cfg.default_model), # type: ignore[attr-defined] fallback="fail", ) return probe def _make_ocr_probe(ocr: OCRClient) -> Callable[[], Literal["ok", "fail"]]: def probe() -> Literal["ok", "fail"]: if not hasattr(ocr, "selfcheck"): return "ok" # fake — nothing to probe. return _run_async_sync( lambda: ocr.selfcheck(), # type: ignore[attr-defined] fallback="fail", ) return probe def _run_async_sync(make_coro, *, fallback: str) -> str: # type: ignore[no-untyped-def] """Run ``make_coro()`` on a fresh loop in a thread; return its result. The thread owns its own event loop so the caller's loop (if any) keeps running. Any exception collapses to ``fallback``. """ import threading result: dict[str, object] = {} def _runner() -> None: loop = asyncio.new_event_loop() try: result["value"] = loop.run_until_complete(make_coro()) except Exception as exc: # any error collapses to fallback result["error"] = exc finally: loop.close() t = threading.Thread(target=_runner) t.start() t.join() if "error" in result or "value" not in result: return fallback return str(result["value"]) def create_app(*, spawn_worker: bool = True) -> FastAPI: """Construct the ASGI app. Parameters ---------- spawn_worker: When True (default), the lifespan spawns the background worker task and the pg_queue listener. Integration tests that only exercise the REST adapter pass False so jobs pile up as ``pending`` and the tests can assert on their state without a racing worker mutating them. """ @asynccontextmanager async def lifespan(_app: FastAPI) -> AsyncIterator[None]: cfg = get_config() # Build the clients once per process. The worker's pipeline # factory closes over these so every job runs through the same # Ollama/Surya instance (Surya's predictors are heavy; re-loading # them per job would be catastrophic). genai_client = make_genai_client(cfg) ocr_client = make_ocr_client(cfg) # Override the route-level probe DI so /healthz reflects the # actual clients. Tests that want canned probes can still override # ``get_probes`` at the TestClient layer. _app.dependency_overrides.setdefault( get_probes, lambda: Probes( ollama=_make_ollama_probe(genai_client, cfg), ocr=_make_ocr_probe(ocr_client), ), ) worker_task = None listener = None if spawn_worker: try: from ix.adapters.pg_queue.listener import ( PgQueueListener, asyncpg_dsn_from_sqlalchemy_url, ) listener = PgQueueListener( dsn=asyncpg_dsn_from_sqlalchemy_url(cfg.postgres_url) ) await listener.start() except Exception: listener = None try: worker_task = await _spawn_production_worker( cfg, genai_client, ocr_client, listener ) except Exception: worker_task = None try: yield finally: if worker_task is not None: worker_task.cancel() with suppress(Exception): await worker_task if listener is not None: with suppress(Exception): await listener.stop() app = FastAPI(lifespan=lifespan, title="infoxtractor", version="0.1.0") app.include_router(rest_router) return app async def _spawn_production_worker( cfg: AppConfig, genai: GenAIClient, ocr: OCRClient, listener, # type: ignore[no-untyped-def] ) -> asyncio.Task[None]: """Spawn the background worker with a production pipeline factory.""" from ix.store.engine import get_session_factory from ix.worker.loop import Worker def pipeline_factory() -> Pipeline: return build_pipeline(genai, ocr, cfg) worker = Worker( session_factory=get_session_factory(), pipeline_factory=pipeline_factory, poll_interval_seconds=10.0, max_running_seconds=2 * cfg.pipeline_request_timeout_seconds, callback_timeout_seconds=cfg.callback_timeout_seconds, wait_for_work=listener.wait_for_work if listener is not None else None, ) stop = asyncio.Event() return asyncio.create_task(worker.run(stop))