"""Integration tests for the PgQueueListener + worker integration (Task 3.6). Two scenarios: 1. NOTIFY delivered — worker wakes within ~1 s and picks the row up. 2. Missed NOTIFY — the row still gets picked up by the fallback poll. Both run a real worker + listener against a live Postgres. We drive them via ``asyncio.gather`` + a "until done" watchdog. """ from __future__ import annotations import asyncio from typing import TYPE_CHECKING from sqlalchemy import text from ix.adapters.pg_queue.listener import PgQueueListener, asyncpg_dsn_from_sqlalchemy_url from ix.contracts.request import Context, RequestIX from ix.pipeline.pipeline import Pipeline from ix.pipeline.step import Step from ix.store import jobs_repo from ix.worker.loop import Worker if TYPE_CHECKING: from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker class _PassingStep(Step): """Same minimal fake as test_worker_loop — keeps these suites independent.""" step_name = "fake_pass" async def validate(self, request_ix, response_ix): # type: ignore[no-untyped-def] return True async def process(self, request_ix, response_ix): # type: ignore[no-untyped-def] response_ix.use_case = request_ix.use_case return response_ix def _factory() -> Pipeline: return Pipeline(steps=[_PassingStep()]) async def _wait_for_status( session_factory: async_sessionmaker[AsyncSession], job_id, target: str, timeout_s: float, ) -> bool: deadline = asyncio.get_event_loop().time() + timeout_s while asyncio.get_event_loop().time() < deadline: async with session_factory() as session: job = await jobs_repo.get(session, job_id) if job is not None and job.status == target: return True await asyncio.sleep(0.1) return False async def test_notify_wakes_worker_within_2s( session_factory: async_sessionmaker[AsyncSession], postgres_url: str, ) -> None: """Direct INSERT + NOTIFY → worker picks it up fast (not via the poll).""" listener = PgQueueListener(dsn=asyncpg_dsn_from_sqlalchemy_url(postgres_url)) await listener.start() worker = Worker( session_factory=session_factory, pipeline_factory=_factory, # 60 s fallback poll — if we still find the row within 2 s it's # because NOTIFY woke us, not the poll. poll_interval_seconds=60.0, max_running_seconds=3600, wait_for_work=listener.wait_for_work, ) stop = asyncio.Event() worker_task = asyncio.create_task(worker.run(stop)) # Give the worker one tick to reach the sleep_or_wake branch. await asyncio.sleep(0.3) # Insert a pending row manually + NOTIFY — simulates a direct-SQL client # like an external batch script. request = RequestIX( use_case="bank_statement_header", ix_client_id="pgq", request_id="notify-1", context=Context(texts=["hi"]), ) async with session_factory() as session: job = await jobs_repo.insert_pending(session, request, callback_url=None) await session.commit() async with session_factory() as session: await session.execute( text(f"NOTIFY ix_jobs_new, '{job.job_id}'") ) await session.commit() assert await _wait_for_status(session_factory, job.job_id, "done", 3.0), ( "worker didn't pick up the NOTIFY'd row in time" ) stop.set() await worker_task await listener.stop() async def test_missed_notify_falls_back_to_poll( session_factory: async_sessionmaker[AsyncSession], postgres_url: str, ) -> None: """Row lands without a NOTIFY; fallback poll still picks it up.""" listener = PgQueueListener(dsn=asyncpg_dsn_from_sqlalchemy_url(postgres_url)) await listener.start() worker = Worker( session_factory=session_factory, pipeline_factory=_factory, # Short poll so the fallback kicks in quickly — we need the test # to finish in seconds, not the spec's 10 s. poll_interval_seconds=0.5, max_running_seconds=3600, wait_for_work=listener.wait_for_work, ) stop = asyncio.Event() worker_task = asyncio.create_task(worker.run(stop)) await asyncio.sleep(0.3) # Insert without NOTIFY: simulate a buggy writer. request = RequestIX( use_case="bank_statement_header", ix_client_id="pgq", request_id="missed-1", context=Context(texts=["hi"]), ) async with session_factory() as session: job = await jobs_repo.insert_pending(session, request, callback_url=None) await session.commit() assert await _wait_for_status(session_factory, job.job_id, "done", 5.0), ( "fallback poll didn't pick up the row" ) stop.set() await worker_task await listener.stop()