infoxtractor/src/ix/store/jobs_repo.py
Dirk Riemann 141153ffa7
All checks were successful
tests / test (push) Successful in 1m10s
tests / test (pull_request) Successful in 1m10s
feat(store): JobsRepo CRUD over ix_jobs + integration fixtures (spec §4)
JobsRepo covers the full job-lifecycle surface:

- insert_pending: idempotent on (client_id, request_id) via ON CONFLICT
  DO NOTHING + re-select; assigns a 16-hex ix_id.
- claim_next_pending: FOR UPDATE SKIP LOCKED so concurrent workers never
  double-dispatch a row.
- get / get_by_correlation: hydrates JSONB back through Pydantic.
- mark_done: done iff response.error is None, else error.
- mark_error: explicit convenience wrapper.
- update_callback_status: delivered | failed (no status transition).
- sweep_orphans: time-based rescue of stuck running rows; attempts++.

Integration fixtures (tests/integration/conftest.py):
- Skip cleanly when neither IX_TEST_DATABASE_URL nor IX_POSTGRES_URL is
  set (unit suite stays runnable on a bare laptop).
- Alembic upgrade/downgrade runs in a subprocess so its internal
  asyncio.run() doesn't collide with pytest-asyncio's loop.
- Per-test engine + truncate so loops never cross and tests start clean.

15 integration tests against a live postgres:16, including SKIP LOCKED
concurrency + orphan sweep.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 11:43:11 +02:00

274 lines
8.7 KiB
Python

"""Async CRUD over ``ix_jobs`` — the one module the worker / REST touches.
Every method takes an :class:`AsyncSession` (caller-owned transaction). The
caller commits. We don't manage transactions inside repo methods because the
worker sometimes needs to claim + run-pipeline + mark-done inside one
long-running unit of work, and an inside-the-method commit would break that.
A few invariants worth stating up front:
* ``ix_id`` is a 16-char hex string assigned by :func:`insert_pending` on
first insert. Callers MUST NOT pass one (we generate it); if a
``RequestIX`` arrives with ``ix_id`` set it is ignored.
* ``(client_id, request_id)`` is unique — on collision we return the
existing row unchanged. Callback URLs on the second insert are ignored;
the first insert's metadata wins.
* Claim uses ``FOR UPDATE SKIP LOCKED`` so concurrent workers never pick the
same row, and a session holding a lock doesn't block a sibling claimer.
* Status transitions: ``pending → running → (done | error)``. The sweeper is
the only path back to ``pending`` (and only from ``running``); terminal
states are stable.
"""
from __future__ import annotations
import secrets
from datetime import UTC, datetime
from typing import TYPE_CHECKING, Literal
from uuid import UUID, uuid4
from sqlalchemy import func, select, update
from sqlalchemy.dialects.postgresql import insert as pg_insert
from ix.contracts.job import Job
from ix.contracts.request import RequestIX
from ix.contracts.response import ResponseIX
from ix.store.models import IxJob
if TYPE_CHECKING:
from sqlalchemy.ext.asyncio import AsyncSession
def _new_ix_id() -> str:
"""Transport-assigned 16-hex handle.
``secrets.token_hex(8)`` gives 16 characters of entropy; good enough to
tag logs per spec §3 without collision risk across the lifetime of the
service.
"""
return secrets.token_hex(8)
def _orm_to_job(row: IxJob) -> Job:
    """Convert an ``IxJob`` ORM row into the Pydantic ``Job`` contract.

    The JSONB ``request`` / ``response`` columns come back as plain dicts, so
    they are re-validated into :class:`RequestIX` / :class:`ResponseIX`. A
    ``None`` response (job not finished yet) is passed through as ``None``.

    Validation errors are intentionally *not* caught here: swallowing them
    would hide real bugs, so they propagate to the caller.
    """
    request = RequestIX.model_validate(row.request)

    # Only hydrate the response when the row actually carries one.
    response: ResponseIX | None
    if row.response is None:
        response = None
    else:
        response = ResponseIX.model_validate(row.response)

    return Job(
        job_id=row.job_id,
        ix_id=row.ix_id,
        client_id=row.client_id,
        request_id=row.request_id,
        status=row.status,  # type: ignore[arg-type]
        request=request,
        response=response,
        callback_url=row.callback_url,
        callback_status=row.callback_status,  # type: ignore[arg-type]
        attempts=row.attempts,
        created_at=row.created_at,
        started_at=row.started_at,
        finished_at=row.finished_at,
    )
async def insert_pending(
    session: AsyncSession,
    request: RequestIX,
    callback_url: str | None,
) -> Job:
    """Insert a pending row; return the new or existing :class:`Job`.

    Uses ``INSERT ... ON CONFLICT DO NOTHING`` on the
    ``(client_id, request_id)`` unique index, then re-selects. If the insert
    was a no-op the existing row is returned verbatim (status / callback_url
    unchanged) — callers rely on this for idempotent resubmission.

    Args:
        session: Caller-owned session; this function never commits.
        request: The request to persist. Any ``ix_id`` already set on it is
            ignored — a fresh one is always generated here, as the module
            invariants require.
        callback_url: Optional callback target stored with a *new* row; it is
            ignored when the row already exists.

    Returns:
        The freshly inserted job, or the pre-existing job for the same
        ``(client_id, request_id)`` pair.

    Raises:
        AssertionError: If no row is found after the upsert, which would mean
            the insert/select invariant was broken.
    """
    # ix_id is transport-assigned: never trust a caller/client-supplied value,
    # which could be malformed or collide with an existing handle.
    ix_id = _new_ix_id()

    # Serialise the request through Pydantic so JSONB gets plain JSON types,
    # not datetime / Decimal instances asyncpg would reject. The generated
    # ix_id is stamped into the stored copy so the column and payload agree.
    request_json = request.model_copy(update={"ix_id": ix_id}).model_dump(
        mode="json"
    )

    stmt = (
        pg_insert(IxJob)
        .values(
            job_id=uuid4(),
            ix_id=ix_id,
            client_id=request.ix_client_id,
            request_id=request.request_id,
            status="pending",
            request=request_json,
            response=None,
            callback_url=callback_url,
            callback_status=None,
            attempts=0,
        )
        .on_conflict_do_nothing(index_elements=["client_id", "request_id"])
    )
    await session.execute(stmt)

    # Re-select by correlation key: this returns either the row we just
    # inserted or the one that caused the conflict.
    row = await session.scalar(
        select(IxJob).where(
            IxJob.client_id == request.ix_client_id,
            IxJob.request_id == request.request_id,
        )
    )
    if row is None:
        # Explicit raise instead of ``assert`` so the check survives ``-O``.
        raise AssertionError("insert_pending: row missing after upsert")
    return _orm_to_job(row)
async def claim_next_pending(session: AsyncSession) -> Job | None:
    """Claim the oldest pending job and mark it as running.

    The row is selected with ``FOR UPDATE SKIP LOCKED``, so a sibling worker
    never waits (or deadlocks) on a row we hold; it simply skips it and takes
    the next pending entry. The sibling test in
    :mod:`tests/integration/test_jobs_repo` asserts this.

    Returns:
        The claimed job (status ``running``, ``started_at`` set to the current
        UTC time), or ``None`` when the query yields no row. The change is
        flushed but not committed — the caller owns the transaction.
    """
    oldest_pending = (
        select(IxJob)
        .where(IxJob.status == "pending")
        .order_by(IxJob.created_at)
        .limit(1)
        .with_for_update(skip_locked=True)
    )
    claimed = await session.scalar(oldest_pending)
    if claimed is not None:
        claimed.status = "running"
        claimed.started_at = datetime.now(UTC)
        # Push the state change to the database inside the caller's transaction.
        await session.flush()
        return _orm_to_job(claimed)
    return None
async def get(session: AsyncSession, job_id: UUID) -> Job | None:
    """Look up a single job by its ``job_id``.

    Returns the hydrated :class:`Job`, or ``None`` when no row matches.
    """
    by_job_id = select(IxJob).where(IxJob.job_id == job_id)
    found = await session.scalar(by_job_id)
    if found is None:
        return None
    return _orm_to_job(found)
async def get_by_correlation(
    session: AsyncSession, client_id: str, request_id: str
) -> Job | None:
    """Look up a single job by its ``(client_id, request_id)`` pair.

    Returns the hydrated :class:`Job`, or ``None`` when no row matches.
    """
    by_correlation = select(IxJob).where(
        IxJob.client_id == client_id,
        IxJob.request_id == request_id,
    )
    found = await session.scalar(by_correlation)
    if found is None:
        return None
    return _orm_to_job(found)
async def mark_done(
    session: AsyncSession, job_id: UUID, response: ResponseIX
) -> None:
    """Persist the pipeline response and move the job to a terminal state.

    The stored status is ``done`` when ``response.error`` is ``None`` and
    ``error`` otherwise (spec §3 lifecycle invariant). ``finished_at`` is set
    to the current UTC time. The caller commits.
    """
    if response.error is None:
        terminal_status = "done"
    else:
        terminal_status = "error"

    stmt = (
        update(IxJob)
        .where(IxJob.job_id == job_id)
        .values(
            status=terminal_status,
            # JSON-mode dump so the JSONB column receives plain JSON types.
            response=response.model_dump(mode="json"),
            finished_at=datetime.now(UTC),
        )
    )
    await session.execute(stmt)
async def mark_error(
    session: AsyncSession, job_id: UUID, response: ResponseIX
) -> None:
    """Persist the response and unconditionally set ``status='error'``.

    Kept separate from :func:`mark_done` for readability at call sites: when
    the worker caught an exception the pipeline did not handle itself, calling
    ``mark_error`` states that intent directly, regardless of what the
    response's error field contains. ``finished_at`` is set to the current UTC
    time. The caller commits.
    """
    stmt = (
        update(IxJob)
        .where(IxJob.job_id == job_id)
        .values(
            status="error",
            # JSON-mode dump so the JSONB column receives plain JSON types.
            response=response.model_dump(mode="json"),
            finished_at=datetime.now(UTC),
        )
    )
    await session.execute(stmt)
async def update_callback_status(
    session: AsyncSession,
    job_id: UUID,
    status: Literal["delivered", "failed"],
) -> None:
    """Record the outcome of the callback delivery for one job.

    Only the ``callback_status`` column is written; the job's lifecycle
    ``status`` is left untouched. The caller commits.
    """
    stmt = update(IxJob).where(IxJob.job_id == job_id)
    stmt = stmt.values(callback_status=status)
    await session.execute(stmt)
async def sweep_orphans(
    session: AsyncSession,
    now: datetime,
    max_running_seconds: int,
) -> list[UUID]:
    """Reset stale ``running`` rows back to ``pending`` and bump ``attempts``.

    Called once at worker startup (spec §3) to rescue jobs whose owner died
    mid-pipeline. The threshold is time-based on ``started_at`` so a still-
    running worker never reclaims its own in-flight job — callers pass
    ``2 * IX_PIPELINE_REQUEST_TIMEOUT_SECONDS`` per spec.

    Args:
        session: Caller-owned session; this function never commits.
        now: Reference time the staleness cutoff is computed from.
        max_running_seconds: A ``running`` row whose ``started_at`` is older
            than ``now`` minus this many seconds is considered orphaned.

    Returns:
        The ``job_id`` of every row that was actually reset (empty list when
        nothing was stale), so the worker can log what it did.
    """
    cutoff = now - _as_interval(max_running_seconds)

    # Single UPDATE ... RETURNING rather than SELECT-then-UPDATE: the status /
    # age predicate is evaluated by the same statement that writes, so a row
    # that reaches ``done`` / ``error`` concurrently can never be flipped back
    # to ``pending``, and the returned ids are exactly the rows modified.
    stmt = (
        update(IxJob)
        .where(
            IxJob.status == "running",
            IxJob.started_at < cutoff,
        )
        .values(
            status="pending",
            started_at=None,
            attempts=IxJob.attempts + 1,
        )
        .returning(IxJob.job_id)
    )
    result = await session.execute(stmt)
    return list(result.scalars().all())
def _as_interval(seconds: int):  # type: ignore[no-untyped-def]
    """Build a SQL interval expression spanning ``seconds`` seconds.

    The interval is produced server-side via ``make_interval`` so asyncpg
    doesn't have to guess at a text-form cast, which avoids locale-dependent
    parsing. The function is called positionally; the first six fields are
    zero and ``seconds`` is passed as the seventh argument.
    """
    # Positional order: years, months, weeks, days, hours, mins, secs.
    years = months = weeks = days = hours = mins = 0
    return func.make_interval(years, months, weeks, days, hours, mins, seconds)