Configuring Alembic with Async SQLAlchemy Engines
Configuring Alembic to drive schema migrations through an async SQLAlchemy engine requires a specific wiring pattern that bridges Alembic's inherently synchronous migration runner with the asyncio event loop. This guide covers the complete async env.py structure, the run_sync bridging technique, environment-based URL configuration, and both online and offline migration modes — all within the broader Alembic Async Migrations and Schema Evolution workflow.
Concept and Execution Model
Alembic was designed around synchronous DBAPI connections. Its internal migration engine — the MigrationContext — expects a plain synchronous Connection object that it can call DDL through in a blocking fashion. SQLAlchemy 2.0's async engine does not expose such a connection directly; it wraps everything in an event-loop-aware async context manager.
The reconciliation is a two-step bridge:
- An
asyncio.run()call starts a dedicated event loop for the migration process. - Inside that coroutine, you acquire an
AsyncConnectionfrom acreate_async_engine, then invokeconnection.run_sync(...)to hand Alembic a true synchronousConnectionobject that it can use without any modifications to Alembic's internals.
This approach is officially supported and keeps your migration tooling free of monkey-patching or third-party wrappers. Alembic itself never sees the async layer; it operates against a synchronous facade that run_sync provides. The result is full compatibility with every Alembic feature — autogenerate, branching, batch operations, and custom migration templates — while your application code continues to use create_async_engine and AsyncSession at runtime.
Understanding this execution model matters before wiring env.py. The async engine you create inside env.py is a short-lived migration-time engine, entirely separate from the engine your application uses at request time. The two can share the same URL; they must not share the same instance, since connection pool state is not safe to transfer across process or event-loop boundaries.
Async Engine Setup and env.py Structure
Installing dependencies
Alembic, asyncpg, and SQLAlchemy must be present before any configuration takes place:
# requirements.txt or pyproject.toml dependencies
# alembic>=1.13.0
# sqlalchemy[asyncio]>=2.0.0
# asyncpg>=0.29.0
Initialize a fresh Alembic environment with alembic init alembic. This creates alembic.ini and alembic/env.py. The generated env.py is synchronous; you will rewrite it entirely.
The application metadata contract
Alembic's autogenerate feature compares the current database schema against your DeclarativeBase metadata. The binding point is target_metadata in env.py. Your application's model file must expose its Base.metadata object, and env.py imports it directly:
# app/models/base.py
from sqlalchemy.orm import DeclarativeBase
class Base(DeclarativeBase):
pass
# app/models/user.py
from sqlalchemy import String, DateTime, func
from sqlalchemy.orm import Mapped, mapped_column
from .base import Base
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
email: Mapped[str] = mapped_column(String(255), unique=True, nullable=False)
full_name: Mapped[str] = mapped_column(String(255), nullable=False)
created_at: Mapped[DateTime] = mapped_column(
DateTime(timezone=True), server_default=func.now(), nullable=False
)
# app/models/order.py
from decimal import Decimal
from sqlalchemy import Numeric, ForeignKey, String
from sqlalchemy.orm import Mapped, mapped_column, relationship
from .base import Base
class Order(Base):
__tablename__ = "orders"
id: Mapped[int] = mapped_column(primary_key=True)
user_id: Mapped[int] = mapped_column(ForeignKey("users.id"), nullable=False)
status: Mapped[str] = mapped_column(String(32), nullable=False, default="pending")
total: Mapped[Decimal] = mapped_column(Numeric(12, 2), nullable=False)
Import every model module before env.py references Base.metadata, otherwise Alembic will not detect tables defined in unimported modules. A common pattern is a single app/models/__init__.py that imports each model:
# app/models/__init__.py
from .base import Base
from .user import User
from .order import Order
from .product import Product
from .invoice import Invoice
__all__ = ["Base", "User", "Order", "Product", "Invoice"]
Reading the database URL from the environment
Hard-coding the database URL in alembic.ini works for single-environment setups but breaks in CI pipelines and multi-tenant deployments. The recommended pattern reads from environment variables and overrides the alembic.ini value at runtime inside env.py:
# alembic/env.py — URL configuration section
import os
from alembic import context
# alembic.ini still has a placeholder; we override it here
DATABASE_URL = os.environ.get(
"DATABASE_URL",
"postgresql+asyncpg://postgres:postgres@localhost:5432/appdb",
)
config = context.config
config.set_main_option("sqlalchemy.url", DATABASE_URL)
For applications that centralize settings through Pydantic or python-decouple, call the settings object directly:
# Pydantic BaseSettings alternative
from app.config import settings # exposes settings.database_url
config.set_main_option("sqlalchemy.url", str(settings.database_url))
This keeps the migration command clean: DATABASE_URL=postgresql+asyncpg://... alembic upgrade head — no alembic.ini edits needed per environment.
Complete Async env.py Implementation
Offline mode (SQL generation without a live connection)
Offline mode generates SQL scripts for manual review or deployment through a change-management pipeline. It does not require a live database connection and remains fully synchronous:
def run_migrations_offline() -> None:
"""
Emit migration SQL to stdout without a live database connection.
Useful for generating reviewed scripts for production deployments.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
compare_type=True,
compare_server_default=True,
)
with context.begin_transaction():
context.run_migrations()
Online mode (async bridge pattern)
Online mode executes migrations against a live database. The async bridge is the critical section:
import asyncio
from logging.config import fileConfig
from sqlalchemy.ext.asyncio import create_async_engine, AsyncConnection
from alembic import context
# Import your app's metadata so autogenerate can compare schemas
from app.models import Base
target_metadata = Base.metadata
config = context.config
fileConfig(config.config_file_name) # type: ignore[arg-type]
def do_run_migrations(connection: AsyncConnection) -> None:
"""
Called inside run_sync — receives a plain synchronous Connection.
This is the synchronous half that Alembic's MigrationContext expects.
"""
context.configure(
connection=connection, # type: ignore[arg-type]
target_metadata=target_metadata,
compare_type=True,
compare_server_default=True,
include_schemas=True,
)
with context.begin_transaction():
context.run_migrations()
async def run_async_migrations() -> None:
"""
Create a short-lived async engine, acquire a connection,
then delegate to the synchronous migration runner via run_sync.
"""
connectable = create_async_engine(
config.get_main_option("sqlalchemy.url"), # type: ignore[arg-type]
echo=False,
pool_pre_ping=True,
)
async with connectable.connect() as connection:
await connection.run_sync(do_run_migrations)
await connectable.dispose()
def run_migrations_online() -> None:
"""Entry point called by Alembic when a live connection is available."""
asyncio.run(run_async_migrations())
Wiring offline and online modes together
The bottom of env.py chooses the mode based on Alembic's context flag:
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()
The full env.py assembled:
# alembic/env.py
import asyncio
import os
from logging.config import fileConfig
from sqlalchemy.ext.asyncio import create_async_engine
from alembic import context
# --- Application imports ---
from app.models import Base # pulls in all model modules via __init__.py
# --- Alembic config ---
config = context.config
if config.config_file_name is not None:
fileConfig(config.config_file_name)
# Override URL from environment
DATABASE_URL = os.environ.get(
"DATABASE_URL",
"postgresql+asyncpg://postgres:postgres@localhost:5432/appdb",
)
config.set_main_option("sqlalchemy.url", DATABASE_URL)
target_metadata = Base.metadata
# --- Offline mode ---
def run_migrations_offline() -> None:
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
compare_type=True,
compare_server_default=True,
)
with context.begin_transaction():
context.run_migrations()
# --- Online mode ---
def do_run_migrations(connection) -> None: # type: ignore[type-arg]
context.configure(
connection=connection,
target_metadata=target_metadata,
compare_type=True,
compare_server_default=True,
include_schemas=True,
)
with context.begin_transaction():
context.run_migrations()
async def run_async_migrations() -> None:
connectable = create_async_engine(
config.get_main_option("sqlalchemy.url"), # type: ignore[arg-type]
echo=False,
pool_pre_ping=True,
)
async with connectable.connect() as connection:
await connection.run_sync(do_run_migrations)
await connectable.dispose()
def run_migrations_online() -> None:
asyncio.run(run_async_migrations())
# --- Dispatch ---
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()
For the detailed step-by-step walkthrough of each env.py configuration option and the full alembic.ini setup, see Setting Up Alembic env.py for asyncpg.
State Management and Session Boundaries
Why migrations use a separate engine
The engine in env.py must not be the same object your application uses for request handling. The application engine is typically created at startup and persists for the process lifetime, maintaining an open connection pool. The migration engine is ephemeral: it is created inside run_async_migrations(), used for the duration of the alembic upgrade head command, and disposed immediately after via await connectable.dispose().
Mixing the two creates problems:
- The application pool may have open transactions that conflict with DDL locks.
- Pool connections are bound to the event loop that created them. The migration script's
asyncio.run()creates a new event loop; connections from the application's engine are invalid in it. pool_pre_ping=Trueon the migration engine catches stale connections that may have accumulated if the database was restarted between CI steps.
Transaction management in migrations
Alembic wraps each migration script in a transaction by default (with context.begin_transaction()). PostgreSQL supports transactional DDL, meaning CREATE TABLE, ALTER TABLE, and CREATE INDEX can be rolled back if the migration fails mid-way. This is a significant safety guarantee. Do not disable transaction wrapping unless you are using a DDL statement that PostgreSQL does not allow inside a transaction, such as CREATE INDEX CONCURRENTLY. In that case, mark the migration as non-transactional:
# versions/20240618_add_invoice_index.py
from alembic import op
# directive to opt out of the wrapping transaction for this migration
def upgrade() -> None:
op.execute("COMMIT") # close the transaction alembic opened
op.create_index(
"ix_invoices_tenant_id",
"invoices",
["tenant_id"],
postgresql_concurrently=True,
)
For zero-downtime DDL strategies like concurrent index creation, see Zero-Downtime Schema Migration Strategies.
Sharing metadata across application and migration layers
target_metadata = Base.metadata creates a reference, not a copy. Any change to Base.metadata after this line — such as dynamically registering new tables — is reflected in what Alembic compares against the live schema. Ensure all model imports complete before env.py references target_metadata. A delayed import or a lazy model registration (common in plugin architectures) will cause autogenerate to miss tables silently.
The safest pattern for large applications with many model modules is to enumerate imports explicitly in app/models/__init__.py rather than relying on autodiscovery. This makes the import surface visible and auditable.
Advanced Configuration Patterns
Multi-schema deployments
Applications that split models across PostgreSQL schemas (e.g., a public schema for shared tables and a tenants schema for per-tenant tables) need include_schemas=True in context.configure() and an include_object filter:
from alembic.runtime.migration import MigrationContext
from alembic.operations import Operations
def include_object(object, name, type_, reflected, compare_to):
# Only autogenerate for schemas we own
if type_ == "table":
return object.schema in (None, "public", "tenants")
return True
def do_run_migrations(connection) -> None: # type: ignore[type-arg]
context.configure(
connection=connection,
target_metadata=target_metadata,
compare_type=True,
compare_server_default=True,
include_schemas=True,
include_object=include_object,
version_table_schema="public", # keep alembic_version in public schema
)
with context.begin_transaction():
context.run_migrations()
Tenant-aware migrations
SaaS platforms often need to apply the same migration to multiple tenant schemas. The pattern creates an async engine once, then loops over schemas within a single run_sync call:
import asyncio
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine
from alembic import context
from app.models import Base
from app.db import get_all_tenant_schemas # returns List[str]
target_metadata = Base.metadata
async def run_tenant_migrations() -> None:
tenant_schemas = await get_all_tenant_schemas()
connectable = create_async_engine(
context.config.get_main_option("sqlalchemy.url"), # type: ignore[arg-type]
echo=False,
pool_pre_ping=True,
)
for schema in tenant_schemas:
async with connectable.connect() as connection:
await connection.execute(
text(f"SET search_path TO {schema}")
)
await connection.run_sync(
lambda conn: _configure_and_run(conn, schema)
)
await connectable.dispose()
def _configure_and_run(connection, schema: str) -> None:
context.configure(
connection=connection,
target_metadata=target_metadata,
compare_type=True,
version_table=f"alembic_version_{schema}",
include_schemas=False,
)
with context.begin_transaction():
context.run_migrations()
This approach keeps one connection pool for the migration run while touching each Tenant schema sequentially. Parallel schema migration is possible but risks DDL lock contention on shared catalog tables.
Programmatic migration invocation from FastAPI startup
Some deployment patterns run alembic upgrade head programmatically during application startup rather than as a separate CLI step. This is common in Kubernetes init-container patterns and Docker Compose setups:
# app/db/migrations.py
import asyncio
from alembic.config import Config
from alembic import command
def run_migrations_sync() -> None:
"""
Invoke alembic upgrade head programmatically.
Must be called before the application event loop starts
or from a dedicated thread, never from within a running loop.
"""
alembic_cfg = Config("alembic.ini")
command.upgrade(alembic_cfg, "head")
# FastAPI lifespan — run migrations before accepting traffic
from contextlib import asynccontextmanager
from fastapi import FastAPI
@asynccontextmanager
async def lifespan(app: FastAPI):
# run_in_executor keeps the synchronous alembic call off the event loop
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, run_migrations_sync)
yield
# shutdown cleanup here
app = FastAPI(lifespan=lifespan)
Note that command.upgrade is synchronous and calls asyncio.run() internally (through env.py). Invoking it from inside a running event loop causes RuntimeError: This event loop is already running. The run_in_executor pattern sidesteps this by running the synchronous Alembic call in a thread pool where no event loop is active.
Comparison: Configuration Approaches
| Approach | URL Source | Env Isolation | CI-Friendly | Multi-Tenant |
|---|---|---|---|---|
Hard-coded in alembic.ini | Static file | None | No | No |
os.environ.get(...) in env.py | Environment variable | Full | Yes | With loop |
Pydantic BaseSettings | .env + env vars | Full | Yes | With loop |
Programmatic Config object | Python caller | Full | Yes | Yes |
Separate alembic.ini per env | Multiple files | Good | Moderate | Moderate |
The os.environ.get(...) pattern in env.py covers the majority of production deployments. Pydantic BaseSettings adds validation and type coercion, which is valuable when the application already uses it for other configuration.
Hybrid Architectures and Migration Strategies
Decoupling migration engines from application engines
In high-availability deployments, the application engine connects to a read-replica-aware connection string or a pgBouncer transaction-mode pooler. Neither is safe for DDL. Migrations must target the primary directly. Use separate environment variables:
# env.py
MIGRATION_DATABASE_URL = os.environ.get(
"MIGRATION_DATABASE_URL",
os.environ.get(
"DATABASE_URL",
"postgresql+asyncpg://postgres:postgres@primary.db.internal:5432/appdb",
),
)
config.set_main_option("sqlalchemy.url", MIGRATION_DATABASE_URL)
Setting MIGRATION_DATABASE_URL to the primary host bypasses read replicas and connection poolers, both of which are incompatible with DDL execution. For deeper coverage of async engine options, see Async Engines, Dialects, and Connection Pooling and the specific pooling configuration patterns in Configuring Async Engines and Connection Pools.
Blue-green and rolling deployments
Migrations that run as a separate Kubernetes Job before the new pod version starts must be idempotent and backward-compatible. Alembic's alembic_version table guarantees that a migration script runs exactly once per revision ID. To ensure new code is compatible with both the old schema (during the deploy window) and the new schema, apply the expand-contract pattern:
- Expand: add nullable columns, new tables, new indexes — never remove or rename existing ones.
- Contract: in a subsequent release, remove the old column or constraint once all instances run the new code.
Autogenerate supports both steps. The key is never merging expand and contract into a single migration. Review the autogenerate output carefully before committing — see Autogenerating and Reviewing Migration Scripts for the review workflow.
Connection pooler compatibility
pgBouncer in transaction-pooling mode breaks SET search_path, advisory locks, and prepared statements. The migration engine should connect directly to PostgreSQL or use pgBouncer in session-pooling mode. Add statement_cache_size=0 to connect_args when asyncpg connects through any pooler to disable prepared statement caching:
connectable = create_async_engine(
MIGRATION_DATABASE_URL,
echo=False,
pool_pre_ping=True,
connect_args={
"statement_cache_size": 0, # required for pgBouncer transaction mode
"prepared_statement_cache_size": 0,
},
)
Production Pitfalls and Anti-Patterns
RuntimeError: This event loop is already running— callingasyncio.run()from inside a running FastAPI or Starlette event loop. Fix: invoke Alembic from a thread vialoop.run_in_executor(None, run_migrations_sync)or run migrations as a separate process before the application starts.sqlalchemy.exc.MissingGreenlet: greenlet_spawn has not been called— using a synchronous engine URL (e.g.,postgresql://instead ofpostgresql+asyncpg://) inside an async context, or calling an async method from a synchronous context withoutrun_sync. Fix: ensure all URLs inenv.pyusepostgresql+asyncpg://and thatdo_run_migrationsis only ever called throughconnection.run_sync(...).AttributeError: 'coroutine' object has no attribute 'execute'— passing anAsyncConnectiondirectly tocontext.configure(connection=...)without going throughrun_sync. Fix: always pass theconnectionargument thatrun_syncsupplies to the callback; never pass theAsyncConnectionitself.- Silent autogenerate misses — model modules not imported before
target_metadata = Base.metadata. Alembic sees an empty metadata and generates no operations, silently treating the schema as up-to-date. Fix: import all model modules explicitly inapp/models/__init__.pyand verify withalembic checkthat detected changes match expectations. - DDL conflicts from pgBouncer transaction mode —
asyncpgprepared statements cached from a previous session are invalid after pgBouncer rotates the underlying connection. Fix: setstatement_cache_size=0andprepared_statement_cache_size=0inconnect_argson the migration engine. alembic.util.exc.CommandError: Can't locate revision identified by '...'— runningalembic upgrade headagainst a database whosealembic_versiontable references a revision that no longer exists in theversions/directory (common after branch squashing). Fix: inspect the livealembic_versiontable, stamp the correct revision withalembic stamp <revision>, and re-run.
Frequently Asked Questions
Why does Alembic need run_sync if asyncpg can handle async natively?
Alembic's migration runner and MigrationContext were designed for synchronous DBAPI connections. They issue cursor.execute() calls in a blocking fashion internally. The run_sync method on an AsyncConnection wraps those synchronous calls so they execute in a greenlet, which asyncpg's event loop integration can schedule without blocking. The result is that Alembic's internals do not need modification, and the async driver still performs the actual I/O asynchronously at the socket level.
Can I reuse my application's AsyncEngine instance in env.py?
Not safely. The application engine is created in one event loop (the ASGI server's loop) and holds connections bound to that loop. asyncio.run() in env.py creates a new, separate event loop. Connections from the application engine are invalid in the migration loop and will raise RuntimeError or asyncpg.exceptions.InterfaceError. Always create a new create_async_engine(...) instance inside run_async_migrations() and dispose it before returning.
How do I run alembic upgrade head in a Docker Compose setup without a race condition against the database starting?
Use depends_on with a health check in docker-compose.yml to ensure the PostgreSQL container is accepting connections before the migration service starts. In the migration entrypoint script, add a pg_isready loop as a belt-and-suspenders check, then invoke alembic upgrade head. The pool_pre_ping=True on the migration engine provides a final connection validation before DDL begins.
Does the async env.py pattern work with SQLite for testing?
Yes, with the aiosqlite driver. Replace postgresql+asyncpg:// with sqlite+aiosqlite:///./test.db. The run_sync pattern is identical. For in-memory SQLite databases in test suites, pass the same AsyncEngine instance to both the application fixtures and a custom migration runner so the in-memory database is not discarded between setup steps.
Can alembic autogenerate detect changes to column types, server defaults, and constraints?
Yes, when compare_type=True and compare_server_default=True are set in context.configure(). Without these flags, Alembic compares only table and column existence, not type changes. Be aware that compare_server_default can generate false positives when the database normalizes default expressions differently from how SQLAlchemy emits them. Review autogenerated scripts before applying them in production, as covered in Autogenerating and Reviewing Migration Scripts.
Related
- Alembic Async Migrations and Schema Evolution — parent pillar covering the full migration lifecycle
- Setting Up Alembic env.py for asyncpg — step-by-step walkthrough of each
env.pyconfiguration option - Autogenerating and Reviewing Migration Scripts — generating and validating migration scripts from model metadata
- Zero-Downtime Schema Migration Strategies — expand-contract and concurrent index patterns for live deployments
- Async Engines, Dialects, and Connection Pooling — engine and pool configuration for runtime async database access