Routing Reads to Replicas with Async Engines
Route read queries to a PostgreSQL replica by creating a second AsyncEngine pointing at the replica host, then override get_bind() in a Session subclass used via AsyncSession(sync_session_class=...) to dispatch SELECT statements to the replica engine and all writes to the primary. This is the production-safe approach in Dynamic Schema and Multi-Tenant Routing, replacing the removed SQLAlchemy 1.4 Session(binds=...) API.
Quick Answer
# Legacy 1.4 pattern — removed in SQLAlchemy 2.0, do not use
# session = Session(binds={Order: replica_engine, Invoice: primary_engine})
# SQLAlchemy 2.0 — routing Session subclass
from sqlalchemy import select
from sqlalchemy.orm import Session
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
primary_engine = create_async_engine(
"postgresql+asyncpg://user:pass@primary.db.internal/saas_db",
pool_size=20,
max_overflow=10,
pool_pre_ping=True,
)
replica_engine = create_async_engine(
"postgresql+asyncpg://user:pass@replica.db.internal/saas_db",
pool_size=30, # replicas typically absorb more concurrent reads
max_overflow=15,
pool_pre_ping=True,
)
class RoutingSession(Session):
"""Sends SELECT statements to the read replica, writes to the primary."""
def get_bind(self, mapper=None, clause=None, **kwargs):
if self._flushing or (clause is not None and clause.is_dml):
return primary_engine.sync_engine
return replica_engine.sync_engine
AsyncRoutingSession = async_sessionmaker(
class_=AsyncSession,
sync_session_class=RoutingSession,
expire_on_commit=False,
)
Execution Context & Async Workflow Integration
How get_bind() Interacts with the Async Layer
AsyncSession in SQLAlchemy 2.0 is a thin proxy over a synchronous Session. When AsyncSession.execute() is called, it runs the synchronous session machinery in a thread-safe greenlet context managed by SQLAlchemy's async bridge. get_bind() is called during this synchronous phase, so it has access to the full session state including self._flushing (set to True during autoflush and explicit session.flush() calls).
The returned engine must be a synchronous engine object (Engine, not AsyncEngine). SQLAlchemy's async layer wraps it transparently. Use async_engine.sync_engine to extract the underlying sync engine from an AsyncEngine:
class RoutingSession(Session):
def get_bind(self, mapper=None, clause=None, **kwargs):
# self._flushing is True during flush/commit write operations
if self._flushing:
return primary_engine.sync_engine
# clause is None for session.get() and relationship loading
if clause is None:
return replica_engine.sync_engine
# DML statements (INSERT, UPDATE, DELETE) go to primary
if clause.is_dml:
return primary_engine.sync_engine
# All SELECT queries go to replica
return replica_engine.sync_engine
Full Async Session Factory and Usage
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from sqlalchemy.orm import Session, DeclarativeBase, Mapped, mapped_column
class Base(DeclarativeBase):
pass
class Order(Base):
__tablename__ = "orders"
id: Mapped[int] = mapped_column(primary_key=True)
user_id: Mapped[int]
total_cents: Mapped[int]
fulfilled: Mapped[bool] = mapped_column(default=False)
class RoutingSession(Session):
def get_bind(self, mapper=None, clause=None, **kwargs):
if self._flushing or (clause is not None and clause.is_dml):
return primary_engine.sync_engine
return replica_engine.sync_engine
AsyncRoutingSession = async_sessionmaker(
class_=AsyncSession,
sync_session_class=RoutingSession,
expire_on_commit=False,
)
async def get_open_orders(user_id: int) -> list[Order]:
"""Read query — transparently routed to replica."""
async with AsyncRoutingSession() as session:
stmt = select(Order).where(
Order.user_id == user_id,
Order.fulfilled == False,
)
result = await session.execute(stmt)
return result.scalars().all()
async def fulfill_order(order_id: int) -> None:
"""Write operation — routed to primary via self._flushing."""
from sqlalchemy import update
async with AsyncRoutingSession() as session:
async with session.begin():
stmt = (
update(Order)
.where(Order.id == order_id)
.values(fulfilled=True)
)
await session.execute(stmt)
# session.commit() called by context manager; _flushing=True during flush
Integrating Replica Routing with FastAPI
In a FastAPI application, expose two dependency functions — one yielding a read-optimized session, one for write sessions — so endpoint functions declare their intent explicitly:
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from contextlib import asynccontextmanager
ReadSession = async_sessionmaker(
class_=AsyncSession,
sync_session_class=RoutingSession, # routes SELECTs to replica
expire_on_commit=False,
)
WriteSession = async_sessionmaker(
class_=AsyncSession,
sync_session_class=RoutingSession,
expire_on_commit=False,
)
async def get_read_session():
async with ReadSession() as session:
yield session
async def get_write_session():
async with WriteSession() as session:
async with session.begin():
yield session
@app.get("/orders")
async def list_orders(
user_id: int,
session: AsyncSession = Depends(get_read_session),
):
from sqlalchemy import select
result = await session.execute(
select(Order).where(Order.user_id == user_id)
)
return result.scalars().all() # routes to replica automatically
@app.post("/orders")
async def create_order(
payload: dict,
session: AsyncSession = Depends(get_write_session),
):
from sqlalchemy import insert
await session.execute(insert(Order).values(**payload))
# transaction committed by context manager — routes to primary
This separation makes it trivially auditable which endpoints hit the primary and which hit the replica, and avoids accidental primary writes in read-only route handlers.
Configuring Both Engines for Production
The primary and replica engines need independent pool sizing. Reads typically outnumber writes by 5–10× on OLTP workloads, so the replica pool should be larger:
from sqlalchemy.ext.asyncio import create_async_engine
primary_engine = create_async_engine(
"postgresql+asyncpg://app:secret@primary.rds.amazonaws.com:5432/saas",
pool_size=15,
max_overflow=5,
pool_timeout=30,
pool_recycle=1800, # recycle before RDS 30-minute idle timeout
pool_pre_ping=True,
connect_args={
"server_settings": {"application_name": "saas-primary"},
},
)
replica_engine = create_async_engine(
"postgresql+asyncpg://app:secret@replica.rds.amazonaws.com:5432/saas",
pool_size=25,
max_overflow=10,
pool_timeout=30,
pool_recycle=1800,
pool_pre_ping=True,
connect_args={
"server_settings": {"application_name": "saas-replica"},
},
)
The application_name server setting makes it easy to distinguish primary vs replica connections in pg_stat_activity and cloud database performance dashboards.
Resolving Warnings, Errors & Common Mistakes
| Error / Warning | Root Cause | Production Fix |
|---|---|---|
sqlalchemy.exc.InvalidRequestError: Could not locate a bind configured on mapper ... or this Session | get_bind() returned None or raised an exception; no fallback binding exists. | Ensure get_bind() always returns a valid sync engine. Add a catch-all return primary_engine.sync_engine as the final line. |
asyncpg.exceptions.ReadOnlySQLTransactionError: cannot execute INSERT in a read-only transaction | A write operation was dispatched to the replica engine, which is configured as read-only at the database level. | Verify clause.is_dml check is correct. Also check that self._flushing is evaluated before the DML check — autoflush during a relationship traversal can trigger writes unexpectedly. |
asyncpg.exceptions.ConnectionDoesNotExistError after idle period | The database server closed an idle TCP connection but the pool still held a handle to it. | Set pool_pre_ping=True on both engines. Also set pool_recycle to a value shorter than the server's tcp_keepalives_idle or cloud provider's idle connection timeout. |
sqlalchemy.exc.DetachedInstanceError: Instance <Order> is not bound to a Session | expire_on_commit=True (the default) expired attributes after commit; async code accessed them after the session closed. | Set expire_on_commit=False on the session factory. See the guide on expire_on_commit=False in FastAPI dependencies. |
| Reads returning stale data immediately after a write | Replica lag: the replica has not yet received or applied the WAL record for the write. | Do not read from the replica immediately after a write in the same request. Use session.get(Order, id) routed to the primary for the post-write read, or implement a short replica-lag check before switching to replica mode. |
MissingGreenlet: greenlet_spawn has not been called | get_bind() called an async function or triggered an async I/O operation inside the synchronous greenlet context. | Keep get_bind() entirely synchronous. Never await inside it. Resolve tenant-to-engine mapping from a pre-built in-memory dict, not from an async database lookup. |
Advanced Replica Routing Optimization
Connection Pool Sizing for Primary + Replica Topology
With a primary-plus-replica deployment, the total connection load on your PostgreSQL instances is (primary_pool_size + primary_max_overflow) + (replica_pool_size + replica_max_overflow) per application process. For a FastAPI app with four Uvicorn workers:
- Primary:
pool_size=10, max_overflow=5→ up to 60 connections per instance (4 workers × 15) - Replica:
pool_size=20, max_overflow=10→ up to 120 connections per instance (4 workers × 30)
Set max_connections on the replica at least to 120 + 20 (application headroom) plus connections from other services. For AWS RDS, max_connections defaults to approximately LEAST(DBInstanceClassMemory/9531392, 5000) — a db.t3.medium (2 GB RAM) gets roughly 170 connections, which is tight with the numbers above. Scale up the instance class or reduce per-worker pool sizes if you hit FATAL: remaining connection slots are reserved for non-replication superuser connections.
The pool_recycle parameter is critical for cloud-managed databases. AWS RDS drops idle connections after 8 minutes (480 seconds) by default. Set pool_recycle=300 on both engines to recycle before the server-side timeout, preventing asyncpg.exceptions.ConnectionDoesNotExistError on the first query after an idle period.
Opt-Out Routing for Replica-Lag-Sensitive Operations
Some reads must go to the primary — for example, reading an entity immediately after creating it to return it in the API response, or reading data that gates a financial transaction. Use a custom execution option as a routing hint:
from sqlalchemy.orm import Session
from sqlalchemy.ext.asyncio import AsyncSession
class RoutingSession(Session):
def get_bind(self, mapper=None, clause=None, **kwargs):
# Check per-statement override first
use_primary = self.get_execution_options().get("use_primary", False)
if use_primary or self._flushing:
return primary_engine.sync_engine
if clause is not None and clause.is_dml:
return primary_engine.sync_engine
return replica_engine.sync_engine
# Usage: force read from primary for a specific query
async def get_order_post_create(session: AsyncSession, order_id: int) -> Order:
stmt = select(Order).where(Order.id == order_id)
result = await session.execute(
stmt,
execution_options={"use_primary": True},
)
return result.scalar_one()
This keeps routing logic in a single place while allowing per-call overrides without modifying the session factory.
Monitoring Replica Lag Before Routing
For applications where replica lag is measurable and consequential, query pg_stat_replication on the primary (or pg_last_wal_receive_lsn() on the replica) to gate replica usage:
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncConnection
async def replica_lag_seconds(conn: AsyncConnection) -> float:
"""Run against the replica engine to measure current replication lag."""
result = await conn.execute(
text(
"SELECT EXTRACT(EPOCH FROM (now() - pg_last_xact_replay_timestamp()))::float"
)
)
lag = result.scalar_one_or_none()
return lag if lag is not None else 0.0
async def get_engine_for_read(max_lag_seconds: float = 5.0):
async with replica_engine.connect() as conn:
lag = await replica_lag_seconds(conn)
if lag > max_lag_seconds:
return primary_engine
return replica_engine
In practice, this check adds a round-trip on every request. For most SaaS workloads, a simpler approach is to measure lag in a background health-check task every 10 seconds and store the result in a module-level variable that get_bind() reads.
Combining Replica Routing with schema_translate_map
Multi-tenant applications often need both: read-from-replica AND schema isolation. Stack the two patterns by reading the custom routing option from execution_options in get_bind(), while letting SQLAlchemy's built-in schema_translate_map handler rewrite table names independently:
class MultiTenantRoutingSession(Session):
def get_bind(self, mapper=None, clause=None, **kwargs):
opts = self.get_execution_options()
if opts.get("use_primary") or self._flushing:
return primary_engine.sync_engine
if clause is not None and clause.is_dml:
return primary_engine.sync_engine
return replica_engine.sync_engine
async def fetch_tenant_orders_from_replica(
session: AsyncSession, tenant_schema: str, user_id: int
) -> list[Order]:
stmt = select(Order).where(Order.user_id == user_id)
result = await session.execute(
stmt,
execution_options={
"schema_translate_map": {None: tenant_schema},
# No use_primary flag → routes to replica
},
)
return result.scalars().all()
# Emits against replica: SELECT ... FROM acme.orders WHERE user_id = ?
For the full schema_translate_map wiring with FastAPI, see Switching Schemas per Request with schema_translate_map.
Frequently Asked Questions
Does get_bind() get called for every statement, including relationship loads?
Yes. Every SQL emission that goes through the Session calls get_bind(). This includes selectinload secondary queries, autoflush triggers, and session.get() identity map misses. Ensure get_bind() is fast — it should only consult an in-memory dict, not make I/O calls.
What happens if the replica is down?get_bind() returns the replica engine. When the connection attempt fails, asyncpg raises asyncpg.exceptions.CannotConnectNowError or OSError. The pool_pre_ping=True setting will detect dead connections on checkout and attempt to reconnect within the pool. If all replica connections fail, the pool raises sqlalchemy.exc.TimeoutError. To automatically fall back to the primary, wrap the execute call in a try/except and retry with execution_options={"use_primary": True}.
Can I use AsyncSession without sync_session_class for replica routing?
The sync_session_class approach is the canonical SQLAlchemy 2.0 pattern. An alternative is to skip the routing session entirely and explicitly select the engine per operation — async with primary_engine.connect() as conn for writes and async with replica_engine.connect() as conn for reads — at the cost of more verbose application code and no automatic routing.
How do I handle transactions that mix reads and writes?
Inside a session.begin() block, self._flushing becomes True when the transaction is flushed. However, reads that occur before the flush within the same transaction will still go to the replica. If you need transactional consistency (read your own writes), set execution_options={"use_primary": True} on the reads that must see the in-progress write, or execute the entire transaction against the primary engine directly.
Related
- Dynamic Schema and Multi-Tenant Routing — Parent guide covering schema isolation models, engine registries, and vertical partitioning.
- Switching Schemas per Request with schema_translate_map — Combine schema routing with replica routing for full multi-tenant isolation.
- Configuring Async Engines and Connection Pools — Pool sizing guidance for primary and replica engines under production load.
- Handling Connection Leaks and Pool Exhaustion — Diagnose and fix pool exhaustion when replica connections are not returned promptly.