infoxtractor/tests/integration/conftest.py
Dirk Riemann 141153ffa7
feat(store): JobsRepo CRUD over ix_jobs + integration fixtures (spec §4)
JobsRepo covers the full job-lifecycle surface:

- insert_pending: idempotent on (client_id, request_id) via ON CONFLICT
  DO NOTHING + re-select; assigns a 16-hex ix_id.
- claim_next_pending: FOR UPDATE SKIP LOCKED so concurrent workers never
  double-dispatch a row.
- get / get_by_correlation: hydrates JSONB back through Pydantic.
- mark_done: done iff response.error is None, else error.
- mark_error: explicit convenience wrapper.
- update_callback_status: delivered | failed (no status transition).
- sweep_orphans: time-based rescue of stuck running rows; attempts++.
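
A minimal sketch of the insert_pending idempotency pattern described above, not the repository's actual code. It swaps in an in-memory SQLite database for Postgres so it runs anywhere (SQLite 3.24+ accepts the same ON CONFLICT ... DO NOTHING clause); the reduced column set and the use of `secrets.token_hex` for the 16-hex ix_id are assumptions made for illustration.

```python
import secrets
import sqlite3


def make_db() -> sqlite3.Connection:
    """Create a throwaway stand-in for ix_jobs (hypothetical, reduced columns)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        """
        CREATE TABLE ix_jobs (
            ix_id      TEXT PRIMARY KEY,
            client_id  TEXT NOT NULL,
            request_id TEXT NOT NULL,
            status     TEXT NOT NULL DEFAULT 'pending',
            UNIQUE (client_id, request_id)
        )
        """
    )
    return conn


def insert_pending(conn: sqlite3.Connection, client_id: str, request_id: str) -> str:
    """Insert a pending job, or return the ix_id of the row that already exists."""
    candidate = secrets.token_hex(8)  # 8 random bytes -> 16 hex characters
    conn.execute(
        "INSERT INTO ix_jobs (ix_id, client_id, request_id) VALUES (?, ?, ?) "
        "ON CONFLICT (client_id, request_id) DO NOTHING",
        (candidate, client_id, request_id),
    )
    # Re-select: on a conflict the candidate id was discarded, so the stored
    # row is the only source of truth for the id handed back to the caller.
    row = conn.execute(
        "SELECT ix_id FROM ix_jobs WHERE client_id = ? AND request_id = ?",
        (client_id, request_id),
    ).fetchone()
    return row[0]


conn = make_db()
first = insert_pending(conn, "client-a", "req-1")
second = insert_pending(conn, "client-a", "req-1")  # retry of the same request
row_count = conn.execute("SELECT COUNT(*) FROM ix_jobs").fetchone()[0]
```

Both calls return the same id and the table holds a single row, which is the property that lets a client retry a submission safely.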

Integration fixtures (tests/integration/conftest.py):
- Skip cleanly when neither IX_TEST_DATABASE_URL nor IX_POSTGRES_URL is
  set (unit suite stays runnable on a bare laptop).
- Alembic upgrade/downgrade runs in a subprocess so its internal
  asyncio.run() doesn't collide with pytest-asyncio's loop.
- Per-test engine + truncate so loops never cross and tests start clean.
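
The subprocess choice can be illustrated with the standard library alone. The sketch below is an assumption-laden stand-in, not the fixture itself: `migrate` and the `CHILD` script are hypothetical placeholders for Alembic's env.py. It shows that `asyncio.run()` refuses to start while another loop is already running in the same thread, whereas a child interpreter can run its own loop without conflict.

```python
import asyncio
import subprocess
import sys

# Hypothetical child script standing in for "python -m alembic upgrade head".
CHILD = (
    "import asyncio\n"
    "async def main():\n"
    "    print('migrated')\n"
    "asyncio.run(main())\n"
)


async def migrate() -> str:
    """Placeholder for the async work env.py would do."""
    return "migrated"


async def in_process() -> str:
    """Call asyncio.run() while a loop is already running in this thread."""
    coro = migrate()
    try:
        asyncio.run(coro)
    except RuntimeError as exc:
        coro.close()  # the coroutine never started; close it to avoid a warning
        return f"refused: {exc}"
    return "ran"


async def in_subprocess() -> str:
    """Run the same work in a child interpreter, which owns its own loop."""
    done = subprocess.run(
        [sys.executable, "-c", CHILD],
        capture_output=True,
        text=True,
        check=True,
    )
    return done.stdout.strip()


in_process_result = asyncio.run(in_process())
subprocess_result = asyncio.run(in_subprocess())
```

The in-process attempt comes back as a refusal, while the subprocess variant completes even though the parent loop is active.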

15 integration tests against a live postgres:16, including SKIP LOCKED
concurrency + orphan sweep.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 11:43:11 +02:00


"""Integration-test fixtures — real Postgres required.

Policy: tests that import these fixtures skip cleanly when no DB is
configured. We check ``IX_TEST_DATABASE_URL`` first (local developer
override, usually a disposable docker container), then ``IX_POSTGRES_URL``
(what Forgejo Actions already sets). If neither is present the fixture
short-circuits with ``pytest.skip`` so a developer running
``pytest tests/unit`` in an unconfigured shell doesn't see the integration
suite hang or raise a cryptic ``OperationalError``.

Schema lifecycle:

* session scope: ``alembic upgrade head`` once (after an initial
  ``alembic downgrade base``), then ``alembic downgrade base`` at session
  end. We tried ``Base.metadata.create_all`` at first (faster), but it
  meant migrations stayed untested by the integration suite and a
  developer who broke ``001_initial_ix_jobs.py`` wouldn't find out until
  deploy. Current shape keeps migrations in the hot path.
* per-test: ``TRUNCATE ix_jobs`` (via the ``_reset_schema`` autouse
  fixture), which is faster than recreating the schema and preserves
  indexes/constraints so tests that want to assert on a unique-violation
  path actually get one.
"""

from __future__ import annotations

import os
import subprocess
import sys
from collections.abc import AsyncIterator, Iterator
from pathlib import Path

import pytest
import pytest_asyncio
from sqlalchemy.ext.asyncio import (
    AsyncEngine,
    AsyncSession,
    async_sessionmaker,
    create_async_engine,
)

REPO_ROOT = Path(__file__).resolve().parents[2]


def _resolve_postgres_url() -> str | None:
    """Pick the database URL per policy: test override → CI URL → none."""
    return os.environ.get("IX_TEST_DATABASE_URL") or os.environ.get("IX_POSTGRES_URL")


@pytest.fixture(scope="session")
def postgres_url() -> str:
    url = _resolve_postgres_url()
    if not url:
        pytest.skip(
            "no postgres configured; set IX_TEST_DATABASE_URL or IX_POSTGRES_URL"
        )
    return url


def _run_alembic(direction: str, postgres_url: str) -> None:
    """Invoke Alembic in a subprocess so its ``asyncio.run`` inside ``env.py``
    doesn't collide with the pytest-asyncio event loop.

    We pass the URL via ``IX_POSTGRES_URL``, not ``-x url=...``, because
    percent-encoded characters in developer passwords trip up alembic's
    configparser-backed ini loader. The env var lane skips configparser.
    """
    env = os.environ.copy()
    env["IX_POSTGRES_URL"] = postgres_url
    # "upgrade" targets the newest revision; anything else rolls back to base.
    target = "head" if direction == "upgrade" else "base"
    subprocess.run(
        [sys.executable, "-m", "alembic", direction, target],
        cwd=REPO_ROOT,
        env=env,
        check=True,
    )


@pytest.fixture(scope="session", autouse=True)
def _prepare_schema(postgres_url: str) -> Iterator[None]:
    """Run migrations once per session, torn down at the end.

    pytest-asyncio creates one event loop per test (function-scoped by
    default) and asyncpg connections can't survive a loop switch. That
    forces a function-scoped engine below, but migrations are expensive,
    so we keep those session-scoped via a subprocess call (no loop
    involvement at all).
    """
    _run_alembic("downgrade", postgres_url)
    _run_alembic("upgrade", postgres_url)
    yield
    _run_alembic("downgrade", postgres_url)


@pytest_asyncio.fixture
async def engine(postgres_url: str) -> AsyncIterator[AsyncEngine]:
    """Per-test async engine.

    Built fresh each test so its asyncpg connections live on the same loop
    as the test itself. Dispose on teardown; otherwise asyncpg leaks tasks
    into the next test's loop and we get ``got Future attached to a
    different loop`` errors on the second test in a file.
    """
    eng = create_async_engine(postgres_url, pool_pre_ping=True)
    try:
        yield eng
    finally:
        await eng.dispose()


@pytest_asyncio.fixture
async def session_factory(engine: AsyncEngine) -> async_sessionmaker[AsyncSession]:
    """Per-test session factory. ``expire_on_commit=False`` per prod parity."""
    return async_sessionmaker(engine, expire_on_commit=False)


@pytest_asyncio.fixture(autouse=True)
async def _reset_schema(engine: AsyncEngine) -> None:
    """Truncate ix_jobs between tests so each test starts from empty state."""
    async with engine.begin() as conn:
        await conn.exec_driver_sql("TRUNCATE ix_jobs")
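

The `_run_alembic` docstring above mentions percent-encoded passwords tripping up a configparser-backed loader. The following standalone sketch shows one plausible mechanism using only the standard library; the URL, the `[alembic]` section contents, and the helper names are hypothetical, and the sketch does not call Alembic itself.

```python
import configparser

# Hypothetical URL whose password "p@ss" is percent-encoded as "p%40ss".
URL = "postgresql+asyncpg://ix:p%40ss@localhost:5432/ix"


def read_url_via_ini(url: str) -> str:
    """Round-trip a URL through configparser with its default interpolation."""
    parser = configparser.ConfigParser()
    parser.read_string(f"[alembic]\nsqlalchemy.url = {url}\n")
    # Interpolation happens on get(), so this is where a bare "%" blows up.
    return parser.get("alembic", "sqlalchemy.url")


def ini_lane_fails(url: str) -> bool:
    """Report whether the ini lane rejects the URL."""
    try:
        read_url_via_ini(url)
    except configparser.InterpolationSyntaxError:
        return True
    return False


raw_fails = ini_lane_fails(URL)
# Doubling each "%" is the configparser escape; it survives the round trip.
escaped_round_trip = read_url_via_ini(URL.replace("%", "%%"))
```

Passing the URL through an environment variable avoids both the failure and the need for callers to remember the `%%` escape.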