Alembic Async Migrations and Schema Evolution for SQLAlchemy 2.0

Alembic is the canonical migration tool for SQLAlchemy projects, managing every schema change from initial table creation to complex multi-step zero-downtime alterations. When your application uses async engines and connection pooling, configuring Alembic to cooperate with asyncio requires specific patterns that differ substantially from the synchronous default setup. This guide covers the complete migration lifecycle: wiring Alembic's env.py to an async engine, generating and auditing revision scripts, executing zero-downtime schema changes in production, and hardening your migration pipeline for CI/CD deployment.

[Figure: Alembic async migration flow. ORM models (DeclarativeBase) supply target_metadata to the autogenerate diff engine, which produces a revision script; upgrade or downgrade runs through run_sync on the async engine and updates the alembic_version table. The revision DAG runs base → a1b2 → c3d4 → head, where each node is one revision file (upgrade + downgrade). Online mode uses a live DB connection; offline mode outputs a SQL script.]

Architectural Foundations

How Alembic Relates to SQLAlchemy MetaData and DeclarativeBase

Alembic does not inspect your running application code at migration time — it introspects your MetaData object. In SQLAlchemy 2.0, the preferred pattern is a single DeclarativeBase subclass whose .metadata attribute accumulates the complete schema as ORM models are imported:

# app/db/base.py
from sqlalchemy.orm import DeclarativeBase

class Base(DeclarativeBase):
    pass

# app/models/user.py
from datetime import datetime

from sqlalchemy import String, DateTime, func
from sqlalchemy.orm import Mapped, mapped_column
from app.db.base import Base

class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str] = mapped_column(String(320), unique=True, nullable=False)
    # Mapped[] takes the Python type; the SQL type goes in mapped_column()
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), server_default=func.now()
    )

Alembic's autogenerate compares Base.metadata (what your Python models declare) against the live database schema (what actually exists) and emits the delta as a revision script. This diff, from declared models to live schema, is the core value proposition: you express intent in Python, and Alembic translates it into migration operations that emit the DDL.

The relationship with DeclarativeBase is covered in depth in the SQLAlchemy 2.0 Core and ORM Architecture guide, which explains how MetaData participates in the ORM's unit-of-work identity map.

The Migration DAG and Revision Identifiers

Every revision is a node in a directed acyclic graph (DAG). Each file contains:

  • A revision identifier — a 12-character hex token generated by Alembic.
  • A down_revision pointer — the identifier of the immediately preceding revision (or None for the initial revision, or a tuple for merged branches).
  • An upgrade() function — the forward migration.
  • A downgrade() function — the reverse migration.

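Alembic stores the database's current revision in the alembic_version table (one row, one column by default; one row per head when branches exist). When you run alembic upgrade head, Alembic walks the DAG from that revision forward to the latest node and executes each upgrade() in sequence. By default the whole run shares one transaction on backends with transactional DDL; pass transaction_per_migration=True to context.configure() to give each revision its own transaction.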

base ──► a1b2c3 ──► d4e5f6 ──► a7b8c9  (head)
                        │
                        └──► branch_a  (feature branch)

Branch points arise when two developers create revisions concurrently from the same head. You resolve them with alembic merge, which produces a merge revision that lists both parents in down_revision.
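The head and merge mechanics can be illustrated without Alembic itself. The following is a minimal sketch, assuming a hypothetical in-memory graph that maps each revision id to a tuple of its parents; find_heads and merge_heads are illustrative names, not Alembic APIs.

```python
from __future__ import annotations

# Hypothetical in-memory model of a revision graph: each key is a revision
# id, each value is the tuple of its parents (empty for the base revision).
Graph = dict[str, tuple[str, ...]]


def find_heads(graph: Graph) -> set[str]:
    """Return the revisions that no other revision lists as a parent."""
    parents = {parent for ps in graph.values() for parent in ps}
    return set(graph) - parents


def merge_heads(graph: Graph, merge_id: str) -> Graph:
    """Return a new graph with a merge revision whose parents are all heads."""
    heads = tuple(sorted(find_heads(graph)))
    if len(heads) < 2:
        raise ValueError("nothing to merge: the graph has a single head")
    return {**graph, merge_id: heads}
```

Two revisions that share a parent both show up as heads; after the merge revision is added, it is the only head, which mirrors what alembic merge does to the real graph.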

Online vs Offline Mode

Online mode (the default) acquires a live database connection, executes DDL statements directly, and updates alembic_version atomically. This is what happens in alembic upgrade head.

Offline mode (alembic upgrade head --sql) renders the migration as a SQL script to stdout without touching the database. Production teams at companies with strict change-management processes use offline mode to produce a script that a DBA reviews and runs manually. Offline mode requires that your env.py calls context.configure(literal_binds=True) or handles the dialect-specific SQL rendering explicitly.

# Generate SQL script for DBA review
alembic upgrade head --sql > migrations/upgrade_$(date +%Y%m%d).sql

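# Offline downgrade: with --sql there is no database to read the current
# revision from, so the range must be given explicitly as <from>:<to>
alembic downgrade 3f7a1c8b2e49:9d2e5a4b1c73 --sql >> migrations/downgrade_$(date +%Y%m%d).sql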

Data Migrations vs Schema Migrations

Alembic is designed for schema migrations (DDL), but revision scripts frequently need to also perform data migrations (DML — INSERT, UPDATE, DELETE) to transform existing rows as the schema evolves. The key rule is to keep schema operations and data migrations in separate revisions so that each can be rolled back independently.

For data migrations, use op.get_bind() to access the underlying synchronous connection from within a migration function:

from alembic import op
import sqlalchemy as sa

def upgrade() -> None:
    # Access the synchronous connection — safe inside a run_sync callback
    bind = op.get_bind()

    # Use Core construct for portable, parameterized DML
    orders_table = sa.table(
        "orders",
        sa.column("id", sa.Integer),
        sa.column("status", sa.String),
        sa.column("legacy_status", sa.String),
    )

    # Backfill in chunks of 10 000 rows to keep each statement short.
    # Each pass re-selects the first batch of rows still missing a status.
    # No OFFSET is used: updated rows drop out of the filter, so an
    # advancing OFFSET would skip rows that were never processed.
    batch_size = 10_000
    while True:
        result = bind.execute(
            sa.select(orders_table.c.id)
            .where(orders_table.c.status.is_(None))
            .order_by(orders_table.c.id)
            .limit(batch_size)
        )
        ids = [row.id for row in result]
        if not ids:
            break
        bind.execute(
            orders_table.update()
            .where(orders_table.c.id.in_(ids))
            .values(status=sa.case(
                (orders_table.c.legacy_status == "DONE", "fulfilled"),
                else_="pending",
            ))
        )


def downgrade() -> None:
    # Reverting a data migration is often impractical — document why
    pass

Avoid using ORM session objects inside migration scripts. The op.get_bind() pattern works with raw Core expressions and remains safe across all async/sync engine configurations because run_sync hands the migration a synchronous SQLAlchemy Connection.
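The batching idea can be tried outside Alembic with the standard-library sqlite3 module. This is a minimal sketch, assuming a hypothetical three-column orders table; backfill_status and make_demo_db are illustrative names. It re-selects the first batch of unprocessed rows on each pass instead of advancing an OFFSET, because rows leave the filter as soon as they are updated.

```python
import sqlite3


def backfill_status(conn: sqlite3.Connection, batch_size: int = 2) -> int:
    """Fill orders.status from legacy_status in batches; return rows updated."""
    total = 0
    while True:
        # Always take the first batch of rows that still need a status.
        ids = [row[0] for row in conn.execute(
            "SELECT id FROM orders WHERE status IS NULL ORDER BY id LIMIT ?",
            (batch_size,),
        )]
        if not ids:
            return total
        placeholders = ",".join("?" * len(ids))
        conn.execute(
            "UPDATE orders SET status = CASE legacy_status "
            "WHEN 'DONE' THEN 'fulfilled' ELSE 'pending' END "
            f"WHERE id IN ({placeholders})",
            ids,
        )
        conn.commit()  # commit after each batch
        total += len(ids)


def make_demo_db() -> sqlite3.Connection:
    """Build an in-memory table with five rows that lack a status."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE orders ("
        "id INTEGER PRIMARY KEY, status TEXT, legacy_status TEXT)"
    )
    conn.executemany(
        "INSERT INTO orders (legacy_status) VALUES (?)",
        [("DONE",), ("OPEN",), ("DONE",), ("OPEN",), ("DONE",)],
    )
    conn.commit()
    return conn
```

With five rows and a batch size of two, the loop runs three productive passes and leaves no row with a NULL status. Inside a real revision the per-batch commit is not available by default, since Alembic controls the transaction.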

Key Component Deep-Dive 1: Configuring Alembic for an Async Engine

The env.py Challenge

Alembic's default env.py template uses synchronous SQLAlchemy APIs. When your application uses create_async_engine, you must bridge the synchronous migration runner to the async driver using connection.run_sync. This is the single most common stumbling block when configuring Alembic with async SQLAlchemy engines.

The key insight is that Alembic's internal migration logic (context.run_migrations()) is synchronous. You must hand it a synchronous Connection rather than an AsyncConnection. SQLAlchemy provides AsyncConnection.run_sync() for exactly this purpose: it invokes a callable with the synchronous Connection that the async connection wraps.

# alembic/env.py — full async-compatible implementation
import asyncio
from logging.config import fileConfig

from alembic import context
from sqlalchemy import pool
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine

# Import your Base so MetaData is populated
from app.db.base import Base
from app.models import user, order, product  # noqa: F401 — triggers __tablename__ registration

config = context.config
fileConfig(config.config_file_name)

target_metadata = Base.metadata


def run_migrations_offline() -> None:
    """Run migrations in 'offline' mode — renders SQL without a live connection."""
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
        compare_type=True,
        compare_server_default=True,
    )
    with context.begin_transaction():
        context.run_migrations()


def do_run_migrations(connection) -> None:
    """Synchronous callback executed inside run_sync."""
    context.configure(
        connection=connection,
        target_metadata=target_metadata,
        compare_type=True,
        compare_server_default=True,
    )
    with context.begin_transaction():
        context.run_migrations()


async def run_migrations_online() -> None:
    """Run migrations in 'online' mode against a live async engine."""
    connectable: AsyncEngine = create_async_engine(
        config.get_main_option("sqlalchemy.url"),
        poolclass=pool.NullPool,  # NullPool: migrations are single-use; no persistent pool needed
    )

    async with connectable.connect() as connection:
        await connection.run_sync(do_run_migrations)

    await connectable.dispose()


if context.is_offline_mode():
    run_migrations_offline()
else:
    asyncio.run(run_migrations_online())

The NullPool is deliberate: migration processes are typically short-lived scripts, not long-running servers. Using NullPool guarantees that the connection is physically closed after the migration completes rather than being returned to a pool that will be immediately discarded.

alembic.ini Configuration

# alembic.ini (relevant sections)
[alembic]
script_location = alembic
prepend_sys_path = .

# Placeholder only. ConfigParser interpolation does not expand environment
# variables, so env.py must override this value at runtime to keep secrets
# out of source control
sqlalchemy.url = driver://user:pass@localhost/dbname

[loggers]
keys = root,sqlalchemy,alembic

[handlers]
keys = console

[formatters]
keys = generic

[logger_root]
level = WARN
handlers = console
qualname =

[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine

[logger_alembic]
level = INFO
handlers =
qualname = alembic

[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic

[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

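Inject credentials at runtime via environment variables or a secrets manager, and have env.py read them and set sqlalchemy.url, because alembic.ini interpolation does not expand environment variables: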

export DB_USER=app_user
export DB_PASS=s3cur3p4ss
export DB_HOST=db.internal
export DB_NAME=app_production
alembic upgrade head
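One way to turn those variables into a connection URL is a small helper that env.py calls before creating the engine. This is a sketch under assumptions: build_database_url and escape_for_configparser are hypothetical helpers, and the variable names simply mirror the exports above.

```python
import os
from typing import Mapping
from urllib.parse import quote_plus


def build_database_url(env: Mapping[str, str] = os.environ) -> str:
    """Assemble an asyncpg URL from DB_* variables, escaping credentials."""
    user = quote_plus(env["DB_USER"])
    password = quote_plus(env["DB_PASS"])  # handles '@', '/', '%' and similar
    host = env["DB_HOST"]
    name = env["DB_NAME"]
    return f"postgresql+asyncpg://{user}:{password}@{host}/{name}"


def escape_for_configparser(url: str) -> str:
    """Double percent signs so ConfigParser interpolation leaves them alone."""
    return url.replace("%", "%%")
```

In env.py the result could be applied with config.set_main_option("sqlalchemy.url", escape_for_configparser(build_database_url())), or the unescaped URL could be passed straight to create_async_engine, which bypasses the ini file entirely.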

Key Component Deep-Dive 2: Autogenerate Workflow and Reviewing Revisions

Setting Up target_metadata Correctly

target_metadata must reflect your complete schema at import time. Any model file that is not imported before env.py runs will be invisible to the autogenerate diff, causing Alembic to think those tables do not exist in your models and scheduling them for DROP TABLE. This is a source of data-loss incidents.

The safest pattern is a dedicated app/models/__init__.py that imports every model:

# app/models/__init__.py
from app.models.user import User       # noqa: F401
from app.models.order import Order     # noqa: F401
from app.models.product import Product # noqa: F401
from app.models.invoice import Invoice # noqa: F401
from app.models.tenant import Tenant   # noqa: F401

Then in env.py:

import app.models  # noqa: F401 — side-effect: all __tablename__ registrations fire
from app.db.base import Base

target_metadata = Base.metadata
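Hand-maintained import lists drift as models are added. As an alternative sketch, the standard library can import every submodule of a package automatically; import_submodules is a hypothetical helper, and the approach assumes that importing each model module has no side effects beyond class definition.

```python
import importlib
import pkgutil


def import_submodules(package_name: str) -> list[str]:
    """Import every submodule of a package and return their dotted names."""
    package = importlib.import_module(package_name)
    imported = []
    for info in pkgutil.walk_packages(package.__path__, prefix=f"{package_name}."):
        if info.name.endswith(".__main__"):
            continue  # never execute a package's command-line entry point
        importlib.import_module(info.name)
        imported.append(info.name)
    return sorted(imported)
```

Calling import_submodules("app.models") near the top of env.py would register every model with Base.metadata before target_metadata is read. The explicit __init__.py list remains easier to audit, so this is a trade-off rather than a strict improvement.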

Generating and Running a Revision

# Generate an autogenerated revision
alembic revision --autogenerate -m "add_invoice_due_date"

# Inspect what was generated
cat alembic/versions/<timestamp>_add_invoice_due_date.py

# Apply to development database
alembic upgrade head

# Apply only the next revision (useful for incremental testing)
alembic upgrade +1

# Roll back one revision
alembic downgrade -1

# Show current database revision
alembic current

# Show full revision history
alembic history --verbose

Reading a Generated Revision

# alembic/versions/20260618_001_add_invoice_due_date.py
"""add invoice due_date

Revision ID: 3f7a1c8b2e49
Revises: 9d2e5a4b1c73
Create Date: 2026-06-18 09:14:32.887123
"""
from __future__ import annotations

from alembic import op
import sqlalchemy as sa

revision: str = "3f7a1c8b2e49"
down_revision: str | tuple = "9d2e5a4b1c73"
branch_labels: str | tuple | None = None
depends_on: str | tuple | None = None


def upgrade() -> None:
    op.add_column(
        "invoices",
        sa.Column(
            "due_date",
            sa.Date(),
            nullable=True,  # always nullable first — see zero-downtime section
        ),
    )


def downgrade() -> None:
    op.drop_column("invoices", "due_date")

What Autogenerate Can and Cannot Detect

Alembic autogenerate handles: column additions/removals, column type changes (with compare_type=True), index creation/deletion, foreign key changes, UniqueConstraint changes, and table creation/deletion.

It does not automatically detect: stored procedures, triggers, views, partial indexes (without custom comparison functions), CHECK constraints (dialect-dependent), column reordering, or server-default changes on some dialects unless compare_server_default=True is set and the dialect supports it.

Always audit the generated script before applying it to production. Pay particular attention to DROP TABLE and DROP COLUMN statements — autogenerate may emit these when a model import is missing from env.py.

Key Component Deep-Dive 3: Zero-Downtime Schema Change Strategies

The Expand/Contract Pattern

The fundamental principle of zero-downtime migrations is that schema changes must be backward-compatible with the currently running application version during the deployment window when both old and new code may be serving requests simultaneously. The expand/contract pattern splits each breaking change into three phases:

  1. Expand: Add the new structure (new column, new table, new index) without removing anything. Old code ignores the new column; new code writes to it.
  2. Migrate data: Backfill the new column/table with existing data. Do this in batches to avoid locking.
  3. Contract: Remove the old structure once all application instances have been updated and the backfill is complete.

# Phase 1: Expand. Add a nullable column (metadata-only on Postgres: brief lock, no table rewrite)
def upgrade() -> None:
    op.add_column(
        "orders",
        sa.Column("fulfilled_at", sa.DateTime(timezone=True), nullable=True),
    )

def downgrade() -> None:
    op.drop_column("orders", "fulfilled_at")

# Phase 2: Backfill (separate revision, run after app deploy)
def upgrade() -> None:
    # Single UPDATE shown for brevity; on large tables, split it into
    # primary-key ranges to avoid holding many row locks at once
    op.execute("""
        UPDATE orders
        SET fulfilled_at = completed_at
        WHERE fulfilled_at IS NULL
          AND status = 'fulfilled'
    """)

def downgrade() -> None:
    pass  # No reversal needed: downgrading Phase 1 drops the column and its data

# Phase 3: Contract. Enforce NOT NULL with a CHECK constraint that is validated without blocking writes
def upgrade() -> None:
    # First add as NOT VALID to avoid full-table lock during validation
    op.execute("""
        ALTER TABLE orders
        ADD CONSTRAINT orders_fulfilled_at_nn
        CHECK (fulfilled_at IS NOT NULL) NOT VALID
    """)
    # Validate separately — uses ShareUpdateExclusiveLock, not AccessExclusiveLock
    op.execute("ALTER TABLE orders VALIDATE CONSTRAINT orders_fulfilled_at_nn")

def downgrade() -> None:
    op.execute("ALTER TABLE orders DROP CONSTRAINT orders_fulfilled_at_nn")

Adding a NOT NULL Column Without Locking

On PostgreSQL, ALTER TABLE ADD COLUMN col TYPE NOT NULL DEFAULT val rewrites the entire table in versions prior to 11. From Postgres 11 onward, adding a column with a constant DEFAULT is nearly instantaneous because the default is stored in the catalog rather than written into every row. Even so, the safe cross-version recipe splits the change into three separate revisions:

# Step 1: Add nullable, no default
def upgrade_step1() -> None:
    op.add_column("products", sa.Column("sku", sa.String(64), nullable=True))


# Step 2: Backfill (application version N+1 writes sku on new rows)
def upgrade_step2() -> None:
    op.execute("UPDATE products SET sku = 'LEGACY-' || id::text WHERE sku IS NULL")


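# Step 3: Set NOT NULL. This scans the table under an ACCESS EXCLUSIVE lock
# unless a validated CHECK (sku IS NOT NULL) constraint already exists,
# in which case Postgres 12+ skips the scan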
def upgrade_step3() -> None:
    op.alter_column("products", "sku", nullable=False)

For the complete walkthrough of this pattern, see the zero-downtime schema migration strategies guide.

Concurrent Index Creation

Creating an index on a large table with the standard CREATE INDEX acquires a lock that blocks writes for the duration of the build. PostgreSQL offers CREATE INDEX CONCURRENTLY to build the index without a write lock, but Alembic's op.create_index() uses the standard form by default. To use the concurrent form:

from alembic import op
import sqlalchemy as sa

def upgrade() -> None:
    # CREATE INDEX CONCURRENTLY cannot run inside a transaction block.
    # autocommit_block() commits the Alembic-managed transaction, runs the
    # enclosed operations in autocommit mode, then resumes transactions.
    with op.get_context().autocommit_block():
        op.create_index(
            "ix_orders_customer_id",
            "orders",
            ["customer_id"],
            postgresql_concurrently=True,
        )

def downgrade() -> None:
    with op.get_context().autocommit_block():
        op.drop_index(
            "ix_orders_customer_id",
            table_name="orders",
            postgresql_concurrently=True,
        )

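Alternatively, a dedicated entry point in the async env.py can put the whole connection into AUTOCOMMIT mode before running concurrent index migrations. Every statement then commits immediately, so a revision that fails partway is not rolled back: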

async def run_concurrent_index_migration() -> None:
    engine = create_async_engine(DATABASE_URL, poolclass=pool.NullPool)
    async with engine.connect() as conn:
        await conn.execution_options(isolation_level="AUTOCOMMIT")
        await conn.run_sync(do_run_migrations)
    await engine.dispose()

Batch Operations for SQLite

SQLite does not support ALTER TABLE ... DROP COLUMN or ALTER TABLE ... RENAME COLUMN in older versions, and has no ALTER TABLE ... ALTER COLUMN at all. Alembic's batch mode works around this by recreating the table under a temporary name, copying all data, then renaming:

from alembic import op
import sqlalchemy as sa

def upgrade() -> None:
    with op.batch_alter_table("invoices", schema=None) as batch_op:
        batch_op.add_column(sa.Column("currency", sa.String(3), nullable=False, server_default="USD"))
        batch_op.drop_column("currency_code")  # old name
        batch_op.alter_column("amount", type_=sa.Numeric(12, 2))

def downgrade() -> None:
    with op.batch_alter_table("invoices", schema=None) as batch_op:
        batch_op.add_column(sa.Column("currency_code", sa.String(3), nullable=False, server_default="USD"))
        batch_op.drop_column("currency")
        batch_op.alter_column("amount", type_=sa.Numeric(10, 2))

Batch mode works on any dialect but is most commonly needed for SQLite-backed test databases. The autogenerating and reviewing migration scripts guide covers how to pass render_as_batch=True to context.configure() in env.py for projects that need autogenerate to emit batch operations by default.
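The table-recreation strategy that batch mode relies on can be sketched directly with the standard-library sqlite3 module. This is an illustration of the idea only, not Alembic's implementation: the two-column invoices table, the _tmp_invoices name, and recreate_with_numeric_amount are all assumptions.

```python
import sqlite3


def recreate_with_numeric_amount(conn: sqlite3.Connection) -> None:
    """Change invoices.amount to NUMERIC(12, 2) by rebuilding the table."""
    # 1. Create a temporary table that has the desired column definition.
    conn.execute(
        "CREATE TABLE _tmp_invoices ("
        "id INTEGER PRIMARY KEY, amount NUMERIC(12, 2) NOT NULL)"
    )
    # 2. Copy every row across.
    conn.execute(
        "INSERT INTO _tmp_invoices (id, amount) SELECT id, amount FROM invoices"
    )
    # 3. Drop the original and give the copy its name.
    conn.execute("DROP TABLE invoices")
    conn.execute("ALTER TABLE _tmp_invoices RENAME TO invoices")
    conn.commit()
```

Because the whole table is copied, the cost grows with table size, which is why this approach suits small SQLite databases better than large production tables. Indexes and constraints on the original table would also have to be recreated, which Alembic's batch mode handles from reflected or supplied table metadata.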

Advanced Patterns & Production Configuration

Running Migrations in CI/CD

The most reliable CI/CD migration pattern is to run alembic upgrade head as a pre-deployment step, before your new application containers start. Container orchestration platforms (Kubernetes, ECS) support this as an init container or a migration job.

# Dockerfile — migration stage
FROM python:3.11-slim AS migrator
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["alembic", "upgrade", "head"]

# kubernetes/migration-job.yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: db-migration-{{ .Release.Revision }}
spec:
  template:
    spec:
      restartPolicy: Never
      containers:
        - name: alembic
          image: myapp:{{ .Values.image.tag }}
          command: ["alembic", "upgrade", "head"]
          envFrom:
            - secretRef:
                name: db-credentials
      initContainers: []

Fail fast if the migration job fails — never proceed to rolling out application containers with a schema that is out of sync.

For rollback safety, always verify that downgrade() functions are implemented and tested. A common CI check:

# In CI pipeline: verify round-trip integrity
alembic upgrade head
alembic downgrade -1
alembic upgrade head

Multiple Bases and Branch Labels

Large applications split migrations across multiple branches — for example, separating application schema from audit logging or multi-tenant infrastructure tables. Alembic supports this via multiple bases:

# Create a second revision root
alembic revision --head=base --branch-label=audit --autogenerate -m "create_audit_log"

# Apply only the audit branch
alembic upgrade audit@head

# Apply both branches (standard and audit)
alembic upgrade heads

Each branch tracks its own head in alembic_version, which allows teams to merge and deploy branch migrations independently without conflicts.

version_table Schema Placement

By default, Alembic creates alembic_version in the default schema (public for PostgreSQL). In multi-tenant or multi-schema applications, you may want to control this placement:

# In env.py context.configure()
context.configure(
    connection=connection,
    target_metadata=target_metadata,
    version_table="alembic_version",
    version_table_schema="migrations",  # Store version table in dedicated schema
    include_schemas=True,               # Autogenerate across all schemas
    compare_type=True,
    compare_server_default=True,
)

Pair this with include_object to filter which schemas and tables participate in autogenerate:

def include_object(object, name, type_, reflected, compare_to):
    # Skip PostGIS system tables and pg_catalog artifacts
    if type_ == "table" and name.startswith("spatial_ref_sys"):
        return False
    if type_ == "table" and object.schema in ("tiger", "topology"):
        return False
    return True

context.configure(
    ...,
    include_object=include_object,
)

Lock Handling and Migration Timeouts

On high-traffic databases, DDL migrations compete with application queries for table-level locks. Set lock_timeout at the session level to fail fast rather than queue behind long-running transactions:

def do_run_migrations(connection) -> None:
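    # Fail if we cannot acquire a lock within 5 seconds
    connection.execute(sa.text("SET lock_timeout = '5s'"))
    connection.execute(sa.text("SET statement_timeout = '120s'"))
    # End the transaction these statements auto-began so that Alembic opens
    # and commits its own; session-level SET values survive the commit
    connection.commit()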

    context.configure(
        connection=connection,
        target_metadata=target_metadata,
        compare_type=True,
    )
    with context.begin_transaction():
        context.run_migrations()

For deployments that cannot afford even a 5-second lock wait, restructure the migration using the expand/contract pattern so that no individual DDL statement acquires more than a ShareUpdateExclusiveLock.
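Failing fast pairs naturally with a bounded retry, so a briefly contended lock does not fail the whole deployment. The following is a minimal sketch, assuming a hypothetical run_with_lock_retry wrapper and a caller-supplied predicate that recognises lock timeouts (on PostgreSQL these carry SQLSTATE 55P03, lock_not_available).

```python
import time
from typing import Callable


def run_with_lock_retry(
    run_migration: Callable[[], None],
    is_lock_timeout: Callable[[Exception], bool],
    attempts: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Run a migration, retrying on lock timeouts with exponential backoff.

    Returns the number of the attempt that succeeded. Any other error,
    or a lock timeout on the final attempt, is re-raised unchanged.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            run_migration()
            return attempt
        except Exception as exc:
            if attempt == attempts or not is_lock_timeout(exc):
                raise
            sleep(base_delay * 2 ** (attempt - 1))  # 1s, 2s, 4s, ...
    raise AssertionError("unreachable")
```

The sleep parameter exists so the backoff can be replaced in tests. Retrying is only safe when a failed attempt leaves nothing behind, which holds for revisions that run inside a transaction on a backend with transactional DDL.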

Stamping and Baseline Management

When adopting Alembic for an existing database that was not previously managed by it, use alembic stamp to record a baseline without running any migrations:

# Record that the database is already at revision 9d2e5a4b1c73
# without executing any upgrade() functions
alembic stamp 9d2e5a4b1c73

# Stamp to head (marks all current revisions as applied)
alembic stamp head

This is the correct onboarding procedure for brownfield projects. Never run alembic upgrade head against a pre-existing production database without first auditing every revision's upgrade() function — autogenerated scripts may contain DROP statements that assume a clean slate.

Pool Configuration for Async Migration Engines

The async engine setup for migrations differs from the application engine in the async engines and connection pooling guide. Migration engines should always use NullPool because:

  1. Migrations run synchronously (one revision at a time).
  2. The migration process exits after completion — pooled connections are wasted.
  3. Some DDL operations (particularly CREATE INDEX CONCURRENTLY) must run outside a transaction, which is incompatible with pool-managed connection state.

from sqlalchemy.pool import NullPool
from sqlalchemy.ext.asyncio import create_async_engine

migration_engine = create_async_engine(
    DATABASE_URL,
    poolclass=NullPool,
    echo=True,   # Log all SQL during migrations for audit trail
)

Custom Revision Templates

The default revision template is minimal. For production teams, a custom template enforces a house style: structured docstrings, type annotations, and explicit rollback verification comments.

# alembic/script.py.mako  (Mako template, not Python — syntax is intentional)
"""${message}

Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}

Notes:
    - Reviewed by: <reviewer>
    - Rollback tested: <yes/no>
    - Lock risk: <low/medium/high>
"""
from __future__ import annotations

from alembic import op
import sqlalchemy as sa
${imports if imports else ""}

revision: str = ${repr(up_revision)}
down_revision: str | tuple | None = ${repr(down_revision)}
branch_labels: str | tuple | None = ${repr(branch_labels)}
depends_on: str | tuple | None = ${repr(depends_on)}


def upgrade() -> None:
    ${upgrades if upgrades else "pass"}


def downgrade() -> None:
    ${downgrades if downgrades else "pass"}

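Alembic picks up script.py.mako from the script_location directory automatically, so the template needs no path setting. Two related alembic.ini options control whether env.py runs during alembic revision and how revision files are named:

[alembic]
# Run env.py whenever a revision file is generated, not only for autogenerate
revision_environment = true
# Naming pattern for new revision files
file_template = %%(year)d%%(month).2d%%(day).2d_%%(rev)s_%%(slug)s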

Detecting and Recovering from a Split-Brain Version Table

In distributed deployments, two migration processes may run concurrently if a network partition causes a delayed migration job to fire twice. PostgreSQL advisory locks prevent this:

import hashlib

def do_run_migrations(connection) -> None:
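    # Acquire a session-level advisory lock under a fixed key so that only
    # one migration process runs at a time against this database
    lock_key = int(hashlib.md5(b"alembic_migration_lock").hexdigest()[:8], 16)
    connection.execute(sa.text("SELECT pg_advisory_lock(:key)"), {"key": lock_key})
    # Commit the transaction that the statement auto-began; the session-level
    # lock is kept, and Alembic can then open and commit its own transaction
    connection.commit()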
    try:
        context.configure(
            connection=connection,
            target_metadata=target_metadata,
            compare_type=True,
            compare_server_default=True,
        )
        with context.begin_transaction():
            context.run_migrations()
    finally:
        connection.execute(sa.text(f"SELECT pg_advisory_unlock({lock_key})"))

The advisory lock is session-scoped: if the migration process crashes, PostgreSQL automatically releases the lock when the connection closes, so a stale lock cannot block later migration runs.
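When several applications or tenants share one PostgreSQL cluster, a lock key derived from a descriptive name avoids accidental collisions. This is a minimal sketch with a hypothetical advisory_lock_key helper; it maps any string to a signed 64-bit integer, which matches the bigint argument that pg_advisory_lock accepts.

```python
import hashlib


def advisory_lock_key(name: str) -> int:
    """Map a lock name to a stable signed 64-bit integer."""
    digest = hashlib.sha256(name.encode("utf-8")).digest()
    # The first 8 bytes, read as a signed integer, fit a PostgreSQL bigint.
    return int.from_bytes(digest[:8], byteorder="big", signed=True)
```

A name such as "alembic:app_production" yields the same key on every host, so all migration jobs for that database contend for one lock. The key could then be passed as a bound parameter, for example sa.text("SELECT pg_advisory_lock(:key)") with {"key": advisory_lock_key("alembic:app_production")}.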

Common Pitfalls & Anti-Patterns

  • Can't load plugin: sqlalchemy.dialects:postgres — You specified postgres:// instead of postgresql:// or postgresql+asyncpg:// in sqlalchemy.url. The postgres:// scheme was removed in SQLAlchemy 1.4. Fix: update alembic.ini to use postgresql+asyncpg:// and ensure the asyncpg package is installed.
  • Target database is not up to date — Alembic refuses to run revision --autogenerate when the database is behind the current head. Fix: run alembic upgrade head first to bring the database to the latest revision before generating a new one. In CI, this error indicates a missing migration step in the pipeline.
  • MissingGreenlet: greenlet_spawn has not been called — You are calling an async SQLAlchemy ORM method from synchronous Alembic migration code without using connection.run_sync. Fix: ensure all ORM calls inside upgrade() or downgrade() use op.execute(sa.text(...)) (pure SQL via Core) rather than AsyncSession or async ORM methods. If you need ORM-level data manipulation, use a synchronous session obtained inside a run_sync callback.
  • Autogenerate produces an empty migration: The most common cause is that model files are not imported before env.py sets target_metadata. The MetaData object only knows about tables whose ORM classes have been defined, which happens when their modules are imported. Fix: add explicit imports of all model modules in env.py or in a centralized app/models/__init__.py that env.py imports.
  • Autogenerate drops tables it should not — A variation of the missing-import problem. Fix: audit target_metadata.tables in a Python shell to confirm it contains all expected tables before running autogenerate. Also check include_schemas if you use non-default schemas.
  • sqlalchemy.exc.InvalidRequestError: Can't reconnect until invalid transaction is rolled back: A previous statement failed and left the connection in an aborted transaction state. This typically happens when CREATE INDEX CONCURRENTLY is attempted inside a transaction. Fix: run concurrent DDL inside op.get_context().autocommit_block(), or configure the migration connection with isolation_level="AUTOCOMMIT" for that specific operation.
  • FAILED: Multiple head revisions are present for given argument 'head' — Two developers generated revisions from the same head simultaneously, creating a branch. Fix: run alembic merge heads -m "merge_feature_branches" to create a merge revision that reconciles both branches.

Frequently Asked Questions

Can I use Alembic's async env.py with FastAPI's lifespan event? Yes, but it is not recommended. Running alembic upgrade head inside a FastAPI lifespan event makes your application startup dependent on migration success and serializes startup across replicas. The preferred pattern is to run migrations as a separate Kubernetes Job or Fargate task before rolling out new application instances, so that the application only starts after the schema is confirmed up to date.

Why does asyncio.run(run_migrations_online()) fail with "This event loop is already running"? This error, or the closely related "asyncio.run() cannot be called from a running event loop", occurs when Alembic is invoked inside an already-running event loop, for example inside a Jupyter notebook or a test that uses pytest-asyncio with a module-scoped loop. Fix: run the Alembic command in a worker thread (for example with asyncio.to_thread) so that asyncio.run() starts its own loop there, or ensure that migrations are always invoked as a subprocess (subprocess.run(["alembic", "upgrade", "head"])) rather than being called directly from async application code.
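The worker-thread approach can be demonstrated with the standard library alone. In this sketch fake_migrations is a hypothetical stand-in for run_migrations_online() and performs no database work; the point is only that asyncio.run() succeeds in a thread that has no running loop.

```python
import asyncio


async def fake_migrations() -> str:
    """Stand-in for run_migrations_online(); hypothetical, does no I/O."""
    await asyncio.sleep(0)
    return "migrated"


def run_migrations_blocking() -> str:
    """Synchronous entry point, comparable to what env.py does at the end."""
    return asyncio.run(fake_migrations())


async def app_startup() -> str:
    # Calling run_migrations_blocking() directly here would raise
    # RuntimeError, because this coroutine already runs inside a loop.
    # A worker thread has no running loop, so asyncio.run() is allowed there.
    return await asyncio.to_thread(run_migrations_blocking)
```

Running asyncio.run(app_startup()) returns "migrated". In a real application the blocking function would call alembic.command.upgrade with a Config object, and the subprocess route remains the simpler option when process isolation is acceptable.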

Does autogenerate detect changes to PostgreSQL ENUM types? By default, Alembic does not detect changes to the values of an existing ENUM type because enums are shared across tables and their alteration is complex. You must manage ENUM migrations manually with op.execute("ALTER TYPE ...") or implement a custom process_revision_directives hook. The third-party alembic-postgresql-enum package adds ENUM change detection to autogenerate; alembic-utils covers other objects such as views, functions, and triggers.

How do I run migrations against a test database that uses aiosqlite? Set sqlalchemy.url = sqlite+aiosqlite:///./test.db in alembic.ini (or override via environment variable) and pass render_as_batch=True to context.configure() in env.py so that autogenerated column alterations use the batch recreation strategy compatible with SQLite's limited DDL support.

What happens if a migration fails halfway through? Alembic runs migrations inside a transaction opened by context.begin_transaction(): a single transaction for the whole run by default, or one per revision when transaction_per_migration=True is passed to context.configure(). On backends with transactional DDL, such as PostgreSQL, an exception rolls the transaction back and alembic_version is not updated, leaving the database in its pre-migration state; on backends without it, such as MySQL, DDL that already ran stays applied. For DDL statements that cannot run inside a transaction (concurrent index creation, some PostgreSQL-specific operations), you must handle partial failure manually, typically by writing the downgrade() to be idempotent (DROP INDEX IF EXISTS, DROP COLUMN IF EXISTS).

How do I prevent autogenerate from including PostGIS or extension tables? Implement an include_object callback in env.py that returns False for tables you want to exclude, and pass it to context.configure(include_object=include_object). For extension schemas, keep include_schemas=False (the default) so that only the default schema is reflected, or set include_schemas=True and filter schema names with an include_name callback; include_schemas is a boolean and does not accept a list of schema names.
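Schema filtering by name can be expressed with the include_name hook that context.configure() accepts, which is consulted before a schema or table is reflected. The sketch below is one possible shape; the "app" schema and the MANAGED_SCHEMAS and EXCLUDED_TABLES sets are assumptions to adapt to your own layout.

```python
# None stands for the connection's default schema (public on PostgreSQL).
MANAGED_SCHEMAS = {None, "app"}
EXCLUDED_TABLES = {"spatial_ref_sys"}


def include_name(name, type_, parent_names):
    """Decide by name alone whether autogenerate should reflect an object."""
    if type_ == "schema":
        return name in MANAGED_SCHEMAS
    if type_ == "table":
        return name not in EXCLUDED_TABLES
    return True  # columns, indexes, and constraints follow their table
```

It would be wired in with context.configure(..., include_schemas=True, include_name=include_name). Because the hook sees only names, it runs before reflection and can skip large extension schemas cheaply, whereas include_object runs after objects have been loaded.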