Async Engines, Dialects, and Connection Pooling in SQLAlchemy 2.0
SQLAlchemy 2.0's async subsystem — built on create_async_engine, AsyncAdaptedQueuePool, and native async DBAPI drivers — gives Python backend teams a production-grade toolkit for high-concurrency database workloads; this page covers every layer of that stack, from greenlet bridge internals to cloud pool tuning numbers, for developers who want to go beyond the basic wiring and run async SQLAlchemy correctly under load.
Architectural Foundations
How the Async Engine Works
create_async_engine() does not instantiate a fundamentally different engine. It wraps a standard Engine inside an AsyncEngine adapter that translates every synchronous DBAPI operation into awaitable coroutines. The underlying connection pool, dialect stack, and SQL compilation pipeline are unchanged — what changes is the execution path that crosses the Python↔C boundary into the DBAPI layer.
When your code calls await session.execute(stmt), SQLAlchemy compiles the statement normally, then delegates to the async connection's execute() coroutine. The async connection in turn awaits the raw DBAPI cursor operation provided by the driver (asyncpg, psycopg3, aiomysql, aiosqlite). The event loop is never blocked; execution suspends at the await and resumes when the driver signals I/O completion.
# Async — full wiring from engine to result
import asyncio
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy import select
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
class Base(DeclarativeBase):
pass
class Order(Base):
__tablename__ = "orders"
id: Mapped[int] = mapped_column(primary_key=True)
customer_id: Mapped[int]
total_cents: Mapped[int]
DATABASE_URL = "postgresql+asyncpg://app:secret@localhost:5432/orders_db"
engine = create_async_engine(DATABASE_URL, echo=False, pool_size=10, max_overflow=5)
async_session = async_sessionmaker(engine, expire_on_commit=False)
async def get_large_orders(min_cents: int) -> list[Order]:
async with async_session() as session:
stmt = select(Order).where(Order.total_cents >= min_cents)
result = await session.execute(stmt)
return result.scalars().all()
asyncio.run(get_large_orders(10_000))
The Greenlet Bridge
SQLAlchemy's async adapter relies on the greenlet library to handle the impedance mismatch between its synchronous ORM internals (attribute loading, lazy-relation traversal, before/after event hooks) and the async execution context.
When the async engine needs to call into synchronous code — for example, ORM event hooks or the connection pool's internal bookkeeping — it uses greenlet.spawn() to switch into a synchronous greenlet, completes the operation, then switches back. This "sync/async bridge" is invisible during normal await-based workflows, but it surfaces as MissingGreenletError the instant you trigger synchronous I/O from outside the bridge (typically lazy-loading a relationship in async context).
The practical rule: any attribute access that would trigger SQL must be eagerly loaded. Use selectinload() or joinedload() at query time, or explicitly await session.refresh(obj, attribute_names=["relationship"]) after the fact. The integrating SQLAlchemy async with FastAPI and Starlette guide shows how to wire this correctly in request-scoped session factories.
AsyncAdaptedQueuePool
The default pool for async engines is AsyncAdaptedQueuePool. It replaces QueuePool's threading primitives with asyncio.Queue and asyncio.Lock, so connection checkout and return are coroutine-safe without OS thread involvement.
Key internal differences from QueuePool:
| Attribute | QueuePool (sync) | AsyncAdaptedQueuePool (async) |
|---|---|---|
| Wait primitive | threading.Lock | asyncio.Lock |
| Overflow queue | queue.Queue | asyncio.Queue |
| Connection check-in | synchronous | await pool.checkin() |
| Pre-ping mechanism | thread-blocking | non-blocking coroutine |
Under the hood, AsyncAdaptedQueuePool delegates to an AsyncAdaptedQueuePool-wrapped QueuePool whose synchronous methods are called through greenlet bridges — meaning the pool configuration parameters (pool_size, max_overflow, pool_timeout, pool_recycle, pool_pre_ping) are identical to their synchronous counterparts.
DBAPI Adaptation Layer
SQLAlchemy does not talk directly to asyncpg or psycopg3. It uses an AsyncAdapt* shim layer: AsyncAdaptedConnection wraps the driver's native async connection and exposes a synchronous-looking DBAPI 2.0 interface, which the engine's dialect stack expects. This indirection is why you can use all the familiar text(), select(), insert() constructs without modification — only the execution path changes.
Key Component Deep-Dive 1: Async Driver Selection and create_async_engine
The choice of async DBAPI driver has significant runtime implications. Each driver has distinct wire-protocol behavior, prepared-statement caching defaults, and error surface.
Driver Comparison
| Driver | URL prefix | Protocol | Prepared stmt cache | Notable constraint |
|---|---|---|---|---|
| asyncpg | postgresql+asyncpg:// | binary | On by default | Incompatible with PgBouncer transaction mode (unless statement_cache_size=0) |
| psycopg3 (async) | postgresql+psycopg:// | text + binary | Optional | Requires psycopg[binary] or psycopg[c] for best performance |
| aiomysql | mysql+aiomysql:// | MySQL text | None | No server-side cursor; large result sets buffer in RAM |
| asyncmy | mysql+asyncmy:// | MySQL text | None | Lower latency than aiomysql at high QPS |
| aiosqlite | sqlite+aiosqlite:// | SQLite file | N/A | Runs sync SQLite in threadpool; not for production Postgres workloads |
For a full treatment of asyncpg vs psycopg3 trade-offs, see the choosing between asyncpg and psycopg async drivers guide. For SQLite and MySQL driver selection rationale, the selecting async drivers for SQLite, MySQL, and Postgres page covers the decision matrix.
create_async_engine Parameters
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.pool import AsyncAdaptedQueuePool, NullPool
# Production PostgreSQL engine (asyncpg)
pg_engine = create_async_engine(
"postgresql+asyncpg://app:secret@db.internal:5432/app_db",
# Pool sizing
poolclass=AsyncAdaptedQueuePool,
pool_size=10, # persistent connections per process
max_overflow=5, # burst connections (total max = 15)
pool_timeout=30, # seconds to wait for a connection before TimeoutError
pool_recycle=1800, # recycle connections every 30 min (RDS idle timeout is 8h but network gear can kill sooner)
pool_pre_ping=True, # issue SELECT 1 before handing out stale connections
# Dialect / driver kwargs passed through connect_args
connect_args={
"statement_cache_size": 0, # REQUIRED if behind PgBouncer transaction mode
"server_settings": {"application_name": "orders-api"},
},
echo=False,
)
# Serverless / AWS Lambda — no pooling
lambda_engine = create_async_engine(
"postgresql+asyncpg://app:secret@db.internal:5432/app_db",
poolclass=NullPool, # no pool; each request opens and closes a raw connection
)
# psycopg3 alternative (drop-in for most workloads)
psycopg_engine = create_async_engine(
"postgresql+psycopg://app:secret@db.internal:5432/app_db",
pool_size=10,
max_overflow=5,
pool_pre_ping=True,
)
# MySQL (aiomysql)
mysql_engine = create_async_engine(
"mysql+aiomysql://app:secret@db.internal:3306/app_db",
pool_size=10,
max_overflow=5,
pool_recycle=3600,
)
# SQLite for testing and local development
sqlite_engine = create_async_engine(
"sqlite+aiosqlite:///./test.db",
connect_args={"check_same_thread": False},
)
Dialect-Specific Gotchas
Every driver surfaces at least one non-obvious runtime behavior in production. asyncpg's prepared-statement cache conflicts with PgBouncer's transaction-mode pooling because each new physical connection gets a fresh prepared-statement namespace, invalidating the cache entries. psycopg3 defaults to autocommit at the driver level, requiring explicit transaction wrapping. aiomysql does not expose server-side cursors, meaning yield_per() streams rows through RAM rather than wire. Full coverage lives in the dialect-specific gotchas and driver quirks reference.
Key Component Deep-Dive 2: Connection Pool Configuration
Getting pool configuration wrong is the most common cause of production incidents with async SQLAlchemy. Too small and you queue requests until TimeoutError; too large and you exhaust database connection slots.
Parameters and Their Interactions
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.pool import AsyncAdaptedQueuePool
engine = create_async_engine(
"postgresql+asyncpg://app:secret@postgres:5432/app_db",
poolclass=AsyncAdaptedQueuePool,
# Base pool — connections held open permanently
pool_size=10,
# Overflow — created on demand, closed when returned (total cap = pool_size + max_overflow)
max_overflow=5,
# How long (seconds) a checkout() call waits before raising TimeoutError
# Raise QueuePool limit error: "QueuePool limit of size 10 overflow 5 reached, connection timed out, timeout 30"
pool_timeout=30,
# Force recycle connections older than N seconds (prevents stale connections from cloud NAT)
pool_recycle=1800,
# Issue a lightweight SELECT 1 before returning a connection from the pool
# Catches connections killed by DB server restart or cloud instance failover
pool_pre_ping=True,
)
pool_size sets the number of connections kept alive between requests. For an async application, this is per-process. With 4 Gunicorn uvicorn workers, a pool_size=10 means 40 connections to Postgres before overflow.
max_overflow adds burst capacity. Overflow connections are opened on demand and closed immediately when returned to the pool (not recycled). They count against the database's max_connections limit.
pool_timeout is the maximum wait before sqlalchemy.exc.TimeoutError is raised with the message QueuePool limit of size X overflow Y reached, connection timed out, timeout 30. Set it shorter than your HTTP server's request timeout to fail fast and surface saturation.
pool_recycle forces a connection refresh after N seconds regardless of health. AWS RDS and Aurora set idle connection timeout at 8 hours by default, but TCP middleboxes (NAT gateways, load balancers) can silently drop connections in 5–30 minutes. A value of 1800 (30 minutes) is safe for most cloud setups.
pool_pre_ping issues a SELECT 1 when a connection is checked out from the pool. This adds ~1 ms of latency per request but prevents the asyncpg.exceptions.ConnectionDoesNotExistError / OperationalError: server closed the connection unexpectedly that fires after a DB failover event. Enable it unconditionally in cloud environments.
NullPool for Serverless Environments
Lambda functions, Cloud Run containers, and other ephemeral compute cannot maintain persistent pool connections. The solution is NullPool, which skips the pool entirely:
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.pool import NullPool
# Each call to engine.connect() opens a fresh TCP connection.
# Each call to conn.close() tears it down immediately.
engine = create_async_engine(
"postgresql+asyncpg://app:secret@db.internal/app_db",
poolclass=NullPool,
)
For Lambda at scale, pair NullPool with RDS Proxy or PgBouncer in transaction-pool mode upstream. The proxy maintains persistent connections to Postgres on your behalf; Lambda gets a fresh proxy connection per invocation without exhausting max_connections. Full guidance lives in the configuring async engines and connection pools reference and the tuning connection pools for cloud databases deep-dive.
Pool Events for Observability
from sqlalchemy import event
from sqlalchemy.pool import AsyncAdaptedQueuePool
import logging
logger = logging.getLogger("db.pool")
@event.listens_for(engine.sync_engine, "connect")
def on_connect(dbapi_conn, connection_record):
logger.info("Pool: new connection opened")
@event.listens_for(engine.sync_engine, "checkout")
def on_checkout(dbapi_conn, connection_record, connection_proxy):
logger.debug("Pool: connection checked out")
@event.listens_for(engine.sync_engine, "checkin")
def on_checkin(dbapi_conn, connection_record):
logger.debug("Pool: connection returned to pool")
Note the engine.sync_engine access — pool events are registered on the underlying synchronous engine, not the async wrapper. This is one of the few places the sync/async boundary is visible in application code. The handling connection leaks and pool exhaustion guide expands on pool monitoring and leak detection with pool_status().
Key Component Deep-Dive 3: Framework Integration
FastAPI and Starlette: Request-Scoped AsyncSession
The canonical FastAPI pattern creates one engine at startup, one async_sessionmaker factory, and injects a fresh session per request via a dependency:
from contextlib import asynccontextmanager
from typing import AsyncGenerator, Annotated
from fastapi import FastAPI, Depends
from sqlalchemy.ext.asyncio import (
AsyncSession,
async_sessionmaker,
create_async_engine,
)
from sqlalchemy import select
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
class Base(DeclarativeBase):
pass
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
email: Mapped[str]
is_active: Mapped[bool] = mapped_column(default=True)
DATABASE_URL = "postgresql+asyncpg://app:secret@postgres:5432/app_db"
engine = create_async_engine(
DATABASE_URL,
pool_size=10,
max_overflow=5,
pool_pre_ping=True,
pool_recycle=1800,
)
# expire_on_commit=False prevents DetachedInstanceError when accessing
# ORM attributes after the session has committed and closed
AsyncSessionFactory = async_sessionmaker(engine, expire_on_commit=False)
@asynccontextmanager
async def lifespan(app: FastAPI):
yield # engine initialised at module level
await engine.dispose() # drain pool on shutdown
app = FastAPI(lifespan=lifespan)
async def get_db() -> AsyncGenerator[AsyncSession, None]:
async with AsyncSessionFactory() as session:
yield session
# AsyncSessionFactory context manager commits or rolls back here
DBDep = Annotated[AsyncSession, Depends(get_db)]
@app.get("/users/{user_id}")
async def read_user(user_id: int, db: DBDep) -> dict:
stmt = select(User).where(User.id == user_id, User.is_active.is_(True))
result = await db.execute(stmt)
user = result.scalar_one_or_none()
if user is None:
from fastapi import HTTPException
raise HTTPException(status_code=404, detail="User not found")
return {"id": user.id, "email": user.email}
expire_on_commit=False is critical in async FastAPI code. By default, SQLAlchemy marks all ORM attributes as "expired" after a commit, requiring a fresh SELECT to access them. In async context, that SELECT fires as a lazy load on attribute access — outside an active async context — raising MissingGreenletError. Setting expire_on_commit=False keeps the in-memory state valid after commit, trading a tiny amount of stale-data risk for correct async behavior. The dedicated using expire_on_commit=False in FastAPI dependencies page covers the trade-offs in depth.
Celery Workers: Async SQLAlchemy in Sync Task Context
Celery tasks run in synchronous worker processes, not inside an asyncio event loop. The two integration strategies are:
Strategy 1 — Run an event loop per task (simple, safe for low-throughput workers):
import asyncio
from celery import Celery
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy import select, update
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
class Base(DeclarativeBase):
pass
class Invoice(Base):
__tablename__ = "invoices"
id: Mapped[int] = mapped_column(primary_key=True)
status: Mapped[str]
total_cents: Mapped[int]
celery_app = Celery("tasks", broker="redis://localhost:6379/0")
engine = create_async_engine(
"postgresql+asyncpg://app:secret@postgres:5432/app_db",
pool_size=5,
max_overflow=2,
pool_pre_ping=True,
)
AsyncSessionFactory = async_sessionmaker(engine, expire_on_commit=False)
async def _mark_invoice_paid(invoice_id: int) -> None:
async with AsyncSessionFactory() as session:
async with session.begin():
stmt = (
update(Invoice)
.where(Invoice.id == invoice_id)
.values(status="paid")
)
await session.execute(stmt)
@celery_app.task
def mark_invoice_paid(invoice_id: int) -> None:
asyncio.run(_mark_invoice_paid(invoice_id))
Strategy 2 — Shared event loop with gevent or asyncio worker pool (for high-throughput Celery):
For workers that process many tasks concurrently, use asyncio.get_event_loop().run_until_complete() with a persistent loop per worker process, or switch to a Celery executor that natively supports asyncio. The using SQLAlchemy async with Celery task workers page covers both approaches with benchmark data.
A key concern for Celery is pool_size. Each Celery worker process spins up its own engine instance. With 8 worker processes and pool_size=5, you consume 40 Postgres connections at rest before overflow. Size accordingly and consider using NullPool + PgBouncer for task workers with sporadic access patterns.
Advanced Patterns & Production Configuration
Real Pool Numbers for Common Cloud Configurations
Postgres's hard max_connections is a finite resource. The formula for pool_size per application process is:
pool_size = (max_connections - reserved_for_superuser) / (total_app_processes)
Typical cloud configurations:
| Instance | max_connections | Suggested pool_size | max_overflow | Notes |
|---|---|---|---|---|
| RDS db.t4g.medium (4 GB) | 170 | 8 (with 20 replicas) | 4 | Reserve 10 for RDS internal + admin |
| RDS db.r6g.xlarge (32 GB) | 1306 | 25 (with 40 replicas) | 10 | Comfortable headroom |
| Aurora Serverless v2 | Scales with ACU | 10–20 | 5 | Use RDS Proxy to multiplex |
| Cloud SQL (n2-standard-4) | 1000 | 20 (with 40 replicas) | 10 | Cloud SQL Proxy handles TLS termination |
| Supabase (free tier) | 60 | 5 | 2 | Use PgBouncer (transaction mode, port 6543) |
For a detailed derivation and Aurora-specific tuning, see tuning connection pools for cloud databases.
PgBouncer and RDS Proxy: Transaction-Mode Pooling
When your application pool sits in front of PgBouncer or RDS Proxy in transaction-mode, each SQL transaction may execute on a different physical Postgres connection. This breaks several asyncpg features:
- Prepared statement cache — asyncpg caches prepared statements per physical connection. A cache miss under PgBouncer raises
asyncpg.exceptions.InvalidCachedStatementError. Fix: passstatement_cache_size=0inconnect_args. - Advisory locks —
pg_advisory_lock()is connection-scoped; transaction mode invalidates them. - LISTEN/NOTIFY — requires a dedicated, session-mode connection bypassing PgBouncer.
# Engine configured for PgBouncer transaction mode
engine = create_async_engine(
"postgresql+asyncpg://app:secret@pgbouncer:6432/app_db",
pool_size=10,
max_overflow=5,
pool_pre_ping=True,
connect_args={
"statement_cache_size": 0, # disable prepared-statement cache
"prepared_statement_cache_size": 0, # asyncpg >= 0.27
},
)
RDS Proxy handles prepared statements transparently, so statement_cache_size=0 is not required there. But RDS Proxy does add ~1–3 ms of latency per query due to TLS termination and multiplexing overhead.
Read Replica Routing
For read-heavy workloads, route SELECT statements to a read replica while writes go to the primary. This requires two engines:
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy import select, text
PRIMARY_URL = "postgresql+asyncpg://app:secret@primary.db.internal:5432/app_db"
REPLICA_URL = "postgresql+asyncpg://app:secret@replica.db.internal:5432/app_db"
write_engine = create_async_engine(PRIMARY_URL, pool_size=10, max_overflow=5, pool_pre_ping=True)
read_engine = create_async_engine(REPLICA_URL, pool_size=20, max_overflow=10, pool_pre_ping=True)
WriteSession = async_sessionmaker(write_engine, expire_on_commit=False)
ReadSession = async_sessionmaker(read_engine, expire_on_commit=False)
async def get_product_catalog(category: str) -> list[dict]:
"""Read from replica — eventual consistency is acceptable here."""
async with ReadSession() as session:
stmt = select(text("id, name, price_cents")).where(
text("category = :cat")
).bindparams(cat=category)
result = await session.execute(stmt)
return [dict(row) for row in result.mappings()]
async def create_product(name: str, price_cents: int, category: str) -> int:
"""Write to primary — strong consistency required."""
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
class Product: # simplified for illustration
pass
async with WriteSession() as session:
async with session.begin():
session.add_all([]) # placeholder
return 0
The advanced query patterns and bulk data operations pillar covers multi-tenant schema routing and dynamic replica selection in more depth.
Engine Disposal and Graceful Restart
await engine.dispose() closes all connections in the pool and, by default, waits for outstanding checkouts to complete. In containerized environments with SIGTERM-based graceful shutdown windows, call it inside the ASGI lifespan finally block:
@asynccontextmanager
async def lifespan(app: FastAPI):
try:
yield
finally:
await engine.dispose()
await read_engine.dispose()
Pass close=False to engine.dispose(close=False) if you want to invalidate the pool but allow reconnection on the next request — useful for forking processes (Gunicorn prefork model) where the parent process should not close connections that child processes will inherit.
Common Pitfalls & Anti-Patterns
The errors below appear verbatim in production logs. Each has a single, addressable root cause.
sqlalchemy.exc.MissingGreenlet: greenlet_spawn has not been called— You accessed a lazy-loaded ORM relationship (or triggered an expired attribute) from outside the SQLAlchemy greenlet bridge, typically by accessingobj.relationshipafter the session closed. Fix: load all needed relationships eagerly withselectinload()/joinedload()at query time, or callawait session.refresh(obj, ["relationship"])before closing the session. Setexpire_on_commit=Falseto prevent attribute expiry on commit triggering the same error.greenlet.error: cannot switch to a different thread/GreenletSpawnError— An async SQLAlchemy operation was initiated from a thread that did not create the greenlet. Common cause: runningasyncio.run()inside a Celery beat thread or inside aconcurrent.futures.ThreadPoolExecutor. Fix: ensure all async SQLAlchemy calls originate from the same thread that owns the event loop. See fixing GreenletSpawnError in async SQLAlchemy workflows.sqlalchemy.exc.TimeoutError: QueuePool limit of size 10 overflow 5 reached, connection timed out, timeout 30— The pool is saturated. All 15 connections (pool_size + max_overflow) are checked out and no new connection was available withinpool_timeoutseconds. Root causes: long-running transactions holding connections, slow queries, orpool_sizetoo small for the request rate. Fix: increasepool_sizeormax_overflow, find and fix slow transactions, or add a read replica. Enable pool checkout events to measure wait time.asyncpg.exceptions.InvalidCachedStatementError: cached statement plan is invalid— asyncpg's prepared-statement cache is returning a plan that PgBouncer routed to a different physical backend that has no record of the statement. Fix:connect_args={"statement_cache_size": 0}. If you control PgBouncer, switching to session-mode pooling also resolves it, at the cost of fewer multiplexed connections.RuntimeError: Event loop is closed— Anawaitcall reached the event loop after it had been closed, often because code runsasyncio.run()at the top level and then attempts to reuse an engine created before the run. Fix: create the engine inside the coroutine passed toasyncio.run(), or use a single persistent loop lifecycle managed by the ASGI server.sqlalchemy.orm.exc.DetachedInstanceError: Instance <User> is not bound to a Session— An ORM object was accessed after its parent session closed, triggering lazy-load on an expired attribute. This is the sync equivalent ofMissingGreenletError. Fix:expire_on_commit=Falseon the session factory, plus eager-load all relationships you access after session close.asyncpg.exceptions.TooManyConnectionsError: sorry, too many clients already— Your application's aggregate connection count (all processes × pool_size + max_overflow) exceeds Postgresmax_connections. Fix: reducepool_size, add PgBouncer or RDS Proxy upstream, or upgrade to a larger DB instance. The setting pool_size and max_overflow for AWS RDS guide has instance-specific numbers.
Frequently Asked Questions
Can I use a synchronous DBAPI driver with create_async_engine?
No. create_async_engine will raise ArgumentError or InvalidRequestError at engine creation time if the driver URL does not map to a known async dialect (asyncpg, psycopg3 async, aiomysql, asyncmy, aiosqlite). Synchronous drivers (psycopg2, cx_Oracle, pymysql) block the OS thread and cannot be awaited. If you need to keep a synchronous driver temporarily during migration, use AsyncEngine.run_sync() with caution — it spawns a greenlet bridge call that still blocks a thread under the hood.
How do I share a single engine across multiple FastAPI workers?
You don't — and you shouldn't try. Each Uvicorn/Gunicorn worker process gets its own Python interpreter, its own event loop, and its own engine. The create_async_engine() call and async_sessionmaker factory should be module-level globals initialized at import time. The pool is per-process. This is the correct architecture; just size pool_size accordingly across all your worker processes.
Why does pool_pre_ping=True not prevent all stale-connection errors?pool_pre_ping issues a SELECT 1 when a connection is checked out, not during the actual query. If the network drops between the pre-ping and the subsequent query (a very narrow window), the stale-connection error still fires. More importantly, pre-ping adds latency. In ultra-low-latency paths, some teams disable it and instead rely on proper pool_recycle settings plus application-level retry logic on OperationalError.
Is it safe to call engine.dispose() and then continue using the engine?
Yes. After dispose(), the pool is invalidated and all existing connections are closed. The next async with engine.connect() call creates a fresh connection and rebuilds the pool transparently. This behavior is used deliberately in Gunicorn's post_fork hook to ensure child processes do not share connections with the parent.
What is the difference between AsyncSession and AsyncConnection?AsyncSession is the ORM-level interface — it tracks the identity map, manages unit-of-work, and lets you work with mapped classes. AsyncConnection is the Core-level interface — it provides execute() / executemany() directly against the engine without ORM overhead, suitable for bulk inserts, raw SQL, or administrative commands. For high-throughput bulk operations, AsyncConnection avoids ORM instrumentation overhead; see the advanced query patterns and bulk data operations pillar for benchmarks.
How do I run Alembic migrations with an async engine?
Alembic requires a synchronous connection for run_migrations_online(). The standard pattern is to use AsyncEngine.sync_engine or to call asyncio.run() with async_engine.connect() inside env.py, then call conn.run_sync(do_run_migrations). Full wiring is covered in the configuring Alembic with async SQLAlchemy engines guide.
Related
- Choosing Between asyncpg and psycopg Async Drivers — Head-to-head driver comparison with connection lifecycle and prepared-statement behavior.
- Configuring Async Engines and Connection Pools — Full parameter reference with production-validated defaults.
- Handling Connection Leaks and Pool Exhaustion — Diagnosing and fixing pool saturation before it pages your on-call.
- Integrating SQLAlchemy Async with FastAPI and Starlette — Request-scoped sessions, lifespan management, and dependency injection patterns.
- Tuning Connection Pools for Cloud Databases — RDS, Aurora, Cloud SQL, and Supabase instance-specific pool sizing.
- Dialect-Specific Gotchas and Driver Quirks — Per-driver error surface and workarounds for PgBouncer, Aurora, and more.
- Mastering SQLAlchemy 2.0 Core and ORM Architecture — The ORM session lifecycle and transaction isolation model that async engines operate within.