Configuring Alembic with Async SQLAlchemy Engines

Configuring Alembic to drive schema migrations through an async SQLAlchemy engine requires a specific wiring pattern that bridges Alembic's inherently synchronous migration runner with the asyncio event loop. This guide covers the complete async env.py structure, the run_sync bridging technique, environment-based URL configuration, and both online and offline migration modes — all within the broader Alembic Async Migrations and Schema Evolution workflow.

Concept and Execution Model

Alembic was designed around synchronous DBAPI connections. Its internal migration engine — the MigrationContext — expects a plain synchronous Connection object that it can call DDL through in a blocking fashion. SQLAlchemy 2.0's async engine does not expose such a connection directly; it wraps everything in an event-loop-aware async context manager.

The reconciliation is a two-step bridge:

  1. An asyncio.run() call starts a dedicated event loop for the migration process.
  2. Inside that coroutine, you acquire an AsyncConnection from a create_async_engine, then invoke connection.run_sync(...) to hand Alembic a true synchronous Connection object that it can use without any modifications to Alembic's internals.

This approach is officially supported and keeps your migration tooling free of monkey-patching or third-party wrappers. Alembic itself never sees the async layer; it operates against a synchronous facade that run_sync provides. The result is full compatibility with every Alembic feature — autogenerate, branching, batch operations, and custom migration templates — while your application code continues to use create_async_engine and AsyncSession at runtime.

Understanding this execution model matters before wiring env.py. The async engine you create inside env.py is a short-lived migration-time engine, entirely separate from the engine your application uses at request time. The two can share the same URL; they must not share the same instance, since connection pool state is not safe to transfer across process or event-loop boundaries.

Async env.py execution flow Diagram showing how asyncio.run() invokes an async coroutine, which creates an async engine, acquires an async connection, calls run_sync to obtain a synchronous connection, passes it to the Alembic MigrationContext, and finally emits DDL statements to the database. asyncio.run() event loop AsyncEngine create_async_engine run_sync sync Connection Migration Context context.configure() run_migrations() PostgreSQL DDL executed 1. Start loop 2. Create engine 3. Bridge sync 4. Configure & run Alembic never sees the async layer — run_sync provides a synchronous facade

Async Engine Setup and env.py Structure

Installing dependencies

Alembic, asyncpg, and SQLAlchemy must be present before any configuration takes place:

# requirements.txt or pyproject.toml dependencies
# alembic>=1.13.0
# sqlalchemy[asyncio]>=2.0.0
# asyncpg>=0.29.0

Initialize a fresh Alembic environment with alembic init alembic. This creates alembic.ini and alembic/env.py. The generated env.py is synchronous; you will rewrite it entirely.

The application metadata contract

Alembic's autogenerate feature compares the current database schema against your DeclarativeBase metadata. The binding point is target_metadata in env.py. Your application's model file must expose its Base.metadata object, and env.py imports it directly:

# app/models/base.py
from sqlalchemy.orm import DeclarativeBase


class Base(DeclarativeBase):
    pass
# app/models/user.py
from sqlalchemy import String, DateTime, func
from sqlalchemy.orm import Mapped, mapped_column
from .base import Base


class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str] = mapped_column(String(255), unique=True, nullable=False)
    full_name: Mapped[str] = mapped_column(String(255), nullable=False)
    created_at: Mapped[DateTime] = mapped_column(
        DateTime(timezone=True), server_default=func.now(), nullable=False
    )
# app/models/order.py
from decimal import Decimal
from sqlalchemy import Numeric, ForeignKey, String
from sqlalchemy.orm import Mapped, mapped_column, relationship
from .base import Base


class Order(Base):
    __tablename__ = "orders"

    id: Mapped[int] = mapped_column(primary_key=True)
    user_id: Mapped[int] = mapped_column(ForeignKey("users.id"), nullable=False)
    status: Mapped[str] = mapped_column(String(32), nullable=False, default="pending")
    total: Mapped[Decimal] = mapped_column(Numeric(12, 2), nullable=False)

Import every model module before env.py references Base.metadata, otherwise Alembic will not detect tables defined in unimported modules. A common pattern is a single app/models/__init__.py that imports each model:

# app/models/__init__.py
from .base import Base
from .user import User
from .order import Order
from .product import Product
from .invoice import Invoice

__all__ = ["Base", "User", "Order", "Product", "Invoice"]

Reading the database URL from the environment

Hard-coding the database URL in alembic.ini works for single-environment setups but breaks in CI pipelines and multi-tenant deployments. The recommended pattern reads from environment variables and overrides the alembic.ini value at runtime inside env.py:

# alembic/env.py — URL configuration section
import os
from alembic import context

# alembic.ini still has a placeholder; we override it here
DATABASE_URL = os.environ.get(
    "DATABASE_URL",
    "postgresql+asyncpg://postgres:postgres@localhost:5432/appdb",
)

config = context.config
config.set_main_option("sqlalchemy.url", DATABASE_URL)

For applications that centralize settings through Pydantic or python-decouple, call the settings object directly:

# Pydantic BaseSettings alternative
from app.config import settings  # exposes settings.database_url

config.set_main_option("sqlalchemy.url", str(settings.database_url))

This keeps the migration command clean: DATABASE_URL=postgresql+asyncpg://... alembic upgrade head — no alembic.ini edits needed per environment.

Complete Async env.py Implementation

Offline mode (SQL generation without a live connection)

Offline mode generates SQL scripts for manual review or deployment through a change-management pipeline. It does not require a live database connection and remains fully synchronous:

def run_migrations_offline() -> None:
    """
    Emit migration SQL to stdout without a live database connection.
    Useful for generating reviewed scripts for production deployments.
    """
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
        compare_type=True,
        compare_server_default=True,
    )

    with context.begin_transaction():
        context.run_migrations()

Online mode (async bridge pattern)

Online mode executes migrations against a live database. The async bridge is the critical section:

import asyncio
from logging.config import fileConfig

from sqlalchemy.ext.asyncio import create_async_engine, AsyncConnection
from alembic import context

# Import your app's metadata so autogenerate can compare schemas
from app.models import Base

target_metadata = Base.metadata

config = context.config
fileConfig(config.config_file_name)  # type: ignore[arg-type]


def do_run_migrations(connection: AsyncConnection) -> None:
    """
    Called inside run_sync — receives a plain synchronous Connection.
    This is the synchronous half that Alembic's MigrationContext expects.
    """
    context.configure(
        connection=connection,  # type: ignore[arg-type]
        target_metadata=target_metadata,
        compare_type=True,
        compare_server_default=True,
        include_schemas=True,
    )

    with context.begin_transaction():
        context.run_migrations()


async def run_async_migrations() -> None:
    """
    Create a short-lived async engine, acquire a connection,
    then delegate to the synchronous migration runner via run_sync.
    """
    connectable = create_async_engine(
        config.get_main_option("sqlalchemy.url"),  # type: ignore[arg-type]
        echo=False,
        pool_pre_ping=True,
    )

    async with connectable.connect() as connection:
        await connection.run_sync(do_run_migrations)

    await connectable.dispose()


def run_migrations_online() -> None:
    """Entry point called by Alembic when a live connection is available."""
    asyncio.run(run_async_migrations())

Wiring offline and online modes together

The bottom of env.py chooses the mode based on Alembic's context flag:

if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()

The full env.py assembled:

# alembic/env.py
import asyncio
import os
from logging.config import fileConfig

from sqlalchemy.ext.asyncio import create_async_engine

from alembic import context

# --- Application imports ---
from app.models import Base  # pulls in all model modules via __init__.py

# --- Alembic config ---
config = context.config

if config.config_file_name is not None:
    fileConfig(config.config_file_name)

# Override URL from environment
DATABASE_URL = os.environ.get(
    "DATABASE_URL",
    "postgresql+asyncpg://postgres:postgres@localhost:5432/appdb",
)
config.set_main_option("sqlalchemy.url", DATABASE_URL)

target_metadata = Base.metadata


# --- Offline mode ---
def run_migrations_offline() -> None:
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
        compare_type=True,
        compare_server_default=True,
    )
    with context.begin_transaction():
        context.run_migrations()


# --- Online mode ---
def do_run_migrations(connection) -> None:  # type: ignore[type-arg]
    context.configure(
        connection=connection,
        target_metadata=target_metadata,
        compare_type=True,
        compare_server_default=True,
        include_schemas=True,
    )
    with context.begin_transaction():
        context.run_migrations()


async def run_async_migrations() -> None:
    connectable = create_async_engine(
        config.get_main_option("sqlalchemy.url"),  # type: ignore[arg-type]
        echo=False,
        pool_pre_ping=True,
    )
    async with connectable.connect() as connection:
        await connection.run_sync(do_run_migrations)
    await connectable.dispose()


def run_migrations_online() -> None:
    asyncio.run(run_async_migrations())


# --- Dispatch ---
if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()

For the detailed step-by-step walkthrough of each env.py configuration option and the full alembic.ini setup, see Setting Up Alembic env.py for asyncpg.

State Management and Session Boundaries

Why migrations use a separate engine

The engine in env.py must not be the same object your application uses for request handling. The application engine is typically created at startup and persists for the process lifetime, maintaining an open connection pool. The migration engine is ephemeral: it is created inside run_async_migrations(), used for the duration of the alembic upgrade head command, and disposed immediately after via await connectable.dispose().

Mixing the two creates problems:

  • The application pool may have open transactions that conflict with DDL locks.
  • Pool connections are bound to the event loop that created them. The migration script's asyncio.run() creates a new event loop; connections from the application's engine are invalid in it.
  • pool_pre_ping=True on the migration engine catches stale connections that may have accumulated if the database was restarted between CI steps.

Transaction management in migrations

Alembic wraps each migration script in a transaction by default (with context.begin_transaction()). PostgreSQL supports transactional DDL, meaning CREATE TABLE, ALTER TABLE, and CREATE INDEX can be rolled back if the migration fails mid-way. This is a significant safety guarantee. Do not disable transaction wrapping unless you are using a DDL statement that PostgreSQL does not allow inside a transaction, such as CREATE INDEX CONCURRENTLY. In that case, mark the migration as non-transactional:

# versions/20240618_add_invoice_index.py
from alembic import op

# directive to opt out of the wrapping transaction for this migration
def upgrade() -> None:
    op.execute("COMMIT")  # close the transaction alembic opened
    op.create_index(
        "ix_invoices_tenant_id",
        "invoices",
        ["tenant_id"],
        postgresql_concurrently=True,
    )

For zero-downtime DDL strategies like concurrent index creation, see Zero-Downtime Schema Migration Strategies.

Sharing metadata across application and migration layers

target_metadata = Base.metadata creates a reference, not a copy. Any change to Base.metadata after this line — such as dynamically registering new tables — is reflected in what Alembic compares against the live schema. Ensure all model imports complete before env.py references target_metadata. A delayed import or a lazy model registration (common in plugin architectures) will cause autogenerate to miss tables silently.

The safest pattern for large applications with many model modules is to enumerate imports explicitly in app/models/__init__.py rather than relying on autodiscovery. This makes the import surface visible and auditable.

Advanced Configuration Patterns

Multi-schema deployments

Applications that split models across PostgreSQL schemas (e.g., a public schema for shared tables and a tenants schema for per-tenant tables) need include_schemas=True in context.configure() and an include_object filter:

from alembic.runtime.migration import MigrationContext
from alembic.operations import Operations


def include_object(object, name, type_, reflected, compare_to):
    # Only autogenerate for schemas we own
    if type_ == "table":
        return object.schema in (None, "public", "tenants")
    return True


def do_run_migrations(connection) -> None:  # type: ignore[type-arg]
    context.configure(
        connection=connection,
        target_metadata=target_metadata,
        compare_type=True,
        compare_server_default=True,
        include_schemas=True,
        include_object=include_object,
        version_table_schema="public",  # keep alembic_version in public schema
    )
    with context.begin_transaction():
        context.run_migrations()

Tenant-aware migrations

SaaS platforms often need to apply the same migration to multiple tenant schemas. The pattern creates an async engine once, then loops over schemas within a single run_sync call:

import asyncio
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine
from alembic import context
from app.models import Base
from app.db import get_all_tenant_schemas  # returns List[str]

target_metadata = Base.metadata


async def run_tenant_migrations() -> None:
    tenant_schemas = await get_all_tenant_schemas()

    connectable = create_async_engine(
        context.config.get_main_option("sqlalchemy.url"),  # type: ignore[arg-type]
        echo=False,
        pool_pre_ping=True,
    )

    for schema in tenant_schemas:
        async with connectable.connect() as connection:
            await connection.execute(
                text(f"SET search_path TO {schema}")
            )
            await connection.run_sync(
                lambda conn: _configure_and_run(conn, schema)
            )

    await connectable.dispose()


def _configure_and_run(connection, schema: str) -> None:
    context.configure(
        connection=connection,
        target_metadata=target_metadata,
        compare_type=True,
        version_table=f"alembic_version_{schema}",
        include_schemas=False,
    )
    with context.begin_transaction():
        context.run_migrations()

This approach keeps one connection pool for the migration run while touching each Tenant schema sequentially. Parallel schema migration is possible but risks DDL lock contention on shared catalog tables.

Programmatic migration invocation from FastAPI startup

Some deployment patterns run alembic upgrade head programmatically during application startup rather than as a separate CLI step. This is common in Kubernetes init-container patterns and Docker Compose setups:

# app/db/migrations.py
import asyncio
from alembic.config import Config
from alembic import command


def run_migrations_sync() -> None:
    """
    Invoke alembic upgrade head programmatically.
    Must be called before the application event loop starts
    or from a dedicated thread, never from within a running loop.
    """
    alembic_cfg = Config("alembic.ini")
    command.upgrade(alembic_cfg, "head")


# FastAPI lifespan — run migrations before accepting traffic
from contextlib import asynccontextmanager
from fastapi import FastAPI


@asynccontextmanager
async def lifespan(app: FastAPI):
    # run_in_executor keeps the synchronous alembic call off the event loop
    loop = asyncio.get_event_loop()
    await loop.run_in_executor(None, run_migrations_sync)
    yield
    # shutdown cleanup here


app = FastAPI(lifespan=lifespan)

Note that command.upgrade is synchronous and calls asyncio.run() internally (through env.py). Invoking it from inside a running event loop causes RuntimeError: This event loop is already running. The run_in_executor pattern sidesteps this by running the synchronous Alembic call in a thread pool where no event loop is active.

Comparison: Configuration Approaches

ApproachURL SourceEnv IsolationCI-FriendlyMulti-Tenant
Hard-coded in alembic.iniStatic fileNoneNoNo
os.environ.get(...) in env.pyEnvironment variableFullYesWith loop
Pydantic BaseSettings.env + env varsFullYesWith loop
Programmatic Config objectPython callerFullYesYes
Separate alembic.ini per envMultiple filesGoodModerateModerate

The os.environ.get(...) pattern in env.py covers the majority of production deployments. Pydantic BaseSettings adds validation and type coercion, which is valuable when the application already uses it for other configuration.

Hybrid Architectures and Migration Strategies

Decoupling migration engines from application engines

In high-availability deployments, the application engine connects to a read-replica-aware connection string or a pgBouncer transaction-mode pooler. Neither is safe for DDL. Migrations must target the primary directly. Use separate environment variables:

# env.py
MIGRATION_DATABASE_URL = os.environ.get(
    "MIGRATION_DATABASE_URL",
    os.environ.get(
        "DATABASE_URL",
        "postgresql+asyncpg://postgres:postgres@primary.db.internal:5432/appdb",
    ),
)
config.set_main_option("sqlalchemy.url", MIGRATION_DATABASE_URL)

Setting MIGRATION_DATABASE_URL to the primary host bypasses read replicas and connection poolers, both of which are incompatible with DDL execution. For deeper coverage of async engine options, see Async Engines, Dialects, and Connection Pooling and the specific pooling configuration patterns in Configuring Async Engines and Connection Pools.

Blue-green and rolling deployments

Migrations that run as a separate Kubernetes Job before the new pod version starts must be idempotent and backward-compatible. Alembic's alembic_version table guarantees that a migration script runs exactly once per revision ID. To ensure new code is compatible with both the old schema (during the deploy window) and the new schema, apply the expand-contract pattern:

  • Expand: add nullable columns, new tables, new indexes — never remove or rename existing ones.
  • Contract: in a subsequent release, remove the old column or constraint once all instances run the new code.

Autogenerate supports both steps. The key is never merging expand and contract into a single migration. Review the autogenerate output carefully before committing — see Autogenerating and Reviewing Migration Scripts for the review workflow.

Connection pooler compatibility

pgBouncer in transaction-pooling mode breaks SET search_path, advisory locks, and prepared statements. The migration engine should connect directly to PostgreSQL or use pgBouncer in session-pooling mode. Add statement_cache_size=0 to connect_args when asyncpg connects through any pooler to disable prepared statement caching:

connectable = create_async_engine(
    MIGRATION_DATABASE_URL,
    echo=False,
    pool_pre_ping=True,
    connect_args={
        "statement_cache_size": 0,  # required for pgBouncer transaction mode
        "prepared_statement_cache_size": 0,
    },
)

Production Pitfalls and Anti-Patterns

  • RuntimeError: This event loop is already running — calling asyncio.run() from inside a running FastAPI or Starlette event loop. Fix: invoke Alembic from a thread via loop.run_in_executor(None, run_migrations_sync) or run migrations as a separate process before the application starts.
  • sqlalchemy.exc.MissingGreenlet: greenlet_spawn has not been called — using a synchronous engine URL (e.g., postgresql:// instead of postgresql+asyncpg://) inside an async context, or calling an async method from a synchronous context without run_sync. Fix: ensure all URLs in env.py use postgresql+asyncpg:// and that do_run_migrations is only ever called through connection.run_sync(...).
  • AttributeError: 'coroutine' object has no attribute 'execute' — passing an AsyncConnection directly to context.configure(connection=...) without going through run_sync. Fix: always pass the connection argument that run_sync supplies to the callback; never pass the AsyncConnection itself.
  • Silent autogenerate misses — model modules not imported before target_metadata = Base.metadata. Alembic sees an empty metadata and generates no operations, silently treating the schema as up-to-date. Fix: import all model modules explicitly in app/models/__init__.py and verify with alembic check that detected changes match expectations.
  • DDL conflicts from pgBouncer transaction modeasyncpg prepared statements cached from a previous session are invalid after pgBouncer rotates the underlying connection. Fix: set statement_cache_size=0 and prepared_statement_cache_size=0 in connect_args on the migration engine.
  • alembic.util.exc.CommandError: Can't locate revision identified by '...' — running alembic upgrade head against a database whose alembic_version table references a revision that no longer exists in the versions/ directory (common after branch squashing). Fix: inspect the live alembic_version table, stamp the correct revision with alembic stamp <revision>, and re-run.

Frequently Asked Questions

Why does Alembic need run_sync if asyncpg can handle async natively?

Alembic's migration runner and MigrationContext were designed for synchronous DBAPI connections. They issue cursor.execute() calls in a blocking fashion internally. The run_sync method on an AsyncConnection wraps those synchronous calls so they execute in a greenlet, which asyncpg's event loop integration can schedule without blocking. The result is that Alembic's internals do not need modification, and the async driver still performs the actual I/O asynchronously at the socket level.

Can I reuse my application's AsyncEngine instance in env.py?

Not safely. The application engine is created in one event loop (the ASGI server's loop) and holds connections bound to that loop. asyncio.run() in env.py creates a new, separate event loop. Connections from the application engine are invalid in the migration loop and will raise RuntimeError or asyncpg.exceptions.InterfaceError. Always create a new create_async_engine(...) instance inside run_async_migrations() and dispose it before returning.

How do I run alembic upgrade head in a Docker Compose setup without a race condition against the database starting?

Use depends_on with a health check in docker-compose.yml to ensure the PostgreSQL container is accepting connections before the migration service starts. In the migration entrypoint script, add a pg_isready loop as a belt-and-suspenders check, then invoke alembic upgrade head. The pool_pre_ping=True on the migration engine provides a final connection validation before DDL begins.

Does the async env.py pattern work with SQLite for testing?

Yes, with the aiosqlite driver. Replace postgresql+asyncpg:// with sqlite+aiosqlite:///./test.db. The run_sync pattern is identical. For in-memory SQLite databases in test suites, pass the same AsyncEngine instance to both the application fixtures and a custom migration runner so the in-memory database is not discarded between setup steps.

Can alembic autogenerate detect changes to column types, server defaults, and constraints?

Yes, when compare_type=True and compare_server_default=True are set in context.configure(). Without these flags, Alembic compares only table and column existence, not type changes. Be aware that compare_server_default can generate false positives when the database normalizes default expressions differently from how SQLAlchemy emits them. Review autogenerated scripts before applying them in production, as covered in Autogenerating and Reviewing Migration Scripts.