Compare commits

...

4 commits

Author SHA1 Message Date
42a0086ba1 feat(ui): /ui/jobs listing page (#47)
All checks were successful
tests / test (push) Successful in 4m29s
2026-04-18 22:00:45 +00:00
673dc60178 feat(ui): add /ui/jobs listing page with filters + pagination
All checks were successful
tests / test (push) Successful in 1m27s
tests / test (pull_request) Successful in 1m17s
* `JobsRepo.list_recent` — paginated, filterable view over ix_jobs,
  newest first, returning (jobs, total) so the template can render
  "Showing N of M".
* `GET /ui/jobs` — filter bar (multi status + client_id), prev/next
  pagination, links to `/ui/jobs/{id}` per row. Surfaces filename from
  `FileRef.display_name` with URL-basename fallback for legacy rows.
* Persistent nav header gets a "Recent jobs" link on both `/ui` and the
  per-job page so users can tab between submit and history.
* Integration tests cover: ordering, status/client filters (single +
  multi), pagination, legacy fallback, header links.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 22:28:10 +02:00
029c20c39e feat(ui): queue position, elapsed time, filename, CPU-mode notice (#46)
All checks were successful
tests / test (push) Successful in 1m44s
Co-authored-by: Dirk Riemann <ditori@gmail.com>
Co-committed-by: Dirk Riemann <ditori@gmail.com>
2026-04-18 20:06:35 +00:00
136e31c82c Merge pull request 'feat(ui): add browser UI at /ui' (#45) from feat/ui into main
All checks were successful
tests / test (push) Successful in 3m29s
2026-04-18 19:31:24 +00:00
17 changed files with 1628 additions and 29 deletions

View file

@ -8,6 +8,8 @@ Status: MVP deployed (2026-04-18) at `http://192.168.68.42:8994` — LAN only. B
Use cases: the built-in registry lives in `src/ix/use_cases/__init__.py` (`bank_statement_header` for MVP). Callers without a registered entry can ship an ad-hoc schema inline via `RequestIX.use_case_inline` (see README "Ad-hoc use cases"); the pipeline builds the Pydantic classes on the fly per request. The `/ui` page exposes this as a "custom" option so non-engineering users can experiment without a deploy.
UX notes: the `/ui` job page surfaces queue position + elapsed MM:SS on each poll, renders the client-provided filename (stored via `FileRef.display_name`, optional metadata — the pipeline ignores it for execution), and shows a CPU-mode notice when `/healthz` reports `ocr_gpu: false`. A paginated history lives at `/ui/jobs` (status + client_id filters, newest first).
## Guiding Principles
- **On-prem always.** All LLM inference, OCR, and user-data processing run on the home server (192.168.68.42). No cloud APIs — OpenAI, Anthropic, Azure, AWS Bedrock/Textract, Google Document AI, Mistral, etc. are not to be used for user data or inference. LLM backend is Ollama (:11434); OCR runs locally (pluggable `OCRClient` interface, first engine: Surya on the RTX 3090); job state lives in local Postgres on the postgis container. The spec's references to Azure / AWS / OpenAI are examples to *replace*, not inherit.

View file

@ -10,6 +10,8 @@ Given a document (PDF, image, text) and a named *use case*, ix returns a structu
A minimal browser UI lives at [`http://192.168.68.42:8994/ui`](http://192.168.68.42:8994/ui): drop a PDF, pick a registered use case or define one inline, submit, see the pretty-printed result. HTMX polls the job status every 2 s until the pipeline finishes. LAN-only, no auth.
Past submissions are browsable at [`/ui/jobs`](http://192.168.68.42:8994/ui/jobs) — a paginated list (newest first) with status + `client_id` filters. Each row links to `/ui/jobs/{job_id}` for the full request/response view.
- Full reference spec: [`docs/spec-core-pipeline.md`](docs/spec-core-pipeline.md) (aspirational; MVP is a strict subset)
- **MVP design:** [`docs/superpowers/specs/2026-04-18-ix-mvp-design.md`](docs/superpowers/specs/2026-04-18-ix-mvp-design.md)
- **Implementation plan:** [`docs/superpowers/plans/2026-04-18-ix-mvp-implementation.md`](docs/superpowers/plans/2026-04-18-ix-mvp-implementation.md)
@ -76,6 +78,25 @@ uv run pytest tests/unit -v # hermetic unit + integration sui
IX_TEST_OLLAMA=1 uv run pytest tests/live -v # needs LAN access to Ollama + GPU
```
### UI jobs list
`GET /ui/jobs` renders a paginated, newest-first table of submitted jobs. Query params:
- `status=pending|running|done|error` — repeat for multi-select.
- `client_id=<str>` — exact match (e.g. `ui`, `mammon`).
- `limit=<n>` (default 50, max 200) + `offset=<n>` for paging.
Each row shows status badge, original filename (`FileRef.display_name` or URL basename), use case, client id, submitted time + relative, and elapsed wall-clock (terminal rows only). Each row links to `/ui/jobs/{job_id}` for the full response JSON.
### UI queue + progress UX
The `/ui` job page polls `GET /ui/jobs/{id}/fragment` every 2 s and surfaces:
- **Queue position** while pending: "Queue position: N ahead — M jobs total in flight (single worker)" so it's obvious a new submission is waiting on an earlier job rather than stuck. "About to start" when the worker has just freed up.
- **Elapsed time** while running ("Running for MM:SS") and on finish ("Finished in MM:SS").
- **Original filename** — the UI stashes the client-provided upload name in `FileRef.display_name` so the browser shows `your_statement.pdf` instead of the on-disk UUID.
- **CPU-mode notice** when `/healthz` reports `ocr_gpu: false` (the Surya OCR client observed `torch.cuda.is_available() == False`): a collapsed `<details>` pointing at the deployment runbook.
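The elapsed display follows the usual zero-padded MM:SS convention. A sketch of the formatting between two timestamps (function name illustrative; clamping at zero guards against clock skew):

```python
from datetime import datetime, timedelta, timezone


def fmt_elapsed(start: datetime, end: datetime) -> str:
    """Zero-padded MM:SS between two timestamps, clamped at zero."""
    seconds = max(0, int((end - start).total_seconds()))
    return f"{seconds // 60:02d}:{seconds % 60:02d}"


t0 = datetime(2026, 4, 18, 20, 0, 0, tzinfo=timezone.utc)
print(fmt_elapsed(t0, t0 + timedelta(minutes=4, seconds=29)))  # -> 04:29
```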
## Deploying
```bash

View file

@ -85,6 +85,7 @@ class FileRef(BaseModel):
url: str # http(s):// or file://
headers: dict[str, str] = {} # e.g. {"Authorization": "Token …"}
max_bytes: Optional[int] = None # per-file override; defaults to IX_FILE_MAX_BYTES
display_name: Optional[str] = None # UI-only metadata; client-provided filename for display (pipeline ignores)
class Options(BaseModel):
ocr: OCROptions = OCROptions()
@ -225,14 +226,15 @@ Callers that prefer direct SQL (the `pg_queue_adapter` contract): insert a row w
| `POST` | `/jobs` | Body = `RequestIX` (+ optional `callback_url`). → `201 {job_id, ix_id, status: "pending"}`. Idempotent on `(ix_client_id, request_id)` — same pair returns the existing `job_id` with `200`. |
| `GET` | `/jobs/{job_id}` | → full `Job`. Source of truth regardless of submission path or callback outcome. |
| `GET` | `/jobs?client_id=…&request_id=…` | Lookup-by-correlation (caller idempotency helper). The pair is UNIQUE in the table → at most one match. Returns the job or `404`. |
| `GET` | `/healthz` | `{postgres, ollama, ocr}`. See below for semantics. Used by `infrastructure` monitoring dashboard. |
| `GET` | `/healthz` | `{postgres, ollama, ocr, ocr_gpu}`. See below for semantics. Used by `infrastructure` monitoring dashboard. `ocr_gpu` is additive metadata (not part of the gate). |
| `GET` | `/metrics` | Counters over the last 24 hours: `jobs_pending`, `jobs_running`, `jobs_done_24h`, `jobs_error_24h`, per-use-case avg seconds over the same window. Plain JSON, no Prometheus format for MVP. |
**`/healthz` semantics:**
- `postgres`: `SELECT 1` on the job store pool; `ok` iff the query returns within 2 s.
- `ollama`: `GET {IX_OLLAMA_URL}/api/tags` within 5 s; `ok` iff reachable AND the default model (`IX_DEFAULT_MODEL`) is listed in the tags response; `degraded` iff reachable but the model is missing (ops action: run `ollama pull <model>` on the host); `fail` on any other error.
- `ocr`: `SuryaOCRClient.selfcheck()` — returns `ok` iff CUDA is available and the Surya text-recognition model is loaded into GPU memory at process start. `fail` on any error.
- Overall HTTP status: `200` iff all three are `ok`; `503` otherwise. The monitoring dashboard only surfaces `200`/`non-200`.
- `ocr_gpu`: `true | false | null`. Additive metadata: reports whether the OCR client observed `torch.cuda.is_available() == True` at first warm-up. `null` means not yet probed (fresh process, fake client, etc.). The UI reads this to surface a CPU-mode slowdown notice; never part of the 200/503 gate.
- Overall HTTP status: `200` iff all three core statuses (`postgres`, `ollama`, `ocr`) are `ok`; `503` otherwise. `ocr_gpu` does not affect the gate. The monitoring dashboard only surfaces `200`/`non-200`.
**Callback delivery** (when `callback_url` is set): one POST of the full `Job` body, 10 s timeout. 2xx → `callback_status='delivered'`. Anything else → `'failed'`. No retry. Callers always have `GET /jobs/{id}` as the authoritative fallback.

View file

@ -18,7 +18,7 @@ from __future__ import annotations
import asyncio
from collections.abc import Callable
from dataclasses import dataclass
from dataclasses import dataclass, field
from datetime import UTC, datetime, timedelta
from typing import Annotated, Literal
from uuid import UUID
@ -44,10 +44,15 @@ class Probes:
keeping them sync lets tests pass plain lambdas. Real probes that need
async work run the call through ``asyncio.run_in_executor`` inside the
callable (Chunk 4).
``ocr_gpu`` is additive metadata for the UI (not a health gate): returns
``True`` iff the OCR client reports CUDA is available, ``False`` for
explicit CPU-mode, ``None`` if unknown (fake client, not yet warmed up).
"""
ollama: Callable[[], Literal["ok", "degraded", "fail"]]
ocr: Callable[[], Literal["ok", "fail"]]
ocr_gpu: Callable[[], bool | None] = field(default=lambda: None)
def get_session_factory_dep() -> async_sessionmaker[AsyncSession]:
@ -163,8 +168,16 @@ async def healthz(
except Exception:
ocr_state = "fail"
try:
ocr_gpu_state: bool | None = probes.ocr_gpu()
except Exception:
ocr_gpu_state = None
body = HealthStatus(
postgres=postgres_state, ollama=ollama_state, ocr=ocr_state
postgres=postgres_state,
ollama=ollama_state,
ocr=ocr_state,
ocr_gpu=ocr_gpu_state,
)
if postgres_state != "ok" or ollama_state != "ok" or ocr_state != "ok":
response.status_code = 503

View file

@ -28,9 +28,15 @@ class HealthStatus(BaseModel):
"""Body of GET /healthz.
Each field reports per-subsystem state. Overall HTTP status is 200 iff
every field is ``"ok"`` (spec §5). ``ollama`` can be ``"degraded"``
when the backend is reachable but the default model isn't pulled —
monitoring surfaces that as non-200.
each of the three core status keys is ``"ok"`` (spec §5). ``ollama`` can
be ``"degraded"`` when the backend is reachable but the default model
isn't pulled — monitoring surfaces that as non-200.
``ocr_gpu`` is additive metadata, not part of the health gate: it reports
whether the Surya OCR client observed ``torch.cuda.is_available() == True``
on first warm-up. ``None`` means we haven't probed yet (fresh process,
fake client, or warm_up hasn't happened). The UI reads this to surface a
CPU-mode slowdown warning to users.
"""
model_config = ConfigDict(extra="forbid")
@ -38,6 +44,7 @@ class HealthStatus(BaseModel):
postgres: Literal["ok", "fail"]
ollama: Literal["ok", "degraded", "fail"]
ocr: Literal["ok", "fail"]
ocr_gpu: bool | None = None
class MetricsResponse(BaseModel):

View file

@ -108,6 +108,20 @@ def _make_ocr_probe(ocr: OCRClient) -> Callable[[], Literal["ok", "fail"]]:
return probe
def _make_ocr_gpu_probe(ocr: OCRClient) -> Callable[[], bool | None]:
"""Adapter: read the OCR client's recorded ``gpu_available`` attribute.
The attribute is set by :meth:`SuryaOCRClient.warm_up` on first load.
Returns ``None`` when the client has no such attribute (e.g. FakeOCRClient
in test mode) or warm_up hasn't happened yet. Never raises.
"""
def probe() -> bool | None:
return getattr(ocr, "gpu_available", None)
return probe
def _run_async_sync(make_coro, *, fallback: str) -> str: # type: ignore[no-untyped-def]
"""Run ``make_coro()`` on a fresh loop in a thread; return its result.
@ -167,6 +181,7 @@ def create_app(*, spawn_worker: bool = True) -> FastAPI:
lambda: Probes(
ollama=_make_ollama_probe(genai_client, cfg),
ocr=_make_ocr_probe(ocr_client),
ocr_gpu=_make_ocr_gpu_probe(ocr_client),
),
)

View file

@ -22,6 +22,11 @@ class FileRef(BaseModel):
Used when the file URL needs authentication (e.g. Paperless ``Token``) or a
tighter size cap than :envvar:`IX_FILE_MAX_BYTES`. Plain URLs that need no
headers can stay as bare ``str`` values in :attr:`Context.files`.
``display_name`` is pure UI metadata; the pipeline never consults it for
execution. When the UI uploads a PDF under a random ``{uuid}.pdf`` name on
disk, it stashes the client-provided filename here so the browser can
surface "your_statement.pdf" instead of "8f3a...pdf" back to the user.
"""
model_config = ConfigDict(extra="forbid")
@ -29,6 +34,7 @@ class FileRef(BaseModel):
url: str
headers: dict[str, str] = Field(default_factory=dict)
max_bytes: int | None = None
display_name: str | None = None
class Context(BaseModel):

View file

@ -48,6 +48,11 @@ class SuryaOCRClient:
def __init__(self) -> None:
self._recognition_predictor: Any = None
self._detection_predictor: Any = None
# ``None`` until warm_up() has run at least once. After that it's the
# observed value of ``torch.cuda.is_available()`` at load time. We
# cache it on the instance so ``/healthz`` / the UI can surface a
# CPU-mode warning without re-probing torch each request.
self.gpu_available: bool | None = None
def warm_up(self) -> None:
"""Load the detection + recognition predictors. Idempotent.
@ -72,6 +77,18 @@ class SuryaOCRClient:
self._recognition_predictor = RecognitionPredictor(foundation)
self._detection_predictor = DetectionPredictor()
# Best-effort CUDA probe — only after predictors loaded cleanly so we
# know torch is fully importable. ``torch`` is a Surya transitive
# dependency so if we got this far it's on sys.path. We swallow any
# exception to keep warm_up() sturdy: the attribute stays None and the
# UI falls back to "unknown" gracefully.
try:
import torch # type: ignore[import-not-found]
self.gpu_available = bool(torch.cuda.is_available())
except Exception:
self.gpu_available = None
async def ocr(
self,
pages: list[Page],

View file

@ -23,6 +23,7 @@ A few invariants worth stating up front:
from __future__ import annotations
import secrets
from collections.abc import Iterable
from datetime import UTC, datetime
from typing import TYPE_CHECKING, Literal
from uuid import UUID, uuid4
@ -157,6 +158,76 @@ async def get(session: AsyncSession, job_id: UUID) -> Job | None:
return _orm_to_job(row) if row is not None else None
async def queue_position(
session: AsyncSession, job_id: UUID
) -> tuple[int, int]:
"""Return ``(ahead, total_active)`` for a pending/running job.
``ahead`` counts active jobs (``pending`` or ``running``) that would be
claimed by the worker before this one:
* any ``running`` job is always ahead; it has the worker already.
* other ``pending`` jobs with a strictly older ``created_at`` are ahead
(the worker picks pending rows in ``ORDER BY created_at`` per
:func:`claim_next_pending`).
``total_active`` is the total count of ``pending`` + ``running`` rows.
Terminal jobs (``done`` / ``error``) always return ``(0, 0)``; there is
no meaningful "position" for a finished job.
"""
row = await session.scalar(select(IxJob).where(IxJob.job_id == job_id))
if row is None:
return (0, 0)
if row.status not in ("pending", "running"):
return (0, 0)
total_active = int(
await session.scalar(
select(func.count())
.select_from(IxJob)
.where(IxJob.status.in_(("pending", "running")))
)
or 0
)
if row.status == "running":
# A running row is at the head of the queue for our purposes.
return (0, total_active)
# Pending: count running rows (always ahead) + older pending rows.
# We tiebreak on ``job_id`` for deterministic ordering when multiple
# rows share a ``created_at`` (e.g. the same transaction inserts two
# jobs, which Postgres stamps with identical ``now()`` values).
running_ahead = int(
await session.scalar(
select(func.count())
.select_from(IxJob)
.where(IxJob.status == "running")
)
or 0
)
pending_ahead = int(
await session.scalar(
select(func.count())
.select_from(IxJob)
.where(
IxJob.status == "pending",
(
(IxJob.created_at < row.created_at)
| (
(IxJob.created_at == row.created_at)
& (IxJob.job_id < row.job_id)
)
),
)
)
or 0
)
return (running_ahead + pending_ahead, total_active)
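The ordering and tiebreak rules are easier to see in a pure-Python analogue of the SQL counts above (the dataclass and names here are illustrative; the real implementation issues the count queries shown):

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from uuid import UUID


@dataclass
class Row:
    job_id: UUID
    status: str          # pending | running | done | error
    created_at: datetime


def queue_position(rows: list[Row], job_id: UUID) -> tuple[int, int]:
    """In-memory analogue: (ahead, total_active) for one job."""
    me = next((r for r in rows if r.job_id == job_id), None)
    active = [r for r in rows if r.status in ("pending", "running")]
    if me is None or me.status not in ("pending", "running"):
        return (0, 0)
    if me.status == "running":
        return (0, len(active))
    # Running rows are always ahead; pending rows are ahead when older,
    # with job_id as the deterministic tiebreak on equal created_at.
    ahead = sum(
        1
        for r in active
        if r.status == "running"
        or (r.status == "pending"
            and (r.created_at, r.job_id) < (me.created_at, me.job_id))
    )
    return (ahead, len(active))


t = datetime(2026, 4, 18, tzinfo=timezone.utc)
a = Row(UUID(int=1), "running", t)
b = Row(UUID(int=2), "pending", t + timedelta(seconds=1))
c = Row(UUID(int=3), "pending", t + timedelta(seconds=2))
print(queue_position([a, b, c], c.job_id))  # -> (2, 3)
print(queue_position([a, b, c], a.job_id))  # -> (0, 3)
```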
async def get_by_correlation(
session: AsyncSession, client_id: str, request_id: str
) -> Job | None:
@ -263,6 +334,75 @@ async def sweep_orphans(
return list(candidates)
_LIST_RECENT_LIMIT_CAP = 200
async def list_recent(
session: AsyncSession,
*,
limit: int = 50,
offset: int = 0,
status: str | Iterable[str] | None = None,
client_id: str | None = None,
) -> tuple[list[Job], int]:
"""Return a page of recent jobs, newest first, plus total matching count.
Powers the ``/ui/jobs`` listing page. Ordering is ``created_at DESC``.
``total`` reflects matching rows *before* limit/offset so the template
can render "showing N of M".
Parameters
----------
limit:
Maximum rows to return. Capped at
:data:`_LIST_RECENT_LIMIT_CAP` (200) to bound the JSON payload
size; callers that pass a larger value get clamped silently.
offset:
Non-negative row offset. Negative values raise ``ValueError``
because the template treats offset as a page cursor; a negative
cursor is a bug at the call site, not something to paper over.
status:
If set, restrict to the given status(es). Accepts a single
:data:`Job.status` value or any iterable (list/tuple/set). Values
outside the lifecycle enum simply match nothing; we don't try
to validate here; the DB CHECK constraint already bounds the set.
client_id:
If set, exact match on :attr:`IxJob.client_id`. No substring /
prefix match; exact matching keeps it simple and predictable.
"""
if offset < 0:
raise ValueError(f"offset must be >= 0, got {offset}")
effective_limit = max(0, min(limit, _LIST_RECENT_LIMIT_CAP))
filters = []
if status is not None:
if isinstance(status, str):
filters.append(IxJob.status == status)
else:
status_list = list(status)
# An empty iterable produces an IN () clause that matches no
# rows. Deliberate: filtering on "nothing" yields an empty page
# rather than raising.
filters.append(IxJob.status.in_(status_list))
if client_id is not None:
filters.append(IxJob.client_id == client_id)
total_q = select(func.count()).select_from(IxJob)
list_q = select(IxJob).order_by(IxJob.created_at.desc())
for f in filters:
total_q = total_q.where(f)
list_q = list_q.where(f)
total = int(await session.scalar(total_q) or 0)
rows = (
await session.scalars(list_q.limit(effective_limit).offset(offset))
).all()
return [_orm_to_job(r) for r in rows], total
def _as_interval(seconds: int): # type: ignore[no-untyped-def]
"""Return a SQL interval expression for ``seconds``.

View file

@ -15,14 +15,21 @@ Design notes:
pending/running, the fragment auto-refreshes every 2s via
``hx-trigger="every 2s"``; when terminal, the trigger is dropped and the
pretty-printed response is rendered with highlight.js.
* A process-wide 60-second cache of the OCR GPU flag (read from the
injected :class:`Probes`) gates a "Surya is running on CPU" notice on
the fragment. The fragment is polled every 2 s; re-probing the OCR
client on every poll is wasteful; one probe per minute is plenty.
"""
from __future__ import annotations
import json
import time
import uuid
from datetime import UTC, datetime
from pathlib import Path
from typing import Annotated
from urllib.parse import unquote, urlencode, urlsplit
from uuid import UUID
import aiofiles
@ -32,6 +39,7 @@ from fastapi import (
File,
Form,
HTTPException,
Query,
Request,
UploadFile,
)
@ -39,7 +47,7 @@ from fastapi.responses import HTMLResponse, RedirectResponse, Response
from fastapi.templating import Jinja2Templates
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from ix.adapters.rest.routes import get_session_factory_dep
from ix.adapters.rest.routes import Probes, get_probes, get_session_factory_dep
from ix.config import AppConfig, get_config
from ix.contracts.request import (
Context,
@ -58,6 +66,12 @@ from ix.use_cases import REGISTRY
TEMPLATES_DIR = Path(__file__).parent / "templates"
STATIC_DIR = Path(__file__).parent / "static"
# Module-level cache for the OCR GPU flag. The tuple is ``(value, expires_at)``
# where ``expires_at`` is a monotonic-clock deadline. A per-request call to
# :func:`_cached_ocr_gpu` re-probes only once the deadline has passed.
_OCR_GPU_CACHE: tuple[bool | None, float] = (None, 0.0)
_OCR_GPU_TTL_SECONDS = 60.0
def _templates() -> Jinja2Templates:
"""One Jinja env per process; cheap enough to build per DI call."""
@ -73,6 +87,105 @@ def _ui_tmp_dir(cfg: AppConfig) -> Path:
return d
def _cached_ocr_gpu(probes: Probes) -> bool | None:
"""Read the cached OCR GPU flag, re-probing if the TTL has elapsed.
Used by the index + fragment routes so the HTMX poll loop doesn't hit
the OCR client's torch-probe every 2 seconds. Falls back to ``None``
(unknown) on any probe error.
"""
global _OCR_GPU_CACHE
value, expires_at = _OCR_GPU_CACHE
now = time.monotonic()
if now >= expires_at:
try:
value = probes.ocr_gpu()
except Exception:
value = None
_OCR_GPU_CACHE = (value, now + _OCR_GPU_TTL_SECONDS)
return value
_VALID_STATUSES = ("pending", "running", "done", "error")
_JOBS_LIST_DEFAULT_LIMIT = 50
_JOBS_LIST_MAX_LIMIT = 200
def _use_case_label(request: RequestIX | None) -> str:
"""Prefer inline use-case label, fall back to the registered name."""
if request is None:
return ""
if request.use_case_inline is not None:
return request.use_case_inline.use_case_name or request.use_case
return request.use_case or ""
def _row_elapsed_seconds(job) -> int | None: # type: ignore[no-untyped-def]
"""Wall-clock seconds for a terminal row (finished - started).
Used in the list view's "Elapsed" column. Returns ``None`` for rows
that haven't run yet (pending / running-with-missing-started_at) so
the template can render ```` instead.
"""
if job.status in ("done", "error") and job.started_at and job.finished_at:
return max(0, int((job.finished_at - job.started_at).total_seconds()))
return None
def _humanize_delta(seconds: int) -> str:
"""Coarse-grained "N min ago" for the list view.
The list renders many rows; we don't need second-accuracy here. For
sub-minute values we still say "just now" to avoid a jumpy display.
"""
if seconds < 45:
return "just now"
mins = seconds // 60
if mins < 60:
return f"{mins} min ago"
hours = mins // 60
if hours < 24:
return f"{hours} h ago"
days = hours // 24
return f"{days} d ago"
def _fmt_elapsed_seconds(seconds: int | None) -> str:
if seconds is None:
return ""
return f"{seconds // 60:02d}:{seconds % 60:02d}"
def _file_display_entries(
request: RequestIX | None,
) -> list[str]:
"""Human-readable filename(s) for a request's context.files.
Prefers :attr:`FileRef.display_name`. Falls back to the URL's basename
(``unquote``d, so ``%20`` becomes a space). Plain string entries use the same
basename rule. Empty list for a request with no files.
"""
if request is None:
return []
out: list[str] = []
for entry in request.context.files:
if isinstance(entry, FileRef):
if entry.display_name:
out.append(entry.display_name)
continue
url = entry.url
else:
url = entry
basename = unquote(urlsplit(url).path.rsplit("/", 1)[-1]) or url
out.append(basename)
return out
def build_router() -> APIRouter:
"""Return a fresh router. Kept as a factory so :mod:`ix.app` can wire DI."""
@ -80,7 +193,10 @@ def build_router() -> APIRouter:
@router.get("", response_class=HTMLResponse)
@router.get("/", response_class=HTMLResponse)
async def index(request: Request) -> Response:
async def index(
request: Request,
probes: Annotated[Probes, Depends(get_probes)],
) -> Response:
tpl = _templates()
return tpl.TemplateResponse(
request,
@ -90,6 +206,93 @@ def build_router() -> APIRouter:
"job": None,
"form_error": None,
"form_values": {},
"file_names": [],
"cpu_mode": _cached_ocr_gpu(probes) is False,
},
)
@router.get("/jobs", response_class=HTMLResponse)
async def jobs_list(
request: Request,
session_factory: Annotated[
async_sessionmaker[AsyncSession], Depends(get_session_factory_dep)
],
status: Annotated[list[str] | None, Query()] = None,
client_id: Annotated[str | None, Query()] = None,
limit: Annotated[int, Query(ge=1, le=_JOBS_LIST_MAX_LIMIT)] = _JOBS_LIST_DEFAULT_LIMIT,
offset: Annotated[int, Query(ge=0)] = 0,
) -> Response:
# Drop unknown statuses silently — we don't want a stray query
# param to 400. The filter bar only offers valid values anyway.
status_filter: list[str] = []
if status:
status_filter = [s for s in status if s in _VALID_STATUSES]
client_filter = (client_id or "").strip() or None
async with session_factory() as session:
jobs, total = await jobs_repo.list_recent(
session,
limit=limit,
offset=offset,
status=status_filter if status_filter else None,
client_id=client_filter,
)
now = datetime.now(UTC)
rows = []
for job in jobs:
files = _file_display_entries(job.request)
display = files[0] if files else ""
created = job.created_at
created_delta = _humanize_delta(
int((now - created).total_seconds())
) if created is not None else ""
created_local = (
created.strftime("%Y-%m-%d %H:%M:%S")
if created is not None
else ""
)
rows.append(
{
"job_id": str(job.job_id),
"status": job.status,
"display_name": display,
"use_case": _use_case_label(job.request),
"client_id": job.client_id,
"created_at": created_local,
"created_delta": created_delta,
"elapsed": _fmt_elapsed_seconds(_row_elapsed_seconds(job)),
}
)
prev_offset = max(0, offset - limit) if offset > 0 else None
next_offset = offset + limit if (offset + limit) < total else None
def _link(new_offset: int) -> str:
params: list[tuple[str, str]] = []
for s in status_filter:
params.append(("status", s))
if client_filter:
params.append(("client_id", client_filter))
params.append(("limit", str(limit)))
params.append(("offset", str(new_offset)))
return f"/ui/jobs?{urlencode(params)}"
tpl = _templates()
return tpl.TemplateResponse(
request,
"jobs_list.html",
{
"rows": rows,
"total": total,
"shown": len(rows),
"limit": limit,
"offset": offset,
"status_filter": status_filter,
"client_filter": client_filter or "",
"valid_statuses": _VALID_STATUSES,
"prev_link": _link(prev_offset) if prev_offset is not None else None,
"next_link": _link(next_offset) if next_offset is not None else None,
},
)
@ -100,6 +303,7 @@ def build_router() -> APIRouter:
session_factory: Annotated[
async_sessionmaker[AsyncSession], Depends(get_session_factory_dep)
],
probes: Annotated[Probes, Depends(get_probes)],
) -> Response:
async with session_factory() as session:
job = await jobs_repo.get(session, job_id)
@ -114,6 +318,8 @@ def build_router() -> APIRouter:
"job": job,
"form_error": None,
"form_values": {},
"file_names": _file_display_entries(job.request),
"cpu_mode": _cached_ocr_gpu(probes) is False,
},
)
@ -124,11 +330,16 @@ def build_router() -> APIRouter:
session_factory: Annotated[
async_sessionmaker[AsyncSession], Depends(get_session_factory_dep)
],
probes: Annotated[Probes, Depends(get_probes)],
) -> Response:
async with session_factory() as session:
job = await jobs_repo.get(session, job_id)
if job is None:
raise HTTPException(status_code=404, detail="job not found")
ahead, total_active = await jobs_repo.queue_position(
session, job_id
)
response_json: str | None = None
if job.response is not None:
response_json = json.dumps(
@ -137,11 +348,23 @@ def build_router() -> APIRouter:
sort_keys=True,
default=str,
)
elapsed_text = _format_elapsed(job)
file_names = _file_display_entries(job.request)
tpl = _templates()
return tpl.TemplateResponse(
request,
"job_fragment.html",
{"job": job, "response_json": response_json},
{
"job": job,
"response_json": response_json,
"ahead": ahead,
"total_active": total_active,
"elapsed_text": elapsed_text,
"file_names": file_names,
"cpu_mode": _cached_ocr_gpu(probes) is False,
},
)
@router.post("/jobs")
@ -256,6 +479,12 @@ def build_router() -> APIRouter:
ctx_texts = [texts.strip()]
req_id = request_id.strip() or uuid.uuid4().hex
# Preserve the client-provided filename so the UI can surface the
# original name to the user (the on-disk name is a UUID). Strip any
# path prefix a browser included.
original_name = (pdf.filename or "").rsplit("/", 1)[-1].rsplit(
"\\", 1
)[-1] or None
try:
request_ix = RequestIX(
use_case=use_case_name or "adhoc",
@ -263,7 +492,12 @@ def build_router() -> APIRouter:
ix_client_id=(ix_client_id.strip() or "ui"),
request_id=req_id,
context=Context(
files=[FileRef(url=f"file://{target.resolve()}")],
files=[
FileRef(
url=f"file://{target.resolve()}",
display_name=original_name,
)
],
texts=ctx_texts,
),
options=Options(
@ -305,3 +539,30 @@ def _flag(value: str, *, default: bool) -> bool:
if value == "":
return default
return value.lower() in ("on", "true", "1", "yes")
def _format_elapsed(job) -> str | None: # type: ignore[no-untyped-def]
"""Render a ``MM:SS`` elapsed string for the fragment template.
* running: time since ``started_at``
* done/error: ``finished_at - created_at`` (total wall-clock including
queue time)
* pending / missing timestamps: ``None`` (template omits the line)
"""
def _fmt(seconds: float) -> str:
s = max(0, int(seconds))
return f"{s // 60:02d}:{s % 60:02d}"
if job.status == "running" and job.started_at is not None:
now = datetime.now(UTC)
return _fmt((now - job.started_at).total_seconds())
if (
job.status in ("done", "error")
and job.finished_at is not None
and job.created_at is not None
):
return _fmt((job.finished_at - job.created_at).total_seconds())
return None

View file

@ -3,7 +3,9 @@
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1" />
<title>infoxtractor UI</title>
<title>
InfoXtractor{% if job %} &mdash; job {{ job.job_id }}{% endif %}
</title>
<link
rel="stylesheet"
href="https://cdn.jsdelivr.net/npm/@picocss/pico@2/css/pico.min.css"
@ -15,15 +17,57 @@
<script src="https://unpkg.com/htmx.org@1.9.12"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/11.10.0/highlight.min.js"></script>
<style>
main { padding-top: 2rem; padding-bottom: 4rem; }
main { padding-top: 1.5rem; padding-bottom: 4rem; }
pre code.hljs { padding: 1rem; border-radius: 0.4rem; }
.form-error { color: var(--pico-del-color, #c44); font-weight: 600; }
details[open] > summary { margin-bottom: 0.5rem; }
.field-hint { font-size: 0.85rem; color: var(--pico-muted-color); }
nav.ix-header {
display: flex; gap: 1rem; align-items: baseline;
padding: 0.6rem 0; border-bottom: 1px solid var(--pico-muted-border-color, #ddd);
margin-bottom: 1rem; flex-wrap: wrap;
}
nav.ix-header .brand { font-weight: 700; margin-right: auto; }
nav.ix-header code { font-size: 0.9em; }
.status-panel, .result-panel { margin-top: 0.75rem; }
.status-panel header, .result-panel header { font-size: 0.95rem; }
.job-files code { font-size: 0.9em; }
.cpu-notice { margin-top: 0.6rem; font-size: 0.9rem; color: var(--pico-muted-color); }
.live-dot {
display: inline-block; margin-left: 0.3rem;
animation: ix-blink 1.2s ease-in-out infinite;
color: var(--pico-primary, #4f8cc9);
}
@keyframes ix-blink {
0%, 100% { opacity: 0.2; }
50% { opacity: 1; }
}
.copy-btn {
margin-left: 0.3rem; padding: 0.1rem 0.5rem;
font-size: 0.8rem; line-height: 1.2;
}
</style>
</head>
<body>
<main class="container">
<nav class="ix-header" aria-label="InfoXtractor navigation">
<span class="brand">InfoXtractor</span>
<a href="/ui">Upload a new extraction</a>
<a href="/ui/jobs">Recent jobs</a>
{% if job %}
<span>
Job:
<code id="current-job-id">{{ job.job_id }}</code>
<button
type="button"
class="secondary outline copy-btn"
onclick="navigator.clipboard && navigator.clipboard.writeText('{{ job.job_id }}')"
aria-label="Copy job id to clipboard"
>Copy</button>
</span>
{% endif %}
</nav>
<hgroup>
<h1>infoxtractor</h1>
<p>Drop a PDF, pick or define a use case, run the pipeline.</p>
@ -35,6 +79,7 @@
</article>
{% endif %}
{% if not job %}
<article>
<form
action="/ui/jobs"
@ -156,12 +201,21 @@
<button type="submit">Submit</button>
</form>
</article>
{% endif %}
{% if job %}
<article id="job-panel">
<header>
<strong>Job</strong> <code>{{ job.job_id }}</code>
<br /><small>ix_id: <code>{{ job.ix_id }}</code></small>
{% if file_names %}
<br /><small>
File{% if file_names|length > 1 %}s{% endif %}:
{% for name in file_names %}
<code>{{ name }}</code>{% if not loop.last %}, {% endif %}
{% endfor %}
</small>
{% endif %}
</header>
<div
id="job-status"
@@ -169,7 +223,7 @@
hx-trigger="load"
hx-swap="innerHTML"
>
Loading
Loading&hellip;
</div>
</article>
{% endif %}


@@ -9,16 +9,69 @@
hx-swap="outerHTML"
{% endif %}
>
<article class="status-panel">
<header>
<strong>Job status</strong>
</header>
<p>
Status: <strong>{{ job.status }}</strong>
Status:
<strong>{{ job.status }}</strong>
{% if not terminal %}
<progress></progress>
<span class="live-dot" aria-hidden="true">&#9679;</span>
{% endif %}
</p>
{% if file_names %}
<p class="job-files">
File{% if file_names|length > 1 %}s{% endif %}:
{% for name in file_names %}
<code>{{ name }}</code>{% if not loop.last %}, {% endif %}
{% endfor %}
</p>
{% endif %}
{% if job.status == "pending" %}
<p>
{% if ahead == 0 %}
About to start &mdash; the worker just freed up.
{% else %}
Queue position: {{ ahead }} ahead &mdash; {{ total_active }} job{% if total_active != 1 %}s{% endif %} total in flight (single worker).
{% endif %}
</p>
<progress></progress>
{% elif job.status == "running" %}
{% if elapsed_text %}
<p>Running for {{ elapsed_text }}.</p>
{% endif %}
<progress></progress>
{% elif terminal %}
{% if elapsed_text %}
<p>Finished in {{ elapsed_text }}.</p>
{% endif %}
{% endif %}
{% if cpu_mode and not terminal %}
<details class="cpu-notice">
<summary>Surya is running on CPU (~1&ndash;2 min/page)</summary>
<p>
A host NVIDIA driver upgrade would unlock GPU extraction; tracked in
<code>docs/deployment.md</code>.
</p>
</details>
{% endif %}
</article>
<article class="result-panel">
<header>
<strong>Result</strong>
</header>
{% if terminal and response_json %}
<pre><code class="language-json">{{ response_json }}</code></pre>
{% elif terminal %}
<p><em>No response body.</em></p>
{% else %}
<p><em>Waiting for the pipeline to finish&hellip;</em></p>
{% endif %}
</article>
</div>


@@ -0,0 +1,164 @@
<!doctype html>
<html lang="en" data-theme="light">
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1" />
<title>InfoXtractor &mdash; Recent jobs</title>
<link
rel="stylesheet"
href="https://cdn.jsdelivr.net/npm/@picocss/pico@2/css/pico.min.css"
/>
<style>
main { padding-top: 1.5rem; padding-bottom: 4rem; }
nav.ix-header {
display: flex; gap: 1rem; align-items: baseline;
padding: 0.6rem 0; border-bottom: 1px solid var(--pico-muted-border-color, #ddd);
margin-bottom: 1rem; flex-wrap: wrap;
}
nav.ix-header .brand { font-weight: 700; margin-right: auto; }
.breadcrumb {
font-size: 0.9rem; color: var(--pico-muted-color);
margin-bottom: 0.75rem;
}
.breadcrumb a { text-decoration: none; }
.filter-bar {
display: flex; flex-wrap: wrap; gap: 1rem; align-items: flex-end;
margin-bottom: 1rem;
}
.filter-bar fieldset { margin: 0; padding: 0; border: none; }
.filter-bar label.inline { display: inline-flex; gap: 0.3rem; align-items: center; margin-right: 0.8rem; font-weight: normal; }
.counter { color: var(--pico-muted-color); margin-bottom: 0.5rem; }
table.jobs-table { width: 100%; font-size: 0.92rem; }
table.jobs-table th { white-space: nowrap; }
table.jobs-table td { vertical-align: middle; }
td.col-created small { color: var(--pico-muted-color); display: block; }
.status-badge {
display: inline-block; padding: 0.1rem 0.55rem;
border-radius: 0.8rem; font-size: 0.78rem; font-weight: 600;
text-transform: uppercase; letter-spacing: 0.04em;
}
.status-done { background: #d1f4dc; color: #1a6d35; }
.status-error { background: #fadadd; color: #8a1d2b; }
.status-pending, .status-running { background: #fff1c2; color: #805600; }
.pagination {
display: flex; gap: 0.75rem; margin-top: 1rem;
align-items: center; flex-wrap: wrap;
}
.empty-note { color: var(--pico-muted-color); font-style: italic; }
td.col-filename code { font-size: 0.9em; word-break: break-all; }
</style>
</head>
<body>
<main class="container">
<nav class="ix-header" aria-label="InfoXtractor navigation">
<span class="brand">InfoXtractor</span>
<a href="/ui">Upload a new extraction</a>
<a href="/ui/jobs">Recent jobs</a>
</nav>
<p class="breadcrumb">
<a href="/ui">Home</a> &rsaquo; Jobs
</p>
<hgroup>
<h1>Recent jobs</h1>
<p>All submitted extractions, newest first.</p>
</hgroup>
<form class="filter-bar" method="get" action="/ui/jobs">
<fieldset>
<legend><small>Status</small></legend>
{% for s in valid_statuses %}
<label class="inline">
<input
type="checkbox"
name="status"
value="{{ s }}"
{% if s in status_filter %}checked{% endif %}
/>
{{ s }}
</label>
{% endfor %}
</fieldset>
<label>
Client id
<input
type="text"
name="client_id"
value="{{ client_filter }}"
placeholder="e.g. ui, mammon"
/>
</label>
<label>
Page size
<input
type="number"
name="limit"
min="1"
max="200"
value="{{ limit }}"
/>
</label>
<button type="submit">Apply</button>
</form>
<p class="counter">
Showing {{ shown }} of {{ total }} job{% if total != 1 %}s{% endif %}.
</p>
{% if rows %}
<figure>
<table class="jobs-table" role="grid">
<thead>
<tr>
<th>Status</th>
<th>Filename</th>
<th>Use case</th>
<th>Client</th>
<th>Submitted</th>
<th>Elapsed</th>
<th></th>
</tr>
</thead>
<tbody>
{% for row in rows %}
<tr>
<td>
<span class="status-badge status-{{ row.status }}">{{ row.status }}</span>
</td>
<td class="col-filename"><code>{{ row.display_name }}</code></td>
<td>{{ row.use_case }}</td>
<td>{{ row.client_id }}</td>
<td class="col-created">
{{ row.created_at }}
<small>{{ row.created_delta }}</small>
</td>
<td>{{ row.elapsed }}</td>
<td>
<a href="/ui/jobs/{{ row.job_id }}">open &rsaquo;</a>
</td>
</tr>
{% endfor %}
</tbody>
</table>
</figure>
{% else %}
<p class="empty-note">No jobs match the current filters.</p>
{% endif %}
<div class="pagination">
{% if prev_link %}
<a href="{{ prev_link }}" role="button" class="secondary outline">&laquo; Prev</a>
{% else %}
<span aria-disabled="true" class="secondary outline" role="button" style="opacity: 0.4;">&laquo; Prev</span>
{% endif %}
<span class="counter">Offset {{ offset }}</span>
{% if next_link %}
<a href="{{ next_link }}" role="button" class="secondary outline">Next &raquo;</a>
{% else %}
<span aria-disabled="true" class="secondary outline" role="button" style="opacity: 0.4;">Next &raquo;</span>
{% endif %}
</div>
</main>
</body>
</html>


@@ -341,6 +341,117 @@ async def test_sweep_orphans_leaves_fresh_running_alone(
assert after.status == "running"
async def test_queue_position_pending_only(
session_factory: async_sessionmaker[AsyncSession],
) -> None:
"""Three pending rows in insertion order → positions 0, 1, 2; total 3.
Each row is committed in its own transaction so the DB stamps a
distinct ``created_at`` per row (``now()`` is transaction-stable).
"""
async with session_factory() as session:
a = await jobs_repo.insert_pending(
session, _make_request("c", "qp-a"), callback_url=None
)
await session.commit()
async with session_factory() as session:
b = await jobs_repo.insert_pending(
session, _make_request("c", "qp-b"), callback_url=None
)
await session.commit()
async with session_factory() as session:
c = await jobs_repo.insert_pending(
session, _make_request("c", "qp-c"), callback_url=None
)
await session.commit()
async with session_factory() as session:
pa = await jobs_repo.queue_position(session, a.job_id)
pb = await jobs_repo.queue_position(session, b.job_id)
pc = await jobs_repo.queue_position(session, c.job_id)
# All three active; total == 3.
assert pa == (0, 3)
assert pb == (1, 3)
assert pc == (2, 3)
async def test_queue_position_running_plus_pending(
session_factory: async_sessionmaker[AsyncSession],
) -> None:
"""One running + two pending → running:(0,3), next:(1,3), last:(2,3)."""
async with session_factory() as session:
first = await jobs_repo.insert_pending(
session, _make_request("c", "qp-r-1"), callback_url=None
)
await session.commit()
async with session_factory() as session:
second = await jobs_repo.insert_pending(
session, _make_request("c", "qp-r-2"), callback_url=None
)
await session.commit()
async with session_factory() as session:
third = await jobs_repo.insert_pending(
session, _make_request("c", "qp-r-3"), callback_url=None
)
await session.commit()
# Claim the first → it becomes running.
async with session_factory() as session:
claimed = await jobs_repo.claim_next_pending(session)
await session.commit()
assert claimed is not None
assert claimed.job_id == first.job_id
async with session_factory() as session:
p_running = await jobs_repo.queue_position(session, first.job_id)
p_second = await jobs_repo.queue_position(session, second.job_id)
p_third = await jobs_repo.queue_position(session, third.job_id)
# Running row reports 0 ahead (itself is the head).
assert p_running == (0, 3)
# Second pending: running is ahead (1) + zero older pendings.
assert p_second == (1, 3)
# Third pending: running ahead + one older pending.
assert p_third == (2, 3)
async def test_queue_position_terminal_returns_zero_zero(
session_factory: async_sessionmaker[AsyncSession],
) -> None:
"""Finished jobs have no queue position — always (0, 0)."""
async with session_factory() as session:
inserted = await jobs_repo.insert_pending(
session, _make_request("c", "qp-term"), callback_url=None
)
await session.commit()
response = ResponseIX(
use_case="bank_statement_header",
ix_client_id="c",
request_id="qp-term",
)
async with session_factory() as session:
await jobs_repo.mark_done(session, inserted.job_id, response)
await session.commit()
async with session_factory() as session:
pos = await jobs_repo.queue_position(session, inserted.job_id)
assert pos == (0, 0)
async def test_queue_position_unknown_id_returns_zero_zero(
session_factory: async_sessionmaker[AsyncSession],
) -> None:
async with session_factory() as session:
pos = await jobs_repo.queue_position(session, uuid4())
assert pos == (0, 0)
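The four tests above pin down the whole `queue_position` contract. As a dependency-free illustration (the production version is a pair of SQL counts over `ix_jobs`; the dict shape and field names here are assumptions, not the real repo API), the semantics can be modelled like this:

```python
# Model of queue_position semantics: a job's position is the number of
# active (pending/running) rows created before it; the single worker
# drains the queue in created_at order. Terminal/unknown ids -> (0, 0).
ACTIVE = {"pending", "running"}

def queue_position(rows: list[dict], job_id: str) -> tuple[int, int]:
    """Return (ahead, total_active) for an active job, else (0, 0)."""
    by_id = {r["job_id"]: r for r in rows}
    me = by_id.get(job_id)
    if me is None or me["status"] not in ACTIVE:
        return (0, 0)
    active = [r for r in rows if r["status"] in ACTIVE]
    # A running row is always the head (0 ahead); a pending row counts
    # every active row stamped earlier, including the running one.
    ahead = sum(1 for r in active if r["created_at"] < me["created_at"])
    return (ahead, len(active))
```

This is why each test commits one row per transaction: Postgres stamps `now()` per transaction, so separate commits guarantee distinct `created_at` values to order by.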
async def test_concurrent_claim_never_double_dispatches(
session_factory: async_sessionmaker[AsyncSession],
) -> None:
@@ -365,3 +476,204 @@
non_null = [r for r in results if r is not None]
# Every inserted id appears at most once.
assert sorted(non_null) == sorted(ids)
# ---------- list_recent ---------------------------------------------------
#
# The UI's ``/ui/jobs`` page needs a paginated, filterable view of recent
# jobs. We keep the contract intentionally small: list_recent returns
# ``(jobs, total)`` — ``total`` is the count after filters but before
# limit/offset — so the template can render "Showing N of M".
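As a sketch of the contract those tests exercise — a pure-Python stand-in, not the real SQLAlchemy implementation; the `FakeJob` fields and clamping constant mirror the tests but are otherwise assumptions:

```python
# Dependency-free model of list_recent: filter, sort newest-first, count
# BEFORE pagination, then slice. status may be a string or an iterable.
from dataclasses import dataclass

@dataclass
class FakeJob:
    job_id: int
    status: str
    client_id: str
    created_at: int  # monotonic stand-in for a timestamp

def list_recent(rows, *, limit=50, offset=0, status=None, client_id=None):
    if offset < 0:
        raise ValueError("offset must be >= 0")
    limit = min(limit, 200)  # mirror the capped-limit test
    wanted = {status} if isinstance(status, str) else (set(status) if status else None)
    hits = [
        r for r in rows
        if (wanted is None or r.status in wanted)
        and (client_id is None or r.client_id == client_id)
    ]
    hits.sort(key=lambda r: r.created_at, reverse=True)  # newest first
    total = len(hits)  # after filters, before limit/offset
    return hits[offset : offset + limit], total
```

The template's "Showing N of M" is then `len(page)` of `total`.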
async def test_list_recent_empty_db(
session_factory: async_sessionmaker[AsyncSession],
) -> None:
async with session_factory() as session:
jobs, total = await jobs_repo.list_recent(session, limit=50, offset=0)
assert jobs == []
assert total == 0
async def test_list_recent_orders_newest_first(
session_factory: async_sessionmaker[AsyncSession],
) -> None:
ids: list[UUID] = []
for i in range(3):
async with session_factory() as session:
job = await jobs_repo.insert_pending(
session, _make_request("c", f"lr-{i}"), callback_url=None
)
await session.commit()
ids.append(job.job_id)
async with session_factory() as session:
jobs, total = await jobs_repo.list_recent(session, limit=50, offset=0)
assert total == 3
# Newest first → reverse of insertion order.
assert [j.job_id for j in jobs] == list(reversed(ids))
async def test_list_recent_status_single_filter(
session_factory: async_sessionmaker[AsyncSession],
) -> None:
# Two pending, one done.
async with session_factory() as session:
for i in range(3):
await jobs_repo.insert_pending(
session, _make_request("c", f"sf-{i}"), callback_url=None
)
await session.commit()
async with session_factory() as session:
claimed = await jobs_repo.claim_next_pending(session)
assert claimed is not None
await jobs_repo.mark_done(
session,
claimed.job_id,
ResponseIX(
use_case="bank_statement_header",
ix_client_id="c",
request_id=claimed.request_id,
),
)
await session.commit()
async with session_factory() as session:
done_jobs, done_total = await jobs_repo.list_recent(
session, limit=50, offset=0, status="done"
)
assert done_total == 1
assert len(done_jobs) == 1
assert done_jobs[0].status == "done"
async with session_factory() as session:
pending_jobs, pending_total = await jobs_repo.list_recent(
session, limit=50, offset=0, status="pending"
)
assert pending_total == 2
assert all(j.status == "pending" for j in pending_jobs)
async def test_list_recent_status_iterable_filter(
session_factory: async_sessionmaker[AsyncSession],
) -> None:
# Two pending, one done, one errored.
async with session_factory() as session:
for i in range(4):
await jobs_repo.insert_pending(
session, _make_request("c", f"if-{i}"), callback_url=None
)
await session.commit()
async with session_factory() as session:
a = await jobs_repo.claim_next_pending(session)
assert a is not None
await jobs_repo.mark_done(
session,
a.job_id,
ResponseIX(
use_case="bank_statement_header",
ix_client_id="c",
request_id=a.request_id,
),
)
await session.commit()
async with session_factory() as session:
b = await jobs_repo.claim_next_pending(session)
assert b is not None
await jobs_repo.mark_error(session, b.job_id, ResponseIX(error="boom"))
await session.commit()
async with session_factory() as session:
jobs, total = await jobs_repo.list_recent(
session, limit=50, offset=0, status=["done", "error"]
)
assert total == 2
assert {j.status for j in jobs} == {"done", "error"}
async def test_list_recent_client_id_filter(
session_factory: async_sessionmaker[AsyncSession],
) -> None:
async with session_factory() as session:
await jobs_repo.insert_pending(
session, _make_request("alpha", "a-1"), callback_url=None
)
await jobs_repo.insert_pending(
session, _make_request("beta", "b-1"), callback_url=None
)
await jobs_repo.insert_pending(
session, _make_request("alpha", "a-2"), callback_url=None
)
await session.commit()
async with session_factory() as session:
jobs, total = await jobs_repo.list_recent(
session, limit=50, offset=0, client_id="alpha"
)
assert total == 2
assert all(j.client_id == "alpha" for j in jobs)
async def test_list_recent_pagination(
session_factory: async_sessionmaker[AsyncSession],
) -> None:
ids: list[UUID] = []
for i in range(7):
async with session_factory() as session:
job = await jobs_repo.insert_pending(
session, _make_request("c", f"pg-{i}"), callback_url=None
)
await session.commit()
ids.append(job.job_id)
async with session_factory() as session:
page1, total1 = await jobs_repo.list_recent(
session, limit=3, offset=0
)
assert total1 == 7
assert len(page1) == 3
# Newest three are the last three inserted.
assert [j.job_id for j in page1] == list(reversed(ids[-3:]))
async with session_factory() as session:
page2, total2 = await jobs_repo.list_recent(
session, limit=3, offset=3
)
assert total2 == 7
assert len(page2) == 3
expected = list(reversed(ids))[3:6]
assert [j.job_id for j in page2] == expected
async with session_factory() as session:
page3, total3 = await jobs_repo.list_recent(
session, limit=3, offset=6
)
assert total3 == 7
assert len(page3) == 1
assert page3[0].job_id == ids[0]
async def test_list_recent_caps_limit(
session_factory: async_sessionmaker[AsyncSession],
) -> None:
"""limit is capped at 200 — asking for 9999 gets clamped."""
async with session_factory() as session:
jobs, total = await jobs_repo.list_recent(
session, limit=9999, offset=0
)
assert total == 0
assert jobs == []
async def test_list_recent_rejects_negative_offset(
session_factory: async_sessionmaker[AsyncSession],
) -> None:
async with session_factory() as session:
import pytest as _pytest
with _pytest.raises(ValueError):
await jobs_repo.list_recent(session, limit=50, offset=-1)


@@ -18,7 +18,7 @@ from __future__ import annotations
import json
from collections.abc import Iterator
from pathlib import Path
from uuid import uuid4
from uuid import UUID, uuid4
import pytest
from fastapi.testclient import TestClient
@@ -205,6 +205,38 @@ class TestSubmitJobCustom:
)
class TestDisplayName:
def test_post_persists_display_name_in_file_ref(
self,
app: TestClient,
postgres_url: str,
) -> None:
"""The client-provided upload filename lands in FileRef.display_name."""
request_id = f"ui-name-{uuid4().hex[:8]}"
with FIXTURE_PDF.open("rb") as fh:
resp = app.post(
"/ui/jobs",
data={
"use_case_mode": "registered",
"use_case_name": "bank_statement_header",
"ix_client_id": "ui-test",
"request_id": request_id,
},
files={
"pdf": ("my statement.pdf", fh, "application/pdf")
},
follow_redirects=False,
)
assert resp.status_code in (200, 303), resp.text
job_row = _find_job(postgres_url, "ui-test", request_id)
assert job_row is not None
entry = job_row.request["context"]["files"][0]
assert isinstance(entry, dict)
assert entry["display_name"] == "my statement.pdf"
class TestFragment:
def test_fragment_pending_has_trigger(
self,
@@ -234,6 +266,117 @@ class TestFragment:
assert "hx-trigger" in body
assert "2s" in body
assert "pending" in body.lower() or "running" in body.lower()
# New queue-awareness copy.
assert "Queue position" in body or "About to start" in body
def test_fragment_pending_shows_filename(
self,
app: TestClient,
postgres_url: str,
) -> None:
request_id = f"ui-frag-pf-{uuid4().hex[:8]}"
with FIXTURE_PDF.open("rb") as fh:
app.post(
"/ui/jobs",
data={
"use_case_mode": "registered",
"use_case_name": "bank_statement_header",
"ix_client_id": "ui-test",
"request_id": request_id,
},
files={
"pdf": (
"client-side-name.pdf",
fh,
"application/pdf",
)
},
follow_redirects=False,
)
job_row = _find_job(postgres_url, "ui-test", request_id)
assert job_row is not None
resp = app.get(f"/ui/jobs/{job_row.job_id}/fragment")
assert resp.status_code == 200
assert "client-side-name.pdf" in resp.text
def test_fragment_running_shows_elapsed(
self,
app: TestClient,
postgres_url: str,
) -> None:
"""After flipping a row to running with a backdated started_at, the
fragment renders a ``Running for MM:SS`` line."""
request_id = f"ui-frag-r-{uuid4().hex[:8]}"
with FIXTURE_PDF.open("rb") as fh:
app.post(
"/ui/jobs",
data={
"use_case_mode": "registered",
"use_case_name": "bank_statement_header",
"ix_client_id": "ui-test",
"request_id": request_id,
},
files={"pdf": ("sample.pdf", fh, "application/pdf")},
follow_redirects=False,
)
job_row = _find_job(postgres_url, "ui-test", request_id)
assert job_row is not None
_force_running(postgres_url, job_row.job_id)
resp = app.get(f"/ui/jobs/{job_row.job_id}/fragment")
assert resp.status_code == 200
body = resp.text
assert "Running for" in body
# MM:SS; our backdate is ~10s so expect 00:1? or higher.
import re
assert re.search(r"\d{2}:\d{2}", body), body
def test_fragment_backward_compat_no_display_name(
self,
app: TestClient,
postgres_url: str,
) -> None:
"""Older rows (stored before display_name existed) must still render."""
from ix.contracts.request import Context, FileRef, RequestIX
legacy_req = RequestIX(
use_case="bank_statement_header",
ix_client_id="ui-test",
request_id=f"ui-legacy-{uuid4().hex[:8]}",
context=Context(
files=[
FileRef(url="file:///tmp/ix/ui/legacy.pdf")
]
),
)
import asyncio
from ix.store import jobs_repo as _repo
async def _insert() -> UUID:
eng = create_async_engine(postgres_url)
sf = async_sessionmaker(eng, expire_on_commit=False)
try:
async with sf() as session:
job = await _repo.insert_pending(
session, legacy_req, callback_url=None
)
await session.commit()
return job.job_id
finally:
await eng.dispose()
job_id = asyncio.run(_insert())
resp = app.get(f"/ui/jobs/{job_id}/fragment")
assert resp.status_code == 200
body = resp.text
# Must not crash; must include the fallback basename from the URL.
assert "legacy.pdf" in body
def test_fragment_done_shows_pretty_json(
self,
@@ -250,7 +393,13 @@ class TestFragment:
"ix_client_id": "ui-test",
"request_id": request_id,
},
files={"pdf": ("sample.pdf", fh, "application/pdf")},
files={
"pdf": (
"my-done-doc.pdf",
fh,
"application/pdf",
)
},
follow_redirects=False,
)
job_row = _find_job(postgres_url, "ui-test", request_id)
@@ -274,6 +423,263 @@
# JSON present.
assert "UBS AG" in body
assert "CHF" in body
# Filename surfaced on the done fragment.
assert "my-done-doc.pdf" in body
class TestJobsListPage:
"""Tests for the ``GET /ui/jobs`` listing page (feat/ui-jobs-list)."""
def _submit(
self,
app: TestClient,
client_id: str,
request_id: str,
filename: str = "sample.pdf",
) -> None:
with FIXTURE_PDF.open("rb") as fh:
app.post(
"/ui/jobs",
data={
"use_case_mode": "registered",
"use_case_name": "bank_statement_header",
"ix_client_id": client_id,
"request_id": request_id,
},
files={"pdf": (filename, fh, "application/pdf")},
follow_redirects=False,
)
def test_jobs_list_returns_html(
self,
app: TestClient,
postgres_url: str,
) -> None:
for i in range(3):
self._submit(
app,
"ui-list",
f"lp-{uuid4().hex[:6]}-{i}",
filename=f"doc-{i}.pdf",
)
resp = app.get("/ui/jobs")
assert resp.status_code == 200
assert "text/html" in resp.headers["content-type"]
body = resp.text
# Breadcrumb / header shows "Jobs".
assert "Jobs" in body
# display_name surfaces for each row.
for i in range(3):
assert f"doc-{i}.pdf" in body
# Showing N of M counter present.
assert "Showing" in body
assert "of" in body
def test_jobs_list_links_to_job_detail(
self,
app: TestClient,
postgres_url: str,
) -> None:
rid = f"lp-link-{uuid4().hex[:6]}"
self._submit(app, "ui-list", rid)
row = _find_job(postgres_url, "ui-list", rid)
assert row is not None
resp = app.get("/ui/jobs")
assert resp.status_code == 200
assert f"/ui/jobs/{row.job_id}" in resp.text
def test_jobs_list_status_filter_single(
self,
app: TestClient,
postgres_url: str,
) -> None:
# Create two jobs, flip one to done.
rid_pending = f"lp-p-{uuid4().hex[:6]}"
rid_done = f"lp-d-{uuid4().hex[:6]}"
self._submit(app, "ui-filt", rid_pending, filename="pending-doc.pdf")
self._submit(app, "ui-filt", rid_done, filename="done-doc.pdf")
done_row = _find_job(postgres_url, "ui-filt", rid_done)
assert done_row is not None
_force_done(
postgres_url,
done_row.job_id,
response_body={"use_case": "bank_statement_header"},
)
# ?status=done → only done row shown.
resp = app.get("/ui/jobs?status=done")
assert resp.status_code == 200
assert "done-doc.pdf" in resp.text
assert "pending-doc.pdf" not in resp.text
def test_jobs_list_status_filter_multi(
self,
app: TestClient,
postgres_url: str,
) -> None:
rid_p = f"lp-mp-{uuid4().hex[:6]}"
rid_d = f"lp-md-{uuid4().hex[:6]}"
rid_e = f"lp-me-{uuid4().hex[:6]}"
self._submit(app, "ui-multi", rid_p, filename="pending-m.pdf")
self._submit(app, "ui-multi", rid_d, filename="done-m.pdf")
self._submit(app, "ui-multi", rid_e, filename="error-m.pdf")
done_row = _find_job(postgres_url, "ui-multi", rid_d)
err_row = _find_job(postgres_url, "ui-multi", rid_e)
assert done_row is not None and err_row is not None
_force_done(
postgres_url,
done_row.job_id,
response_body={"use_case": "bank_statement_header"},
)
_force_error(postgres_url, err_row.job_id)
resp = app.get("/ui/jobs?status=done&status=error")
assert resp.status_code == 200
body = resp.text
assert "done-m.pdf" in body
assert "error-m.pdf" in body
assert "pending-m.pdf" not in body
def test_jobs_list_client_id_filter(
self,
app: TestClient,
postgres_url: str,
) -> None:
rid_a = f"lp-a-{uuid4().hex[:6]}"
rid_b = f"lp-b-{uuid4().hex[:6]}"
self._submit(app, "client-alpha", rid_a, filename="alpha.pdf")
self._submit(app, "client-beta", rid_b, filename="beta.pdf")
resp = app.get("/ui/jobs?client_id=client-alpha")
assert resp.status_code == 200
body = resp.text
assert "alpha.pdf" in body
assert "beta.pdf" not in body
def test_jobs_list_pagination(
self,
app: TestClient,
postgres_url: str,
) -> None:
rids = []
for i in range(7):
rid = f"lp-pg-{uuid4().hex[:6]}-{i}"
rids.append(rid)
self._submit(app, "ui-pg", rid, filename=f"pg-{i}.pdf")
resp_p1 = app.get("/ui/jobs?limit=5&offset=0&client_id=ui-pg")
assert resp_p1.status_code == 200
body_p1 = resp_p1.text
# Newest-first: last 5 uploaded are pg-6..pg-2.
for i in (2, 3, 4, 5, 6):
assert f"pg-{i}.pdf" in body_p1
assert "pg-1.pdf" not in body_p1
assert "pg-0.pdf" not in body_p1
resp_p2 = app.get("/ui/jobs?limit=5&offset=5&client_id=ui-pg")
assert resp_p2.status_code == 200
body_p2 = resp_p2.text
assert "pg-1.pdf" in body_p2
assert "pg-0.pdf" in body_p2
# Showing 2 of 7 on page 2.
assert "of 7" in body_p2
def test_jobs_list_missing_display_name_falls_back_to_basename(
self,
app: TestClient,
postgres_url: str,
) -> None:
"""Legacy rows without display_name must still render via basename."""
from ix.contracts.request import Context, FileRef, RequestIX
legacy_req = RequestIX(
use_case="bank_statement_header",
ix_client_id="ui-legacy",
request_id=f"lp-legacy-{uuid4().hex[:6]}",
context=Context(
files=[FileRef(url="file:///tmp/ix/ui/listing-legacy.pdf")]
),
)
import asyncio
from ix.store import jobs_repo as _repo
async def _insert() -> UUID:
eng = create_async_engine(postgres_url)
sf = async_sessionmaker(eng, expire_on_commit=False)
try:
async with sf() as session:
job = await _repo.insert_pending(
session, legacy_req, callback_url=None
)
await session.commit()
return job.job_id
finally:
await eng.dispose()
asyncio.run(_insert())
resp = app.get("/ui/jobs?client_id=ui-legacy")
assert resp.status_code == 200
assert "listing-legacy.pdf" in resp.text
def test_jobs_list_header_link_from_index(
self,
app: TestClient,
) -> None:
resp = app.get("/ui")
assert resp.status_code == 200
assert 'href="/ui/jobs"' in resp.text
def test_jobs_list_header_link_from_detail(
self,
app: TestClient,
postgres_url: str,
) -> None:
rid = f"lp-hd-{uuid4().hex[:6]}"
self._submit(app, "ui-hd", rid)
row = _find_job(postgres_url, "ui-hd", rid)
assert row is not None
resp = app.get(f"/ui/jobs/{row.job_id}")
assert resp.status_code == 200
assert 'href="/ui/jobs"' in resp.text
def _force_error(
postgres_url: str,
job_id, # type: ignore[no-untyped-def]
) -> None:
"""Flip a pending/running job to ``error`` with a canned error body."""
import asyncio
from datetime import UTC, datetime
from sqlalchemy import text
async def _go(): # type: ignore[no-untyped-def]
eng = create_async_engine(postgres_url)
try:
async with eng.begin() as conn:
await conn.execute(
text(
"UPDATE ix_jobs SET status='error', "
"response=CAST(:resp AS JSONB), finished_at=:now "
"WHERE job_id=:jid"
),
{
"resp": json.dumps({"error": "IX_002_000: forced"}),
"now": datetime.now(UTC),
"jid": str(job_id),
},
)
finally:
await eng.dispose()
asyncio.run(_go())
def _find_job(postgres_url: str, client_id: str, request_id: str): # type: ignore[no-untyped-def]
@@ -348,3 +754,39 @@ def _force_done(
await eng.dispose()
asyncio.run(_go())
def _force_running(
postgres_url: str,
job_id, # type: ignore[no-untyped-def]
seconds_ago: int = 10,
) -> None:
"""Flip a pending job to ``running`` with a backdated ``started_at``.
The fragment renders "Running for MM:SS" which needs a ``started_at`` in
the past; 10s is enough to produce a deterministic non-zero MM:SS.
"""
import asyncio
from datetime import UTC, datetime, timedelta
from sqlalchemy import text
async def _go(): # type: ignore[no-untyped-def]
eng = create_async_engine(postgres_url)
try:
async with eng.begin() as conn:
await conn.execute(
text(
"UPDATE ix_jobs SET status='running', started_at=:t "
"WHERE job_id=:jid"
),
{
"t": datetime.now(UTC) - timedelta(seconds=seconds_ago),
"jid": str(job_id),
},
)
finally:
await eng.dispose()
asyncio.run(_go())


@@ -50,6 +50,24 @@ class TestFileRef:
assert fr.headers == {"Authorization": "Token abc"}
assert fr.max_bytes == 1_000_000
def test_display_name_defaults_to_none(self) -> None:
fr = FileRef(url="file:///tmp/ix/ui/abc.pdf")
assert fr.display_name is None
def test_display_name_roundtrip(self) -> None:
fr = FileRef(
url="file:///tmp/ix/ui/abc.pdf",
display_name="my statement.pdf",
)
assert fr.display_name == "my statement.pdf"
dumped = fr.model_dump_json()
rt = FileRef.model_validate_json(dumped)
assert rt.display_name == "my statement.pdf"
# Backward-compat: a serialised FileRef without display_name still
# validates cleanly (older stored jobs predate the field).
legacy = FileRef.model_validate({"url": "file:///x.pdf"})
assert legacy.display_name is None
class TestOptionDefaults:
def test_ocr_defaults_match_spec(self) -> None:


@@ -161,6 +161,78 @@ def _write_tiny_png(path: Path) -> None:
Image.new("RGB", (2, 2), color="white").save(path, format="PNG")
class TestGpuAvailableFlag:
def test_default_is_none(self) -> None:
client = SuryaOCRClient()
assert client.gpu_available is None
def test_warm_up_probes_cuda_true(self) -> None:
"""When torch reports CUDA, warm_up records True on the instance."""
client = SuryaOCRClient()
fake_foundation = MagicMock()
fake_recognition = MagicMock()
fake_detection = MagicMock()
fake_torch = SimpleNamespace(
cuda=SimpleNamespace(is_available=lambda: True)
)
module_patches = {
"surya.detection": SimpleNamespace(
DetectionPredictor=lambda: fake_detection
),
"surya.foundation": SimpleNamespace(
FoundationPredictor=lambda: fake_foundation
),
"surya.recognition": SimpleNamespace(
RecognitionPredictor=lambda _f: fake_recognition
),
"torch": fake_torch,
}
with patch.dict("sys.modules", module_patches):
client.warm_up()
assert client.gpu_available is True
assert client._recognition_predictor is fake_recognition
assert client._detection_predictor is fake_detection
def test_warm_up_probes_cuda_false(self) -> None:
"""CPU-mode host → warm_up records False."""
client = SuryaOCRClient()
fake_torch = SimpleNamespace(
cuda=SimpleNamespace(is_available=lambda: False)
)
module_patches = {
"surya.detection": SimpleNamespace(
DetectionPredictor=lambda: MagicMock()
),
"surya.foundation": SimpleNamespace(
FoundationPredictor=lambda: MagicMock()
),
"surya.recognition": SimpleNamespace(
RecognitionPredictor=lambda _f: MagicMock()
),
"torch": fake_torch,
}
with patch.dict("sys.modules", module_patches):
client.warm_up()
assert client.gpu_available is False
def test_warm_up_is_idempotent_for_probe(self) -> None:
"""Second warm_up short-circuits; probed flag is preserved."""
client = SuryaOCRClient()
client._recognition_predictor = MagicMock()
client._detection_predictor = MagicMock()
client.gpu_available = True
# No module patches — warm_up must NOT touch sys.modules or torch.
client.warm_up()
assert client.gpu_available is True
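The three `warm_up` tests pin a probe-once pattern: lazy load, record a tri-state `gpu_available`, and never re-probe once predictors exist. Stripped of the Surya and torch specifics (the injected `load`/`cuda_check` callables are a test seam of this sketch, not the real client's signature):

```python
# Probe-once sketch: gpu_available is None until the first warm_up,
# then frozen to whatever that probe observed, even across later calls.
class LazyProbe:
    def __init__(self) -> None:
        self.gpu_available: bool | None = None  # None = not yet probed
        self._predictor: object | None = None

    def warm_up(self, load=lambda: object(), cuda_check=lambda: False) -> None:
        if self._predictor is not None:
            return  # idempotent: keep the first probe's verdict
        self._predictor = load()          # expensive model load happens once
        self.gpu_available = bool(cuda_check())
```

Keeping the flag on the instance (rather than probing per request) is what lets the fragment render the CPU-mode notice without importing torch on every poll.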
@pytest.mark.parametrize("unused", [None]) # keep pytest happy if file ever runs alone
def test_module_imports(unused: None) -> None:
assert SuryaOCRClient is not None