Dialect-Specific Gotchas and Driver Quirks in Async SQLAlchemy
Async SQLAlchemy is not one thing — it is a thin async shim over four distinct database drivers, each with its own connection model, type encoding pipeline, and server negotiation protocol, all covered in the async engines, dialects, and connection pooling guide.
When a TimeoutError fires in production after two hours of stability, or Decimal values silently become floats, or every mutation raises OperationalError: cannot start a transaction within a transaction, the root cause almost always lives in one of these driver-specific layers rather than in SQLAlchemy's ORM. This guide catalogues the most expensive lessons learned — covering asyncpg, psycopg3, aiosqlite, and aiomysql — so you can fix them before they reach production.
Concept & Execution Model
SQLAlchemy 2.0 abstracts over its async drivers through the AsyncEngine / AsyncConnection / AsyncSession stack. Each async with AsyncSession(engine) as session block ultimately checks a connection out of SQLAlchemy's pool, and that pooled connection wraps a driver-specific connection object. SQLAlchemy issues SQL as text or compiled constructs; the driver serialises that text and any bound parameters to the wire protocol.
The critical insight is that SQLAlchemy does not own the TCP connection lifecycle. The driver does. This means anything the driver does to a connection — caching a prepared statement, applying a server-side type codec, pinning a transaction mode — is invisible to SQLAlchemy's abstraction layer unless you explicitly thread it through connect_args or execution_options.
# Async engine creation with driver-specific connect_args
from sqlalchemy.ext.asyncio import create_async_engine
engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/mydb",
    connect_args={
        # Driver-level knobs, not SQLAlchemy pool settings
        "statement_cache_size": 0,
        "server_settings": {
            "application_name": "my_service",
            "jit": "off",
        },
    },
    pool_size=10,
    max_overflow=20,
    pool_pre_ping=True,
)
The connect_args dict is passed verbatim to the underlying driver's connect() call. Every driver interprets this dict differently, which is the core source of confusion.
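To make the divergence concrete, here is a minimal sketch that gathers the connect_args used elsewhere in this guide into one lookup keyed by driver. The helper name connect_args_for and its app_name parameter are illustrative assumptions, not part of SQLAlchemy; the keys are the ones this guide shows each driver accepting.

```python
from typing import Any


def connect_args_for(driver: str, app_name: str = "my_service") -> dict[str, Any]:
    """Return driver-specific connect_args (hypothetical helper, for illustration)."""
    if driver == "asyncpg":
        # asyncpg takes server parameters as a dict
        return {"server_settings": {"application_name": app_name}}
    if driver == "psycopg":
        # psycopg3 takes the same parameter through the libpq options string
        return {"options": f"-c application_name={app_name}"}
    if driver == "aiomysql":
        return {"charset": "utf8mb4", "autocommit": False}
    if driver == "aiosqlite":
        return {"check_same_thread": False}
    raise ValueError(f"unknown async driver: {driver!r}")
```

The same logical setting (an application name) ends up under a different key and in a different shape for each PostgreSQL driver, and has no counterpart at all for MySQL or SQLite.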
Query Construction & Async Execution Patterns
asyncpg: Prepared Statement Caching and PgBouncer
asyncpg's defining feature is aggressive prepared-statement caching. On first execution of any query string, asyncpg sends a Parse message to PostgreSQL, which returns a named server-side prepared statement handle (e.g. __asyncpg_stmt_0__). Subsequent executions reuse that handle, skipping the parse/plan phase.
This is enormously fast for a dedicated TCP connection. It is catastrophically wrong behind PgBouncer in transaction pooling mode, where each transaction may land on a different backend server that has never seen your prepared statement. The error you see is:
asyncpg.exceptions.InvalidSQLStatementNameError:
prepared statement "__asyncpg_stmt_0__" does not exist
The fix is to disable prepared statements at the driver level:
# Async: asyncpg behind PgBouncer transaction pooling
from sqlalchemy.ext.asyncio import create_async_engine
engine = create_async_engine(
    "postgresql+asyncpg://user:pass@pgbouncer:6432/mydb",
    connect_args={
        "statement_cache_size": 0,  # disable asyncpg's own statement cache
        "prepared_statement_cache_size": 0,  # disable the SQLAlchemy dialect's cache
    },
)
The two keys control different caches. statement_cache_size sizes asyncpg's own LRU of prepared statements on each Connection object, while prepared_statement_cache_size sizes the separate per-connection cache that SQLAlchemy's asyncpg dialect keeps on top of it (the dialect consumes this key itself rather than passing it to asyncpg.connect()). Setting only one leaves the other cache active.
For a full walkthrough of diagnosing and fixing prepared-statement errors — including DuplicatePreparedStatementError and the cannot insert multiple commands into a prepared statement variant — see the guide to handling asyncpg prepared statement errors with PgBouncer.
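For the name-collision variant of this problem (DuplicatePreparedStatementError), SQLAlchemy's asyncpg dialect also accepts a prepared_statement_name_func callable that generates the statement names. The sketch below assumes a recent SQLAlchemy 2.0 release that supports this key; the names unique_statement_name and PGBOUNCER_CONNECT_ARGS are illustrative.

```python
from uuid import uuid4


def unique_statement_name() -> str:
    """Generate a prepared-statement name that will not collide across pooled backends."""
    return f"__asyncpg_{uuid4()}__"


# connect_args for an asyncpg engine behind PgBouncer transaction pooling
PGBOUNCER_CONNECT_ARGS = {
    "statement_cache_size": 0,            # asyncpg's own cache
    "prepared_statement_cache_size": 0,   # SQLAlchemy dialect cache
    "prepared_statement_name_func": unique_statement_name,
}

# Usage (assumes sqlalchemy and asyncpg are installed):
# engine = create_async_engine(
#     "postgresql+asyncpg://user:pass@pgbouncer:6432/mydb",
#     connect_args=PGBOUNCER_CONNECT_ARGS,
# )
```

Random names sidestep the default sequential numbering, which is what collides when two clients are multiplexed onto the same backend.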
psycopg3: Binary Protocol and Row Factory
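psycopg3 (psycopg[c] or psycopg[binary]) requests results in text format by default; binary transfer is opt-in per cursor or per execute() call (binary=True) and is not a connection-level switch, so it is rarely the source of PgBouncer trouble. The setting that does bite is automatic server-side preparation: once a query has run prepare_threshold times on a connection (5 by default), psycopg3 prepares it on the server. Behind PgBouncer in transaction pooling mode, the next execution can land on a backend that never saw that statement, and you may see an error along the lines of:
psycopg.errors.InvalidSqlStatementName: prepared statement "_pg3_0" does not exist
Disable automatic preparation via:
# Async: psycopg3 with server-side prepared statements disabled
from sqlalchemy.ext.asyncio import create_async_engine
engine = create_async_engine(
    "postgresql+psycopg://user:pass@pgbouncer:6432/mydb",
    connect_args={
        "prepare_threshold": None,  # never prepare statements on the server
    },
)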
psycopg3 also ships with an async-native connection pool (psycopg_pool.AsyncConnectionPool) that SQLAlchemy can draw connections from via create_async_engine(..., async_creator=...). Pair it with poolclass=NullPool so that SQLAlchemy's own pool does not hold connections on top of the psycopg pool. This is useful when you need psycopg3-specific features like LISTEN/NOTIFY support.
State Management & Session Boundaries
aiosqlite: Single-Writer Locking
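SQLite enforces database-wide write locking. In async code, this creates a subtle trap: if two coroutines concurrently attempt writes inside separate AsyncSession contexts, each session checks out its own connection, and the second writer blocks until SQLite's busy timeout expires and then raises OperationalError: database is locked. aiosqlite runs each connection on its own background thread, so the event loop stays responsive while the writers contend, but the database still admits only one writer at a time.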
# Wrong: concurrent writes contend on SQLite's single write lock
import asyncio
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
engine = create_async_engine("sqlite+aiosqlite:///orders.db")
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)
async def write_order(order_id: int) -> None:
    # Order is the mapped model defined elsewhere in the application
    async with AsyncSessionLocal() as session:
        async with session.begin():
            session.add(Order(id=order_id, status="pending"))
async def main() -> None:
    # These will contend on the single write lock
    await asyncio.gather(write_order(1), write_order(2))
The correct pattern for async SQLite is to use a single shared engine with check_same_thread=False and to serialise writes through a queue or simply accept that SQLite is appropriate only for low-concurrency scenarios (tests, local development, single-user tools):
# Correct: configure aiosqlite for async use
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.pool import StaticPool
engine = create_async_engine(
    "sqlite+aiosqlite:///orders.db",
    connect_args={"check_same_thread": False},
    # StaticPool shares one connection, so writes cannot contend with each other;
    # switch to NullPool when each test needs a fresh connection
    poolclass=StaticPool,
)
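The "serialise writes through a queue" option can be sketched as a single writer task that drains an asyncio.Queue. This is a minimal illustration under stated assumptions: it uses the standard-library sqlite3 module directly (so the calls block the event loop briefly) instead of aiosqlite or an AsyncSession, and the names writer_loop, submit_write, and demo are hypothetical.

```python
import asyncio
import sqlite3


async def writer_loop(queue: asyncio.Queue, conn: sqlite3.Connection) -> None:
    """Apply queued writes one at a time so only one writer ever touches the database."""
    while True:
        job = await queue.get()
        if job is None:  # sentinel: shut down
            break
        sql, params, done = job
        try:
            conn.execute(sql, params)
            conn.commit()
            done.set_result(None)
        except Exception as exc:  # report the failure to the waiting caller
            conn.rollback()
            done.set_exception(exc)


async def submit_write(queue: asyncio.Queue, sql: str, params: tuple) -> None:
    """Enqueue a write and wait until the writer has committed it."""
    done = asyncio.get_running_loop().create_future()
    await queue.put((sql, params, done))
    await done


async def demo() -> int:
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, status TEXT)")
    queue: asyncio.Queue = asyncio.Queue()
    writer = asyncio.create_task(writer_loop(queue, conn))
    # Many producers, but the writes are applied strictly one after another
    await asyncio.gather(
        *(submit_write(queue, "INSERT INTO orders VALUES (?, ?)", (i, "pending")) for i in range(5))
    )
    await queue.put(None)
    await writer
    (count,) = conn.execute("SELECT COUNT(*) FROM orders").fetchone()
    conn.close()
    return count
```

In a real service the writer task would own one AsyncSession and the producers would enqueue callables or model instances, but the shape is the same: many coroutines submit, one coroutine writes.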
For production multi-writer workloads, SQLite is the wrong database. The guide to selecting async drivers for SQLite, MySQL, and Postgres covers when each engine is appropriate.
aiomysql: Isolation Level and Autocommit
MySQL's default transaction isolation level is REPEATABLE READ (not READ COMMITTED as in PostgreSQL's default). This causes surprising behaviour in long-running async sessions that hold a transaction open while other connections commit: the session may see stale reads because its repeatable-read snapshot was taken at transaction start.
More critically, aiomysql connections start with autocommit=False by default, meaning every statement implicitly opens a transaction. If you use the session without an explicit begin() block and forget to commit, the transaction stays open for as long as the session lives and is rolled back when the session closes, so the changes are silently discarded:
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
# Wrong: implicit transaction never committed
async with AsyncSession(engine) as session:
    result = await session.execute(select(Order).where(Order.status == "pending"))
    orders = result.scalars().all()
    for order in orders:
        order.status = "processing"
    # Missing: await session.commit()
    # The transaction stays open until the session closes, then it is rolled back
# Correct: explicit transaction scope
async with AsyncSession(engine) as session:
    async with session.begin():
        result = await session.execute(select(Order).where(Order.status == "pending"))
        orders = result.scalars().all()
        for order in orders:
            order.status = "processing"
    # session.begin() context manager commits on exit
To change the isolation level for aiomysql:
from sqlalchemy.ext.asyncio import create_async_engine
engine = create_async_engine(
    "mysql+aiomysql://user:pass@localhost/mydb",
    isolation_level="READ COMMITTED",  # match Postgres behaviour
    connect_args={
        "charset": "utf8mb4",  # always set explicitly
        "autocommit": False,
    },
)
Advanced Driver-Specific Patterns
asyncpg Type Codec Quirks
asyncpg ships its own binary type encoding system, independent of PostgreSQL's libpq text protocol. This means some Python types map to PostgreSQL types differently than you might expect:
JSON and JSONB: asyncpg decodes both json and jsonb columns as Python str by default, not as dict. SQLAlchemy's JSON column type works around this, but raw text() queries against JSONB columns return strings:
# asyncpg returns str for raw JSON queries, so decode manually
import json
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
async def get_tenant_config(session: AsyncSession, tenant_id: int) -> dict:
    result = await session.execute(
        text("SELECT config FROM tenants WHERE id = :id"),
        {"id": tenant_id},
    )
    row = result.fetchone()
    if row is None:
        return {}
    # row.config is a str when column is json/jsonb in raw text()
    return json.loads(row.config) if isinstance(row.config, str) else row.config
Use SQLAlchemy's JSON type on the mapped column so that results are deserialised to Python objects automatically:
from sqlalchemy import Column, Integer, JSON
from sqlalchemy.orm import DeclarativeBase
class Base(DeclarativeBase):
    pass
class Tenant(Base):
    __tablename__ = "tenants"
    id = Column(Integer, primary_key=True)
    config = Column(JSON)  # SQLAlchemy's JSON type deserialises the stored string on load
NUMERIC and Decimal: asyncpg decodes NUMERIC columns as Python Decimal by default. The result type of a computed expression, however, is decided by PostgreSQL, not by asyncpg: if any operand is real or double precision (for example a float column or an explicit float8 cast), the whole expression becomes double precision and asyncpg returns a Python float:
# Returns float if amount (or any other operand) is double precision
result = await session.execute(
    text("SELECT SUM(amount) * 1.1 AS total FROM invoices WHERE tenant_id = :id"),
    {"id": tenant_id},
)
# Use an explicit CAST in SQL so that total is always Decimal
result = await session.execute(
    text("SELECT CAST(SUM(amount) * 1.1 AS NUMERIC(12,2)) AS total FROM invoices WHERE tenant_id = :id"),
    {"id": tenant_id},
)
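When you cannot change the SQL, a defensive normalisation at the Python boundary is one possible fallback. The sketch below is an assumption-laden illustration, not a substitute for the CAST: the helper name to_money and the two-decimal default are hypothetical, and a value that already arrived as float has lost whatever precision it was going to lose.

```python
from decimal import Decimal, ROUND_HALF_UP
from typing import Optional, Union


def to_money(
    value: Union[Decimal, float, int, None], places: str = "0.01"
) -> Optional[Decimal]:
    """Normalise a driver result to a fixed-scale Decimal (hypothetical helper)."""
    if value is None:
        return None
    if isinstance(value, float):
        # Go through str() so the Decimal reflects the float's short repr,
        # not its full binary expansion
        value = Decimal(str(value))
    return Decimal(value).quantize(Decimal(places), rounding=ROUND_HALF_UP)
```

Calling to_money(row.total) then yields a Decimal regardless of which type the driver handed back.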
Enum types: asyncpg returns PostgreSQL ENUM values as plain str. SQLAlchemy's Enum column type converts them to Python enum members for mapped columns, but ad-hoc text() queries against enum columns return str values:
# Create PostgreSQL enum with SQLAlchemy: values load as OrderStatus members
import enum
from sqlalchemy import Enum as SAEnum
class OrderStatus(enum.Enum):
    PENDING = "pending"
    PROCESSING = "processing"
    SHIPPED = "shipped"
class Order(Base):
    __tablename__ = "orders"
    id = Column(Integer, primary_key=True)
    status = Column(SAEnum(OrderStatus, name="order_status_enum"))
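For the raw text() case, the string has to be mapped back to the enum by hand. One detail worth knowing: SQLAlchemy's Enum type persists the member names by default (PENDING, not pending) unless a values_callable is supplied, so the labels in the database may not match the enum values. The helper below, coerce_status, is a hypothetical sketch that accepts either form.

```python
import enum


class OrderStatus(enum.Enum):
    PENDING = "pending"
    PROCESSING = "processing"
    SHIPPED = "shipped"


def coerce_status(raw: str) -> OrderStatus:
    """Map a string from a raw query back to OrderStatus (hypothetical helper)."""
    try:
        return OrderStatus[raw]   # label stored by member name, e.g. "PENDING"
    except KeyError:
        return OrderStatus(raw)   # label stored by value, e.g. "pending"
```

An unknown label raises ValueError from the second lookup, which is usually preferable to passing an unvalidated string further into the application.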
server_settings via connect_args
asyncpg supports passing PostgreSQL configuration parameters at connection establishment time via server_settings. psycopg3 has no server_settings key; it takes the same parameters through the libpq options string. This is the correct place to configure parameters that must be set per-connection rather than globally:
from sqlalchemy.ext.asyncio import create_async_engine
engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/mydb",
    connect_args={
        "server_settings": {
            "application_name": "order_service_v2",
            "statement_timeout": "30000",  # ms, kill runaway queries
            "lock_timeout": "5000",  # ms, avoid deadlock waits
            "idle_in_transaction_session_timeout": "60000",  # ms, release stuck sessions
            "search_path": "myschema,public",  # schema routing
            "jit": "off",  # disable JIT for OLTP workloads
        },
    },
)
These behave like session-level SET statements issued at connect time: they override any ALTER ROLE ... SET defaults but apply only to the current connection, which makes them safe when each tenant gets its own engine with its own search_path.
RETURNING Clause Support
RETURNING is not part of standard SQL; SQLAlchemy generates it for insert().returning() and update().returning() wherever the dialect supports it, and the behaviour varies by backend:
- asyncpg / psycopg3 (PostgreSQL): Full RETURNING support, including multi-row returns from bulk inserts.
- aiomysql (MySQL 8.0+): No RETURNING support. For single-row ORM inserts, SQLAlchemy reads the generated key from the driver's lastrowid (MySQL's LAST_INSERT_ID()) instead. An explicit insert().returning() raises CompileError.
- aiosqlite (SQLite 3.35+): RETURNING supported for SQLite ≥ 3.35 only. Check your SQLite version: older versions raise OperationalError: near "RETURNING": syntax error.
# Insert with RETURNING: works on asyncpg/psycopg3 and SQLite 3.35 or newer, not on MySQL
from sqlalchemy import insert
from sqlalchemy.ext.asyncio import AsyncSession
async def create_invoice(session: AsyncSession, data: dict) -> int:
    stmt = (
        insert(Invoice)
        .values(**data)
        .returning(Invoice.id)
    )
    result = await session.execute(stmt)
    return result.scalar_one()
For MySQL and old SQLite, the equivalent pattern uses session.add() + await session.flush():
# Portable pattern: works across all dialects
async def create_invoice_portable(session: AsyncSession, data: dict) -> Invoice:
    invoice = Invoice(**data)
    session.add(invoice)
    await session.flush()  # populates invoice.id via post-insert SELECT or LAST_INSERT_ID()
    await session.refresh(invoice)
    return invoice
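To choose between the two patterns at runtime on SQLite, you can check the library version before emitting RETURNING. This sketch relies on aiosqlite wrapping the standard-library sqlite3 module, so sqlite3.sqlite_version_info reflects the SQLite build in use; the helper name sqlite_supports_returning is an assumption for illustration.

```python
import sqlite3

# RETURNING was added to SQLite in 3.35.0
RETURNING_MIN_VERSION = (3, 35, 0)


def sqlite_supports_returning(version: tuple = sqlite3.sqlite_version_info) -> bool:
    """Return True when the given SQLite version understands RETURNING."""
    return tuple(version) >= RETURNING_MIN_VERSION
```

Running this check once at startup (or in a test fixture) turns a confusing syntax error in CI into an explicit skip or fallback.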
Named vs Positional Parameters
asyncpg uses positional parameters ($1, $2, ...) internally. SQLAlchemy translates its named parameters (:param) to positional at compile time, so you never see this directly, unless you drop to the raw asyncpg connection. Note that engine.raw_connection() and AsyncConnection.get_raw_connection() return SQLAlchemy's pooled DBAPI adapter; the asyncpg connection itself is its driver_connection attribute:
# Wrong: SQLAlchemy-style named params on the raw asyncpg connection
async with engine.connect() as conn:
    raw = await conn.get_raw_connection()
    asyncpg_conn = raw.driver_connection  # the underlying asyncpg.Connection
    # asyncpg requires positional $1 params, not :name
    await asyncpg_conn.fetchrow("SELECT * FROM orders WHERE id = :id", order_id)  # raises
# Correct: use positional params with raw asyncpg
async with engine.connect() as conn:
    raw = await conn.get_raw_connection()
    row = await raw.driver_connection.fetchrow("SELECT * FROM orders WHERE id = $1", order_id)
When using text() through SQLAlchemy's session, always use :name style — SQLAlchemy's compiler translates to the driver's native param style:
# Correct: SQLAlchemy handles translation
result = await session.execute(
    text("SELECT * FROM orders WHERE tenant_id = :tid AND status = :status"),
    {"tid": tenant_id, "status": "pending"},
)
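To see roughly what that translation amounts to, here is a simplified stand-in written with a regular expression. It is not SQLAlchemy's compiler: it ignores string literals and comments, and the name to_positional is hypothetical. It only illustrates how :name placeholders map onto asyncpg's $N slots.

```python
import re
from typing import Any

# ":name" not preceded by another colon, so PostgreSQL "::type" casts are left alone
_NAMED_PARAM = re.compile(r"(?<!:):([A-Za-z_][A-Za-z0-9_]*)")


def to_positional(sql: str, params: dict[str, Any]) -> tuple[str, list[Any]]:
    """Rewrite :name placeholders as $1, $2, ... and return the ordered arguments."""
    order: list[str] = []

    def replace(match: re.Match) -> str:
        name = match.group(1)
        if name not in order:
            order.append(name)
        # A repeated name reuses the slot it was first given
        return f"${order.index(name) + 1}"

    converted = _NAMED_PARAM.sub(replace, sql)
    return converted, [params[name] for name in order]
```

The returned pair can be passed to a raw asyncpg call as conn.fetch(sql, *args), which is the form the driver expects.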
Hybrid Architectures & Migration Strategies
Mixing Sync and Async Engines in the Same Process
Some applications need both sync and async database access — for example, Celery workers (sync) alongside FastAPI handlers (async) sharing the same SQLAlchemy models. The correct approach is to create separate engines rather than attempting to share a single engine:
# Two engines, same models: sync for Celery, async for FastAPI
from sqlalchemy import create_engine
from sqlalchemy.ext.asyncio import create_async_engine
# Sync engine for Celery workers
sync_engine = create_engine(
    "postgresql+psycopg2://user:pass@localhost/mydb",
    pool_size=5,
    max_overflow=10,
)
# Async engine for FastAPI
async_engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/mydb",
    pool_size=20,
    max_overflow=40,
    connect_args={"statement_cache_size": 0},  # if behind PgBouncer
)
The guide to configuring async engines and connection pools covers pool tuning in depth. For cloud-hosted databases (RDS, CloudSQL, AlloyDB), see the connection pool tuning guide for cloud databases.
Migrating from asyncpg to psycopg3
psycopg3 offers better standards compliance and simpler LISTEN/NOTIFY integration. The migration path is:
- Replace postgresql+asyncpg:// with postgresql+psycopg:// in your DSN.
- Replace asyncpg-specific connect_args keys (statement_cache_size, server_settings) with psycopg3 equivalents (prepare_threshold, options for GUC parameters).
- Remove any custom asyncpg type codec registrations; psycopg3 uses the libpq text protocol by default, which handles most types natively.
- Test RETURNING behaviour; psycopg3 uses the cursor_factory parameter for some advanced result mapping.
# Before: asyncpg
engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/mydb",
    connect_args={
        "statement_cache_size": 0,
        "server_settings": {"application_name": "api"},
    },
)
# After: psycopg3
engine = create_async_engine(
    "postgresql+psycopg://user:pass@localhost/mydb",
    connect_args={
        "prepare_threshold": None,  # counterpart of statement_cache_size=0
        "options": "-c application_name=api",  # GUC via options string
    },
)
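When several server_settings entries have to move into a single options string, a small converter avoids hand-written quoting mistakes. The sketch below assumes libpq's documented rule that the options string is split on spaces and that a backslash escapes a space or another backslash; the function name build_pg_options is hypothetical.

```python
def build_pg_options(settings: dict[str, str]) -> str:
    """Convert an asyncpg-style server_settings dict into a libpq options string."""

    def escape(value: str) -> str:
        # Escape backslashes first, then spaces, so the escapes are not double-processed
        return value.replace("\\", "\\\\").replace(" ", "\\ ")

    return " ".join(
        f"-c {name}={escape(str(value))}" for name, value in settings.items()
    )
```

The result can be passed as connect_args={"options": build_pg_options({...})} on a postgresql+psycopg:// engine.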
For a detailed comparison of asyncpg vs psycopg3 async drivers, see choosing between asyncpg and psycopg async drivers.
Production Pitfalls & Anti-Patterns
- Forgetting statement_cache_size=0 behind PgBouncer transaction pooling: Results in InvalidSQLStatementNameError in production, often only once transactions start landing on a backend that never prepared the statement. Fix: always set both statement_cache_size=0 and prepared_statement_cache_size=0 in connect_args when using PgBouncer transaction mode.
- Using text() for JSON columns with asyncpg: Raw text queries bypass SQLAlchemy's result-type handling, so JSON/JSONB columns come back as str. Fix: use mapped ORM columns with Column(JSON) or call json.loads() explicitly on raw query results.
- aiosqlite with concurrent writes in tests: asyncio.gather() over multiple write coroutines using separate sessions raises OperationalError: database is locked. Fix: use StaticPool or NullPool with aiosqlite in tests, or use a single shared session per test.
- aiomysql implicit transaction left open: Forgetting await session.commit() in a plain async with AsyncSession() block (without .begin()) leaves an open transaction on the server. MySQL holds any row locks until the session closes and rolls back, at which point the changes are discarded. Fix: always use async with session.begin() or call await session.commit() explicitly.
- Mixing :name and $N parameter styles: Dropping to the raw asyncpg connection and using SQLAlchemy-style :name parameters fails because asyncpg requires $1-style positional params. Fix: use $1, $2, ... in raw asyncpg calls, and :name style only through SQLAlchemy's text().
- RETURNING on MySQL or old SQLite: insert().returning() raises CompileError on MySQL and OperationalError on SQLite < 3.35. Fix: use session.add() + await session.flush() + await session.refresh() for portable cross-dialect insert-and-fetch patterns.
Driver Quirk Comparison Matrix
Frequently Asked Questions
Why do I see InvalidSQLStatementNameError only after several minutes of traffic, not immediately?
asyncpg fills its prepared statement LRU cache gradually. The error surfaces when a cached statement name is sent to a backend connection that never prepared it, which only happens once PgBouncer's transaction pooling has rotated the client onto a different backend under load. The fix, setting statement_cache_size=0, takes effect only for new connections, so apply it when the engine is created; to change it on a running service, dispose of the engine and build a new one.
Does psycopg3 have the same PgBouncer problem as asyncpg?
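Yes, in a slightly different form. psycopg3 prepares a statement on the server once it has been executed prepare_threshold times on the same connection (5 by default), so behind PgBouncer transaction pooling the same missing-statement errors can appear after a delay. Setting prepare_threshold=None in connect_args disables automatic preparation. The binary or text result format is a separate, per-cursor choice and does not affect PgBouncer compatibility.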
Can I use asyncpg's LISTEN/NOTIFY through SQLAlchemy's async session?
No. LISTEN/NOTIFY requires a dedicated long-lived connection that is not returned to the pool between events. Obtain a pooled connection via await engine.raw_connection(), take the underlying asyncpg connection from its driver_connection attribute, call await conn.add_listener(channel, callback) on it, and hold that connection outside the pool for the lifetime of the listener.
My aiosqlite tests fail with database is locked when run in parallel. What's the fix?
Use StaticPool (single shared connection) or NullPool (a new connection per operation) with aiosqlite in test fixtures. StaticPool is preferable for in-memory databases (sqlite+aiosqlite:///:memory:) because it guarantees all operations share the same SQLite database instance. Never use asyncio.gather() across sessions that each open their own aiosqlite connection to the same file in write mode.
Why does insert().returning() work in development (SQLite 3.38) but fail in CI (SQLite 3.31)?
RETURNING was added in SQLite 3.35. SQLite ships with the OS on many CI images; Ubuntu 20.04 ships 3.31, Ubuntu 22.04 ships 3.37. Fix: switch CI to a newer OS image, or install pysqlite3-binary, which bundles a recent SQLite build. Because aiosqlite imports the standard-library sqlite3 module, the latter only helps if pysqlite3 is substituted for sqlite3 before aiosqlite is imported.
Related
- Handling asyncpg Prepared Statement Errors with PgBouncer: step-by-step fix for InvalidSQLStatementNameError and DuplicatePreparedStatementError behind PgBouncer transaction pooling.
- Choosing Between asyncpg and psycopg Async Drivers: performance benchmarks, feature matrix, and migration guidance between the two PostgreSQL async drivers.
- Selecting Async Drivers for SQLite, MySQL, and Postgres: when to use aiosqlite vs aiomysql vs asyncpg, with install commands and engine DSN examples.
- Tuning Connection Pools for Cloud Databases: pool_size, max_overflow, pool_recycle, and pool_pre_ping settings for RDS, CloudSQL, and PgBouncer-fronted clusters.
- Async Engines, Dialects, and Connection Pooling: parent guide covering the full async engine stack from driver selection to pool configuration.