Switching Schemas per Request with schema_translate_map

Pass execution_options={"schema_translate_map": {None: tenant_schema}} to session.execute() on every statement to rewrite all unqualified table references to the target PostgreSQL schema inside SQLAlchemy, before the SQL reaches the driver. No model changes are required. This is the recommended pattern in the Dynamic Schema and Multi-Tenant Routing architecture for SaaS applications that isolate tenants into separate schemas within a shared database.

Quick Answer

The before/after contrast shows the manual string-formatting workaround versus the schema_translate_map approach, written in SQLAlchemy 2.0 style:

# Legacy pattern: do NOT use (SQL injection risk, defeats statement caching)
from sqlalchemy import text

schema = "acme"
raw_sql = f"SELECT * FROM {schema}.invoices WHERE paid = false"
session.execute(text(raw_sql))

# SQLAlchemy 2.0: schema_translate_map rewrites the SQL safely
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from your_models import Invoice  # Invoice.__table_args__ = {"schema": None}

async def get_unpaid_invoices(session: AsyncSession, tenant_schema: str) -> list[Invoice]:
    stmt = select(Invoice).where(Invoice.paid == False)
    result = await session.execute(
        stmt,
        execution_options={"schema_translate_map": {None: tenant_schema}},
    )
    # Emits: SELECT acme.invoices.id, ... FROM acme.invoices WHERE acme.invoices.paid = false
    return list(result.scalars().all())

The map key (None) matches the schema value declared in the model's __table_args__. If your model declares schema="shared", the key must be "shared", not None.
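
Key matching can be checked offline by compiling a statement and inspecting the SQL. The sketch below assumes two hypothetical Core tables (invoices with no schema, plans with schema="shared") that are not part of the original models. It also relies on the compiler's render_schema_translate flag, present in SQLAlchemy 1.4 and later, to render the translated names into the string:

```python
from sqlalchemy import Column, Integer, MetaData, Table, select
from sqlalchemy.dialects import postgresql

metadata = MetaData()

# Hypothetical tables for illustration: one unqualified, one declaring "shared".
invoices = Table("invoices", metadata, Column("id", Integer, primary_key=True))
plans = Table(
    "plans", metadata, Column("id", Integer, primary_key=True), schema="shared"
)


def render(stmt, translate_map):
    """Compile for PostgreSQL and render the translated schema names."""
    compiled = stmt.compile(
        dialect=postgresql.dialect(),
        schema_translate_map=translate_map,
        render_schema_translate=True,
    )
    return str(compiled)


# The None key only touches tables that declare no schema.
unqualified_sql = render(select(invoices), {None: "acme"})
# A table declaring schema="shared" is left alone by a None-only map...
shared_sql = render(select(plans), {None: "acme"})
# ...and is redirected only when "shared" itself is a key.
remapped_sql = render(select(plans), {"shared": "acme"})

for sql in (unqualified_sql, shared_sql, remapped_sql):
    print(sql)
```

Printing the three strings side by side makes a mismatched key easy to spot: a table whose declared schema is not a key in the map keeps the schema it was declared with.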

Execution Context & Async Workflow Integration

Why schema_translate_map is Connection-Pool-Safe

SQLAlchemy compiles each select() or insert() construct into a Compiled object and caches the resulting SQL string. schema_translate_map is built to work with that cache: since SQLAlchemy 1.4 the cached string carries a placeholder for the schema, and the name from the map is substituted on each execution. {None: "acme"} and {None: "globex"} can therefore reuse one cache entry and still emit different SQL. This means:

  1. The rewriting happens entirely in Python, before the SQL reaches the DBAPI.
  2. No connection-level session variable is set on the database server.
  3. The underlying asyncpg connection is returned to the pool in a clean state after the query, with no tenant-specific residue.

This is fundamentally different from SET search_path = acme (a server-side session variable), which persists on the connection until explicitly reset — a serious problem with PgBouncer or any connection pool that reuses connections across tenants.
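
The claim can be observed on a pool that holds exactly one connection. The following is a minimal sketch under loud assumptions: SQLite stands in for PostgreSQL, ATTACHed in-memory databases play the role of tenant schemas, and the table, amounts, and helper names are invented for the demonstration. It uses the synchronous Session for brevity; the execution_options argument is the same one passed to AsyncSession.execute().

```python
from sqlalchemy import (
    Column, Integer, MetaData, Table, create_engine, event, select, text,
)
from sqlalchemy.orm import Session
from sqlalchemy.pool import StaticPool

# Assumption: SQLite stands in for PostgreSQL; StaticPool keeps a single
# DBAPI connection, so every tenant is forced to share it.
engine = create_engine("sqlite://", poolclass=StaticPool)


@event.listens_for(engine, "connect")
def attach_tenant_schemas(dbapi_conn, connection_record):
    # Each attached in-memory database acts as one tenant schema.
    for name in ("acme", "globex"):
        dbapi_conn.execute(f"ATTACH DATABASE ':memory:' AS {name}")


# Fixture setup with hard-coded literals only; no tenant input is formatted in.
with engine.begin() as conn:
    for name, amount in (("acme", 100), ("globex", 999)):
        conn.execute(text(
            f"CREATE TABLE {name}.invoices "
            "(id INTEGER PRIMARY KEY, amount_cents INTEGER)"
        ))
        conn.execute(
            text(f"INSERT INTO {name}.invoices (amount_cents) VALUES (:amount)"),
            {"amount": amount},
        )

invoices = Table(
    "invoices", MetaData(),
    Column("id", Integer, primary_key=True),
    Column("amount_cents", Integer),
)

emitted_sql = []


@event.listens_for(engine, "before_cursor_execute")
def record_sql(conn, cursor, statement, parameters, context, executemany):
    emitted_sql.append(statement)


def invoice_amount(schema: str) -> int:
    with Session(engine) as session:
        result = session.execute(
            select(invoices.c.amount_cents),
            execution_options={"schema_translate_map": {None: schema}},
        )
        return result.scalar_one()


# Alternate tenants on the same pooled connection.
amounts = [invoice_amount("acme"), invoice_amount("globex"), invoice_amount("acme")]
print(amounts)
print(emitted_sql)
```

The recorded statements contain only schema-qualified SELECTs. Nothing resembling a SET is sent, so the shared connection carries no tenant state from one call to the next.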

Async Session Factory Setup

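The async_sessionmaker factory is shared globally, and schema context is injected per statement rather than stored on the session. Each session should still serve a single tenant, because the ORM identity map does not take the schema into account and rows with the same primary key from two schemas would collide. The shared setup looks like this: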

from sqlalchemy.ext.asyncio import (
    AsyncSession,
    create_async_engine,
    async_sessionmaker,
)

engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/saas_db",
    pool_size=20,
    max_overflow=10,
    pool_pre_ping=True,
)

# expire_on_commit=False is critical in async FastAPI: after commit the response
# serializer still needs to read attributes, and the session may be closed.
AsyncSessionLocal = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,
)

FastAPI Dependency Wiring

The tenant schema should be derived from a verified claim — a JWT sub-tenant field, a validated subdomain, or a server-side lookup — never passed raw from a request header.

from fastapi import Depends, FastAPI, Header, HTTPException
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from typing import AsyncGenerator, Annotated

app = FastAPI()

# Allowlist of valid schema names — populated from your tenants table at startup
VALID_SCHEMAS: dict[str, str] = {
    "acme": "acme",
    "globex": "globex",
    "initech": "initech",
}


async def get_db() -> AsyncGenerator[AsyncSession, None]:
    async with AsyncSessionLocal() as session:
        yield session


def resolve_tenant_schema(
    x_tenant_id: Annotated[str, Header()],
) -> str:
    schema = VALID_SCHEMAS.get(x_tenant_id)
    if schema is None:
        raise HTTPException(status_code=403, detail="Unknown tenant")
    return schema


@app.get("/invoices")
async def list_invoices(
    schema: str = Depends(resolve_tenant_schema),
    session: AsyncSession = Depends(get_db),
):
    from your_models import Invoice

    stmt = select(Invoice).where(Invoice.paid == False).order_by(Invoice.id)
    result = await session.execute(
        stmt,
        execution_options={"schema_translate_map": {None: schema}},
    )
    invoices = result.scalars().all()
    return [{"id": inv.id, "amount": inv.amount_cents} for inv in invoices]

Applying the Map to Write Operations

schema_translate_map works identically for insert(), update(), and delete() statements compiled through SQLAlchemy Core:

from sqlalchemy import insert
from sqlalchemy.ext.asyncio import AsyncSession
from your_models import Invoice


async def create_invoice(
    session: AsyncSession,
    tenant_schema: str,
    amount_cents: int,
) -> int:
    stmt = (
        insert(Invoice)
        .values(tenant_id=1, amount_cents=amount_cents, paid=False)
        .returning(Invoice.id)
    )
    # Emits roughly: INSERT INTO acme.invoices (tenant_id, amount_cents, paid)
    #                VALUES (...) RETURNING acme.invoices.id
    result = await session.execute(
        stmt,
        execution_options={"schema_translate_map": {None: tenant_schema}},
    )
    # Read the RETURNING value before committing.
    new_id = result.scalar_one()
    await session.commit()
    return new_id

Models with Multiple Declared Schemas

When your application mixes shared models that declare an explicit schema (for example schema="public") with per-tenant models that declare no schema, use a multi-entry map:

from sqlalchemy import select
from your_models import Invoice, Product

# Models that declare schema="public" (such as GlobalConfig) keep that schema.
# Invoice and Product declare no schema (None), so they map to the tenant.
# Run this inside an async function that has an open AsyncSession.
stmt = select(Invoice, Product).join(Product, Invoice.product_id == Product.id)
result = await session.execute(
    stmt,
    execution_options={"schema_translate_map": {None: "acme", "public": "public"}},
)

Including "public": "public" in the map is a no-op, because schemas that are not keys in the map are left untouched anyway. Listing it makes the intent explicit and gives one obvious place to change if those models are later moved.

Using schema_translate_map Inside Background Tasks

FastAPI background tasks and Celery workers need the same per-operation schema routing as request handlers. The key difference is that there is no incoming HTTP request to derive the tenant from — the schema name must be passed explicitly as a parameter:

from fastapi import BackgroundTasks, Depends
from sqlalchemy import select

from your_models import Invoice


async def send_invoice_email_background(schema: str, invoice_id: int) -> None:
    """Background task: fetch invoice from tenant schema and send email."""
    # Task-scoped session: the request-scoped one may already be closed.
    async with AsyncSessionLocal() as session:
        stmt = select(Invoice).where(Invoice.id == invoice_id)
        result = await session.execute(
            stmt,
            execution_options={"schema_translate_map": {None: schema}},
        )
        invoice = result.scalar_one_or_none()
    if invoice:
        # ... dispatch email via SMTP or SES
        pass


@app.post("/invoices/{invoice_id}/send")
async def trigger_invoice_email(
    invoice_id: int,
    background_tasks: BackgroundTasks,
    schema: str = Depends(resolve_tenant_schema),
):
    background_tasks.add_task(send_invoice_email_background, schema, invoice_id)
    return {"queued": True}

Note that a session handed to a BackgroundTasks callback must remain open for the duration of the task. Depending on the FastAPI version, a request-scoped session from a yield dependency may already be closed by the time the task runs. Opening a session with async with AsyncSessionLocal() as session inside the background function, rather than injecting the request-scoped one, removes that dependency on the request lifetime and avoids holding a connection past response delivery.

Resolving Warnings, Errors & Common Mistakes

Error / Warning: sqlalchemy.exc.ProgrammingError: relation "invoices" does not exist
Root Cause: schema_translate_map not applied to the statement, so SQL targets the default public schema which has no invoices table.
Production Fix: Ensure every statement against schema'd models passes execution_options={"schema_translate_map": {...}}. Consider a middleware layer that injects the map automatically.

Error / Warning: sqlalchemy.exc.ProgrammingError: schema "badschema" does not exist
Root Cause: An unvalidated tenant identifier was passed to the map.
Production Fix: Validate tenant IDs against an allowlist before constructing the map. Never pass raw request headers directly.

Error / Warning: KeyError: 'acme' on schema_translate_map
Root Cause: The map key does not match the schema string in __table_args__. E.g. model has schema="tenant" but map uses {None: "acme"}.
Production Fix: Match the map key to the exact string in __table_args__. Use None only for models with no schema or schema=None.

Error / Warning: sqlalchemy.exc.InvalidRequestError: Session is already flushing
Root Cause: schema_translate_map applied inside an autoflush cycle triggered by relationship traversal.
Production Fix: Set autoflush=False on the session factory or wrap schema-routed operations in explicit async with session.begin(): blocks.

Error / Warning: Stale cached SQL targeting the wrong schema
Root Cause: schema_translate_map changed between calls but the same compiled cache entry was reused.
Production Fix: This should not happen: the cached statement holds a schema placeholder, and the map's value is substituted on every execution. If you observe this, check that execution_options is passed as a kwarg to session.execute(), not set on the engine globally.

Error / Warning: asyncpg.exceptions.UndefinedTableError after PgBouncer migration
Root Cause: Previously using search_path which leaked across pooled connections; now connections arrive without the expected search_path.
Production Fix: Migrate all schema routing to schema_translate_map. Remove SET search_path from connection setup code.

Advanced schema_translate_map Optimization

Middleware-Level Schema Injection

Rather than threading tenant_schema through every function call, a Starlette middleware can attach the resolved schema to the request state and a session factory wrapper can read it automatically:

from fastapi import Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import JSONResponse


class TenantSchemaMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        tenant_id = request.headers.get("X-Tenant-ID", "")
        schema = VALID_SCHEMAS.get(tenant_id)
        if schema is None and request.url.path.startswith("/api/"):
            return JSONResponse({"detail": "Unknown tenant"}, status_code=403)
        # May be None for paths outside /api/.
        request.state.tenant_schema = schema
        return await call_next(request)


app.add_middleware(TenantSchemaMiddleware)


async def get_schema(request: Request) -> str:
    schema = request.state.tenant_schema
    if schema is None:
        # Paths outside /api/ skip the middleware check, so guard here too.
        raise HTTPException(status_code=403, detail="Unknown tenant")
    return schema


async def get_tenant_db(
    session: AsyncSession = Depends(get_db),
    schema: str = Depends(get_schema),
):
    """Yields a session with tenant schema pre-bound in session.info."""
    session.info["schema"] = schema
    yield session

All route handlers that depend on get_tenant_db can then call tenant_exec_opts(session), the helper defined in the next section, without caring where the schema name came from. This eliminates duplicated schema-resolution logic across dozens of endpoints.

Caching the Resolved Schema in session.info

For endpoints that call multiple ORM helpers, resolving and passing tenant_schema through every function signature is tedious. Attach it once per session and let helpers read it:

from sqlalchemy.ext.asyncio import AsyncSession


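async def get_tenant_session(tenant_id: str) -> AsyncSession:
    """Return a new session tagged with the tenant schema; the caller must close it."""
    schema = VALID_SCHEMAS[tenant_id]  # raises KeyError for unknown tenants
    session = AsyncSessionLocal()
    session.info["schema"] = schema
    return session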


def tenant_exec_opts(session: AsyncSession) -> dict:
    return {"schema_translate_map": {None: session.info["schema"]}}


# In any repository function:
async def count_invoices(session: AsyncSession) -> int:
    from sqlalchemy import func, select
    from your_models import Invoice

    stmt = select(func.count()).select_from(Invoice)
    result = await session.execute(stmt, execution_options=tenant_exec_opts(session))
    return result.scalar_one()

This pattern eliminates the risk of passing the wrong schema to an inner function and makes tenant context inspectable via session.info in logging middleware.

Combining schema_translate_map with selectinload

Eager-loaded relationships also respect schema_translate_map when the relationship target model declares a matching schema. The map is applied to the secondary SELECT ... IN (...) query that selectinload emits:

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from your_models import Tenant, Invoice


async def fetch_tenant_with_invoices(
    session: AsyncSession, tenant_id: int, schema: str
) -> Tenant:
    stmt = (
        select(Tenant)
        .where(Tenant.id == tenant_id)
        .options(selectinload(Tenant.invoices))
    )
    result = await session.execute(
        stmt,
        execution_options={"schema_translate_map": {None: schema}},
    )
    return result.scalar_one()
    # Both SELECT tenants... and SELECT invoices WHERE tenant_id IN (...)
    # are rewritten to the tenant schema.

Validating Schema Names Before Use

The single most important security control in schema-per-tenant routing is schema name validation. A schema name in PostgreSQL is an identifier, not a parameter — it cannot be safely passed through a %s placeholder. Always validate against one of these strategies:

  1. Database allowlist: Query SELECT schema_name FROM information_schema.schemata WHERE schema_name = $1 before using the name. This is authoritative but adds a round-trip.
  2. In-memory allowlist: Build a set[str] of valid schema names at application startup from the tenants table and refresh it periodically (every 60 seconds) via a background task.
  3. Regex allowlist: re.fullmatch(r"[a-z][a-z0-9_]{0,62}", schema_name) accepts only lowercase, unquoted identifiers of at most 63 characters (PostgreSQL's default identifier length limit), which leaves no room for quotes, whitespace, or statement separators. Use this as a secondary guard, not a substitute for the database allowlist.

import re


_VALID_SCHEMA_PATTERN = re.compile(r"[a-z][a-z0-9_]{0,62}")
_schema_allowlist: set[str] = set()


def validate_schema(schema: str) -> str:
    # fullmatch, not match: "$" would also match just before a trailing newline.
    if not _VALID_SCHEMA_PATTERN.fullmatch(schema):
        raise ValueError(f"Invalid schema name format: {schema!r}")
    if schema not in _schema_allowlist:
        raise LookupError(f"Unknown tenant schema: {schema!r}")
    return schema
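
One subtlety in a regex guard of this kind is that "$" also matches just before a trailing newline, so an anchored re.match() accepts "acme\n" while re.fullmatch() rejects it. The short check below illustrates the difference with a few made-up inputs. The helper name is_safe_schema_name is hypothetical and only mirrors the format check, not the allowlist lookup:

```python
import re

SCHEMA_NAME = re.compile(r"[a-z][a-z0-9_]{0,62}")


def is_safe_schema_name(candidate: str) -> bool:
    """True only when the whole string is a lowercase, unquoted identifier."""
    return SCHEMA_NAME.fullmatch(candidate) is not None


samples = [
    "acme",
    "tenant_42",
    "Acme",                      # uppercase would need quoting
    "acme; DROP SCHEMA public",  # statement separator and spaces
    'acme"."x',                  # embedded quotes
    "acme\n",                    # trailing newline
    "a" * 64,                    # one character over the 63-character limit
]
results = {name: is_safe_schema_name(name) for name in samples}

# For comparison: an anchored match() lets the trailing newline through.
anchored_match_accepts_newline = (
    re.match(r"^[a-z][a-z0-9_]{0,62}$", "acme\n") is not None
)

for name, ok in results.items():
    print(repr(name), ok)
print("anchored match accepts trailing newline:", anchored_match_accepts_newline)
```

Only the first two samples pass. The allowlist lookup would still reject a name that slipped past the format check, which is why the regex is best treated as the secondary guard.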

Refresh _schema_allowlist from a background asyncio task that runs every 60 seconds:

import asyncio
from sqlalchemy import text


async def refresh_schema_allowlist(engine) -> None:
    while True:
        async with engine.connect() as conn:
            result = await conn.execute(text("SELECT schema_name FROM tenants"))
            _schema_allowlist.clear()
            _schema_allowlist.update(row[0] for row in result)
        await asyncio.sleep(60)
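
A loop like the one above ends permanently if a single query raises, and nothing stops it at shutdown. The variant below is one possible hardening, not the only one. It assumes a hypothetical load_names coroutine in place of the tenants query and an asyncio.Event as the stop signal, and it keeps the last known-good allowlist when a refresh fails:

```python
import asyncio
import logging
from collections.abc import Awaitable, Callable

logger = logging.getLogger(__name__)


async def refresh_allowlist_until_stopped(
    load_names: Callable[[], Awaitable[set[str]]],
    allowlist: set[str],
    stop: asyncio.Event,
    interval_seconds: float = 60.0,
) -> None:
    """Reload the allowlist on a fixed interval until `stop` is set."""
    while not stop.is_set():
        try:
            fresh = await load_names()
        except Exception:
            # Keep serving the previous entries if the lookup fails.
            logger.warning("allowlist refresh failed", exc_info=True)
        else:
            # No await between clear() and update(), so no task sees an empty set.
            allowlist.clear()
            allowlist.update(fresh)
        try:
            # Sleep for the interval, but wake immediately on shutdown.
            await asyncio.wait_for(stop.wait(), timeout=interval_seconds)
        except asyncio.TimeoutError:
            continue


async def demo() -> set[str]:
    allowlist: set[str] = set()
    stop = asyncio.Event()
    calls = 0

    async def fake_loader() -> set[str]:
        # Stand-in for the tenants query; the second call simulates an outage.
        nonlocal calls
        calls += 1
        if calls == 2:
            raise RuntimeError("simulated database outage")
        return {"acme", "globex"}

    task = asyncio.create_task(
        refresh_allowlist_until_stopped(
            fake_loader, allowlist, stop, interval_seconds=0.01
        )
    )
    await asyncio.sleep(0.05)
    stop.set()
    await task
    return allowlist


final_allowlist = asyncio.run(demo())
print(final_allowlist)
```

In an application, the task would be created at startup and the event set at shutdown, so the loop exits between refreshes instead of being cancelled mid-query.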

Frequently Asked Questions

Does schema_translate_map work with session.get() and session.merge()? session.get() accepts an execution_options argument in SQLAlchemy 1.4.29 and later, so the map can be passed there just as with session.execute(). session.merge() does not accept execution_options. For merge(), pre-configure the map at the connection level using conn.execution_options(schema_translate_map=...) before passing the connection to the session, or load the row with session.execute(select(Model).where(...)) and apply the changes to the loaded object.

Can I apply schema_translate_map to multiple schema keys at once? Yes. The map is a plain Python dict. {None: "acme", "analytics": "acme_analytics", "audit": "acme_audit"} redirects three different model schemas simultaneously in a single statement.

How do I verify which SQL is actually emitted? Enable engine echo: create_async_engine(..., echo=True). In production, attach an event listener to engine.sync_engine using event.listen(engine.sync_engine, "before_cursor_execute", log_fn) to capture the compiled SQL string including schema-rewritten table names.

Does schema_translate_map affect the information_schema or pg_catalog queries SQLAlchemy runs internally? No. SQLAlchemy's internal reflection and catalog queries use hardcoded schema names that are not subject to schema_translate_map rewriting. The map only applies to user-defined Table objects.