Dynamic Schema and Multi-Tenant Routing in SQLAlchemy 2.0
SQLAlchemy 2.0 provides first-class primitives for directing every database operation to the correct schema or engine at runtime, without restructuring your ORM models. Within the broader Advanced Query Patterns and Bulk Data Operations discipline, dynamic schema routing sits at the intersection of connection pool management, transaction boundary design, and security isolation — making it one of the highest-stakes architectural decisions in a multi-tenant Python backend.
Concept & Execution Model
SQLAlchemy 2.0's unified execution model distinguishes sharply between where a query runs (which engine/connection) and which schema it targets (which set of tables the SQL names resolve against). Multi-tenant systems exploit both dimensions independently or together.
schema_translate_map is the central primitive for schema-per-tenant isolation. When applied via execution_options(schema_translate_map={None: "acme"}), SQLAlchemy rewrites every unqualified table reference in the compiled SQL — INSERT INTO users becomes INSERT INTO acme.users — without altering model definitions. The map key is the schema name as declared in the model (__table_args__ = {"schema": None} meaning "default schema") and the value is the runtime target schema.
from sqlalchemy import select, text
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/saas_db",
    pool_size=20,
    max_overflow=10,
    pool_pre_ping=True,
)
async_session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
class Base(DeclarativeBase):
    pass
class Tenant(Base):
    # Explicit schema: a {None: ...} translate map leaves this shared registry in public
    __tablename__ = "tenants"
    __table_args__ = {"schema": "public"}
    id: Mapped[int] = mapped_column(primary_key=True)
    slug: Mapped[str] = mapped_column(unique=True)
    schema_name: Mapped[str]
class Invoice(Base):
    # schema=None means "default"; schema_translate_map replaces None at runtime
    __tablename__ = "invoices"
    __table_args__ = {"schema": None}
    id: Mapped[int] = mapped_column(primary_key=True)
    tenant_id: Mapped[int]
    amount_cents: Mapped[int]
    paid: Mapped[bool] = mapped_column(default=False)
At query time, the session or connection receives execution_options that carry the per-request schema mapping:
async def fetch_invoices_for_tenant(
    session: AsyncSession, schema_name: str
) -> list[Invoice]:
    stmt = select(Invoice).where(Invoice.paid == False)  # noqa: E712 (SQL expression)
    # schema_translate_map rewrites None -> schema_name in the emitted SQL
    result = await session.execute(
        stmt,
        execution_options={"schema_translate_map": {None: schema_name}},
    )
    return list(result.scalars().all())
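For schema_name="acme", this emits SQL along the lines of SELECT acme.invoices.id, ... FROM acme.invoices WHERE acme.invoices.paid = false (column references are qualified with the translated schema as well); the ORM model itself never changes.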
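You can inspect the rewrite without a database connection. The sketch below assumes SQLAlchemy 2.0 and uses a Core Table as a stand-in for the Invoice model; it compiles a statement for the PostgreSQL dialect and passes render_schema_translate=True so the compiler substitutes the real schema name instead of the placeholder tokens SQLAlchemy normally resolves at execution time. render_for_tenant is a hypothetical helper.

```python
from sqlalchemy import Boolean, Column, Integer, MetaData, Table, select
from sqlalchemy.dialects import postgresql

metadata = MetaData()
# Declared without a schema, like Invoice above, so the None key matches it
invoices = Table(
    "invoices",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("amount_cents", Integer),
    Column("paid", Boolean),
)


def render_for_tenant(stmt, schema: str) -> str:
    """Compile stmt for PostgreSQL with the tenant's schema substituted in."""
    compiled = stmt.compile(
        dialect=postgresql.dialect(),
        schema_translate_map={None: schema},
        render_schema_translate=True,  # render names instead of placeholder tokens
    )
    return str(compiled)


stmt = select(invoices).where(invoices.c.paid == False)  # noqa: E712
print(render_for_tenant(stmt, "acme"))
```

The same compiled statement can be rendered for any tenant; only the schema prefix changes.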
Query Construction & Async Execution Patterns
There are three levels at which you can apply schema_translate_map, each with different scope and lifetime:
| Level | API | Lifetime | Use case |
|---|---|---|---|
| Engine | engine.execution_options(schema_translate_map=...) | All connections from the returned engine copy (it shares the original pool) | Single-tenant applications |
| Connection | conn.execution_options(schema_translate_map=...) | Single connection checkout | Fine-grained per-connection control |
| Statement | execution_options={...} passed to session.execute() | Single statement | Per-request routing (recommended for async) |
For async production code, applying the map at the session.execute() call is the cleanest pattern because it does not mutate shared state: each coroutine opens its own session from the shared async_sessionmaker and passes its tenant's map per statement, so concurrent requests can route to different schemas without interfering.
# Async: per-statement schema routing (safest under concurrent load)
class Order(Base):
    # Declared once at module level: defining a mapped class inside the function
    # would re-register the "orders" table on every call and fail
    __tablename__ = "orders"
    __table_args__ = {"schema": None}
    id: Mapped[int] = mapped_column(primary_key=True)
    user_id: Mapped[int]
    total_cents: Mapped[int]
async def get_tenant_orders(
    session: AsyncSession,
    tenant_schema: str,
    user_id: int,
) -> list[dict]:
    stmt = select(Order).where(Order.user_id == user_id)
    result = await session.execute(
        stmt,
        execution_options={"schema_translate_map": {None: tenant_schema}},
    )
    return [{"id": r.id, "total": r.total_cents} for r in result.scalars()]
Sync variant for background workers or Alembic migration scripts:
# Sync: applying schema_translate_map on a connection directly
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
sync_engine = create_engine("postgresql+psycopg2://user:pass@localhost/saas_db")
with sync_engine.connect() as conn:
    schema_conn = conn.execution_options(schema_translate_map={None: "acme"})
    with Session(bind=schema_conn) as session:
        invoices = session.execute(select(Invoice)).scalars().all()
State Management & Session Boundaries
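Schema routing intersects with SQLAlchemy's session identity map in a subtle way: identity keys are built from the mapped class and primary key (plus an identity token that stays None unless you use the horizontal sharding extension), not from the schema. If one session executes against different schemas (for example, because two coroutines share it), a row with id 5 loaded from globex is returned as the cached object for a later acme query with the same id, silently serving one tenant's data to another. An AsyncSession is also not safe for concurrent use by multiple tasks. The safe pattern is one session per request, session factory shared globally: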
from contextlib import asynccontextmanager
from fastapi import HTTPException, Request
# Allowlist of tenant slug -> schema name; in production, load it from the tenants table
ALLOWED_SCHEMAS = {"acme": "acme", "globex": "globex", "initech": "initech"}
def get_tenant_schema(request: Request) -> str:
    # In production, derive the tenant from a JWT claim or a verified subdomain
    tenant_slug = request.headers.get("X-Tenant-ID")
    # Validate against the allowlist: never pass raw header values to SQL
    schema = ALLOWED_SCHEMAS.get(tenant_slug)
    if schema is None:
        # An unhandled KeyError would surface as a 500, so reject explicitly
        raise HTTPException(status_code=403, detail="Unknown tenant")
    return schema
@asynccontextmanager
async def tenant_session(schema: str):
    async with async_session() as session:
        async with session.begin():
            # session.info is a plain dict: it records the schema for helpers to read,
            # it does not route anything by itself
            session.info["schema"] = schema
            yield session
When using this approach, helper functions read session.info["schema"] and pass it via execution_options, keeping schema derivation centralized and auditable.
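One possible shape for such a helper, as a minimal sketch: tenant_execute is a hypothetical name, and it assumes the schema was stored under session.info["schema"] by tenant_session(). It fails closed when no schema is recorded instead of silently querying the default schema.

```python
from typing import Any

from sqlalchemy.ext.asyncio import AsyncSession


async def tenant_execute(session: AsyncSession, stmt: Any, **execution_options: Any) -> Any:
    """Hypothetical helper: run stmt against the schema recorded in session.info."""
    schema = session.info.get("schema")
    if schema is None:
        # Fail closed: a missing schema must not fall through to the default schema
        raise RuntimeError("No tenant schema on this session; open it via tenant_session()")
    # The tenant map always wins over any schema_translate_map the caller passed
    options = {**execution_options, "schema_translate_map": {None: schema}}
    return await session.execute(stmt, execution_options=options)
```

Call sites then read as await tenant_execute(session, select(Invoice), yield_per=500), and the schema never has to be threaded through function arguments.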
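PostgreSQL search_path is an alternative to schema_translate_map for environments where you control the connection-level session variable. Issuing SET search_path TO acme from a connection event makes all unqualified names resolve to the tenant schema without any SQLAlchemy rewriting. The tradeoff: search_path is session-level server state. In SQLAlchemy's own pool, a value set for one request stays on the pooled connection and leaks to the next checkout unless you reset it; behind PgBouncer in transaction pooling mode, consecutive transactions can land on different server connections, so only a setting scoped with SET LOCAL inside each transaction behaves predictably.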
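import re
from sqlalchemy import event
from sqlalchemy.ext.asyncio import AsyncEngine
def configure_search_path(engine: AsyncEngine, schema: str) -> None:
    # SET cannot take a bind parameter, so validate the identifier before interpolating it
    if not re.fullmatch(r"[a-z][a-z0-9_]{0,62}", schema):
        raise ValueError(f"Invalid schema name: {schema!r}")
    # Runs once per new DBAPI connection, pinning every pooled connection of this
    # engine to `schema`: suitable for a per-tenant engine, not a shared one
    @event.listens_for(engine.sync_engine, "connect", insert=True)
    def set_search_path(dbapi_conn, connection_record):
        # Autocommit so a later ROLLBACK cannot undo the SET
        existing_autocommit = dbapi_conn.autocommit
        dbapi_conn.autocommit = True
        cursor = dbapi_conn.cursor()
        cursor.execute(f"SET SESSION search_path TO {schema}, public")
        cursor.close()
        dbapi_conn.autocommit = existing_autocommit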
For connection pooling environments, prefer schema_translate_map (which rewrites SQL at the SQLAlchemy layer) over search_path (which depends on server-side session state that the pool neither tracks nor resets).
Resetting search_path Safely on Connection Checkout
If your deployment constraints force you to use search_path — for example, because third-party tools generate raw SQL that cannot be routed through SQLAlchemy — you can reset it safely by listening to the checkout pool event and issuing a reset statement before returning the connection to application code:
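from sqlalchemy import event
from sqlalchemy.ext.asyncio import AsyncEngine
from sqlalchemy.pool import ConnectionPoolEntry
def register_search_path_reset(engine: AsyncEngine) -> None:
    @event.listens_for(engine.sync_engine, "checkout")
    def reset_search_path(dbapi_conn, connection_record: ConnectionPoolEntry, connection_proxy):
        # Autocommit so the reset is not rolled back with the request's transaction
        existing_autocommit = dbapi_conn.autocommit
        dbapi_conn.autocommit = True
        cursor = dbapi_conn.cursor()
        # Reset to public to prevent a prior tenant's schema from leaking
        cursor.execute("SET SESSION search_path TO public")
        cursor.close()
        dbapi_conn.autocommit = existing_autocommit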
This adds one round-trip per connection checkout. For high-throughput workloads (thousands of requests per second), the overhead is measurable. Benchmark both approaches against your actual query mix before choosing.
Advanced Multi-Tenant Patterns
Schema-per-Tenant vs Database-per-Tenant
The choice between these two isolation models is primarily operational, not a SQLAlchemy API constraint:
| Dimension | Schema-per-tenant | Database-per-tenant |
|---|---|---|
| Isolation | Logical (same DB process) | Physical (separate server or cluster) |
| Pool cost | One engine, one pool, N schemas | One engine per tenant, N pools |
| Cross-tenant queries | Possible via qualified names | Impossible without external tools |
| Alembic migrations | Run with schema_translate_map | Run per-database connection string |
| Best for | SaaS with dozens–hundreds of tenants | Regulated industries, very large tenants |
For schema-per-tenant with Alembic, configure env.py to iterate tenant schemas and apply schema_translate_map per migration run. This is covered in depth in the guide on Configuring Alembic with Async SQLAlchemy Engines.
Per-Tenant Database with Engine Registry
When tenants require full database isolation, maintain a registry of engines keyed by tenant identifier. Async engines are lightweight — the connection pool does not open connections until the first query — so pre-building the registry at startup is safe:
from contextlib import asynccontextmanager
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, create_async_engine
_engine_registry: dict[str, AsyncEngine] = {}
def get_or_create_engine(tenant_id: str, dsn: str) -> AsyncEngine:
    if tenant_id not in _engine_registry:
        _engine_registry[tenant_id] = create_async_engine(
            dsn,
            pool_size=5,  # per-tenant pool: keep small
            max_overflow=2,
            pool_recycle=1800,
            pool_pre_ping=True,
        )
    return _engine_registry[tenant_id]
@asynccontextmanager
async def with_tenant_engine(tenant_id: str, dsn: str):
    engine = get_or_create_engine(tenant_id, dsn)
    async with engine.connect() as conn:
        async with AsyncSession(bind=conn) as session:
            yield session
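The dictionary above never shrinks. A minimal sketch of a bounded alternative, assuming least-recently-used eviction is acceptable for your tenants: BoundedEngineRegistry is a hypothetical class (not a SQLAlchemy API) that takes a factory for new engines and an async disposer for evicted ones, so it can be exercised with fakes as well as with real AsyncEngine objects.

```python
import asyncio
from collections import OrderedDict
from collections.abc import Awaitable, Callable
from typing import Generic, TypeVar

E = TypeVar("E")


class BoundedEngineRegistry(Generic[E]):
    """Hypothetical LRU registry of per-tenant engines (not a SQLAlchemy API)."""

    def __init__(
        self,
        factory: Callable[[str], E],
        dispose: Callable[[E], Awaitable[None]],
        max_engines: int = 50,
    ) -> None:
        if max_engines < 1:
            raise ValueError("max_engines must be at least 1")
        self._factory = factory  # builds an engine for a tenant id
        self._dispose = dispose  # closes an evicted engine's pool
        self._max_engines = max_engines
        self._engines: "OrderedDict[str, E]" = OrderedDict()
        self._lock = asyncio.Lock()

    async def get(self, tenant_id: str) -> E:
        evicted: list[E] = []
        async with self._lock:
            if tenant_id in self._engines:
                self._engines.move_to_end(tenant_id)  # mark as most recently used
                return self._engines[tenant_id]
            engine = self._factory(tenant_id)
            self._engines[tenant_id] = engine
            while len(self._engines) > self._max_engines:
                _, old = self._engines.popitem(last=False)  # least recently used
                evicted.append(old)
        # Dispose outside the lock so a slow pool shutdown does not block lookups
        for old in evicted:
            await self._dispose(old)
        return engine
```

With real engines, the factory would wrap create_async_engine(...) with the pool settings shown above and the disposer would be lambda engine: engine.dispose(). Validate tenant IDs against the tenants table before calling get(), since the cap limits memory but not who can trigger engine creation.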
Routing Reads to Replicas
Separating read and write workloads across async engines is the second dimension of multi-tenant routing. The routing reads to replicas guide covers Session.get_bind() overrides and async routing session factories in detail.
The foundation is two distinct engines, one targeting the primary and one targeting a read replica, combined with a routing Session subclass (handed to AsyncSession through sync_session_class) that dispatches based on whether the operation is a write or a read:
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
primary_engine = create_async_engine(
    "postgresql+asyncpg://user:pass@primary-host/saas_db",
    pool_size=20,
    max_overflow=10,
)
replica_engine = create_async_engine(
    "postgresql+asyncpg://user:pass@replica-host/saas_db",
    pool_size=30,  # replicas typically handle more concurrent reads
    max_overflow=15,
    pool_pre_ping=True,
)
Vertical Partitioning Across Multiple Engines
Vertical partitioning means different ORM models connect to different databases: Order and Invoice live on a transactional Postgres cluster while AuditLog targets a separate analytics store. Session(binds={Model: engine}) still handles fixed per-class routing in SQLAlchemy 2.0; when the rule needs logic (table-name sets, flags, tenant lookups), override get_bind() in a custom Session subclass:
from sqlalchemy.orm import Session
class VerticallyPartitionedSession(Session):
    _analytics_tables = frozenset(["audit_logs", "metric_snapshots"])
    def get_bind(self, mapper=None, clause=None, **kwargs):
        # analytics_sync_engine / primary_sync_engine are assumed to be the
        # .sync_engine attributes of two AsyncEngine objects created elsewhere
        if mapper is not None:
            table_name = mapper.persist_selectable.name
            if table_name in self._analytics_tables:
                return analytics_sync_engine
        return primary_sync_engine
For async workflows, wrap this in an AsyncSession with sync_session_class:
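from sqlalchemy.ext.asyncio import async_sessionmaker
# A factory, not a shared session: open one AsyncSession per request.
# No bind is needed because VerticallyPartitionedSession.get_bind() routes every statement.
partitioned_session = async_sessionmaker(
    sync_session_class=VerticallyPartitionedSession, expire_on_commit=False
)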
Tenant Schema Provisioning and Teardown
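Provisioning a new tenant schema at runtime means running DDL. PostgreSQL DDL is transactional, so CREATE SCHEMA and the accompanying GRANT can run inside one ordinary transaction and roll back together if either fails; only a few statements, such as CREATE DATABASE in the database-per-tenant model, must run outside a transaction block (use isolation_level="AUTOCOMMIT" for those). If several workers may provision the same tenant concurrently, serialize them, for example with an advisory lock. Create the schema with text(), then run Alembic migrations to build the table structure: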
import re
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncConnection
async def provision_tenant_schema(conn: AsyncConnection, schema_name: str) -> None:
    """Call inside `async with engine.begin() as conn:` so both statements commit together."""
    # Validate schema_name before any SQL: identifiers cannot be bound parameters
    if not re.fullmatch(r"[a-z][a-z0-9_]{0,62}", schema_name):
        raise ValueError(f"Invalid schema name: {schema_name!r}")
    # PostgreSQL DDL is transactional, so no COMMIT juggling is needed here
    await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}"))
    # Grant the application role access to the new schema
    await conn.execute(
        text(f"GRANT USAGE, CREATE ON SCHEMA {schema_name} TO app_user")
    )
After schema creation, run Alembic programmatically by setting schema_translate_map in the Alembic env.py context, targeting the new schema name. Refer to Configuring Alembic with Async SQLAlchemy Engines for the full env.py wiring.
Teardown (offboarding a tenant) requires the reverse: DROP SCHEMA IF EXISTS {schema_name} CASCADE. Because this operation is irreversible, always snapshot the schema to an external data store before executing. Implement a two-phase process — first rename the schema to {schema_name}_deleted_YYYYMMDD, confirm data export after a grace period, then drop.
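The two phases can be sketched as a pair of SQL strings. This minimal sketch assumes the same lowercase naming rule as provision_tenant_schema() and adds one check the rename step needs: PostgreSQL truncates identifiers longer than 63 bytes (emitting only a NOTICE), so a long tenant name plus the _deleted_YYYYMMDD suffix could be cut off and collide with another tombstone. tombstone_name and offboarding_sql are hypothetical helpers; executing the statements, exporting data, and enforcing the grace period are left to your tooling.

```python
import re
from datetime import date

# PostgreSQL identifiers are limited to NAMEDATALEN - 1 = 63 bytes
PG_MAX_IDENTIFIER = 63
_SCHEMA_RE = re.compile(r"[a-z][a-z0-9_]{0,62}")


def tombstone_name(schema_name: str, on: date) -> str:
    """Return the '{schema}_deleted_YYYYMMDD' name used in phase one."""
    if not _SCHEMA_RE.fullmatch(schema_name):
        raise ValueError(f"Invalid schema name: {schema_name!r}")
    renamed = f"{schema_name}_deleted_{on:%Y%m%d}"
    if len(renamed) > PG_MAX_IDENTIFIER:  # ASCII only, so characters == bytes
        raise ValueError(f"{renamed!r} exceeds {PG_MAX_IDENTIFIER} characters")
    return renamed


def offboarding_sql(schema_name: str, on: date) -> tuple[str, str]:
    """Phase one (rename now) and phase two (drop after export and grace period)."""
    renamed = tombstone_name(schema_name, on)
    return (
        f"ALTER SCHEMA {schema_name} RENAME TO {renamed}",
        f"DROP SCHEMA IF EXISTS {renamed} CASCADE",
    )
```

Keeping the date in the tombstone name also lets a scheduled job find schemas whose grace period has expired by parsing the suffix.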
Hybrid Architectures & Migration Strategies
Moving from Session(binds=...) to get_bind()
Session(binds={SomeModel: some_engine, some_table: another_engine}) is still supported in SQLAlchemy 2.0, and AsyncSession accepts the same dictionary keyed to AsyncEngine objects. A static dictionary cannot express runtime rules (flags, tenant lookups, statement type), so when routing needs logic, move it into get_bind():
- Identify all Session(binds=...) call sites.
- For each mapped class in the old binds dict, determine its table name.
- Implement get_bind() in a Session subclass with equivalent routing logic.
- Replace Session(binds=...) with MyRoutingSession(...).
For async applications, the sync_session_class parameter on AsyncSession is the bridge:
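# binds-dict pattern: still valid in 2.0 for fixed per-class routing
# session = Session(binds={Order: order_engine, AuditLog: audit_engine})
# get_bind() pattern: the same routing expressed in code, so it can grow conditions
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import Session
class RoutingSession(Session):
    def get_bind(self, mapper=None, clause=None, **kwargs):
        # audit_sync_engine / order_sync_engine: the .sync_engine of AsyncEngines defined elsewhere
        if mapper is not None and mapper.persist_selectable.name == "audit_logs":
            return audit_sync_engine
        return order_sync_engine
routing_session = AsyncSession(sync_session_class=RoutingSession)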
Combining Schema Routing with Read Replica Routing
The most demanding production configuration combines both: route read queries to a replica AND apply schema_translate_map for tenant isolation. Pass the schema map through execution_options and the routing hint through bind_arguments at call time:
async def fetch_tenant_invoices_from_replica(
    session: AsyncSession,
    tenant_schema: str,
) -> list[Invoice]:
    stmt = select(Invoice).where(Invoice.paid == True)  # noqa: E712 (SQL expression)
    result = await session.execute(
        stmt,
        execution_options={"schema_translate_map": {None: tenant_schema}},
        # Custom keys in bind_arguments are forwarded to Session.get_bind() as keyword arguments
        bind_arguments={"use_replica": True},
    )
    return list(result.scalars().all())
The routing session reads kw.get("use_replica") inside get_bind() to select the replica engine, while SQLAlchemy's built-in schema_translate_map handling rewrites the table names independently. A custom flag placed in execution_options would not reach the routing logic, because get_bind() receives the bind arguments, not the execution options.
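A routing hint can reach get_bind() through bind_arguments, whose extra keys Session.execute() forwards to get_bind() as keyword arguments. A minimal sketch of a session honoring such a flag, assuming the hint name use_replica from the example above; make_flag_routing_session_class is a hypothetical factory that receives synchronous Engine objects, and flushes always stay on the primary.

```python
from sqlalchemy.engine import Engine
from sqlalchemy.orm import Session


def make_flag_routing_session_class(primary: Engine, replica: Engine) -> type[Session]:
    """Hypothetical factory; pass the .sync_engine of each AsyncEngine."""

    class FlagRoutingSession(Session):
        def get_bind(self, mapper=None, clause=None, **kw):
            # Extra keys given to execute(..., bind_arguments={...}) arrive in **kw
            if kw.get("use_replica") and not self._flushing:
                return replica
            # Default, and every flush, stays on the primary
            return primary

    return FlagRoutingSession
```

Because the default is the primary, reads reach the replica only where a call site opts in, which keeps read-after-write paths consistent unless you deliberately accept replication lag.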
For the mechanics of schema_translate_map with FastAPI dependency injection, see the step-by-step walkthrough on switching schemas per request.
Bulk Data Operations in Multi-Tenant Contexts
Bulk insert and update operations — such as loading a CSV of orders into a tenant's schema — require the same schema_translate_map plumbing applied to insert() and update() statements. When using high-performance bulk inserts with session.execute(insert(Model), rows), the schema translation applies to the INSERT target table automatically:
from sqlalchemy import insert
from sqlalchemy.ext.asyncio import AsyncSession
from your_models import Invoice  # placeholder module holding the Invoice model
async def bulk_load_invoices(
    session: AsyncSession,
    tenant_schema: str,
    rows: list[dict],
) -> None:
    stmt = insert(Invoice)
    # Chunk to stay under asyncpg's limit of 32767 bind parameters per statement
    chunk_size = 500
    # session.begin() requires that this session has no transaction in progress yet
    async with session.begin():
        for i in range(0, len(rows), chunk_size):
            await session.execute(
                stmt,
                rows[i : i + chunk_size],
                execution_options={"schema_translate_map": {None: tenant_schema}},
            )
    # Emits: INSERT INTO acme.invoices (tenant_id, amount_cents, paid) VALUES ...
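The fixed chunk_size = 500 ignores how many columns each row binds. A minimal sketch of a column-aware alternative, assuming one bind parameter per column per row and the asyncpg limit quoted in the comment above; param_safe_chunks is a hypothetical helper. SQLAlchemy's own batching of a multi-row execute() may split a chunk further, which only lowers the per-statement count.

```python
from collections.abc import Iterator, Sequence
from typing import TypeVar

T = TypeVar("T")

# asyncpg rejects statements with more than 32767 bind parameters
ASYNCPG_MAX_PARAMS = 32767


def param_safe_chunks(
    rows: Sequence[T], params_per_row: int, max_params: int = ASYNCPG_MAX_PARAMS
) -> Iterator[Sequence[T]]:
    """Yield slices of rows whose total bind-parameter count stays within max_params."""
    if params_per_row < 1 or params_per_row > max_params:
        raise ValueError("params_per_row must be between 1 and max_params")
    chunk_size = max_params // params_per_row
    for start in range(0, len(rows), chunk_size):
        yield rows[start : start + chunk_size]
```

Inside bulk_load_invoices, the range() loop would become for chunk in param_safe_chunks(rows, params_per_row=3), with 3 matching the tenant_id, amount_cents, and paid columns.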
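For streaming large per-tenant result sets back to callers, combine schema_translate_map with the yield_per execution option; with AsyncSession, use session.stream() or session.stream_scalars() so rows arrive in batches. Both options travel in the same execution_options dict without conflict.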
Testing Multi-Tenant Schema Routing
Testing schema routing in CI requires either a real PostgreSQL instance (via Docker) or careful mocking. The recommended approach uses a dedicated test schema per test run, created and dropped around the test session:
import pytest
import pytest_asyncio
from sqlalchemy import insert, select, text
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from your_models import Base, Invoice  # placeholder module holding the models
TEST_SCHEMA = "test_tenant_acme"
# Session-scoped async fixtures need a session-scoped event loop
# (loop_scope="session" on the fixture and tests in recent pytest-asyncio versions)
@pytest_asyncio.fixture(scope="session")
async def test_engine():
    engine = create_async_engine(
        "postgresql+asyncpg://user:pass@localhost/test_db",
        echo=False,
    )
    # PostgreSQL DDL is transactional, so an ordinary begin()/commit block suffices
    async with engine.begin() as conn:
        await conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {TEST_SCHEMA}"))
    yield engine
    async with engine.begin() as conn:
        await conn.execute(text(f"DROP SCHEMA IF EXISTS {TEST_SCHEMA} CASCADE"))
    await engine.dispose()
@pytest.mark.asyncio
async def test_invoice_schema_routing(test_engine):
    tenant_map = {"schema_translate_map": {None: TEST_SCHEMA}}
    async with test_engine.begin() as conn:
        # create_all() has no schema_translate_map argument: set it on the
        # connection, where it also applies to DDL
        await conn.execution_options(**tenant_map)
        await conn.run_sync(Base.metadata.create_all)
    session_factory = async_sessionmaker(test_engine, class_=AsyncSession, expire_on_commit=False)
    async with session_factory() as session:
        # Insert into the test schema
        await session.execute(
            insert(Invoice).values(tenant_id=1, amount_cents=1000, paid=False),
            execution_options=tenant_map,
        )
        await session.commit()
        # Read back from the test schema
        result = await session.execute(select(Invoice), execution_options=tenant_map)
        invoices = result.scalars().all()
        assert len(invoices) == 1
        assert invoices[0].amount_cents == 1000
This test verifies that schema_translate_map rewrites table names to the test schema for DDL, inserts, and selects. It does not exercise isolation between tenants; for that, create at least two schemas that share primary key values and assert that each query returns only its own tenant's rows.
Production Pitfalls & Anti-Patterns
- Passing raw tenant identifiers into schema_translate_map: Never do schema_translate_map={None: request.headers["X-Tenant"]}. A client that controls the header can simply name another tenant's schema (X-Tenant: globex) and read that tenant's data, and the same unvalidated value becomes an injection vector wherever it reaches raw SQL such as SET search_path or text(). Derive the tenant from authenticated claims and validate it against an allowlist of known schema names before routing.
- Sharing an AsyncSession across coroutines with different schemas: The identity map does not distinguish schema context, so session.get(Invoice, 5) on schema acme will return a cached instance originally loaded from schema globex if the same primary key exists in both. An AsyncSession is also not safe for concurrent use by multiple tasks. Use isolated sessions per request.
- Using search_path with PgBouncer in transaction pooling mode: SET search_path is a session-level variable. In transaction pooling mode, PgBouncer returns the server connection to its pool after each transaction, so another client's next transaction can inherit the previous search path, and a reset on SQLAlchemy checkout does not prevent it. Use schema_translate_map instead, which operates entirely at the Python layer, or scope the setting with SET LOCAL inside each transaction.
- Forgetting pool_pre_ping=True on per-tenant engines: When a tenant's engine sits idle for hours and the server (or a firewall in between) closes the TCP connection, the next query fails with a connection error (asyncpg's ConnectionDoesNotExistError, wrapped by SQLAlchemy). pool_pre_ping tests each connection at checkout and replaces dead ones before use.
- Leaking async engines in the registry: If get_or_create_engine() is called with attacker-controlled tenant IDs, the registry grows unbounded, and every engine holds its own pool. Cap the registry size and validate tenant IDs against the tenants table before creating an engine.
- Applying schema_translate_map at the engine level in a multi-tenant deployment: An engine-level map applies to every connection checked out through that engine object, including health checks and background tasks. Apply the map at the session.execute() call level, never at the engine level, when tenants share an engine.
Frequently Asked Questions
What is the difference between schema_translate_map and setting search_path?
schema_translate_map is a SQLAlchemy-level rewrite: it modifies the compiled SQL string before it is sent to the database, replacing schema references in table identifiers. search_path is a PostgreSQL session variable that instructs the server to resolve unqualified names by searching listed schemas in order. Both achieve schema routing, but schema_translate_map works at the Python layer and is connection-pool-safe, while search_path is a server-side connection state that can leak across pooled connections if not reset on checkout.
Can I use schema_translate_map with models that have an explicit schema set in __table_args__?
Yes. The map key must match the schema string declared in the model. If your model declares __table_args__ = {"schema": "shared"}, then the map entry must be {"shared": "acme_shared"}. The key None only matches models that declare no schema or schema=None.
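A compile-only sketch of a map with both kinds of key, assuming a hypothetical plans table declared with schema="shared" next to a tenant-owned invoices table with no schema. Each key matches only tables declared with that exact schema value.

```python
from sqlalchemy import Column, Integer, MetaData, Table, select
from sqlalchemy.dialects import postgresql

metadata = MetaData()
# Tenant-owned table: declared with no schema, matched by the None key
invoices = Table(
    "invoices",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("plan_id", Integer),
)
# Table declared with an explicit schema: matched only by the "shared" key
plans = Table("plans", metadata, Column("id", Integer, primary_key=True), schema="shared")

tenant_map = {None: "acme", "shared": "acme_shared"}
stmt = select(invoices.c.id, plans.c.id.label("plan")).join_from(
    invoices, plans, invoices.c.plan_id == plans.c.id
)
sql = str(
    stmt.compile(
        dialect=postgresql.dialect(),
        schema_translate_map=tenant_map,
        render_schema_translate=True,  # substitute names for display
    )
)
print(sql)
```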
How do async engines handle connection limits when each tenant gets its own engine?
Each AsyncEngine maintains its own connection pool, so the worst case is tenants * (pool_size + max_overflow) connections. With 100 tenants each having pool_size=5, you could hold 500 open connections before counting overflow, while PostgreSQL's default max_connections is 100. Use smaller per-tenant pools (pool_size=2, max_overflow=1) for inactive tenants, or put a pooling proxy such as PgBouncer in front of the server and give each tenant a distinct database URL with a connection limit enforced at the proxy level.
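The arithmetic is worth automating as a startup check. A minimal sketch, assuming only these engines connect to the server and using PostgreSQL's default settings (max_connections = 100, superuser_reserved_connections = 3); the function names are hypothetical, and real budgets must also leave room for migrations, admin sessions, and other services.

```python
# PostgreSQL defaults: max_connections = 100, superuser_reserved_connections = 3
PG_DEFAULT_MAX_CONNECTIONS = 100
PG_DEFAULT_SUPERUSER_RESERVED = 3


def worst_case_connections(tenants: int, pool_size: int, max_overflow: int) -> int:
    """Upper bound when every per-tenant QueuePool is fully checked out."""
    return tenants * (pool_size + max_overflow)


def fits_server(
    tenants: int,
    pool_size: int,
    max_overflow: int,
    max_connections: int = PG_DEFAULT_MAX_CONNECTIONS,
    reserved: int = PG_DEFAULT_SUPERUSER_RESERVED,
) -> bool:
    """True if the worst case stays within the slots available to ordinary roles."""
    return worst_case_connections(tenants, pool_size, max_overflow) <= max_connections - reserved
```

With the registry settings from earlier (pool_size=5, max_overflow=2), 100 tenants could demand 700 connections; even the reduced pool_size=2, max_overflow=1 still needs 300, which is why a pooling proxy is the usual answer at that scale.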
Does schema_translate_map work with Core text() statements?
No. schema_translate_map only rewrites table references in compiled SQLAlchemy Core expressions (Table, mapped_column, select(Model)). Raw text() statements are passed through verbatim. If you must use text() with schema routing, embed the schema name directly via string formatting after validating it against an allowlist: text(f"SELECT * FROM {validated_schema}.invoices").
How do I run Alembic migrations across all tenant schemas?
Build a migration script that iterates the list of tenant schema names from the tenants table, then calls alembic upgrade head with a modified env.py that sets schema_translate_map={None: schema_name} for each run. Alembic's migration operations execute synchronously, so either use a dedicated sync engine or, as Alembic's async template does, run them through connection.run_sync() on an async connection. The Alembic async configuration guide shows how to wire this up.
Related
- Advanced Query Patterns and Bulk Data Operations — Parent guide covering the full spectrum of SQLAlchemy 2.0 query optimization.
- Switching Schemas per Request with schema_translate_map — Step-by-step FastAPI dependency wiring for per-request schema routing.
- Routing Reads to Replicas with Async Engines — Implementing write-to-primary read-from-replica splits with a routing session subclass.
- Async Engines, Dialects and Connection Pooling — Connection pool sizing and async driver configuration that underpins all schema and replica routing.
- Streaming Large Result Sets with yield_per — Efficiently processing large per-tenant datasets without memory exhaustion.