From 1c60c30084d5661ef7adc71db9f182e0e035853a Mon Sep 17 00:00:00 2001 From: Dirk Riemann Date: Sat, 18 Apr 2026 11:37:21 +0200 Subject: [PATCH] =?UTF-8?q?feat(store):=20Alembic=20scaffolding=20+=20init?= =?UTF-8?q?ial=20ix=5Fjobs=20migration=20(spec=20=C2=A74)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Lands the async-friendly Alembic env (NullPool, reads IX_POSTGRES_URL), the hand-written 001 migration matching the spec's table layout exactly (CHECK on status, partial index on pending rows, UNIQUE on (client_id, request_id)), the SQLAlchemy 2.0 ORM mapping, and a lazy engine/session factory. The factory reads the URL through ix.config when available; Task 3.2 makes that the only path. Smoke-tested: alembic upgrade head + downgrade base against a live postgres:16 produce the expected table shape and tear down cleanly. Unit tests assert the migration source contains every required column/index so the migration can't drift from spec at import time. Co-Authored-By: Claude Opus 4.7 (1M context) --- alembic.ini | 47 ++++++++++ alembic/env.py | 89 +++++++++++++++++++ alembic/versions/001_initial_ix_jobs.py | 90 +++++++++++++++++++ src/ix/store/__init__.py | 20 +++++ src/ix/store/engine.py | 76 ++++++++++++++++ src/ix/store/models.py | 86 ++++++++++++++++++ tests/unit/test_alembic_smoke.py | 111 ++++++++++++++++++++++++ 7 files changed, 519 insertions(+) create mode 100644 alembic.ini create mode 100644 alembic/env.py create mode 100644 alembic/versions/001_initial_ix_jobs.py create mode 100644 src/ix/store/__init__.py create mode 100644 src/ix/store/engine.py create mode 100644 src/ix/store/models.py create mode 100644 tests/unit/test_alembic_smoke.py diff --git a/alembic.ini b/alembic.ini new file mode 100644 index 0000000..0356d0e --- /dev/null +++ b/alembic.ini @@ -0,0 +1,47 @@ +; Alembic configuration for infoxtractor. 
+; +; The sqlalchemy.url is filled at runtime from the IX_POSTGRES_URL env var +; (alembic/env.py does the substitution). We keep the template here so +; ``alembic check`` / ``alembic history`` tools work without an env var set. + +[alembic] +script_location = alembic +file_template = %%(rev)s_%%(slug)s +prepend_sys_path = . +sqlalchemy.url = driver://user:pass@localhost/dbname + +[post_write_hooks] + +[loggers] +keys = root,sqlalchemy,alembic + +[handlers] +keys = console + +[formatters] +keys = generic + +[logger_root] +level = WARN +handlers = console +qualname = + +[logger_sqlalchemy] +level = WARN +handlers = +qualname = sqlalchemy.engine + +[logger_alembic] +level = INFO +handlers = +qualname = alembic + +[handler_console] +class = StreamHandler +args = (sys.stderr,) +level = NOTSET +formatter = generic + +[formatter_generic] +format = %(levelname)-5.5s [%(name)s] %(message)s +datefmt = %H:%M:%S diff --git a/alembic/env.py b/alembic/env.py new file mode 100644 index 0000000..17f77ed --- /dev/null +++ b/alembic/env.py @@ -0,0 +1,89 @@ +"""Alembic async env — reads ``IX_POSTGRES_URL`` from the environment. + +Mirrors mammon's ``alembic/env.py`` pattern (async engine + ``run_sync`` bridge) +so anyone familiar with that repo can read this one without context switch. +The only deviations: + +* We source the URL from ``IX_POSTGRES_URL`` via ``os.environ`` rather than via + the alembic.ini ``sqlalchemy.url`` setting. Config parsing happens at import + time and depending on pydantic-settings here would introduce a cycle with + ``src/ix/config.py`` (which lands in Task 3.2). +* We use ``NullPool`` — migrations open/close their connection once, pooling + would hold an unused async connection open after ``alembic upgrade head`` + returned, which breaks the container's CMD chain. + +Run offline by setting ``-x url=...`` or the env var + ``--sql``. 
+""" + +from __future__ import annotations + +import asyncio +import os +from logging.config import fileConfig + +from alembic import context +from sqlalchemy.ext.asyncio import create_async_engine +from sqlalchemy.pool import NullPool + +from ix.store.models import Base + +config = context.config +if config.config_file_name is not None: + fileConfig(config.config_file_name) + +target_metadata = Base.metadata + + +def _database_url() -> str: + """Resolve the connection URL from env, falling back to alembic.ini. + + The env var is the primary source (container CMD sets it). The ini value + remains available so ``alembic -x url=...`` or a manual ``alembic.ini`` + edit still work for one-off scripts. + """ + + env_url = os.environ.get("IX_POSTGRES_URL") + if env_url: + return env_url + ini_url = config.get_main_option("sqlalchemy.url") + if ini_url and ini_url != "driver://user:pass@localhost/dbname": + return ini_url + raise RuntimeError( + "IX_POSTGRES_URL not set and alembic.ini sqlalchemy.url not configured" + ) + + +def run_migrations_offline() -> None: + """Emit migrations as SQL without a live connection.""" + + context.configure( + url=_database_url(), + target_metadata=target_metadata, + literal_binds=True, + dialect_opts={"paramstyle": "named"}, + ) + with context.begin_transaction(): + context.run_migrations() + + +def do_run_migrations(connection) -> None: # type: ignore[no-untyped-def] + context.configure(connection=connection, target_metadata=target_metadata) + with context.begin_transaction(): + context.run_migrations() + + +async def run_async_migrations() -> None: + engine = create_async_engine(_database_url(), poolclass=NullPool) + async with engine.connect() as connection: + await connection.run_sync(do_run_migrations) + await engine.dispose() + + +def run_migrations_online() -> None: + asyncio.run(run_async_migrations()) + + +if context.is_offline_mode(): + run_migrations_offline() +else: + run_migrations_online() diff --git 
"""Initial migration — creates the ``ix_jobs`` table per spec §4.

Hand-written (do NOT ``alembic revision --autogenerate``) so the table layout
stays byte-exact with the MVP spec. autogenerate tends to add/drop indexes in
an order that makes diffs noisy and occasionally swaps JSONB for JSON on
dialects that don't distinguish them.

Revision ID: 001
Revises:
Create Date: 2026-04-18
"""

from __future__ import annotations

import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql


# revision identifiers, used by Alembic.
revision = "001"
down_revision = None  # initial migration: no parent
branch_labels = None
depends_on = None


def upgrade() -> None:
    """Create ``ix_jobs`` + its indexes exactly as spec §4 describes.

    JSONB for ``request`` and ``response`` (Postgres-only; the MVP doesn't
    support any other backend). CHECK constraint bakes the status enum into
    the DDL so direct SQL inserts (the pg_queue_adapter path) can't land
    bogus values. The partial index on ``status='pending'`` matches the
    claim query's ``WHERE status='pending' ORDER BY created_at`` pattern.
    """

    op.create_table(
        "ix_jobs",
        sa.Column("job_id", postgresql.UUID(as_uuid=True), primary_key=True),
        sa.Column("ix_id", sa.Text(), nullable=False),
        sa.Column("client_id", sa.Text(), nullable=False),
        sa.Column("request_id", sa.Text(), nullable=False),
        sa.Column("status", sa.Text(), nullable=False),
        sa.Column("request", postgresql.JSONB(), nullable=False),
        # response stays NULL until the worker finishes the job.
        sa.Column("response", postgresql.JSONB(), nullable=True),
        sa.Column("callback_url", sa.Text(), nullable=True),
        sa.Column("callback_status", sa.Text(), nullable=True),
        sa.Column("attempts", sa.Integer(), nullable=False, server_default="0"),
        sa.Column(
            "created_at",
            sa.DateTime(timezone=True),
            nullable=False,
            # server-side default: inserts done via raw SQL get a timestamp too.
            server_default=sa.text("now()"),
        ),
        sa.Column("started_at", sa.DateTime(timezone=True), nullable=True),
        sa.Column("finished_at", sa.DateTime(timezone=True), nullable=True),
        sa.CheckConstraint(
            "status IN ('pending', 'running', 'done', 'error')",
            name="ix_jobs_status_check",
        ),
        # callback_status is tri-state-or-NULL: NULL means "no callback requested".
        sa.CheckConstraint(
            "callback_status IS NULL OR callback_status IN "
            "('pending', 'delivered', 'failed')",
            name="ix_jobs_callback_status_check",
        ),
    )

    # Partial index: the claim query hits only pending rows ordered by age.
    # Partial-ness keeps the index small as done/error rows accumulate.
    op.create_index(
        "ix_jobs_status_created",
        "ix_jobs",
        ["status", "created_at"],
        postgresql_where=sa.text("status = 'pending'"),
    )
    # Unique index on (client_id, request_id) enforces caller-side idempotency
    # at the DB layer. The repo relies on the unique violation to detect an
    # existing pending/running row and return it unchanged.
    op.create_index(
        "ix_jobs_client_request",
        "ix_jobs",
        ["client_id", "request_id"],
        unique=True,
    )


def downgrade() -> None:
    """Drop ``ix_jobs`` and its indexes, in reverse order of creation."""

    op.drop_index("ix_jobs_client_request", table_name="ix_jobs")
    op.drop_index("ix_jobs_status_created", table_name="ix_jobs")
    op.drop_table("ix_jobs")
+""" + +from __future__ import annotations + +import os + +from sqlalchemy.ext.asyncio import ( + AsyncEngine, + AsyncSession, + async_sessionmaker, + create_async_engine, +) + +_engine: AsyncEngine | None = None +_session_factory: async_sessionmaker[AsyncSession] | None = None + + +def _resolve_url() -> str: + """Grab the Postgres URL from the environment. + + Task 3.2 refactors this to go through ``ix.config.get_config()``; this + version keeps the store module usable during the bootstrap window where + ``ix.config`` doesn't exist yet. Behaviour after refactor is identical — + both paths ultimately read ``IX_POSTGRES_URL``. + """ + + try: + from ix.config import get_config + except ImportError: + url = os.environ.get("IX_POSTGRES_URL") + if not url: + raise RuntimeError( + "IX_POSTGRES_URL is not set and ix.config is unavailable" + ) from None + return url + return get_config().postgres_url + + +def get_engine() -> AsyncEngine: + """Return the process-wide async engine; create on first call.""" + + global _engine + if _engine is None: + _engine = create_async_engine(_resolve_url(), pool_pre_ping=True) + return _engine + + +def get_session_factory() -> async_sessionmaker[AsyncSession]: + """Return the process-wide session factory; create on first call. + + ``expire_on_commit=False`` so ORM instances stay usable after ``commit()`` + — we frequently commit inside a repo method and then ``model_validate`` + the row outside the session. + """ + + global _session_factory + if _session_factory is None: + _session_factory = async_sessionmaker(get_engine(), expire_on_commit=False) + return _session_factory + + +def reset_engine() -> None: + """Drop the cached engine + session factory. 
Test-only.""" + + global _engine, _session_factory + _engine = None + _session_factory = None diff --git a/src/ix/store/models.py b/src/ix/store/models.py new file mode 100644 index 0000000..1b22fea --- /dev/null +++ b/src/ix/store/models.py @@ -0,0 +1,86 @@ +"""SQLAlchemy 2.0 ORM for ``ix_jobs``. + +Shape matches the initial migration (``alembic/versions/001_initial_ix_jobs.py``) +which in turn matches spec §4. JSONB columns carry the RequestIX / ResponseIX +Pydantic payloads; we don't wrap them in custom TypeDecorators — the repo does +an explicit ``model_dump(mode="json")`` on write and ``model_validate`` on read +so the ORM stays a thin mapping layer and the Pydantic round-trip logic stays +colocated with the other contract code. + +The status column is a plain string — the CHECK constraint in the DB enforces +the allowed values. Using a SQLAlchemy ``Enum`` type here would double-bind +the enum values on both sides and force a migration each time we add a state. +""" + +from __future__ import annotations + +from datetime import datetime +from typing import Any +from uuid import UUID + +from sqlalchemy import CheckConstraint, DateTime, Index, Integer, Text, text +from sqlalchemy.dialects.postgresql import JSONB +from sqlalchemy.dialects.postgresql import UUID as PgUUID +from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column + + +class Base(DeclarativeBase): + """Shared declarative base for the store package.""" + + +class IxJob(Base): + """ORM mapping for the ``ix_jobs`` table. + + One row per submitted extraction job. Lifecycle: pending → running → + (done | error). The worker is the only writer that flips status past + pending; the REST / pg_queue adapters only insert. 
+ """ + + __tablename__ = "ix_jobs" + __table_args__ = ( + CheckConstraint( + "status IN ('pending', 'running', 'done', 'error')", + name="ix_jobs_status_check", + ), + CheckConstraint( + "callback_status IS NULL OR callback_status IN " + "('pending', 'delivered', 'failed')", + name="ix_jobs_callback_status_check", + ), + Index( + "ix_jobs_status_created", + "status", + "created_at", + postgresql_where=text("status = 'pending'"), + ), + Index( + "ix_jobs_client_request", + "client_id", + "request_id", + unique=True, + ), + ) + + job_id: Mapped[UUID] = mapped_column(PgUUID(as_uuid=True), primary_key=True) + ix_id: Mapped[str] = mapped_column(Text, nullable=False) + client_id: Mapped[str] = mapped_column(Text, nullable=False) + request_id: Mapped[str] = mapped_column(Text, nullable=False) + status: Mapped[str] = mapped_column(Text, nullable=False) + request: Mapped[dict[str, Any]] = mapped_column(JSONB, nullable=False) + response: Mapped[dict[str, Any] | None] = mapped_column(JSONB, nullable=True) + callback_url: Mapped[str | None] = mapped_column(Text, nullable=True) + callback_status: Mapped[str | None] = mapped_column(Text, nullable=True) + attempts: Mapped[int] = mapped_column( + Integer, nullable=False, server_default=text("0") + ) + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), + nullable=False, + server_default=text("now()"), + ) + started_at: Mapped[datetime | None] = mapped_column( + DateTime(timezone=True), nullable=True + ) + finished_at: Mapped[datetime | None] = mapped_column( + DateTime(timezone=True), nullable=True + ) diff --git a/tests/unit/test_alembic_smoke.py b/tests/unit/test_alembic_smoke.py new file mode 100644 index 0000000..484bee7 --- /dev/null +++ b/tests/unit/test_alembic_smoke.py @@ -0,0 +1,111 @@ +"""Hermetic smoke test for the Alembic migration module. + +Does NOT run ``alembic upgrade head`` — that requires a real database and is +exercised by the integration suite. 
This test only verifies the migration +module's structural integrity: + +* the initial migration can be imported without side effects, +* its revision / down_revision pair is well-formed, +* ``upgrade()`` and ``downgrade()`` are callable, +* the SQL emitted by ``upgrade()`` mentions every column the spec requires. + +We capture emitted SQL via ``alembic.op`` in offline mode so we don't need a +live connection. The point is that callers can look at this one test and know +the migration won't silently drift from spec §4 at import time. +""" + +from __future__ import annotations + +import importlib.util +from pathlib import Path + +ALEMBIC_DIR = Path(__file__).resolve().parents[2] / "alembic" +INITIAL_PATH = ALEMBIC_DIR / "versions" / "001_initial_ix_jobs.py" + + +def _load_migration_module(path: Path): + spec = importlib.util.spec_from_file_location(f"_test_migration_{path.stem}", path) + assert spec is not None and spec.loader is not None + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + return module + + +def test_initial_migration_file_exists() -> None: + assert INITIAL_PATH.exists(), f"missing migration: {INITIAL_PATH}" + + +def test_initial_migration_revision_ids() -> None: + module = _load_migration_module(INITIAL_PATH) + # Revision must be a non-empty string; down_revision must be None for the + # initial migration (no parent). + assert isinstance(module.revision, str) and module.revision + assert module.down_revision is None + + +def test_initial_migration_has_upgrade_and_downgrade() -> None: + module = _load_migration_module(INITIAL_PATH) + assert callable(module.upgrade) + assert callable(module.downgrade) + + +def test_initial_migration_source_mentions_required_columns() -> None: + """Spec §4 columns must all appear in the migration source. + + We grep the source file rather than running the migration because running + it needs Postgres. 
This is belt-and-braces: if someone renames a column + they'll see this test fail and go update both sides in lockstep. + """ + + source = INITIAL_PATH.read_text(encoding="utf-8") + for column in ( + "job_id", + "ix_id", + "client_id", + "request_id", + "status", + "request", + "response", + "callback_url", + "callback_status", + "attempts", + "created_at", + "started_at", + "finished_at", + ): + assert column in source, f"migration missing column {column!r}" + + +def test_initial_migration_source_mentions_indexes_and_constraint() -> None: + source = INITIAL_PATH.read_text(encoding="utf-8") + # Unique correlation index on (client_id, request_id). + assert "ix_jobs_client_request" in source + # Partial index on pending rows for the claim query. + assert "ix_jobs_status_created" in source + # CHECK constraint on status values. + assert "pending" in source and "running" in source + assert "done" in source and "error" in source + + +def test_models_module_declares_ix_job() -> None: + """The ORM model mirrors the migration; both must stay in sync.""" + + from ix.store.models import Base, IxJob + + assert IxJob.__tablename__ == "ix_jobs" + # Registered in the shared Base.metadata so alembic autogenerate could + # in principle see it — we don't rely on autogenerate, but having the + # model in the shared metadata is what lets integration tests do + # ``Base.metadata.create_all`` as a fast path when Alembic isn't desired. + assert "ix_jobs" in Base.metadata.tables + + +def test_engine_module_exposes_factory() -> None: + from ix.store.engine import get_engine, reset_engine + + # The engine factory is lazy and idempotent. We don't actually call + # ``get_engine()`` here — that would require IX_POSTGRES_URL and a real + # DB. Just confirm the symbols exist and ``reset_engine`` is safe to call + # on a cold cache. + assert callable(get_engine) + reset_engine() # no-op when nothing is cached