diff --git a/AGENTS.md b/AGENTS.md index 292cb19..b6f1d48 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -8,6 +8,8 @@ Status: MVP deployed (2026-04-18) at `http://192.168.68.42:8994` — LAN only. B Use cases: the built-in registry lives in `src/ix/use_cases/__init__.py` (`bank_statement_header` for MVP). Callers without a registered entry can ship an ad-hoc schema inline via `RequestIX.use_case_inline` (see README "Ad-hoc use cases"); the pipeline builds the Pydantic classes on the fly per request. The `/ui` page exposes this as a "custom" option so non-engineering users can experiment without a deploy. +UX notes: the `/ui` job page surfaces queue position + elapsed MM:SS on each poll, renders the client-provided filename (stored via `FileRef.display_name`, optional metadata — the pipeline ignores it for execution), and shows a CPU-mode notice when `/healthz` reports `ocr_gpu: false`. + ## Guiding Principles - **On-prem always.** All LLM inference, OCR, and user-data processing run on the home server (192.168.68.42). No cloud APIs — OpenAI, Anthropic, Azure, AWS Bedrock/Textract, Google Document AI, Mistral, etc. are not to be used for user data or inference. LLM backend is Ollama (:11434); OCR runs locally (pluggable `OCRClient` interface, first engine: Surya on the RTX 3090); job state lives in local Postgres on the postgis container. The spec's references to Azure / AWS / OpenAI are examples to *replace*, not inherit. diff --git a/README.md b/README.md index 0bac34a..39a0237 100644 --- a/README.md +++ b/README.md @@ -76,6 +76,15 @@ uv run pytest tests/unit -v # hermetic unit + integration suites IX_TEST_OLLAMA=1 uv run pytest tests/live -v # needs LAN access to Ollama + GPU ``` +### UI queue + progress UX + +The `/ui` job page polls `GET /ui/jobs/{id}/fragment` every 2 s and surfaces: + +- **Queue position** while pending: "Queue position: N ahead — M jobs total in flight (single worker)" so it's obvious a new submission is waiting on an earlier job rather than stuck. 
Flips to "About to start" when the worker has just freed up. +- **Elapsed time** while running ("Running for MM:SS") and on finish ("Finished in MM:SS"). +- **Original filename** — the UI stashes the client-provided upload name in `FileRef.display_name` so the browser shows `your_statement.pdf` instead of the on-disk UUID. +- **CPU-mode notice** when `/healthz` reports `ocr_gpu: false` (the Surya OCR client observed `torch.cuda.is_available() == False`): a collapsed `<details>
` pointing at the deployment runbook. + ## Deploying ```bash diff --git a/docs/superpowers/specs/2026-04-18-ix-mvp-design.md b/docs/superpowers/specs/2026-04-18-ix-mvp-design.md index 06926ed..15100f6 100644 --- a/docs/superpowers/specs/2026-04-18-ix-mvp-design.md +++ b/docs/superpowers/specs/2026-04-18-ix-mvp-design.md @@ -85,6 +85,7 @@ class FileRef(BaseModel): url: str # http(s):// or file:// headers: dict[str, str] = {} # e.g. {"Authorization": "Token …"} max_bytes: Optional[int] = None # per-file override; defaults to IX_FILE_MAX_BYTES + display_name: Optional[str] = None # UI-only metadata; client-provided filename for display (pipeline ignores) class Options(BaseModel): ocr: OCROptions = OCROptions() @@ -225,14 +226,15 @@ Callers that prefer direct SQL (the `pg_queue_adapter` contract): insert a row w | `POST` | `/jobs` | Body = `RequestIX` (+ optional `callback_url`). → `201 {job_id, ix_id, status: "pending"}`. Idempotent on `(ix_client_id, request_id)` — same pair returns the existing `job_id` with `200`. | | `GET` | `/jobs/{job_id}` | → full `Job`. Source of truth regardless of submission path or callback outcome. | | `GET` | `/jobs?client_id=…&request_id=…` | Lookup-by-correlation (caller idempotency helper). The pair is UNIQUE in the table → at most one match. Returns the job or `404`. | -| `GET` | `/healthz` | `{postgres, ollama, ocr}`. See below for semantics. Used by `infrastructure` monitoring dashboard. | +| `GET` | `/healthz` | `{postgres, ollama, ocr, ocr_gpu}`. See below for semantics. Used by `infrastructure` monitoring dashboard. `ocr_gpu` is additive metadata (not part of the gate). | | `GET` | `/metrics` | Counters over the last 24 hours: `jobs_pending`, `jobs_running`, `jobs_done_24h`, `jobs_error_24h`, per-use-case avg seconds over the same window. Plain JSON, no Prometheus format for MVP. | **`/healthz` semantics:** - `postgres`: `SELECT 1` on the job store pool; `ok` iff the query returns within 2 s. 
- `ollama`: `GET {IX_OLLAMA_URL}/api/tags` within 5 s; `ok` iff reachable AND the default model (`IX_DEFAULT_MODEL`) is listed in the tags response; `degraded` iff reachable but the model is missing (ops action: run `ollama pull <model>` on the host); `fail` on any other error. - `ocr`: `SuryaOCRClient.selfcheck()` — returns `ok` iff CUDA is available and the Surya text-recognition model is loaded into GPU memory at process start. `fail` on any error. -- Overall HTTP status: `200` iff all three are `ok`; `503` otherwise. The monitoring dashboard only surfaces `200`/`non-200`. +- `ocr_gpu`: `true | false | null`. Additive metadata: reports whether the OCR client observed `torch.cuda.is_available() == True` at first warm-up. `null` means not yet probed (fresh process, fake client, etc.). The UI reads this to surface a CPU-mode slowdown notice; never part of the 200/503 gate. +- Overall HTTP status: `200` iff all three core statuses (`postgres`, `ollama`, `ocr`) are `ok`; `503` otherwise. `ocr_gpu` does not affect the gate. The monitoring dashboard only surfaces `200`/`non-200`. **Callback delivery** (when `callback_url` is set): one POST of the full `Job` body, 10 s timeout. 2xx → `callback_status='delivered'`. Anything else → `'failed'`. No retry. Callers always have `GET /jobs/{id}` as the authoritative fallback. diff --git a/src/ix/adapters/rest/routes.py b/src/ix/adapters/rest/routes.py index 912b9b4..f4a555b 100644 --- a/src/ix/adapters/rest/routes.py +++ b/src/ix/adapters/rest/routes.py @@ -18,7 +18,7 @@ from __future__ import annotations import asyncio from collections.abc import Callable -from dataclasses import dataclass +from dataclasses import dataclass, field from datetime import UTC, datetime, timedelta from typing import Annotated, Literal from uuid import UUID @@ -44,10 +44,15 @@ class Probes: keeping them sync lets tests pass plain lambdas. Real probes that need async work run the call through ``loop.run_in_executor`` inside the callable (Chunk 4). 
+ + ``ocr_gpu`` is additive metadata for the UI (not a health gate): returns + ``True`` iff the OCR client reports CUDA is available, ``False`` for + explicit CPU mode, ``None`` if unknown (fake client, not yet warmed up). """ ollama: Callable[[], Literal["ok", "degraded", "fail"]] ocr: Callable[[], Literal["ok", "fail"]] + ocr_gpu: Callable[[], bool | None] = field(default=lambda: None) def get_session_factory_dep() -> async_sessionmaker[AsyncSession]: @@ -163,8 +168,16 @@ async def healthz( except Exception: ocr_state = "fail" + try: + ocr_gpu_state: bool | None = probes.ocr_gpu() + except Exception: + ocr_gpu_state = None + body = HealthStatus( - postgres=postgres_state, ollama=ollama_state, ocr=ocr_state + postgres=postgres_state, + ollama=ollama_state, + ocr=ocr_state, + ocr_gpu=ocr_gpu_state, ) if postgres_state != "ok" or ollama_state != "ok" or ocr_state != "ok": response.status_code = 503 diff --git a/src/ix/adapters/rest/schemas.py b/src/ix/adapters/rest/schemas.py index cc57f78..d6734b6 100644 --- a/src/ix/adapters/rest/schemas.py +++ b/src/ix/adapters/rest/schemas.py @@ -28,9 +28,15 @@ class HealthStatus(BaseModel): """Body of GET /healthz. Each field reports per-subsystem state. Overall HTTP status is 200 iff - every field is ``"ok"`` (spec §5). ``ollama`` can be ``"degraded"`` - when the backend is reachable but the default model isn't pulled — - monitoring surfaces that as non-200. + each of the three core status keys is ``"ok"`` (spec §5). ``ollama`` can + be ``"degraded"`` when the backend is reachable but the default model + isn't pulled — monitoring surfaces that as non-200. + + ``ocr_gpu`` is additive metadata, not part of the health gate: it reports + whether the Surya OCR client observed ``torch.cuda.is_available() == True`` + on first warm-up. ``None`` means we haven't probed yet (fresh process, + fake client, or warm_up hasn't happened). The UI reads this to surface a + CPU-mode slowdown warning to users. 
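As a minimal sketch of the gate semantics this hunk pins down (plain dicts rather than the repo's `HealthStatus` model; the helper name is hypothetical):

```python
# Hypothetical sketch of the /healthz gate: only the three core statuses
# drive the HTTP code; ocr_gpu rides along as metadata and never gates.
def healthz_http_status(body: dict) -> int:
    core = ("postgres", "ollama", "ocr")
    return 200 if all(body.get(k) == "ok" for k in core) else 503
```

So `{"postgres": "ok", "ollama": "ok", "ocr": "ok", "ocr_gpu": False}` still maps to 200, while a `"degraded"` ollama maps to 503 — the same split the spec table describes.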
""" model_config = ConfigDict(extra="forbid") @@ -38,6 +44,7 @@ class HealthStatus(BaseModel): postgres: Literal["ok", "fail"] ollama: Literal["ok", "degraded", "fail"] ocr: Literal["ok", "fail"] + ocr_gpu: bool | None = None class MetricsResponse(BaseModel): diff --git a/src/ix/app.py b/src/ix/app.py index 79c921a..a47f31f 100644 --- a/src/ix/app.py +++ b/src/ix/app.py @@ -108,6 +108,20 @@ def _make_ocr_probe(ocr: OCRClient) -> Callable[[], Literal["ok", "fail"]]: return probe +def _make_ocr_gpu_probe(ocr: OCRClient) -> Callable[[], bool | None]: + """Adapter: read the OCR client's recorded ``gpu_available`` attribute. + + The attribute is set by :meth:`SuryaOCRClient.warm_up` on first load. + Returns ``None`` when the client has no such attribute (e.g. FakeOCRClient + in test mode) or warm_up hasn't happened yet. Never raises. + """ + + def probe() -> bool | None: + return getattr(ocr, "gpu_available", None) + + return probe + + def _run_async_sync(make_coro, *, fallback: str) -> str: # type: ignore[no-untyped-def] """Run ``make_coro()`` on a fresh loop in a thread; return its result. @@ -167,6 +181,7 @@ def create_app(*, spawn_worker: bool = True) -> FastAPI: lambda: Probes( ollama=_make_ollama_probe(genai_client, cfg), ocr=_make_ocr_probe(ocr_client), + ocr_gpu=_make_ocr_gpu_probe(ocr_client), ), ) diff --git a/src/ix/contracts/request.py b/src/ix/contracts/request.py index c11e113..fca9579 100644 --- a/src/ix/contracts/request.py +++ b/src/ix/contracts/request.py @@ -22,6 +22,11 @@ class FileRef(BaseModel): Used when the file URL needs authentication (e.g. Paperless ``Token``) or a tighter size cap than :envvar:`IX_FILE_MAX_BYTES`. Plain URLs that need no headers can stay as bare ``str`` values in :attr:`Context.files`. + + ``display_name`` is pure UI metadata — the pipeline never consults it for + execution. 
When the UI uploads a PDF under a random ``{uuid}.pdf`` name on + disk, it stashes the client-provided filename here so the browser can + surface "your_statement.pdf" instead of "8f3a...pdf" back to the user. """ model_config = ConfigDict(extra="forbid") @@ -29,6 +34,7 @@ class FileRef(BaseModel): url: str headers: dict[str, str] = Field(default_factory=dict) max_bytes: int | None = None + display_name: str | None = None class Context(BaseModel): diff --git a/src/ix/ocr/surya_client.py b/src/ix/ocr/surya_client.py index b277be9..9862f51 100644 --- a/src/ix/ocr/surya_client.py +++ b/src/ix/ocr/surya_client.py @@ -48,6 +48,11 @@ class SuryaOCRClient: def __init__(self) -> None: self._recognition_predictor: Any = None self._detection_predictor: Any = None + # ``None`` until warm_up() has run at least once. After that it's the + # observed value of ``torch.cuda.is_available()`` at load time. We + # cache it on the instance so ``/healthz`` / the UI can surface a + # CPU-mode warning without re-probing torch each request. + self.gpu_available: bool | None = None def warm_up(self) -> None: """Load the detection + recognition predictors. Idempotent. @@ -72,6 +77,18 @@ class SuryaOCRClient: self._recognition_predictor = RecognitionPredictor(foundation) self._detection_predictor = DetectionPredictor() + # Best-effort CUDA probe — only after predictors loaded cleanly so we + # know torch is fully importable. ``torch`` is a Surya transitive + # dependency so if we got this far it's on sys.path. We swallow any + # exception to keep warm_up() sturdy: the attribute stays None and the + # UI falls back to "unknown" gracefully. 
+ try: + import torch # type: ignore[import-not-found] + + self.gpu_available = bool(torch.cuda.is_available()) + except Exception: + self.gpu_available = None + async def ocr( self, pages: list[Page], diff --git a/src/ix/store/jobs_repo.py b/src/ix/store/jobs_repo.py index 7f7d368..f3bc0de 100644 --- a/src/ix/store/jobs_repo.py +++ b/src/ix/store/jobs_repo.py @@ -157,6 +157,76 @@ async def get(session: AsyncSession, job_id: UUID) -> Job | None: return _orm_to_job(row) if row is not None else None +async def queue_position( + session: AsyncSession, job_id: UUID +) -> tuple[int, int]: + """Return ``(ahead, total_active)`` for a pending/running job. + + ``ahead`` counts active jobs (``pending`` or ``running``) that would be + claimed by the worker before this one: + + * any ``running`` job is always ahead — it has the worker already. + * other ``pending`` jobs with a strictly older ``created_at`` are ahead + (the worker picks pending rows in ``ORDER BY created_at`` per + :func:`claim_next_pending`). + + ``total_active`` is the total count of ``pending`` + ``running`` rows. + + Terminal jobs (``done`` / ``error``) always return ``(0, 0)`` — there is + no meaningful "position" for a finished job. + """ + + row = await session.scalar(select(IxJob).where(IxJob.job_id == job_id)) + if row is None: + return (0, 0) + if row.status not in ("pending", "running"): + return (0, 0) + + total_active = int( + await session.scalar( + select(func.count()) + .select_from(IxJob) + .where(IxJob.status.in_(("pending", "running"))) + ) + or 0 + ) + + if row.status == "running": + # A running row is at the head of the queue for our purposes. + return (0, total_active) + + # Pending: count running rows (always ahead) + older pending rows. + # We tiebreak on ``job_id`` for deterministic ordering when multiple + # rows share a ``created_at`` (e.g. the same transaction inserts two + # jobs, which Postgres stamps with identical ``now()`` values). 
+ running_ahead = int( + await session.scalar( + select(func.count()) + .select_from(IxJob) + .where(IxJob.status == "running") + ) + or 0 + ) + pending_ahead = int( + await session.scalar( + select(func.count()) + .select_from(IxJob) + .where( + IxJob.status == "pending", + ( + (IxJob.created_at < row.created_at) + | ( + (IxJob.created_at == row.created_at) + & (IxJob.job_id < row.job_id) + ) + ), + ) + ) + or 0 + ) + return (running_ahead + pending_ahead, total_active) + + async def get_by_correlation( session: AsyncSession, client_id: str, request_id: str ) -> Job | None: diff --git a/src/ix/ui/routes.py b/src/ix/ui/routes.py index 4d7daf2..c76c71e 100644 --- a/src/ix/ui/routes.py +++ b/src/ix/ui/routes.py @@ -15,14 +15,20 @@ Design notes: pending/running, the fragment auto-refreshes every 2s via ``hx-trigger="every 2s"``; when terminal, the trigger is dropped and the pretty-printed response is rendered with highlight.js. +* A process-wide 60-second cache of the OCR GPU flag (read from the + injected :class:`Probes`) gates a "Surya is running on CPU" notice on + the fragment. The fragment is polled every 2 s; re-probing the OCR + client on every poll is waste — one probe per minute is plenty. 
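The poll-friendly TTL cache described in that design note can be sketched as a standalone helper (names are illustrative; `probe` is any zero-arg callable, matching the shape of the injected `Probes.ocr_gpu`):

```python
import time

_TTL_SECONDS = 60.0
_cache: tuple[object, float] = (None, 0.0)  # (value, monotonic-clock deadline)


def cached_probe(probe):
    """Return the cached value, re-running ``probe`` only after the TTL.

    A probe exception degrades to None (unknown) instead of raising, so a
    flaky probe can never break the 2 s HTMX poll loop calling this.
    """
    global _cache
    value, expires_at = _cache
    now = time.monotonic()
    if now >= expires_at:
        try:
            value = probe()
        except Exception:
            value = None
        _cache = (value, now + _TTL_SECONDS)
    return value
```

Using `time.monotonic()` for the deadline (as the hunk below does) keeps the cache immune to wall-clock adjustments.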
""" from __future__ import annotations import json +import time import uuid from pathlib import Path from typing import Annotated +from urllib.parse import unquote, urlsplit from uuid import UUID import aiofiles @@ -39,7 +45,7 @@ from fastapi.responses import HTMLResponse, RedirectResponse, Response from fastapi.templating import Jinja2Templates from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker -from ix.adapters.rest.routes import get_session_factory_dep +from ix.adapters.rest.routes import Probes, get_probes, get_session_factory_dep from ix.config import AppConfig, get_config from ix.contracts.request import ( Context, @@ -58,6 +64,12 @@ from ix.use_cases import REGISTRY TEMPLATES_DIR = Path(__file__).parent / "templates" STATIC_DIR = Path(__file__).parent / "static" +# Module-level cache for the OCR GPU flag. The tuple is ``(value, expires_at)`` +# where ``expires_at`` is a monotonic-clock deadline. A per-request call to +# :func:`_cached_ocr_gpu` re-probes only once the deadline has passed. +_OCR_GPU_CACHE: tuple[bool | None, float] = (None, 0.0) +_OCR_GPU_TTL_SECONDS = 60.0 + def _templates() -> Jinja2Templates: """One Jinja env per process; cheap enough to build per DI call.""" @@ -73,6 +85,52 @@ def _ui_tmp_dir(cfg: AppConfig) -> Path: return d +def _cached_ocr_gpu(probes: Probes) -> bool | None: + """Read the cached OCR GPU flag, re-probing if the TTL has elapsed. + + Used by the index + fragment routes so the HTMX poll loop doesn't hit + the OCR client's torch-probe every 2 seconds. Falls back to ``None`` + (unknown) on any probe error. + """ + + global _OCR_GPU_CACHE + value, expires_at = _OCR_GPU_CACHE + now = time.monotonic() + if now >= expires_at: + try: + value = probes.ocr_gpu() + except Exception: + value = None + _OCR_GPU_CACHE = (value, now + _OCR_GPU_TTL_SECONDS) + return value + + +def _file_display_entries( + request: RequestIX | None, +) -> list[str]: + """Human-readable filename(s) for a request's context.files. 
+ + Prefers :attr:`FileRef.display_name`. Falls back to the URL's basename + (``unquote``ed so ``%20`` → space). Plain string entries use the same + basename rule. Empty list for a request with no files. + """ + + if request is None: + return [] + out: list[str] = [] + for entry in request.context.files: + if isinstance(entry, FileRef): + if entry.display_name: + out.append(entry.display_name) + continue + url = entry.url + else: + url = entry + basename = unquote(urlsplit(url).path.rsplit("/", 1)[-1]) or url + out.append(basename) + return out + + def build_router() -> APIRouter: """Return a fresh router. Kept as a factory so :mod:`ix.app` can wire DI.""" @@ -80,7 +138,10 @@ def build_router() -> APIRouter: @router.get("", response_class=HTMLResponse) @router.get("/", response_class=HTMLResponse) - async def index(request: Request) -> Response: + async def index( + request: Request, + probes: Annotated[Probes, Depends(get_probes)], + ) -> Response: tpl = _templates() return tpl.TemplateResponse( request, @@ -90,6 +151,8 @@ def build_router() -> APIRouter: "job": None, "form_error": None, "form_values": {}, + "file_names": [], + "cpu_mode": _cached_ocr_gpu(probes) is False, }, ) @@ -100,6 +163,7 @@ def build_router() -> APIRouter: session_factory: Annotated[ async_sessionmaker[AsyncSession], Depends(get_session_factory_dep) ], + probes: Annotated[Probes, Depends(get_probes)], ) -> Response: async with session_factory() as session: job = await jobs_repo.get(session, job_id) @@ -114,6 +178,8 @@ def build_router() -> APIRouter: "job": job, "form_error": None, "form_values": {}, + "file_names": _file_display_entries(job.request), + "cpu_mode": _cached_ocr_gpu(probes) is False, }, ) @@ -124,11 +190,16 @@ def build_router() -> APIRouter: session_factory: Annotated[ async_sessionmaker[AsyncSession], Depends(get_session_factory_dep) ], + probes: Annotated[Probes, Depends(get_probes)], ) -> Response: async with session_factory() as session: job = await 
jobs_repo.get(session, job_id) - if job is None: - raise HTTPException(status_code=404, detail="job not found") + if job is None: + raise HTTPException(status_code=404, detail="job not found") + ahead, total_active = await jobs_repo.queue_position( + session, job_id + ) + response_json: str | None = None if job.response is not None: response_json = json.dumps( @@ -137,11 +208,23 @@ def build_router() -> APIRouter: sort_keys=True, default=str, ) + + elapsed_text = _format_elapsed(job) + file_names = _file_display_entries(job.request) + tpl = _templates() return tpl.TemplateResponse( request, "job_fragment.html", - {"job": job, "response_json": response_json}, + { + "job": job, + "response_json": response_json, + "ahead": ahead, + "total_active": total_active, + "elapsed_text": elapsed_text, + "file_names": file_names, + "cpu_mode": _cached_ocr_gpu(probes) is False, + }, ) @router.post("/jobs") @@ -256,6 +339,12 @@ def build_router() -> APIRouter: ctx_texts = [texts.strip()] req_id = request_id.strip() or uuid.uuid4().hex + # Preserve the client-provided filename so the UI can surface the + # original name to the user (the on-disk name is a UUID). Strip any + # path prefix a browser included. + original_name = (pdf.filename or "").rsplit("/", 1)[-1].rsplit( + "\\", 1 + )[-1] or None try: request_ix = RequestIX( use_case=use_case_name or "adhoc", @@ -263,7 +352,12 @@ def build_router() -> APIRouter: ix_client_id=(ix_client_id.strip() or "ui"), request_id=req_id, context=Context( - files=[FileRef(url=f"file://{target.resolve()}")], + files=[ + FileRef( + url=f"file://{target.resolve()}", + display_name=original_name, + ) + ], texts=ctx_texts, ), options=Options( @@ -305,3 +399,30 @@ def _flag(value: str, *, default: bool) -> bool: if value == "": return default return value.lower() in ("on", "true", "1", "yes") + + +def _format_elapsed(job) -> str | None: # type: ignore[no-untyped-def] + """Render a ``MM:SS`` elapsed string for the fragment template. 
+ + * running → time since ``started_at`` + * done/error → ``finished_at - created_at`` (total wall-clock including + queue time) + * pending / missing timestamps → ``None`` (template omits the line) + """ + + from datetime import UTC, datetime + + def _fmt(seconds: float) -> str: + s = max(0, int(seconds)) + return f"{s // 60:02d}:{s % 60:02d}" + + if job.status == "running" and job.started_at is not None: + now = datetime.now(UTC) + return _fmt((now - job.started_at).total_seconds()) + if ( + job.status in ("done", "error") + and job.finished_at is not None + and job.created_at is not None + ): + return _fmt((job.finished_at - job.created_at).total_seconds()) + return None diff --git a/src/ix/ui/templates/index.html b/src/ix/ui/templates/index.html index 0459327..f35684a 100644 --- a/src/ix/ui/templates/index.html +++ b/src/ix/ui/templates/index.html @@ -3,7 +3,9 @@ - infoxtractor UI + + InfoXtractor{% if job %} — job {{ job.job_id }}{% endif %} +
+ +

infoxtractor

Drop a PDF, pick or define a use case, run the pipeline.

@@ -35,6 +78,7 @@ {% endif %} + {% if not job %}
Submit
+ {% endif %} {% if job %}
Job {{ job.job_id }}
ix_id: {{ job.ix_id }} + {% if file_names %} +
+ File{% if file_names|length > 1 %}s{% endif %}: + {% for name in file_names %} + {{ name }}{% if not loop.last %}, {% endif %} + {% endfor %} + + {% endif %}
- Loading… + Loading…
{% endif %} diff --git a/src/ix/ui/templates/job_fragment.html b/src/ix/ui/templates/job_fragment.html index 645adb2..9d2352b 100644 --- a/src/ix/ui/templates/job_fragment.html +++ b/src/ix/ui/templates/job_fragment.html @@ -9,16 +9,69 @@ hx-swap="outerHTML" {% endif %} > -

- Status: {{ job.status }} - {% if not terminal %} - - {% endif %} -

+
+
+ Job status +
- {% if terminal and response_json %} -
{{ response_json }}
- {% elif terminal %} -

No response body.

- {% endif %} +

+ Status: + {{ job.status }} + {% if not terminal %} + + {% endif %} +

+ + {% if file_names %} +

+ File{% if file_names|length > 1 %}s{% endif %}: + {% for name in file_names %} + {{ name }}{% if not loop.last %}, {% endif %} + {% endfor %} +

+ {% endif %} + + {% if job.status == "pending" %} +

+ {% if ahead == 0 %} + About to start — the worker just freed up. + {% else %} + Queue position: {{ ahead }} ahead — {{ total_active }} job{% if total_active != 1 %}s{% endif %} total in flight (single worker). + {% endif %} +

+ + {% elif job.status == "running" %} + {% if elapsed_text %} +

Running for {{ elapsed_text }}.

+ {% endif %} + + {% elif terminal %} + {% if elapsed_text %} +

Finished in {{ elapsed_text }}.

+ {% endif %} + {% endif %} + + {% if cpu_mode and not terminal %} +
+ Surya is running on CPU (~1–2 min/page) +

+ A host NVIDIA driver upgrade would unlock GPU extraction; tracked in + docs/deployment.md. +

+
+ {% endif %} +
+ +
+
+ Result +
+ {% if terminal and response_json %} +
{{ response_json }}
+ {% elif terminal %} +

No response body.

+ {% else %} +

Waiting for the pipeline to finish…

+ {% endif %} +
diff --git a/tests/integration/test_jobs_repo.py b/tests/integration/test_jobs_repo.py index e6be127..0b9cc08 100644 --- a/tests/integration/test_jobs_repo.py +++ b/tests/integration/test_jobs_repo.py @@ -341,6 +341,117 @@ async def test_sweep_orphans_leaves_fresh_running_alone( assert after.status == "running" +async def test_queue_position_pending_only( + session_factory: async_sessionmaker[AsyncSession], +) -> None: + """Three pending rows in insertion order → positions 0, 1, 2; total 3. + + Each row is committed in its own transaction so the DB stamps a + distinct ``created_at`` per row (``now()`` is transaction-stable). + """ + + async with session_factory() as session: + a = await jobs_repo.insert_pending( + session, _make_request("c", "qp-a"), callback_url=None + ) + await session.commit() + async with session_factory() as session: + b = await jobs_repo.insert_pending( + session, _make_request("c", "qp-b"), callback_url=None + ) + await session.commit() + async with session_factory() as session: + c = await jobs_repo.insert_pending( + session, _make_request("c", "qp-c"), callback_url=None + ) + await session.commit() + + async with session_factory() as session: + pa = await jobs_repo.queue_position(session, a.job_id) + pb = await jobs_repo.queue_position(session, b.job_id) + pc = await jobs_repo.queue_position(session, c.job_id) + + # All three active; total == 3. 
+ assert pa == (0, 3) + assert pb == (1, 3) + assert pc == (2, 3) + + +async def test_queue_position_running_plus_pending( + session_factory: async_sessionmaker[AsyncSession], +) -> None: + """One running + two pending → running:(0,3), next:(1,3), last:(2,3).""" + + async with session_factory() as session: + first = await jobs_repo.insert_pending( + session, _make_request("c", "qp-r-1"), callback_url=None + ) + await session.commit() + async with session_factory() as session: + second = await jobs_repo.insert_pending( + session, _make_request("c", "qp-r-2"), callback_url=None + ) + await session.commit() + async with session_factory() as session: + third = await jobs_repo.insert_pending( + session, _make_request("c", "qp-r-3"), callback_url=None + ) + await session.commit() + + # Claim the first → it becomes running. + async with session_factory() as session: + claimed = await jobs_repo.claim_next_pending(session) + await session.commit() + assert claimed is not None + assert claimed.job_id == first.job_id + + async with session_factory() as session: + p_running = await jobs_repo.queue_position(session, first.job_id) + p_second = await jobs_repo.queue_position(session, second.job_id) + p_third = await jobs_repo.queue_position(session, third.job_id) + + # Running row reports 0 ahead (itself is the head). + assert p_running == (0, 3) + # Second pending: running is ahead (1) + zero older pendings. + assert p_second == (1, 3) + # Third pending: running ahead + one older pending. 
+ assert p_third == (2, 3) + + +async def test_queue_position_terminal_returns_zero_zero( + session_factory: async_sessionmaker[AsyncSession], +) -> None: + """Finished jobs have no queue position — always (0, 0).""" + + async with session_factory() as session: + inserted = await jobs_repo.insert_pending( + session, _make_request("c", "qp-term"), callback_url=None + ) + await session.commit() + + response = ResponseIX( + use_case="bank_statement_header", + ix_client_id="c", + request_id="qp-term", + ) + async with session_factory() as session: + await jobs_repo.mark_done(session, inserted.job_id, response) + await session.commit() + + async with session_factory() as session: + pos = await jobs_repo.queue_position(session, inserted.job_id) + + assert pos == (0, 0) + + +async def test_queue_position_unknown_id_returns_zero_zero( + session_factory: async_sessionmaker[AsyncSession], +) -> None: + async with session_factory() as session: + pos = await jobs_repo.queue_position(session, uuid4()) + assert pos == (0, 0) + + async def test_concurrent_claim_never_double_dispatches( session_factory: async_sessionmaker[AsyncSession], ) -> None: diff --git a/tests/integration/test_ui_routes.py b/tests/integration/test_ui_routes.py index 3822277..be2f42a 100644 --- a/tests/integration/test_ui_routes.py +++ b/tests/integration/test_ui_routes.py @@ -18,7 +18,7 @@ from __future__ import annotations import json from collections.abc import Iterator from pathlib import Path -from uuid import uuid4 +from uuid import UUID, uuid4 import pytest from fastapi.testclient import TestClient @@ -205,6 +205,38 @@ class TestSubmitJobCustom: ) +class TestDisplayName: + def test_post_persists_display_name_in_file_ref( + self, + app: TestClient, + postgres_url: str, + ) -> None: + """The client-provided upload filename lands in FileRef.display_name.""" + + request_id = f"ui-name-{uuid4().hex[:8]}" + with FIXTURE_PDF.open("rb") as fh: + resp = app.post( + "/ui/jobs", + data={ + "use_case_mode": 
"registered", + "use_case_name": "bank_statement_header", + "ix_client_id": "ui-test", + "request_id": request_id, + }, + files={ + "pdf": ("my statement.pdf", fh, "application/pdf") + }, + follow_redirects=False, + ) + assert resp.status_code in (200, 303), resp.text + + job_row = _find_job(postgres_url, "ui-test", request_id) + assert job_row is not None + entry = job_row.request["context"]["files"][0] + assert isinstance(entry, dict) + assert entry["display_name"] == "my statement.pdf" + + class TestFragment: def test_fragment_pending_has_trigger( self, @@ -234,6 +266,117 @@ class TestFragment: assert "hx-trigger" in body assert "2s" in body assert "pending" in body.lower() or "running" in body.lower() + # New queue-awareness copy. + assert "Queue position" in body or "About to start" in body + + def test_fragment_pending_shows_filename( + self, + app: TestClient, + postgres_url: str, + ) -> None: + request_id = f"ui-frag-pf-{uuid4().hex[:8]}" + with FIXTURE_PDF.open("rb") as fh: + app.post( + "/ui/jobs", + data={ + "use_case_mode": "registered", + "use_case_name": "bank_statement_header", + "ix_client_id": "ui-test", + "request_id": request_id, + }, + files={ + "pdf": ( + "client-side-name.pdf", + fh, + "application/pdf", + ) + }, + follow_redirects=False, + ) + job_row = _find_job(postgres_url, "ui-test", request_id) + assert job_row is not None + resp = app.get(f"/ui/jobs/{job_row.job_id}/fragment") + assert resp.status_code == 200 + assert "client-side-name.pdf" in resp.text + + def test_fragment_running_shows_elapsed( + self, + app: TestClient, + postgres_url: str, + ) -> None: + """After flipping a row to running with a backdated started_at, the + fragment renders a ``Running for MM:SS`` line.""" + + request_id = f"ui-frag-r-{uuid4().hex[:8]}" + with FIXTURE_PDF.open("rb") as fh: + app.post( + "/ui/jobs", + data={ + "use_case_mode": "registered", + "use_case_name": "bank_statement_header", + "ix_client_id": "ui-test", + "request_id": request_id, + }, + 
files={"pdf": ("sample.pdf", fh, "application/pdf")}, + follow_redirects=False, + ) + job_row = _find_job(postgres_url, "ui-test", request_id) + assert job_row is not None + + _force_running(postgres_url, job_row.job_id) + + resp = app.get(f"/ui/jobs/{job_row.job_id}/fragment") + assert resp.status_code == 200 + body = resp.text + assert "Running for" in body + # MM:SS; our backdate is ~10s so expect 00:1? or higher. + import re + + assert re.search(r"\d{2}:\d{2}", body), body + + def test_fragment_backward_compat_no_display_name( + self, + app: TestClient, + postgres_url: str, + ) -> None: + """Older rows (stored before display_name existed) must still render.""" + + from ix.contracts.request import Context, FileRef, RequestIX + + legacy_req = RequestIX( + use_case="bank_statement_header", + ix_client_id="ui-test", + request_id=f"ui-legacy-{uuid4().hex[:8]}", + context=Context( + files=[ + FileRef(url="file:///tmp/ix/ui/legacy.pdf") + ] + ), + ) + + import asyncio + + from ix.store import jobs_repo as _repo + + async def _insert() -> UUID: + eng = create_async_engine(postgres_url) + sf = async_sessionmaker(eng, expire_on_commit=False) + try: + async with sf() as session: + job = await _repo.insert_pending( + session, legacy_req, callback_url=None + ) + await session.commit() + return job.job_id + finally: + await eng.dispose() + + job_id = asyncio.run(_insert()) + resp = app.get(f"/ui/jobs/{job_id}/fragment") + assert resp.status_code == 200 + body = resp.text + # Must not crash; must include the fallback basename from the URL. 
+ assert "legacy.pdf" in body def test_fragment_done_shows_pretty_json( self, @@ -250,7 +393,13 @@ class TestFragment: "ix_client_id": "ui-test", "request_id": request_id, }, - files={"pdf": ("sample.pdf", fh, "application/pdf")}, + files={ + "pdf": ( + "my-done-doc.pdf", + fh, + "application/pdf", + ) + }, follow_redirects=False, ) job_row = _find_job(postgres_url, "ui-test", request_id) @@ -274,6 +423,8 @@ class TestFragment: # JSON present. assert "UBS AG" in body assert "CHF" in body + # Filename surfaced on the done fragment. + assert "my-done-doc.pdf" in body def _find_job(postgres_url: str, client_id: str, request_id: str): # type: ignore[no-untyped-def] @@ -348,3 +499,39 @@ def _force_done( await eng.dispose() asyncio.run(_go()) + + +def _force_running( + postgres_url: str, + job_id, # type: ignore[no-untyped-def] + seconds_ago: int = 10, +) -> None: + """Flip a pending job to ``running`` with a backdated ``started_at``. + + The fragment renders "Running for MM:SS" which needs a ``started_at`` in + the past; 10s is enough to produce a deterministic non-zero MM:SS. 
+ """ + + import asyncio + from datetime import UTC, datetime, timedelta + + from sqlalchemy import text + + async def _go(): # type: ignore[no-untyped-def] + eng = create_async_engine(postgres_url) + try: + async with eng.begin() as conn: + await conn.execute( + text( + "UPDATE ix_jobs SET status='running', started_at=:t " + "WHERE job_id=:jid" + ), + { + "t": datetime.now(UTC) - timedelta(seconds=seconds_ago), + "jid": str(job_id), + }, + ) + finally: + await eng.dispose() + + asyncio.run(_go()) diff --git a/tests/unit/test_contracts.py b/tests/unit/test_contracts.py index 4e81c53..9d7578c 100644 --- a/tests/unit/test_contracts.py +++ b/tests/unit/test_contracts.py @@ -50,6 +50,24 @@ class TestFileRef: assert fr.headers == {"Authorization": "Token abc"} assert fr.max_bytes == 1_000_000 + def test_display_name_defaults_to_none(self) -> None: + fr = FileRef(url="file:///tmp/ix/ui/abc.pdf") + assert fr.display_name is None + + def test_display_name_roundtrip(self) -> None: + fr = FileRef( + url="file:///tmp/ix/ui/abc.pdf", + display_name="my statement.pdf", + ) + assert fr.display_name == "my statement.pdf" + dumped = fr.model_dump_json() + rt = FileRef.model_validate_json(dumped) + assert rt.display_name == "my statement.pdf" + # Backward-compat: a serialised FileRef without display_name still + # validates cleanly (older stored jobs predate the field). 
+ legacy = FileRef.model_validate({"url": "file:///x.pdf"}) + assert legacy.display_name is None + class TestOptionDefaults: def test_ocr_defaults_match_spec(self) -> None: diff --git a/tests/unit/test_surya_client.py b/tests/unit/test_surya_client.py index a713813..e9ac443 100644 --- a/tests/unit/test_surya_client.py +++ b/tests/unit/test_surya_client.py @@ -161,6 +161,78 @@ def _write_tiny_png(path: Path) -> None: Image.new("RGB", (2, 2), color="white").save(path, format="PNG") +class TestGpuAvailableFlag: + def test_default_is_none(self) -> None: + client = SuryaOCRClient() + assert client.gpu_available is None + + def test_warm_up_probes_cuda_true(self) -> None: + """When torch reports CUDA, warm_up records True on the instance.""" + + client = SuryaOCRClient() + fake_foundation = MagicMock() + fake_recognition = MagicMock() + fake_detection = MagicMock() + fake_torch = SimpleNamespace( + cuda=SimpleNamespace(is_available=lambda: True) + ) + + module_patches = { + "surya.detection": SimpleNamespace( + DetectionPredictor=lambda: fake_detection + ), + "surya.foundation": SimpleNamespace( + FoundationPredictor=lambda: fake_foundation + ), + "surya.recognition": SimpleNamespace( + RecognitionPredictor=lambda _f: fake_recognition + ), + "torch": fake_torch, + } + with patch.dict("sys.modules", module_patches): + client.warm_up() + + assert client.gpu_available is True + assert client._recognition_predictor is fake_recognition + assert client._detection_predictor is fake_detection + + def test_warm_up_probes_cuda_false(self) -> None: + """CPU-mode host → warm_up records False.""" + + client = SuryaOCRClient() + fake_torch = SimpleNamespace( + cuda=SimpleNamespace(is_available=lambda: False) + ) + module_patches = { + "surya.detection": SimpleNamespace( + DetectionPredictor=lambda: MagicMock() + ), + "surya.foundation": SimpleNamespace( + FoundationPredictor=lambda: MagicMock() + ), + "surya.recognition": SimpleNamespace( + RecognitionPredictor=lambda _f: MagicMock() 
+ ), + "torch": fake_torch, + } + with patch.dict("sys.modules", module_patches): + client.warm_up() + + assert client.gpu_available is False + + def test_warm_up_is_idempotent_for_probe(self) -> None: + """Second warm_up short-circuits; probed flag is preserved.""" + + client = SuryaOCRClient() + client._recognition_predictor = MagicMock() + client._detection_predictor = MagicMock() + client.gpu_available = True + + # No module patches — warm_up must NOT touch sys.modules or torch. + client.warm_up() + assert client.gpu_available is True + + @pytest.mark.parametrize("unused", [None]) # keep pytest happy if file ever runs alone def test_module_imports(unused: None) -> None: assert SuryaOCRClient is not None