Setting Transaction Isolation Level Per Session in SQLAlchemy 2.0

Call connection.execution_options(isolation_level="SERIALIZABLE") before the first statement in a transaction, or pass execution_options={"isolation_level": ...} to session.connection(). This guide, part of the transaction isolation and commit strategies reference, shows where each hook belongs and what breaks when you apply it in the wrong order.

Quick Answer

# Before (engine-level default only; every connection gets the same level)
from sqlalchemy import create_engine, text

engine = create_engine(
    "postgresql+psycopg2://app:secret@localhost/orders",
    isolation_level="REPEATABLE READ",  # applies to every connection from this engine
)

with engine.connect() as conn:
    conn.execute(text("SELECT ..."))

# After (SQLAlchemy 2.0, per-transaction override via execution_options)
from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker

# Engine-wide default (optional — READ COMMITTED is the PostgreSQL default)
engine = create_engine("postgresql+psycopg2://app:secret@localhost/orders")

# Per-transaction override on a Core Connection
with engine.connect() as conn:
    conn = conn.execution_options(isolation_level="REPEATABLE READ")
    with conn.begin():
        result = conn.execute(text("SELECT balance FROM accounts WHERE id = 1"))

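# Per-transaction override via ORM Session
# (Account is assumed to be a mapped class defined elsewhere)
with Session(engine) as session:
    with session.begin():
        # Must be the first operation in the transaction. Calling
        # session.connection() before session.begin() would autobegin,
        # and the later begin() would then raise InvalidRequestError.
        session.connection(execution_options={"isolation_level": "SERIALIZABLE"})
        account = session.get(Account, 1)
        account.balance -= 500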

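The critical rule: apply execution_options before the first SQL statement in the transaction. Once a transaction has begun, Connection.execution_options(isolation_level=...) raises InvalidRequestError, and session.connection(execution_options=...) emits a warning and ignores the options if the session has already procured its connection.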

Execution Context & Async Workflow Integration

How execution_options Hooks Into the Driver

When you call conn.execution_options(isolation_level="X"), SQLAlchemy does not send a SET TRANSACTION statement through its normal execution path. It validates the string and hands the setting to the dialect, which applies it in a driver-specific way: the psycopg2 dialect calls set_isolation_level() on the DBAPI connection, while the asyncpg dialect records the level and passes it as the isolation argument when it creates the asyncpg transaction. The mechanism is dialect-specific but the SQLAlchemy API is uniform.

For asyncpg this means the level travels with the transaction start itself, so there is no separate statement that could run out of order or be skipped.

Engine-Wide vs Per-Transaction

# Engine-wide: every connection from this engine uses REPEATABLE READ
from sqlalchemy import create_engine

engine_rr = create_engine(
    "postgresql+psycopg2://app:secret@localhost/orders",
    isolation_level="REPEATABLE READ",
)

# Per-transaction: override only for this connection checkout
with engine_rr.connect() as conn:
    # Override engine default to SERIALIZABLE for this one transaction
    conn = conn.execution_options(isolation_level="SERIALIZABLE")
    with conn.begin():
        pass  # runs under SERIALIZABLE
    # When the outer with block exits, the connection is returned to the pool,
    # which resets isolation to the engine default before the next checkout.

The pool automatically resets the isolation level to the engine default when a connection is returned. You do not need to manually undo the override.
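
A small sketch of that reset, under the assumption of an in-memory SQLite engine (chosen only so the code runs without a server). SQLite's dialect reports "SERIALIZABLE" by default and accepts "READ UNCOMMITTED" as an override; the variable names are illustrative.

```python
from sqlalchemy import create_engine

# Assumption: in-memory SQLite stands in for the real database
engine = create_engine("sqlite://")

with engine.connect() as conn:
    default_level = conn.get_isolation_level()
    # The override is applied to the DBAPI connection right away
    conn.execution_options(isolation_level="READ UNCOMMITTED")
    overridden_level = conn.get_isolation_level()
# Leaving the block returns the connection to the pool, which resets it

with engine.connect() as conn:
    level_after_checkin = conn.get_isolation_level()
```

Connection.get_isolation_level() asks the database for the level currently in effect, so it is a convenient way to confirm both the override and the reset.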

ORM Session: session.connection()

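For ORM workflows the hook is session.connection(execution_options=...). Call it as the first operation in the session's transaction, before any query or flush. The options are applied only when the session first procures its connection; once an earlier statement has triggered autobegin and checked out a connection, they are ignored with a warning.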

# Sync ORM — REPEATABLE READ before any DML
from sqlalchemy.orm import Session
from sqlalchemy import create_engine, select

engine = create_engine("postgresql+psycopg2://app:secret@localhost/orders")

def aggregate_user_orders(user_id: int) -> int:
    with Session(engine) as session:
        # Set isolation as the first operation, before any query runs
        session.connection(execution_options={"isolation_level": "REPEATABLE READ"})
        # First read establishes the snapshot
        orders_q1 = session.execute(
            select(Order).where(Order.user_id == user_id)
        ).scalars().all()
        # ... do some processing ...
        # Second read sees the SAME snapshot — no phantom rows
        orders_q2 = session.execute(
            select(Order).where(Order.user_id == user_id)
        ).scalars().all()
        assert len(orders_q1) == len(orders_q2), "Snapshot drifted — isolation broken"
        session.rollback()  # read-only; release connection
        return len(orders_q1)

# Async ORM: await session.connection() before the first statement
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from sqlalchemy import select

async_engine = create_async_engine("postgresql+asyncpg://app:secret@localhost/orders")
AsyncSessionFactory = async_sessionmaker(async_engine, expire_on_commit=False)

async def aggregate_user_orders_async(user_id: int) -> int:
    async with AsyncSessionFactory() as session:
        # AsyncSession.connection() is a coroutine, so it must be awaited
        await session.connection(execution_options={"isolation_level": "REPEATABLE READ"})
        orders = (await session.execute(
            select(Order).where(Order.user_id == user_id)
        )).scalars().all()
        await session.rollback()
        return len(orders)
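
The failure mode to watch for is calling session.connection() too late. The sketch below contrasts the late and early calls. It assumes an in-memory SQLite engine so that it runs anywhere, and uses "READ UNCOMMITTED" because SQLite's dialect supports it; the variable names are illustrative.

```python
import warnings

from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session

# Assumption: in-memory SQLite stands in for the real database
engine = create_engine("sqlite://")

# Late: a statement has already procured the connection
with Session(engine) as session:
    session.execute(text("SELECT 1"))
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        conn = session.connection(
            execution_options={"isolation_level": "READ UNCOMMITTED"}
        )
    late_level = conn.get_isolation_level()
    ignored_messages = [str(w.message) for w in caught]

# Early: session.connection() is the first operation
with Session(engine) as session:
    conn = session.connection(
        execution_options={"isolation_level": "READ UNCOMMITTED"}
    )
    early_level = conn.get_isolation_level()
```

In the late case the session keeps the level it started with and only a warning signals the problem, which is why an explicit check in tests is worthwhile.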

Verifying the Active Isolation Level in Tests

The most common source of isolation-related bugs is believing your override is in effect when it is not. Add an assertion in your test suite:

import pytest
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from sqlalchemy import text

async_engine = create_async_engine("postgresql+asyncpg://app:secret@localhost/test_db")
TestSessionFactory = async_sessionmaker(async_engine, expire_on_commit=False)

@pytest.mark.asyncio
async def test_isolation_level_is_serializable() -> None:
    async with TestSessionFactory() as session:
        await session.connection(execution_options={"isolation_level": "SERIALIZABLE"})
        # Confirm PostgreSQL actually sees SERIALIZABLE
        row = (await session.execute(
            text("SELECT current_setting('transaction_isolation')")
        )).scalar_one()
        assert row == "serializable", f"Expected serializable, got {row!r}"
        await session.rollback()

Run this in CI alongside your integration tests. It catches misconfigured engines, driver version mismatches, and PgBouncer transaction-pooling environments where SET commands may be silently dropped.

Isolation Level Strings by Dialect

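SQLAlchemy validates isolation level strings against a per-dialect list of supported values. An unsupported string raises ArgumentError from SQLAlchemy itself, with the valid values for the dialect in the message, instead of being passed through to the database. This makes mistakes easy to catch in tests.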

| Dialect | Valid isolation level strings |
| --- | --- |
| PostgreSQL (psycopg2, asyncpg) | "READ UNCOMMITTED", "READ COMMITTED", "REPEATABLE READ", "SERIALIZABLE", "AUTOCOMMIT" |
| MySQL (mysqlclient, aiomysql) | "READ UNCOMMITTED", "READ COMMITTED", "REPEATABLE READ", "SERIALIZABLE", "AUTOCOMMIT" |
| SQLite (pysqlite, aiosqlite) | "READ UNCOMMITTED", "SERIALIZABLE", "AUTOCOMMIT" |
| Microsoft SQL Server (pyodbc) | "READ UNCOMMITTED", "READ COMMITTED", "REPEATABLE READ", "SERIALIZABLE", "SNAPSHOT", "AUTOCOMMIT" |

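SQLAlchemy expects the space-separated form ("READ COMMITTED") on every dialect and converts underscores to spaces before validating, so "READ_COMMITTED" is also accepted. Prefer the space-separated form for portability. Note that MySQL reports the active level in hyphenated form (for example REPEATABLE-READ from SELECT @@transaction_isolation), which is not a valid SQLAlchemy string.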
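
Both behaviours, underscore normalisation and rejection of unsupported values, can be checked with a short sketch. It assumes an in-memory SQLite engine so that it runs without a server; "REPEATABLE READ" is used as the rejected value because SQLite's dialect does not support it. The variable names are illustrative.

```python
from sqlalchemy import create_engine
from sqlalchemy.exc import ArgumentError

# Assumption: in-memory SQLite stands in for the real database
engine = create_engine("sqlite://")

with engine.connect() as conn:
    # Underscores are converted to spaces before validation
    conn.execution_options(isolation_level="READ_UNCOMMITTED")
    normalised_level = conn.get_isolation_level()

with engine.connect() as conn:
    try:
        # Not in the SQLite dialect's list of supported levels
        conn.execution_options(isolation_level="REPEATABLE READ")
    except ArgumentError as exc:
        rejection_message = str(exc)
    else:
        rejection_message = None
```

The error message lists the values the dialect does accept, which is usually enough to spot a typo or a level the backend lacks.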

Per-Sessionmaker Default

If every session created by a factory should use a non-default isolation level, configure it on the engine passed to async_sessionmaker:

from sqlalchemy import select
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession

# Order is assumed to be a mapped class defined elsewhere

# All sessions from this factory use REPEATABLE READ
reporting_engine = create_async_engine(
    "postgresql+asyncpg://ro_user:secret@replica.db/orders",
    isolation_level="REPEATABLE READ",
    pool_size=5,
    max_overflow=2,
)
ReportingSessionFactory = async_sessionmaker(reporting_engine, expire_on_commit=False)

async def run_report() -> list[dict]:
    async with ReportingSessionFactory() as session:
        result = await session.execute(select(Order))
        return [{"id": o.id, "total": o.total_cents} for o in result.scalars()]

Using a dedicated reporting engine with its own pool and isolation level cleanly separates read-heavy analytical workloads from transactional OLTP sessions.
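
When a second pool is not wanted, Engine.execution_options() offers an alternative: it returns a copy of the engine that carries the isolation level but shares the original connection pool. The sketch below is a minimal illustration using the synchronous sessionmaker and an in-memory SQLite engine so that it runs without a server. The factory names DefaultSession and RelaxedSession are hypothetical, and "READ UNCOMMITTED" is used only because SQLite's dialect supports it.

```python
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# Assumption: in-memory SQLite stands in for the real database
engine = create_engine("sqlite://")

# Returns a new Engine object; the original engine is left unchanged
relaxed_engine = engine.execution_options(isolation_level="READ UNCOMMITTED")

DefaultSession = sessionmaker(engine)          # hypothetical factory name
RelaxedSession = sessionmaker(relaxed_engine)  # hypothetical factory name

with DefaultSession() as session:
    default_level = session.connection().get_isolation_level()

with RelaxedSession() as session:
    relaxed_level = session.connection().get_isolation_level()

shares_pool = relaxed_engine.pool is engine.pool
```

Because the method returns a new object, discarding its return value leaves the original engine at its default level.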

Resolving Warnings, Errors & Common Mistakes

| Error / Warning | Root Cause | Production Fix |
| --- | --- | --- |
| sqlalchemy.exc.InvalidRequestError stating that isolation_level may not be altered after begin() or autobegin | conn.execution_options(isolation_level=...) was called after a statement had already started the transaction | Set the option before the first statement, or call rollback() or commit() first |
| Warning that execution_options were ignored by session.connection() | The session had already procured its connection because an earlier statement fired autobegin | Call session.connection(execution_options=...) as the very first operation in the session's transaction |
| psycopg2.errors.ActiveSqlTransaction: SET TRANSACTION ISOLATION LEVEL must be called before any query | A raw SET TRANSACTION statement was executed after the transaction had already run a query | Use SQLAlchemy's execution_options hook, not raw SQL; it is applied before the transaction starts |
| asyncpg.exceptions.ActiveSQLTransactionError | Same as above on asyncpg | Ensure await session.connection(execution_options=...) precedes any DML or SELECT |
| Silent fallback to engine default isolation level | engine.execution_options(isolation_level=...) was called and its return value discarded; the method returns a new Engine and leaves the original unchanged | Use the Engine it returns, or call execution_options on the Connection returned by engine.connect() |
| Isolation override still active in a later unit of work | The same Connection was kept open and reused, so it was never returned to the pool and never reset | Close the connection or session at the end of each unit of work, or set the required level explicitly with conn.execution_options(isolation_level=...) on reuse |
| OperationalError with SQLSTATE 40001 under SERIALIZABLE, or StaleDataError when a version counter is configured | A concurrent transaction modified a row between your read and your write | Catch the error and retry the full transaction from a new session |
| sqlalchemy.exc.ArgumentError: Invalid value ... for isolation_level | The string is not in the dialect's list of supported levels (e.g. "SNAPSHOT" on PostgreSQL) | Use one of the strings listed for the dialect, in space-separated form: "REPEATABLE READ", "READ COMMITTED", "SERIALIZABLE" |
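
The retry advice for SQLSTATE 40001 can be sketched as a small helper. This is one possible shape, not a SQLAlchemy API: the names sqlstate_of and run_with_retry are hypothetical. It relies on SQLAlchemy's DBAPIError exposing the driver exception as .orig, on psycopg2 exposing the SQLSTATE as pgcode, and on asyncpg exposing it as sqlstate. The unit_of_work callable is assumed to open its own session so that each attempt starts a fresh transaction.

```python
import random
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")
SERIALIZATION_FAILURE = "40001"


def sqlstate_of(exc: BaseException) -> Optional[str]:
    """Return the SQLSTATE carried by a driver error, if any."""
    # SQLAlchemy's DBAPIError keeps the driver exception in .orig
    orig = getattr(exc, "orig", None) or exc
    return getattr(orig, "pgcode", None) or getattr(orig, "sqlstate", None)


def run_with_retry(
    unit_of_work: Callable[[], T], attempts: int = 3, base_delay: float = 0.05
) -> T:
    """Run unit_of_work, retrying only on serialization failures."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return unit_of_work()
        except Exception as exc:
            retryable = sqlstate_of(exc) == SERIALIZATION_FAILURE
            if not retryable or attempt == attempts:
                raise
            # Randomised, growing pause so competing retries spread out
            time.sleep(base_delay * attempt * random.random())
    raise AssertionError("unreachable")
```

Any other error, and the final failed attempt, propagate unchanged so that callers still see the original exception.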

Advanced Isolation Optimization

The AUTOCOMMIT Pseudo-Isolation Level

SQLAlchemy exposes "AUTOCOMMIT" as an isolation level string. Setting it bypasses SQLAlchemy's transaction management entirely — no BEGIN is issued, and each statement is committed immediately by the DBAPI. This is appropriate for DDL operations that must run outside a transaction (e.g., CREATE INDEX CONCURRENTLY on PostgreSQL, which cannot run inside a transaction block) and for calling stored procedures that manage their own transaction state.

from sqlalchemy import create_engine, text

engine = create_engine("postgresql+psycopg2://app:secret@localhost/orders")

def create_index_concurrently(table: str, column: str) -> None:
    """CREATE INDEX CONCURRENTLY must run outside a transaction block."""
    with engine.connect() as conn:
        conn = conn.execution_options(isolation_level="AUTOCOMMIT")
        # No BEGIN issued; each statement commits immediately
        conn.execute(
            text(f"CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_{table}_{column} ON {table} ({column})")
        )
    # Connection returned to pool; pool resets to engine default (not AUTOCOMMIT)

Do not use AUTOCOMMIT for normal application reads or writes. Without transaction boundaries, a multi-statement operation that fails partway through cannot be rolled back, so the statements that already ran stay committed and can leave related rows inconsistent.
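
The create_index_concurrently function above interpolates table and column names directly into the DDL string, because identifiers cannot be sent as bound parameters. If those names can ever come from outside the codebase, one possible guard is to validate them first. The helper names safe_identifier and build_index_ddl below are hypothetical, and the pattern deliberately accepts only plain unquoted identifiers.

```python
import re

# Letters, digits and underscores only, not starting with a digit
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def safe_identifier(name: str) -> str:
    """Return name unchanged if it is a plain SQL identifier, else raise."""
    if not _IDENTIFIER.fullmatch(name):
        raise ValueError(f"Unsafe SQL identifier: {name!r}")
    return name


def build_index_ddl(table: str, column: str) -> str:
    """Build the CREATE INDEX CONCURRENTLY statement from validated names."""
    table = safe_identifier(table)
    column = safe_identifier(column)
    return (
        f"CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_{table}_{column} "
        f"ON {table} ({column})"
    )
```

The returned string can then be wrapped in text() and executed on the AUTOCOMMIT connection exactly as before.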

Mixing Isolation Levels Within a Single Request

A common pattern in financial applications is to use READ COMMITTED for the bulk of a request (lowest overhead, best concurrency) and escalate to SERIALIZABLE only for the critical write path. Because isolation level is per-transaction, you can do this within a single HTTP request using two separate sessions that each open and close their own connection:

from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy import select, text

async_engine = create_async_engine("postgresql+asyncpg://app:secret@localhost/orders")
SessionFactory = async_sessionmaker(async_engine, expire_on_commit=False)

async def checkout(cart_id: int, user_id: int) -> dict:
    # Phase 1: read cart contents under READ COMMITTED (default)
    async with SessionFactory() as read_session:
        cart_items = (await read_session.execute(
            select(CartItem).where(CartItem.cart_id == cart_id)
        )).scalars().all()

    # Phase 2: commit the order under SERIALIZABLE to prevent double-booking
    async with SessionFactory() as write_session:
        async with write_session.begin():
            # First operation in the transaction: procure the connection
            # with the SERIALIZABLE override applied
            await write_session.connection(
                execution_options={"isolation_level": "SERIALIZABLE"}
            )
            for item in cart_items:
                product = await write_session.get(Product, item.product_id)
                if product.stock_qty < item.quantity:
                    raise ValueError(f"Insufficient stock for {product.name}")
                product.stock_qty -= item.quantity
            order = Order(user_id=user_id, status="confirmed")
            write_session.add(order)

    return {"order_id": order.id, "items": len(cart_items)}

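This pattern avoids paying the SERIALIZABLE overhead for non-critical reads while applying serializable isolation to the stock check and order write. The cart read in phase 1 runs in a separate transaction and is not covered by that guarantee, and the phase 2 transaction can still fail with a serialization error that the caller must retry.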

Using Execution Options on Scoped Sessions in Frameworks

In FastAPI and similar frameworks, sessions are typically created by a dependency and shared across the request. When you need a non-default isolation level for a specific endpoint, retrieve the underlying connection from the injected session rather than creating a new session:

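from collections.abc import AsyncIterator

from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select

router = APIRouter()

# AsyncSessionFactory is the async_sessionmaker defined earlier in this guide
async def get_session() -> AsyncIterator[AsyncSession]:
    async with AsyncSessionFactory() as session:
        yield session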

@router.post("/transfer")
async def transfer_endpoint(
    src_id: int,
    dst_id: int,
    amount: int,
    session: AsyncSession = Depends(get_session),
) -> dict:
    async with session.begin():
        # Escalate isolation as the first operation in the transaction
        await session.connection(execution_options={"isolation_level": "SERIALIZABLE"})
        src = await session.get(Account, src_id)
        dst = await session.get(Account, dst_id)
        if src.balance < amount:
            raise ValueError("Insufficient balance")
        src.balance -= amount
        dst.balance += amount
    return {"status": "ok"}

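The dependency yields the session before any statement executes, so session.connection() is the first operation to procure a connection and the override is applied before the transaction begins. The override covers the whole request transaction without leaking into unrelated endpoints: each request gets a fresh session from the factory, and the connection's isolation level is reset when it returns to the pool.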

Frequently Asked Questions

Can I set the isolation level using a raw SET TRANSACTION SQL statement instead of execution_options? You can, but it is fragile. SQLAlchemy may issue its own BEGIN before your SET TRANSACTION fires, causing the database to reject the statement with an ActiveSqlTransaction error. The execution_options hook is the only safe way because it fires at the DBAPI layer before BEGIN is emitted.

Does the isolation level override persist across commits within the same connection? It depends on which object you hold. On a Core Connection the setting stays in effect across commit() and rollback() until the connection is closed and returned to the pool, at which point the pool resets it to the engine-wide default. An ORM Session releases its connection after each commit() or rollback(), so the next transaction starts at the engine default and you must call session.connection(execution_options=...) again if you need a non-default level.

Is there a performance cost to SERIALIZABLE isolation on PostgreSQL? Yes. PostgreSQL's Serializable Snapshot Isolation adds CPU and memory overhead to track read/write dependencies, and the size of that overhead depends heavily on the workload. The primary cost is usually serialization failures that force retries, not locking. Benchmark your specific access pattern before committing to SERIALIZABLE engine-wide.

How do I verify which isolation level a live connection is actually using? For PostgreSQL, execute SELECT current_setting('transaction_isolation') inside the transaction. For MySQL, use SELECT @@transaction_isolation. In tests, you can listen on the after_begin event to assert the isolation level matches expectations.
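
The after_begin approach might look like the sketch below. It assumes an in-memory SQLite engine so that it runs without a server, and "READ UNCOMMITTED" is used only because SQLite's dialect supports it. The listener name record_isolation_level and the observed_levels list are illustrative.

```python
from sqlalchemy import create_engine, event
from sqlalchemy.orm import Session

# Assumption: in-memory SQLite stands in for the real database
engine = create_engine("sqlite://")
observed_levels = []


def record_isolation_level(session, transaction, connection):
    # Runs once the session has begun a transaction on a connection
    observed_levels.append(connection.get_isolation_level())


with Session(engine) as session:
    # Listen on this one session so other sessions are unaffected
    event.listen(session, "after_begin", record_isolation_level)
    with session.begin():
        session.connection(
            execution_options={"isolation_level": "READ UNCOMMITTED"}
        )
```

A test can then compare observed_levels against the level it expects, or the listener itself can raise when the level is wrong.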