infoxtractor/src/ix/store/jobs_repo.py
feat(ui): add /ui/jobs listing page with filters + pagination
* `JobsRepo.list_recent` — paginated, filterable view over ix_jobs,
  newest first, returning (jobs, total) so the template can render
  "Showing N of M".
* `GET /ui/jobs` — filter bar (multi status + client_id), prev/next
  pagination, links to `/ui/jobs/{id}` per row. Surfaces filename from
  `FileRef.display_name` with URL-basename fallback for legacy rows.
* Persistent nav header gets a "Recent jobs" link on both `/ui` and the
  per-job page so users can tab between submit and history.
* Integration tests cover: ordering, status/client filters (single +
  multi), pagination, legacy fallback, header links.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
"""Async CRUD over ``ix_jobs`` — the one module the worker / REST touches.
Every method takes an :class:`AsyncSession` (caller-owned transaction). The
caller commits. We don't manage transactions inside repo methods because the
worker sometimes needs to claim + run-pipeline + mark-done inside one
long-running unit of work, and an inside-the-method commit would break that.
A few invariants worth stating up front:
* ``ix_id`` is a 16-char hex string assigned by :func:`insert_pending` on
first insert. Callers MUST NOT pass one (we generate it); if a
``RequestIX`` arrives with ``ix_id`` set it is ignored.
* ``(client_id, request_id)`` is unique — on collision we return the
existing row unchanged. Callback URLs on the second insert are ignored;
the first insert's metadata wins.
* Claim uses ``FOR UPDATE SKIP LOCKED`` so concurrent workers never pick the
same row, and a session holding a lock doesn't block a sibling claimer.
* Status transitions: ``pending → running → (done | error)``. The sweeper is
the only path back to ``pending`` (and only from ``running``); terminal
states are stable.
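
Example: a minimal worker sketch of the caller-owned-transaction rule, where
``sessionmaker`` and ``run_pipeline`` stand in for the app's real session
factory and pipeline entry point::

    async with sessionmaker() as session:
        job = await claim_next_pending(session)  # row locked until commit
        if job is not None:
            response = await run_pipeline(job.request)
            await mark_done(session, job.job_id, response)
        await session.commit()  # the caller, not the repo, commits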
"""
from __future__ import annotations
import secrets
from collections.abc import Iterable
from datetime import UTC, datetime
from typing import TYPE_CHECKING, Literal
from uuid import UUID, uuid4
from sqlalchemy import func, select, update
from sqlalchemy.dialects.postgresql import insert as pg_insert
from ix.contracts.job import Job
from ix.contracts.request import RequestIX
from ix.contracts.response import ResponseIX
from ix.store.models import IxJob
if TYPE_CHECKING:
from sqlalchemy.ext.asyncio import AsyncSession


def _new_ix_id() -> str:
    """Transport-assigned 16-hex handle.

    ``secrets.token_hex(8)`` draws 8 random bytes (64 bits of entropy),
    rendered as 16 hex characters; good enough to tag logs per spec §3
    without collision risk across the lifetime of the service.
    """
    return secrets.token_hex(8)


def _orm_to_job(row: IxJob) -> Job:
    """Round-trip an ORM row back through the Pydantic ``Job`` contract.

    The JSONB columns come out as plain dicts; we let Pydantic re-validate
    them into :class:`RequestIX` / :class:`ResponseIX`. Catching validation
    errors here would mask real bugs; we let them surface.
    """
    return Job(
        job_id=row.job_id,
        ix_id=row.ix_id,
        client_id=row.client_id,
        request_id=row.request_id,
        status=row.status,  # type: ignore[arg-type]
        request=RequestIX.model_validate(row.request),
        response=(
            ResponseIX.model_validate(row.response)
            if row.response is not None
            else None
        ),
        callback_url=row.callback_url,
        callback_status=row.callback_status,  # type: ignore[arg-type]
        attempts=row.attempts,
        created_at=row.created_at,
        started_at=row.started_at,
        finished_at=row.finished_at,
    )


async def insert_pending(
    session: AsyncSession,
    request: RequestIX,
    callback_url: str | None,
) -> Job:
    """Insert a pending row; return the new or existing :class:`Job`.

    Uses ``INSERT ... ON CONFLICT DO NOTHING`` on the
    ``(client_id, request_id)`` unique index, then re-selects. If the insert
    was a no-op the existing row is returned verbatim (status / callback_url
    unchanged) — callers rely on this for idempotent resubmission.
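
    Example: idempotent resubmission (``req`` is assumed to be a valid
    :class:`RequestIX`; the callback URL is illustrative)::

        first = await insert_pending(session, req, callback_url=None)
        again = await insert_pending(session, req, callback_url="http://x")
        assert again.job_id == first.job_id   # same row came back
        assert again.callback_url is None     # first insert's metadata wins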
"""
ix_id = request.ix_id or _new_ix_id()
job_id = uuid4()
# Serialise the request through Pydantic so JSONB gets plain JSON types,
# not datetime / Decimal instances asyncpg would reject.
request_json = request.model_copy(update={"ix_id": ix_id}).model_dump(
mode="json"
)
stmt = (
pg_insert(IxJob)
.values(
job_id=job_id,
ix_id=ix_id,
client_id=request.ix_client_id,
request_id=request.request_id,
status="pending",
request=request_json,
response=None,
callback_url=callback_url,
callback_status=None,
attempts=0,
)
.on_conflict_do_nothing(index_elements=["client_id", "request_id"])
)
await session.execute(stmt)
row = await session.scalar(
select(IxJob).where(
IxJob.client_id == request.ix_client_id,
IxJob.request_id == request.request_id,
)
)
assert row is not None, "insert_pending: row missing after upsert"
return _orm_to_job(row)


async def claim_next_pending(session: AsyncSession) -> Job | None:
    """Atomically pick the oldest pending row and flip it to running.

    ``FOR UPDATE SKIP LOCKED`` means a sibling worker can never deadlock on
    our row; they'll skip past it and grab the next pending entry. The
    sibling test in :mod:`tests/integration/test_jobs_repo` asserts this.
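
    Example: two sibling sessions never claim the same row (``sessionmaker``
    stands in for the app's async session factory)::

        async with sessionmaker() as s1, sessionmaker() as s2:
            a = await claim_next_pending(s1)  # locks the oldest pending row
            b = await claim_next_pending(s2)  # SKIP LOCKED → next row or None
            assert a is None or b is None or a.job_id != b.job_id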
"""
stmt = (
select(IxJob)
.where(IxJob.status == "pending")
.order_by(IxJob.created_at)
.limit(1)
.with_for_update(skip_locked=True)
)
row = await session.scalar(stmt)
if row is None:
return None
row.status = "running"
row.started_at = datetime.now(UTC)
await session.flush()
return _orm_to_job(row)


async def get(session: AsyncSession, job_id: UUID) -> Job | None:
    row = await session.scalar(select(IxJob).where(IxJob.job_id == job_id))
    return _orm_to_job(row) if row is not None else None


async def queue_position(
    session: AsyncSession, job_id: UUID
) -> tuple[int, int]:
    """Return ``(ahead, total_active)`` for a pending/running job.

    ``ahead`` counts active jobs (``pending`` or ``running``) that would be
    claimed by the worker before this one:

    * any ``running`` job is always ahead — it has the worker already.
    * other ``pending`` jobs with a strictly older ``created_at`` are ahead
      (the worker picks pending rows in ``ORDER BY created_at`` per
      :func:`claim_next_pending`).

    ``total_active`` is the total count of ``pending`` + ``running`` rows.
    Terminal jobs (``done`` / ``error``) always return ``(0, 0)`` — there is
    no meaningful "position" for a finished job.
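
    Example: rendering a 1-based position for a status page, assuming
    ``job_id`` names a pending job::

        ahead, total = await queue_position(session, job_id)
        print(f"position {ahead + 1} of {total}")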
"""
row = await session.scalar(select(IxJob).where(IxJob.job_id == job_id))
if row is None:
return (0, 0)
if row.status not in ("pending", "running"):
return (0, 0)
total_active = int(
await session.scalar(
select(func.count())
.select_from(IxJob)
.where(IxJob.status.in_(("pending", "running")))
)
or 0
)
if row.status == "running":
# A running row is at the head of the queue for our purposes.
return (0, total_active)
# Pending: count running rows (always ahead) + older pending rows.
# We tiebreak on ``job_id`` for deterministic ordering when multiple
# rows share a ``created_at`` (e.g. the same transaction inserts two
# jobs, which Postgres stamps with identical ``now()`` values).
running_ahead = int(
await session.scalar(
select(func.count())
.select_from(IxJob)
.where(IxJob.status == "running")
)
or 0
)
pending_ahead = int(
await session.scalar(
select(func.count())
.select_from(IxJob)
.where(
IxJob.status == "pending",
(
(IxJob.created_at < row.created_at)
| (
(IxJob.created_at == row.created_at)
& (IxJob.job_id < row.job_id)
)
),
)
)
or 0
)
return (running_ahead + pending_ahead, total_active)


async def get_by_correlation(
    session: AsyncSession, client_id: str, request_id: str
) -> Job | None:
    row = await session.scalar(
        select(IxJob).where(
            IxJob.client_id == client_id,
            IxJob.request_id == request_id,
        )
    )
    return _orm_to_job(row) if row is not None else None


async def mark_done(
    session: AsyncSession, job_id: UUID, response: ResponseIX
) -> None:
    """Write the pipeline's response and move to a terminal state.

    Status is ``done`` iff ``response.error is None``; any non-None error
    flips us to ``error``. Spec §3 lifecycle invariant.
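
    Example: the terminal status follows ``response.error`` (``job`` and
    ``response`` are assumed to be an in-flight :class:`Job` and the
    pipeline's :class:`ResponseIX`)::

        await mark_done(session, job.job_id, response)
        await session.commit()
        refreshed = await get(session, job.job_id)
        assert refreshed is not None
        assert refreshed.status == (
            "done" if response.error is None else "error"
        )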
"""
status = "done" if response.error is None else "error"
await session.execute(
update(IxJob)
.where(IxJob.job_id == job_id)
.values(
status=status,
response=response.model_dump(mode="json"),
finished_at=datetime.now(UTC),
)
)


async def mark_error(
    session: AsyncSession, job_id: UUID, response: ResponseIX
) -> None:
    """Convenience wrapper that always writes ``status='error'``.

    Separate from :func:`mark_done` for readability at call sites: when the
    worker knows it caught an exception the pipeline didn't handle itself,
    ``mark_error`` signals intent even if the response body happens to have
    a populated error field.
    """
    await session.execute(
        update(IxJob)
        .where(IxJob.job_id == job_id)
        .values(
            status="error",
            response=response.model_dump(mode="json"),
            finished_at=datetime.now(UTC),
        )
    )


async def update_callback_status(
    session: AsyncSession,
    job_id: UUID,
    status: Literal["delivered", "failed"],
) -> None:
    await session.execute(
        update(IxJob)
        .where(IxJob.job_id == job_id)
        .values(callback_status=status)
    )


async def sweep_orphans(
    session: AsyncSession,
    now: datetime,
    max_running_seconds: int,
) -> list[UUID]:
    """Reset stale ``running`` rows back to ``pending`` and bump ``attempts``.

    Called once at worker startup (spec §3) to rescue jobs whose owner died
    mid-pipeline. The threshold is time-based on ``started_at`` so a still-
    running worker never reclaims its own in-flight job — callers pass
    ``2 * IX_PIPELINE_REQUEST_TIMEOUT_SECONDS`` per spec.
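
    Example: the startup sweep, where ``settings.pipeline_timeout_seconds``
    and ``log`` stand in for the app's real config object and logger::

        rescued = await sweep_orphans(
            session,
            now=datetime.now(UTC),
            max_running_seconds=2 * settings.pipeline_timeout_seconds,
        )
        await session.commit()
        for job_id in rescued:
            log.warning("rescued orphaned job %s", job_id)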
"""
# Pick candidates and return their ids so the worker can log what it
# did. Two-step (SELECT then UPDATE) is clearer than RETURNING for
# callers who want the id list alongside a plain UPDATE.
candidates = (
await session.scalars(
select(IxJob.job_id).where(
IxJob.status == "running",
IxJob.started_at < now - _as_interval(max_running_seconds),
)
)
).all()
if not candidates:
return []
await session.execute(
update(IxJob)
.where(IxJob.job_id.in_(candidates))
.values(
status="pending",
started_at=None,
attempts=IxJob.attempts + 1,
)
)
return list(candidates)


_LIST_RECENT_LIMIT_CAP = 200


async def list_recent(
    session: AsyncSession,
    *,
    limit: int = 50,
    offset: int = 0,
    status: str | Iterable[str] | None = None,
    client_id: str | None = None,
) -> tuple[list[Job], int]:
    """Return a page of recent jobs, newest first, plus total matching count.

    Powers the ``/ui/jobs`` listing page. Ordering is ``created_at DESC``.
    ``total`` reflects matching rows *before* limit/offset so the template
    can render "showing N of M".

    Parameters
    ----------
    limit:
        Maximum rows to return. Capped at
        :data:`_LIST_RECENT_LIMIT_CAP` (200) to bound the JSON payload
        size — callers that pass a larger value get clamped silently.
    offset:
        Non-negative row offset. Negative values raise ``ValueError``
        because the template treats offset as a page cursor; a negative
        cursor is a bug at the call site, not something to paper over.
    status:
        If set, restrict to the given status(es). Accepts a single
        :data:`Job.status` value or any iterable (list/tuple/set). Values
        outside the lifecycle enum simply match nothing — we don't try
        to validate here; the DB CHECK constraint already bounds the set.
    client_id:
        If set, exact match on :attr:`IxJob.client_id`. No substring /
        prefix match — simple and predictable.
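
    Example: the second page of finished jobs for one client (the
    ``client_id`` value is illustrative)::

        jobs, total = await list_recent(
            session,
            limit=50,
            offset=50,
            status=("done", "error"),
            client_id="acme",
        )
        showing = f"showing {len(jobs)} of {total}"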
"""
if offset < 0:
raise ValueError(f"offset must be >= 0, got {offset}")
effective_limit = max(0, min(limit, _LIST_RECENT_LIMIT_CAP))
filters = []
if status is not None:
if isinstance(status, str):
filters.append(IxJob.status == status)
else:
status_list = list(status)
if not status_list:
# Empty iterable → no rows match. Return a sentinel
# IN-list that can never hit so we don't blow up.
filters.append(IxJob.status.in_(status_list))
else:
filters.append(IxJob.status.in_(status_list))
if client_id is not None:
filters.append(IxJob.client_id == client_id)
total_q = select(func.count()).select_from(IxJob)
list_q = select(IxJob).order_by(IxJob.created_at.desc())
for f in filters:
total_q = total_q.where(f)
list_q = list_q.where(f)
total = int(await session.scalar(total_q) or 0)
rows = (
await session.scalars(list_q.limit(effective_limit).offset(offset))
).all()
return [_orm_to_job(r) for r in rows], total


def _as_interval(seconds: int):  # type: ignore[no-untyped-def]
    """Return a SQL interval expression for ``seconds``.

    We build the interval via ``func.make_interval`` so asyncpg doesn't have
    to guess at a text-form cast — the server-side ``make_interval`` (with
    ``seconds`` in the final, ``secs``, position) is unambiguous and avoids
    locale-dependent parsing.
    """
    return func.make_interval(0, 0, 0, 0, 0, 0, seconds)