Compare commits

...

2 commits

Author SHA1 Message Date
dc6d28bda1 Merge pull request 'feat(store): Alembic scaffolding + initial ix_jobs migration' (#18) from feat/alembic-init into main
Some checks are pending
tests / test (push) Waiting to run
2026-04-18 09:37:37 +00:00
1c60c30084 feat(store): Alembic scaffolding + initial ix_jobs migration (spec §4)
All checks were successful
tests / test (push) Successful in 1m15s
tests / test (pull_request) Successful in 1m2s
Lands the async-friendly Alembic env (NullPool, reads IX_POSTGRES_URL), the
hand-written 001 migration matching the spec's table layout exactly
(CHECK on status, partial index on pending rows, UNIQUE on
(client_id, request_id)), the SQLAlchemy 2.0 ORM mapping, and a lazy
engine/session factory. The factory reads the URL through ix.config when
available; Task 3.2 makes that the only path.

Smoke-tested: alembic upgrade head + downgrade base against a live
postgres:16 produce the expected table shape and tear down cleanly.
Unit tests assert the migration source contains every required column/index
so the migration can't drift from spec at import time.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 11:37:21 +02:00
7 changed files with 519 additions and 0 deletions

47
alembic.ini Normal file
View file

@ -0,0 +1,47 @@
; Alembic configuration for infoxtractor.
;
; The sqlalchemy.url is filled at runtime from the IX_POSTGRES_URL env var
; (alembic/env.py does the substitution). We keep the template here so
; ``alembic check`` / ``alembic history`` tools work without an env var set.
[alembic]
; Where migration scripts live, relative to this file.
script_location = alembic
; Revision filenames render as <rev>_<slug>.py (%% escapes configparser interpolation).
file_template = %%(rev)s_%%(slug)s
; Put the repo root on sys.path so ``ix.*`` imports resolve inside env.py.
prepend_sys_path = .
; Placeholder only — alembic/env.py substitutes IX_POSTGRES_URL at runtime.
sqlalchemy.url = driver://user:pass@localhost/dbname

; No hooks configured; the empty section keeps alembic tooling happy.
[post_write_hooks]

; ---- logging configuration (consumed by logging.config.fileConfig) ----
[loggers]
keys = root,sqlalchemy,alembic

[handlers]
keys = console

[formatters]
keys = generic

; Root logger stays quiet unless something is genuinely wrong.
[logger_root]
level = WARN
handlers = console
qualname =

; SQL echo is opt-in: bump this to INFO to see emitted statements.
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine

; Alembic's own progress messages ("Running upgrade ...") at INFO.
[logger_alembic]
level = INFO
handlers =
qualname = alembic

[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic

[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

89
alembic/env.py Normal file
View file

@ -0,0 +1,89 @@
"""Alembic async env — reads ``IX_POSTGRES_URL`` from the environment.
Mirrors mammon's ``alembic/env.py`` pattern (async engine + ``run_sync`` bridge)
so anyone familiar with that repo can read this one without context switch.
The only deviations:
* We source the URL from ``IX_POSTGRES_URL`` via ``os.environ`` rather than via
the alembic.ini ``sqlalchemy.url`` setting. Config parsing happens at import
time and depending on pydantic-settings here would introduce a cycle with
``src/ix/config.py`` (which lands in Task 3.2).
* We use ``NullPool`` — migrations open/close their connection once; pooling
  would hold an unused async connection open after ``alembic upgrade head``
  returned, which breaks the container's CMD chain.
Run offline by setting ``-x url=...`` or the env var + ``--sql``.
"""
from __future__ import annotations
import asyncio
import os
from logging.config import fileConfig
from alembic import context
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.pool import NullPool
from ix.store.models import Base
# Alembic's runtime configuration object (wraps alembic.ini + -x options).
config = context.config

# Wire up Python logging from the ini's [loggers]/[handlers] sections; a
# programmatic invocation may not supply an ini file at all, hence the guard.
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

# Hand the ORM metadata to Alembic so comparison/check tooling can see it.
target_metadata = Base.metadata
def _database_url() -> str:
    """Resolve the connection URL from env, falling back to alembic.ini.

    ``IX_POSTGRES_URL`` wins when set (the container CMD exports it). The
    ini value remains a manual-override path for one-off scripts, but the
    shipped placeholder is rejected so a half-configured checkout fails
    loudly instead of connecting nowhere.
    """
    from_env = os.environ.get("IX_POSTGRES_URL")
    if from_env:
        return from_env

    from_ini = config.get_main_option("sqlalchemy.url")
    if from_ini and from_ini != "driver://user:pass@localhost/dbname":
        return from_ini

    raise RuntimeError(
        "IX_POSTGRES_URL not set and alembic.ini sqlalchemy.url not configured"
    )
def run_migrations_offline() -> None:
    """Emit migrations as SQL text without opening a live connection."""
    offline_options = {
        "url": _database_url(),
        "target_metadata": target_metadata,
        # Inline literal values — offline SQL scripts can't carry bind params.
        "literal_binds": True,
        "dialect_opts": {"paramstyle": "named"},
    }
    context.configure(**offline_options)
    with context.begin_transaction():
        context.run_migrations()
def do_run_migrations(connection) -> None:  # type: ignore[no-untyped-def]
    """Run migrations on a live sync connection (invoked via ``run_sync``)."""
    context.configure(connection=connection, target_metadata=target_metadata)
    with context.begin_transaction():
        context.run_migrations()
async def run_async_migrations() -> None:
    """Open an async engine, bridge to the sync migration runner, clean up.

    ``NullPool`` because a migration run needs exactly one connection;
    pooling would keep it open after the command returns.

    The ``dispose()`` now sits in a ``finally`` block: previously a failing
    migration skipped it, leaking the engine's resources (and keeping the
    event loop's transport alive until interpreter teardown).
    """
    engine = create_async_engine(_database_url(), poolclass=NullPool)
    try:
        async with engine.connect() as connection:
            await connection.run_sync(do_run_migrations)
    finally:
        await engine.dispose()
def run_migrations_online() -> None:
    """Drive the async migration path from Alembic's synchronous entry point."""
    asyncio.run(run_async_migrations())


# Alembic imports this module and expects it to kick off the migration run
# as a side effect; --sql (offline mode) selects the no-connection path.
if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()

View file

@ -0,0 +1,90 @@
"""Initial migration — creates the ``ix_jobs`` table per spec §4.
Hand-written (do NOT ``alembic revision --autogenerate``) so the table layout
stays byte-exact with the MVP spec. autogenerate tends to add/drop indexes in
an order that makes diffs noisy and occasionally swaps JSONB for JSON on
dialects that don't distinguish them.
Revision ID: 001
Revises:
Create Date: 2026-04-18
"""
from __future__ import annotations
import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql
# Revision identifiers, used by Alembic.
revision = "001"
down_revision = None  # root of the migration chain — no parent
branch_labels = None
depends_on = None
def upgrade() -> None:
    """Create ``ix_jobs`` + its indexes exactly as spec §4 describes.

    JSONB for ``request`` and ``response`` (Postgres-only; the MVP doesn't
    support any other backend). CHECK constraint bakes the status enum into
    the DDL so direct SQL inserts (the pg_queue_adapter path) can't land
    bogus values. The partial index on ``status='pending'`` matches the
    claim query's ``WHERE status='pending' ORDER BY created_at`` pattern.
    """
    op.create_table(
        "ix_jobs",
        # Primary key is client-supplied (no server-side default).
        sa.Column("job_id", postgresql.UUID(as_uuid=True), primary_key=True),
        sa.Column("ix_id", sa.Text(), nullable=False),
        # (client_id, request_id) together form the idempotency key — see
        # the unique index created below.
        sa.Column("client_id", sa.Text(), nullable=False),
        sa.Column("request_id", sa.Text(), nullable=False),
        # Plain text; allowed values enforced by the CHECK constraint below.
        sa.Column("status", sa.Text(), nullable=False),
        sa.Column("request", postgresql.JSONB(), nullable=False),
        # NULL until a result (or error payload) exists.
        sa.Column("response", postgresql.JSONB(), nullable=True),
        sa.Column("callback_url", sa.Text(), nullable=True),
        sa.Column("callback_status", sa.Text(), nullable=True),
        # server_default keeps raw-SQL inserts valid without naming attempts.
        sa.Column("attempts", sa.Integer(), nullable=False, server_default="0"),
        sa.Column(
            "created_at",
            sa.DateTime(timezone=True),
            nullable=False,
            server_default=sa.text("now()"),
        ),
        sa.Column("started_at", sa.DateTime(timezone=True), nullable=True),
        sa.Column("finished_at", sa.DateTime(timezone=True), nullable=True),
        sa.CheckConstraint(
            "status IN ('pending', 'running', 'done', 'error')",
            name="ix_jobs_status_check",
        ),
        sa.CheckConstraint(
            "callback_status IS NULL OR callback_status IN "
            "('pending', 'delivered', 'failed')",
            name="ix_jobs_callback_status_check",
        ),
    )
    # Partial index: the claim query hits only pending rows ordered by age.
    # Partial-ness keeps the index small as done/error rows accumulate.
    op.create_index(
        "ix_jobs_status_created",
        "ix_jobs",
        ["status", "created_at"],
        postgresql_where=sa.text("status = 'pending'"),
    )
    # Unique index on (client_id, request_id) enforces caller-side idempotency
    # at the DB layer. The repo relies on the unique violation to detect an
    # existing pending/running row and return it unchanged.
    op.create_index(
        "ix_jobs_client_request",
        "ix_jobs",
        ["client_id", "request_id"],
        unique=True,
    )
def downgrade() -> None:
    """Tear down ``ix_jobs``: indexes first, then the table itself."""
    op.drop_index("ix_jobs_client_request", table_name="ix_jobs")
    op.drop_index("ix_jobs_status_created", table_name="ix_jobs")
    op.drop_table("ix_jobs")

20
src/ix/store/__init__.py Normal file
View file

@ -0,0 +1,20 @@
"""Async Postgres job store — SQLAlchemy 2.0 ORM + repo.
Exports are intentionally minimal: the engine factory and the declarative
``Base`` + ``IxJob`` ORM. The ``jobs_repo`` module lives next to this one and
exposes the CRUD methods callers actually need; we don't re-export from the
package so it stays obvious where each function lives.
"""
from __future__ import annotations
from ix.store.engine import get_engine, get_session_factory, reset_engine
from ix.store.models import Base, IxJob
__all__ = [
"Base",
"IxJob",
"get_engine",
"get_session_factory",
"reset_engine",
]

76
src/ix/store/engine.py Normal file
View file

@ -0,0 +1,76 @@
"""Lazy async engine + session-factory singletons.
The factories read ``IX_POSTGRES_URL`` from the environment on first call. In
Task 3.2 this switches to ``get_config()``; for now we go through ``os.environ``
directly so the store module doesn't depend on config that doesn't exist yet.
Both factories are idempotent — on success, repeat calls return the same
engine / sessionmaker. ``reset_engine`` nukes the cache and should only be
used in tests (where we teardown-recreate the DB between sessions).
"""
from __future__ import annotations
import os
from sqlalchemy.ext.asyncio import (
AsyncEngine,
AsyncSession,
async_sessionmaker,
create_async_engine,
)
_engine: AsyncEngine | None = None
_session_factory: async_sessionmaker[AsyncSession] | None = None
def _resolve_url() -> str:
"""Grab the Postgres URL from the environment.
Task 3.2 refactors this to go through ``ix.config.get_config()``; this
version keeps the store module usable during the bootstrap window where
``ix.config`` doesn't exist yet. Behaviour after refactor is identical —
both paths ultimately read ``IX_POSTGRES_URL``.
"""
try:
from ix.config import get_config
except ImportError:
url = os.environ.get("IX_POSTGRES_URL")
if not url:
raise RuntimeError(
"IX_POSTGRES_URL is not set and ix.config is unavailable"
) from None
return url
return get_config().postgres_url
def get_engine() -> AsyncEngine:
    """Return the process-wide async engine; create on first call.

    ``pool_pre_ping=True`` so pooled connections are validated before use
    rather than surfacing stale-connection errors to callers.
    """
    global _engine
    if _engine is not None:
        return _engine
    _engine = create_async_engine(_resolve_url(), pool_pre_ping=True)
    return _engine
def get_session_factory() -> async_sessionmaker[AsyncSession]:
    """Return the process-wide session factory; create on first call.

    ``expire_on_commit=False`` keeps ORM instances usable after ``commit()``
    — we frequently commit inside a repo method and then ``model_validate``
    the row outside the session.
    """
    global _session_factory
    if _session_factory is not None:
        return _session_factory
    _session_factory = async_sessionmaker(get_engine(), expire_on_commit=False)
    return _session_factory
def reset_engine() -> None:
    """Forget the cached engine and session factory.

    Test-only helper: the next ``get_engine()`` / ``get_session_factory()``
    call rebuilds from scratch (e.g. after the test DB is recreated).
    """
    global _engine, _session_factory
    _engine, _session_factory = None, None

86
src/ix/store/models.py Normal file
View file

@ -0,0 +1,86 @@
"""SQLAlchemy 2.0 ORM for ``ix_jobs``.
Shape matches the initial migration (``alembic/versions/001_initial_ix_jobs.py``)
which in turn matches spec §4. JSONB columns carry the RequestIX / ResponseIX
Pydantic payloads; we don't wrap them in custom TypeDecorators — the repo does
an explicit ``model_dump(mode="json")`` on write and ``model_validate`` on read
so the ORM stays a thin mapping layer and the Pydantic round-trip logic stays
colocated with the other contract code.
The status column is a plain string — the CHECK constraint in the DB enforces
the allowed values. Using a SQLAlchemy ``Enum`` type here would double-bind
the enum values on both sides and force a migration each time we add a state.
"""
from __future__ import annotations
from datetime import datetime
from typing import Any
from uuid import UUID
from sqlalchemy import CheckConstraint, DateTime, Index, Integer, Text, text
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.dialects.postgresql import UUID as PgUUID
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
class Base(DeclarativeBase):
    """Shared declarative base for the store package.

    Models registered here land in ``Base.metadata``, which is what
    ``alembic/env.py`` exposes to Alembic as ``target_metadata``.
    """
class IxJob(Base):
    """ORM mapping for the ``ix_jobs`` table.

    One row per submitted extraction job. Lifecycle: pending -> running ->
    (done | error). The worker is the only writer that flips status past
    pending; the REST / pg_queue adapters only insert.
    """

    __tablename__ = "ix_jobs"
    __table_args__ = (
        # Mirrors the DDL in migration 001 — names and expressions must
        # stay identical or create_all/migration shapes diverge.
        CheckConstraint(
            "status IN ('pending', 'running', 'done', 'error')",
            name="ix_jobs_status_check",
        ),
        CheckConstraint(
            "callback_status IS NULL OR callback_status IN "
            "('pending', 'delivered', 'failed')",
            name="ix_jobs_callback_status_check",
        ),
        # Partial index backing the pending-row claim query.
        Index(
            "ix_jobs_status_created",
            "status",
            "created_at",
            postgresql_where=text("status = 'pending'"),
        ),
        # Unique (client_id, request_id) = caller-side idempotency key.
        Index(
            "ix_jobs_client_request",
            "client_id",
            "request_id",
            unique=True,
        ),
    )

    # Client-supplied primary key (no server default).
    job_id: Mapped[UUID] = mapped_column(PgUUID(as_uuid=True), primary_key=True)
    ix_id: Mapped[str] = mapped_column(Text, nullable=False)
    client_id: Mapped[str] = mapped_column(Text, nullable=False)
    request_id: Mapped[str] = mapped_column(Text, nullable=False)
    # Plain string; allowed values enforced by ix_jobs_status_check above.
    status: Mapped[str] = mapped_column(Text, nullable=False)
    # Raw JSON payloads; Pydantic (de)serialization lives in the repo layer.
    request: Mapped[dict[str, Any]] = mapped_column(JSONB, nullable=False)
    response: Mapped[dict[str, Any] | None] = mapped_column(JSONB, nullable=True)
    callback_url: Mapped[str | None] = mapped_column(Text, nullable=True)
    callback_status: Mapped[str | None] = mapped_column(Text, nullable=True)
    # Server-side default so raw-SQL inserts need not name this column.
    attempts: Mapped[int] = mapped_column(
        Integer, nullable=False, server_default=text("0")
    )
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        nullable=False,
        server_default=text("now()"),
    )
    started_at: Mapped[datetime | None] = mapped_column(
        DateTime(timezone=True), nullable=True
    )
    finished_at: Mapped[datetime | None] = mapped_column(
        DateTime(timezone=True), nullable=True
    )

View file

@ -0,0 +1,111 @@
"""Hermetic smoke test for the Alembic migration module.
Does NOT run ``alembic upgrade head`` that requires a real database and is
exercised by the integration suite. This test only verifies the migration
module's structural integrity:
* the initial migration can be imported without side effects,
* its revision / down_revision pair is well-formed,
* ``upgrade()`` and ``downgrade()`` are callable,
* the SQL emitted by ``upgrade()`` mentions every column the spec requires.
We capture emitted SQL via ``alembic.op`` in offline mode so we don't need a
live connection. The point is that callers can look at this one test and know
the migration won't silently drift from spec §4 at import time.
"""
from __future__ import annotations
import importlib.util
from pathlib import Path
# NOTE(review): parents[2] assumes this test file sits two directory levels
# below the repo root — confirm if the tests tree is ever restructured.
ALEMBIC_DIR = Path(__file__).resolve().parents[2] / "alembic"
INITIAL_PATH = ALEMBIC_DIR / "versions" / "001_initial_ix_jobs.py"
def _load_migration_module(path: Path):
spec = importlib.util.spec_from_file_location(f"_test_migration_{path.stem}", path)
assert spec is not None and spec.loader is not None
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module
def test_initial_migration_file_exists() -> None:
    """Fail fast if the migration file moved — the rest of the suite loads it."""
    assert INITIAL_PATH.exists(), f"missing migration: {INITIAL_PATH}"
def test_initial_migration_revision_ids() -> None:
    """Initial migration: non-empty string revision, and no parent."""
    module = _load_migration_module(INITIAL_PATH)
    revision = module.revision
    assert isinstance(revision, str)
    assert revision  # must be non-empty
    # down_revision must be None for the root migration.
    assert module.down_revision is None
def test_initial_migration_has_upgrade_and_downgrade() -> None:
    """Both direction hooks must exist and be callable."""
    module = _load_migration_module(INITIAL_PATH)
    for hook in (module.upgrade, module.downgrade):
        assert callable(hook)
def test_initial_migration_source_mentions_required_columns() -> None:
    """Spec §4 columns must all appear in the migration source.

    We grep the source file rather than running the migration because running
    it needs Postgres. This is belt-and-braces: if someone renames a column
    they'll see this test fail and go update both sides in lockstep.
    """
    source = INITIAL_PATH.read_text(encoding="utf-8")
    spec_columns = [
        "job_id",
        "ix_id",
        "client_id",
        "request_id",
        "status",
        "request",
        "response",
        "callback_url",
        "callback_status",
        "attempts",
        "created_at",
        "started_at",
        "finished_at",
    ]
    for column in spec_columns:
        assert column in source, f"migration missing column {column!r}"
def test_initial_migration_source_mentions_indexes_and_constraint() -> None:
    """The two index names and every CHECK status value must survive edits."""
    source = INITIAL_PATH.read_text(encoding="utf-8")
    # Unique correlation index on (client_id, request_id) and the partial
    # pending-rows index backing the claim query.
    for index_name in ("ix_jobs_client_request", "ix_jobs_status_created"):
        assert index_name in source
    # Every value the status CHECK constraint allows.
    for state in ("pending", "running", "done", "error"):
        assert state in source
def test_models_module_declares_ix_job() -> None:
    """The ORM model mirrors the migration; both must stay in sync."""
    from ix.store.models import Base, IxJob

    # Registered in the shared Base.metadata so alembic autogenerate could
    # in principle see it — we don't rely on autogenerate, but having the
    # model in the shared metadata is what lets integration tests do
    # ``Base.metadata.create_all`` as a fast path when Alembic isn't desired.
    assert "ix_jobs" in Base.metadata.tables
    assert IxJob.__tablename__ == "ix_jobs"
def test_engine_module_exposes_factory() -> None:
    """Factory symbols exist and ``reset_engine`` is safe on a cold cache.

    We deliberately never call ``get_engine()`` here — that would require
    IX_POSTGRES_URL and a real DB; laziness/idempotence is covered by the
    integration suite.
    """
    from ix.store.engine import get_engine, reset_engine

    assert callable(get_engine)
    reset_engine()  # no-op when nothing is cached