Lands the async-friendly Alembic env (NullPool, reads IX_POSTGRES_URL), the hand-written 001 migration matching the spec's table layout exactly (CHECK on status, partial index on pending rows, UNIQUE on (client_id, request_id)), the SQLAlchemy 2.0 ORM mapping, and a lazy engine/session factory. The factory reads the URL through ix.config when available; Task 3.2 makes that the only path. Smoke-tested: alembic upgrade head + downgrade base against a live postgres:16 produce the expected table shape and tear down cleanly. Unit tests assert the migration source contains every required column/index so the migration can't drift from spec at import time. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
90 lines
3.3 KiB
Python
90 lines
3.3 KiB
Python
"""Initial migration — creates the ``ix_jobs`` table per spec §4.
|
|
|
|
Hand-written (do NOT ``alembic revision --autogenerate``) so the table layout
|
|
stays byte-exact with the MVP spec. autogenerate tends to add/drop indexes in
|
|
an order that makes diffs noisy and occasionally swaps JSONB for JSON on
|
|
dialects that don't distinguish them.
|
|
|
|
Revision ID: 001
|
|
Revises:
|
|
Create Date: 2026-04-18
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import sqlalchemy as sa
|
|
from alembic import op
|
|
from sqlalchemy.dialects import postgresql
|
|
|
|
|
|
# Alembic reads these four module-level names to place this script in the
# migration graph.
revision: str = "001"
# No parent revision: this is the first migration in the chain.
down_revision: str | None = None
# No branch labels and no cross-branch dependencies for this revision.
branch_labels: str | None = None
depends_on: str | None = None
|
|
|
|
|
|
def upgrade() -> None:
    """Build the ``ix_jobs`` table and its two indexes.

    ``request`` and ``response`` are stored as Postgres JSONB. Two CHECK
    constraints restrict ``status`` and ``callback_status`` to their allowed
    values (``callback_status`` may additionally be NULL), so rows written
    with plain SQL are validated by the database itself.

    After the table, two indexes are created: a partial index over
    ``(status, created_at)`` limited to pending rows, and a unique index
    over ``(client_id, request_id)``.
    """
    table_name = "ix_jobs"

    # Column order is significant: it is the order emitted in the DDL.
    columns = [
        sa.Column("job_id", postgresql.UUID(as_uuid=True), primary_key=True),
        sa.Column("ix_id", sa.Text(), nullable=False),
        sa.Column("client_id", sa.Text(), nullable=False),
        sa.Column("request_id", sa.Text(), nullable=False),
        sa.Column("status", sa.Text(), nullable=False),
        sa.Column("request", postgresql.JSONB(), nullable=False),
        sa.Column("response", postgresql.JSONB(), nullable=True),
        sa.Column("callback_url", sa.Text(), nullable=True),
        sa.Column("callback_status", sa.Text(), nullable=True),
        sa.Column("attempts", sa.Integer(), nullable=False, server_default="0"),
        sa.Column(
            "created_at",
            sa.DateTime(timezone=True),
            nullable=False,
            # Timestamp is filled in by the server when the row is inserted.
            server_default=sa.text("now()"),
        ),
        sa.Column("started_at", sa.DateTime(timezone=True), nullable=True),
        sa.Column("finished_at", sa.DateTime(timezone=True), nullable=True),
    ]

    # Allowed-value checks live in the DDL rather than only in application
    # code, so inserts that bypass the ORM cannot store unknown values.
    constraints = [
        sa.CheckConstraint(
            "status IN ('pending', 'running', 'done', 'error')",
            name="ix_jobs_status_check",
        ),
        sa.CheckConstraint(
            "callback_status IS NULL OR callback_status IN "
            "('pending', 'delivered', 'failed')",
            name="ix_jobs_callback_status_check",
        ),
    ]

    op.create_table(table_name, *columns, *constraints)

    # Partial index for the claim query (pending rows ordered by age).
    # Restricting it to pending rows keeps it small while done/error rows
    # pile up in the table.
    pending_only = sa.text("status = 'pending'")
    op.create_index(
        "ix_jobs_status_created",
        table_name,
        ["status", "created_at"],
        postgresql_where=pending_only,
    )

    # Unique index: one row per (client_id, request_id). This gives callers
    # idempotency at the DB layer; the repo uses the resulting unique
    # violation to find the already-existing row and return it unchanged.
    op.create_index(
        "ix_jobs_client_request",
        table_name,
        ["client_id", "request_id"],
        unique=True,
    )
|
|
|
|
|
|
def downgrade() -> None:
    """Undo :func:`upgrade`: drop both indexes, then the ``ix_jobs`` table."""
    table_name = "ix_jobs"

    # Indexes are removed in the reverse of the order they were created.
    for index_name in ("ix_jobs_client_request", "ix_jobs_status_created"):
        op.drop_index(index_name, table_name=table_name)

    op.drop_table(table_name)
|