"""Alembic async env — reads ``IX_POSTGRES_URL`` from the environment. Mirrors mammon's ``alembic/env.py`` pattern (async engine + ``run_sync`` bridge) so anyone familiar with that repo can read this one without context switch. The only deviations: * We source the URL from ``IX_POSTGRES_URL`` via ``os.environ`` rather than via the alembic.ini ``sqlalchemy.url`` setting. Config parsing happens at import time and depending on pydantic-settings here would introduce a cycle with ``src/ix/config.py`` (which lands in Task 3.2). * We use ``NullPool`` — migrations open/close their connection once, pooling would hold an unused async connection open after ``alembic upgrade head`` returned, which breaks the container's CMD chain. Run offline by setting ``-x url=...`` or the env var + ``--sql``. """ from __future__ import annotations import asyncio import os from logging.config import fileConfig from alembic import context from sqlalchemy.ext.asyncio import create_async_engine from sqlalchemy.pool import NullPool from ix.store.models import Base config = context.config if config.config_file_name is not None: fileConfig(config.config_file_name) target_metadata = Base.metadata def _database_url() -> str: """Resolve the connection URL from env, falling back to alembic.ini. The env var is the primary source (container CMD sets it). The ini value remains available so ``alembic -x url=...`` or a manual ``alembic.ini`` edit still work for one-off scripts. """ env_url = os.environ.get("IX_POSTGRES_URL") if env_url: return env_url ini_url = config.get_main_option("sqlalchemy.url") if ini_url and ini_url != "driver://user:pass@localhost/dbname": return ini_url raise RuntimeError( "IX_POSTGRES_URL not set and alembic.ini sqlalchemy.url not configured" ) def run_migrations_offline() -> None: """Emit migrations as SQL without a live connection.""" context.configure( url=_database_url(), target_metadata=target_metadata, literal_binds=True, dialect_opts={"paramstyle": "named"}, ) with context.begin_transaction(): context.run_migrations() def do_run_migrations(connection) -> None: # type: ignore[no-untyped-def] context.configure(connection=connection, target_metadata=target_metadata) with context.begin_transaction(): context.run_migrations() async def run_async_migrations() -> None: engine = create_async_engine(_database_url(), poolclass=NullPool) async with engine.connect() as connection: await connection.run_sync(do_run_migrations) await engine.dispose() def run_migrations_online() -> None: asyncio.run(run_async_migrations()) if context.is_offline_mode(): run_migrations_offline() else: run_migrations_online()