Configuring Async Engines and Connection Pools
Configuring create_async_engine and its connection pool correctly is the single most impactful decision you will make when building a production async SQLAlchemy application. Every keyword argument—from pool_size to connect_args—has a direct, measurable effect on throughput, tail latency, and resilience. This page covers the full parameter surface, explains the internal mechanics of AsyncAdaptedQueuePool, shows when NullPool is the right choice, and walks through async_sessionmaker configuration that avoids the common expire_on_commit footgun. For the broader architecture context, start with Async Engines, Dialects, and Connection Pooling before returning here for the configuration depth.
Concept & Execution Model
How create_async_engine wraps a synchronous engine
create_async_engine does not implement its own I/O subsystem. Instead, it constructs a standard synchronous Engine and wraps it in an AsyncEngine facade. When you call await conn.execute(stmt), SQLAlchemy dispatches the work through the async driver's event loop integration layer—either asyncpg's native asyncio transport or psycopg (v3)'s async cursor interface.
The pool remains AsyncAdaptedQueuePool by default, which is the synchronous QueuePool surfaced through a thin async adapter. Connections live in a asyncio.Queue; checkout and checkin are awaited rather than blocking. This design means all the familiar QueuePool semantics—pool_size, max_overflow, pool_timeout—apply directly.
The event loop contract
Every AsyncEngine is bound to the event loop running when it is first used. Engines must be created and used inside a single running loop. Creating an engine at module import time (before any loop exists) and then using it inside asyncio.run() works fine in practice because create_async_engine is lazy—it validates the loop only when the first connection is checked out. What fails is sharing an engine across two separate asyncio.run() calls, which creates two separate loops. Always create your engine inside application startup code that runs within your main event loop.
AsyncAdaptedQueuePool internals
The pool maintains two structures: a asyncio.Queue of idle connections (up to pool_size) and a counter of overflow connections currently in use. When a caller requests a connection:
- If an idle connection exists in the queue, it is returned immediately (after
pool_pre_pingif enabled). - If the queue is empty and total connections (idle + in-use) <
pool_size + max_overflow, a new connection is opened. - If at capacity, the caller waits up to
pool_timeoutseconds. If no connection frees up in time,sqlalchemy.exc.TimeoutErroris raised. - When a caller returns a connection, overflow connections are closed and discarded; connections below
pool_sizeare returned to the idle queue.
Understanding this flow is what makes pool sizing formulas actionable rather than arbitrary.
Query Construction & Async Execution Patterns
Full engine setup with all key kwargs
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
DATABASE_URL = "postgresql+asyncpg://appuser:secret@db.internal:5432/appdb"
engine = create_async_engine(
DATABASE_URL,
# Pool sizing
pool_size=20, # Persistent connections kept alive
max_overflow=10, # Temporary burst connections (discarded after use)
pool_timeout=30.0, # Seconds to wait for a free connection before raising
# Connection health
pool_recycle=1800, # Close and replace connections older than 30 minutes
pool_pre_ping=True, # Issue SELECT 1 before checkout to detect stale connections
# Debugging
echo=False, # Set True in development to log all SQL to stdout
echo_pool=False, # Set "debug" to trace pool checkout/checkin events
# Driver-level options (asyncpg-specific)
connect_args={
"command_timeout": 10, # Per-statement timeout in seconds
"statement_cache_size": 100, # LRU cache for prepared statements
"max_cached_statement_lifetime": 1800, # Seconds before cached plans expire
"server_settings": {
"application_name": "my-service",
"jit": "off", # Disable JIT for short OLTP queries
},
},
)
What each kwarg controls
| Parameter | Type | Default | Typical production value |
|---|---|---|---|
pool_size | int | 5 | 10–30 per process |
max_overflow | int | 10 | 5–20 (spike buffer) |
pool_timeout | float | 30 | 20–30 s |
pool_recycle | int | -1 (disabled) | 1200–3600 s |
pool_pre_ping | bool | False | True in cloud |
echo | bool/str | False | False in prod |
echo_pool | bool/str | False | "debug" when diagnosing leaks |
connect_args passthrough for asyncpg
connect_args is forwarded verbatim to the asyncpg connect() call. None of these keys are interpreted by SQLAlchemy itself.
command_timeout — Maximum time in seconds that asyncpg will wait for the server to return a result. Raises asyncpg.exceptions.QueryCanceledError on expiry. Set this to your p99 acceptable query latency plus headroom (10–30 s for OLTP, 300+ s for analytics).
statement_cache_size — asyncpg uses server-side prepared statements for all parameterized queries by default. Each unique SQL string is parsed and planned once, then reused. The LRU cache holds up to statement_cache_size entries per connection. Set to 0 to disable prepared statements entirely—required when using PgBouncer in transaction mode (see Tuning Connection Pools for Cloud Databases for PgBouncer details).
max_cached_statement_lifetime — Evicts prepared statements that were last used more than this many seconds ago. Helps when schemas evolve (ALTER TABLE) or query patterns change over a deployment's lifetime.
ssl — Pass True for basic TLS, or a ssl.SSLContext for full certificate verification. Mandatory for any database not on localhost or a private VPC.
server_settings — Key-value pairs sent as SET commands during connection startup (using the PostgreSQL startup message). Use application_name for pg_stat_activity visibility, jit=off for predictable short-query latency, and idle_in_transaction_session_timeout to catch runaway transactions early.
import ssl
from sqlalchemy.ext.asyncio import create_async_engine
ssl_ctx = ssl.create_default_context(cafile="/etc/ssl/certs/rds-ca.pem")
ssl_ctx.check_hostname = True
ssl_ctx.verify_mode = ssl.CERT_REQUIRED
engine = create_async_engine(
"postgresql+asyncpg://appuser:secret@db.us-east-1.rds.amazonaws.com:5432/orders",
pool_size=20,
max_overflow=10,
pool_recycle=1200,
pool_pre_ping=True,
connect_args={
"ssl": ssl_ctx,
"command_timeout": 15,
"statement_cache_size": 0, # Required for PgBouncer transaction pooling
"server_settings": {
"application_name": "orders-service",
"idle_in_transaction_session_timeout": "10000", # 10 s, in ms
},
},
)
State Management & Session Boundaries
async_sessionmaker configuration
async_sessionmaker is the async equivalent of the synchronous sessionmaker. It creates a reusable factory bound to your engine, so you never construct AsyncSession(engine) inline throughout your codebase.
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
AsyncSessionLocal = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False, # Critical for async: see explanation below
autoflush=True, # Flush pending changes before SELECT by default
autobegin=True, # Automatically begin a transaction on first operation
)
expire_on_commit=False is the most important setting for async applications. By default SQLAlchemy expires all ORM-mapped attributes after session.commit(). In a synchronous application, accessing an expired attribute transparently triggers a lazy load. In an async context, any attribute access after commit happens outside the session's connection context—SQLAlchemy raises MissingGreenlet (or DetachedInstanceError) because there is no running connection to use for the lazy load. Setting expire_on_commit=False keeps the last-committed values available in memory, which is always the right choice for async.
autoflush=True means that before any SELECT, SQLAlchemy will flush pending INSERT/UPDATE/DELETE operations so your queries see your own writes. Disable this (autoflush=False) only if you explicitly flush at precise points for performance reasons.
Session as a unit of work
import asyncio
from sqlalchemy import select
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
DATABASE_URL = "postgresql+asyncpg://appuser:secret@localhost:5432/appdb"
engine = create_async_engine(DATABASE_URL, pool_pre_ping=True)
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)
class Base(DeclarativeBase):
pass
class Order(Base):
__tablename__ = "orders"
id: Mapped[int] = mapped_column(primary_key=True)
customer_id: Mapped[int]
total_cents: Mapped[int]
async def get_orders_for_customer(customer_id: int) -> list[Order]:
async with AsyncSessionLocal() as session:
result = await session.execute(
select(Order).where(Order.customer_id == customer_id)
)
return result.scalars().all()
async def create_order(customer_id: int, total_cents: int) -> Order:
async with AsyncSessionLocal() as session:
async with session.begin():
order = Order(customer_id=customer_id, total_cents=total_cents)
session.add(order)
# After begin() context exits, commit has happened.
# expire_on_commit=False means order.id is still readable here.
return order
Connection borrowing lifecycle
The session does not hold a database connection for its entire lifetime. A connection is checked out from the pool only when the session needs to communicate with the database—on the first execute, flush, or commit—and returned immediately after the transaction ends. This lazy connection checkout is essential for async scalability: a hundred open sessions can share a pool of twenty connections if their transactions are short.
Advanced Pool Configuration Patterns
Sizing formulas with worked examples
The goal is to keep the pool large enough to absorb your burst concurrency without exhausting the database's max_connections. A useful formula:
pool_size = floor(db_max_connections × 0.80 / num_app_processes)
max_overflow = floor(db_max_connections × 0.10 / num_app_processes)
Reserve 10 % for DBA tools, replication slots, and monitoring agents. Never use the full 100 %.
Worked example — 16-core host, PostgreSQL configured at max_connections = 200, 4 app processes:
pool_size = floor(200 × 0.80 / 4) = floor(40) = 40
max_overflow = floor(200 × 0.10 / 4) = floor(5) = 5
Each process maintains up to 40 persistent connections, with 5 burst slots. Total worst-case connections: 4 × (40 + 5) = 180, leaving 20 for admin.
For the detailed asyncpg-specific reasoning and benchmarks, see Setting Up asyncpg Connection Pool Size for High Concurrency.
If you are starting from zero and need a step-by-step engine creation walkthrough, Setting Up an Async Engine from Scratch covers environment setup through first query execution.
AsyncAdaptedQueuePool vs NullPool
NullPool creates a new connection for every checkout and immediately closes it on return. The pool holds nothing. Use it when:
- Running inside a serverless function (AWS Lambda, Cloud Run) where the process may be frozen and connections go stale between invocations.
- Using an external connection pooler (PgBouncer, pgpool-II) that manages its own pool. Double-pooling wastes connections and adds latency.
- Running database migrations (Alembic) where you want explicit single-connection control.
from sqlalchemy.pool import NullPool
from sqlalchemy.ext.asyncio import create_async_engine
# Serverless / migration context
migration_engine = create_async_engine(
"postgresql+asyncpg://user:pass@host/db",
poolclass=NullPool,
)
# Standard long-running service — default AsyncAdaptedQueuePool
service_engine = create_async_engine(
"postgresql+asyncpg://user:pass@host/db",
pool_size=20,
max_overflow=10,
)
| Scenario | Pool class | Reason |
|---|---|---|
| Long-running ASGI service | AsyncAdaptedQueuePool | Amortizes connection overhead |
| AWS Lambda / Cloud Run | NullPool | No persistent process to hold connections |
| Behind PgBouncer (transaction mode) | NullPool | Avoid double-pooling |
| Alembic migrations | NullPool | Single-connection control |
| Behind PgBouncer (session mode) | AsyncAdaptedQueuePool | Sessions are stable, pool is safe |
pool_recycle calibration for cloud databases
Cloud databases terminate idle connections server-side. The termination window varies:
| Provider | Default idle timeout |
|---|---|
| AWS RDS (PostgreSQL) | ~8 hours (configurable) |
| AWS Aurora Serverless v2 | ~5 minutes when scaled to zero |
| GCP Cloud SQL | No forced idle timeout by default |
| Azure Database for PostgreSQL | Configurable; flexible server default 10 min |
| Supabase (pooler off) | 6 hours |
Set pool_recycle to roughly 80 % of the provider's idle timeout. For Aurora Serverless v2, pool_recycle=240 (4 minutes) prevents stale connection surprises after a quiet period.
Graceful startup and shutdown
import asyncio
import logging
from contextlib import asynccontextmanager
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker, AsyncEngine
logger = logging.getLogger(__name__)
DATABASE_URL = "postgresql+asyncpg://appuser:secret@db.internal:5432/appdb"
_engine: AsyncEngine | None = None
_session_factory: async_sessionmaker[AsyncSession] | None = None
async def startup() -> None:
global _engine, _session_factory
_engine = create_async_engine(
DATABASE_URL,
pool_size=20,
max_overflow=10,
pool_timeout=30,
pool_recycle=1800,
pool_pre_ping=True,
echo=False,
)
_session_factory = async_sessionmaker(_engine, expire_on_commit=False)
# Warm the pool by opening a test connection
async with _engine.connect() as conn:
await conn.execute(__import__("sqlalchemy").text("SELECT 1"))
logger.info("Database engine ready")
async def shutdown() -> None:
global _engine
if _engine is not None:
await _engine.dispose()
logger.info("Database engine disposed")
@asynccontextmanager
async def get_session() -> AsyncSession:
assert _session_factory is not None, "Call startup() before using get_session()"
async with _session_factory() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
For a FastAPI application, wire startup and shutdown into the lifespan async context manager so that engine.dispose() is always awaited before the process exits.
Hybrid Architectures & Migration Strategies
Migrating from SQLAlchemy 1.4 sync engine to 2.0 async engine
SQLAlchemy 1.4 introduced the async API under the sqlalchemy.ext.asyncio namespace as an opt-in feature. SQLAlchemy 2.0 makes it first-class. The migration path is mechanical but has a few sharp edges.
Step 1 — Swap the engine factory
# Before (1.4 sync)
from sqlalchemy import create_engine
engine = create_engine("postgresql://user:pass@host/db", pool_size=10)
# After (2.0 async)
from sqlalchemy.ext.asyncio import create_async_engine
engine = create_async_engine("postgresql+asyncpg://user:pass@host/db", pool_size=10)
Step 2 — Replace Session with AsyncSession and sessionmaker with async_sessionmaker
# Before
from sqlalchemy.orm import Session, sessionmaker
SessionLocal = sessionmaker(bind=engine)
with SessionLocal() as session:
users = session.execute(select(User)).scalars().all()
# After
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)
async with AsyncSessionLocal() as session:
users = (await session.execute(select(User))).scalars().all()
Step 3 — Audit relationship loading
All lazy-loaded relationships (lazy="select", the default) become MissingGreenlet errors in async. Switch to:
lazy="selectin"— issues a separateSELECT INquery per relationship (safe, explicit).lazy="joined"— uses a JOIN in the parent query.selectinload()/joinedload()at query time.
Step 4 — Move synchronous ORM utilities
session.refresh(), inspect(), and hybrid properties that trigger attribute access must be awaited or wrapped in run_sync. Use await session.refresh(obj) for the former.
Step 5 — Update Alembic env.py
# In env.py for async migrations
import asyncio
from sqlalchemy.ext.asyncio import create_async_engine
def run_migrations_online() -> None:
connectable = create_async_engine(DATABASE_URL, poolclass=NullPool)
async def do_run() -> None:
async with connectable.connect() as connection:
await connection.run_sync(do_run_migrations)
await connectable.dispose()
asyncio.run(do_run())
Running sync utility code alongside async handlers
Sometimes you genuinely need synchronous database access—in a Celery task, a CLI script, or a legacy integration. The correct pattern is to create a separate synchronous engine rather than trying to call async code from a sync context:
from sqlalchemy import create_engine, select
from sqlalchemy.orm import Session
SYNC_DATABASE_URL = "postgresql+psycopg2://appuser:secret@db.internal:5432/appdb"
sync_engine = create_engine(SYNC_DATABASE_URL, pool_size=5)
def get_invoice_sync(invoice_id: int) -> Invoice | None:
with Session(sync_engine) as session:
return session.get(Invoice, invoice_id)
Never call asyncio.run() inside an async context, and never call loop.run_until_complete() from inside a running loop—both cause RuntimeError: This event loop is already running.
Multi-tenant engines with runtime URL switching
Some applications need per-tenant database URLs—typically in a schema-per-tenant or database-per-tenant model. Create engines lazily and cache them:
import asyncio
from functools import lru_cache
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine, AsyncSession, async_sessionmaker
_engine_cache: dict[str, AsyncEngine] = {}
_lock = asyncio.Lock()
async def get_tenant_engine(tenant_id: str, database_url: str) -> AsyncEngine:
if tenant_id not in _engine_cache:
async with _lock:
if tenant_id not in _engine_cache: # double-checked
_engine_cache[tenant_id] = create_async_engine(
database_url,
pool_size=5, # Smaller per-tenant; many tenants share the host
max_overflow=2,
pool_recycle=1800,
pool_pre_ping=True,
)
return _engine_cache[tenant_id]
async def get_tenant_session(tenant_id: str, database_url: str) -> AsyncSession:
engine = await get_tenant_engine(tenant_id, database_url)
factory = async_sessionmaker(engine, expire_on_commit=False)
return factory()
Cap the total engines (and therefore total connections) with an eviction policy when tenant count is large.
Production Pitfalls & Anti-Patterns
pool_sizeexceedsmax_connections— Each process openspool_sizepersistent connections. With 8 processes andpool_size=30, you hit 240 connections. If PostgreSQL'smax_connections=200, you getasyncpg.exceptions.TooManyConnectionsErrorat startup. Fix: apply the sizing formula above and setmax_connectionsinpostgresql.confdeliberately.- Missing
pool_pre_pingon cloud-hosted databases — Cloud load balancers silently drop idle TCP connections. The pool holds a reference to a socket that the database has already closed. The next query raisesasyncpg.exceptions.ConnectionDoesNotExistError. Fix: setpool_pre_ping=Trueunconditionally for any managed database service. expire_on_commit=True(the default) in async code — Aftersession.commit(), accessingobj.idor any mapped attribute triggers an implicit lazy load on a closed connection, raisingsqlalchemy.exc.MissingGreenlet. Fix: always setexpire_on_commit=Falseinasync_sessionmaker.statement_cache_size > 0behind PgBouncer in transaction mode — PgBouncer in transaction pooling mode may route different queries from the same asyncpg connection to different backend connections. Prepared statements are backend-local; a cached statement reference sent to the wrong backend raisesasyncpg.exceptions.InvalidSQLStatementNameError. Fix: setstatement_cache_size=0inconnect_argswhen using PgBouncer in transaction mode.- Forgetting
await engine.dispose()at shutdown — asyncpg maintains background tasks per connection (keepalive, notification listeners). If the event loop exits while connections are open, Python logsTask was destroyed but it is pending!warnings and may suppress actual application errors. Fix: alwaysawait engine.dispose()in your shutdown hook. - Creating the engine at module import time with
echo=True—echo=Trueroutes all SQL through Python'sloggingmodule synchronously. In high-throughput async applications this serializes log writes and creates measurable latency. Fix: useecho=Falsein production; redirect SQLAlchemy'ssqlalchemy.enginelogger to an async-compatible handler if SQL logging is needed.
Frequently Asked Questions
What is the difference between pool_size and max_overflow, and which should I tune first?pool_size is the number of connections that persist in the pool indefinitely. max_overflow adds temporary connections during traffic peaks; these are closed and not returned to the pool when released. Tune pool_size to match your steady-state concurrency (the number of simultaneous database operations your application runs at P50 load). Tune max_overflow to cover your P99 burst without exhausting max_connections. Start with pool_size — getting that right matters more than overflow headroom.
When should I use NullPool instead of the default AsyncAdaptedQueuePool?
Use NullPool in three situations: serverless/ephemeral processes where connections would be frozen between invocations; behind an external connection pooler like PgBouncer in transaction mode (to avoid double-pooling); and in Alembic migration scripts where you want single-connection control. For every long-running ASGI service, keep AsyncAdaptedQueuePool — the connection amortization is the entire point of a pool.
Why does accessing order.id after session.commit() raise MissingGreenlet?
SQLAlchemy expires ORM attributes after commit by default. When you later access an expired attribute, the ORM attempts a lazy SELECT to refresh it. In an async context there is no implicit greenlet to run that blocking SELECT—hence MissingGreenlet. The fix is expire_on_commit=False in async_sessionmaker, which keeps the last-committed values in memory. The trade-off is that the object in memory may become stale if another process modifies the same row; refresh explicitly with await session.refresh(obj) if you need the latest database state.
How do I pass TLS certificates to asyncpg through create_async_engine?
Build an ssl.SSLContext, set the CA file, and pass it as connect_args={"ssl": ssl_ctx}. SQLAlchemy forwards the value directly to asyncpg.connect(). Do not pass ssl=True—that uses Python's default context without certificate verification, which is insecure for production. For RDS, download the AWS RDS CA bundle and reference it in ssl.create_default_context(cafile=...).
Can I use the same AsyncEngine from multiple threads?
No. AsyncAdaptedQueuePool uses asyncio.Queue, which is not thread-safe. The engine must be used entirely within the event loop thread. If you have background threads (e.g., a Celery worker), create a separate synchronous engine for them using create_engine with a sync driver. For details on combining async and sync in the same process, see Handling Connection Leaks and Pool Exhaustion.
Related
- Async Engines, Dialects, and Connection Pooling — parent overview covering driver selection, pool architecture, and async fundamentals
- Setting Up asyncpg Connection Pool Size for High Concurrency — worked benchmarks and sizing formulas specific to asyncpg
- Setting Up an Async Engine from Scratch — step-by-step guide from environment setup to first query
- Tuning Connection Pools for Cloud Databases — PgBouncer integration, cloud provider idle timeout settings, and recycle calibration
- Handling Connection Leaks and Pool Exhaustion — diagnosing and fixing
TimeoutErrorand leaked connections in production