infoxtractor/tests/integration/test_worker_loop.py
Dirk Riemann 406a7ea2fd
All checks were successful
tests / test (push) Successful in 1m15s
tests / test (pull_request) Successful in 1m8s
feat(worker): async worker loop + one-shot callback delivery (spec §5)
Worker:
- Startup: sweep_orphans(now, max_running_seconds) rescues rows stuck
  in 'running' from a crashed prior process.
- Loop: claim_next_pending → build pipeline via injected factory → run
  → mark_done/mark_error → deliver callback if set → record outcome.
- Non-IX exceptions from the pipeline collapse to IX_002_000 so callers
  see a stable error code.
- Sleep loop uses a cancellable wait so the stop event reacts
  immediately; the wait_for_work hook is ready for Task 3.6 to plug in
  the LISTEN-driven event without the worker knowing about NOTIFY.

Callback:
- One-shot POST, 2xx → delivered, anything else (incl. connect/timeout
  exceptions) → failed. No retries.
- Callback record never reverts the job's terminal state — GET /jobs/{id}
  stays the authoritative fallback.

7 integration tests: happy path, pipeline-raise → error, callback 2xx,
callback 5xx, orphan sweep on startup, no-callback rows stay
callback_status=None (x2 via parametrize). Unit suite still 209.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 11:49:54 +02:00

325 lines
10 KiB
Python

"""Integration tests for the worker loop (Task 3.5).
We spin up a real worker with a fake pipeline factory and verify the lifecycle
transitions against a live DB. Callback delivery is exercised via
``pytest-httpx`` — callers' webhook endpoints are mocked, not run.
"""
from __future__ import annotations
import asyncio
from datetime import UTC, datetime, timedelta
from typing import TYPE_CHECKING
import pytest
from pytest_httpx import HTTPXMock
from ix.contracts.request import Context, RequestIX
from ix.contracts.response import ResponseIX
from ix.pipeline.pipeline import Pipeline
from ix.pipeline.step import Step
from ix.store import jobs_repo
from ix.worker.loop import Worker
if TYPE_CHECKING:
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
class _PassingStep(Step):
"""Minimal fake step that writes a sentinel field on the response."""
step_name = "fake_pass"
async def validate(self, request_ix, response_ix): # type: ignore[no-untyped-def]
return True
async def process(self, request_ix, response_ix): # type: ignore[no-untyped-def]
response_ix.use_case = request_ix.use_case
response_ix.ix_client_id = request_ix.ix_client_id
response_ix.request_id = request_ix.request_id
response_ix.ix_id = request_ix.ix_id
return response_ix
class _RaisingStep(Step):
"""Fake step that raises a non-IX exception to exercise the worker's
belt-and-braces error path."""
step_name = "fake_raise"
async def validate(self, request_ix, response_ix): # type: ignore[no-untyped-def]
return True
async def process(self, request_ix, response_ix): # type: ignore[no-untyped-def]
raise RuntimeError("boom")
def _ok_factory() -> Pipeline:
return Pipeline(steps=[_PassingStep()])
def _bad_factory() -> Pipeline:
return Pipeline(steps=[_RaisingStep()])
async def _insert_pending(session_factory, **kwargs): # type: ignore[no-untyped-def]
request = RequestIX(
use_case="bank_statement_header",
ix_client_id=kwargs.get("client", "test"),
request_id=kwargs.get("rid", "r-1"),
context=Context(texts=["hi"]),
)
async with session_factory() as session:
job = await jobs_repo.insert_pending(
session, request, callback_url=kwargs.get("cb")
)
await session.commit()
return job
async def test_worker_runs_one_job_to_done(
session_factory: async_sessionmaker[AsyncSession],
) -> None:
job = await _insert_pending(session_factory)
worker = Worker(
session_factory=session_factory,
pipeline_factory=_ok_factory,
poll_interval_seconds=0.1,
max_running_seconds=3600,
)
stop = asyncio.Event()
async def _monitor() -> None:
"""Wait until the job lands in a terminal state, then stop the worker."""
for _ in range(50): # 5 seconds budget
await asyncio.sleep(0.1)
async with session_factory() as session:
current = await jobs_repo.get(session, job.job_id)
if current is not None and current.status in ("done", "error"):
stop.set()
return
stop.set() # timeout — let the worker exit so assertions run
await asyncio.gather(worker.run(stop), _monitor())
async with session_factory() as session:
final = await jobs_repo.get(session, job.job_id)
assert final is not None
assert final.status == "done"
assert final.finished_at is not None
async def test_worker_pipeline_exception_marks_error(
session_factory: async_sessionmaker[AsyncSession],
) -> None:
"""A step raising a non-IX exception → status=error, response carries the
code. The worker catches what the pipeline doesn't."""
job = await _insert_pending(session_factory)
worker = Worker(
session_factory=session_factory,
pipeline_factory=_bad_factory,
poll_interval_seconds=0.1,
max_running_seconds=3600,
)
stop = asyncio.Event()
async def _monitor() -> None:
for _ in range(50):
await asyncio.sleep(0.1)
async with session_factory() as session:
current = await jobs_repo.get(session, job.job_id)
if current is not None and current.status == "error":
stop.set()
return
stop.set()
await asyncio.gather(worker.run(stop), _monitor())
async with session_factory() as session:
final = await jobs_repo.get(session, job.job_id)
assert final is not None
assert final.status == "error"
assert final.response is not None
assert (final.response.error or "").startswith("IX_002_000")
async def test_worker_delivers_callback(
httpx_mock: HTTPXMock,
session_factory: async_sessionmaker[AsyncSession],
) -> None:
"""callback_url on a done job → one POST, callback_status=delivered."""
httpx_mock.add_response(url="http://caller/webhook", status_code=200)
job = await _insert_pending(session_factory, cb="http://caller/webhook")
worker = Worker(
session_factory=session_factory,
pipeline_factory=_ok_factory,
poll_interval_seconds=0.1,
max_running_seconds=3600,
callback_timeout_seconds=5,
)
stop = asyncio.Event()
async def _monitor() -> None:
for _ in range(80):
await asyncio.sleep(0.1)
async with session_factory() as session:
current = await jobs_repo.get(session, job.job_id)
if (
current is not None
and current.status == "done"
and current.callback_status is not None
):
stop.set()
return
stop.set()
await asyncio.gather(worker.run(stop), _monitor())
async with session_factory() as session:
final = await jobs_repo.get(session, job.job_id)
assert final is not None
assert final.callback_status == "delivered"
async def test_worker_marks_callback_failed_on_5xx(
httpx_mock: HTTPXMock,
session_factory: async_sessionmaker[AsyncSession],
) -> None:
httpx_mock.add_response(url="http://caller/bad", status_code=500)
job = await _insert_pending(session_factory, cb="http://caller/bad")
worker = Worker(
session_factory=session_factory,
pipeline_factory=_ok_factory,
poll_interval_seconds=0.1,
max_running_seconds=3600,
callback_timeout_seconds=5,
)
stop = asyncio.Event()
async def _monitor() -> None:
for _ in range(80):
await asyncio.sleep(0.1)
async with session_factory() as session:
current = await jobs_repo.get(session, job.job_id)
if (
current is not None
and current.status == "done"
and current.callback_status is not None
):
stop.set()
return
stop.set()
await asyncio.gather(worker.run(stop), _monitor())
async with session_factory() as session:
final = await jobs_repo.get(session, job.job_id)
assert final is not None
assert final.status == "done" # terminal state stays done
assert final.callback_status == "failed"
async def test_worker_sweeps_orphans_at_startup(
session_factory: async_sessionmaker[AsyncSession],
) -> None:
"""Stale running rows → pending before the loop starts picking work."""
# Insert a job and backdate it to mimic a crashed worker mid-run.
job = await _insert_pending(session_factory, rid="orphan")
async with session_factory() as session:
from sqlalchemy import text
stale = datetime.now(UTC) - timedelta(hours=2)
await session.execute(
text(
"UPDATE ix_jobs SET status='running', started_at=:t "
"WHERE job_id=:jid"
),
{"t": stale, "jid": job.job_id},
)
await session.commit()
worker = Worker(
session_factory=session_factory,
pipeline_factory=_ok_factory,
poll_interval_seconds=0.1,
# max_running_seconds=60 so our 2-hour-old row gets swept.
max_running_seconds=60,
)
stop = asyncio.Event()
async def _monitor() -> None:
for _ in range(80):
await asyncio.sleep(0.1)
async with session_factory() as session:
current = await jobs_repo.get(session, job.job_id)
if current is not None and current.status == "done":
stop.set()
return
stop.set()
await asyncio.gather(worker.run(stop), _monitor())
async with session_factory() as session:
final = await jobs_repo.get(session, job.job_id)
assert final is not None
assert final.status == "done"
# attempts starts at 0, gets +1 on sweep.
assert final.attempts >= 1
@pytest.mark.parametrize("non_matching_url", ["http://x/y", None])
async def test_worker_no_callback_leaves_callback_status_none(
session_factory: async_sessionmaker[AsyncSession],
httpx_mock: HTTPXMock,
non_matching_url: str | None,
) -> None:
"""Jobs without a callback_url should never get a callback_status set."""
if non_matching_url is not None:
# If we ever accidentally deliver, pytest-httpx will complain because
# no mock matches — which is the signal we want.
pass
job = await _insert_pending(session_factory) # cb=None by default
worker = Worker(
session_factory=session_factory,
pipeline_factory=_ok_factory,
poll_interval_seconds=0.1,
max_running_seconds=3600,
)
stop = asyncio.Event()
async def _monitor() -> None:
for _ in range(50):
await asyncio.sleep(0.1)
async with session_factory() as session:
current = await jobs_repo.get(session, job.job_id)
if current is not None and current.status == "done":
stop.set()
return
stop.set()
await asyncio.gather(worker.run(stop), _monitor())
async with session_factory() as session:
final = await jobs_repo.get(session, job.job_id)
assert final is not None
assert final.callback_status is None
def _unused() -> None:
"""Silence a ruff F401 for ResponseIX — kept for symmetry w/ other tests."""
_ = ResponseIX