Migrating Legacy SQLAlchemy 1.4 Code to 2.0 Syntax
SQLAlchemy 2.0 is not a compatibility release — it is a deliberate, opinionated redesign that removes the implicit behaviors that made 1.x code unpredictable at scale. If you are running SQLAlchemy 1.4 in production, you already have a migration path available, because 1.4 ships with every 2.0 deprecation warning turned on. The question is how to move through that path without breaking a live application. This page covers the full migration arc — from surfacing your first RemovedIn20Warning all the way to running a fully typed, async-capable 2.0 codebase — and is part of the broader SQLAlchemy 2.0 core and ORM architecture guide.
Concept & Execution Model
What Actually Changed Between 1.4 and 2.0
SQLAlchemy 1.x was built around three patterns that became liabilities as Python matured: implicit autocommit behavior on Session, the "legacy" Query API that mixed ORM construction with SQL generation, and a unit-of-work model that left transaction boundaries ambiguous under async I/O. SQLAlchemy 2.0 replaces all three.
The core conceptual shift is that every database interaction now flows through an explicit Connection or Session that owns a transaction from the moment it is acquired. There is no autocommit by default, no implicit DBAPI connection checkout, and no query that can silently start a transaction behind the application's back.
The second shift is in the ORM mapping layer. The legacy Column-based declarative model still works in 2.0 (it will not be removed until 3.0), but the new Mapped[T] annotation-driven model is the preferred approach. It enables Python type checkers to verify your ORM queries, not just your Python code.
The third shift is execution: session.execute(select(User)) replaces session.query(User). This distinction matters because the new-style select() construct is a first-class SQL expression — it composes with CTEs, subqueries, and window functions the same way Core SQL does, without the hidden impedance mismatch that made complex Query objects fragile.
The 1.4 Bridge Release
SQLAlchemy's migration philosophy is incremental. Version 1.4 runs both APIs simultaneously and emits RemovedIn20Warning for every deprecated call site. Before you touch a single line of application code, you should enable those warnings and count them:
import warnings
import sqlalchemy.exc
warnings.filterwarnings("error", category=sqlalchemy.exc.RemovedIn20Warning)
With this in place, any deprecated call becomes a raised exception in your test suite, giving you a concrete, exhaustive list of migration sites rather than a vague guess. The detailed workflow for triaging those warnings is covered in fixing RemovedIn20Warning deprecation warnings.
Query Construction & Async Execution Patterns
From session.query() to session.execute(select())
The most pervasive change in everyday SQLAlchemy code is replacing session.query(Model) calls with session.execute(select(Model)). The surface area is large — it touches every repository, every service function, every background task — but the mechanical transformation is consistent enough to automate with a codemod. The step-by-step codemod checklist walks through each pattern with sed and AST transforms.
Before (1.x Query API):
from sqlalchemy.orm import Session
from myapp.models import User, Order
def get_active_users(session: Session) -> list[User]:
return (
session.query(User)
.filter(User.is_active == True)
.order_by(User.created_at.desc())
.all()
)
def get_user_orders(session: Session, user_id: int) -> list[Order]:
return (
session.query(Order)
.filter(Order.user_id == user_id, Order.status != "cancelled")
.all()
)
After (2.0 style):
from sqlalchemy import select
from sqlalchemy.orm import Session
from myapp.models import User, Order
def get_active_users(session: Session) -> list[User]:
stmt = (
select(User)
.where(User.is_active.is_(True))
.order_by(User.created_at.desc())
)
return session.execute(stmt).scalars().all()
def get_user_orders(session: Session, user_id: int) -> list[Order]:
stmt = (
select(Order)
.where(Order.user_id == user_id, Order.status != "cancelled")
)
return session.execute(stmt).scalars().all()
Note the .scalars() call. session.execute() returns a CursorResult containing Row objects. When you select a single ORM entity, .scalars() unwraps those rows to give you entity instances directly. When you select multiple columns or multiple entities, omit .scalars() and work with the Row named-tuple interface instead.
Scalar Variants and Cardinality
The 2.0 result API is explicit about cardinality in a way the old Query was not:
from sqlalchemy import select, func
from sqlalchemy.orm import Session
from myapp.models import Product, Tenant, Order
def get_product_by_sku(session: Session, sku: str) -> Product | None:
stmt = select(Product).where(Product.sku == sku)
# scalar_one_or_none(): None if missing, raises if duplicates
return session.execute(stmt).scalar_one_or_none()
def require_tenant(session: Session, slug: str) -> Tenant:
stmt = select(Tenant).where(Tenant.slug == slug)
# scalar_one(): raises NoResultFound or MultipleResultsFound
return session.execute(stmt).scalar_one()
def count_pending_orders(session: Session, tenant_id: int) -> int:
stmt = select(func.count()).select_from(Order).where(
Order.tenant_id == tenant_id,
Order.status == "pending",
)
return session.execute(stmt).scalar_one()
These cardinality helpers turn silent data surprises into loud exceptions — a meaningful improvement over the old .one(), .first(), and .all() patterns that behaved differently depending on which Query method you called.
Async Execution
Moving to AsyncSession changes the call site minimally because the underlying query construction is identical. The await keyword is the only structural addition:
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from myapp.models import Invoice, User
async def get_user_invoices(
session: AsyncSession, user_id: int, limit: int = 50
) -> list[Invoice]:
stmt = (
select(Invoice)
.where(Invoice.user_id == user_id)
.order_by(Invoice.issued_at.desc())
.limit(limit)
)
result = await session.execute(stmt)
return result.scalars().all()
async def get_invoice_with_user(
session: AsyncSession, invoice_id: int
) -> tuple[Invoice, User] | None:
stmt = (
select(Invoice, User)
.join(User, Invoice.user_id == User.id)
.where(Invoice.id == invoice_id)
)
row = (await session.execute(stmt)).first()
if row is None:
return None
return row.Invoice, row.User
The engine setup uses postgresql+asyncpg:// by default. Constructing a 2.0-style async engine looks like:
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
engine = create_async_engine(
"postgresql+asyncpg://user:password@localhost/mydb",
pool_size=10,
max_overflow=5,
pool_pre_ping=True,
echo=False,
)
AsyncSessionLocal = async_sessionmaker(
engine,
expire_on_commit=False,
class_=AsyncSession,
)
expire_on_commit=False is almost always the right default in async code because expired attributes cannot be lazily loaded after the event loop returns — the connection is gone by then.
State Management & Session Boundaries
Explicit Transactions Replace Autocommit
In SQLAlchemy 1.x, Session had a subtle behavior called "autobegin" that started a transaction on the first operation but committed only when you called .commit() — or never, depending on scoping. If you were using the Core Connection directly, DBAPI-level autocommit was available but opt-in and inconsistently documented.
In 2.0, the model is explicit: every Connection and Session begins a transaction immediately on acquisition, and you are responsible for committing or rolling back. There is no ambiguity.
from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session
from myapp.models import User, Order
engine = create_engine("postgresql+psycopg2://user:password@localhost/mydb")
# 2.0 Core usage — context manager always commits or rolls back
with engine.connect() as conn:
conn.execute(
text("UPDATE orders SET status = 'shipped' WHERE id = :id"),
{"id": 42},
)
conn.commit() # explicit — no autocommit
# ORM session — same discipline
with Session(engine) as session:
with session.begin():
user = session.get(User, 1)
user.email = "new@example.com"
# commit is implicit at the end of session.begin() block
# rollback is automatic on exception
session.begin() as a context manager is the clearest way to express "this block is one transaction." If you need nested savepoints:
from sqlalchemy.orm import Session
from myapp.models import Order, Invoice
def create_order_with_invoice(
session: Session, user_id: int, amount: float
) -> Order:
order = Order(user_id=user_id, amount=amount, status="pending")
session.add(order)
session.flush() # assign order.id without committing
try:
with session.begin_nested(): # SAVEPOINT
invoice = Invoice(order_id=order.id, amount=amount)
session.add(invoice)
# if invoice creation fails, only the savepoint rolls back
except Exception:
# order survives; invoice did not — decide whether to re-raise
pass
return order
Session Scope and the Unit-of-Work Pattern
The 2.0 session scoping model matches what FastAPI and other async frameworks expect: one session per request, never shared across requests, never stored as a module-level singleton. The correct pattern for FastAPI dependency injection:
from collections.abc import AsyncGenerator
from fastapi import Depends, HTTPException, APIRouter
from sqlalchemy.ext.asyncio import AsyncSession
from myapp.database import AsyncSessionLocal
from myapp.models import User
async def get_db() -> AsyncGenerator[AsyncSession, None]:
async with AsyncSessionLocal() as session:
async with session.begin():
yield session
# commit or rollback happens automatically here
router = APIRouter()
@router.get("/users/{user_id}")
async def read_user(
user_id: int, db: AsyncSession = Depends(get_db)
) -> dict:
user = await db.get(User, user_id)
if user is None:
raise HTTPException(status_code=404, detail="User not found")
return {"id": user.id, "email": user.email}
For a deep treatment of session lifetime options — including scoped_session alternatives and per-worker pooling — see session lifecycle and scope management.
Advanced Migration Patterns
ORM Relationships and Lazy Loading in Async Context
Lazy loading is the most common breakage point when migrating to async. In synchronous SQLAlchemy, accessing user.orders on an expired instance silently issues a SELECT. In async code, that access raises MissingGreenlet because there is no event loop context to perform I/O synchronously.
The solution is explicit eager loading everywhere an async session is involved:
from sqlalchemy import select
from sqlalchemy.orm import selectinload, joinedload
from sqlalchemy.ext.asyncio import AsyncSession
from myapp.models import User, Order, Product
# selectinload: separate IN query per relationship — good for collections
async def get_user_with_orders(
session: AsyncSession, user_id: int
) -> User | None:
stmt = (
select(User)
.where(User.id == user_id)
.options(selectinload(User.orders).selectinload(Order.items))
)
return (await session.execute(stmt)).scalar_one_or_none()
# joinedload: JOIN in main query — good for to-one relationships
async def get_order_with_product(
session: AsyncSession, order_id: int
) -> Order | None:
stmt = (
select(Order)
.where(Order.id == order_id)
.options(joinedload(Order.product))
)
# .unique() deduplicates rows produced by the JOIN
return (await session.execute(stmt)).unique().scalar_one_or_none()
Note the .unique() call when using joinedload — JOINs can produce duplicate rows when collections are involved, and .unique() deduplicates them by identity before the ORM assembles the result set.
Bulk Operations Migration
The legacy session.bulk_insert_mappings() and session.bulk_update_mappings() still work in 2.0 but are soft-deprecated. The 2.0 replacement uses session.execute() with Core INSERT and UPDATE constructs:
from sqlalchemy import insert, update
from sqlalchemy.ext.asyncio import AsyncSession
from myapp.models import Product
async def bulk_update_prices(
session: AsyncSession,
updates: list[dict], # [{"id": 1, "price": 9.99}, ...]
) -> int:
result = await session.execute(update(Product), updates)
return result.rowcount
async def bulk_insert_products(
session: AsyncSession,
products: list[dict],
) -> None:
# asyncpg translates this to a pipeline of prepared statements
await session.execute(insert(Product), products)
For millions of rows, bypass the ORM entirely and use Core's executemany semantics, which asyncpg translates to a pipeline of prepared statements — dramatically faster than individual inserts because the server parses the statement once and binds parameters in batch.
Custom Base Classes and Registry Migration
If your legacy codebase uses a custom Base generated by declarative_base(), migrating to DeclarativeBase is straightforward:
# Before (1.x)
from sqlalchemy import Column, Integer, String, Boolean
from sqlalchemy.orm import declarative_base
Base = declarative_base()
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
email = Column(String(255), nullable=False, unique=True)
is_active = Column(Boolean, default=True)
# After (2.0)
from sqlalchemy import String, Boolean
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
class Base(DeclarativeBase):
pass
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
email: Mapped[str] = mapped_column(String(255), unique=True)
is_active: Mapped[bool] = mapped_column(Boolean, default=True)
The Mapped[T] annotation does two things at once: it tells the type checker the attribute type, and it tells SQLAlchemy the column is not nullable unless wrapped in Mapped[T | None]. This replaces nullable=False in most cases. The full annotation migration workflow — including relationship() types, column defaults, and the mypy plugin — is covered in the step-by-step guide to SQLAlchemy 2.0 type annotations.
Hybrid Architectures & Migration Strategies
Running 1.4 and 2.0 Code Side-by-Side
For large codebases, a big-bang migration is a liability. The practical approach is to run 1.4 with create_engine(..., future=True) during the transition. The future=True flag opts the engine into 2.0 execution semantics while keeping 1.4 installed, giving you a controlled surface to migrate service by service:
from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session
# 1.4 engine in 2.0 mode — detects 2.0 breakage before you upgrade
engine = create_engine(
"postgresql+psycopg2://user:password@localhost/mydb",
future=True, # 2.0 semantics on 1.4 runtime
)
# All code using this engine must use 2.0-style execute
with engine.connect() as conn:
result = conn.execute(text("SELECT count(*) FROM users"))
count = result.scalar_one()
With future=True, the legacy engine.execute() method raises immediately, the Connection no longer auto-commits, and session.execute() accepts only SQL expression constructs — not bare strings. This is the safest way to validate your migration without changing the library version in your lockfile.
Strangler-Fig Service Migration
For microservice architectures, each service can migrate independently if they share a common database. The recommended order:
- Services with pure read workloads first — they get the type safety and result API improvements immediately, with no transaction risk.
- Write-heavy services with well-isolated transactions next.
- Services that use
session.query()in complex inheritance hierarchies last — single-table inheritance had a behavior change in 2.0 regardingwith_polymorphic()that requires careful testing.
Mixing Sync and Async Code During Transition
If you are migrating a synchronous FastAPI or Flask application to async incrementally, run_sync() provides a bridge that lets you call synchronous SQLAlchemy code from an async context without blocking the event loop:
from sqlalchemy.ext.asyncio import AsyncSession
from myapp.models import Tenant
async def get_tenant_legacy_path(
session: AsyncSession, slug: str
) -> Tenant | None:
# run_sync executes the callable in a greenlet, allowing sync ORM calls
def _query(sync_session):
from sqlalchemy import select
return sync_session.execute(
select(Tenant).where(Tenant.slug == slug)
).scalar_one_or_none()
return await session.run_sync(_query)
This is a transitional tool — not a destination. It adds greenlet overhead and prevents full async throughput. Use it to unblock the migration, then replace the inner callable with a proper async implementation.
Alembic Compatibility During the Upgrade
Alembic 1.9+ is required for full SQLAlchemy 2.0 compatibility. Before upgrading SQLAlchemy, upgrade Alembic. Your env.py may call deprecated APIs — run your migration generation commands with SQLALCHEMY_WARN_20=1 set to catch any:
# In alembic/env.py — 2.0-compatible async migration runner
import asyncio
from alembic import context
from sqlalchemy.ext.asyncio import create_async_engine
from myapp.models import Base
def run_migrations_online() -> None:
connectable = create_async_engine(
"postgresql+asyncpg://user:password@localhost/mydb"
)
async def run_async_migrations() -> None:
async with connectable.connect() as connection:
await connection.run_sync(do_run_migrations)
asyncio.run(run_async_migrations())
def do_run_migrations(connection) -> None:
context.configure(connection=connection, target_metadata=Base.metadata)
with context.begin_transaction():
context.run_migrations()
Production Pitfalls & Anti-Patterns
Error Diagnostic Table
| Error string | Root cause | Fix |
|---|---|---|
RemovedIn20Warning: The Query.get() method is considered legacy | session.query(Model).get(pk) called in 1.4 | Replace with session.get(Model, pk) |
sqlalchemy.exc.MissingGreenlet: greenlet_spawn has not been called | Lazy relationship access inside async with AsyncSession | Add selectinload() or joinedload() to query options |
sqlalchemy.exc.InvalidRequestError: Can't attach instance ... another session | ORM object returned from one session used in another | Use session.merge(), re-query, or set expire_on_commit=False |
AttributeError: 'coroutine' object has no attribute 'scalars' | Missing await on session.execute() | Add await — all AsyncSession methods are coroutines |
sqlalchemy.exc.SAWarning: SELECT statement has a cartesian product | joinedload on a collection without .unique() | Add .unique() after execute, verify join condition |
sqlalchemy.exc.NoResultFound | scalar_one() called on empty result | Use scalar_one_or_none() or guard with explicit check |
psycopg2.errors.InFailedSqlTransaction: current transaction is aborted | Code continues after exception inside transaction without rollback | Wrap in session.begin() context manager — it rolls back automatically |
sqlalchemy.exc.TimeoutError: QueuePool limit of size X overflow Y reached | Connection pool exhausted — sessions held open too long | Shorten session lifetime, increase pool_size, check for connection leaks |
TypeError: Argument 'engine' to 'Session' has an unexpected type | Passing async engine to synchronous Session | Use AsyncSession for async engines; the two are not interchangeable |
ObjectNotExecutableError: Not an executable clause | Bare string passed to conn.execute() in 2.0 mode | Wrap raw SQL in text() |
Anti-Pattern: Module-Level Session
# WRONG — global session is shared across requests and threads
from sqlalchemy.orm import Session
from myapp.database import engine
session = Session(engine) # Never do this
def get_user(user_id: int):
return session.query(User).get(user_id) # double anti-pattern
The module-level session is not thread-safe and accumulates ORM identity map state across requests. Sessions are cheap to create — scope them to the request or task, not the module.
Anti-Pattern: Bare String Queries Without text()
In 2.0, conn.execute("SELECT 1") raises ObjectNotExecutableError. Always wrap raw SQL in text():
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
async def check_db_alive(session: AsyncSession) -> bool:
try:
await session.execute(text("SELECT 1"))
return True
except Exception:
return False
Anti-Pattern: expire_on_commit=True in Async Sessions
The default expire_on_commit=True works in sync code because attribute access triggers a lazy load. In async code it causes MissingGreenlet the moment you touch an expired attribute after a commit. Always set expire_on_commit=False in async session factories:
from sqlalchemy.ext.asyncio import async_sessionmaker, AsyncSession
AsyncSessionLocal = async_sessionmaker(
engine,
expire_on_commit=False, # mandatory for async
class_=AsyncSession,
)
If you need fresh data after commit, re-query rather than relying on attribute expiry. Re-querying is explicit, testable, and has no hidden I/O side effects.
Anti-Pattern: Long-Running Sessions in Background Tasks
Celery tasks and other background workers that acquire a session at startup and reuse it across many job invocations will encounter stale connections, transaction bleed, and identity map bloat. Each task invocation should acquire and release its own session:
from celery import Celery
from sqlalchemy.orm import Session
from myapp.database import engine
from myapp.models import Order
app = Celery("tasks")
@app.task
def process_order(order_id: int) -> None:
# Correct: session is scoped to this task invocation only
with Session(engine) as session:
with session.begin():
order = session.get(Order, order_id)
if order is None:
return
order.status = "processing"
# commit on exit from session.begin() block
Anti-Pattern: Calling session.query() on an AsyncSession
AsyncSession does not have a .query() method. Calling it raises AttributeError. This often happens when shared helper functions written for sync code are passed an async session. The type checker will catch this if you annotate properly — another reason to complete the typing migration alongside the query API migration.
Frequently Asked Questions
Do I have to migrate to Mapped[] annotations to use SQLAlchemy 2.0?
No. The legacy Column-based declarative style is fully supported in 2.0 and will not be removed until a hypothetical 3.0. Mapped[] annotations and mapped_column() are the recommended new style, but you can migrate your query execution patterns first and your model definitions later. The two styles coexist in the same application without issue.
Can I use SQLAlchemy 2.0 with psycopg2 (synchronous driver)?
Yes. SQLAlchemy 2.0 works with any DBAPI driver. postgresql+psycopg2:// continues to work for synchronous applications. Async requires an async-capable driver: postgresql+asyncpg:// (asyncpg) or postgresql+psycopg:// (psycopg3 async mode). The query API is identical across drivers — only the engine URL and session class change.
What happens to session.query() in 2.0?
session.query() is present in 2.0 as a "legacy" feature. It works, it is tested, but it will not receive new capabilities and emits deprecation warnings when you enable them. It is scheduled for removal in SQLAlchemy 3.0. You should migrate to session.execute(select()) during your 2.0 upgrade, not after.
How do I handle Alembic database migrations during the SQLAlchemy upgrade?
Alembic 1.9+ supports SQLAlchemy 2.0. Update Alembic to 1.9+ before or alongside your SQLAlchemy upgrade. The env.py in your Alembic directory may need updating if it calls deprecated APIs — run your migrations with warnings-as-errors enabled to surface any issues before upgrading the runtime.
My codebase has thousands of session.query() calls. Is there an automated migration tool?
Yes. The legacy 1.4 to 2.0 codemod checklist covers libcst and sed transforms that handle the most common patterns — filter() → where(), .all() on a query → .scalars().all() on a result, and Query.get() → session.get(). No automated tool handles 100% of cases — review each transform site after running the codemod.
Does future=True in 1.4 make my code 100% compatible with 2.0?
It covers engine and connection behavior, but not ORM mapping syntax. Running future=True with warnings-as-errors covers roughly 80–90% of incompatibilities. The remaining cases include model definition patterns (Column vs mapped_column), relationship lazy loading behavior under async, and edge cases in single-table inheritance. Test thoroughly on 1.4 with future=True before the final upgrade.
What is the recommended Python version for SQLAlchemy 2.0?
SQLAlchemy 2.0 supports Python 3.7+, but the Mapped[T] annotation syntax with PEP 604 union types (int | None) and PEP 585 generics (list[User]) requires Python 3.10+ at runtime unless you use from __future__ import annotations to defer annotation evaluation. Python 3.11 is the practical minimum for new projects using the full 2.0 feature set.
How should I handle the expire_on_commit difference when my code works in both sync and async contexts?
Write two session factories — one for sync and one for async — with different expire_on_commit settings, rather than sharing a single configuration. The sync factory can leave the default (True), while the async factory sets it to False. Never pass an async engine to a sync session or vice versa.
Related
- Step-by-Step Guide to SQLAlchemy 2.0 Type Annotations — deep guide to the annotation-driven ORM mapping system,
mapped_column(), and the mypy plugin configuration. - Legacy 1.4 to 2.0 Codemod Checklist — exhaustive mechanical checklist of every deprecated pattern and its 2.0 replacement, with automated transform commands.
- Fixing RemovedIn20Warning Deprecation Warnings — how to surface, triage, and eliminate every deprecation warning before upgrading the runtime.
- Session Lifecycle and Scope Management — covers request-scoped sessions, scoped_session alternatives, and per-worker pool configuration for async applications.
- Transaction Isolation and Commit Strategies — how to reason about read committed vs repeatable read isolation, savepoints, and two-phase commit in SQLAlchemy 2.0.