Switching Schemas per Request with schema_translate_map
Pass execution_options={"schema_translate_map": {None: tenant_schema}} to session.execute() on every statement, and SQLAlchemy qualifies every table that declares no schema with the target PostgreSQL schema when it renders the SQL. No model changes are required. This is the recommended pattern in the Dynamic Schema and Multi-Tenant Routing architecture for SaaS applications that isolate tenants into separate schemas within a shared database.
Quick Answer
The before/after contrast below shows the legacy workaround (manual string formatting) next to the schema_translate_map approach in SQLAlchemy 2.0:
```python
# Legacy pattern: do NOT use (SQL injection risk, defeats compiled-statement caching)
from sqlalchemy import text

schema = "acme"
raw_sql = f"SELECT * FROM {schema}.invoices WHERE paid = false"
session.execute(text(raw_sql))
```

```python
# SQLAlchemy 2.0: schema_translate_map rewrites the schema in the rendered SQL safely
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from your_models import Invoice  # Invoice.__table_args__ = {"schema": None}, or no schema at all


async def get_unpaid_invoices(session: AsyncSession, tenant_schema: str) -> list[Invoice]:
    stmt = select(Invoice).where(Invoice.paid == False)  # noqa: E712 (renders "paid = false")
    result = await session.execute(
        stmt,
        execution_options={"schema_translate_map": {None: tenant_schema}},
    )
    return list(result.scalars().all())

# Emits (roughly): SELECT acme.invoices.id, ... FROM acme.invoices
#                  WHERE acme.invoices.paid = false
```
The map key (None) matches the schema value declared in the model's __table_args__. If your model declares schema="shared", the key must be "shared", not None.
Execution Context & Async Workflow Integration
Why schema_translate_map is Connection-Pool-Safe
SQLAlchemy compiles each select(), insert(), update(), or delete() construct into a Compiled object and caches it. When schema_translate_map is in effect, SQLAlchemy 1.4 and 2.0 render the schema names in the cached SQL as placeholder tokens and substitute the mapped names each time the statement executes. {None: "acme"} and {None: "globex"} therefore share one cache entry but emit different SQL. This means:
- The rewriting happens entirely in Python, before the SQL reaches the DBAPI.
- No connection-level session variable is set on the database server.
- The underlying asyncpg connection is returned to the pool in a clean state after the query, with no tenant-specific residue.
This is fundamentally different from SET search_path = acme, a server-side session variable that persists on the connection until it is explicitly reset. With PgBouncer, or with any connection pool that reuses connections across tenants, the next request that receives that connection silently inherits the previous tenant's search_path.
Async Session Factory Setup
The async_sessionmaker factory is shared globally. Schema context is injected per statement, not per session:
```python
from sqlalchemy.ext.asyncio import (
    AsyncSession,
    async_sessionmaker,
    create_async_engine,
)

engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/saas_db",
    pool_size=20,
    max_overflow=10,
    pool_pre_ping=True,
)

# expire_on_commit=False matters in async code: after commit, reading an expired
# attribute (for example in the response serializer) would trigger an implicit
# lazy load, which is not possible under asyncio and raises MissingGreenlet.
AsyncSessionLocal = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,
)
```
FastAPI Dependency Wiring
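The tenant schema should be derived from a verified source, such as a tenant claim in a signed JWT, a validated subdomain, or a server-side lookup, and never used raw from a request header. The example below reads an X-Tenant-ID header for brevity, but only ever uses its value as a key into an allowlist.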
```python
from typing import Annotated, AsyncGenerator

from fastapi import Depends, FastAPI, Header, HTTPException
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from your_models import Invoice

app = FastAPI()

# Allowlist of valid schema names, populated from your tenants table at startup
VALID_SCHEMAS: dict[str, str] = {
    "acme": "acme",
    "globex": "globex",
    "initech": "initech",
}


async def get_db() -> AsyncGenerator[AsyncSession, None]:
    # One session per request; AsyncSessionLocal comes from the factory setup above
    async with AsyncSessionLocal() as session:
        yield session


def resolve_tenant_schema(
    x_tenant_id: Annotated[str, Header()],  # read from the X-Tenant-ID header
) -> str:
    # The header value is only a lookup key; unknown tenants are rejected
    schema = VALID_SCHEMAS.get(x_tenant_id)
    if schema is None:
        raise HTTPException(status_code=403, detail="Unknown tenant")
    return schema


@app.get("/invoices")
async def list_invoices(
    schema: str = Depends(resolve_tenant_schema),
    session: AsyncSession = Depends(get_db),
):
    stmt = select(Invoice).where(Invoice.paid == False).order_by(Invoice.id)  # noqa: E712
    result = await session.execute(
        stmt,
        execution_options={"schema_translate_map": {None: schema}},
    )
    invoices = result.scalars().all()
    return [{"id": inv.id, "amount": inv.amount_cents} for inv in invoices]
```
Applying the Map to Write Operations
schema_translate_map works identically for insert(), update(), and delete() statements compiled through SQLAlchemy Core:
```python
from sqlalchemy import insert
from sqlalchemy.ext.asyncio import AsyncSession

from your_models import Invoice


async def create_invoice(
    session: AsyncSession,
    tenant_schema: str,
    amount_cents: int,
) -> int:
    stmt = (
        insert(Invoice)
        # tenant_id=1 is a placeholder value for this example
        .values(tenant_id=1, amount_cents=amount_cents, paid=False)
        .returning(Invoice.id)
    )
    result = await session.execute(
        stmt,
        execution_options={"schema_translate_map": {None: tenant_schema}},
    )
    new_id = result.scalar_one()  # read the RETURNING value before committing
    await session.commit()
    return new_id

# Emits (roughly, with bound parameters):
#   INSERT INTO acme.invoices (tenant_id, amount_cents, paid)
#   VALUES ($1, $2, $3) RETURNING acme.invoices.id
```
Models with Multiple Declared Schemas
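When your application mixes shared models that declare an explicit schema (for example schema="public") with per-tenant models that declare no schema, use a multi-entry map. Only the None key is redirected to the tenant; a table whose declared schema is absent from the map, or mapped to itself, keeps its schema: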
```python
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from your_models import Invoice, Product

# GlobalConfig declares schema="public": it stays in the public schema.
# Invoice and Product declare no schema (None): they are mapped to the tenant.


async def list_invoice_products(session: AsyncSession):
    stmt = select(Invoice, Product).join(Product, Invoice.product_id == Product.id)
    result = await session.execute(
        stmt,
        execution_options={"schema_translate_map": {None: "acme", "public": "public"}},
    )
    return result.all()
```
Including "public": "public" in the map is a no-op but makes intent explicit and prevents accidental rewriting if models are later moved.
Using schema_translate_map Inside Background Tasks
FastAPI background tasks and Celery workers need the same per-operation schema routing as request handlers. The key difference is that there is no incoming HTTP request to derive the tenant from — the schema name must be passed explicitly as a parameter:
```python
from fastapi import BackgroundTasks, Depends
from sqlalchemy import select

from your_models import Invoice


async def send_invoice_email_background(schema: str, invoice_id: int) -> None:
    """Background task: fetch the invoice from the tenant schema and send an email."""
    # The task opens its own session instead of borrowing the request's one.
    async with AsyncSessionLocal() as session:
        stmt = select(Invoice).where(Invoice.id == invoice_id)
        result = await session.execute(
            stmt,
            execution_options={"schema_translate_map": {None: schema}},
        )
        invoice = result.scalar_one_or_none()
        if invoice is not None:
            ...  # dispatch email via SMTP or SES


@app.post("/invoices/{invoice_id}/send")
async def trigger_invoice_email(
    invoice_id: int,
    background_tasks: BackgroundTasks,
    schema: str = Depends(resolve_tenant_schema),
):
    # Hand the task plain values (schema name and id), never the session
    background_tasks.add_task(send_invoice_email_background, schema, invoice_id)
    return {"queued": True}
```

Do not pass the request-scoped session from get_db into a BackgroundTasks callback. That session is closed by the dependency's cleanup code, and depending on the FastAPI version this cleanup can run before the background task starts. Opening a session with async with AsyncSessionLocal() as session inside the background function ties the session's lifetime to the task and avoids holding a request's session open past response delivery.
Resolving Warnings, Errors & Common Mistakes
| Error / Warning | Root Cause | Production Fix |
|---|---|---|
| sqlalchemy.exc.ProgrammingError: relation "invoices" does not exist | schema_translate_map was not applied to the statement, so the unqualified name resolves through the default search_path (normally public), which has no invoices table. ORM flushes after session.add() are a common case: they do not inherit options passed to an individual session.execute() call. | Ensure every statement against tenant models passes execution_options={"schema_translate_map": {...}}. For ORM flushes, set the map on the session's connection at the start of the transaction with await session.connection(execution_options={"schema_translate_map": {...}}). Consider a middleware layer that injects the map automatically. |
| sqlalchemy.exc.ProgrammingError: relation "badschema.invoices" does not exist | An unvalidated tenant identifier was passed to the map, so the query targets a schema that does not exist. | Validate tenant IDs against an allowlist before constructing the map. Never pass raw request headers directly. |
| Queries still hit the model's declared schema (e.g. tenant.invoices) instead of the tenant schema | The map key does not match the schema string in __table_args__. E.g. the model has schema="tenant" but the map uses {None: "acme"}. Names without a matching key are left unchanged rather than raising an error. | Match the map key to the exact string in __table_args__. Use None only for models with no schema or schema=None. |
| sqlalchemy.exc.InvalidRequestError: Session is already flushing | The same AsyncSession is used by more than one concurrent task (for example via asyncio.gather, or a background task sharing the request's session), so a second flush starts while the first is still running. schema_translate_map itself is not the cause. | Give each concurrent task its own AsyncSession. |
| SQL targets the wrong schema after switching tenants | The map was set on a shared engine or connection (for example via engine.execution_options()), so it applies to more statements than intended. The compiled-statement cache is not the cause: cached SQL holds schema placeholders that are filled from the map in effect at each execution. | Pass execution_options as a keyword argument to session.execute() per statement, or keep engine- and connection-level maps scoped to a single tenant. |
| asyncpg.exceptions.UndefinedTableError after PgBouncer migration | Routing previously relied on SET search_path, which leaked across pooled connections; now connections arrive without the expected search_path. | Migrate all schema routing to schema_translate_map. Remove SET search_path from connection setup code. |
Advanced schema_translate_map Optimization
Middleware-Level Schema Injection
Rather than threading tenant_schema through every function call, a Starlette middleware can attach the resolved schema to the request state, and a dependency can copy it onto the session:
```python
from fastapi import Depends, HTTPException, Request
from sqlalchemy.ext.asyncio import AsyncSession
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse


class TenantSchemaMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        tenant_id = request.headers.get("X-Tenant-ID", "")
        schema = VALID_SCHEMAS.get(tenant_id)  # allowlist lookup, never the raw header
        if schema is None and request.url.path.startswith("/api/"):
            return JSONResponse({"detail": "Unknown tenant"}, status_code=403)
        request.state.tenant_schema = schema  # may be None outside /api/
        return await call_next(request)


app.add_middleware(TenantSchemaMiddleware)


async def get_schema(request: Request) -> str:
    # Guard against routes outside /api/ (or a missing middleware)
    schema = getattr(request.state, "tenant_schema", None)
    if schema is None:
        raise HTTPException(status_code=403, detail="Unknown tenant")
    return schema


async def get_tenant_db(
    session: AsyncSession = Depends(get_db),
    schema: str = Depends(get_schema),
):
    """Yields a session with the tenant schema pre-bound in session.info."""
    session.info["schema"] = schema
    yield session
```
Route handlers that depend on get_tenant_db can then build their execution options with the tenant_exec_opts(session) helper defined in the next subsection, without caring where the schema name came from. This eliminates duplicated schema-resolution logic across dozens of endpoints.
Caching the Resolved Schema in session.info
For endpoints that call multiple ORM helpers, resolving and passing tenant_schema through every function signature is tedious. Attach it once per session and let helpers read it:
```python
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession

from your_models import Invoice


async def get_tenant_session(tenant_id: str) -> AsyncSession:
    # VALID_SCHEMAS[...] raises KeyError for an unknown tenant.
    # The caller owns the returned session and must close it.
    schema = VALID_SCHEMAS[tenant_id]
    session = AsyncSessionLocal()
    session.info["schema"] = schema
    return session


def tenant_exec_opts(session: AsyncSession) -> dict:
    # Build per-statement options from the schema stored on the session
    return {"schema_translate_map": {None: session.info["schema"]}}


# In any repository function:
async def count_invoices(session: AsyncSession) -> int:
    stmt = select(func.count()).select_from(Invoice)
    result = await session.execute(stmt, execution_options=tenant_exec_opts(session))
    return result.scalar_one()
```
This pattern eliminates the risk of passing the wrong schema to an inner function and makes tenant context inspectable via session.info in logging middleware.
Combining schema_translate_map with selectinload
Eager-loaded relationships also respect schema_translate_map when the relationship target model declares a matching schema. The map is applied to the secondary SELECT ... IN (...) query that selectinload emits:
```python
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload

from your_models import Invoice, Tenant


async def fetch_tenant_with_invoices(
    session: AsyncSession, tenant_id: int, schema: str
) -> Tenant:
    stmt = (
        select(Tenant)
        .where(Tenant.id == tenant_id)
        .options(selectinload(Tenant.invoices))
    )
    result = await session.execute(
        stmt,
        execution_options={"schema_translate_map": {None: schema}},
    )
    return result.scalar_one()

# Both SELECT ... FROM tenants and SELECT ... FROM invoices WHERE ... IN (...)
# are rewritten to the tenant schema; confirm with echo=True on your version.
```
Validating Schema Names Before Use
The single most important security control in schema-per-tenant routing is schema name validation. A schema name in PostgreSQL is an identifier, not a parameter — it cannot be safely passed through a %s placeholder. Always validate against one of these strategies:
- Database allowlist: query SELECT schema_name FROM information_schema.schemata WHERE schema_name = $1 before using the name. This is authoritative but adds a round-trip.
- In-memory allowlist: build a set[str] of valid schema names at application startup from the tenants table and refresh it periodically (for example every 60 seconds) via a background task.
- Regex whitelist: re.fullmatch(r"[a-z][a-z0-9_]{0,62}", schema_name) accepts only lowercase identifiers of at most 63 characters, which rules out the quotes, semicolons, whitespace, and comment markers that injection payloads rely on. Use this as a secondary guard, not a substitute for the database allowlist.
```python
import re

# At most 63 bytes: PostgreSQL's default identifier length limit
_VALID_SCHEMA_PATTERN = re.compile(r"[a-z][a-z0-9_]{0,62}")
_schema_allowlist: set[str] = set()


def validate_schema(schema: str) -> str:
    # fullmatch, not match with ^...$: "$" also matches before a trailing
    # newline, so "acme\n" would pass re.match(r"^...$", ...)
    if not _VALID_SCHEMA_PATTERN.fullmatch(schema):
        raise ValueError(f"Invalid schema name format: {schema!r}")
    if schema not in _schema_allowlist:
        raise LookupError(f"Unknown tenant schema: {schema!r}")
    return schema
```
Refresh _schema_allowlist from a background asyncio task that runs every 60 seconds:
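```python
import asyncio
import logging

from sqlalchemy import text


async def refresh_schema_allowlist(engine) -> None:
    while True:
        try:
            async with engine.connect() as conn:
                # Assumes tenants.schema_name holds each tenant's schema
                result = await conn.execute(text("SELECT schema_name FROM tenants"))
                names = {row[0] for row in result}
            # No await between clear() and update(): readers never see an empty set
            _schema_allowlist.clear()
            _schema_allowlist.update(names)
        except Exception:  # keep the last good allowlist and retry next cycle
            logging.getLogger(__name__).exception("schema allowlist refresh failed")
        await asyncio.sleep(60)
```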
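One way to run that loop for the lifetime of the application is to start it as a task and cancel it on shutdown; in FastAPI that would typically happen in a lifespan handler. The sketch below is a generic asyncio version with the database query replaced by an injectable coroutine. refresh_forever() and fake_fetch() are hypothetical names, and the simulated outage shows the last known-good allowlist surviving a failed refresh.

```python
import asyncio
import logging
from collections.abc import Awaitable, Callable

log = logging.getLogger(__name__)


async def refresh_forever(
    fetch: Callable[[], Awaitable[set[str]]],
    allowlist: set[str],
    interval: float = 60.0,
) -> None:
    """Refresh allowlist in place every `interval` seconds until cancelled."""
    while True:
        try:
            fresh = await fetch()
        except Exception:
            # Keep the last known-good allowlist; try again next cycle.
            log.exception("schema allowlist refresh failed")
        else:
            allowlist.clear()  # no await between clear() and update()
            allowlist.update(fresh)
        await asyncio.sleep(interval)


async def demo() -> set[str]:
    allowlist: set[str] = set()
    calls = 0

    async def fake_fetch() -> set[str]:
        # Hypothetical stand-in for "SELECT schema_name FROM tenants".
        nonlocal calls
        calls += 1
        if calls == 2:
            raise ConnectionError("database unavailable")  # simulated outage
        return {"acme", "globex"} if calls == 1 else {"acme", "globex", "initech"}

    task = asyncio.create_task(refresh_forever(fake_fetch, allowlist, interval=0.01))
    try:
        for _ in range(500):  # wait (up to about 5 s) for the third refresh
            if "initech" in allowlist:
                break
            await asyncio.sleep(0.01)
    finally:
        task.cancel()  # CancelledError is not an Exception, so the loop exits
        try:
            await task
        except asyncio.CancelledError:
            pass
    return allowlist


print(sorted(asyncio.run(demo())))  # ['acme', 'globex', 'initech']
```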
Frequently Asked Questions
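Does schema_translate_map work with session.get() and session.merge()?
Partly. In SQLAlchemy 1.4 and 2.0, session.get() accepts an execution_options argument, so session.get(Invoice, 1, execution_options={"schema_translate_map": {None: schema}}) works, but get() consults the identity map first and only emits SQL when the object is not already loaded. session.merge() does not accept execution_options. For merge() and for ORM flushes, set the map on the session's connection at the start of the transaction with await session.connection(execution_options={"schema_translate_map": {None: schema}}), bind the session to engine.execution_options(schema_translate_map=...), or use session.execute(select(Model).where(...)) instead.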
Can I apply schema_translate_map to multiple schema keys at once?
Yes. The map is a plain Python dict. {None: "acme", "analytics": "acme_analytics", "audit": "acme_audit"} redirects three different model schemas simultaneously in a single statement.
How do I verify which SQL is actually emitted?
Enable engine echo: create_async_engine(..., echo=True). In production, attach an event listener to engine.sync_engine using event.listen(engine.sync_engine, "before_cursor_execute", log_fn) to capture the compiled SQL string including schema-rewritten table names.
Does schema_translate_map affect the information_schema or pg_catalog queries SQLAlchemy runs internally?
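No. SQLAlchemy's internal reflection and catalog queries use hardcoded schema names that are not subject to schema_translate_map rewriting. The map applies to the schemas of your own Table and Sequence objects, in both DML and DDL, so metadata.create_all() run on a connection that carries the map creates the tables in the mapped schema.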
Related
- Dynamic Schema and Multi-Tenant Routing — Parent guide covering schema isolation models, engine-per-tenant patterns, and vertical partitioning.
- Routing Reads to Replicas with Async Engines — Combine replica routing with schema routing for full production multi-tenant architecture.
- Async Engines, Dialects and Connection Pooling — Connection pool configuration that supports concurrent per-tenant requests safely.
- Integrating SQLAlchemy Async with FastAPI and Starlette — Dependency injection patterns for session lifecycle management in FastAPI.