Setting Transaction Isolation Level Per Session in SQLAlchemy 2.0
Call connection.execution_options(isolation_level="SERIALIZABLE") before the first statement in a transaction, or pass execution_options={"isolation_level": ...} to session.connection() — this guide, part of the transaction isolation and commit strategies reference, shows you exactly where each hook belongs and what breaks when you place it in the wrong order.
Quick Answer
# Before (SQLAlchemy 1.4 — engine-level global, no per-transaction override)
engine = create_engine(
"postgresql+psycopg2://app:secret@localhost/orders",
isolation_level="REPEATABLE_READ", # applies to every connection, forever
)
with engine.connect() as conn:
conn.execute(text("SELECT ..."))
# After (SQLAlchemy 2.0 — per-transaction override via execution_options)
from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
# Engine-wide default (optional — READ COMMITTED is the PostgreSQL default)
engine = create_engine("postgresql+psycopg2://app:secret@localhost/orders")
# Per-transaction override on a Core Connection
with engine.connect() as conn:
conn = conn.execution_options(isolation_level="REPEATABLE READ")
with conn.begin():
result = conn.execute(text("SELECT balance FROM accounts WHERE id = 1"))
# Per-transaction override via ORM Session
with Session(engine) as session:
session.connection(execution_options={"isolation_level": "SERIALIZABLE"})
with session.begin():
account = session.get(Account, 1)
account.balance -= 500
The critical rule: execution_options must be applied before the first SQL statement in the transaction. After BEGIN is issued, the database ignores isolation-level changes until the next transaction.
Execution Context & Async Workflow Integration
How execution_options Hooks Into the Driver
When you call conn.execution_options(isolation_level="X"), SQLAlchemy does not immediately run a SQL statement. It schedules a DBAPI-level call — connection.set_isolation_level() for psycopg2, await connection.execute("SET TRANSACTION ISOLATION LEVEL X") for asyncpg — as the first operation when BEGIN is emitted. The mechanism is dialect-specific but the SQLAlchemy API is uniform.
For asyncpg specifically, isolation level is passed as a keyword argument to the asyncpg Transaction object at construction time, not via a separate SQL statement. This means the override is zero-cost and race-free even under high concurrency.
Engine-Wide vs Per-Transaction
# Engine-wide: every connection from this engine uses REPEATABLE READ
from sqlalchemy import create_engine
engine_rr = create_engine(
"postgresql+psycopg2://app:secret@localhost/orders",
isolation_level="REPEATABLE READ",
)
# Per-transaction: override only for this connection checkout
with engine_rr.connect() as conn:
# Override engine default to SERIALIZABLE for this one transaction
conn = conn.execution_options(isolation_level="SERIALIZABLE")
with conn.begin():
pass # runs under SERIALIZABLE
# After COMMIT, the connection is returned to pool.
# The pool resets isolation to engine default before the next checkout.
The pool automatically resets the isolation level to the engine default when a connection is returned. You do not need to manually undo the override.
ORM Session: session.connection()
For ORM workflows the hook is session.connection(execution_options=...). Call it before touching any mapped object to ensure the override fires before autobegin.
# Sync ORM — REPEATABLE READ before any DML
from sqlalchemy.orm import Session
from sqlalchemy import create_engine, select
engine = create_engine("postgresql+psycopg2://app:secret@localhost/orders")
def aggregate_user_orders(user_id: int) -> int:
with Session(engine) as session:
# Set isolation BEFORE autobegin fires
session.connection(execution_options={"isolation_level": "REPEATABLE READ"})
total = 0
# First read establishes the snapshot
orders_q1 = session.execute(
select(Order).where(Order.user_id == user_id)
).scalars().all()
# ... do some processing ...
# Second read sees the SAME snapshot — no phantom rows
orders_q2 = session.execute(
select(Order).where(Order.user_id == user_id)
).scalars().all()
assert len(orders_q1) == len(orders_q2), "Snapshot drifted — isolation broken"
session.rollback() # read-only; release connection
return len(orders_q1)
# Async ORM — await session.connection() before first statement
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from sqlalchemy import select
async_engine = create_async_engine("postgresql+asyncpg://app:secret@localhost/orders")
AsyncSessionFactory = async_sessionmaker(async_engine, expire_on_commit=False)
async def aggregate_user_orders_async(user_id: int) -> int:
async with AsyncSessionFactory() as session:
# Must await — asyncpg sets isolation asynchronously
await session.connection(execution_options={"isolation_level": "REPEATABLE READ"})
orders = (await session.execute(
select(Order).where(Order.user_id == user_id)
)).scalars().all()
await session.rollback()
return len(orders)
Verifying the Active Isolation Level in Tests
The most common source of isolation-related bugs is believing your override is in effect when it is not. Add an assertion in your test suite:
import pytest
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from sqlalchemy import text
async_engine = create_async_engine("postgresql+asyncpg://app:secret@localhost/test_db")
TestSessionFactory = async_sessionmaker(async_engine, expire_on_commit=False)
@pytest.mark.asyncio
async def test_isolation_level_is_serializable() -> None:
async with TestSessionFactory() as session:
await session.connection(execution_options={"isolation_level": "SERIALIZABLE"})
# Confirm PostgreSQL actually sees SERIALIZABLE
row = (await session.execute(
text("SELECT current_setting('transaction_isolation')")
)).scalar_one()
assert row == "serializable", f"Expected serializable, got {row!r}"
await session.rollback()
Run this in CI alongside your integration tests. It catches misconfigured engines, driver version mismatches, and PgBouncer transaction-pooling environments where SET commands may be silently dropped.
Isolation Level Strings by Dialect
SQLAlchemy validates isolation level strings against a per-dialect allowlist. Using an incorrect string raises ArgumentError at configuration time (not at runtime), which makes it easy to catch in tests.
| Dialect | Valid isolation level strings |
|---|---|
| PostgreSQL (psycopg2, asyncpg) | "READ COMMITTED", "REPEATABLE READ", "SERIALIZABLE", "AUTOCOMMIT" |
| MySQL (mysqlclient, aiomysql) | "READ UNCOMMITTED", "READ COMMITTED", "REPEATABLE READ", "SERIALIZABLE" |
| SQLite (pysqlite, aiosqlite) | "DEFERRED", "IMMEDIATE", "EXCLUSIVE", "AUTOCOMMIT" |
| Microsoft SQL Server (pyodbc) | "READ UNCOMMITTED", "READ COMMITTED", "REPEATABLE READ", "SERIALIZABLE", "SNAPSHOT" |
Note that PostgreSQL uses spaces in the string ("READ COMMITTED") while MySQL uses underscores in its native syntax but SQLAlchemy normalises both — use the space-separated form for portability across SQLAlchemy's dialect layer.
Per-Sessionmaker Default
If every session created by a factory should use a non-default isolation level, configure it on the engine passed to async_sessionmaker:
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
# All sessions from this factory use REPEATABLE READ
reporting_engine = create_async_engine(
"postgresql+asyncpg://ro_user:secret@replica.db/orders",
isolation_level="REPEATABLE READ",
pool_size=5,
max_overflow=2,
)
ReportingSessionFactory = async_sessionmaker(reporting_engine, expire_on_commit=False)
async def run_report() -> list[dict]:
async with ReportingSessionFactory() as session:
result = await session.execute(select(Order))
return [{"id": o.id, "total": o.total_cents} for o in result.scalars()]
Using a dedicated reporting engine with its own pool and isolation level cleanly separates read-heavy analytical workloads from transactional OLTP sessions.
Resolving Warnings, Errors & Common Mistakes
| Error / Warning | Root Cause | Production Fix |
|---|---|---|
sqlalchemy.exc.InvalidRequestError: Transaction already started when calling execution_options | A statement was executed before execution_options was set, firing autobegin | Call session.connection(execution_options=...) as the very first operation on the session |
psycopg2.errors.ActiveSqlTransaction: SET TRANSACTION ISOLATION LEVEL must be called before any query | execution_options was applied after BEGIN via a raw SET TRANSACTION call | Use SQLAlchemy's execution_options hook, not raw SQL; it fires before BEGIN |
asyncpg.exceptions.ActiveSQLTransactionError | Same as above on asyncpg | Ensure await session.connection(execution_options=...) precedes any DML or SELECT |
| Silent fallback to engine default isolation level | execution_options(isolation_level=...) was called on the Engine object itself, not on a Connection | Call execution_options on the Connection returned by engine.connect(), not on engine |
pool.reset_on_return overwrite issue — isolation not reset between requests | Using NullPool or StaticPool where connection is not returned to pool | Use QueuePool (default) or call conn.execution_options(isolation_level=<engine_default>) on reuse |
StaleDataError raised unexpectedly under SERIALIZABLE | Concurrent transaction modified a row between your read and your write; optimistic-lock version mismatch | Catch StaleDataError or OperationalError with SQLSTATE 40001; retry the full transaction from a new session |
isolation_level string not recognised — OperationalError: unknown isolation level | Dialect-specific string mismatch (e.g. "REPEATABLE_READ" with underscore on PostgreSQL) | PostgreSQL uses spaces: "REPEATABLE READ", "READ COMMITTED", "SERIALIZABLE". MySQL uses underscores: "REPEATABLE_READ" |
Advanced Isolation Optimization
The AUTOCOMMIT Pseudo-Isolation Level
SQLAlchemy exposes "AUTOCOMMIT" as an isolation level string. Setting it bypasses SQLAlchemy's transaction management entirely — no BEGIN is issued, and each statement is committed immediately by the DBAPI. This is appropriate for DDL operations that must run outside a transaction (e.g., CREATE INDEX CONCURRENTLY on PostgreSQL, which cannot run inside a transaction block) and for calling stored procedures that manage their own transaction state.
from sqlalchemy import create_engine, text
engine = create_engine("postgresql+psycopg2://app:secret@localhost/orders")
def create_index_concurrently(table: str, column: str) -> None:
"""CREATE INDEX CONCURRENTLY must run outside a transaction block."""
with engine.connect() as conn:
conn = conn.execution_options(isolation_level="AUTOCOMMIT")
# No BEGIN issued; each statement commits immediately
conn.execute(
text(f"CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_{table}_{column} ON {table} ({column})")
)
# Connection returned to pool; pool resets to engine default (not AUTOCOMMIT)
Do not use AUTOCOMMIT for normal application reads or writes. Without transaction boundaries, partial writes cannot be rolled back and constraint violations leave the database in an inconsistent state.
Mixing Isolation Levels Within a Single Request
A common pattern in financial applications is to use READ COMMITTED for the bulk of a request (lowest overhead, best concurrency) and escalate to SERIALIZABLE only for the critical write path. Because isolation level is per-transaction, you can do this within a single HTTP request using two separate sessions that each open and close their own connection:
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy import select, text
async_engine = create_async_engine("postgresql+asyncpg://app:secret@localhost/orders")
SessionFactory = async_sessionmaker(async_engine, expire_on_commit=False)
async def checkout(cart_id: int, user_id: int) -> dict:
# Phase 1: read cart contents under READ COMMITTED (default)
async with SessionFactory() as read_session:
cart_items = (await read_session.execute(
select(CartItem).where(CartItem.cart_id == cart_id)
)).scalars().all()
# Phase 2: commit the order under SERIALIZABLE to prevent double-booking
async with SessionFactory() as write_session:
await write_session.connection(
execution_options={"isolation_level": "SERIALIZABLE"}
)
async with write_session.begin():
for item in cart_items:
product = await write_session.get(Product, item.product_id)
if product.stock_qty < item.quantity:
raise ValueError(f"Insufficient stock for {product.name}")
product.stock_qty -= item.quantity
order = Order(user_id=user_id, status="confirmed")
write_session.add(order)
return {"order_id": order.id, "items": len(cart_items)}
This pattern avoids paying the SERIALIZABLE overhead for non-critical reads while providing full serializability guarantees exactly where they matter.
Using Execution Options on Scoped Sessions in Frameworks
In FastAPI and similar frameworks, sessions are typically created by a dependency and shared across the request. When you need a non-default isolation level for a specific endpoint, retrieve the underlying connection from the injected session rather than creating a new session:
from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
router = APIRouter()
async def get_session() -> AsyncSession:
async with AsyncSessionFactory() as session:
yield session
@router.post("/transfer")
async def transfer_endpoint(
src_id: int,
dst_id: int,
amount: int,
session: AsyncSession = Depends(get_session),
) -> dict:
# Escalate isolation BEFORE the first statement in this endpoint
await session.connection(execution_options={"isolation_level": "SERIALIZABLE"})
async with session.begin():
src = await session.get(Account, src_id)
dst = await session.get(Account, dst_id)
if src.balance < amount:
raise ValueError("Insufficient balance")
src.balance -= amount
dst.balance += amount
return {"status": "ok"}
The dependency yields the session before any statements execute, so session.connection() fires before autobegin. The isolation override applies to the entire request transaction without leaking into unrelated endpoints — each request gets a fresh session from the pool.
Frequently Asked Questions
Can I set the isolation level using a raw SET TRANSACTION SQL statement instead of execution_options?
You can, but it is fragile. SQLAlchemy may issue its own BEGIN before your SET TRANSACTION fires, causing the database to reject the statement with an ActiveSqlTransaction error. The execution_options hook is the only safe way because it fires at the DBAPI layer before BEGIN is emitted.
Does the isolation level override persist across commits within the same connection?
No. After COMMIT or ROLLBACK, the connection is returned to the pool and the pool resets it to the engine-wide default. Each new checkout (each new with engine.connect() or session context) starts fresh. You must re-apply execution_options on every new connection or session if you need a non-default level.
Is there a performance cost to SERIALIZABLE isolation on PostgreSQL? PostgreSQL's Serializable Snapshot Isolation adds CPU and memory overhead to track read/write dependencies — roughly 5–10% on typical OLTP workloads, more on read-heavy analytical queries. The primary cost is serialization failures requiring retries, not locking. Benchmark your specific access pattern before committing SERIALIZABLE engine-wide.
How do I verify which isolation level a live connection is actually using?
For PostgreSQL, execute SELECT current_setting('transaction_isolation') inside the transaction. For MySQL, use SELECT @@transaction_isolation. In tests, you can listen on the after_begin event to assert the isolation level matches expectations.
Related
- Transaction Isolation and Commit Strategies — parent guide covering isolation level theory, savepoints, optimistic locking, and retry patterns.
- Session Lifecycle and Scope Management — how session creation and closure affect when isolation overrides must be applied.
- Core vs ORM Architecture Decisions — when to use a raw
Connectioninstead of aSessionfor isolation-sensitive operations.