infoxtractor/tests/integration/test_pg_queue_adapter.py
Dirk Riemann 050f80dcd7
All checks were successful
tests / test (push) Successful in 1m8s
tests / test (pull_request) Successful in 1m9s
feat(pg-queue): LISTEN ix_jobs_new + 10s fallback poll (spec §4)
PgQueueListener:
- Dedicated asyncpg connection outside the SQLAlchemy pool (LISTEN
  needs a persistent connection; pooled connections check in/out).
- Exposes wait_for_work(timeout) — resolves on NOTIFY or timeout,
  whichever fires first. The worker treats both wakes identically.
- asyncpg_dsn_from_sqlalchemy_url strips the +asyncpg driver segment
  and percent-decodes the password so the same URL in IX_POSTGRES_URL
  works for both SQLAlchemy and raw asyncpg.

app.py lifespan now also spawns the listener alongside the worker;
both are gated on spawn_worker=True so REST-only tests stay fast.

2 new integration tests: NOTIFY path (wake within 2 s despite 60 s
poll) + missed-NOTIFY path (fallback poll recovers within 5 s). 33
integration tests total, 209 unit. Forgejo Actions trigger is flaky;
local verification is the gate.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 11:52:26 +02:00

153 lines
4.8 KiB
Python

"""Integration tests for the PgQueueListener + worker integration (Task 3.6).
Two scenarios:
1. NOTIFY delivered — worker wakes within ~1 s and picks the row up.
2. Missed NOTIFY — the row still gets picked up by the fallback poll.
Both run a real worker + listener against a live Postgres. We drive them via
``asyncio.gather`` + a "until done" watchdog.
"""
from __future__ import annotations
import asyncio
from typing import TYPE_CHECKING
from sqlalchemy import text
from ix.adapters.pg_queue.listener import PgQueueListener, asyncpg_dsn_from_sqlalchemy_url
from ix.contracts.request import Context, RequestIX
from ix.pipeline.pipeline import Pipeline
from ix.pipeline.step import Step
from ix.store import jobs_repo
from ix.worker.loop import Worker
if TYPE_CHECKING:
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
class _PassingStep(Step):
"""Same minimal fake as test_worker_loop — keeps these suites independent."""
step_name = "fake_pass"
async def validate(self, request_ix, response_ix): # type: ignore[no-untyped-def]
return True
async def process(self, request_ix, response_ix): # type: ignore[no-untyped-def]
response_ix.use_case = request_ix.use_case
return response_ix
def _factory() -> Pipeline:
return Pipeline(steps=[_PassingStep()])
async def _wait_for_status(
session_factory: async_sessionmaker[AsyncSession],
job_id,
target: str,
timeout_s: float,
) -> bool:
deadline = asyncio.get_event_loop().time() + timeout_s
while asyncio.get_event_loop().time() < deadline:
async with session_factory() as session:
job = await jobs_repo.get(session, job_id)
if job is not None and job.status == target:
return True
await asyncio.sleep(0.1)
return False
async def test_notify_wakes_worker_within_2s(
session_factory: async_sessionmaker[AsyncSession],
postgres_url: str,
) -> None:
"""Direct INSERT + NOTIFY → worker picks it up fast (not via the poll)."""
listener = PgQueueListener(dsn=asyncpg_dsn_from_sqlalchemy_url(postgres_url))
await listener.start()
worker = Worker(
session_factory=session_factory,
pipeline_factory=_factory,
# 60 s fallback poll — if we still find the row within 2 s it's
# because NOTIFY woke us, not the poll.
poll_interval_seconds=60.0,
max_running_seconds=3600,
wait_for_work=listener.wait_for_work,
)
stop = asyncio.Event()
worker_task = asyncio.create_task(worker.run(stop))
# Give the worker one tick to reach the sleep_or_wake branch.
await asyncio.sleep(0.3)
# Insert a pending row manually + NOTIFY — simulates a direct-SQL client
# like an external batch script.
request = RequestIX(
use_case="bank_statement_header",
ix_client_id="pgq",
request_id="notify-1",
context=Context(texts=["hi"]),
)
async with session_factory() as session:
job = await jobs_repo.insert_pending(session, request, callback_url=None)
await session.commit()
async with session_factory() as session:
await session.execute(
text(f"NOTIFY ix_jobs_new, '{job.job_id}'")
)
await session.commit()
assert await _wait_for_status(session_factory, job.job_id, "done", 3.0), (
"worker didn't pick up the NOTIFY'd row in time"
)
stop.set()
await worker_task
await listener.stop()
async def test_missed_notify_falls_back_to_poll(
session_factory: async_sessionmaker[AsyncSession],
postgres_url: str,
) -> None:
"""Row lands without a NOTIFY; fallback poll still picks it up."""
listener = PgQueueListener(dsn=asyncpg_dsn_from_sqlalchemy_url(postgres_url))
await listener.start()
worker = Worker(
session_factory=session_factory,
pipeline_factory=_factory,
# Short poll so the fallback kicks in quickly — we need the test
# to finish in seconds, not the spec's 10 s.
poll_interval_seconds=0.5,
max_running_seconds=3600,
wait_for_work=listener.wait_for_work,
)
stop = asyncio.Event()
worker_task = asyncio.create_task(worker.run(stop))
await asyncio.sleep(0.3)
# Insert without NOTIFY: simulate a buggy writer.
request = RequestIX(
use_case="bank_statement_header",
ix_client_id="pgq",
request_id="missed-1",
context=Context(texts=["hi"]),
)
async with session_factory() as session:
job = await jobs_repo.insert_pending(session, request, callback_url=None)
await session.commit()
assert await _wait_for_status(session_factory, job.job_id, "done", 5.0), (
"fallback poll didn't pick up the row"
)
stop.set()
await worker_task
await listener.stop()