Transaction Isolation and Commit Strategies in SQLAlchemy 2.0
SQLAlchemy 2.0 enforces explicit transaction boundaries throughout — part of the broader architectural discipline covered in Mastering SQLAlchemy 2.0 Core and ORM Architecture — giving you deterministic control over isolation levels, savepoint nesting, autobegin behaviour, async commit lifecycles, and optimistic concurrency. This guide covers the complete transaction toolkit: choosing between READ COMMITTED, REPEATABLE READ, and SERIALIZABLE; wiring up session.begin() and begin_nested() correctly; managing autobegin in async code; and implementing serialization-failure retries that survive production traffic.
Concept & Execution Model
SQLAlchemy 2.0's transaction model rests on two interlocking concepts: connection-level transactions managed by the engine/DBAPI and unit-of-work flushing managed by the Session. Every Session in 2.0 operates in autobegin mode by default: the first time you use the session (add an object, execute a statement) it begins a logical transaction, and the first operation that actually emits SQL checks out a connection and starts the database transaction on it. That transaction remains open until you explicitly call commit(), rollback(), or close the session.
The consequence is that isolation level must be set before the first SQL statement in a transaction, because databases bind isolation semantics to BEGIN. You cannot promote a running READ COMMITTED transaction to SERIALIZABLE mid-flight; the engine must start a new connection-level transaction with the desired level baked in from the outset.
# Sync: minimal explicit transaction with autobegin awareness
from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session
# Order is assumed to be a mapped ORM class defined elsewhere in the application
engine = create_engine("postgresql+psycopg2://app:secret@localhost/orders")
with Session(engine) as session:
    # autobegin fires here; connection is checked out and BEGIN issued
    order = session.get(Order, 42)
    order.status = "confirmed"
    session.commit()  # flush → COMMIT → connection returned to pool
    # session is now back in autobegin-pending state; no open transaction
# Async: same lifecycle with await discipline
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
async_engine = create_async_engine("postgresql+asyncpg://app:secret@localhost/orders")
AsyncSessionFactory = async_sessionmaker(async_engine, expire_on_commit=False)
async def confirm_order(order_id: int) -> None:
    async with AsyncSessionFactory() as session:
        order = await session.get(Order, order_id)
        order.status = "confirmed"
        await session.commit()
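The key 2.0 rule: leaving a Session or AsyncSession context manager closes the session (in __exit__ or __aexit__ respectively), which rolls back any transaction that is still open and returns the connection to the pool, whether the block finished normally or raised. There is no implicit commit on close.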
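The lazy-begin and no-commit-on-close rules also hold at the driver level underneath SQLAlchemy. As a rough analogy only, the sketch below uses the standard-library sqlite3 module instead of a Session (the file name and the `orders` table are made up for illustration). It records when a transaction is actually open and checks that a write left uncommitted at close is discarded.

```python
import os
import sqlite3
import tempfile


def demo_lazy_begin_and_close() -> tuple[list[bool], int]:
    """Return the observed in-transaction flags and the row count after reopening."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "orders.db")  # hypothetical database file

        conn = sqlite3.connect(path)
        conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, status TEXT)")
        conn.commit()

        states = [conn.in_transaction]       # nothing written yet: no transaction
        conn.execute("INSERT INTO orders (status) VALUES ('confirmed')")
        states.append(conn.in_transaction)   # the first DML opened one implicitly
        conn.commit()
        states.append(conn.in_transaction)   # commit ended it

        conn.execute("INSERT INTO orders (status) VALUES ('pending')")
        conn.close()                         # closed without commit: insert is lost

        check = sqlite3.connect(path)
        try:
            (count,) = check.execute("SELECT COUNT(*) FROM orders").fetchone()
        finally:
            check.close()
        return states, count


states, count = demo_lazy_begin_and_close()
print(states, count)
```

Only the committed row survives the reopen, which mirrors what happens when a Session block exits without commit().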
Query Construction & Async Execution Patterns
Choosing an Isolation Level
The three production-relevant levels each prevent a distinct category of anomaly:
| Isolation Level | Dirty Read | Non-Repeatable Read | Phantom Read | Use Case |
|---|---|---|---|---|
| READ COMMITTED | prevented | possible | possible | Default; OLTP reads |
| REPEATABLE READ | prevented | prevented | possible per the SQL standard (PostgreSQL and MySQL InnoDB prevent it) | Financial aggregations |
| SERIALIZABLE | prevented | prevented | prevented | Inventory, booking, ledger |
READ COMMITTED (PostgreSQL default) is safe for the majority of OLTP workloads. Each statement sees a snapshot of the data committed before that statement began. If two concurrent transactions update the same row, the second UPDATE waits for the first to commit and then operates on the newly committed row version; any value the application read earlier in the transaction may be stale by then, which is how lost updates arise.
REPEATABLE READ freezes the snapshot at the first statement of the transaction. Re-reading a row returns the same value regardless of concurrent commits. PostgreSQL implements this via MVCC without taking extra locks; MySQL InnoDB also serves plain reads from a snapshot but uses next-key locking for locking reads and writes, so it can block more aggressively.
SERIALIZABLE provides the illusion that all transactions executed one after another. PostgreSQL's implementation (Serializable Snapshot Isolation, SSI) detects dangerous read/write dependencies and aborts one transaction with SQLSTATE 40001 rather than blocking, which makes it retry-friendly when implemented correctly. SQLAlchemy surfaces the failure as a sqlalchemy.exc.DBAPIError; with psycopg2 it is the OperationalError subclass, while other drivers may raise the base class, so retry code is safest catching DBAPIError and checking the SQLSTATE.
# Sync: per-transaction isolation override
from sqlalchemy import create_engine, text
from sqlalchemy.engine import Engine
engine = create_engine("postgresql+psycopg2://app:secret@localhost/orders")
def transfer_funds(engine: Engine, src_id: int, dst_id: int, amount: int) -> None:
    """Ledger transfer using SERIALIZABLE to prevent lost-update anomalies."""
    with engine.connect() as conn:
        # Must be set before the transaction begins
        conn = conn.execution_options(isolation_level="SERIALIZABLE")
        with conn.begin():
            src = conn.execute(
                text("SELECT balance FROM accounts WHERE id = :id FOR UPDATE"),
                {"id": src_id},
            ).scalar_one()
            if src < amount:
                raise ValueError("Insufficient funds")
            conn.execute(
                text("UPDATE accounts SET balance = balance - :amt WHERE id = :id"),
                {"amt": amount, "id": src_id},
            )
            conn.execute(
                text("UPDATE accounts SET balance = balance + :amt WHERE id = :id"),
                {"amt": amount, "id": dst_id},
            )
# Async: SERIALIZABLE with asyncpg
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine, AsyncConnection
async_engine = create_async_engine("postgresql+asyncpg://app:secret@localhost/orders")
async def transfer_funds_async(src_id: int, dst_id: int, amount: int) -> None:
    async with async_engine.connect() as conn:
        conn = await conn.execution_options(isolation_level="SERIALIZABLE")
        async with conn.begin():
            src = (await conn.execute(
                text("SELECT balance FROM accounts WHERE id = :id FOR UPDATE"),
                {"id": src_id},
            )).scalar_one()
            if src < amount:
                raise ValueError("Insufficient funds")
            await conn.execute(
                text("UPDATE accounts SET balance = balance - :amt WHERE id = :id"),
                {"amt": amount, "id": src_id},
            )
            await conn.execute(
                text("UPDATE accounts SET balance = balance + :amt WHERE id = :id"),
                {"amt": amount, "id": dst_id},
            )
For the full mechanics of setting isolation level per session and per engine, see the guide on setting transaction isolation level per session.
State Management & Session Boundaries
Autobegin and When the BEGIN Actually Fires
In SQLAlchemy 2.0, Session.autobegin is True by default. No BEGIN is issued until the first operation that emits SQL: a session.execute(), a session.get() that misses the identity map, a session.flush(), or a session.connection() call. session.add() only marks the object as pending and begins the session's logical transaction; it does not check out a connection. Until SQL is emitted the session holds no connection. This lazy acquisition is beneficial in dependency-injection patterns (e.g., FastAPI) where a session is created per request but a request might short-circuit before hitting the database.
The corollary: once the session has touched the database you are already inside a transaction, and it is too late to change the isolation level for that transaction. Always pass execution_options before the first statement.
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from sqlalchemy import select, text
# User is assumed to be a mapped ORM class defined elsewhere in the application
async_engine = create_async_engine("postgresql+asyncpg://app:secret@localhost/orders")
AsyncSessionFactory = async_sessionmaker(async_engine, expire_on_commit=False)
async def read_with_repeatable_read(user_id: int) -> dict:
    """Demonstrates setting isolation before the first statement runs."""
    async with AsyncSessionFactory() as session:
        # First use of the session: this call triggers autobegin, checks out the
        # connection, and applies REPEATABLE READ before any statement runs
        await session.connection(execution_options={"isolation_level": "REPEATABLE READ"})
        # Every statement below runs inside that REPEATABLE READ transaction
        result = await session.execute(
            select(User).where(User.id == user_id)
        )
        user = result.scalar_one()
        # Second read within same transaction returns identical snapshot
        same_result = await session.execute(
            select(User).where(User.id == user_id)
        )
        await session.rollback()  # read-only; explicit rollback to release connection
        return {"id": user.id, "name": user.name}
session.begin() vs Autobegin
with session.begin() is the explicit form. It begins the session's transaction immediately (the connection is still checked out, and the database BEGIN emitted, only when the first SQL runs) and guarantees COMMIT on clean exit or ROLLBACK on exception. Use it when you want the transaction boundary to be visible and when the outer function's contract requires knowing exactly where the transaction starts and ends.
Autobegin is appropriate for dependency-injection patterns where the framework owns the lifecycle: the DI container creates the session, middleware closes it, and the application layer calls commit() only on success.
flush() vs commit()
session.flush() emits pending INSERT/UPDATE/DELETE statements to the database within the current transaction, making the rows visible to subsequent statements in the same connection (useful for auto-generated primary keys or for triggering database-side BEFORE triggers). It does not commit. The rows are invisible to other connections and all locks are still held.
session.commit() calls flush(), issues COMMIT, and expires all in-memory ORM state (so the next access triggers a fresh SELECT). With expire_on_commit=False, state is not expired — critical in async code where expired lazy-loading raises MissingGreenlet.
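The visibility rule behind flush() versus commit() can be observed with two plain connections. The sketch below is a driver-level illustration using the standard-library sqlite3 module, not SQLAlchemy itself. The uncommitted INSERT stands in for a flush and the explicit commit for session.commit(); the file and table names are hypothetical.

```python
import os
import sqlite3
import tempfile

COUNT_SQL = "SELECT COUNT(*) FROM invoices"


def count_rows(conn: sqlite3.Connection) -> int:
    """Run the count and close the cursor so no read lock lingers."""
    cur = conn.execute(COUNT_SQL)
    try:
        return cur.fetchone()[0]
    finally:
        cur.close()


def demo_uncommitted_visibility() -> tuple[int, int, int]:
    """Counts seen by: writer before commit, reader before commit, reader after."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "invoices.db")  # hypothetical database file
        writer = sqlite3.connect(path)
        reader = sqlite3.connect(path)
        try:
            writer.execute("CREATE TABLE invoices (id INTEGER PRIMARY KEY, number TEXT)")
            writer.commit()

            # Comparable to flush(): the INSERT is sent, the transaction stays open.
            writer.execute("INSERT INTO invoices (number) VALUES ('INV-1')")
            writer_sees = count_rows(writer)
            reader_before = count_rows(reader)

            # Comparable to commit(): the row becomes visible to other connections.
            writer.commit()
            reader_after = count_rows(reader)
            return writer_sees, reader_before, reader_after
        finally:
            writer.close()
            reader.close()


visibility = demo_uncommitted_visibility()
print(visibility)
```

The writer sees its own pending row immediately, while the second connection sees it only after the commit.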
Advanced Transaction Patterns
begin_nested() and SAVEPOINT
session.begin_nested() wraps a database SAVEPOINT. Releasing the savepoint (clean exit) commits the sub-transaction's work into the parent. Rolling back the savepoint (exception) undoes only the work done since the savepoint, leaving the parent transaction intact. This is the standard pattern for batch processing where per-item failures must not abort the entire batch.
from sqlalchemy.orm import Session
from sqlalchemy.exc import IntegrityError
def import_invoices(session: Session, rows: list[dict]) -> dict:
    """Import a batch of invoices; skip duplicates without aborting the batch."""
    imported = 0
    skipped = 0
    with session.begin():
        for row in rows:
            try:
                with session.begin_nested():
                    invoice = Invoice(
                        number=row["number"],
                        tenant_id=row["tenant_id"],
                        amount_cents=row["amount_cents"],
                    )
                    session.add(invoice)
                    session.flush()  # trigger unique-constraint check now
                imported += 1
            except IntegrityError:
                # ROLLBACK TO SAVEPOINT; outer transaction continues
                skipped += 1
    return {"imported": imported, "skipped": skipped}
The async variant uses async with session.begin_nested() and awaits flush(); it is otherwise identical in structure.
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.exc import IntegrityError
async def import_invoices_async(session: AsyncSession, rows: list[dict]) -> dict:
    imported = 0
    skipped = 0
    async with session.begin():
        for row in rows:
            try:
                async with session.begin_nested():
                    invoice = Invoice(
                        number=row["number"],
                        tenant_id=row["tenant_id"],
                        amount_cents=row["amount_cents"],
                    )
                    session.add(invoice)
                    await session.flush()
                imported += 1
            except IntegrityError:
                skipped += 1
    return {"imported": imported, "skipped": skipped}
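To make the savepoint mechanics concrete, here is a minimal sketch of roughly the SQL that the batch-import pattern corresponds to, written against the standard-library sqlite3 module rather than a Session. The savepoint name `item` and the single-column `invoices` table are illustrative assumptions. One caveat: SQLite lets a transaction continue after a failed statement even without the savepoint, whereas on PostgreSQL the ROLLBACK TO SAVEPOINT is what makes the outer transaction usable again.

```python
import sqlite3


def import_invoice_numbers(numbers: list[str]) -> dict:
    """Insert each number under its own SAVEPOINT; skip duplicates, keep the batch."""
    # isolation_level=None: sqlite3 issues no implicit BEGIN, we control it ourselves
    conn = sqlite3.connect(":memory:", isolation_level=None)
    conn.execute("CREATE TABLE invoices (number TEXT PRIMARY KEY)")
    imported = skipped = 0

    conn.execute("BEGIN")                       # outer transaction, like session.begin()
    for number in numbers:
        conn.execute("SAVEPOINT item")          # like entering begin_nested()
        try:
            conn.execute("INSERT INTO invoices (number) VALUES (?)", (number,))
        except sqlite3.IntegrityError:
            conn.execute("ROLLBACK TO SAVEPOINT item")  # undo only this item
            conn.execute("RELEASE SAVEPOINT item")
            skipped += 1
        else:
            conn.execute("RELEASE SAVEPOINT item")      # fold item into the outer txn
            imported += 1
    conn.execute("COMMIT")

    stored = conn.execute("SELECT COUNT(*) FROM invoices").fetchone()[0]
    conn.close()
    return {"imported": imported, "skipped": skipped, "stored": stored}


result = import_invoice_numbers(["INV-1", "INV-2", "INV-1", "INV-3"])
print(result)
```

The duplicate is rolled back on its own, and the three distinct invoices are committed together by the outer transaction.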
Optimistic Concurrency with version_id_col
SQLAlchemy's Mapper supports optimistic locking via version_id_col. Every UPDATE increments a version counter; the WHERE clause includes version = <expected>. If another transaction committed a change between your read and your write, the UPDATE matches zero rows and SQLAlchemy raises sqlalchemy.orm.exc.StaleDataError.
from sqlalchemy import Column, Integer, String, DateTime
from sqlalchemy.orm import DeclarativeBase, mapped_column, Mapped
import datetime
class Base(DeclarativeBase):
    pass
class Product(Base):
    __tablename__ = "products"
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    name: Mapped[str] = mapped_column(String(200))
    stock_qty: Mapped[int] = mapped_column(Integer, default=0)
    updated_at: Mapped[datetime.datetime] = mapped_column(
        DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow
    )
    # Optimistic locking: every UPDATE increments this column
    version_id: Mapped[int] = mapped_column(Integer, nullable=False)
    __mapper_args__ = {
        "version_id_col": version_id,
    }
from sqlalchemy.orm import Session
from sqlalchemy.orm.exc import StaleDataError
def decrement_stock(session: Session, product_id: int, qty: int) -> bool:
    """Returns False if a concurrent write beat us to the update."""
    try:
        with session.begin():
            product = session.get(Product, product_id)
            if product is None or product.stock_qty < qty:
                return False
            product.stock_qty -= qty
            # The flush at the end of this block (just before COMMIT) issues:
            # UPDATE products SET stock_qty=..., version_id=<old+1>
            # WHERE id=... AND version_id=<old>
            return True
    except StaleDataError:
        return False
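The versioned UPDATE is easy to reproduce without the ORM. The following sketch uses the standard-library sqlite3 module and a hand-written statement; the helper name `update_stock` and the minimal `products` schema are assumptions for illustration. A stale version matches zero rows, which is the condition SQLAlchemy turns into StaleDataError.

```python
import sqlite3


def update_stock(
    conn: sqlite3.Connection, product_id: int, new_qty: int, expected_version: int
) -> bool:
    """Apply the update only if the row still carries the version we read."""
    cur = conn.execute(
        "UPDATE products SET stock_qty = ?, version_id = version_id + 1 "
        "WHERE id = ? AND version_id = ?",
        (new_qty, product_id, expected_version),
    )
    conn.commit()
    return cur.rowcount == 1  # zero matched rows: another writer got there first


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE products (id INTEGER PRIMARY KEY, stock_qty INTEGER, version_id INTEGER)"
)
conn.execute("INSERT INTO products VALUES (1, 10, 1)")
conn.commit()

# Two writers both read version 1, then write one after the other.
first_ok = update_stock(conn, 1, 9, expected_version=1)
second_ok = update_stock(conn, 1, 8, expected_version=1)  # stale version
final_row = conn.execute(
    "SELECT stock_qty, version_id FROM products WHERE id = 1"
).fetchone()
conn.close()
print(first_ok, second_ok, final_row)
```

The second writer's change is rejected rather than silently overwriting the first, and the caller decides whether to reload and retry.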
Serialization-Failure Retries
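Under SERIALIZABLE isolation on PostgreSQL, concurrent transactions that form a dangerous read/write dependency cycle cause one of them to fail with SQLSTATE 40001, typically reported as ERROR: could not serialize access due to read/write dependencies among transactions (a plain write-write conflict is reported as could not serialize access due to concurrent update under the same code). The correct response is to retry the entire transaction from scratch, not replay individual statements.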
import asyncio
import random
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.exc import DBAPIError
async_engine = create_async_engine(
    "postgresql+asyncpg://app:secret@localhost/orders",
    pool_size=10,
    max_overflow=5,
)
AsyncSessionFactory = async_sessionmaker(async_engine, expire_on_commit=False)
_SERIALIZATION_FAILURE = "40001"
def _is_serialization_failure(exc: DBAPIError) -> bool:
    orig = getattr(exc, "orig", None)
    if orig is None:
        return False
    # asyncpg exposes the SQLSTATE as orig.sqlstate; psycopg2 uses orig.pgcode
    code = getattr(orig, "sqlstate", None) or getattr(orig, "pgcode", None)
    return code == _SERIALIZATION_FAILURE
async def serializable_retry(
    operation,
    *,
    max_attempts: int = 5,
    base_delay: float = 0.05,
) -> None:
    """Run `operation(session)` under SERIALIZABLE, retrying on 40001."""
    for attempt in range(max_attempts):
        async with AsyncSessionFactory() as session:
            try:
                async with session.begin():
                    # Must be the first call inside the transaction. Calling it
                    # before session.begin() would autobegin and make begin() raise.
                    await session.connection(
                        execution_options={"isolation_level": "SERIALIZABLE"}
                    )
                    await operation(session)
                return  # success: the block above committed
            except DBAPIError as exc:
                # DBAPIError is the base class of OperationalError, so this also
                # covers drivers that do not map 40001 to OperationalError
                if not _is_serialization_failure(exc):
                    raise
                if attempt == max_attempts - 1:
                    raise
        # Session is closed here. Exponential backoff with jitter before retry
        delay = base_delay * (2 ** attempt) + random.uniform(0, base_delay)
        await asyncio.sleep(delay)
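The key insight: each retry must start from a clean slate. After a serialization failure the transaction is aborted and must be rolled back before the connection can do anything else, and any ORM state loaded during the failed attempt is suspect. Opening a fresh session per attempt guarantees both. Closing the failed session rolls back the transaction and returns the connection to the pool, so the next attempt may well reuse the same physical connection.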
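The retry control flow can be checked in isolation from any database. The sketch below is a synchronous, standard-library-only model of the loop. `FakeDBError` is a hypothetical stand-in for a wrapped driver error carrying `orig.sqlstate`, and the `sleep` and `rng` parameters are injection points added here so the backoff can be observed without waiting.

```python
import random
import time
from types import SimpleNamespace
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


class FakeDBError(Exception):
    """Hypothetical stand-in for a wrapped driver error exposing `orig.sqlstate`."""

    def __init__(self, sqlstate: str) -> None:
        super().__init__(sqlstate)
        self.orig = SimpleNamespace(sqlstate=sqlstate)


def backoff_delay(attempt: int, base_delay: float, rng: random.Random) -> float:
    """Exponential backoff plus up to one base_delay of jitter (attempt starts at 0)."""
    return base_delay * (2 ** attempt) + rng.uniform(0, base_delay)


def run_with_retry(
    operation: Callable[[], T],
    *,
    max_attempts: int = 5,
    base_delay: float = 0.05,
    sleep: Callable[[float], object] = time.sleep,
    rng: Optional[random.Random] = None,
) -> T:
    """Call operation() until it succeeds, retrying only on SQLSTATE 40001."""
    rng = rng or random.Random()
    for attempt in range(max_attempts):
        try:
            return operation()  # operation must run the whole transaction
        except FakeDBError as exc:
            retryable = exc.orig.sqlstate == "40001"
            if not retryable or attempt == max_attempts - 1:
                raise
            sleep(backoff_delay(attempt, base_delay, rng))
    raise RuntimeError("max_attempts must be at least 1")


attempts = {"count": 0}


def flaky_transaction() -> str:
    """Fails twice with a serialization failure, then succeeds."""
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise FakeDBError("40001")
    return "committed"


delays: list[float] = []
outcome = run_with_retry(flaky_transaction, sleep=delays.append, rng=random.Random(0))
print(outcome, attempts["count"], delays)
```

With the default base delay of 0.05 s, the first wait falls between 0.05 and 0.10 s and the second between 0.10 and 0.15 s; the jitter keeps competing transactions from retrying in lockstep.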
Hybrid Architectures & Migration Strategies
Mixing Core and ORM Transaction Boundaries
When a single request combines high-performance Core bulk inserts with ORM object reads, both must share the same underlying connection to participate in the same transaction. The pattern is await session.connection() to extract the raw AsyncConnection and pass it to Core operations.
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import insert, select
from sqlalchemy import Table, MetaData
metadata = MetaData()
async def bulk_import_then_verify(session: AsyncSession, rows: list[dict]) -> int:
    """Core bulk insert + ORM read in a single transaction."""
    async with session.begin():
        # Get the underlying connection (same transaction as session)
        conn = await session.connection()
        # Core bulk insert: bypasses ORM overhead
        await conn.execute(
            insert(Invoice.__table__),
            rows,
        )
        # ORM read within the same transaction sees the just-inserted rows
        result = await session.execute(
            select(Invoice).where(Invoice.tenant_id == rows[0]["tenant_id"])
        )
        return len(result.scalars().all())
The Core vs ORM architecture decision guide covers when bypassing ORM unit-of-work is appropriate; transaction management is one of the clearest cases where Core's lower overhead pays off for write-heavy paths.
1.4 → 2.0 Migration of Transaction Code
The session lifecycle and scope management guide covers the broad 2.0 session changes. For transaction-specific migration:
| 1.4 Pattern | 2.0 Replacement |
|---|---|
| session.autocommit=True | Remove; use session.commit() explicitly |
| with session.begin(subtransactions=True) | session.begin_nested() (real SAVEPOINTs only) |
| session.transaction.nested | session.get_nested_transaction() |
| engine.execute(...) | with engine.connect() as conn: conn.execute(...) |
| connection.autocommit = True | conn.execution_options(isolation_level="AUTOCOMMIT") |
The subtransactions parameter was the most commonly abused — it silently allowed nesting begin() calls in ways that did not correspond to real database savepoints. The 2.0 removal forces explicit savepoint semantics, which is a correctness win.
Isolation Levels Across Databases: Practical Differences
The ANSI SQL standard defines four isolation levels, but each database engine implements them differently — and the differences matter in production.
PostgreSQL: MVCC and SSI
PostgreSQL uses Multi-Version Concurrency Control (MVCC) for all isolation levels. Readers never block writers and writers never block readers. Under READ COMMITTED, each statement gets a fresh snapshot of committed rows. Under REPEATABLE READ and SERIALIZABLE, the snapshot is taken at the first statement of the transaction and reused until the transaction ends.
The practical upshot: on PostgreSQL, upgrading from READ COMMITTED to REPEATABLE READ adds almost zero overhead because MVCC snapshots are cheap. The only meaningful overhead increase comes with SERIALIZABLE, which activates the SSI conflict-detection engine.
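PostgreSQL also adds a non-standard DEFERRABLE modifier, which only has an effect on transactions that are both SERIALIZABLE and READ ONLY. A SERIALIZABLE READ ONLY DEFERRABLE transaction may block at startup until it can obtain a snapshot that is guaranteed to complete without a serialization failure, which is useful for long read-only reporting transactions that cannot tolerate retries.
# PostgreSQL-only: SERIALIZABLE READ ONLY DEFERRABLE for reporting
from sqlalchemy import create_engine, text
from sqlalchemy.engine import Engine
# Dedicated engine for read-only reporting; keep it separate from the OLTP engine
reporting_engine = create_engine("postgresql+psycopg2://app:secret@localhost/orders")
def order_count(engine: Engine) -> int:
    """Run one reporting query in a SERIALIZABLE READ ONLY DEFERRABLE transaction."""
    with engine.connect() as conn:
        # postgresql_readonly / postgresql_deferrable are PostgreSQL dialect options
        conn = conn.execution_options(
            isolation_level="SERIALIZABLE",
            postgresql_readonly=True,
            postgresql_deferrable=True,
        )
        with conn.begin():
            # Illustrative query; the orders table is assumed
            return conn.execute(text("SELECT count(*) FROM orders")).scalar_one()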
Use this only on a dedicated read-only reporting engine. Do not apply it to the main OLTP engine — the startup delay will kill throughput.
MySQL InnoDB: Next-Key Locking
MySQL InnoDB defaults to REPEATABLE READ and uses gap locks and next-key locks to prevent phantom reads even at that level (which is stricter than the SQL standard requires). This means that under REPEATABLE READ on MySQL, range queries can block concurrent inserts, creating contention that does not exist on PostgreSQL.
If you are migrating from MySQL to PostgreSQL, do not assume behaviour is identical at the same isolation level string. Test phantom-read scenarios explicitly.
SQLite: Write Serialization
SQLite does not support true concurrent transactions. All writers are serialized at the database level regardless of isolation setting. For development and testing with aiosqlite, isolation levels behave as expected for reads but write concurrency is an illusion. Never use SQLite isolation settings to validate production PostgreSQL behaviour.
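SQLite's single-writer rule can be seen directly with the standard-library sqlite3 module. In this minimal sketch (the file name and table are hypothetical, and `timeout=0` is chosen so the second writer fails at once instead of waiting), two connections both try to start a write transaction on the same file.

```python
import os
import sqlite3
import tempfile


def second_writer_outcome() -> str:
    """Describe what happens when a second connection tries to start writing."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "app.db")  # hypothetical database file
        # isolation_level=None: explicit BEGIN only; timeout=0: do not wait on locks
        first = sqlite3.connect(path, isolation_level=None, timeout=0)
        second = sqlite3.connect(path, isolation_level=None, timeout=0)
        try:
            first.execute("CREATE TABLE t (id INTEGER PRIMARY KEY)")
            first.execute("BEGIN IMMEDIATE")       # takes the database-wide write lock
            try:
                second.execute("BEGIN IMMEDIATE")  # a second writer cannot start
                second.execute("ROLLBACK")
                outcome = "both writers started"
            except sqlite3.OperationalError as exc:
                outcome = str(exc)
            first.execute("ROLLBACK")
            return outcome
        finally:
            first.close()
            second.close()


outcome_message = second_writer_outcome()
print(outcome_message)
```

The second connection is refused with a lock error whatever isolation level is configured, which is why concurrency tests against SQLite say little about PostgreSQL behaviour.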
Production Pitfalls & Anti-Patterns
- Setting isolation level after the first statement: The BEGIN has already been issued with the engine's default level, so the transaction keeps that level. SQLAlchemy does not switch levels mid-transaction: Connection.execution_options(isolation_level=...) raises once a transaction has begun, and Session.connection(execution_options=...) ignores the options with a warning once the session already holds a connection. Always call connection.execution_options(isolation_level=...) or session.connection(execution_options=...) before any DML or SELECT.
- Reusing an AsyncSession after a serialization failure: After a 40001 error, the connection's transaction is aborted. Calling session.rollback() and retrying within the same session object can work but is fragile; open a new session for each retry attempt.
- Flushing without committing and expecting other connections to see the rows: flush() keeps rows in the current transaction. No other connection sees them until COMMIT. Misunderstanding this causes "missing data" bugs in multi-process workers.
- Using expire_on_commit=True (the default) in async code after a commit: Post-commit expiry causes attribute access to trigger lazy loads. In async SQLAlchemy, implicit lazy loading on attribute access is not supported and raises MissingGreenlet. Set expire_on_commit=False on async_sessionmaker.
- Nesting session.begin() inside session.begin(): In 2.0, this raises InvalidRequestError: A transaction is already begun on this Session. Use begin_nested() for sub-transaction semantics.
- Catching exceptions too broadly around savepoints: An except Exception around a begin_nested() block treats programming errors (TypeError, KeyError) as skipped rows, and a bare except: or except BaseException also swallows KeyboardInterrupt, SystemExit, and asyncio.CancelledError. Catch specific SQLAlchemy exceptions (IntegrityError, OperationalError) and let everything else propagate.
Understanding Isolation Anomalies in Practice
Theory is clearest when grounded in concrete failure scenarios. Here are three anomalies that regularly surface in production systems and the isolation level required to prevent each.
Lost Update: two sessions both read the same row, compute a new value, and write it back. Session B's write overwrites Session A's change silently. This occurs under READ COMMITTED when neither session uses SELECT FOR UPDATE. Fix: use SELECT FOR UPDATE, or raise the isolation level (on PostgreSQL, both REPEATABLE READ and SERIALIZABLE abort the second writer with a 40001 error instead of letting it overwrite).
Non-Repeatable Read: a session reads a row, another session updates and commits it, and the first session reads the row again within the same transaction and gets a different value. This occurs under READ COMMITTED. Fix: upgrade to REPEATABLE READ.
Write Skew: two sessions read overlapping data, each decides it is safe to make a write based on what they read, and both writes proceed even though the combined result violates a constraint that neither could detect alone. Classic example: two physicians both see "at least one doctor on call" and both simultaneously go off-call. REPEATABLE READ does not prevent this. Fix: use SERIALIZABLE (PostgreSQL SSI detects the dangerous dependency) or use SELECT FOR UPDATE on the rows being checked.
Understanding which anomaly your application is vulnerable to determines the minimum isolation level you need — and prevents the common mistake of applying SERIALIZABLE everywhere when REPEATABLE READ with FOR UPDATE would suffice at lower cost.
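Write skew is the least intuitive of the three, so a toy model may help. The sketch below is a pure-Python simulation, not database code. `SnapshotTxn` is a hypothetical class that mimics snapshot reads with deferred writes, which is enough to reproduce the on-call example. It does not model locking or conflict detection.

```python
class SnapshotTxn:
    """Toy transaction: reads see a private snapshot, writes apply at commit."""

    def __init__(self, store: dict[str, bool]) -> None:
        self.store = store
        self.snapshot = dict(store)  # frozen view taken when the transaction starts
        self.writes: dict[str, bool] = {}

    def go_off_call(self, doctor: str) -> bool:
        """Allow the change only if the snapshot shows another doctor on call."""
        on_call = sum(self.snapshot.values())
        if on_call < 2:
            return False
        self.writes[doctor] = False
        return True

    def commit(self) -> None:
        self.store.update(self.writes)


def concurrent_run() -> int:
    """Both transactions take their snapshot before either commits."""
    store = {"alice": True, "bob": True}
    t1, t2 = SnapshotTxn(store), SnapshotTxn(store)
    t1.go_off_call("alice")
    t2.go_off_call("bob")  # still sees alice on call in its snapshot
    t1.commit()
    t2.commit()            # different rows, so no write-write conflict to detect
    return sum(store.values())


def serial_run() -> int:
    """The second transaction starts only after the first has committed."""
    store = {"alice": True, "bob": True}
    t1 = SnapshotTxn(store)
    t1.go_off_call("alice")
    t1.commit()
    t2 = SnapshotTxn(store)
    t2.go_off_call("bob")  # sees one doctor on call, so the change is refused
    t2.commit()
    return sum(store.values())


doctors_left_concurrent = concurrent_run()
doctors_left_serial = serial_run()
print(doctors_left_concurrent, doctors_left_serial)
```

The concurrent interleaving leaves nobody on call although each transaction's check passed. Any serial order keeps one doctor on call, and that is the outcome SERIALIZABLE enforces by aborting one of the two transactions.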
Frequently Asked Questions
What is the difference between session.begin() and autobegin in SQLAlchemy 2.0?
Autobegin is the implicit transaction start that happens on the first use of the session. It is active unless you disable it with autobegin=False. session.begin() is the explicit context manager that starts the session's transaction at a visible point in the code and binds commit/rollback to the with block exit; in both cases the database BEGIN is emitted when the first SQL runs. Use session.begin() when you need the transaction boundary to be visible in code and when the commit must be guaranteed by the same lexical scope.
When should I use REPEATABLE READ instead of SERIALIZABLE?
Use REPEATABLE READ when you need stable reads within a single transaction (for example, summing a user's orders more than once while other transactions are adding new ones) and your writes do not depend on rows that another transaction may be changing. The SQL standard allows phantom rows at this level, although PostgreSQL's snapshot hides them; what REPEATABLE READ does not prevent is write skew. SERIALIZABLE is the correct choice for any workflow where two transactions read overlapping rows and both write based on what they read: inventory decrements, seat reservations, financial ledger entries.
Does session.flush() release row-level locks?
No. Locks acquired during a flush (due to SELECT ... FOR UPDATE or INSERT/UPDATE) are held until COMMIT or ROLLBACK. flush() only synchronizes ORM state to the database within the current transaction. Calling flush() repeatedly within a long transaction accumulates held locks and increases the risk of deadlocks.
How do serialization failures behave differently from deadlocks?
A deadlock (40P01) occurs when two transactions each hold a lock the other needs; the database chooses one as a victim and rolls it back. A serialization failure (40001) occurs when PostgreSQL's SSI detects a dangerous concurrent read/write pattern that would violate serializability, even without explicit locking. Both require full transaction retry. Deadlocks are more common under READ COMMITTED with SELECT FOR UPDATE; on PostgreSQL, serialization failures occur only under REPEATABLE READ (write-write conflicts) and SERIALIZABLE (which adds the SSI dependency checks), never under READ COMMITTED.
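Since both codes call for the same response, a retry predicate can treat them together. This is a minimal sketch under stated assumptions: the wrapped error exposes the driver exception as `orig`, and the driver stores the SQLSTATE as either `sqlstate` or `pgcode`. `FakeWrappedError` is a hypothetical test double, not a SQLAlchemy class.

```python
from types import SimpleNamespace
from typing import Optional

SERIALIZATION_FAILURE = "40001"
DEADLOCK_DETECTED = "40P01"
RETRYABLE_SQLSTATES = {SERIALIZATION_FAILURE, DEADLOCK_DETECTED}


def sqlstate_of(exc: BaseException) -> Optional[str]:
    """Best-effort SQLSTATE lookup on a wrapped driver error."""
    orig = getattr(exc, "orig", None)
    # Assumption: the code lives in `sqlstate` or `pgcode`, depending on the driver
    for attr in ("sqlstate", "pgcode"):
        code = getattr(orig, attr, None)
        if code:
            return code
    return None


def should_retry(exc: BaseException) -> bool:
    """True when the whole transaction should be re-run from the start."""
    return sqlstate_of(exc) in RETRYABLE_SQLSTATES


class FakeWrappedError(Exception):
    """Hypothetical double for a wrapped database error."""

    def __init__(self, **driver_attrs: str) -> None:
        super().__init__(str(driver_attrs))
        self.orig = SimpleNamespace(**driver_attrs)


decisions = {
    "serialization": should_retry(FakeWrappedError(sqlstate="40001")),
    "deadlock": should_retry(FakeWrappedError(pgcode="40P01")),
    "unique_violation": should_retry(FakeWrappedError(sqlstate="23505")),
    "not_a_db_error": should_retry(ValueError("bad input")),
}
print(decisions)
```

Anything outside the two retryable codes, such as the unique violation 23505, is a genuine error that a retry would only repeat.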
Can I use version_id_col with async sessions?
Yes. Optimistic locking via version_id_col is implemented entirely at the SQLAlchemy mapper level — it modifies the generated UPDATE SQL. It works identically with AsyncSession. The StaleDataError is raised by SQLAlchemy after checking rowcount on the DBAPI cursor result, which is available synchronously even in async drivers.
Related
- Mastering SQLAlchemy 2.0 Core and ORM Architecture — parent guide covering the full architectural picture this page fits into.
- Setting Transaction Isolation Level Per Session — step-by-step syntax for engine-wide vs per-transaction isolation overrides.
- Session Lifecycle and Scope Management — how session creation, expiry, and closure interact with transaction boundaries.
- Core vs ORM Architecture Decisions — when to drop to Core execute for transaction-sensitive bulk operations.