Core vs ORM Architecture Decisions in SQLAlchemy 2.0

Choosing between SQLAlchemy Core and the ORM in 2.0 is not a binary commitment — it is a throughput and complexity trade-off that sits at the centre of Mastering SQLAlchemy 2.0 Core and ORM Architecture, the definitive reference for production SQLAlchemy design. Core executes stateless SQL with zero identity-map overhead; the ORM adds unit-of-work semantics, automatic change tracking, and entity hydration at a measurable cost. The 2.0 unified execution model means the same select() construct runs on either path — the divergence is entirely in the execution context, not the query language.

Concept & Execution Model

SQLAlchemy 2.0 consolidates both paths under a single SQL compilation pipeline. A select() statement is a lazy, immutable SQL expression tree. Which executor receives it determines everything about runtime behaviour.

  • AsyncConnection.execute(stmt) — returns raw Row objects, which behave like named tuples. No identity map, no change tracking, no deferred attribute loading. The driver communicates via binary protocol (asyncpg) or text protocol (psycopg3). Python-side overhead is essentially zero after the network round-trip.
  • AsyncSession.execute(stmt) — routes the same compiled SQL through the ORM's result-processing pipeline. Each row causes an identity-map lookup keyed on the primary key. If the entity is new, it is hydrated and inserted into the map; if it already exists, the existing instance is returned. Attribute instrumentation wraps every column value, and the entity is registered in the unit-of-work for the current transaction.

The overhead gap between Core and ORM appears after the SQL executes and the network bytes arrive. Network round-trips and database index scans are identical. The ORM adds Python-side cost: attribute instrumentation interception, identity-map hash lookups (dict.__setitem__ per row per column), and deferred-loading bookkeeping. For small result sets this is imperceptible. For bulk operations it dominates.

Core vs ORM Execution Cost Layers A layered diagram comparing the execution pipeline for SQLAlchemy Core (left) and ORM (right), showing where costs diverge after shared SQL compilation and network I/O. Shared Pipeline (same cost on both paths) select() — SQL compile & bind Network I/O — asyncpg / psycopg3 Core path AsyncConnection.execute() Row namedtuples — no state Python overhead: ~0 ms No identity map No change tracking No deferred loads ORM path AsyncSession.execute() Hydration + identity map Python overhead: 15–40 ms per 1,000 rows Identity map lookup per PK Attribute instrumentation Unit-of-work registration SQL and network cost is identical; Python-side cost diverges at the executor boundary.

To make the divergence concrete, consider an ORM model and its Core counterpart sharing the same database table:

from typing import Sequence
from sqlalchemy import select, Column, Integer, String, Table, MetaData
from sqlalchemy.ext.asyncio import AsyncSession, AsyncConnection, create_async_engine
from sqlalchemy.orm import Mapped, mapped_column, DeclarativeBase

metadata = MetaData()
orders_core = Table(
    "orders",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("customer_id", Integer, nullable=False),
    Column("total_cents", Integer, nullable=False),
    Column("status", String(32), nullable=False),
)


class Base(DeclarativeBase):
    pass


class Order(Base):
    __tablename__ = "orders"
    id: Mapped[int] = mapped_column(primary_key=True)
    customer_id: Mapped[int] = mapped_column(nullable=False)
    total_cents: Mapped[int] = mapped_column(nullable=False)
    status: Mapped[str] = mapped_column(String(32), nullable=False)


async def compare_execution_paths(
    conn: AsyncConnection, session: AsyncSession
) -> None:
    """
    The same WHERE predicate compiles to the same SQL on both paths.
    Cost diverges only after the rows arrive from the database.
    """
    # Core: Row namedtuples, no ORM state overhead
    core_result = await conn.execute(
        select(orders_core).where(orders_core.c.status == "pending")
    )
    core_rows: Sequence[tuple] = core_result.fetchall()
    # core_rows[0].total_cents — direct attribute via namedtuple, no instrumentation

    # ORM: full Order entities, identity map populated, unit-of-work registers each
    orm_result = await session.execute(
        select(Order).where(Order.status == "pending")
    )
    orm_entities: Sequence[Order] = orm_result.scalars().all()
    # orm_entities[0].total_cents — intercepted by attribute instrumentation

Query Construction & Async Execution Patterns

The 2.0 query API is identical whether you target Core or ORM — select(), where(), join(), order_by(), limit(), offset() all compose the same way. The runtime behaviour diverges at .execute(). This means you can write query-construction utilities that produce Select objects and let the caller decide whether to route through a Connection or Session.

Async Core — reporting aggregation, no hydration cost:

from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncConnection


async def revenue_by_status(conn: AsyncConnection) -> list[dict]:
    """
    Pure aggregation: no entity hydration needed.
    Returning a list of plain dicts is idiomatic for read-heavy reporting.
    """
    stmt = (
        select(
            orders_core.c.status,
            func.count(orders_core.c.id).label("order_count"),
            func.sum(orders_core.c.total_cents).label("total_cents"),
        )
        .where(orders_core.c.total_cents > 0)
        .group_by(orders_core.c.status)
        .order_by(orders_core.c.status)
    )
    result = await conn.execute(stmt)
    return [dict(row._mapping) for row in result.fetchall()]

Async ORM — transactional domain operation with relationship loading:

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload


async def fulfill_order(session: AsyncSession, order_id: int) -> Order:
    """
    Transactional operation: relationship traversal + state mutation.
    ORM change tracking issues the UPDATE automatically on commit.
    """
    stmt = (
        select(Order)
        .where(Order.id == order_id)
        .options(selectinload(Order.line_items))  # type: ignore[attr-defined]
    )
    order = (await session.execute(stmt)).scalars().one()
    order.status = "fulfilled"
    await session.commit()
    return order

Shared query builder — routable to either executor:

from sqlalchemy import select, Select


def orders_for_customer(customer_id: int, limit: int = 100) -> Select:
    """
    Pure query construction: no executor dependency.
    The caller decides whether to route through Core or ORM.
    """
    return (
        select(Order)
        .where(Order.customer_id == customer_id, Order.status != "cancelled")
        .order_by(Order.created_at.desc())  # type: ignore[attr-defined]
        .limit(limit)
    )


# Core caller (reporting, no hydration):
async def report_customer_orders(conn: AsyncConnection, customer_id: int) -> list[dict]:
    result = await conn.execute(orders_for_customer(customer_id))
    return [dict(row._mapping) for row in result.fetchall()]


# ORM caller (transaction, full entities):
async def get_customer_orders(session: AsyncSession, customer_id: int) -> list[Order]:
    result = await session.execute(orders_for_customer(customer_id))
    return result.scalars().all()

The complete guide to replacing the legacy session.query(...).filter(...) pattern with select(...).where(...) is in How to Replace Query.filter with select.where in SQLAlchemy 2.0.

State Management & Session Boundaries

The ORM's identity map guarantees object identity within a transactional scope. If you execute two queries that return the same primary key in the same session, the second query returns the already-hydrated object from the map — no second Python object is allocated. This prevents in-memory inconsistency when the same entity is touched by multiple code paths in one request.

The liability is accumulation. A background worker that runs 100,000 ORM updates in a loop without periodic session.expunge_all() or session recycling will grow the identity map until the process runs out of memory. Core has no such accumulation — each conn.execute() yields results and discards them.

Async expire_on_commit and its consequences:

By default, expire_on_commit=True causes every attribute on every committed entity to be marked expired. The next attribute access after a commit triggers a lazy SQL reload — which in async context raises MissingGreenlet: greenlet_spawn has not been called because there is no implicit async context for the lazy load to run in.

from sqlalchemy.ext.asyncio import async_sessionmaker, AsyncSession, create_async_engine

engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")

# Correct for async: disable expire_on_commit
AsyncSessionFactory = async_sessionmaker(
    engine,
    expire_on_commit=False,  # entities remain usable after commit
)

# Wrong for async: default expire_on_commit=True
AsyncSessionFactoryBad = async_sessionmaker(engine)
# After session.commit(), any attribute access on returned entities
# triggers MissingGreenlet — there is no implicit greenlet to run the reload.

Session scoping patterns for production async services:

from contextlib import asynccontextmanager
from typing import AsyncGenerator
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker


@asynccontextmanager
async def request_session(
    factory: async_sessionmaker[AsyncSession],
) -> AsyncGenerator[AsyncSession, None]:
    """
    Request-scoped session. Tightly bound to the HTTP request lifecycle.
    Commit on success, rollback on exception, always close on exit.
    """
    async with factory() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise


@asynccontextmanager
async def worker_batch_session(
    factory: async_sessionmaker[AsyncSession],
) -> AsyncGenerator[AsyncSession, None]:
    """
    Worker-scoped session. Call this once per batch, not once per worker lifetime.
    expunge_all() after commit prevents identity map accumulation across batches.
    """
    async with factory() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
        finally:
            session.expunge_all()

The Session Lifecycle and Scope Management guide covers expire_on_commit, expunge vs expunge_all, and session factory configuration for async workers and web frameworks in depth.

Advanced ETL Throughput & Bulk Operation Patterns

ETL workloads present the starkest performance contrast between Core and ORM. The numbers below reflect benchmarks on a 4-core application host against a remote PostgreSQL 15 instance over a 1 Gbit LAN, using asyncpg as the driver and a pool of 10 connections.

OperationMethodThroughput (rows/second)
INSERT, 500k rowsORM session.add_all()8,000–12,000
INSERT, 500k rowsCore conn.execute(insert(), [dicts])95,000–130,000
UPDATE by PK, 100k rowsORM flush6,000–9,000
UPDATE by PK, 100k rowsCore update().where().values() per batch50,000–75,000
SELECT, 1M rows, raw tuplesCore fetchmany(10000)400,000–600,000
SELECT, 1M rows, ORM entitiesORM scalars().all()80,000–120,000

The ORM's advantage is correctness and expressiveness, not throughput. Use it where you need relationship traversal, computed attributes, event hooks, or domain validation that involves Python-side business logic. Route bulk read and write operations through Core.

Core bulk insert — chunked batches for asyncpg parameter limits:

asyncpg imposes a maximum of 32,767 parameters per query. With a 10-column table, that limits a single executemany batch to 3,276 rows. Chunking at 2,000–5,000 rows is safe for most schemas:

from itertools import islice
from typing import Iterator
from sqlalchemy import insert, Table, MetaData, Column, Integer, String
from sqlalchemy.ext.asyncio import AsyncConnection

metadata = MetaData()
products = Table(
    "products",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("sku", String(64), nullable=False),
    Column("name", String(255), nullable=False),
    Column("price_cents", Integer, nullable=False),
)


def _chunk(it: Iterator[dict], size: int) -> Iterator[list[dict]]:
    it = iter(it)
    while batch := list(islice(it, size)):
        yield batch


async def bulk_load_products(
    conn: AsyncConnection,
    records: Iterator[dict],
    batch_size: int = 3_000,
) -> int:
    """
    Bulk-loads Product rows via Core executemany.
    Chunks at 3,000 to stay within asyncpg's 32,767 parameter limit
    for a 4-column INSERT (4 * 3,000 = 12,000 parameters, well within limits).
    Returns total rows inserted.
    """
    stmt = insert(products)
    total = 0
    async with conn.begin():
        for batch in _chunk(records, batch_size):
            result = await conn.execute(stmt, batch)
            total += result.rowcount
    return total

Core bulk UPDATE — set-based update far outperforms row-by-row ORM flush:

from sqlalchemy import update
from sqlalchemy.ext.asyncio import AsyncConnection


async def mark_orders_expired(
    conn: AsyncConnection,
    cutoff_timestamp: str,
) -> int:
    """
    Single UPDATE statement affecting potentially millions of rows.
    ORM equivalent would require fetching all entities first, then flushing
    individual dirty-checks — orders of magnitude slower.
    """
    stmt = (
        update(orders_core)
        .where(
            orders_core.c.status == "pending",
            orders_core.c.created_at < cutoff_timestamp,
        )
        .values(status="expired")
    )
    result = await conn.execute(stmt)
    return result.rowcount

Hybrid pattern — ORM domain validation then Core insertion:

For pipelines where records must pass domain rules before persisting, validate with Python-side logic and insert via Core. Share the transaction by extracting the connection from the ORM session:

from dataclasses import dataclass
from sqlalchemy import insert, Table, MetaData, Column, Integer, String
from sqlalchemy.ext.asyncio import AsyncSession

metadata = MetaData()
invoices = Table(
    "invoices",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("tenant_id", Integer, nullable=False),
    Column("amount_cents", Integer, nullable=False),
    Column("currency", String(3), nullable=False),
)


@dataclass
class InvoiceRecord:
    tenant_id: int
    amount_cents: int
    currency: str

    def validate_and_export(self) -> dict:
        if self.amount_cents <= 0:
            raise ValueError(f"Non-positive amount: {self.amount_cents}")
        if self.currency not in {"USD", "EUR", "GBP"}:
            raise ValueError(f"Unsupported currency: {self.currency}")
        return {
            "tenant_id": self.tenant_id,
            "amount_cents": self.amount_cents,
            "currency": self.currency,
        }


async def validated_bulk_insert(
    session: AsyncSession,
    records: list[InvoiceRecord],
) -> int:
    """
    Validates with domain logic, then inserts via Core to bypass
    identity-map overhead. Both use the same transaction.
    """
    validated = [r.validate_and_export() for r in records]
    conn = await session.connection()  # shares the session's transaction
    result = await conn.execute(insert(invoices), validated)
    return result.rowcount

Hybrid Architectures & Migration Strategies

Enterprise systems rarely operate purely in one layer. The most maintainable architecture routes by operation type: ORM for transactional domain logic with relationship traversal; Core for bulk I/O, reporting, and dynamic schema operations. Both paths share the same AsyncEngine, connection pool, and — when you use await session.connection() — the same transaction.

Unified transaction scope for hybrid operations:

from contextlib import asynccontextmanager
from typing import AsyncGenerator
from sqlalchemy.ext.asyncio import AsyncSession, AsyncConnection, async_sessionmaker


@asynccontextmanager
async def hybrid_scope(
    factory: async_sessionmaker[AsyncSession],
) -> AsyncGenerator[tuple[AsyncSession, AsyncConnection], None]:
    """
    Exposes both the ORM session and its underlying connection.
    ORM flush + Core executemany share the same transaction.
    A single commit/rollback covers both.
    """
    async with factory() as session:
        conn: AsyncConnection = await session.connection()
        try:
            yield session, conn
            await session.commit()
        except Exception:
            await session.rollback()
            raise


# Usage pattern:
# async with hybrid_scope(session_factory) as (session, conn):
#     # ORM: update domain entities with change tracking
#     tenant = await session.get(Tenant, tenant_id)
#     tenant.updated_at = datetime.utcnow()
#
#     # Core: insert bulk audit rows without identity-map overhead
#     await conn.execute(insert(audit_log_table), audit_records)
#     # Commit is atomic: both ORM flush and Core inserts commit together.

When to use session.execute() vs conn.execute() in a hybrid scope:

Use session.execute() when the result needs to participate in the ORM lifecycle — you want hydrated entities, change tracking, or relationship loading. Use conn.execute() for everything else: aggregation queries whose results are serialised directly to JSON, bulk writes that bypass the identity map, and DDL statements.

For teams migrating from 1.4, the most common friction point is the RemovedIn20Warning torrent produced by legacy session.query() usage. Enabling SQLALCHEMY_WARN_20=1 exposes all call sites before committing to the migration. A systematic walkthrough is in Migrating Legacy 1.4 Code to 2.0 Syntax.

Decision matrix for architecture choices in new code:

ScenarioRecommended LayerKey Reason
REST endpoint, single entity CRUDORMChange tracking, validation hooks, relationship access
GraphQL resolver, 3+ relationship hopsORM + selectinloadN+1 prevention via eager load options
Nightly ETL, 1M+ rowsCore10–15× throughput over ORM add_all
Reporting dashboard aggregationsCoreNo entity hydration required
Multi-tenant schema routingCoreDynamic Table instantiation at runtime
Background job, 10k–100k updatesHybridDomain validation via ORM, bulk write via Core
Alembic migration data backfillCoreNo mapper needed; operates on raw tables
Test fixtures (small datasets)ORMReadability and relationship management

Multi-tenant schema routing — a Core-only domain:

from sqlalchemy import Table, MetaData, Column, Integer, String
from sqlalchemy.ext.asyncio import AsyncConnection


def tenant_table(schema: str) -> Table:
    """
    Returns a Table object bound to the tenant's schema.
    ORM declarative base cannot do this without mapper duplication.
    Core Table instantiation is idiomatic for runtime schema routing.
    """
    meta = MetaData(schema=schema)
    return Table(
        "orders",
        meta,
        Column("id", Integer, primary_key=True),
        Column("total_cents", Integer, nullable=False),
        Column("status", String(32), nullable=False),
    )


async def get_tenant_revenue(conn: AsyncConnection, tenant_id: str) -> int:
    from sqlalchemy import select, func

    schema = f"tenant_{tenant_id}"
    table = tenant_table(schema)
    stmt = select(func.sum(table.c.total_cents)).where(table.c.status == "paid")
    result = await conn.execute(stmt)
    return result.scalar_one_or_none() or 0

Type Safety and the Mapped Annotation Model

One underappreciated benefit of the 2.0 unified execution model is end-to-end static type safety. The legacy Query API returned untyped collections; pyright and mypy could not infer the element type of session.query(User).all() without plugin support. The 2.0 select() + scalars() path gives type checkers enough information to infer return types from first principles.

ORM model with full Mapped[] annotations:

from datetime import datetime
from decimal import Decimal
from sqlalchemy import String, Numeric, ForeignKey
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship


class Base(DeclarativeBase):
    pass


class Tenant(Base):
    __tablename__ = "tenants"
    id: Mapped[int] = mapped_column(primary_key=True)
    slug: Mapped[str] = mapped_column(String(64), unique=True, nullable=False)
    invoices: Mapped[list["Invoice"]] = relationship(back_populates="tenant")


class Invoice(Base):
    __tablename__ = "invoices"
    id: Mapped[int] = mapped_column(primary_key=True)
    tenant_id: Mapped[int] = mapped_column(ForeignKey("tenants.id"), nullable=False)
    amount: Mapped[Decimal] = mapped_column(Numeric(12, 2), nullable=False)
    currency: Mapped[str] = mapped_column(String(3), nullable=False)
    created_at: Mapped[datetime] = mapped_column(nullable=False)
    status: Mapped[str] = mapped_column(String(32), nullable=False)
    tenant: Mapped[Tenant] = relationship(back_populates="invoices")

With Mapped[] annotations, pyright infers Invoice from session.scalars(select(Invoice)) without any plugin. It also infers Decimal from invoice.amount and flags type mismatches at check time:

from sqlalchemy import select
from sqlalchemy.orm import Session
from decimal import Decimal


def get_high_value_invoices(
    session: Session, threshold: Decimal
) -> list[Invoice]:
    stmt = select(Invoice).where(Invoice.amount >= threshold)
    invoices: list[Invoice] = session.scalars(stmt).all()
    # pyright knows invoices[0].amount is Decimal, .currency is str, etc.
    return invoices

Core with typed Row access:

Core Row objects also support ._mapping and named-attribute access. For typed Core queries returning scalar aggregates, use scalar_one():

from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncConnection
from decimal import Decimal


async def total_invoice_amount(
    conn: AsyncConnection, tenant_id: int, currency: str
) -> Decimal:
    stmt = select(func.sum(Invoice.amount)).where(
        Invoice.tenant_id == tenant_id,
        Invoice.currency == currency,
        Invoice.status == "paid",
    )
    # scalar_one() raises NoResultFound if the aggregate returns NULL (no rows).
    # Use scalar_one_or_none() and handle None for tenants with no paid invoices.
    result = await conn.execute(stmt)
    total = result.scalar_one_or_none()
    return Decimal(total) if total is not None else Decimal("0")

Type annotations are not just a developer-experience feature — they enforce architectural boundaries. A function typed to return list[Invoice] cannot silently return Row tuples; mypy catches the mismatch before deployment. This is particularly valuable in hybrid architectures where Core and ORM results flow through the same service layer.

Production Pitfalls & Anti-Patterns

  • ORM add_all() for large datasetssession.add_all(list_of_10k_entities) allocates one Python object and one identity-map slot per row. Memory grows linearly with the batch size, and the flush triggers one INSERT statement per entity (not executemany). For batches larger than 500 rows, use Core insert() with a list of dicts, which triggers executemany and bypasses the identity map entirely.
  • Mixing Connection and Session without sharing the transaction — creating an independent AsyncConnection alongside an open AsyncSession opens two separate transactions on two separate connections from the pool. Changes made on the connection are invisible to the session's flush, and vice versa. Always extract the connection via conn = await session.connection() to share the session's transaction.
  • Calling .all() directly on a Select — a Select object has no .all() method. This raises AttributeError: 'Select' object has no attribute 'all'. The correct call chain is session.execute(stmt).scalars().all().
  • Applying ORM loader options to Core Table selects — attaching selectinload() or joinedload() to a select(table_object) raises ArgumentError: Loader option ... is not compatible with non-mapped selectables. Loader strategies are bound to ORM mappers and cannot operate on raw Table constructs.
  • Session accumulation in long-running workers — a background consumer that processes millions of events in one session lifetime without calling session.expunge_all() between batches grows the identity map until the process OOMs. Recycle the session or call expunge_all() at each batch boundary.
  • Ignoring pool_pre_ping — without pool_pre_ping=True, stale connections returned from the pool after a database restart or firewall timeout cause asyncpg.exceptions.ConnectionDoesNotExistError on the first execute. Enable it unconditionally for any async engine that may sit idle: create_async_engine(url, pool_pre_ping=True).
  • Forgetting pool_recycle for cloud databases — managed databases (AWS RDS, Cloud SQL, Supabase) enforce idle connection timeouts between 300 s and 1800 s. Set pool_recycle to 300 seconds or less to pre-empt server-side closure: create_async_engine(url, pool_recycle=300).

Frequently Asked Questions

When should I choose SQLAlchemy Core over the ORM in 2.0? Choose Core for high-volume ETL, reporting aggregations, dynamic schema routing, or any operation where the Python-side overhead of entity hydration and change tracking would dominate. The practical breakpoint is roughly 10,000 rows per transaction — above that, Core's 10–15× throughput advantage over ORM add_all is substantial enough to mandate the choice.

Does SQLAlchemy 2.0 still require separate APIs for Core and ORM? No. Both paths use the same select() construct for query construction and execute() for execution. The only difference is the executor type: AsyncConnection (Core) returns Row namedtuples; AsyncSession (ORM) returns hydrated, change-tracked entities.

Can I mix Core and ORM in the same transaction? Yes. Extract the connection from the session via conn = await session.connection(). Execute Core statements on conn; they participate in the session's open transaction. Calling session.commit() flushes any pending ORM changes and commits the shared connection — both Core inserts and ORM mutations are atomic.

How large is the ORM overhead in real numbers? On a 4-core host against a remote Postgres instance with asyncpg, ORM hydration adds roughly 15–40 ms per 1,000 entities due to identity-map hash lookups and attribute instrumentation interception. Core adds effectively 0 ms beyond the driver fetch. For 50-row request-scoped queries the difference is sub-millisecond and irrelevant. For 500k-row ETL it is decisive.

Does choosing Core mean I lose async support? No. AsyncConnection is a full async citizen — await conn.execute(), await conn.begin(), async with conn.begin(), and async with conn.connect() are all supported. Core async is often more predictable than ORM async because there are no hidden lazy-load traps that could fire across await boundaries and raise MissingGreenlet.