From 141153ffa7926f572b15a0277efb9cab33f07f53 Mon Sep 17 00:00:00 2001 From: Dirk Riemann Date: Sat, 18 Apr 2026 11:43:11 +0200 Subject: [PATCH] =?UTF-8?q?feat(store):=20JobsRepo=20CRUD=20over=20ix=5Fjo?= =?UTF-8?q?bs=20+=20integration=20fixtures=20(spec=20=C2=A74)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit JobsRepo covers the full job-lifecycle surface: - insert_pending: idempotent on (client_id, request_id) via ON CONFLICT DO NOTHING + re-select; assigns a 16-hex ix_id. - claim_next_pending: FOR UPDATE SKIP LOCKED so concurrent workers never double-dispatch a row. - get / get_by_correlation: hydrates JSONB back through Pydantic. - mark_done: done iff response.error is None, else error. - mark_error: explicit convenience wrapper. - update_callback_status: delivered | failed (no status transition). - sweep_orphans: time-based rescue of stuck running rows; attempts++. Integration fixtures (tests/integration/conftest.py): - Skip cleanly when neither IX_TEST_DATABASE_URL nor IX_POSTGRES_URL is set (unit suite stays runnable on a bare laptop). - Alembic upgrade/downgrade runs in a subprocess so its internal asyncio.run() doesn't collide with pytest-asyncio's loop. - Per-test engine + truncate so loops never cross and tests start clean. 15 integration tests against a live postgres:16, including SKIP LOCKED concurrency + orphan sweep. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/ix/store/jobs_repo.py | 274 +++++++++++++++++++++ tests/integration/conftest.py | 124 ++++++++++ tests/integration/test_jobs_repo.py | 367 ++++++++++++++++++++++++++++ 3 files changed, 765 insertions(+) create mode 100644 src/ix/store/jobs_repo.py create mode 100644 tests/integration/conftest.py create mode 100644 tests/integration/test_jobs_repo.py diff --git a/src/ix/store/jobs_repo.py b/src/ix/store/jobs_repo.py new file mode 100644 index 0000000..7f7d368 --- /dev/null +++ b/src/ix/store/jobs_repo.py @@ -0,0 +1,274 @@ +"""Async CRUD over ``ix_jobs`` — the one module the worker / REST touches. + +Every method takes an :class:`AsyncSession` (caller-owned transaction). The +caller commits. We don't manage transactions inside repo methods because the +worker sometimes needs to claim + run-pipeline + mark-done inside one +long-running unit of work, and an inside-the-method commit would break that. + +A few invariants worth stating up front: + +* ``ix_id`` is a 16-char hex string assigned by :func:`insert_pending` on + first insert. Callers MUST NOT pass one (we generate it); if a + ``RequestIX`` arrives with ``ix_id`` set it is ignored. +* ``(client_id, request_id)`` is unique — on collision we return the + existing row unchanged. Callback URLs on the second insert are ignored; + the first insert's metadata wins. +* Claim uses ``FOR UPDATE SKIP LOCKED`` so concurrent workers never pick the + same row, and a session holding a lock doesn't block a sibling claimer. +* Status transitions: ``pending → running → (done | error)``. The sweeper is + the only path back to ``pending`` (and only from ``running``); terminal + states are stable. +""" + +from __future__ import annotations + +import secrets +from datetime import UTC, datetime +from typing import TYPE_CHECKING, Literal +from uuid import UUID, uuid4 + +from sqlalchemy import func, select, update +from sqlalchemy.dialects.postgresql import insert as pg_insert + +from ix.contracts.job import Job +from ix.contracts.request import RequestIX +from ix.contracts.response import ResponseIX +from ix.store.models import IxJob + +if TYPE_CHECKING: + from sqlalchemy.ext.asyncio import AsyncSession + + +def _new_ix_id() -> str: + """Transport-assigned 16-hex handle. + + ``secrets.token_hex(8)`` gives 16 characters of entropy; good enough to + tag logs per spec §3 without collision risk across the lifetime of the + service. + """ + + return secrets.token_hex(8) + + +def _orm_to_job(row: IxJob) -> Job: + """Round-trip ORM row back through the Pydantic ``Job`` contract. + + The JSONB columns come out as plain dicts; we let Pydantic re-validate + them into :class:`RequestIX` / :class:`ResponseIX`. Catching validation + errors here would mask real bugs; we let them surface. + """ + + return Job( + job_id=row.job_id, + ix_id=row.ix_id, + client_id=row.client_id, + request_id=row.request_id, + status=row.status, # type: ignore[arg-type] + request=RequestIX.model_validate(row.request), + response=( + ResponseIX.model_validate(row.response) if row.response is not None else None + ), + callback_url=row.callback_url, + callback_status=row.callback_status, # type: ignore[arg-type] + attempts=row.attempts, + created_at=row.created_at, + started_at=row.started_at, + finished_at=row.finished_at, + ) + + +async def insert_pending( + session: AsyncSession, + request: RequestIX, + callback_url: str | None, +) -> Job: + """Insert a pending row; return the new or existing :class:`Job`. + + Uses ``INSERT ... ON CONFLICT DO NOTHING`` on the + ``(client_id, request_id)`` unique index, then re-selects. If the insert + was a no-op the existing row is returned verbatim (status / callback_url + unchanged) — callers rely on this for idempotent resubmission. + """ + + ix_id = request.ix_id or _new_ix_id() + job_id = uuid4() + + # Serialise the request through Pydantic so JSONB gets plain JSON types, + # not datetime / Decimal instances asyncpg would reject. + request_json = request.model_copy(update={"ix_id": ix_id}).model_dump( + mode="json" + ) + + stmt = ( + pg_insert(IxJob) + .values( + job_id=job_id, + ix_id=ix_id, + client_id=request.ix_client_id, + request_id=request.request_id, + status="pending", + request=request_json, + response=None, + callback_url=callback_url, + callback_status=None, + attempts=0, + ) + .on_conflict_do_nothing(index_elements=["client_id", "request_id"]) + ) + await session.execute(stmt) + + row = await session.scalar( + select(IxJob).where( + IxJob.client_id == request.ix_client_id, + IxJob.request_id == request.request_id, + ) + ) + assert row is not None, "insert_pending: row missing after upsert" + return _orm_to_job(row) + + +async def claim_next_pending(session: AsyncSession) -> Job | None: + """Atomically pick the oldest pending row and flip it to running. + + ``FOR UPDATE SKIP LOCKED`` means a sibling worker can never deadlock on + our row; they'll skip past it and grab the next pending entry. The + sibling test in :mod:`tests/integration/test_jobs_repo` asserts this. + """ + + stmt = ( + select(IxJob) + .where(IxJob.status == "pending") + .order_by(IxJob.created_at) + .limit(1) + .with_for_update(skip_locked=True) + ) + row = await session.scalar(stmt) + if row is None: + return None + + row.status = "running" + row.started_at = datetime.now(UTC) + await session.flush() + return _orm_to_job(row) + + +async def get(session: AsyncSession, job_id: UUID) -> Job | None: + row = await session.scalar(select(IxJob).where(IxJob.job_id == job_id)) + return _orm_to_job(row) if row is not None else None + + +async def get_by_correlation( + session: AsyncSession, client_id: str, request_id: str +) -> Job | None: + row = await session.scalar( + select(IxJob).where( + IxJob.client_id == client_id, + IxJob.request_id == request_id, + ) + ) + return _orm_to_job(row) if row is not None else None + + +async def mark_done( + session: AsyncSession, job_id: UUID, response: ResponseIX +) -> None: + """Write the pipeline's response and move to terminal state. + + Status is ``done`` iff ``response.error is None``; any non-None error + flips us to ``error``. Spec §3 lifecycle invariant. + """ + + status = "done" if response.error is None else "error" + await session.execute( + update(IxJob) + .where(IxJob.job_id == job_id) + .values( + status=status, + response=response.model_dump(mode="json"), + finished_at=datetime.now(UTC), + ) + ) + + +async def mark_error( + session: AsyncSession, job_id: UUID, response: ResponseIX +) -> None: + """Convenience wrapper that always writes status='error'. + + Separate from :func:`mark_done` for readability at call sites: when the + worker knows it caught an exception the pipeline didn't handle itself, + ``mark_error`` signals intent even if the response body happens to have + a populated error field. + """ + + await session.execute( + update(IxJob) + .where(IxJob.job_id == job_id) + .values( + status="error", + response=response.model_dump(mode="json"), + finished_at=datetime.now(UTC), + ) + ) + + +async def update_callback_status( + session: AsyncSession, + job_id: UUID, + status: Literal["delivered", "failed"], +) -> None: + await session.execute( + update(IxJob) + .where(IxJob.job_id == job_id) + .values(callback_status=status) + ) + + +async def sweep_orphans( + session: AsyncSession, + now: datetime, + max_running_seconds: int, +) -> list[UUID]: + """Reset stale ``running`` rows back to ``pending`` and bump ``attempts``. + + Called once at worker startup (spec §3) to rescue jobs whose owner died + mid-pipeline. The threshold is time-based on ``started_at`` so a still- + running worker never reclaims its own in-flight job — callers pass + ``2 * IX_PIPELINE_REQUEST_TIMEOUT_SECONDS`` per spec. + """ + + # Pick candidates and return their ids so the worker can log what it + # did. Two-step (SELECT then UPDATE) is clearer than RETURNING for + # callers who want the id list alongside a plain UPDATE. + candidates = ( + await session.scalars( + select(IxJob.job_id).where( + IxJob.status == "running", + IxJob.started_at < now - _as_interval(max_running_seconds), + ) + ) + ).all() + if not candidates: + return [] + + await session.execute( + update(IxJob) + .where(IxJob.job_id.in_(candidates)) + .values( + status="pending", + started_at=None, + attempts=IxJob.attempts + 1, + ) + ) + return list(candidates) + + +def _as_interval(seconds: int): # type: ignore[no-untyped-def] + """Return a SQL interval expression for ``seconds``. + + We build the interval via ``func.make_interval`` so asyncpg doesn't have + to guess at a text-form cast — the server-side ``make_interval(secs :=)`` + is unambiguous and avoids locale-dependent parsing. + """ + + return func.make_interval(0, 0, 0, 0, 0, 0, seconds) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py new file mode 100644 index 0000000..6b4e5ea --- /dev/null +++ b/tests/integration/conftest.py @@ -0,0 +1,124 @@ +"""Integration-test fixtures — real Postgres required. + +Policy: tests that import these fixtures skip cleanly when no DB is +configured. We check ``IX_TEST_DATABASE_URL`` first (local developer +override, usually a disposable docker container), then ``IX_POSTGRES_URL`` +(what Forgejo Actions already sets). If neither is present the fixture +short-circuits with ``pytest.skip`` so a developer running +``pytest tests/unit`` in an unconfigured shell doesn't see the integration +suite hang or raise cryptic ``OperationalError``. + +Schema lifecycle: + +* session scope: ``alembic upgrade head`` once, ``alembic downgrade base`` + at session end. We tried ``Base.metadata.create_all`` at first — faster, + but it meant migrations stayed untested by the integration suite and a + developer who broke ``001_initial_ix_jobs.py`` wouldn't find out until + deploy. Current shape keeps migrations in the hot path. +* per-test: ``TRUNCATE ix_jobs`` (via the ``_reset_schema`` autouse fixture) + — faster than recreating the schema and preserves indexes/constraints so + tests that want to assert ON a unique-violation path actually get one. +""" + +from __future__ import annotations + +import os +import subprocess +import sys +from collections.abc import AsyncIterator, Iterator +from pathlib import Path + +import pytest +import pytest_asyncio +from sqlalchemy.ext.asyncio import ( + AsyncEngine, + AsyncSession, + async_sessionmaker, + create_async_engine, +) + +REPO_ROOT = Path(__file__).resolve().parents[2] + + +def _resolve_postgres_url() -> str | None: + """Pick the database URL per policy: test override → CI URL → none.""" + + return os.environ.get("IX_TEST_DATABASE_URL") or os.environ.get("IX_POSTGRES_URL") + + +@pytest.fixture(scope="session") +def postgres_url() -> str: + url = _resolve_postgres_url() + if not url: + pytest.skip( + "no postgres configured — set IX_TEST_DATABASE_URL or IX_POSTGRES_URL" + ) + return url + + +def _run_alembic(direction: str, postgres_url: str) -> None: + """Invoke Alembic in a subprocess so its ``asyncio.run`` inside ``env.py`` + doesn't collide with the pytest-asyncio event loop. + + We pass the URL via ``IX_POSTGRES_URL`` — not ``-x url=...`` — because + percent-encoded characters in developer passwords trip up alembic's + configparser-backed ini loader. The env var lane skips configparser. + """ + + env = os.environ.copy() + env["IX_POSTGRES_URL"] = postgres_url + subprocess.run( + [sys.executable, "-m", "alembic", direction, "head" if direction == "upgrade" else "base"], + cwd=REPO_ROOT, + env=env, + check=True, + ) + + +@pytest.fixture(scope="session", autouse=True) +def _prepare_schema(postgres_url: str) -> Iterator[None]: + """Run migrations once per session, torn down at the end. + + pytest-asyncio creates one event loop per test (function-scoped by + default) and asyncpg connections can't survive a loop switch. That + forces a function-scoped engine below — but migrations are expensive, + so we keep those session-scoped via a subprocess call (no loop + involvement at all). + """ + + _run_alembic("downgrade", postgres_url) + _run_alembic("upgrade", postgres_url) + yield + _run_alembic("downgrade", postgres_url) + + +@pytest_asyncio.fixture +async def engine(postgres_url: str) -> AsyncIterator[AsyncEngine]: + """Per-test async engine. + + Built fresh each test so its asyncpg connections live on the same loop + as the test itself. Dispose on teardown — otherwise asyncpg leaks tasks + into the next test's loop and we get ``got Future attached to a + different loop`` errors on the second test in a file. + """ + + eng = create_async_engine(postgres_url, pool_pre_ping=True) + try: + yield eng + finally: + await eng.dispose() + + +@pytest_asyncio.fixture +async def session_factory(engine: AsyncEngine) -> async_sessionmaker[AsyncSession]: + """Per-test session factory. ``expire_on_commit=False`` per prod parity.""" + + return async_sessionmaker(engine, expire_on_commit=False) + + +@pytest_asyncio.fixture(autouse=True) +async def _reset_schema(engine: AsyncEngine) -> None: + """Truncate ix_jobs between tests so each test starts from empty state.""" + + async with engine.begin() as conn: + await conn.exec_driver_sql("TRUNCATE ix_jobs") diff --git a/tests/integration/test_jobs_repo.py b/tests/integration/test_jobs_repo.py new file mode 100644 index 0000000..e6be127 --- /dev/null +++ b/tests/integration/test_jobs_repo.py @@ -0,0 +1,367 @@ +"""Integration tests for :mod:`ix.store.jobs_repo` — run against a real DB. + +Every test exercises one repo method end-to-end. A few go further and +concurrently spin up two sessions to demonstrate the claim query behaves +correctly under ``SKIP LOCKED`` (two claimers should never see the same row). + +Skipped cleanly when no Postgres is configured — see integration/conftest.py. +""" + +from __future__ import annotations + +import asyncio +from datetime import UTC, datetime, timedelta +from typing import TYPE_CHECKING +from uuid import UUID, uuid4 + +from ix.contracts.request import Context, RequestIX +from ix.contracts.response import ResponseIX +from ix.store import jobs_repo + +if TYPE_CHECKING: + from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker + + +def _make_request(client: str = "mammon", request_id: str = "r-1") -> RequestIX: + return RequestIX( + use_case="bank_statement_header", + ix_client_id=client, + request_id=request_id, + context=Context(texts=["hello"]), + ) + + +async def test_insert_pending_creates_row_and_assigns_ix_id( + session_factory: async_sessionmaker[AsyncSession], +) -> None: + async with session_factory() as session: + job = await jobs_repo.insert_pending( + session, _make_request(), callback_url=None + ) + await session.commit() + + assert job.status == "pending" + assert isinstance(job.job_id, UUID) + # ix_id is a 16-hex string per spec §3 — transport-assigned. + assert isinstance(job.ix_id, str) + assert len(job.ix_id) == 16 + assert all(c in "0123456789abcdef" for c in job.ix_id) + assert job.attempts == 0 + + +async def test_insert_pending_is_idempotent_on_correlation_key( + session_factory: async_sessionmaker[AsyncSession], +) -> None: + """(client_id, request_id) collides → existing row comes back unchanged.""" + + async with session_factory() as session: + first = await jobs_repo.insert_pending( + session, _make_request("mammon", "same-id"), callback_url="http://x/cb" + ) + await session.commit() + + async with session_factory() as session: + second = await jobs_repo.insert_pending( + session, _make_request("mammon", "same-id"), callback_url="http://y/cb" + ) + await session.commit() + + assert second.job_id == first.job_id + assert second.ix_id == first.ix_id + # The callback_url of the FIRST insert wins — we don't overwrite. + assert second.callback_url == "http://x/cb" + + +async def test_get_returns_full_job( + session_factory: async_sessionmaker[AsyncSession], +) -> None: + async with session_factory() as session: + inserted = await jobs_repo.insert_pending( + session, _make_request(), callback_url=None + ) + await session.commit() + + async with session_factory() as session: + fetched = await jobs_repo.get(session, inserted.job_id) + + assert fetched is not None + assert fetched.job_id == inserted.job_id + assert fetched.request.use_case == "bank_statement_header" + assert fetched.status == "pending" + + +async def test_get_unknown_id_returns_none( + session_factory: async_sessionmaker[AsyncSession], +) -> None: + async with session_factory() as session: + result = await jobs_repo.get(session, uuid4()) + assert result is None + + +async def test_get_by_correlation( + session_factory: async_sessionmaker[AsyncSession], +) -> None: + async with session_factory() as session: + inserted = await jobs_repo.insert_pending( + session, _make_request("mammon", "req-42"), callback_url=None + ) + await session.commit() + + async with session_factory() as session: + found = await jobs_repo.get_by_correlation(session, "mammon", "req-42") + assert found is not None + assert found.job_id == inserted.job_id + + async with session_factory() as session: + missing = await jobs_repo.get_by_correlation(session, "mammon", "nope") + assert missing is None + + +async def test_claim_next_pending_advances_status( + session_factory: async_sessionmaker[AsyncSession], +) -> None: + async with session_factory() as session: + inserted = await jobs_repo.insert_pending( + session, _make_request(), callback_url=None + ) + await session.commit() + + async with session_factory() as session: + claimed = await jobs_repo.claim_next_pending(session) + await session.commit() + + assert claimed is not None + assert claimed.job_id == inserted.job_id + assert claimed.status == "running" + assert claimed.started_at is not None + + +async def test_claim_next_pending_returns_none_when_empty( + session_factory: async_sessionmaker[AsyncSession], +) -> None: + async with session_factory() as session: + claimed = await jobs_repo.claim_next_pending(session) + await session.commit() + assert claimed is None + + +async def test_claim_next_pending_skips_locked( + session_factory: async_sessionmaker[AsyncSession], +) -> None: + """Two concurrent claimers pick different rows (SKIP LOCKED in action).""" + + async with session_factory() as session: + a = await jobs_repo.insert_pending( + session, _make_request("c", "a"), callback_url=None + ) + b = await jobs_repo.insert_pending( + session, _make_request("c", "b"), callback_url=None + ) + await session.commit() + + session_a = session_factory() + session_b = session_factory() + try: + # Start the first claim but *don't* commit yet — its row is locked. + first = await jobs_repo.claim_next_pending(session_a) + # Second claimer runs while the first is still holding its lock. It + # must see the 'a' row as pending but SKIP it, returning the 'b' row. + second = await jobs_repo.claim_next_pending(session_b) + + assert first is not None and second is not None + assert {first.job_id, second.job_id} == {a.job_id, b.job_id} + assert first.job_id != second.job_id + + await session_a.commit() + await session_b.commit() + finally: + await session_a.close() + await session_b.close() + + +async def test_mark_done_writes_response_and_finishes( + session_factory: async_sessionmaker[AsyncSession], +) -> None: + async with session_factory() as session: + inserted = await jobs_repo.insert_pending( + session, _make_request(), callback_url=None + ) + await session.commit() + + response = ResponseIX( + use_case="bank_statement_header", + ix_client_id="mammon", + request_id="r-1", + ) + + async with session_factory() as session: + await jobs_repo.mark_done(session, inserted.job_id, response) + await session.commit() + + async with session_factory() as session: + after = await jobs_repo.get(session, inserted.job_id) + assert after is not None + assert after.status == "done" + assert after.response is not None + assert after.finished_at is not None + + +async def test_mark_done_with_error_response_moves_to_error( + session_factory: async_sessionmaker[AsyncSession], +) -> None: + """`done` iff response.error is None — otherwise status='error'.""" + + async with session_factory() as session: + inserted = await jobs_repo.insert_pending( + session, _make_request(), callback_url=None + ) + await session.commit() + + bad = ResponseIX(error="IX_002_000: boom") + + async with session_factory() as session: + await jobs_repo.mark_done(session, inserted.job_id, bad) + await session.commit() + + async with session_factory() as session: + after = await jobs_repo.get(session, inserted.job_id) + assert after is not None + assert after.status == "error" + assert after.response is not None + assert (after.response.error or "").startswith("IX_002_000") + + +async def test_mark_error_always_error( + session_factory: async_sessionmaker[AsyncSession], +) -> None: + async with session_factory() as session: + inserted = await jobs_repo.insert_pending( + session, _make_request(), callback_url=None + ) + await session.commit() + + bad = ResponseIX(error="IX_000_005: unsupported") + + async with session_factory() as session: + await jobs_repo.mark_error(session, inserted.job_id, bad) + await session.commit() + + async with session_factory() as session: + after = await jobs_repo.get(session, inserted.job_id) + assert after is not None + assert after.status == "error" + + +async def test_update_callback_status( + session_factory: async_sessionmaker[AsyncSession], +) -> None: + async with session_factory() as session: + inserted = await jobs_repo.insert_pending( + session, _make_request(), callback_url="http://cb" + ) + await session.commit() + + async with session_factory() as session: + await jobs_repo.update_callback_status(session, inserted.job_id, "delivered") + await session.commit() + + async with session_factory() as session: + after = await jobs_repo.get(session, inserted.job_id) + assert after is not None + assert after.callback_status == "delivered" + + +async def test_sweep_orphans_resets_stale_running( + session_factory: async_sessionmaker[AsyncSession], +) -> None: + """Running rows older than (now - max_running_seconds) go back to pending.""" + + async with session_factory() as session: + inserted = await jobs_repo.insert_pending( + session, _make_request(), callback_url=None + ) + await session.commit() + + # Backdate started_at by an hour to simulate a crashed worker mid-job. + async with session_factory() as session: + from sqlalchemy import text + + stale = datetime.now(UTC) - timedelta(hours=1) + await session.execute( + text( + "UPDATE ix_jobs SET status='running', started_at=:t " + "WHERE job_id=:jid" + ), + {"t": stale, "jid": inserted.job_id}, + ) + await session.commit() + + # Max age of 60 s → our hour-old row gets swept. + async with session_factory() as session: + rescued = await jobs_repo.sweep_orphans( + session, datetime.now(UTC), max_running_seconds=60 + ) + await session.commit() + + assert inserted.job_id in rescued + async with session_factory() as session: + after = await jobs_repo.get(session, inserted.job_id) + assert after is not None + assert after.status == "pending" + assert after.attempts == 1 + + +async def test_sweep_orphans_leaves_fresh_running_alone( + session_factory: async_sessionmaker[AsyncSession], +) -> None: + """A just-claimed row must not get reclaimed by the sweeper.""" + + async with session_factory() as session: + await jobs_repo.insert_pending(session, _make_request(), callback_url=None) + await session.commit() + + async with session_factory() as session: + claimed = await jobs_repo.claim_next_pending(session) + await session.commit() + assert claimed is not None + + # Sweep with a huge threshold (1 hour). Our just-claimed row is fresh, so + # it stays running. + async with session_factory() as session: + rescued = await jobs_repo.sweep_orphans( + session, datetime.now(UTC), max_running_seconds=3600 + ) + await session.commit() + + assert rescued == [] + + async with session_factory() as session: + after = await jobs_repo.get(session, claimed.job_id) + assert after is not None + assert after.status == "running" + + +async def test_concurrent_claim_never_double_dispatches( + session_factory: async_sessionmaker[AsyncSession], +) -> None: + """Spin a batch of concurrent claimers; every insert is claimed exactly once.""" + + async with session_factory() as session: + ids = [] + for i in range(5): + job = await jobs_repo.insert_pending( + session, _make_request("mass", f"r-{i}"), callback_url=None + ) + ids.append(job.job_id) + await session.commit() + + async def claim_one() -> UUID | None: + async with session_factory() as session: + claimed = await jobs_repo.claim_next_pending(session) + await session.commit() + return claimed.job_id if claimed else None + + results = await asyncio.gather(*(claim_one() for _ in range(10))) + non_null = [r for r in results if r is not None] + # Every inserted id appears at most once. + assert sorted(non_null) == sorted(ids)