infoxtractor/alembic/versions/001_initial_ix_jobs.py
Dirk Riemann 1c60c30084
All checks were successful
tests / test (push) Successful in 1m15s
tests / test (pull_request) Successful in 1m2s
feat(store): Alembic scaffolding + initial ix_jobs migration (spec §4)
Lands the async-friendly Alembic env (NullPool, reads IX_POSTGRES_URL), the
hand-written 001 migration matching the spec's table layout exactly
(CHECK on status, partial index on pending rows, UNIQUE on
(client_id, request_id)), the SQLAlchemy 2.0 ORM mapping, and a lazy
engine/session factory. The factory reads the URL through ix.config when
available; Task 3.2 makes that the only path.

Smoke-tested: alembic upgrade head + downgrade base against a live
postgres:16 produce the expected table shape and tear down cleanly.
Unit tests assert the migration source contains every required column/index
so the migration can't drift from spec at import time.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-18 11:37:21 +02:00

90 lines
3.3 KiB
Python

"""Initial migration — creates the ``ix_jobs`` table per spec §4.
Hand-written (do NOT ``alembic revision --autogenerate``) so the table layout
stays byte-exact with the MVP spec. autogenerate tends to add/drop indexes in
an order that makes diffs noisy and occasionally swaps JSONB for JSON on
dialects that don't distinguish them.
Revision ID: 001
Revises:
Create Date: 2026-04-18
"""
from __future__ import annotations
import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision = "001"
down_revision = None
branch_labels = None
depends_on = None
def upgrade() -> None:
"""Create ``ix_jobs`` + its indexes exactly as spec §4 describes.
JSONB for ``request`` and ``response`` (Postgres-only; the MVP doesn't
support any other backend). CHECK constraint bakes the status enum into
the DDL so direct SQL inserts (the pg_queue_adapter path) can't land
bogus values. The partial index on ``status='pending'`` matches the
claim query's ``WHERE status='pending' ORDER BY created_at`` pattern.
"""
op.create_table(
"ix_jobs",
sa.Column("job_id", postgresql.UUID(as_uuid=True), primary_key=True),
sa.Column("ix_id", sa.Text(), nullable=False),
sa.Column("client_id", sa.Text(), nullable=False),
sa.Column("request_id", sa.Text(), nullable=False),
sa.Column("status", sa.Text(), nullable=False),
sa.Column("request", postgresql.JSONB(), nullable=False),
sa.Column("response", postgresql.JSONB(), nullable=True),
sa.Column("callback_url", sa.Text(), nullable=True),
sa.Column("callback_status", sa.Text(), nullable=True),
sa.Column("attempts", sa.Integer(), nullable=False, server_default="0"),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
nullable=False,
server_default=sa.text("now()"),
),
sa.Column("started_at", sa.DateTime(timezone=True), nullable=True),
sa.Column("finished_at", sa.DateTime(timezone=True), nullable=True),
sa.CheckConstraint(
"status IN ('pending', 'running', 'done', 'error')",
name="ix_jobs_status_check",
),
sa.CheckConstraint(
"callback_status IS NULL OR callback_status IN "
"('pending', 'delivered', 'failed')",
name="ix_jobs_callback_status_check",
),
)
# Partial index: the claim query hits only pending rows ordered by age.
# Partial-ness keeps the index small as done/error rows accumulate.
op.create_index(
"ix_jobs_status_created",
"ix_jobs",
["status", "created_at"],
postgresql_where=sa.text("status = 'pending'"),
)
# Unique index on (client_id, request_id) enforces caller-side idempotency
# at the DB layer. The repo relies on the unique violation to detect an
# existing pending/running row and return it unchanged.
op.create_index(
"ix_jobs_client_request",
"ix_jobs",
["client_id", "request_id"],
unique=True,
)
def downgrade() -> None:
op.drop_index("ix_jobs_client_request", table_name="ix_jobs")
op.drop_index("ix_jobs_status_created", table_name="ix_jobs")
op.drop_table("ix_jobs")