"""Integration tests for the worker loop (Task 3.5). We spin up a real worker with a fake pipeline factory and verify the lifecycle transitions against a live DB. Callback delivery is exercised via ``pytest-httpx`` — callers' webhook endpoints are mocked, not run. """ from __future__ import annotations import asyncio from datetime import UTC, datetime, timedelta from typing import TYPE_CHECKING import pytest from pytest_httpx import HTTPXMock from ix.contracts.request import Context, RequestIX from ix.contracts.response import ResponseIX from ix.pipeline.pipeline import Pipeline from ix.pipeline.step import Step from ix.store import jobs_repo from ix.worker.loop import Worker if TYPE_CHECKING: from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker class _PassingStep(Step): """Minimal fake step that writes a sentinel field on the response.""" step_name = "fake_pass" async def validate(self, request_ix, response_ix): # type: ignore[no-untyped-def] return True async def process(self, request_ix, response_ix): # type: ignore[no-untyped-def] response_ix.use_case = request_ix.use_case response_ix.ix_client_id = request_ix.ix_client_id response_ix.request_id = request_ix.request_id response_ix.ix_id = request_ix.ix_id return response_ix class _RaisingStep(Step): """Fake step that raises a non-IX exception to exercise the worker's belt-and-braces error path.""" step_name = "fake_raise" async def validate(self, request_ix, response_ix): # type: ignore[no-untyped-def] return True async def process(self, request_ix, response_ix): # type: ignore[no-untyped-def] raise RuntimeError("boom") def _ok_factory() -> Pipeline: return Pipeline(steps=[_PassingStep()]) def _bad_factory() -> Pipeline: return Pipeline(steps=[_RaisingStep()]) async def _insert_pending(session_factory, **kwargs): # type: ignore[no-untyped-def] request = RequestIX( use_case="bank_statement_header", ix_client_id=kwargs.get("client", "test"), request_id=kwargs.get("rid", "r-1"), context=Context(texts=["hi"]), ) async with session_factory() as session: job = await jobs_repo.insert_pending( session, request, callback_url=kwargs.get("cb") ) await session.commit() return job async def test_worker_runs_one_job_to_done( session_factory: async_sessionmaker[AsyncSession], ) -> None: job = await _insert_pending(session_factory) worker = Worker( session_factory=session_factory, pipeline_factory=_ok_factory, poll_interval_seconds=0.1, max_running_seconds=3600, ) stop = asyncio.Event() async def _monitor() -> None: """Wait until the job lands in a terminal state, then stop the worker.""" for _ in range(50): # 5 seconds budget await asyncio.sleep(0.1) async with session_factory() as session: current = await jobs_repo.get(session, job.job_id) if current is not None and current.status in ("done", "error"): stop.set() return stop.set() # timeout — let the worker exit so assertions run await asyncio.gather(worker.run(stop), _monitor()) async with session_factory() as session: final = await jobs_repo.get(session, job.job_id) assert final is not None assert final.status == "done" assert final.finished_at is not None async def test_worker_pipeline_exception_marks_error( session_factory: async_sessionmaker[AsyncSession], ) -> None: """A step raising a non-IX exception → status=error, response carries the code. The worker catches what the pipeline doesn't.""" job = await _insert_pending(session_factory) worker = Worker( session_factory=session_factory, pipeline_factory=_bad_factory, poll_interval_seconds=0.1, max_running_seconds=3600, ) stop = asyncio.Event() async def _monitor() -> None: for _ in range(50): await asyncio.sleep(0.1) async with session_factory() as session: current = await jobs_repo.get(session, job.job_id) if current is not None and current.status == "error": stop.set() return stop.set() await asyncio.gather(worker.run(stop), _monitor()) async with session_factory() as session: final = await jobs_repo.get(session, job.job_id) assert final is not None assert final.status == "error" assert final.response is not None assert (final.response.error or "").startswith("IX_002_000") async def test_worker_delivers_callback( httpx_mock: HTTPXMock, session_factory: async_sessionmaker[AsyncSession], ) -> None: """callback_url on a done job → one POST, callback_status=delivered.""" httpx_mock.add_response(url="http://caller/webhook", status_code=200) job = await _insert_pending(session_factory, cb="http://caller/webhook") worker = Worker( session_factory=session_factory, pipeline_factory=_ok_factory, poll_interval_seconds=0.1, max_running_seconds=3600, callback_timeout_seconds=5, ) stop = asyncio.Event() async def _monitor() -> None: for _ in range(80): await asyncio.sleep(0.1) async with session_factory() as session: current = await jobs_repo.get(session, job.job_id) if ( current is not None and current.status == "done" and current.callback_status is not None ): stop.set() return stop.set() await asyncio.gather(worker.run(stop), _monitor()) async with session_factory() as session: final = await jobs_repo.get(session, job.job_id) assert final is not None assert final.callback_status == "delivered" async def test_worker_marks_callback_failed_on_5xx( httpx_mock: HTTPXMock, session_factory: async_sessionmaker[AsyncSession], ) -> None: httpx_mock.add_response(url="http://caller/bad", status_code=500) job = await _insert_pending(session_factory, cb="http://caller/bad") worker = Worker( session_factory=session_factory, pipeline_factory=_ok_factory, poll_interval_seconds=0.1, max_running_seconds=3600, callback_timeout_seconds=5, ) stop = asyncio.Event() async def _monitor() -> None: for _ in range(80): await asyncio.sleep(0.1) async with session_factory() as session: current = await jobs_repo.get(session, job.job_id) if ( current is not None and current.status == "done" and current.callback_status is not None ): stop.set() return stop.set() await asyncio.gather(worker.run(stop), _monitor()) async with session_factory() as session: final = await jobs_repo.get(session, job.job_id) assert final is not None assert final.status == "done" # terminal state stays done assert final.callback_status == "failed" async def test_worker_sweeps_orphans_at_startup( session_factory: async_sessionmaker[AsyncSession], ) -> None: """Stale running rows → pending before the loop starts picking work.""" # Insert a job and backdate it to mimic a crashed worker mid-run. job = await _insert_pending(session_factory, rid="orphan") async with session_factory() as session: from sqlalchemy import text stale = datetime.now(UTC) - timedelta(hours=2) await session.execute( text( "UPDATE ix_jobs SET status='running', started_at=:t " "WHERE job_id=:jid" ), {"t": stale, "jid": job.job_id}, ) await session.commit() worker = Worker( session_factory=session_factory, pipeline_factory=_ok_factory, poll_interval_seconds=0.1, # max_running_seconds=60 so our 2-hour-old row gets swept. max_running_seconds=60, ) stop = asyncio.Event() async def _monitor() -> None: for _ in range(80): await asyncio.sleep(0.1) async with session_factory() as session: current = await jobs_repo.get(session, job.job_id) if current is not None and current.status == "done": stop.set() return stop.set() await asyncio.gather(worker.run(stop), _monitor()) async with session_factory() as session: final = await jobs_repo.get(session, job.job_id) assert final is not None assert final.status == "done" # attempts starts at 0, gets +1 on sweep. assert final.attempts >= 1 @pytest.mark.parametrize("non_matching_url", ["http://x/y", None]) async def test_worker_no_callback_leaves_callback_status_none( session_factory: async_sessionmaker[AsyncSession], httpx_mock: HTTPXMock, non_matching_url: str | None, ) -> None: """Jobs without a callback_url should never get a callback_status set.""" if non_matching_url is not None: # If we ever accidentally deliver, pytest-httpx will complain because # no mock matches — which is the signal we want. pass job = await _insert_pending(session_factory) # cb=None by default worker = Worker( session_factory=session_factory, pipeline_factory=_ok_factory, poll_interval_seconds=0.1, max_running_seconds=3600, ) stop = asyncio.Event() async def _monitor() -> None: for _ in range(50): await asyncio.sleep(0.1) async with session_factory() as session: current = await jobs_repo.get(session, job.job_id) if current is not None and current.status == "done": stop.set() return stop.set() await asyncio.gather(worker.run(stop), _monitor()) async with session_factory() as session: final = await jobs_repo.get(session, job.job_id) assert final is not None assert final.callback_status is None def _unused() -> None: """Silence a ruff F401 for ResponseIX — kept for symmetry w/ other tests.""" _ = ResponseIX