Worker:
- Startup: sweep_orphans(now, max_running_seconds) rescues rows stuck
in 'running' from a crashed prior process.
- Loop: claim_next_pending → build pipeline via injected factory → run
→ mark_done/mark_error → deliver callback if set → record outcome.
- Non-IX exceptions from the pipeline collapse to IX_002_000 so callers
see a stable error code.
- Sleep loop uses a cancellable wait so the worker reacts to the stop
event immediately; the wait_for_work hook is ready for Task 3.6 to plug
in the LISTEN-driven event without the worker knowing about NOTIFY.
Callback:
- One-shot POST, 2xx → delivered, anything else (incl. connect/timeout
exceptions) → failed. No retries.
- Recording the callback outcome never reverts the job's terminal state —
GET /jobs/{id} stays the authoritative fallback.
7 integration tests: happy path, pipeline-raise → error, callback 2xx,
callback 5xx, orphan sweep on startup, no-callback rows stay
callback_status=None (x2 via parametrize). Unit suite still 209.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
325 lines
10 KiB
Python
325 lines
10 KiB
Python
"""Integration tests for the worker loop (Task 3.5).
|
|
|
|
We spin up a real worker with a fake pipeline factory and verify the lifecycle
|
|
transitions against a live DB. Callback delivery is exercised via
|
|
``pytest-httpx`` — callers' webhook endpoints are mocked, not run.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from datetime import UTC, datetime, timedelta
|
|
from typing import TYPE_CHECKING
|
|
|
|
import pytest
|
|
from pytest_httpx import HTTPXMock
|
|
|
|
from ix.contracts.request import Context, RequestIX
|
|
from ix.contracts.response import ResponseIX
|
|
from ix.pipeline.pipeline import Pipeline
|
|
from ix.pipeline.step import Step
|
|
from ix.store import jobs_repo
|
|
from ix.worker.loop import Worker
|
|
|
|
if TYPE_CHECKING:
|
|
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
|
|
|
|
|
|
class _PassingStep(Step):
    """Fake step that always validates and echoes the request's identifiers.

    ``process`` copies ``use_case``, ``ix_client_id``, ``request_id`` and
    ``ix_id`` from the request onto the response, then returns the response.
    """

    step_name = "fake_pass"

    async def validate(self, request_ix, response_ix):  # type: ignore[no-untyped-def]
        # Never skip: the step should run for every request.
        return True

    async def process(self, request_ix, response_ix):  # type: ignore[no-untyped-def]
        # Copy the identifying fields across one by one, in a fixed order.
        for field_name in ("use_case", "ix_client_id", "request_id", "ix_id"):
            setattr(response_ix, field_name, getattr(request_ix, field_name))
        return response_ix
|
|
|
|
|
|
class _RaisingStep(Step):
    """Fake step whose ``process`` always raises a plain ``RuntimeError``.

    The error is deliberately not an IX-specific exception, so a pipeline
    built from this step exercises the worker's belt-and-braces error path.
    """

    step_name = "fake_raise"

    async def validate(self, request_ix, response_ix):  # type: ignore[no-untyped-def]
        # Always opt in so that ``process`` is reached.
        return True

    async def process(self, request_ix, response_ix):  # type: ignore[no-untyped-def]
        # Unconditional failure; nothing is written to the response.
        raise RuntimeError("boom")
|
|
|
|
|
|
def _ok_factory() -> Pipeline:
    """Build a fresh single-step pipeline made of one ``_PassingStep``."""
    steps = [_PassingStep()]
    return Pipeline(steps=steps)
|
|
|
|
|
|
def _bad_factory() -> Pipeline:
    """Build a fresh single-step pipeline made of one ``_RaisingStep``."""
    steps = [_RaisingStep()]
    return Pipeline(steps=steps)
|
|
|
|
|
|
async def _insert_pending(session_factory, **kwargs):  # type: ignore[no-untyped-def]
    """Insert one pending job through ``jobs_repo`` and commit it.

    Recognised keyword arguments (all optional):

    - ``client``: value for ``ix_client_id`` (default ``"test"``).
    - ``rid``: value for ``request_id`` (default ``"r-1"``).
    - ``cb``: callback URL passed to ``insert_pending`` (default ``None``).

    Returns whatever ``jobs_repo.insert_pending`` returned for the new row.
    """
    client_id = kwargs.get("client", "test")
    request_id = kwargs.get("rid", "r-1")
    callback_url = kwargs.get("cb")

    request = RequestIX(
        use_case="bank_statement_header",
        ix_client_id=client_id,
        request_id=request_id,
        context=Context(texts=["hi"]),
    )

    async with session_factory() as session:
        job = await jobs_repo.insert_pending(
            session,
            request,
            callback_url=callback_url,
        )
        await session.commit()
    return job
|
|
|
|
|
|
async def test_worker_runs_one_job_to_done(
    session_factory: async_sessionmaker[AsyncSession],
) -> None:
    """A pending job run through the passing pipeline ends up ``done``."""
    job = await _insert_pending(session_factory)

    stop = asyncio.Event()
    worker = Worker(
        pipeline_factory=_ok_factory,
        session_factory=session_factory,
        max_running_seconds=3600,
        poll_interval_seconds=0.1,
    )
    terminal_states = ("done", "error")

    async def _monitor() -> None:
        """Poll the job until it is terminal or the budget runs out, then stop the worker."""
        for _poll in range(50):  # 50 polls x 0.1 s = 5 seconds budget
            await asyncio.sleep(0.1)
            async with session_factory() as session:
                snapshot = await jobs_repo.get(session, job.job_id)
                if snapshot is not None and snapshot.status in terminal_states:
                    break
        # Reached on success and on timeout alike: the worker has to exit
        # so that the assertions below get a chance to run.
        stop.set()

    await asyncio.gather(worker.run(stop), _monitor())

    async with session_factory() as session:
        finished = await jobs_repo.get(session, job.job_id)
        assert finished is not None
        assert finished.status == "done"
        assert finished.finished_at is not None
|
|
|
|
|
|
async def test_worker_pipeline_exception_marks_error(
    session_factory: async_sessionmaker[AsyncSession],
) -> None:
    """A step raising a non-IX exception leaves the job in ``error``.

    The stored response must carry an error string starting with
    ``IX_002_000`` — the worker catches what the pipeline doesn't.
    """
    job = await _insert_pending(session_factory)

    stop = asyncio.Event()
    worker = Worker(
        pipeline_factory=_bad_factory,
        session_factory=session_factory,
        max_running_seconds=3600,
        poll_interval_seconds=0.1,
    )

    async def _monitor() -> None:
        """Poll until the job reports ``error`` (or 50 polls elapse), then stop the worker."""
        for _poll in range(50):
            await asyncio.sleep(0.1)
            async with session_factory() as session:
                snapshot = await jobs_repo.get(session, job.job_id)
                if snapshot is not None and snapshot.status == "error":
                    break
        # Set on both the success and the timeout path.
        stop.set()

    await asyncio.gather(worker.run(stop), _monitor())

    async with session_factory() as session:
        failed = await jobs_repo.get(session, job.job_id)
        assert failed is not None
        assert failed.status == "error"
        assert failed.response is not None
        # ``error`` may be None; treat that as an empty string for the prefix check.
        assert (failed.response.error or "").startswith("IX_002_000")
|
|
|
|
|
|
async def test_worker_delivers_callback(
    httpx_mock: HTTPXMock,
    session_factory: async_sessionmaker[AsyncSession],
) -> None:
    """A job with a callback_url whose webhook answers 200 → callback_status=delivered."""
    webhook_url = "http://caller/webhook"
    httpx_mock.add_response(url=webhook_url, status_code=200)

    job = await _insert_pending(session_factory, cb=webhook_url)

    stop = asyncio.Event()
    worker = Worker(
        pipeline_factory=_ok_factory,
        session_factory=session_factory,
        callback_timeout_seconds=5,
        max_running_seconds=3600,
        poll_interval_seconds=0.1,
    )

    async def _monitor() -> None:
        """Poll until the job is ``done`` with a callback outcome recorded, then stop the worker."""
        for _poll in range(80):
            await asyncio.sleep(0.1)
            async with session_factory() as session:
                snapshot = await jobs_repo.get(session, job.job_id)
                if snapshot is None or snapshot.status != "done":
                    continue
                if snapshot.callback_status is not None:
                    break
        # Set on both the success and the timeout path.
        stop.set()

    await asyncio.gather(worker.run(stop), _monitor())

    async with session_factory() as session:
        finished = await jobs_repo.get(session, job.job_id)
        assert finished is not None
        assert finished.callback_status == "delivered"
|
|
|
|
|
|
async def test_worker_marks_callback_failed_on_5xx(
    httpx_mock: HTTPXMock,
    session_factory: async_sessionmaker[AsyncSession],
) -> None:
    """A webhook answering 500 → callback_status=failed while the job stays ``done``."""
    webhook_url = "http://caller/bad"
    httpx_mock.add_response(url=webhook_url, status_code=500)

    job = await _insert_pending(session_factory, cb=webhook_url)

    stop = asyncio.Event()
    worker = Worker(
        pipeline_factory=_ok_factory,
        session_factory=session_factory,
        callback_timeout_seconds=5,
        max_running_seconds=3600,
        poll_interval_seconds=0.1,
    )

    async def _monitor() -> None:
        """Poll until the job is ``done`` with a callback outcome recorded, then stop the worker."""
        for _poll in range(80):
            await asyncio.sleep(0.1)
            async with session_factory() as session:
                snapshot = await jobs_repo.get(session, job.job_id)
                if snapshot is None or snapshot.status != "done":
                    continue
                if snapshot.callback_status is not None:
                    break
        # Set on both the success and the timeout path.
        stop.set()

    await asyncio.gather(worker.run(stop), _monitor())

    async with session_factory() as session:
        finished = await jobs_repo.get(session, job.job_id)
        assert finished is not None
        assert finished.status == "done"  # terminal state stays done
        assert finished.callback_status == "failed"
|
|
|
|
|
|
async def test_worker_sweeps_orphans_at_startup(
    session_factory: async_sessionmaker[AsyncSession],
) -> None:
    """A row left in ``running`` for two hours is still brought to ``done``.

    The worker is configured with ``max_running_seconds=60``, so the
    backdated row counts as stale when the worker starts.
    """
    from sqlalchemy import text

    # Insert a job, then backdate it to mimic a worker that crashed mid-run.
    job = await _insert_pending(session_factory, rid="orphan")
    stale_started_at = datetime.now(UTC) - timedelta(hours=2)
    backdate_stmt = text(
        "UPDATE ix_jobs SET status='running', started_at=:t "
        "WHERE job_id=:jid"
    )

    async with session_factory() as session:
        await session.execute(
            backdate_stmt,
            {"t": stale_started_at, "jid": job.job_id},
        )
        await session.commit()

    stop = asyncio.Event()
    worker = Worker(
        pipeline_factory=_ok_factory,
        session_factory=session_factory,
        # max_running_seconds=60 so our 2-hour-old row gets swept.
        max_running_seconds=60,
        poll_interval_seconds=0.1,
    )

    async def _monitor() -> None:
        """Poll until the job reports ``done`` (or 80 polls elapse), then stop the worker."""
        for _poll in range(80):
            await asyncio.sleep(0.1)
            async with session_factory() as session:
                snapshot = await jobs_repo.get(session, job.job_id)
                if snapshot is not None and snapshot.status == "done":
                    break
        # Set on both the success and the timeout path.
        stop.set()

    await asyncio.gather(worker.run(stop), _monitor())

    async with session_factory() as session:
        finished = await jobs_repo.get(session, job.job_id)
        assert finished is not None
        assert finished.status == "done"
        # attempts starts at 0, gets +1 on sweep.
        assert finished.attempts >= 1
|
|
|
|
|
|
@pytest.mark.parametrize("non_matching_url", ["http://x/y", None])
async def test_worker_no_callback_leaves_callback_status_none(
    session_factory: async_sessionmaker[AsyncSession],
    httpx_mock: HTTPXMock,
    non_matching_url: str | None,
) -> None:
    """Jobs without a callback_url should never get a callback_status set.

    ``non_matching_url`` does not influence the job that is inserted; it only
    labels the two parametrized runs. No response is registered on
    ``httpx_mock``, and the test asserts explicitly that the worker sent no
    HTTP request at all.
    """
    job = await _insert_pending(session_factory)  # cb=None by default

    worker = Worker(
        session_factory=session_factory,
        pipeline_factory=_ok_factory,
        poll_interval_seconds=0.1,
        max_running_seconds=3600,
    )
    stop = asyncio.Event()

    async def _monitor() -> None:
        """Poll until the job reports ``done`` (or 50 polls elapse), then stop the worker."""
        for _ in range(50):
            await asyncio.sleep(0.1)
            async with session_factory() as session:
                current = await jobs_repo.get(session, job.job_id)
                if current is not None and current.status == "done":
                    break
        # Set on both the success and the timeout path so the worker exits.
        stop.set()

    await asyncio.gather(worker.run(stop), _monitor())

    async with session_factory() as session:
        final = await jobs_repo.get(session, job.job_id)
        assert final is not None
        # Without this check the test would pass vacuously when the worker
        # never processed the job: an untouched pending row also has
        # callback_status=None.
        assert final.status == "done"
        assert final.callback_status is None

    # Make the "never delivered" guarantee explicit instead of relying on
    # pytest-httpx complaining about an unmatched request.
    sent = httpx_mock.get_requests()
    assert not sent, (
        f"unexpected callback request(s) in case {non_matching_url!r}: {sent}"
    )
|
|
|
|
|
|
def _unused() -> None:
    """Reference ``ResponseIX`` so ruff does not report its import as unused (F401).

    The import is kept for symmetry with other test modules; this function
    only needs to exist, it does no work.
    """
    _ = ResponseIX
|