Setting Up Alembic env.py for asyncpg
Setting up Alembic's env.py for asyncpg requires replacing the default synchronous engine with an AsyncEngine and bridging Alembic's synchronous migration context to the async runtime via conn.run_sync() — a complete walkthrough is part of the broader Configuring Alembic with Async SQLAlchemy Engines guide.
Quick Answer
The default env.py generated by alembic init is built for synchronous drivers. Below is a direct comparison of the legacy pattern and the correct async version for asyncpg.
Legacy sync env.py (do not use with asyncpg)
# env.py — synchronous, not compatible with asyncpg
from sqlalchemy import engine_from_config, pool
from alembic import context
config = context.config
def run_migrations_online() -> None:
connectable = engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(connection=connection, target_metadata=target_metadata)
with context.begin_transaction():
context.run_migrations()
Modern async env.py for asyncpg
# env.py — async-compatible with asyncpg
import asyncio
import os
from logging.config import fileConfig
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy import pool
from alembic import context
# Import your declarative Base so autogenerate can inspect metadata
from app.models import Base # User, Order, Product, Invoice, Tenant models live here
config = context.config
target_metadata = Base.metadata
if config.config_file_name is not None:
fileConfig(config.config_file_name)
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql+asyncpg://user:pass@localhost:5432/appdb")
def run_migrations_offline() -> None:
"""Generate SQL script without a live database connection."""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def do_run_migrations(connection) -> None:
"""Synchronous callback executed inside run_sync()."""
context.configure(connection=connection, target_metadata=target_metadata)
with context.begin_transaction():
context.run_migrations()
async def run_async_migrations() -> None:
"""Create an async engine and delegate to the sync callback via run_sync."""
connectable = create_async_engine(DATABASE_URL, poolclass=pool.NullPool)
async with connectable.connect() as conn:
await conn.run_sync(do_run_migrations)
await connectable.dispose()
def run_migrations_online() -> None:
"""Entry point for online migration mode."""
asyncio.run(run_async_migrations())
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()
Execution Context & Async Workflow Integration
Alembic's migration context was designed around synchronous DBAPI connections. Every internal operation — schema comparison, DDL emission, transaction management — calls blocking methods on a raw connection object. asyncpg, on the other hand, exposes only coroutine-based methods and refuses to be driven from a synchronous call stack without an event loop.
The bridge is AsyncConnection.run_sync(). When you call await conn.run_sync(do_run_migrations), SQLAlchemy creates a temporary synchronous shim that passes a wrapped connection object to your callback. Inside do_run_migrations, all operations appear synchronous to Alembic, but the underlying I/O is dispatched through the running event loop. The key rule is that run_sync must be called inside an active async with connectable.connect() block — the synchronous connection shim is only valid for the lifetime of that async context manager.
The asyncio.run() call in run_migrations_online() is the correct entry point. It creates a fresh event loop, runs the coroutine to completion, and tears the loop down. This matches how the async engine and connection pool lifecycle is designed to work: each migration run is a discrete, self-contained process, not a long-lived application server.
Using pool.NullPool is intentional. Migrations run as a CLI process, not a persistent service. Connection pooling would leave dangling connections when the process exits, potentially blocking other operations on short-lived database server configurations.
Reading the DATABASE_URL from the environment with os.getenv() rather than hardcoding it in alembic.ini is a production requirement. Secrets stored in INI files end up in version control; environment variables are injected by your deployment platform. The online migration path uses the env var exclusively. Note that the offline path still reads config.get_main_option("sqlalchemy.url") from alembic.ini — the implications of that asymmetry are covered in the error table below.
Resolving Warnings, Errors & Common Mistakes
| Exact warning / error string | Root cause | Production fix |
|---|---|---|
sqlalchemy.exc.MissingGreenlet: greenlet_spawn has not been called | Async SQLAlchemy code (like a lazy-loaded relationship) was invoked from inside a synchronous call stack without a greenlet context. In migrations this usually means do_run_migrations is calling async model methods. | Keep do_run_migrations purely synchronous. Never call await inside it. All async work must happen before or after conn.run_sync(). |
RuntimeError: got Future <Future ...> attached to a different loop | An AsyncEngine created in one event loop is being reused in another — common when a long-lived engine from the application layer is imported and passed to asyncio.run(), which spins up a new loop. | Create a fresh AsyncEngine inside run_async_migrations() rather than importing the application's shared engine. The NullPool ensures disposal is immediate. |
sqlalchemy.exc.OperationalError: (sqlalchemy.dialects.postgresql.asyncpg.Error) Can't operate on a closed transaction | do_run_migrations is being called after the async with connectable.connect() block has exited, meaning the connection was already returned or closed before run_sync completed. | Ensure await conn.run_sync(do_run_migrations) is called within the async with connectable.connect() as conn: block, not after it. Never cache the conn object and use it outside the context manager. |
alembic.util.exc.CommandError: Can't proceed with --autogenerate option; environment script /path/env.py does not provide a MetaData object | target_metadata is None because the import of Base.metadata failed silently or was never assigned. | Verify your model module path. Add a print statement temporarily: print(Base.metadata.tables.keys()) just after the import to confirm the models are loaded. All tables — users, orders, products, invoices, tenants — should appear. |
FAILED: Can't locate revision identified by '...' in offline mode despite valid revision IDs | Offline mode reads sqlalchemy.url from alembic.ini. If that key is absent or left as the placeholder, Alembic silently switches dialect handling, which can cause revision lookup failures on some backends. | Set sqlalchemy.url = postgresql+asyncpg:// (a dummy value is fine for offline SQL generation) in alembic.ini even when online mode uses env vars. Offline mode only needs the URL to determine dialect; it never opens a connection. |
DeprecationWarning: The current Greenlet was not spawned from a greenlet.run() (SQLAlchemy < 2.0) | Using SQLAlchemy 1.4 async APIs with Alembic 1.7 or earlier produces this warning because the greenlet interop layer changed. | Upgrade to SQLAlchemy 2.0+ and Alembic 1.10+. The async interop was stabilized in these releases. |
asyncpg.exceptions.TooManyConnectionsError during alembic upgrade head in CI | NullPool is not set, so the engine allocates its default pool. In CI environments with aggressive parallelism, multiple migration steps can exhaust the server's connection limit. | Always pass poolclass=pool.NullPool to create_async_engine inside run_async_migrations. Migrations are sequential by nature; pooling provides no benefit and adds risk. |
Advanced Async env.py Optimization
Sharing configuration between the application engine and migrations
A common pattern is to centralize engine configuration in a single module, then import it in both the application and env.py. The naive approach — importing the application's AsyncEngine instance directly — causes the "different loop" error described above because the application engine is often created at module import time, before any event loop exists.
The correct approach is to share the configuration rather than the engine object. Extract a get_engine_kwargs() factory:
# app/database.py
import os
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from sqlalchemy import pool
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql+asyncpg://user:pass@localhost:5432/appdb")
def get_engine_kwargs() -> dict:
"""Return engine configuration shared by the app and Alembic migrations."""
return {
"url": DATABASE_URL,
"pool_size": 10,
"max_overflow": 5,
"pool_pre_ping": True,
"connect_args": {
"server_settings": {"application_name": "myapp"},
"statement_cache_size": 0, # required for pgbouncer compatibility
},
}
# Application uses a long-lived pooled engine
engine = create_async_engine(**get_engine_kwargs())
SessionLocal = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
Then in env.py, build a migration-specific engine from the same configuration dict:
# env.py — advanced version sharing app configuration
import asyncio
import os
from logging.config import fileConfig
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy import pool
from alembic import context
from app.models import Base
from app.database import get_engine_kwargs # shared config, not the engine object
config = context.config
target_metadata = Base.metadata
if config.config_file_name is not None:
fileConfig(config.config_file_name)
def do_run_migrations(connection) -> None:
context.configure(connection=connection, target_metadata=target_metadata)
with context.begin_transaction():
context.run_migrations()
async def run_async_migrations() -> None:
# Override pool settings for migration context
kwargs = get_engine_kwargs()
kwargs["poolclass"] = pool.NullPool
kwargs.pop("pool_size", None)
kwargs.pop("max_overflow", None)
connectable = create_async_engine(**kwargs)
async with connectable.connect() as conn:
await conn.run_sync(do_run_migrations)
await connectable.dispose()
def run_migrations_offline() -> None:
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online() -> None:
asyncio.run(run_async_migrations())
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()
This pattern ensures that SSL certificates, connect_args, and custom server settings (critical for PgBouncer environments serving User, Order, and Invoice tables with statement caching disabled) remain consistent between the running application and the migration process, without ever sharing an event-loop-bound engine object across boundaries.
When autogenerating migration scripts for schemas that span multiple models — for example when adding a tenant_id column to the products table — this configuration consistency ensures Alembic compares the live schema against the same dialect options the application uses. Discrepancies between connection settings at migration time versus runtime are a common source of phantom schema diffs. For a deeper look at how autogenerate detects and filters schema changes, including how to suppress false positives from server defaults on the invoices table, see the dedicated guide.
Frequently Asked Questions
Why does Alembic need run_sync if SQLAlchemy 2.0 supports async natively?
SQLAlchemy 2.0's async layer is an opt-in surface built on top of the core synchronous engine. Alembic's migration context (MigrationContext) was written against the synchronous Connection interface and has not been rewritten to use coroutines internally. The run_sync bridge hands Alembic a synchronous-looking connection object backed by the async driver, allowing Alembic to call .execute(), .fetchall(), and DDL methods without knowing it is inside an event loop. This is the officially supported integration pattern and is not expected to change while Alembic maintains backward compatibility.
Can I reuse the application's AsyncSession inside do_run_migrations?
No. do_run_migrations receives a raw Connection object, not a Session. Alembic bypasses the ORM session layer entirely — it works at the Core connection level to ensure DDL statements execute in the correct transaction context. Attempting to create or use an AsyncSession inside do_run_migrations will fail with a MissingGreenlet error because AsyncSession expects to be driven by coroutines, which cannot run inside the synchronous run_sync callback.
What happens if DATABASE_URL is not set in the environment?
os.getenv("DATABASE_URL", "postgresql+asyncpg://user:pass@localhost:5432/appdb") falls back to a hardcoded default. In production, the fallback should either be a clearly invalid placeholder (which causes a fast, obvious failure) or absent entirely — use os.environ["DATABASE_URL"] to raise KeyError immediately rather than silently connecting to a wrong database. The distinction matters when alembic upgrade head runs as part of a deployment script: a missing env var should halt deployment, not migrate the wrong database.
Does this setup work with zero-downtime migration strategies like adding nullable columns first?
Yes. The env.py configuration only controls how Alembic connects to the database and executes the migration script. It does not constrain what the migration script itself does. Zero-downtime strategies — expanding first with a nullable payment_method column on orders, then backfilling, then constraining — are implemented inside the individual migration files under alembic/versions/. The async env.py is transparent to those patterns.
Related
- Configuring Alembic with Async SQLAlchemy Engines — parent guide covering the full async Alembic setup from
alembic initthrough first migration - Autogenerating and Reviewing Migration Scripts — how to run
--autogenerateagainst an async engine and review the output for correctness - Zero-Downtime Schema Migration Strategies — sequencing migrations for live production databases without table locks
- Configuring Async Engines and Connection Pools — pool sizing,
NullPoolvsAsyncAdaptedQueuePool, and connection lifecycle management