Choosing Between asyncpg and psycopg3 Async Drivers for PostgreSQL
When architecting high-throughput Python services, selecting the correct PostgreSQL async driver is a foundational infrastructure decision that touches raw throughput, PgBouncer compatibility, COPY protocol access, and long-term ecosystem fit. This guide, part of the Async Engines, Dialects, and Connection Pooling reference, compares asyncpg and psycopg3 (the psycopg package, version 3, whose async API ships in the base distribution) across the axes that matter in production: execution model, type adaptation, prepared-statement caching behaviour, transaction-mode pooler compatibility, and bulk data operations.
Concept & Execution Model
Both drivers expose asyncio-native coroutines and integrate with SQLAlchemy 2.0 through dialect adapters (postgresql+asyncpg:// and postgresql+psycopg://). Beneath that unified ORM surface, however, their internals diverge in ways that produce measurably different behaviour under load.
asyncpg implements a native-async PostgreSQL frontend written in Python and Cython. It bypasses libpq entirely, speaking the PostgreSQL wire protocol directly over an asyncio transport. This architecture gives asyncpg full control of binary encoding, connection state, and pipeline batching. It maintains a per-connection LRU cache of prepared statements (100 entries by default) and uses the binary protocol for all types that support it (integers, floats, timestamps, JSONB, UUID), skipping the text-serialisation round-trip that dominates CPU time in high-query-rate workloads.
psycopg3 (the psycopg package) wraps libpq, PostgreSQL's official C client library, behind an asyncio-compatible interface. Instead of owning the socket, psycopg3 registers the libpq socket with loop.add_reader() / loop.add_writer() and awaits readiness events. The libpq state machine then handles the actual protocol exchange. This approach inherits decades of correctness guarantees and edge-case handling from the official client, and it supports features, notably client-side COPY streaming and pipeline mode, that asyncpg has only partial equivalents for.
Scheduling overhead: asyncpg's protocol layer is driven directly by asyncio transport callbacks, so coroutines resume as soon as a complete server message has been parsed. psycopg3 waits for socket readiness and then lets libpq consume the data before the Python coroutine resumes, which adds one dispatch step per round-trip. The cost is small (figures of 50–150 µs per round-trip under load are quoted, though they depend heavily on hardware and on whether psycopg3's C extension is installed), so measure before treating it as decisive.
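The readiness-based waiting described for psycopg3 can be illustrated with the standard library alone. The sketch below is not psycopg3's code: wait_readable and demo are hypothetical names, and a local socket pair stands in for the libpq socket. It only shows the general pattern of registering a file descriptor with loop.add_reader() and resuming a coroutine when the loop reports it readable. It assumes a selector-based event loop (the default on Linux and macOS).

```python
import asyncio
import socket


async def wait_readable(sock: socket.socket) -> None:
    """Suspend the calling coroutine until the event loop reports `sock` readable."""
    loop = asyncio.get_running_loop()
    ready = loop.create_future()

    def on_readable() -> None:
        # Called by the event loop's selector; wake the waiting coroutine once.
        if not ready.done():
            ready.set_result(None)

    loop.add_reader(sock.fileno(), on_readable)
    try:
        await ready
    finally:
        # Always unregister, otherwise the callback keeps firing.
        loop.remove_reader(sock.fileno())


async def demo() -> bytes:
    # A connected socket pair stands in for the client/server connection.
    client, server = socket.socketpair()
    client.setblocking(False)
    server.setblocking(False)
    try:
        # Simulate the server answering 10 ms later.
        asyncio.get_running_loop().call_later(0.01, server.send, b"ready")
        await wait_readable(client)
        # Only now is the data read; a libpq-based driver would hand the
        # socket to libpq at this point instead of calling recv() itself.
        return client.recv(16)
    finally:
        client.close()
        server.close()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The extra hop between "socket is readable" and "coroutine has its result" is the dispatch step discussed above; a transport-driven design parses incoming bytes inside the loop's own data callback instead.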
SQLAlchemy dialect registration:
# asyncpg dialect
from sqlalchemy.ext.asyncio import create_async_engine

engine_asyncpg = create_async_engine(
    "postgresql+asyncpg://app:secret@db:5432/production",
    pool_size=20,
    max_overflow=10,
    pool_pre_ping=True,
    pool_recycle=1800,
    echo=False,
)

# psycopg3 dialect (package: psycopg, usually installed as psycopg[binary];
# async support is part of the base package, there is no separate async extra)
engine_psycopg = create_async_engine(
    "postgresql+psycopg://app:secret@db:5432/production",
    pool_size=20,
    max_overflow=10,
    pool_pre_ping=True,
    pool_recycle=1800,
)
Both engines expose an identical AsyncEngine / AsyncSession interface to application code. Driver selection is a one-line URL change from SQLAlchemy's perspective; the production differences emerge at the protocol and pooler layers examined below.
Query Construction & Async Execution Patterns
SQLAlchemy 2.0's select() / execute() / scalars() pipeline works identically with both dialects. The driver becomes visible only when you inspect prepared-statement behaviour, binary encoding, and the rare case where you drop below the ORM to use driver-specific APIs.
Binary protocol and type encoding
asyncpg defaults to binary transfer for all supported types. The practical effect is that INTEGER, BIGINT, FLOAT, TIMESTAMP WITH TIME ZONE, UUID, JSONB, and arrays of those types are decoded without string parsing. Under workloads that read thousands of rows per second, this can reduce per-query CPU time compared to text-protocol drivers at equivalent concurrency; the 15–30% range quoted for this effect should be treated as indicative rather than guaranteed.
psycopg3 also supports the binary protocol, but text remains the default for query results. Binary transfer is requested per cursor (conn.cursor(binary=True)) or per call (cursor.execute(..., binary=True)); it is unrelated to the psycopg[binary] install extra, which only selects the pre-built C speed-up package. Because SQLAlchemy creates the cursors itself, there is no single connect_args switch for it. The upshot: asyncpg is binary by default and pays no configuration cost for it; psycopg3 requires explicit opt-in.
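To make the text-versus-binary distinction concrete, here is a minimal sketch of how a PostgreSQL INTEGER (int4) value looks in each wire format. The function names are hypothetical and neither driver exposes them; the only assumption about PostgreSQL is that binary int4 is a 4-byte big-endian signed integer.

```python
import struct


def encode_int4_text(value: int) -> bytes:
    """Text format: the decimal digits as ASCII, parsed again on the other side."""
    return str(value).encode("ascii")


def encode_int4_binary(value: int) -> bytes:
    """Binary format: a fixed 4-byte big-endian signed integer."""
    return struct.pack("!i", value)


def decode_int4_text(payload: bytes) -> int:
    return int(payload.decode("ascii"))


def decode_int4_binary(payload: bytes) -> int:
    return struct.unpack("!i", payload)[0]


if __name__ == "__main__":
    for sample in (42, -123456, 2_147_483_647):
        text = encode_int4_text(sample)
        binary = encode_int4_binary(sample)
        print(sample, text, len(text), binary.hex(), len(binary))
```

The binary form has a fixed width and needs no digit parsing, which is where the CPU saving comes from; for small values the text form can actually be shorter on the wire, so the gain is in decoding work rather than bandwidth.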
Executing parameterised queries with both drivers
import asyncio

from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker

# Order and OrderLine are the application's ORM models, assumed to be defined
# elsewhere; engine_asyncpg comes from the dialect registration example above.
AsyncSessionFactory = async_sessionmaker(
    engine_asyncpg,  # swap for engine_psycopg; the code is identical
    class_=AsyncSession,
    expire_on_commit=False,
)


async def get_active_orders(session: AsyncSession, tenant_id: int) -> list:
    """
    Retrieves active orders with their totals.
    asyncpg transfers tenant_id and the result rows in binary format;
    psycopg3 requests text-format results unless binary is enabled.
    Both return Python objects via SQLAlchemy's type system.
    """
    stmt = (
        select(
            Order.id,
            Order.reference,
            func.sum(OrderLine.amount).label("total"),
        )
        .join(OrderLine, OrderLine.order_id == Order.id)
        .where(Order.tenant_id == tenant_id)
        .where(Order.status == "active")
        .group_by(Order.id, Order.reference)
        .order_by(func.sum(OrderLine.amount).desc())
    )
    result = await session.execute(stmt)
    return result.all()


async def main() -> None:
    async with AsyncSessionFactory() as session:
        rows = await get_active_orders(session, tenant_id=42)
        for row in rows:
            print(f"Order {row.reference}: £{row.total:.2f}")


asyncio.run(main())
State Management & Session Boundaries
Session lifecycle is driver-independent at the SQLAlchemy API level, but driver-specific caching and connection state create subtle differences worth understanding before you commit to a choice.
Prepared statement caching and session reuse
asyncpg's 100-entry LRU cache lives on the physical connection object, not the logical session. That means every AsyncSession that checks out a connection from the pool inherits the cache accumulated by previous sessions that used that physical connection. For services with a small set of parameterised query shapes — typical of CRUD APIs — this is pure gain: the database never re-parses the same statement template. For services with highly dynamic query shapes (multi-tenant filter builders, analytics dashboards with user-supplied dimensions), the cache can fill with low-reuse entries, creating mild memory pressure. Tune with connect_args={"statement_cache_size": 0} to disable it or {"statement_cache_size": 500} to extend it.
psycopg3 exposes prepare_threshold (default: 5) — after a statement is executed five times on the same connection, psycopg3 promotes it to a server-side prepared statement. This threshold-based model is more conservative than asyncpg's aggressive upfront caching, which suits workloads with genuinely heterogeneous query shapes.
# asyncpg: tuning the prepared-statement cache
engine_asyncpg_tuned = create_async_engine(
    "postgresql+asyncpg://app:secret@db:5432/production",
    connect_args={
        "statement_cache_size": 200,  # raised from the default of 100; lower it for services with few query shapes
        "command_timeout": 30,        # per-query timeout in seconds
    },
    pool_size=20,
    max_overflow=10,
    pool_pre_ping=True,
)

# psycopg3: tune the prepare threshold
engine_psycopg_tuned = create_async_engine(
    "postgresql+psycopg://app:secret@db:5432/production",
    connect_args={
        "prepare_threshold": 10,  # promote to prepared after 10 executions
        "autocommit": False,      # SQLAlchemy manages transactions
    },
    pool_size=20,
    max_overflow=10,
    pool_pre_ping=True,
)
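The two caching policies can be compared with a toy model. The classes below are hypothetical simplifications, not either driver's implementation: LRUStatementCache mimics an asyncpg-style cache that prepares on first use and evicts the least recently used entry, and ThresholdPreparer mimics a psycopg3-style policy that only prepares a statement once it has been executed more than prepare_threshold times (the exact promotion boundary is an assumption of this sketch).

```python
from collections import Counter, OrderedDict
from typing import Set


class LRUStatementCache:
    """Prepare every new statement immediately; evict the least recently used."""

    def __init__(self, max_size: int = 100) -> None:
        self.max_size = max_size
        self._entries: "OrderedDict[str, str]" = OrderedDict()
        self.prepares = 0  # how many times the server had to parse a statement

    def execute(self, sql: str) -> None:
        if sql in self._entries:
            self._entries.move_to_end(sql)  # cache hit: mark as recently used
            return
        self.prepares += 1
        self._entries[sql] = f"stmt_{self.prepares}"
        if len(self._entries) > self.max_size:
            self._entries.popitem(last=False)  # drop the oldest entry


class ThresholdPreparer:
    """Prepare a statement only after it has proven to be reused."""

    def __init__(self, prepare_threshold: int = 5) -> None:
        self.prepare_threshold = prepare_threshold
        self._counts: Counter = Counter()
        self.prepared: Set[str] = set()

    def execute(self, sql: str) -> bool:
        """Return True when this execution uses a server-side prepared statement."""
        if sql in self.prepared:
            return True
        self._counts[sql] += 1
        if self._counts[sql] > self.prepare_threshold:
            self.prepared.add(sql)
            return True
        return False


if __name__ == "__main__":
    lru = LRUStatementCache(max_size=2)
    for query in ["a", "b", "a", "c", "b"]:
        lru.execute(query)
    print("LRU prepares:", lru.prepares)

    lazy = ThresholdPreparer(prepare_threshold=2)
    print([lazy.execute("q") for _ in range(4)])
```

With a small cache and rotating query shapes, the LRU model keeps re-preparing evicted statements, while the threshold model never prepares one-off queries at all. That is the trade-off behind tuning statement_cache_size versus prepare_threshold.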
Transaction boundaries in async sessions
Both drivers respect SQLAlchemy 2.0's explicit transaction semantics. Always use async with session.begin(): for multi-statement transactions so that any exception triggers automatic rollback:
from sqlalchemy.exc import IntegrityError  # raised on constraint violations


# Invoice is an application ORM model, assumed to be defined elsewhere.
async def transfer_credits(
    session: AsyncSession,
    from_account_id: int,
    to_account_id: int,
    amount: int,
) -> None:
    async with session.begin():
        # Both statements execute in the same transaction.
        # If the second raises IntegrityError, SQLAlchemy rolls back automatically.
        await session.execute(
            Invoice.__table__.update()
            .where(Invoice.account_id == from_account_id)
            .values(balance=Invoice.balance - amount)
        )
        await session.execute(
            Invoice.__table__.update()
            .where(Invoice.account_id == to_account_id)
            .values(balance=Invoice.balance + amount)
        )
Advanced Driver-Specific Patterns
PgBouncer compatibility: the critical distinction
PgBouncer's transaction pooling mode, a configuration common among managed and self-hosted Postgres poolers (services such as AWS RDS Proxy multiplex connections in a similar way), assigns a server connection only for the duration of a transaction, then returns it to the pool. This conflicts with session-level state: SET commands, temporary tables, named prepared statements, and session-level advisory locks all live on the server connection, which the client no longer owns once the transaction ends. The next transaction may run on a different server connection, producing statement-not-found errors or, worse, silently running under another client's settings.
asyncpg's server-side prepared statement cache is the most common casualty. When the client's next transaction is routed to a different server connection, the named prepared statement created earlier does not exist there, but asyncpg still holds its local reference. The result is asyncpg.exceptions.InvalidSQLStatementNameError: prepared statement "..." does not exist.
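The failure mode can be reproduced with a small in-memory model. Everything here is hypothetical (Backend, TransactionPooler, Client, and StatementNotFound are illustrative names, and round-robin assignment is an assumption); no real pooler or driver is involved. The sketch only shows why a client-side statement cache and per-transaction backend assignment do not mix.

```python
import itertools
from typing import Dict, Iterable, Set


class StatementNotFound(Exception):
    """Stands in for the driver's 'prepared statement does not exist' error."""


class Backend:
    """One PostgreSQL server connection with its own named statements."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.prepared: Set[str] = set()

    def prepare(self, stmt_name: str) -> None:
        self.prepared.add(stmt_name)

    def execute_prepared(self, stmt_name: str) -> str:
        if stmt_name not in self.prepared:
            raise StatementNotFound(
                f'prepared statement "{stmt_name}" does not exist'
            )
        return f"ok via {self.name}"

    def execute_unnamed(self, sql: str) -> str:
        # Parse and run in one step; nothing is left behind on the backend.
        return f"ok via {self.name}"


class TransactionPooler:
    """Hands out backends round-robin, one per transaction."""

    def __init__(self, backends: Iterable[Backend]) -> None:
        self._cycle = itertools.cycle(list(backends))

    def begin(self) -> Backend:
        return next(self._cycle)


class Client:
    """Driver-side view: remembers which statements it believes are prepared."""

    def __init__(self, pooler: TransactionPooler, cache_statements: bool) -> None:
        self.pooler = pooler
        self.cache_statements = cache_statements
        self._names: Dict[str, str] = {}

    def run(self, sql: str) -> str:
        backend = self.pooler.begin()  # each transaction may land anywhere
        if not self.cache_statements:
            return backend.execute_unnamed(sql)
        name = self._names.get(sql)
        if name is None:
            name = f"stmt_{len(self._names) + 1}"
            backend.prepare(name)
            self._names[sql] = name
        return backend.execute_prepared(name)
```

A caching client succeeds on its first transaction and fails on the second, because the statement name it remembers was created on a different backend. The same client with caching switched off works on every backend.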
Fix: disable asyncpg's prepared-statement cache when using PgBouncer transaction mode:
engine_pgbouncer = create_async_engine(
    "postgresql+asyncpg://app:secret@pgbouncer:6432/production",
    connect_args={
        "statement_cache_size": 0,           # asyncpg's own cache; REQUIRED for PgBouncer transaction mode
        "prepared_statement_cache_size": 0,  # SQLAlchemy asyncpg-dialect cache (not an asyncpg option)
    },
    pool_size=5,     # PgBouncer multiplexes; keep SQLAlchemy pool small
    max_overflow=0,  # No overflow; PgBouncer handles multiplexing
    pool_pre_ping=True,
)
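psycopg3 offers an equivalent switch: setting prepare_threshold to None disables server-side prepared statements entirely, while parameters are still sent separately from the SQL text. Cleaning up session state left behind by other clients is the pooler's job rather than the driver's: PgBouncer's server_reset_query (DISCARD ALL by default) runs when a server connection is released, but in transaction mode only if server_reset_query_always is enabled.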
engine_psycopg_pgbouncer = create_async_engine(
    "postgresql+psycopg://app:secret@pgbouncer:6432/production",
    connect_args={
        "prepare_threshold": None,  # Disable server-side prepared statements
    },
    pool_size=5,
    max_overflow=0,
    pool_pre_ping=True,
)
The broader guide to dialect-specific gotchas and driver quirks covers additional PgBouncer edge cases including advisory locks and temporary tables.
COPY protocol support
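PostgreSQL's COPY protocol is the fastest mechanism for bulk data ingestion, often several times faster than batched INSERT for large datasets (5–10× is the range quoted here, but it depends on row width, indexes, and constraints). The two drivers expose this capability differently.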
psycopg3 has first-class COPY support via cursor.copy(), which integrates with async generators to stream rows without materialising the full dataset in memory:
import asyncio
from typing import AsyncIterator

import psycopg


async def product_rows() -> AsyncIterator[tuple]:
    """Yield (sku, name, price, tenant_id) tuples from an external source."""
    for i in range(1_000_000):
        yield (f"SKU-{i:08d}", f"Product {i}", 9.99 + i * 0.01, 1)


async def bulk_load_products_psycopg(dsn: str) -> None:
    # The connection context manager commits on a clean exit.
    async with await psycopg.AsyncConnection.connect(dsn) as conn:
        async with conn.cursor() as cur:
            async with cur.copy(
                "COPY products (sku, name, price, tenant_id) FROM STDIN"
            ) as copy:
                async for row in product_rows():
                    await copy.write_row(row)
    print("COPY complete")


asyncio.run(
    bulk_load_products_psycopg("postgresql://app:secret@db:5432/production")
)
asyncpg exposes COPY through Connection.copy_records_to_table(), Connection.copy_to_table(), Connection.copy_from_table(), and Connection.copy_from_query(), but these bypass SQLAlchemy's session layer entirely, so you must unwrap the raw asyncpg connection from a pooled SQLAlchemy connection:
import asyncio

import asyncpg
from sqlalchemy.ext.asyncio import AsyncEngine


async def bulk_load_products_asyncpg(engine: AsyncEngine) -> None:
    async with engine.connect() as conn:
        # Unwrap the SQLAlchemy async connection to reach the raw asyncpg connection
        raw_conn = await conn.get_raw_connection()
        asyncpg_conn = raw_conn.driver_connection
        # Note: this list materialises every row in memory before the COPY starts.
        records = [
            (f"SKU-{i:08d}", f"Product {i}", 9.99 + i * 0.01, 1)
            for i in range(1_000_000)
        ]
        await asyncpg_conn.copy_records_to_table(
            "products",
            records=records,
            columns=["sku", "name", "price", "tenant_id"],
        )
If your application's primary workload involves high-volume COPY-based ingestion, psycopg3's native integration avoids the abstraction leak required with asyncpg.
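If you stay on asyncpg for bulk loads, one way to bound memory is to feed copy_records_to_table() fixed-size batches instead of one million-row list. The helper below is a minimal, driver-independent sketch; chunked and the batch size are hypothetical choices, and each yielded batch would be passed as the records argument in the asyncpg example above.

```python
from itertools import islice
from typing import Iterable, Iterator, List, Tuple

Row = Tuple[str, str, float, int]


def product_rows(count: int) -> Iterator[Row]:
    """Lazily generate (sku, name, price, tenant_id) tuples."""
    for i in range(count):
        yield (f"SKU-{i:08d}", f"Product {i}", 9.99 + i * 0.01, 1)


def chunked(rows: Iterable[Row], size: int) -> Iterator[List[Row]]:
    """Yield lists of at most `size` rows without materialising the whole input."""
    if size <= 0:
        raise ValueError("size must be positive")
    iterator = iter(rows)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


if __name__ == "__main__":
    for batch in chunked(product_rows(25), size=10):
        # In a real job: await asyncpg_conn.copy_records_to_table(..., records=batch)
        print(len(batch), batch[0][0])
```

Batching trades one long COPY for several shorter ones, which caps peak memory and gives natural points for progress reporting; the cost is that the load is no longer a single atomic COPY unless you wrap the batches in one transaction.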
Custom type adaptation
Both drivers allow custom Python ↔ PostgreSQL type mappings, but the APIs differ.
asyncpg uses codec registration on each connection (or through the init callback of an asyncpg pool). With SQLAlchemy, combine a TypeDecorator for Python-side normalisation with a connect event listener for driver-level codecs. Event listeners must be attached to engine.sync_engine, because AsyncEngine itself does not accept them:
import uuid

from sqlalchemy import TypeDecorator, event
from sqlalchemy.dialects.postgresql import UUID as PG_UUID


class SafeUUID(TypeDecorator):
    """Stores UUIDs as native PG UUID; normalises Python str/UUID inputs."""

    impl = PG_UUID
    cache_ok = True

    def process_bind_param(self, value, dialect):
        if isinstance(value, str):
            return uuid.UUID(value)
        return value

    def process_result_value(self, value, dialect):
        return str(value) if value is not None else None


# Register asyncpg-level codecs for custom Postgres composite types.
@event.listens_for(engine_asyncpg.sync_engine, "connect")
def register_custom_codecs(dbapi_connection, connection_record):
    # dbapi_connection is SQLAlchemy's adapter around asyncpg.Connection.
    # set_type_codec() is a coroutine, so invoke it through the adapter, e.g.
    # dbapi_connection.run_async(lambda conn: conn.set_type_codec(...)).
    pass  # Replace with actual asyncpg.Connection.set_type_codec() calls
psycopg3 uses a Loader/Dumper registration model that maps Python types to PostgreSQL OIDs at the global, connection, or cursor level. With SQLAlchemy, apply connection-level adapters in a connect event listener on engine.sync_engine:
import orjson
import psycopg.types.json
from sqlalchemy import event


@event.listens_for(engine_psycopg.sync_engine, "connect")
def configure_psycopg_types(dbapi_connection, connection_record):
    # Swap psycopg3's JSON/JSONB deserialiser for orjson on this connection.
    # dbapi_connection is SQLAlchemy's adapter; driver_connection is assumed
    # to be the underlying psycopg.AsyncConnection that carries the adapters.
    psycopg.types.json.set_json_loads(
        loads=orjson.loads,
        context=dbapi_connection.driver_connection,
    )
Hybrid Architectures & Migration Strategies
Migrating from asyncpg to psycopg3 (or vice versa)
SQLAlchemy 2.0's dialect abstraction makes driver swaps low-risk for most applications. The migration checklist:
- Update the dependency: pip install "psycopg[binary]" (or asyncpg).
- Change the URL scheme: postgresql+asyncpg:// → postgresql+psycopg://.
- Audit connect_args: asyncpg keys like statement_cache_size and command_timeout are driver-specific; replace them with the nearest psycopg3 counterparts. prepare_threshold covers statement preparation; connect_timeout only bounds connection establishment, so a per-query limit needs a server-side statement_timeout instead.
- Review custom type codecs: asyncpg set_type_codec() calls must be rewritten as psycopg3 Loader/Dumper registrations.
- Test COPY paths: if you use raw COPY, the API differs (see above).
- Benchmark: run your realistic query mix under load; don't assume one driver is faster for your specific workload without measurement.
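The first steps of the checklist are mechanical enough to script. The helpers below are a hypothetical sketch (swap_driver, audit_connect_args, and the ASYNCPG_ONLY_KEYS set are illustrative, and the set lists only the keys named in this guide), intended as a starting point for a migration script rather than a complete audit.

```python
from typing import Dict, List

# asyncpg-specific connect_args keys mentioned in this guide; extend as needed.
ASYNCPG_ONLY_KEYS = {"statement_cache_size", "command_timeout"}


def swap_driver(url: str, new_driver: str) -> str:
    """Replace the driver part of a SQLAlchemy URL, keeping everything else."""
    scheme, separator, rest = url.partition("://")
    if not separator:
        raise ValueError(f"not a database URL: {url!r}")
    dialect = scheme.split("+", 1)[0]  # "postgresql+asyncpg" -> "postgresql"
    return f"{dialect}+{new_driver}://{rest}"


def audit_connect_args(connect_args: Dict[str, object]) -> List[str]:
    """Return the keys that need a manual decision before switching drivers."""
    return sorted(key for key in connect_args if key in ASYNCPG_ONLY_KEYS)


if __name__ == "__main__":
    old_url = "postgresql+asyncpg://app:secret@db:5432/production"
    print(swap_driver(old_url, "psycopg"))
    print(audit_connect_args({"statement_cache_size": 0, "command_timeout": 30}))
```

Flagging keys rather than translating them automatically is deliberate: the options are not one-to-one, so each flagged key deserves a human decision.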
Running both drivers in the same application
Some architectures benefit from using asyncpg for the high-throughput OLTP path while using psycopg3 for the bulk-load / reporting path (where COPY integration is cleaner). SQLAlchemy supports multiple engines in a single application — just keep them in separate AsyncSession factories:
from sqlalchemy.ext.asyncio import (
    create_async_engine,
    async_sessionmaker,
    AsyncSession,
)

# OLTP engine: asyncpg for low-latency parameterised queries
oltp_engine = create_async_engine(
    "postgresql+asyncpg://app:secret@primary-db:5432/production",
    pool_size=30,
    max_overflow=10,
    pool_pre_ping=True,
)

# Bulk engine: psycopg3 for COPY-based bulk jobs. COPY ... TO exports and
# reporting can run on a replica; ingestion (COPY ... FROM) must target the primary.
bulk_engine = create_async_engine(
    "postgresql+psycopg://etl:secret@replica-db:5432/production",
    pool_size=5,
    max_overflow=0,
    pool_pre_ping=True,
)

OLTPSession = async_sessionmaker(oltp_engine, class_=AsyncSession, expire_on_commit=False)
BulkSession = async_sessionmaker(bulk_engine, class_=AsyncSession, expire_on_commit=False)
This pattern also applies when routing reads to replicas — the async-driver-agnostic guide to routing reads to replicas covers the engine-per-replica strategy in detail.
For background worker processes that need async SQLAlchemy inside Celery tasks, see the dedicated guide on using SQLAlchemy async with Celery task workers, which covers asyncio.run() per task, NullPool for prefork workers, and asyncpg fork-safety pitfalls.
If your evaluation extends to SQLite for test environments or MySQL for legacy services, the guide on selecting async drivers for SQLite, MySQL, and Postgres completes the picture.
Decision matrix summary
Production Pitfalls & Anti-Patterns
- Not disabling asyncpg's prepared-statement cache behind PgBouncer. Transaction-mode pooling recycles server connections between clients; asyncpg's cached statement handles refer to server-side state that no longer exists on the reassigned connection. The symptom is asyncpg.exceptions.InvalidSQLStatementNameError. Always set statement_cache_size=0 in connect_args when the target is PgBouncer in transaction mode or a similar multiplexer such as RDS Proxy.
- Mixing a synchronous psycopg2 engine with an async asyncpg engine in the same event loop. The sync engine's blocking connect() and execute() calls freeze the event loop. Keep them in separate processes or use run_in_executor() exclusively for the sync path, but prefer eliminating the sync engine entirely.
- Letting the combined pools exceed Postgres's max_connections. Each physical asyncpg or psycopg3 connection occupies a server backend process. With the default max_connections=100, an application deploying 5 worker processes each with pool_size=25 asks for 125 connections and saturates the server. Either use PgBouncer as a multiplexer or keep (pool_size + max_overflow) × workers ≤ max_connections × 0.8 (reserving headroom for migrations and admin queries).
- Using asyncpg's COPY API without releasing the raw connection back to the pool. Holding a raw asyncpg.Connection outside a context manager bypasses pool accounting. Use async with engine.connect() as conn: and conn.get_raw_connection() within the same block so the pool reclaims the connection on exit.
- Ignoring pool_pre_ping in cloud environments. Network load balancers, NAT gateways, and firewalls commonly drop idle TCP connections silently after a few minutes (350–600 seconds is a typical range). Without pool_pre_ping=True, the first query on a stale connection raises asyncpg.exceptions.ConnectionDoesNotExistError or psycopg.OperationalError instead of transparently reconnecting.
- Running asyncpg in prefork workers without NullPool. asyncpg connections are not fork-safe. If a parent process holds open connections before os.fork(), the child inherits corrupted connection state. When running async SQLAlchemy inside Celery prefork workers, either use NullPool and create a fresh engine per-task, or defer engine creation to the worker_process_init signal. The guide on Celery integration covers both approaches in detail.
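The pool-sizing rule from the list above is simple arithmetic, sketched here with hypothetical helper names. The 80% headroom is the figure used in this guide; integer percentages are used to avoid floating-point rounding.

```python
def connection_budget(max_connections: int, headroom_percent: int = 80) -> int:
    """Connections the application may use, leaving room for admin sessions."""
    return max_connections * headroom_percent // 100


def total_client_connections(workers: int, pool_size: int, max_overflow: int = 0) -> int:
    """Worst-case connections opened by all worker processes together."""
    return workers * (pool_size + max_overflow)


def fits(max_connections: int, workers: int, pool_size: int,
         max_overflow: int = 0, headroom_percent: int = 80) -> bool:
    """True when the worst case stays inside the budget."""
    return (
        total_client_connections(workers, pool_size, max_overflow)
        <= connection_budget(max_connections, headroom_percent)
    )


def max_pool_size(max_connections: int, workers: int,
                  max_overflow: int = 0, headroom_percent: int = 80) -> int:
    """Largest pool_size per worker that still fits the budget."""
    per_worker = connection_budget(max_connections, headroom_percent) // workers
    return max(per_worker - max_overflow, 0)


if __name__ == "__main__":
    # The example from the text: 5 workers with pool_size=25 against 100 backends.
    print(fits(100, workers=5, pool_size=25))
    print(max_pool_size(100, workers=5))
    print(max_pool_size(100, workers=5, max_overflow=10))
```

Running a check like this at deploy time, with the real worker count and max_connections, catches over-subscription before the database starts refusing connections.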
Frequently Asked Questions
Does asyncpg support prepared statement caching with PgBouncer?
Not by default in transaction-pooling mode. asyncpg's prepared statements are named server-side objects, and PgBouncer may route each transaction to a different server connection, so a statement prepared in one transaction can be missing in the next. The fix is connect_args={"statement_cache_size": 0}. PgBouncer 1.21 and later can track protocol-level prepared statements in transaction mode when max_prepared_statements is set, so check your pooler version and configuration before relying on it. Session-pooling mode retains session state, but gives up most of the multiplexing benefit of a connection pooler.
Can I switch from asyncpg to psycopg3 without rewriting queries?
Yes, for nearly all SQLAlchemy-mediated queries. Change the URL prefix from postgresql+asyncpg:// to postgresql+psycopg://, update connect_args keys (asyncpg uses statement_cache_size, psycopg3 uses prepare_threshold), and rewrite any driver-level COPY or codec registration code. The ORM query layer — select(), execute(), scalars() — is identical.
Which driver is faster for high-concurrency OLTP workloads?
asyncpg often outperforms psycopg3 on parameterised, repetitive queries (the typical API backend pattern) because of its binary-by-default protocol and native-async protocol implementation. The 20–35% throughput advantage quoted for read-heavy workloads with 50+ concurrent coroutines should be read as indicative: results vary with query mix, row width, and whether psycopg3's C extension is installed, so benchmark your own workload. For write-heavy batch ingestion using COPY, psycopg3's first-class cursor API avoids the raw-connection extraction step and often matches or exceeds asyncpg's COPY throughput.
Is psycopg3 easier to migrate to from a psycopg2 codebase?
Yes. psycopg3's synchronous interface is intentionally similar to psycopg2's, and many projects migrate incrementally by replacing psycopg2 with psycopg (version 3) while keeping similar cursor and transaction semantics, then adopting the async classes where needed. asyncpg's API is fundamentally different (no DB-API cursor objects, codec-based type adaptation) and requires more targeted migration effort.
How does asyncpg handle PostgreSQL NOTIFY / LISTEN compared to psycopg3?
asyncpg has direct await conn.add_listener(channel, callback) support that integrates naturally with asyncio event loops. psycopg3 exposes notifications as an async generator instead: issue LISTEN on the connection (typically in autocommit mode) and consume async for notify in conn.notifies(). For callback-driven notification pipelines, asyncpg's listener API is more ergonomic; for workloads that already use psycopg3 for queries, the generator model works reliably.
Related
- Async Engines, Dialects, and Connection Pooling — parent reference covering the full async engine lifecycle.
- Using SQLAlchemy Async with Celery Task Workers — event-loop-per-task patterns, NullPool for prefork workers, and asyncpg fork-safety.
- Configuring Async Engines and Connection Pools — pool_size, max_overflow, pool_recycle, and pre-ping tuning for production workloads.
- Dialect-Specific Gotchas and Driver Quirks — PgBouncer advisory locks, temporary tables, and other session-state hazards.
- Selecting Async Drivers for SQLite, MySQL, and Postgres — broader driver landscape including aiosqlite and aiomysql.