Mastering SQLAlchemy 2.0 Core and ORM Architecture
SQLAlchemy 2.0 represents a fundamental architectural realignment — unifying the historically separate Core and ORM execution models into a single, type-safe, and async-native API surface. This guide covers the architectural foundations, declarative modeling with Mapped[], async session and transaction management, production pool tuning, and the precise error patterns that trip up teams during the upgrade. It is written for Python developers, data engineers, and backend architects building production systems on PostgreSQL, MySQL, or SQLite using asyncpg, psycopg3, or aiosqlite.
Architectural Foundations
SQLAlchemy's 2.0 release resolves a longstanding tension in its design. Before 2.0, the Core and ORM maintained separate execution paths: the Core's Connection.execute() accepted only clause elements tied to Table objects, while the ORM's session.query() operated in a parallel but incompatible world. Applications that needed both often duplicated logic across both layers or suffered impedance mismatch when passing objects between them.
The Unified Execution Pipeline
In 2.0, Session.execute() and Connection.execute() accept the same statement constructs: select(), insert(), update(), and delete() work with either one. The compilation pipeline (clause element → compiled SQL string → dialect-specific parameter binding) is shared, regardless of whether the caller is the ORM or a raw connection. This unification has several concrete consequences:
- An ORM select(User) and a Core select(user_table) pass through the same compiler. The difference is only in how results are hydrated: ORM results pass through the identity map; Core results return lightweight Row named tuples.
- Statements built with Core can be passed directly to session.execute(), allowing hybrid queries that span mapped and unmapped tables in a single round trip.
- The async variants (AsyncSession, AsyncConnection) are thin adapters over their sync counterparts; there is no separate async compiler.
What 2.0 Removed and Why
The session.query() interface, Query.filter(), and the other legacy chaining patterns are classified as legacy API in 2.0. Query itself remains available in 2.0, but it is not recommended for new code, and the 1.x behaviors that 2.0 did remove emit RemovedIn20Warning in 1.4 when SQLALCHEMY_WARN_20=1 is set. The legacy APIs tied statement construction to session state, making query objects opaque to static analysis and hard to compose cleanly. The replacement, a standalone select() returned from a function and executed separately, is a functional style that aligns with how SQL actually works: statements are values, not methods on a stateful object.
The declarative_base() function from sqlalchemy.ext.declarative is similarly superseded (the import still works but emits a deprecation warning) in favor of subclassing DeclarativeBase, which integrates with Python's class hierarchy and with static type checkers directly.
The 2.0 Style Code Example
The following shows the minimal boilerplate for a production-ready async setup, demonstrating how Core and ORM share the same engine:
# Full 2.0 bootstrap: Core and ORM on one engine
from sqlalchemy import String, Integer, select, insert
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy.ext.asyncio import (
    create_async_engine,
    async_sessionmaker,
    AsyncSession,
)

engine = create_async_engine(
    "postgresql+asyncpg://app:secret@db.internal/appdb",
    pool_size=10,
    max_overflow=5,
    pool_pre_ping=True,
)
async_session = async_sessionmaker(engine, expire_on_commit=False)


class Base(DeclarativeBase):
    pass


class Order(Base):
    __tablename__ = "orders"

    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    customer_id: Mapped[int] = mapped_column(Integer, nullable=False)
    status: Mapped[str] = mapped_column(String(30), default="pending")
    total_cents: Mapped[int] = mapped_column(Integer, default=0)


# ORM read via session: same select() construct as Core
async def get_pending_orders(session: AsyncSession) -> list[Order]:
    stmt = select(Order).where(Order.status == "pending")
    result = await session.execute(stmt)
    return list(result.scalars())


# Core bulk insert via engine connection: bypasses ORM overhead
async def seed_orders(rows: list[dict]) -> None:
    async with engine.begin() as conn:
        await conn.execute(insert(Order), rows)
Both paths above compile through the same execution engine. The ORM path returns Order instances tracked in the session identity map; the Core path returns a CursorResult and never constructs ORM objects or touches the identity map.
Key Component Deep-Dive: Core vs ORM Unified Execution
The distinction between Core and ORM in 2.0 is not architectural — it is a question of result hydration and state tracking. Understanding where the boundary lies determines whether you need Session.execute() or Connection.execute(), and whether your results are tracked entities or raw Row objects.
When to Execute Through the ORM
Use the ORM session for operations that benefit from the identity map and Unit of Work:
- Loading entities whose relationships you will traverse (eager or lazy)
- Mutations to mapped objects tracked by the session (attribute changes, relationship appends)
- Patterns requiring session.refresh(), session.expunge(), or session.merge()
- Any code that relies on event hooks (@event.listens_for(Session, 'before_flush'))
# ORM path: relationship traversal and tracked mutations
from sqlalchemy import select
from sqlalchemy.orm import selectinload
from sqlalchemy.ext.asyncio import AsyncSession


async def fulfill_order(session: AsyncSession, order_id: int) -> None:
    stmt = (
        select(Order)
        .where(Order.id == order_id)
        .options(selectinload(Order.items))
    )
    result = await session.execute(stmt)
    order = result.scalar_one()
    for item in order.items:
        item.shipped = True  # tracked by session, flushed on commit
    order.status = "shipped"
    await session.commit()
Relationship loading strategies in async contexts require care. The default lazy loading strategy (lazy="select") emits SQL implicitly on attribute access, which asyncio cannot support; accessing an unloaded relationship on an object managed by an AsyncSession raises MissingGreenlet. Use selectinload() or joinedload() at query time, and consider configuring lazy="raise" on the relationship definition so that a missing eager load fails loudly during development rather than in production.
For a thorough comparison of when Core outperforms ORM — and the exact query patterns where each is appropriate — see the Core vs ORM architecture decisions guide.
When to Execute Through Core
Use a raw Connection or engine.connect() for:
- Bulk inserts where per-row identity map overhead is unacceptable (millions of rows)
- DDL operations (CREATE TABLE, ALTER TABLE) that should not go through the ORM
- Reporting queries that return aggregated data, never hydrated into entities
- Schema-level operations such as sequence manipulation or advisory locks
# Core path: bulk insert with executemany, no ORM overhead
from sqlalchemy import insert
from sqlalchemy.ext.asyncio import AsyncEngine


async def bulk_import_orders(engine: AsyncEngine, rows: list[dict]) -> int:
    # engine.begin() commits on clean exit and rolls back on exception
    async with engine.begin() as conn:
        result = await conn.execute(insert(Order), rows)
        return result.rowcount
The engine.begin() context manager opens a connection, begins a transaction, and commits (or rolls back on exception) automatically. There is no session, no identity map, and no Unit of Work — just a direct SQL round trip.
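The commit-or-rollback behavior is the same on a synchronous engine, which makes it easy to verify locally. This sketch assumes in-memory SQLite and a hypothetical unmapped events table:

```python
from sqlalchemy import (
    Column,
    Integer,
    MetaData,
    String,
    Table,
    create_engine,
    func,
    insert,
    select,
)

metadata = MetaData()
events = Table(  # hypothetical Core-only table, no ORM mapping
    "events",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("kind", String(20), nullable=False),
)

engine = create_engine("sqlite://")
metadata.create_all(engine)

# Clean exit: the block commits both rows.
with engine.begin() as conn:
    conn.execute(insert(events), [{"kind": "a"}, {"kind": "b"}])

# Exception inside the block: the transaction is rolled back.
try:
    with engine.begin() as conn:
        conn.execute(insert(events), [{"kind": "c"}])
        raise RuntimeError("simulated failure")
except RuntimeError:
    pass

with engine.connect() as conn:
    committed = conn.execute(
        select(func.count()).select_from(events)
    ).scalar_one()
```

Only the two rows from the first block survive; the row inserted before the simulated failure is discarded with its transaction.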
Key Component Deep-Dive: Type-Safe Mapped Declarative Modeling
The Mapped[] annotation system is the most visible 2.0 change for application developers. It replaces Column() declarations with Python type annotations, integrating SQLAlchemy's column definitions directly into the language's static typing machinery.
Declaring Models with Mapped
from __future__ import annotations

from datetime import datetime, timezone
from decimal import Decimal
from typing import Optional

from sqlalchemy import String, Numeric, ForeignKey, DateTime, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship


class Base(DeclarativeBase):
    pass


class Tenant(Base):
    __tablename__ = "tenants"

    id: Mapped[int] = mapped_column(primary_key=True)
    slug: Mapped[str] = mapped_column(String(80), unique=True, nullable=False)
    invoices: Mapped[list[Invoice]] = relationship(
        "Invoice", back_populates="tenant", lazy="raise"
    )


class Invoice(Base):
    __tablename__ = "invoices"

    id: Mapped[int] = mapped_column(primary_key=True)
    tenant_id: Mapped[int] = mapped_column(ForeignKey("tenants.id"), nullable=False)
    amount: Mapped[Decimal] = mapped_column(Numeric(12, 2), nullable=False)
    issued_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
    )
    paid_at: Mapped[Optional[datetime]] = mapped_column(DateTime(timezone=True))
    tenant: Mapped[Tenant] = relationship("Tenant", back_populates="invoices")
Key rules enforced by Mapped[]:
- Mapped[str] is non-nullable by default; Mapped[Optional[str]] or Mapped[str | None] maps to nullable=True.
- mapped_column() accepts the same arguments as the old Column() but derives nullability and the column type from the annotation unless they are passed explicitly.
- relationship() with back_populates replaces the old backref string shortcut. Bidirectional relationships are declared on both sides explicitly, which improves discoverability and eliminates silent misconfiguration.
- lazy="raise" on relationships forces eager loading at query time: any accidental lazy load raises sqlalchemy.exc.InvalidRequestError immediately rather than silently issuing a query.
Relationships and Cascade Configuration
Cascade behavior on relationships determines what happens to related objects when a parent is deleted or expunged. The default ("save-update, merge") is correct for most cases. Add cascade="all, delete-orphan" only when child objects are owned exclusively by the parent and have no independent existence:
class Order(Base):
    __tablename__ = "orders"

    id: Mapped[int] = mapped_column(primary_key=True)
    customer_id: Mapped[int] = mapped_column(ForeignKey("customers.id"), nullable=False)
    # The quoted name lets the annotation resolve even if OrderItem is defined later
    items: Mapped[list["OrderItem"]] = relationship(
        "OrderItem",
        back_populates="order",
        cascade="all, delete-orphan",
        lazy="raise",
    )
With delete-orphan, removing an OrderItem from order.items will issue a DELETE for that row during flush. Without it, the orphaned row remains in the database with a null foreign key (or raises an integrity error if the column is non-nullable).
Teams migrating from 1.4 will find the step-by-step migration guide for 2.0 type annotations useful for converting Column() declarations to mapped_column() systematically without introducing regressions.
Key Component Deep-Dive: AsyncSession and Async Engine Concurrency
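The async layer in SQLAlchemy 2.0 is not a rewrite. It is a thin greenlet-based bridge that lets SQLAlchemy's synchronous internals drive asyncio database drivers such as asyncpg, psycopg3, and aiosqlite: each driver's awaitable calls are adapted to a synchronous-looking DBAPI facade, and the greenlet hands control back to the event loop whenever the driver awaits I/O. Purely synchronous drivers such as psycopg2 cannot be used with create_async_engine().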
Creating a Production-Grade Async Engine
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker

# Production settings for a PostgreSQL application handling 500 req/s
engine = create_async_engine(
    "postgresql+asyncpg://app:secret@db.internal/appdb",
    # Pool sizing: set pool_size to expected steady-state concurrent queries;
    # max_overflow is burst headroom. Max total connections = pool_size + max_overflow
    pool_size=20,
    max_overflow=10,
    # Replace any connection older than 30 minutes the next time it is checked out
    # (age since creation, not idle time; default: -1 = never)
    pool_recycle=1800,
    # Test each connection with a lightweight ping on checkout. Essential for
    # cloud databases that silently close idle connections after 5-10 minutes
    pool_pre_ping=True,
    # pool_timeout: raise TimeoutError if no connection is available within N seconds
    pool_timeout=30,
    echo=False,
)

AsyncSessionLocal = async_sessionmaker(
    engine,
    expire_on_commit=False,  # Prevents lazy loads after commit in async contexts
    autoflush=True,
    autobegin=True,
)
The expire_on_commit=False setting is critical in async applications. The default (expire_on_commit=True) expires all loaded attributes after a commit, so accessing them would trigger a lazy load to refresh data. In async contexts that lazy load raises MissingGreenlet. Setting it to False retains attribute values post-commit, which is safe as long as you don't expect the session to reflect concurrent external changes.
Transaction Management Patterns
SQLAlchemy 2.0 supports three transaction management patterns, each with distinct semantics:
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession


# Pattern 1: Autobegin + explicit commit (recommended for most applications)
async def create_invoice(session: AsyncSession, data: dict) -> Invoice:
    invoice = Invoice(**data)
    session.add(invoice)
    await session.commit()
    return invoice


# Pattern 2: Explicit begin() context manager, cleaner for multi-step operations
# (balance_cents is assumed to exist on Tenant; the model shown earlier omits it)
async def transfer_balance(
    session: AsyncSession,
    from_tenant_id: int,
    to_tenant_id: int,
    amount_cents: int,
) -> None:
    async with session.begin():
        # All statements inside run in one atomic transaction
        from_stmt = select(Tenant).where(Tenant.id == from_tenant_id).with_for_update()
        to_stmt = select(Tenant).where(Tenant.id == to_tenant_id).with_for_update()
        from_tenant = (await session.execute(from_stmt)).scalar_one()
        to_tenant = (await session.execute(to_stmt)).scalar_one()
        from_tenant.balance_cents -= amount_cents
        to_tenant.balance_cents += amount_cents
    # session.begin() commits on clean exit, rolls back on exception


# Pattern 3: Nested transactions via savepoints (for partial rollback)
async def risky_import(session: AsyncSession, rows: list[dict]) -> list[str]:
    errors = []
    for row in rows:
        try:
            # SAVEPOINT: pending rows are flushed when the block exits, so a
            # failing row raises here and only that savepoint is rolled back
            async with session.begin_nested():
                session.add(Invoice(**row))
        except Exception as exc:
            errors.append(f"Row {row.get('id')}: {exc}")
    await session.commit()
    return errors
The transaction isolation and commit strategies guide covers how to set per-session isolation levels (SERIALIZABLE, REPEATABLE READ, READ COMMITTED) and the trade-offs between them in high-concurrency environments.
Session Lifecycle in Web Frameworks
In FastAPI and Starlette, the session lifecycle should be scoped to the HTTP request. The session lifecycle and scope management guide provides the dependency injection patterns in depth. The canonical FastAPI pattern:
from typing import AsyncGenerator

from fastapi import Depends, FastAPI, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession

app = FastAPI()


async def get_session() -> AsyncGenerator[AsyncSession, None]:
    # One session per request; closed when the request finishes
    async with AsyncSessionLocal() as session:
        yield session


@app.get("/orders/{order_id}")
async def read_order(
    order_id: int,
    session: AsyncSession = Depends(get_session),
) -> dict:
    order = await session.get(Order, order_id)
    if order is None:
        raise HTTPException(status_code=404)
    return {"id": order.id, "status": order.status, "total_cents": order.total_cents}
The async with AsyncSessionLocal() as session pattern ensures the session is closed (and the connection returned to the pool) at the end of the request, regardless of whether the handler raised an exception. Never share an AsyncSession across requests or background tasks: a session is not safe for concurrent use by multiple tasks or threads and assumes single-owner semantics.
Advanced Patterns & Production Configuration
Pool Sizing for Real Workloads
Pool sizing is one of the most consequential production configuration decisions. The correct formula depends on your database server's max_connections, the number of application replicas, and your query duration distribution.
For a PostgreSQL instance with max_connections = 200 running 4 application pods:
connections_per_pod = floor(max_connections / num_pods) - headroom
pool_size = connections_per_pod * 0.7 # steady-state
max_overflow = connections_per_pod * 0.3 # burst
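Concretely, with a headroom of 15 connections per pod, connections_per_pod = floor(200 / 4) - 15 = 35. Rounding to pool_size=25 and max_overflow=10 per pod gives a maximum of 140 connections across 4 pods (35 × 4), leaving 60 connections free for migrations, monitoring agents, and admin queries.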
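The formula can be captured in a small helper so the arithmetic is repeatable across environments. This is a sketch only: the function name plan_pool, the 70/30 split as a default parameter, and the choice to round the steady-state share up are assumptions for illustration, and the headroom of 15 is the value implied by the figures above.

```python
import math


def plan_pool(
    max_connections: int,
    num_pods: int,
    headroom: int,
    steady_ratio: float = 0.7,
) -> tuple[int, int]:
    """Return (pool_size, max_overflow) for one pod."""
    per_pod = max_connections // num_pods - headroom
    if per_pod <= 0:
        raise ValueError("headroom leaves no connections for the application")
    # Round the steady-state share up; the remainder becomes burst capacity,
    # so pool_size + max_overflow never exceeds the per-pod budget.
    pool_size = math.ceil(per_pod * steady_ratio)
    max_overflow = per_pod - pool_size
    return pool_size, max_overflow


pool_size, max_overflow = plan_pool(max_connections=200, num_pods=4, headroom=15)
total_across_pods = (pool_size + max_overflow) * 4
free_for_admin = 200 - total_across_pods
```

Deriving max_overflow as the remainder, rather than rounding both shares independently, guarantees the per-pod total stays within budget.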
RDS Proxy and PgBouncer change this equation: the proxy handles multiplexing, so your application pool size can be much smaller:
# Behind RDS Proxy or PgBouncer in transaction mode:
engine = create_async_engine(
    "postgresql+asyncpg://...",
    pool_size=5,  # Proxy multiplexes, so small application-side pool
    max_overflow=2,
    pool_pre_ping=True,
    # asyncpg prepared statement cache must be disabled for PgBouncer transaction mode
    connect_args={"statement_cache_size": 0},
)
The async engine and connection pool configuration guide covers pool tuning in depth, including the pool_recycle, pool_timeout, and pool_reset_on_return settings that most teams overlook.
Hybrid Core/ORM Queries
The most powerful pattern in 2.0 is using Core subqueries inside ORM queries or using ORM entities as targets for Core UPDATE statements. This lets you combine the type safety and identity map of the ORM with the raw SQL expressiveness of Core for complex aggregation or multi-table mutation:
from datetime import datetime, timedelta, timezone

from sqlalchemy import select, update, func, and_
from sqlalchemy.ext.asyncio import AsyncSession


# ORM query with Core subquery for aggregation
async def get_high_value_tenants(
    session: AsyncSession, min_invoice_total: int
) -> list[Tenant]:
    invoice_totals = (
        select(Invoice.tenant_id, func.sum(Invoice.amount).label("total"))
        .group_by(Invoice.tenant_id)
        .subquery()
    )
    stmt = (
        select(Tenant)
        .join(invoice_totals, Tenant.id == invoice_totals.c.tenant_id)
        .where(invoice_totals.c.total >= min_invoice_total)
        .order_by(invoice_totals.c.total.desc())
    )
    result = await session.execute(stmt)
    return list(result.scalars())


# Core-style UPDATE against an ORM-mapped table: no per-object loading or flush
# (assumes Invoice has a status column, which the model shown earlier omits)
async def expire_old_invoices(session: AsyncSession, cutoff_days: int) -> int:
    cutoff = datetime.now(timezone.utc) - timedelta(days=cutoff_days)
    stmt = (
        update(Invoice)
        .where(and_(Invoice.issued_at < cutoff, Invoice.paid_at.is_(None)))
        .values(status="expired")
        .execution_options(synchronize_session=False)
    )
    result = await session.execute(stmt)
    await session.commit()
    return result.rowcount
execution_options(synchronize_session=False) tells SQLAlchemy not to attempt synchronizing objects already loaded in the session after a bulk update. This skips the bookkeeping cost and suits batch operations, where most matched rows were never loaded; any Invoice objects that were loaded keep their stale values until they are expired or refreshed.
Streaming Large Result Sets
For result sets exceeding available memory, use yield_per() to stream rows in chunks:
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

CHUNK_SIZE = 1000


async def export_all_invoices(session: AsyncSession):
    stmt = select(Invoice).order_by(Invoice.id).execution_options(yield_per=CHUNK_SIZE)
    result = await session.stream(stmt)
    # partitions() yields lists of up to CHUNK_SIZE Invoice objects at a time
    async for partition in result.scalars().partitions():
        for invoice in partition:
            yield invoice
yield_per=1000 is a reasonable default for PostgreSQL with 10–50 column rows. For wider rows (50+ columns, JSONB blobs), reduce to 100–500. The advanced query patterns guide covers streaming, window functions, and CTEs that work efficiently alongside this pattern.
Per-Request Isolation Level Overrides
Some endpoints require stronger isolation guarantees. SQLAlchemy 2.0 allows setting isolation level per connection via execution_options:
from decimal import Decimal

from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession


async def get_consistent_balance_snapshot(
    session: AsyncSession, tenant_id: int
) -> Decimal:
    # Upgrade to REPEATABLE READ for this specific operation; this must run
    # before the session has executed anything in the current transaction
    await session.connection(execution_options={"isolation_level": "REPEATABLE READ"})
    stmt = (
        select(func.sum(Invoice.amount))
        .where(Invoice.tenant_id == tenant_id)
        .where(Invoice.paid_at.is_not(None))
    )
    result = await session.execute(stmt)
    return result.scalar_one() or Decimal(0)
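This sets the isolation level for the connection the session uses in the current transaction without affecting the global engine default. The call must come before the session executes its first statement in that transaction, because the isolation level cannot change once the transaction has begun. The connection reverts to the engine default when returned to the pool.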
Common Pitfalls & Anti-Patterns
RemovedIn20Warning: The Query.filter() method is considered legacy
Root cause: session.query(Model).filter(...) is the 1.x query interface. SQLAlchemy 1.4 emits RemovedIn20Warning (when SQLALCHEMY_WARN_20=1 is set) for the 1.x patterns that 2.0 removed; Query itself remains available in 2.0 as a legacy API but is not recommended for new code. Fix: replace with select(Model).where(...) executed via session.execute(). The guide to fixing RemovedIn20Warning deprecation warnings provides a systematic refactoring checklist.
DetachedInstanceError: Instance <User> is not bound to a Session
Root cause: Accessing a lazy-loaded relationship or expired attribute on an ORM object after its session has been closed or after session.expunge(). Common in FastAPI routes that load an entity in one dependency and access attributes in the response serializer after the session is closed. Fix: set expire_on_commit=False on the session factory and use selectinload() / joinedload() to eagerly load all needed relationships before the session closes. See the guide to fixing DetachedInstanceError after commit for the full diagnosis tree.
MissingGreenlet: greenlet_spawn has not been called; can't call await_only() here
Root cause: A synchronous code path (often a Pydantic validator, __str__, or a @property) triggered a lazy load on an object managed by an AsyncSession. The async driver can only perform I/O inside the greenlet context that SQLAlchemy sets up for awaited calls such as session.execute(); plain attribute access happens outside that context, so the implicit load raises MissingGreenlet. Fix: set lazy="raise" on relationships so that lazy loads fail with a clearer error, and use explicit eager loading at query time. Avoid touching unloaded or expired ORM attributes from Pydantic validators or __repr__ methods.
InvalidRequestError: A transaction is already begun on this Session
Root cause: Calling session.begin() on a session that already has an active transaction. This happens when a function that starts a transaction is called inside a with session.begin() block, or when autobegin=True (the default) starts a transaction on the first statement and you then call begin() explicitly. Fix: use session.begin_nested() for nested transactions (which issues a SAVEPOINT), or restructure code so that transaction management is the caller's responsibility rather than being embedded in service functions.
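The autobegin case can be reproduced in a few lines. This is a synchronous sketch against in-memory SQLite, where the failure surfaces as sqlalchemy.exc.InvalidRequestError; the variable names are illustrative:

```python
from sqlalchemy import create_engine, select
from sqlalchemy.exc import InvalidRequestError
from sqlalchemy.orm import Session

engine = create_engine("sqlite://")

with Session(engine) as session:
    session.execute(select(1))  # autobegin starts a transaction here

    try:
        session.begin()  # a second begin() on the same session
        error_message = ""
    except InvalidRequestError as exc:
        error_message = str(exc)

    # A savepoint inside the already-open transaction is allowed.
    with session.begin_nested():
        session.execute(select(1))

    still_in_transaction = session.in_transaction()
    session.commit()
```

The savepoint completes without ending the outer transaction, which is why begin_nested() is the usual tool when a helper needs its own rollback boundary.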
sqlalchemy.exc.TimeoutError: QueuePool limit of size N overflow N reached
Root cause: All connections in the pool are in use and max_overflow slots are exhausted. A session holds its connection from its first statement until commit(), rollback(), or close(), so every session that is still open mid-transaction occupies a slot. Fix: always use sessions and connections as context managers (async with AsyncSessionLocal() as session, async with engine.connect() as conn) so they are released on every code path. Check for sessions that are not closed after use. Tune pool_size and max_overflow based on actual concurrency measurements, not defaults.
Using session.add() in a loop for bulk inserts
Root cause: The ORM identity map tracks every object added via session.add(), consuming memory proportional to the number of objects. At scale, this causes GC pressure and flush overhead. Fix: use session.execute(insert(Model), rows_list) with a list of dicts for bulk inserts, or use the Core-level engine.begin() / conn.execute(insert(Model), rows_list) pattern. The Core path bypasses the identity map entirely.
Frequently Asked Questions
Is SQLAlchemy 2.0 backward compatible with 1.4 codebases?
Partial compatibility is provided through SQLAlchemy 1.4's "2.0 style" migration path. Legacy patterns emit RemovedIn20Warning in 1.4 but still function. In 2.0, most behaviors flagged by those warnings are gone, while session.query() and plain Column() declarations remain available as legacy API. The safest migration path is to set SQLALCHEMY_WARN_20=1 in your 1.4 environment, resolve all warnings, then upgrade. The legacy 1.4 to 2.0 migration checklist documents every deprecation category.
When should I use Core Connection.execute() instead of Session.execute()?
Prefer Core when: (1) you are bulk-inserting or bulk-updating more than a few hundred rows and do not need ORM change tracking; (2) you are running DDL or raw SQL that has no ORM model backing; (3) you want the lowest possible overhead for a reporting query that returns aggregates, not entities. Use the ORM session when you need the identity map, relationship traversal, event hooks, or the Unit of Work's dependency-ordered flush.
Why does MissingGreenlet appear even when I use async def everywhere?
MissingGreenlet is triggered by lazy-loaded attribute access, not by the function's async status. The error means that Python code (possibly synchronous, inside Pydantic, __repr__, or a validator) attempted to access a deferred attribute on an ORM object. The fix is structural: ensure that all needed attributes and relationships are loaded before the async context they are needed in, by using eager loading options (selectinload, joinedload, contains_eager) at query time.
What is the difference between async_sessionmaker and async_scoped_session?
async_sessionmaker creates a new AsyncSession on each call, which is the right default for request-scoped sessions in FastAPI. async_scoped_session maintains a registry of sessions keyed by a scope function (typically asyncio.current_task), sharing one session per running coroutine task. async_scoped_session is appropriate for patterns where many coroutines within the same task need to share state without explicitly passing a session, but it requires careful scope management to avoid session leaks across task boundaries.
Should I set pool_size equal to the number of web workers?
No. Set pool_size based on concurrent database queries, not web workers. A FastAPI endpoint that queries the database once per request at 100 req/s with an average query time of 20ms needs roughly 100 * 0.020 = 2 concurrent database connections at steady state. Add headroom for spikes: pool_size=5, max_overflow=10 is realistic for most endpoints. Over-provisioning pool connections wastes PostgreSQL backend processes, which consume roughly 10 MB of RAM each.
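The estimate above is Little's law (concurrency = arrival rate × time in system) applied to database queries. A small sketch, in which the helper names and the spike factor of 2.5 are illustrative assumptions rather than recommended constants:

```python
import math


def steady_state_connections(
    requests_per_second: float,
    avg_query_seconds: float,
    queries_per_request: int = 1,
) -> float:
    """Little's law: average number of queries in flight at any moment."""
    return requests_per_second * queries_per_request * avg_query_seconds


def suggested_pool_size(concurrency: float, spike_factor: float = 2.5) -> int:
    """Scale the steady-state figure by an assumed spike factor, rounding up."""
    # round() guards against float noise pushing ceil() up by one.
    return max(1, math.ceil(round(concurrency * spike_factor, 6)))


baseline = steady_state_connections(requests_per_second=100, avg_query_seconds=0.020)
pool_size = suggested_pool_size(baseline)
```

With the figures from the answer above, the baseline is 2 concurrent connections and the padded pool size is 5. Measuring the real query-time distribution, especially the tail, matters more than the exact spike factor.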
How do I handle multiple databases in one application with 2.0?
Create one engine and one async_sessionmaker per database. If the databases share a schema (multi-tenant), use schema_translate_map on the connection-level execution options to redirect table references to tenant-specific schemas at query time. For read replicas, create a separate read-only engine and route SELECT-only sessions to it. The async engines and connection pooling guide covers replica routing and multi-engine dependency injection patterns.
Related
- Core vs ORM Architecture Decisions: when to bypass the ORM in favor of raw Core statements for performance-critical paths.
- Session Lifecycle and Scope Management: request scoping, dependency injection patterns, and avoiding cross-request session leaks.
- Transaction Isolation and Commit Strategies: per-session isolation levels, savepoints, and distributed transaction patterns.
- Migrating Legacy 1.4 Code to 2.0 Syntax: systematic refactoring of session.query(), Column(), and backref patterns.
- Async Engines, Dialects and Connection Pooling: deep dive into asyncpg, psycopg3, pool sizing, and cloud database configuration.