"""Integration tests for :mod:`ix.store.jobs_repo` — run against a real DB. Every test exercises one repo method end-to-end. A few go further and concurrently spin up two sessions to demonstrate the claim query behaves correctly under ``SKIP LOCKED`` (two claimers should never see the same row). Skipped cleanly when no Postgres is configured — see integration/conftest.py. """ from __future__ import annotations import asyncio from datetime import UTC, datetime, timedelta from typing import TYPE_CHECKING from uuid import UUID, uuid4 from ix.contracts.request import Context, RequestIX from ix.contracts.response import ResponseIX from ix.store import jobs_repo if TYPE_CHECKING: from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker def _make_request(client: str = "mammon", request_id: str = "r-1") -> RequestIX: return RequestIX( use_case="bank_statement_header", ix_client_id=client, request_id=request_id, context=Context(texts=["hello"]), ) async def test_insert_pending_creates_row_and_assigns_ix_id( session_factory: async_sessionmaker[AsyncSession], ) -> None: async with session_factory() as session: job = await jobs_repo.insert_pending( session, _make_request(), callback_url=None ) await session.commit() assert job.status == "pending" assert isinstance(job.job_id, UUID) # ix_id is a 16-hex string per spec §3 — transport-assigned. assert isinstance(job.ix_id, str) assert len(job.ix_id) == 16 assert all(c in "0123456789abcdef" for c in job.ix_id) assert job.attempts == 0 async def test_insert_pending_is_idempotent_on_correlation_key( session_factory: async_sessionmaker[AsyncSession], ) -> None: """(client_id, request_id) collides → existing row comes back unchanged.""" async with session_factory() as session: first = await jobs_repo.insert_pending( session, _make_request("mammon", "same-id"), callback_url="http://x/cb" ) await session.commit() async with session_factory() as session: second = await jobs_repo.insert_pending( session, _make_request("mammon", "same-id"), callback_url="http://y/cb" ) await session.commit() assert second.job_id == first.job_id assert second.ix_id == first.ix_id # The callback_url of the FIRST insert wins — we don't overwrite. assert second.callback_url == "http://x/cb" async def test_get_returns_full_job( session_factory: async_sessionmaker[AsyncSession], ) -> None: async with session_factory() as session: inserted = await jobs_repo.insert_pending( session, _make_request(), callback_url=None ) await session.commit() async with session_factory() as session: fetched = await jobs_repo.get(session, inserted.job_id) assert fetched is not None assert fetched.job_id == inserted.job_id assert fetched.request.use_case == "bank_statement_header" assert fetched.status == "pending" async def test_get_unknown_id_returns_none( session_factory: async_sessionmaker[AsyncSession], ) -> None: async with session_factory() as session: result = await jobs_repo.get(session, uuid4()) assert result is None async def test_get_by_correlation( session_factory: async_sessionmaker[AsyncSession], ) -> None: async with session_factory() as session: inserted = await jobs_repo.insert_pending( session, _make_request("mammon", "req-42"), callback_url=None ) await session.commit() async with session_factory() as session: found = await jobs_repo.get_by_correlation(session, "mammon", "req-42") assert found is not None assert found.job_id == inserted.job_id async with session_factory() as session: missing = await jobs_repo.get_by_correlation(session, "mammon", "nope") assert missing is None async def test_claim_next_pending_advances_status( session_factory: async_sessionmaker[AsyncSession], ) -> None: async with session_factory() as session: inserted = await jobs_repo.insert_pending( session, _make_request(), callback_url=None ) await session.commit() async with session_factory() as session: claimed = await jobs_repo.claim_next_pending(session) await session.commit() assert claimed is not None assert claimed.job_id == inserted.job_id assert claimed.status == "running" assert claimed.started_at is not None async def test_claim_next_pending_returns_none_when_empty( session_factory: async_sessionmaker[AsyncSession], ) -> None: async with session_factory() as session: claimed = await jobs_repo.claim_next_pending(session) await session.commit() assert claimed is None async def test_claim_next_pending_skips_locked( session_factory: async_sessionmaker[AsyncSession], ) -> None: """Two concurrent claimers pick different rows (SKIP LOCKED in action).""" async with session_factory() as session: a = await jobs_repo.insert_pending( session, _make_request("c", "a"), callback_url=None ) b = await jobs_repo.insert_pending( session, _make_request("c", "b"), callback_url=None ) await session.commit() session_a = session_factory() session_b = session_factory() try: # Start the first claim but *don't* commit yet — its row is locked. first = await jobs_repo.claim_next_pending(session_a) # Second claimer runs while the first is still holding its lock. It # must see the 'a' row as pending but SKIP it, returning the 'b' row. second = await jobs_repo.claim_next_pending(session_b) assert first is not None and second is not None assert {first.job_id, second.job_id} == {a.job_id, b.job_id} assert first.job_id != second.job_id await session_a.commit() await session_b.commit() finally: await session_a.close() await session_b.close() async def test_mark_done_writes_response_and_finishes( session_factory: async_sessionmaker[AsyncSession], ) -> None: async with session_factory() as session: inserted = await jobs_repo.insert_pending( session, _make_request(), callback_url=None ) await session.commit() response = ResponseIX( use_case="bank_statement_header", ix_client_id="mammon", request_id="r-1", ) async with session_factory() as session: await jobs_repo.mark_done(session, inserted.job_id, response) await session.commit() async with session_factory() as session: after = await jobs_repo.get(session, inserted.job_id) assert after is not None assert after.status == "done" assert after.response is not None assert after.finished_at is not None async def test_mark_done_with_error_response_moves_to_error( session_factory: async_sessionmaker[AsyncSession], ) -> None: """`done` iff response.error is None — otherwise status='error'.""" async with session_factory() as session: inserted = await jobs_repo.insert_pending( session, _make_request(), callback_url=None ) await session.commit() bad = ResponseIX(error="IX_002_000: boom") async with session_factory() as session: await jobs_repo.mark_done(session, inserted.job_id, bad) await session.commit() async with session_factory() as session: after = await jobs_repo.get(session, inserted.job_id) assert after is not None assert after.status == "error" assert after.response is not None assert (after.response.error or "").startswith("IX_002_000") async def test_mark_error_always_error( session_factory: async_sessionmaker[AsyncSession], ) -> None: async with session_factory() as session: inserted = await jobs_repo.insert_pending( session, _make_request(), callback_url=None ) await session.commit() bad = ResponseIX(error="IX_000_005: unsupported") async with session_factory() as session: await jobs_repo.mark_error(session, inserted.job_id, bad) await session.commit() async with session_factory() as session: after = await jobs_repo.get(session, inserted.job_id) assert after is not None assert after.status == "error" async def test_update_callback_status( session_factory: async_sessionmaker[AsyncSession], ) -> None: async with session_factory() as session: inserted = await jobs_repo.insert_pending( session, _make_request(), callback_url="http://cb" ) await session.commit() async with session_factory() as session: await jobs_repo.update_callback_status(session, inserted.job_id, "delivered") await session.commit() async with session_factory() as session: after = await jobs_repo.get(session, inserted.job_id) assert after is not None assert after.callback_status == "delivered" async def test_sweep_orphans_resets_stale_running( session_factory: async_sessionmaker[AsyncSession], ) -> None: """Running rows older than (now - max_running_seconds) go back to pending.""" async with session_factory() as session: inserted = await jobs_repo.insert_pending( session, _make_request(), callback_url=None ) await session.commit() # Backdate started_at by an hour to simulate a crashed worker mid-job. async with session_factory() as session: from sqlalchemy import text stale = datetime.now(UTC) - timedelta(hours=1) await session.execute( text( "UPDATE ix_jobs SET status='running', started_at=:t " "WHERE job_id=:jid" ), {"t": stale, "jid": inserted.job_id}, ) await session.commit() # Max age of 60 s → our hour-old row gets swept. async with session_factory() as session: rescued = await jobs_repo.sweep_orphans( session, datetime.now(UTC), max_running_seconds=60 ) await session.commit() assert inserted.job_id in rescued async with session_factory() as session: after = await jobs_repo.get(session, inserted.job_id) assert after is not None assert after.status == "pending" assert after.attempts == 1 async def test_sweep_orphans_leaves_fresh_running_alone( session_factory: async_sessionmaker[AsyncSession], ) -> None: """A just-claimed row must not get reclaimed by the sweeper.""" async with session_factory() as session: await jobs_repo.insert_pending(session, _make_request(), callback_url=None) await session.commit() async with session_factory() as session: claimed = await jobs_repo.claim_next_pending(session) await session.commit() assert claimed is not None # Sweep with a huge threshold (1 hour). Our just-claimed row is fresh, so # it stays running. async with session_factory() as session: rescued = await jobs_repo.sweep_orphans( session, datetime.now(UTC), max_running_seconds=3600 ) await session.commit() assert rescued == [] async with session_factory() as session: after = await jobs_repo.get(session, claimed.job_id) assert after is not None assert after.status == "running" async def test_concurrent_claim_never_double_dispatches( session_factory: async_sessionmaker[AsyncSession], ) -> None: """Spin a batch of concurrent claimers; every insert is claimed exactly once.""" async with session_factory() as session: ids = [] for i in range(5): job = await jobs_repo.insert_pending( session, _make_request("mass", f"r-{i}"), callback_url=None ) ids.append(job.job_id) await session.commit() async def claim_one() -> UUID | None: async with session_factory() as session: claimed = await jobs_repo.claim_next_pending(session) await session.commit() return claimed.job_id if claimed else None results = await asyncio.gather(*(claim_one() for _ in range(10))) non_null = [r for r in results if r is not None] # Every inserted id appears at most once. assert sorted(non_null) == sorted(ids)