Routing Reads to Replicas with Async Engines

Route read queries to a PostgreSQL replica by creating a second AsyncEngine that points at the replica host, then overriding get_bind() in a Session subclass that AsyncSession uses through sync_session_class=..., so that SELECT statements go to the replica engine and all writes go to the primary. This is the approach used in Dynamic Schema and Multi-Tenant Routing. The older Session(binds=...) mapping still exists in SQLAlchemy 2.0, but it assigns engines per entity or table, so it cannot send reads and writes for the same table to different servers.

Quick Answer

# Per-entity binds (still supported in SQLAlchemy 2.0) pick an engine by mapper,
# so they cannot split reads and writes of the same table:
# session = Session(binds={Order: replica_engine, Invoice: primary_engine})

# SQLAlchemy 2.0 — routing Session subclass
from sqlalchemy import select
from sqlalchemy.orm import Session
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker

primary_engine = create_async_engine(
    "postgresql+asyncpg://user:pass@primary.db.internal/saas_db",
    pool_size=20,
    max_overflow=10,
    pool_pre_ping=True,
)

replica_engine = create_async_engine(
    "postgresql+asyncpg://user:pass@replica.db.internal/saas_db",
    pool_size=30,   # replicas typically absorb more concurrent reads
    max_overflow=15,
    pool_pre_ping=True,
)


class RoutingSession(Session):
    """Sends SELECT statements to the read replica, writes to the primary."""

    def get_bind(self, mapper=None, clause=None, **kwargs):
        if self._flushing or (clause is not None and clause.is_dml):
            return primary_engine.sync_engine
        return replica_engine.sync_engine


AsyncRoutingSession = async_sessionmaker(
    class_=AsyncSession,
    sync_session_class=RoutingSession,
    expire_on_commit=False,
)

Execution Context & Async Workflow Integration

How get_bind() Interacts with the Async Layer

AsyncSession in SQLAlchemy 2.0 is a thin proxy over a synchronous Session. When AsyncSession.execute() is called, SQLAlchemy runs the synchronous Session machinery inside a greenlet (its greenlet_spawn bridge), which lets that synchronous code await the async driver's I/O. get_bind() is called during this synchronous phase, so it has access to the full session state, including self._flushing, which is True while a flush runs (autoflush, an explicit session.flush(), or the flush performed by commit()).

The returned engine must be a synchronous engine object (Engine, not AsyncEngine). SQLAlchemy's async layer wraps it transparently. Use async_engine.sync_engine to extract the underlying sync engine from an AsyncEngine:

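class RoutingSession(Session):
    def get_bind(self, mapper=None, clause=None, **kwargs):
        # self._flushing is True while the unit of work flushes pending changes;
        # flush requests a bind with a mapper but no clause, so check it first
        if self._flushing:
            return primary_engine.sync_engine
        # clause is None when a connection is requested without a statement,
        # e.g. session.connection(); those default to the replica
        if clause is None:
            return replica_engine.sync_engine
        # DML statements (INSERT, UPDATE, DELETE) go to primary
        if clause.is_dml:
            return primary_engine.sync_engine
        # Everything else goes to the replica: SELECTs (including those issued by
        # session.get() and relationship loading) and text() statements
        return replica_engine.sync_engine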

Full Async Session Factory and Usage

from sqlalchemy import select, update
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from sqlalchemy.orm import Session, DeclarativeBase, Mapped, mapped_column

# primary_engine and replica_engine are the engines created in the Quick Answer


class Base(DeclarativeBase):
    pass


class Order(Base):
    __tablename__ = "orders"
    id: Mapped[int] = mapped_column(primary_key=True)
    user_id: Mapped[int]
    total_cents: Mapped[int]
    fulfilled: Mapped[bool] = mapped_column(default=False)


class RoutingSession(Session):
    def get_bind(self, mapper=None, clause=None, **kwargs):
        if self._flushing or (clause is not None and clause.is_dml):
            return primary_engine.sync_engine
        return replica_engine.sync_engine


AsyncRoutingSession = async_sessionmaker(
    class_=AsyncSession,
    sync_session_class=RoutingSession,
    expire_on_commit=False,
)


async def get_open_orders(user_id: int) -> list[Order]:
    """Read query: transparently routed to the replica."""
    async with AsyncRoutingSession() as session:
        stmt = select(Order).where(
            Order.user_id == user_id,
            Order.fulfilled.is_(False),
        )
        result = await session.execute(stmt)
        return list(result.scalars())


async def fulfill_order(order_id: int) -> None:
    """Write operation: routed to the primary because UPDATE is DML (clause.is_dml)."""
    async with AsyncRoutingSession() as session:
        async with session.begin():
            stmt = (
                update(Order)
                .where(Order.id == order_id)
                .values(fulfilled=True)
            )
            await session.execute(stmt)
        # session.begin() commits on exit; objects added with session.add()
        # would instead reach the primary through a flush (self._flushing)

Integrating Replica Routing with FastAPI

In a FastAPI application, expose two dependency functions, one yielding a session bound to the replica and one yielding a session bound to the primary, so endpoint functions declare their intent explicitly:

from fastapi import Depends, FastAPI
from sqlalchemy import insert, select
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker

app = FastAPI()

ReadSession = async_sessionmaker(
    replica_engine,  # every statement in a read handler goes to the replica
    class_=AsyncSession,
    expire_on_commit=False,
)

WriteSession = async_sessionmaker(
    primary_engine,  # reads and writes both hit the primary, so handlers see their own writes
    class_=AsyncSession,
    expire_on_commit=False,
)


async def get_read_session():
    async with ReadSession() as session:
        yield session


async def get_write_session():
    async with WriteSession() as session:
        async with session.begin():  # commits on success, rolls back on error
            yield session


@app.get("/orders")
async def list_orders(
    user_id: int,
    session: AsyncSession = Depends(get_read_session),
):
    result = await session.execute(
        select(Order).where(Order.user_id == user_id)
    )
    return result.scalars().all()  # served by the replica


@app.post("/orders")
async def create_order(
    payload: dict,
    session: AsyncSession = Depends(get_write_session),
):
    await session.execute(insert(Order).values(**payload))
    # committed on the primary when get_write_session()'s session.begin() block exits

Binding each factory to a single engine makes it easy to audit which endpoints hit the primary and which hit the replica. It also keeps read-only handlers from writing to the primary by accident: a stray write in a read handler fails against the read-only replica instead of silently succeeding.

Configuring Both Engines for Production

The primary and replica engines need independent pool sizing. Many OLTP workloads are read-heavy, so the replica pool is usually the larger of the two:

from sqlalchemy.ext.asyncio import create_async_engine

primary_engine = create_async_engine(
    "postgresql+asyncpg://app:secret@primary.rds.amazonaws.com:5432/saas",
    pool_size=15,
    max_overflow=5,
    pool_timeout=30,
    pool_recycle=1800,   # replace connections older than 30 min; keep below any idle timeout on the network path
    pool_pre_ping=True,
    connect_args={
        "server_settings": {"application_name": "saas-primary"},
    },
)

replica_engine = create_async_engine(
    "postgresql+asyncpg://app:secret@replica.rds.amazonaws.com:5432/saas",
    pool_size=25,
    max_overflow=10,
    pool_timeout=30,
    pool_recycle=1800,
    pool_pre_ping=True,
    connect_args={
        "server_settings": {"application_name": "saas-replica"},
    },
)

The application_name server setting makes it easy to distinguish primary vs replica connections in pg_stat_activity and cloud database performance dashboards.
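For example, a query along these lines counts connections per application_name and state. It is a sketch that assumes the saas-primary and saas-replica labels configured above, and it must run against each server separately, because pg_stat_activity only lists the backends of the server you are connected to. The names CONNECTIONS_BY_APP and connections_by_application are hypothetical.

```python
from typing import Optional

from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncConnection

# Count backends per application_name and state on the connected server
CONNECTIONS_BY_APP = text(
    """
    SELECT application_name, state, count(*) AS connections
    FROM pg_stat_activity
    WHERE application_name IN ('saas-primary', 'saas-replica')
    GROUP BY application_name, state
    ORDER BY application_name, state
    """
)


async def connections_by_application(
    conn: AsyncConnection,
) -> list[tuple[str, Optional[str], int]]:
    """Hypothetical helper: run once per engine (primary and replica)."""
    result = await conn.execute(CONNECTIONS_BY_APP)
    return [(row.application_name, row.state, row.connections) for row in result]
```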

Resolving Warnings, Errors & Common Mistakes

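Each entry lists the error or warning, its root cause, and the production fix.

  • sqlalchemy.exc.UnboundExecutionError: Could not locate a bind configured on mapper ... or this Session
    Root cause: The session fell back to the default Session.get_bind(), which has no engine to return: usually sync_session_class=RoutingSession was not passed to the session factory, or a branch of the override delegates to super().get_bind().
    Production fix: Pass sync_session_class=RoutingSession to async_sessionmaker, and make every branch of get_bind() return a sync Engine, ending the method with an explicit return rather than a call to super().get_bind().
  • asyncpg.exceptions.ReadOnlySQLTransactionError: cannot execute INSERT in a read-only transaction
    Root cause: A write operation was dispatched to the replica engine, which only accepts read-only transactions at the database level.
    Production fix: Ensure get_bind() checks self._flushing as well as clause.is_dml. A flush requests a bind with a mapper but no DML clause, so without the _flushing check, autoflushed INSERT and UPDATE statements (for example, ones triggered by a lazy load) go to the replica. Writes issued through text() are not flagged as DML either, so route them to the primary explicitly.
  • asyncpg.exceptions.ConnectionDoesNotExistError after an idle period
    Root cause: The database server, or something between it and the application, closed an idle TCP connection while the pool still held a handle to it.
    Production fix: Set pool_pre_ping=True on both engines, and set pool_recycle below the shortest idle timeout on the path (PostgreSQL 14+ idle_session_timeout, a proxy, a NAT gateway, or a load balancer).
  • sqlalchemy.exc.DetachedInstanceError: Instance <Order> is not bound to a Session
    Root cause: With expire_on_commit=True (the default), commit() expires loaded attributes; async code then accessed them after the session closed, when no refresh is possible.
    Production fix: Set expire_on_commit=False on the session factory. See the guide on expire_on_commit=False in FastAPI dependencies.
  • Reads return stale data immediately after a write
    Root cause: Replica lag: the replica has not yet received or applied the WAL record for the write.
    Production fix: Do not read from the replica immediately after a write in the same request. Send the post-write read to the primary with the use_primary routing hint described under Opt-Out Routing, reuse the object from the write session (with expire_on_commit=False, session.get() returns it from the identity map without a query), or check replica lag before switching to the replica.
  • sqlalchemy.exc.MissingGreenlet: greenlet_spawn has not been called
    Root cause: Database I/O was triggered where SQLAlchemy's greenlet bridge is not active, most often a lazy-loaded relationship or an expired attribute accessed directly from async code.
    Production fix: Eager-load relationships that async code touches (for example with selectinload()), keep expire_on_commit=False, and keep get_bind() synchronous and free of I/O: never await inside it, and resolve the tenant-to-engine mapping from a prebuilt in-memory dict, not from an async database lookup.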

Advanced Replica Routing Optimization

Connection Pool Sizing for Primary + Replica Topology

With a primary-plus-replica deployment, the total connection load on your PostgreSQL instances is (primary_pool_size + primary_max_overflow) + (replica_pool_size + replica_max_overflow) per application process. For a FastAPI app with four Uvicorn workers:

  • Primary: pool_size=10, max_overflow=5 → up to 60 connections per instance (4 workers × 15)
  • Replica: pool_size=20, max_overflow=10 → up to 120 connections per instance (4 workers × 30)

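Set max_connections on the replica to at least 120 + 20 (application headroom), plus connections from other services; if several application hosts share the replica, multiply the per-host totals by the host count first. For AWS RDS, max_connections defaults to approximately LEAST(DBInstanceClassMemory/9531392, 5000). A 2 GiB instance class such as db.t3.small therefore gets at most about 225 connections, and fewer in practice because DBInstanceClassMemory excludes memory reserved for the operating system and RDS processes. That is tight with the numbers above. Scale up the instance class or reduce per-worker pool sizes if you hit FATAL: remaining connection slots are reserved for non-replication superuser connections.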
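The worst-case arithmetic above is easy to script when comparing pool settings with a server's max_connections. A minimal sketch; the function names and the hosts parameter are hypothetical, and the RDS helper only evaluates the default formula quoted above on nominal instance memory, so it overestimates the real default.

```python
def peak_connections(pool_size: int, max_overflow: int, workers: int, hosts: int = 1) -> int:
    """Worst case: every worker process on every host fills its pool plus overflow."""
    return (pool_size + max_overflow) * workers * hosts


def rds_default_max_connections(instance_memory_bytes: int) -> int:
    """Upper bound from LEAST(DBInstanceClassMemory/9531392, 5000).

    DBInstanceClassMemory is below the nominal instance RAM, so the
    real default is somewhat lower than this estimate.
    """
    return min(instance_memory_bytes // 9531392, 5000)


primary_peak = peak_connections(pool_size=10, max_overflow=5, workers=4)
replica_peak = peak_connections(pool_size=20, max_overflow=10, workers=4)
print(primary_peak, replica_peak)  # 60 120

# 2 GiB of nominal memory, e.g. a db.t3.small
print(rds_default_max_connections(2 * 1024**3))  # 225
```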

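The pool_recycle parameter matters for cloud-managed databases, where the server, a proxy, a NAT gateway, or a load balancer may silently close idle connections. pool_recycle replaces any pooled connection older than the given number of seconds at its next checkout, so set it on both engines below the shortest idle timeout on the path (for example, pool_recycle=300 if any hop drops connections after 5 to 10 minutes), and keep pool_pre_ping=True to catch connections that were closed earlier than expected. Together they prevent asyncpg.exceptions.ConnectionDoesNotExistError on the first query after an idle period.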

Opt-Out Routing for Replica-Lag-Sensitive Operations

Some reads must go to the primary, for example reading an entity immediately after creating it to return it in the API response, or reading data that gates a financial transaction. Session has no get_execution_options() method, and per-call execution_options are not passed to get_bind(). Session.execute() (and AsyncSession.execute()) does, however, pass the contents of its bind_arguments dictionary to get_bind() as keyword arguments, so a custom use_primary key works as a routing hint:

from sqlalchemy import select
from sqlalchemy.orm import Session
from sqlalchemy.ext.asyncio import AsyncSession


class RoutingSession(Session):
    def get_bind(self, mapper=None, clause=None, *, use_primary=False, **kwargs):
        # Per-call override: session.execute(stmt, bind_arguments={"use_primary": True})
        if use_primary or self._flushing:
            return primary_engine.sync_engine
        if clause is not None and clause.is_dml:
            return primary_engine.sync_engine
        return replica_engine.sync_engine


# Usage: force read from primary for a specific query
async def get_order_post_create(session: AsyncSession, order_id: int) -> Order:
    stmt = select(Order).where(Order.id == order_id)
    result = await session.execute(
        stmt,
        bind_arguments={"use_primary": True},
    )
    return result.scalar_one()

This keeps routing logic in a single place while allowing per-call overrides without modifying the session factory.
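To check the per-call override without PostgreSQL, here is a sketch with synchronous SQLite stand-ins (an assumption made only so it runs anywhere): one row is seeded on the primary, nothing replicates, and only the read carrying bind_arguments={"use_primary": True} sees it. Session.execute() forwards the bind_arguments dictionary to get_bind() as keyword arguments; AsyncSession.execute() accepts the same parameter.

```python
from sqlalchemy import Column, Integer, MetaData, Table, create_engine, insert, select
from sqlalchemy.orm import Session

# In-memory SQLite stand-ins; nothing replicates from primary to replica
primary = create_engine("sqlite://")
replica = create_engine("sqlite://")

metadata = MetaData()
orders = Table("orders", metadata, Column("id", Integer, primary_key=True))
metadata.create_all(primary)
metadata.create_all(replica)

# Seed a row on the primary only, as if it had just been written there
with primary.begin() as conn:
    conn.execute(insert(orders).values(id=1))


class RoutingSession(Session):
    def get_bind(self, mapper=None, clause=None, *, use_primary=False, **kwargs):
        # use_primary arrives through session.execute(..., bind_arguments={...})
        if use_primary or self._flushing:
            return primary
        if clause is not None and clause.is_dml:
            return primary
        return replica


with RoutingSession() as session:
    stmt = select(orders.c.id)
    from_replica = session.execute(stmt).scalars().all()
    from_primary = session.execute(
        stmt, bind_arguments={"use_primary": True}
    ).scalars().all()

print(from_replica, from_primary)  # [] [1]
```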

Monitoring Replica Lag Before Routing

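For applications where replica lag is measurable and consequential, measure it before routing: query pg_stat_replication on the primary, or call pg_last_xact_replay_timestamp() on the replica as the function below does. That function reports when the last replayed transaction was committed on the primary, so the computed lag also grows while the primary is idle, even if the replica is fully caught up: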

from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncConnection


async def replica_lag_seconds(conn: AsyncConnection) -> float:
    """Run against the replica engine to measure current replication lag."""
    result = await conn.execute(
        text(
            "SELECT EXTRACT(EPOCH FROM (now() - pg_last_xact_replay_timestamp()))::float"
        )
    )
    lag = result.scalar_one_or_none()
    return lag if lag is not None else 0.0


async def get_engine_for_read(max_lag_seconds: float = 5.0):
    async with replica_engine.connect() as conn:
        lag = await replica_lag_seconds(conn)
    if lag > max_lag_seconds:
        return primary_engine
    return replica_engine

In practice, this check adds a round-trip on every request. For most SaaS workloads, a simpler approach is to measure lag in a background health-check task every 10 seconds and store the result in a module-level variable that get_bind() reads.
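A minimal sketch of that background approach using only asyncio: a task refreshes a module-level lag snapshot on an interval, and a synchronous, I/O-free check that get_bind() can call reads it. The names MAX_LAG_SECONDS, replica_is_fresh, refresh_lag_once, and lag_monitor are hypothetical, and measure_lag stands in for a coroutine that opens a replica connection and calls replica_lag_seconds().

```python
import asyncio
import math
from typing import Awaitable, Callable

MAX_LAG_SECONDS = 5.0  # hypothetical threshold

# Latest measured replica lag for this worker process. It starts at infinity
# so reads stay on the primary until the first successful measurement.
_replica_lag_seconds: float = math.inf


def replica_is_fresh(max_lag: float = MAX_LAG_SECONDS) -> bool:
    """Synchronous and I/O-free, so it is safe to call from get_bind()."""
    return _replica_lag_seconds <= max_lag


async def refresh_lag_once(measure_lag: Callable[[], Awaitable[float]]) -> None:
    """Store one measurement; a failed check marks the replica as unusable."""
    global _replica_lag_seconds
    try:
        _replica_lag_seconds = await measure_lag()
    except Exception:
        _replica_lag_seconds = math.inf


async def lag_monitor(
    measure_lag: Callable[[], Awaitable[float]], interval_seconds: float = 10.0
) -> None:
    """Background task: refresh the snapshot every interval until cancelled."""
    while True:
        await refresh_lag_once(measure_lag)
        await asyncio.sleep(interval_seconds)


# Inside RoutingSession.get_bind(), after the primary checks:
#     return replica_engine.sync_engine if replica_is_fresh() else primary_engine.sync_engine


async def _demo() -> list[bool]:
    async def fake_measure() -> float:
        return 1.5  # stand-in for replica_lag_seconds(): pretend 1.5 s of lag

    states = [replica_is_fresh()]  # no measurement yet
    await refresh_lag_once(fake_measure)
    states.append(replica_is_fresh())  # 1.5 s is within the 5 s threshold
    return states


demo_states = asyncio.run(_demo())
print(demo_states)  # [False, True]
```

In an application, start lag_monitor() once per worker process (for example with asyncio.create_task() in a FastAPI lifespan handler) and cancel it on shutdown. Each Uvicorn worker keeps its own snapshot, which matches the fact that each worker also owns its own engines.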

Combining Replica Routing with schema_translate_map

Multi-tenant applications often need both: read-from-replica AND schema isolation. Stack the two patterns: get_bind() reads the routing hint from its use_primary keyword argument (supplied through bind_arguments), while SQLAlchemy's built-in schema_translate_map execution option independently qualifies table names with the tenant schema:

class MultiTenantRoutingSession(Session):
    def get_bind(self, mapper=None, clause=None, *, use_primary=False, **kwargs):
        # use_primary comes from bind_arguments; schema_translate_map needs no handling here
        if use_primary or self._flushing:
            return primary_engine.sync_engine
        if clause is not None and clause.is_dml:
            return primary_engine.sync_engine
        return replica_engine.sync_engine


async def fetch_tenant_orders_from_replica(
    session: AsyncSession, tenant_schema: str, user_id: int
) -> list[Order]:
    stmt = select(Order).where(Order.user_id == user_id)
    result = await session.execute(
        stmt,
        execution_options={
            "schema_translate_map": {None: tenant_schema},
            # No use_primary flag → routes to replica
        },
    )
    return result.scalars().all()
    # Emits against replica: SELECT ... FROM acme.orders WHERE user_id = ?

For the full schema_translate_map wiring with FastAPI, see Switching Schemas per Request with schema_translate_map.

Frequently Asked Questions

Does get_bind() get called for every statement, including relationship loads? Yes. Every SQL emission that goes through the Session calls get_bind(). This includes selectinload secondary queries, autoflush triggers, and session.get() identity map misses. Ensure get_bind() is fast — it should only consult an in-memory dict, not make I/O calls.

What happens if the replica is down? get_bind() still returns the replica engine, so the connection attempt fails: asyncpg typically raises OSError (for example, connection refused), or asyncpg.exceptions.CannotConnectNowError while the server is starting up or shutting down. pool_pre_ping=True only detects dead pooled connections at checkout and replaces them; it cannot reach a server that is down, and there is no automatic failover, so each checkout keeps failing with the connection error. (sqlalchemy.exc.TimeoutError appears only when the pool is exhausted while waiting for a free connection.) To fall back to the primary automatically, catch the connection error and retry the read with the use_primary routing hint from Opt-Out Routing.
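A minimal sketch of that fallback. To keep it runnable without PostgreSQL, it uses a synchronous Session and SQLite stand-ins: the replica URL points into a directory that does not exist, so connecting raises sqlalchemy.exc.OperationalError, and the helper retries the read on the primary through a use_primary bind argument (Session.execute() forwards bind_arguments to get_bind()). With AsyncSession the same calls are awaited. The helper name execute_with_fallback and the exceptions it catches are assumptions; which errors a real asyncpg outage raises depends on the failure.

```python
from sqlalchemy import Column, Integer, MetaData, Table, create_engine, exc, insert, select
from sqlalchemy.orm import Session

primary = create_engine("sqlite://")
# Unreachable stand-in replica: SQLite cannot create a file in a missing directory
replica = create_engine("sqlite:////nonexistent-replica-dir/replica.db")

metadata = MetaData()
orders = Table("orders", metadata, Column("id", Integer, primary_key=True))
metadata.create_all(primary)
with primary.begin() as conn:
    conn.execute(insert(orders).values(id=1))


class RoutingSession(Session):
    def get_bind(self, mapper=None, clause=None, *, use_primary=False, **kwargs):
        if use_primary or self._flushing or (clause is not None and clause.is_dml):
            return primary
        return replica


def execute_with_fallback(session: Session, stmt):
    """Try the replica first; on a connection failure, retry the read on the primary."""
    try:
        return session.execute(stmt)
    except (exc.OperationalError, OSError):
        session.rollback()  # discard the failed attempt before retrying
        return session.execute(stmt, bind_arguments={"use_primary": True})


with RoutingSession() as session:
    ids = execute_with_fallback(session, select(orders.c.id)).scalars().all()

print(ids)  # [1]
```

Catching only connection-level errors is deliberate: retrying every exception on the primary would also re-run queries that failed for reasons unrelated to the replica.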

Can I use AsyncSession without sync_session_class for replica routing? The sync_session_class approach is the canonical SQLAlchemy 2.0 pattern. An alternative is to skip the routing session entirely and explicitly select the engine per operation — async with primary_engine.connect() as conn for writes and async with replica_engine.connect() as conn for reads — at the cost of more verbose application code and no automatic routing.

How do I handle transactions that mix reads and writes? Inside a session.begin() block, self._flushing is True only while a flush is running, so every SELECT in the transaction, before or after a flush, still goes to the replica, and the replica cannot see the primary's uncommitted changes. If you need transactional consistency (read your own writes), pass the use_primary routing hint on the reads that must see the in-progress write, or execute the entire transaction against the primary engine directly.