Common Table Expressions (CTEs) and Recursive Queries in SQLAlchemy 2.0

CTEs and recursive queries in SQLAlchemy 2.0 unlock database-side graph traversal, staged aggregation pipelines, and materialization-controlled execution plans — all covered in the broader Advanced Query Patterns and Bulk Data Operations guide. This page covers every production-relevant surface: .cte(), recursive=True, union_all, materialized hints, async execution, and the pitfalls that surface when these constructs meet real workloads.

Concept & Execution Model

A Common Table Expression is a named temporary result set defined within the scope of a single SQL statement. Depending on the database, its version, and any explicit hints, the query planner either treats a CTE as an optimization fence, executing it once and reusing the rows, or inlines it like a subquery. SQLAlchemy 2.0 builds CTEs with the .cte() method available on any Select object.

SQLAlchemy 1.4 introduced the unified select() API. In 2.0 the ORM Query object, including Query.cte(), survives only as a legacy API, so new code should construct every CTE from a Select statement:

# Async — SQLAlchemy 2.0 unified API
from collections.abc import Sequence

from sqlalchemy import Integer, Row, String, select
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine


class Base(DeclarativeBase):
    pass


class Department(Base):
    __tablename__ = "departments"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(120))
    budget: Mapped[int] = mapped_column(Integer, default=0)


engine = create_async_engine("postgresql+asyncpg://user:pw@localhost/db", echo=False)
# Session factory; named so it does not shadow the AsyncSession class used in annotations
SessionLocal = async_sessionmaker(engine, expire_on_commit=False)


async def top_budgets(session: AsyncSession) -> Sequence[Row[tuple[int, str, int]]]:
    # Build CTE: departments whose budget exceeds 100 000
    high_budget_cte = (
        select(Department.id, Department.name, Department.budget)
        .where(Department.budget > 100_000)
        .cte(name="high_budget_depts")
    )

    stmt = select(
        high_budget_cte.c.id,
        high_budget_cte.c.name,
        high_budget_cte.c.budget,
    ).order_by(high_budget_cte.c.budget.desc())

    result = await session.execute(stmt)
    return result.all()

The CTE object returned by .cte() exposes a .c column accessor identical to that of a Table or subquery. You can reference the same CTE multiple times in the outer SELECT, join it to other tables, or compose it into further CTEs — the planner receives a single SQL statement with one WITH clause.
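A minimal synchronous sketch of the .c accessor and the single WITH clause, assuming an in-memory SQLite database so it runs without a server; the departments/projects tables and their rows are hypothetical. The CTE is joined to a second table exactly as a Table would be:

```python
from sqlalchemy import (
    Column,
    ForeignKey,
    Integer,
    MetaData,
    String,
    Table,
    create_engine,
    insert,
    select,
)

metadata = MetaData()

# Hypothetical tables for this sketch
departments = Table(
    "departments",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("name", String(120)),
    Column("budget", Integer),
)
projects = Table(
    "projects",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("department_id", Integer, ForeignKey("departments.id")),
    Column("title", String(120)),
)

engine = create_engine("sqlite://")  # in-memory database, an assumption for runnability
metadata.create_all(engine)

with engine.begin() as conn:
    conn.execute(
        insert(departments),
        [
            {"id": 1, "name": "R&D", "budget": 250_000},
            {"id": 2, "name": "Support", "budget": 40_000},
        ],
    )
    conn.execute(
        insert(projects),
        [
            {"id": 10, "department_id": 1, "title": "Engine"},
            {"id": 11, "department_id": 2, "title": "Helpdesk"},
            {"id": 12, "department_id": 1, "title": "Compiler"},
        ],
    )

high_budget = (
    select(departments.c.id, departments.c.name)
    .where(departments.c.budget > 100_000)
    .cte(name="high_budget_depts")
)

# high_budget.c behaves like Table.c: join on it and select from it
stmt = (
    select(high_budget.c.name, projects.c.title)
    .join_from(high_budget, projects, projects.c.department_id == high_budget.c.id)
    .order_by(projects.c.title)
)

sql = str(stmt)  # one WITH clause precedes the outer SELECT
with engine.connect() as conn:
    rows = conn.execute(stmt).all()
```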

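Why this matters for async. Under asyncio, the event loop yields control at each await, but a plain await session.execute() still buffers the entire result in memory before returning. For a CTE that produces millions of rows, call AsyncSession.stream() with the yield_per execution option instead: rows arrive from a server-side cursor through the asyncpg driver in fixed-size batches, keeping the Python side memory-bounded. Passing stream_results=True to execute() does not achieve this under asyncio; SQLAlchemy rejects it and points you to stream().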
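The batch-wise consumption that yield_per enables can be sketched synchronously on SQLite (an assumption for runnability; the events table is hypothetical). Under asyncio the entry point would be await session.stream(...) and the loop async for partition in result.partitions(...). pysqlite has no true server-side cursors, so this shows the shape of the Result API rather than the memory profile:

```python
from sqlalchemy import Column, Integer, MetaData, Table, create_engine, insert, select

metadata = MetaData()
events = Table(  # hypothetical table
    "events",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("score", Integer),
)

engine = create_engine("sqlite://")
metadata.create_all(engine)
with engine.begin() as conn:
    conn.execute(insert(events), [{"id": i, "score": i % 7} for i in range(1, 1001)])

high_scores = select(events.c.id).where(events.c.score >= 5).cte(name="high_scores")

# yield_per requests batched fetching; partitions() hands rows over in lists of that size
stmt = (
    select(high_scores.c.id)
    .order_by(high_scores.c.id)
    .execution_options(yield_per=100)
)

partition_sizes = []
with engine.connect() as conn:
    result = conn.execute(stmt)
    for partition in result.partitions(100):
        # Process each batch here before moving on to the next one
        partition_sizes.append(len(partition))
```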

Figure: Recursive CTE execution loop. The anchor term (e.g. parent_id IS NULL) produces the initial working table; the recursive term joins the working table (JOIN cte.parent) to produce new rows, which are appended to the result set via UNION ALL and become the next iteration's working table; the loop ends when the recursive term returns 0 rows or a depth guard fires.

Query Construction & Async Execution Patterns

Non-Recursive CTEs: Staged Filtering and Aggregation

Non-recursive CTEs shine when you need to stage filtering, aggregation, or column renaming before a join. They replace nested subqueries with named, readable stages; depending on the database, the planner either optimizes each stage separately or inlines it into the outer query.

# Async — multi-stage CTE pipeline for invoice reporting
# Reuses Base from the previous example
import decimal

from sqlalchemy import Integer, Numeric, func, select
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy.ext.asyncio import AsyncSession


class Invoice(Base):
    __tablename__ = "invoices"
    id: Mapped[int] = mapped_column(primary_key=True)
    tenant_id: Mapped[int] = mapped_column(Integer)
    amount: Mapped[decimal.Decimal] = mapped_column(Numeric(12, 2))
    paid: Mapped[bool] = mapped_column(default=False)


async def tenant_revenue_summary(session: AsyncSession, min_revenue: decimal.Decimal):
    # Stage 1: paid invoices per tenant
    paid_cte = (
        select(
            Invoice.tenant_id,
            func.sum(Invoice.amount).label("total_revenue"),
            func.count().label("invoice_count"),
        )
        .where(Invoice.paid.is_(True))
        .group_by(Invoice.tenant_id)
        .cte(name="paid_by_tenant")
    )

    # Stage 2: filter tenants above the threshold
    qualified_cte = (
        select(paid_cte)
        .where(paid_cte.c.total_revenue >= min_revenue)
        .cte(name="qualified_tenants")
    )

    stmt = select(
        qualified_cte.c.tenant_id,
        qualified_cte.c.total_revenue,
        qualified_cte.c.invoice_count,
    ).order_by(qualified_cte.c.total_revenue.desc())

    result = await session.execute(stmt)
    return result.mappings().all()

Column aliases created with .label() inside a CTE are accessible through .c.alias_name on the CTE object. This is the canonical way to reference derived columns in subsequent CTEs or the outer query.
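A runnable check of the label-to-.c mapping, using the same two-stage shape on SQLite with a hypothetical integer amount_cents column (SQLite has no native Decimal type):

```python
from sqlalchemy import (
    Boolean,
    Column,
    Integer,
    MetaData,
    Table,
    create_engine,
    func,
    insert,
    select,
)

metadata = MetaData()
invoices = Table(  # hypothetical table
    "invoices",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("tenant_id", Integer),
    Column("amount_cents", Integer),
    Column("paid", Boolean),
)

engine = create_engine("sqlite://")
metadata.create_all(engine)
with engine.begin() as conn:
    conn.execute(
        insert(invoices),
        [
            {"id": 1, "tenant_id": 1, "amount_cents": 5000, "paid": True},
            {"id": 2, "tenant_id": 1, "amount_cents": 7000, "paid": True},
            {"id": 3, "tenant_id": 2, "amount_cents": 3000, "paid": True},
            {"id": 4, "tenant_id": 2, "amount_cents": 9000, "paid": False},
        ],
    )

paid_cte = (
    select(
        invoices.c.tenant_id,
        func.sum(invoices.c.amount_cents).label("total_revenue"),
        func.count().label("invoice_count"),
    )
    .where(invoices.c.paid.is_(True))
    .group_by(invoices.c.tenant_id)
    .cte(name="paid_by_tenant")
)

# .label("total_revenue") surfaces as paid_cte.c.total_revenue in the next stage
qualified_cte = (
    select(paid_cte)
    .where(paid_cte.c.total_revenue >= 10_000)
    .cte(name="qualified_tenants")
)

with engine.connect() as conn:
    summary = conn.execute(select(qualified_cte)).mappings().all()
```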

Recursive CTEs: Syntax and Anatomy

A recursive CTE requires recursive=True on the .cte() call. SQLAlchemy then emits WITH RECURSIVE, which PostgreSQL, MySQL 8.0+, and SQLite 3.8.3+ all accept. The pattern is always:

  1. Build the anchor Select — the base case.
  2. Call .cte(name="...", recursive=True) on it to get the CTE object.
  3. Alias the CTE: cte_alias = my_cte.alias().
  4. Build the recursive term Select that joins against cte_alias.
  5. Extend the CTE: full_cte = my_cte.union_all(recursive_term).
  6. Use full_cte in the outer select().

# Async — recursive CTE for an org chart (Employee → manager chain)
from sqlalchemy import select, Integer, String, func, ForeignKey
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy.ext.asyncio import AsyncSession


class Employee(Base):
    __tablename__ = "employees"
    id: Mapped[int] = mapped_column(primary_key=True)
    manager_id: Mapped[int | None] = mapped_column(ForeignKey("employees.id"))
    name: Mapped[str] = mapped_column(String(120))
    department: Mapped[str] = mapped_column(String(80))


async def fetch_reports(session: AsyncSession, manager_id: int, max_depth: int = 8):
    """Return every direct and indirect report under manager_id."""
    anchor = select(
        Employee.id,
        Employee.manager_id,
        Employee.name,
        Employee.department,
        func.cast(0, Integer).label("depth"),
    ).where(Employee.id == manager_id)

    org_cte = anchor.cte(name="org_tree", recursive=True)
    cte_alias = org_cte.alias()

    recursive_term = (
        select(
            Employee.id,
            Employee.manager_id,
            Employee.name,
            Employee.department,
            (cte_alias.c.depth + 1).label("depth"),
        )
        .join(cte_alias, Employee.manager_id == cte_alias.c.id)
        .where(cte_alias.c.depth < max_depth)
    )

    full_cte = org_cte.union_all(recursive_term)
    stmt = select(full_cte).order_by(full_cte.c.depth, full_cte.c.name)

    result = await session.execute(stmt)
    return result.mappings().all()

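The depth guard (.where(cte_alias.c.depth < max_depth)) is non-negotiable in production. Without it, a corrupted adjacency list (a row whose parent chain forms a cycle) makes the recursive term produce new rows forever: PostgreSQL keeps iterating until statement_timeout fires or temporary storage runs out, SQLite never finishes, and MySQL aborts once cte_max_recursion_depth (default 1000) is exceeded.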
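The guard's effect can be reproduced on SQLite with a deliberately corrupted, hypothetical adjacency list in which 1 → 2 → 3 → 1 forms a cycle. Without the WHERE clause this query would never finish; with it, the traversal stops after max_depth iterations, revisiting ids along the way:

```python
from sqlalchemy import Column, Integer, MetaData, String, Table, cast, create_engine, insert, select

metadata = MetaData()
employees = Table(  # hypothetical table
    "employees",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("manager_id", Integer),  # no FK, so the cycle below can be stored
    Column("name", String(50)),
)

engine = create_engine("sqlite://")
metadata.create_all(engine)
with engine.begin() as conn:
    # Corrupted data: 1 reports to 3, 2 reports to 1, 3 reports to 2
    conn.execute(
        insert(employees),
        [
            {"id": 1, "manager_id": 3, "name": "a"},
            {"id": 2, "manager_id": 1, "name": "b"},
            {"id": 3, "manager_id": 2, "name": "c"},
        ],
    )


def reports_under(manager_id: int, max_depth: int):
    anchor = select(
        employees.c.id, employees.c.manager_id, cast(0, Integer).label("depth")
    ).where(employees.c.id == manager_id)
    org = anchor.cte(name="org_tree", recursive=True)
    org_alias = org.alias()
    step = (
        select(
            employees.c.id,
            employees.c.manager_id,
            (org_alias.c.depth + 1).label("depth"),
        )
        .join(org_alias, employees.c.manager_id == org_alias.c.id)
        .where(org_alias.c.depth < max_depth)  # the depth guard
    )
    full = org.union_all(step)
    return select(full.c.id, full.c.depth).order_by(full.c.depth)


with engine.connect() as conn:
    rows = conn.execute(reports_under(1, max_depth=5)).all()
```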

For a complete walkthrough of the self-referential model, cycle detection strategies, and async streaming for deep trees, see Implementing Recursive CTEs for Hierarchical Data in SQLAlchemy.

State Management & Session Boundaries

CTEs are statement-scoped: they exist only for the duration of one SQL execution. There is no server-side cursor state that persists across session.execute() calls, so identity map implications are the same as for any other query.

When the outer statement selects an ORM entity mapped onto the CTE (for example select(aliased(Employee, full_cte))), every row is resolved through the identity map by primary key. If the same Employee.id appears at more than one depth, which cyclic data produces even with a depth guard in place, each occurrence returns the same instance, and attributes loaded from the first row are not overwritten by later ones, so per-row values such as depth are lost. For this reason, CTE results that span tree traversal should generally be consumed as plain rows or mappings rather than ORM instances, unless you know the returned ids are unique.

# Preferred: consume as mappings, not ORM objects
result = await session.execute(stmt)
rows = result.mappings().all()  # RowMapping objects, e.g. {'id': ..., 'name': ..., 'depth': ...}

# Avoid for recursive results mapped onto an entity via aliased(Employee, full_cte):
# employees = result.scalars().all()  # repeated ids collapse onto one instance

Transaction boundaries need no special treatment for CTEs compared to ordinary queries, with one nuance: if you stream rows from a CTE with AsyncSession.stream() inside session.begin(), the transaction and its connection stay open until the result is exhausted or explicitly closed. Wrap streaming consumers in async with session.begin() and make sure the result is fully drained or closed before the context exits.

async def stream_org_tree(session: AsyncSession, root_id: int):
    async with session.begin():
        stmt = build_org_cte_stmt(root_id)  # hypothetical helper returning the recursive CTE select
        # stream() opens a server-side cursor; yield_per sets the fetch batch size
        result = await session.stream(stmt.execution_options(yield_per=500))
        async for partition in result.partitions(500):
            yield partition
        # Leaving the block commits and returns the connection to the pool

Advanced CTE Patterns

Chained CTEs for Multi-Stage Pipelines

Multiple CTEs can be chained: each subsequent CTE selects from the previous one. SQLAlchemy emits a single WITH cte1 AS (...), cte2 AS (...) SELECT ... statement regardless of how many stages you chain.

# Async — three-stage CTE pipeline: filter → aggregate → rank
from sqlalchemy import func, select, Numeric, Integer
import decimal


async def ranked_tenant_pipeline(session: AsyncSession):
    # Stage 1: unpaid invoices only
    unpaid = (
        select(Invoice.tenant_id, Invoice.amount)
        .where(Invoice.paid.is_(False))
        .cte(name="unpaid_invoices")
    )

    # Stage 2: sum per tenant
    summed = (
        select(
            unpaid.c.tenant_id,
            func.sum(unpaid.c.amount).label("outstanding"),
        )
        .group_by(unpaid.c.tenant_id)
        .cte(name="outstanding_by_tenant")
    )

    # Stage 3: rank by outstanding balance
    ranked = (
        select(
            summed.c.tenant_id,
            summed.c.outstanding,
            func.rank()
            .over(order_by=summed.c.outstanding.desc())
            .label("risk_rank"),
        )
        .cte(name="risk_ranked")
    )

    stmt = select(ranked).where(ranked.c.risk_rank <= 20)
    result = await session.execute(stmt)
    return result.mappings().all()
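To confirm that chaining still yields one statement, a hypothetical three-stage chain can be compiled without a database and inspected; each .cte() should appear once in a single WITH list, with every stage defined before the stage that reads from it:

```python
from sqlalchemy import Boolean, Column, Integer, MetaData, Table, func, select

metadata = MetaData()
invoices = Table(  # hypothetical table
    "invoices",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("tenant_id", Integer),
    Column("amount_cents", Integer),
    Column("paid", Boolean),
)

unpaid = (
    select(invoices.c.tenant_id, invoices.c.amount_cents)
    .where(invoices.c.paid.is_(False))
    .cte(name="unpaid_invoices")
)
summed = (
    select(unpaid.c.tenant_id, func.sum(unpaid.c.amount_cents).label("outstanding"))
    .group_by(unpaid.c.tenant_id)
    .cte(name="outstanding_by_tenant")
)
ranked = select(
    summed.c.tenant_id,
    summed.c.outstanding,
    func.rank().over(order_by=summed.c.outstanding.desc()).label("risk_rank"),
).cte(name="risk_ranked")

stmt = select(ranked).where(ranked.c.risk_rank <= 20)
sql = str(stmt)  # compile with the default dialect; no connection needed
print(sql)
```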

CTE + INSERT: Database-Side ETL

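An INSERT ... SELECT whose SELECT reads from a CTE lets you stage a derivation and write the results in a single round trip. SQLAlchemy supports this via insert().from_select(), which renders the CTE's WITH clause as part of the INSERT statement: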

from sqlalchemy import Column, Integer, MetaData, String, Table, func, insert, select

metadata = MetaData()

archive_table = Table(
    "invoice_archive",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("tenant_id", Integer),
    Column("amount_str", String(32)),
)


async def archive_paid_invoices(session: AsyncSession):
    # CTE selects source rows
    source_cte = (
        select(Invoice.id, Invoice.tenant_id, Invoice.amount)
        .where(Invoice.paid.is_(True))
        .cte(name="paid_source")
    )

    # Outer INSERT references the CTE
    insert_stmt = insert(archive_table).from_select(
        ["id", "tenant_id", "amount_str"],
        select(
            source_cte.c.id,
            source_cte.c.tenant_id,
            func.cast(source_cte.c.amount, String).label("amount_str"),
        ),
    )

    async with session.begin():
        await session.execute(insert_stmt)

This pattern is an alternative to high-performance bulk inserts using Core execute(): when the source data already lives in the database and the transformation is simple enough to express in SQL, INSERT ... SELECT avoids shipping rows to Python and back.
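A runnable sketch of the INSERT ... SELECT pattern on SQLite, with hypothetical tables and integer cents in place of Numeric; the SELECT, including its CTE, runs entirely inside the database:

```python
from sqlalchemy import (
    Boolean,
    Column,
    Integer,
    MetaData,
    String,
    Table,
    cast,
    create_engine,
    insert,
    select,
)

metadata = MetaData()
invoices = Table(  # hypothetical source table
    "invoices",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("tenant_id", Integer),
    Column("amount_cents", Integer),
    Column("paid", Boolean),
)
invoice_archive = Table(  # hypothetical target table
    "invoice_archive",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("tenant_id", Integer),
    Column("amount_str", String(32)),
)

engine = create_engine("sqlite://")
metadata.create_all(engine)

source_cte = (
    select(invoices.c.id, invoices.c.tenant_id, invoices.c.amount_cents)
    .where(invoices.c.paid.is_(True))
    .cte(name="paid_source")
)
archive_stmt = insert(invoice_archive).from_select(
    ["id", "tenant_id", "amount_str"],
    select(
        source_cte.c.id,
        source_cte.c.tenant_id,
        cast(source_cte.c.amount_cents, String),
    ),
)

with engine.begin() as conn:
    conn.execute(
        insert(invoices),
        [
            {"id": 1, "tenant_id": 7, "amount_cents": 1250, "paid": True},
            {"id": 2, "tenant_id": 7, "amount_cents": 900, "paid": False},
            {"id": 3, "tenant_id": 8, "amount_cents": 400, "paid": True},
        ],
    )
    # One statement: rows move from invoices to invoice_archive inside the database
    conn.execute(archive_stmt)
    archived = conn.execute(
        select(invoice_archive).order_by(invoice_archive.c.id)
    ).all()
```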

Window Functions Staged Behind a CTE

Window functions operating on pre-filtered data benefit from a CTE staging layer: the WHERE clause inside the CTE shrinks the row set first, so the window is computed only over the reduced set. For full coverage of PARTITION BY, ORDER BY, and frame specs within SQLAlchemy's .over(), see the Window Functions and Analytical Queries guide.

from sqlalchemy import func, select


async def running_revenue_per_tenant(session: AsyncSession, tenant_id: int):
    # CTE: filter to one tenant's paid invoices
    tenant_invoices = (
        select(Invoice.id, Invoice.amount)
        .where(Invoice.tenant_id == tenant_id, Invoice.paid.is_(True))
        .cte(name="tenant_paid")
    )

    # Window: running total ordered by invoice id
    stmt = select(
        tenant_invoices.c.id,
        tenant_invoices.c.amount,
        func.sum(tenant_invoices.c.amount)
        .over(order_by=tenant_invoices.c.id)
        .label("running_total"),
    )

    result = await session.execute(stmt)
    return result.mappings().all()
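The same running-total shape on SQLite (window functions need SQLite 3.25+, which recent Python builds bundle); the table and data are hypothetical:

```python
from sqlalchemy import (
    Boolean,
    Column,
    Integer,
    MetaData,
    Table,
    create_engine,
    func,
    insert,
    select,
)

metadata = MetaData()
invoices = Table(  # hypothetical table
    "invoices",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("tenant_id", Integer),
    Column("amount_cents", Integer),
    Column("paid", Boolean),
)

engine = create_engine("sqlite://")
metadata.create_all(engine)
with engine.begin() as conn:
    conn.execute(
        insert(invoices),
        [
            {"id": 1, "tenant_id": 1, "amount_cents": 100, "paid": True},
            {"id": 2, "tenant_id": 1, "amount_cents": 250, "paid": True},
            {"id": 3, "tenant_id": 1, "amount_cents": 999, "paid": False},
            {"id": 4, "tenant_id": 1, "amount_cents": 50, "paid": True},
            {"id": 5, "tenant_id": 2, "amount_cents": 70, "paid": True},
        ],
    )

# CTE: one tenant's paid invoices
tenant_paid = (
    select(invoices.c.id, invoices.c.amount_cents)
    .where(invoices.c.tenant_id == 1, invoices.c.paid.is_(True))
    .cte(name="tenant_paid")
)

# Window over the filtered rows only: running total ordered by invoice id
stmt = select(
    tenant_paid.c.id,
    tenant_paid.c.amount_cents,
    func.sum(tenant_paid.c.amount_cents)
    .over(order_by=tenant_paid.c.id)
    .label("running_total"),
).order_by(tenant_paid.c.id)

with engine.connect() as conn:
    rows = conn.execute(stmt).all()
```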

PostgreSQL ARRAY-Based Cycle Detection

For PostgreSQL workloads where depth limiting is insufficient (graphs with valid long paths but occasional true cycles), use an accumulated path array. This is dialect-specific and requires ARRAY and array from sqlalchemy.dialects.postgresql:

from sqlalchemy import Integer, any_, cast, false, func, select
from sqlalchemy.dialects.postgresql import ARRAY, array


async def safe_graph_traversal(session: AsyncSession):
    """Traverse a graph with true cycle detection via path accumulation."""
    anchor = select(
        Employee.id,
        Employee.manager_id,
        # ARRAY[employees.id], cast so both UNION ALL terms agree on INTEGER[]
        cast(array([Employee.id]), ARRAY(Integer)).label("visited_ids"),
        false().label("is_cycle"),
    ).where(Employee.manager_id.is_(None))

    graph_cte = anchor.cte(name="graph_walk", recursive=True)
    alias = graph_cte.alias()

    # Only continue from rows that were not themselves flagged as a cycle
    recursive_term = (
        select(
            Employee.id,
            Employee.manager_id,
            func.array_append(alias.c.visited_ids, Employee.id).label("visited_ids"),
            # employees.id = ANY (visited_ids): true if this node is already on the path
            (Employee.id == any_(alias.c.visited_ids)).label("is_cycle"),
        )
        .join(alias, Employee.manager_id == alias.c.id)
        .where(alias.c.is_cycle.is_(False))
    )

    full = graph_cte.union_all(recursive_term)
    stmt = select(full).where(full.c.is_cycle.is_(False))
    result = await session.execute(stmt)
    return result.mappings().all()

Use depth limiting as the portable fallback on MySQL and SQLite, reserving array-based detection for PostgreSQL where correctness on cyclic data is required.
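Because the array approach is PostgreSQL-only, here is a compile-only sketch that renders it against a hypothetical nodes adjacency table without a server or driver; it uses postgresql.array for the ARRAY[...] literal and any_() for the membership test:

```python
from sqlalchemy import Column, Integer, MetaData, Table, any_, cast, false, func, select
from sqlalchemy.dialects import postgresql
from sqlalchemy.dialects.postgresql import ARRAY, array

metadata = MetaData()
nodes = Table(  # hypothetical adjacency-list table
    "nodes",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("parent_id", Integer),
)

anchor = select(
    nodes.c.id,
    cast(array([nodes.c.id]), ARRAY(Integer)).label("visited_ids"),
    false().label("is_cycle"),
).where(nodes.c.parent_id.is_(None))

walk = anchor.cte(name="graph_walk", recursive=True)
prev = walk.alias()

step = (
    select(
        nodes.c.id,
        func.array_append(prev.c.visited_ids, nodes.c.id).label("visited_ids"),
        # True when the child is already on the accumulated path
        (nodes.c.id == any_(prev.c.visited_ids)).label("is_cycle"),
    )
    .join(prev, nodes.c.parent_id == prev.c.id)
    .where(prev.c.is_cycle.is_(False))
)

full = walk.union_all(step)
stmt = select(full.c.id, full.c.visited_ids).where(full.c.is_cycle.is_(False))

# Compile only: inspecting the SQL needs no database connection
sql = str(stmt.compile(dialect=postgresql.dialect()))
print(sql)
```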

Hybrid Architectures & Migration Strategies

1.4 → 2.0 Migration Path for CTEs

If your codebase uses session.query(Model).cte(), migrate it to the select() API. Query, including Query.cte(), still runs in 2.0, but only as a legacy API:

# Legacy Query pattern: still runs in 2.0, but not the recommended style
# cte = session.query(Node).filter(Node.parent_id.is_(None)).cte()

# SQLAlchemy 2.0 equivalent
cte = (
    select(Node)
    .where(Node.parent_id.is_(None))
    .cte(name="root_nodes")
)

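Because Query survives in 2.0, Query.cte() does not emit a RemovedIn20Warning, so these call sites have to be found by searching the codebase. For the 2.0-incompatible patterns that do warn, run your test suite on 1.4 with the SQLALCHEMY_WARN_20=1 environment variable and python -W always::DeprecationWarning.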

Mixing Core and ORM in CTE Queries

Because CTEs are Core constructs, you can mix ORM-mapped columns with Core Table columns freely. To map CTE rows back to ORM objects, pass the CTE to aliased(); the resulting entity reads its columns from the CTE:

from sqlalchemy.orm import aliased


async def load_roots_as_orm(session: AsyncSession):
    cte = (
        select(Employee)
        .where(Employee.manager_id.is_(None))
        .cte(name="top_level")
    )

    # An Employee entity whose columns come from the CTE
    top_level_employee = aliased(Employee, cte)
    stmt = select(top_level_employee)
    result = await session.execute(stmt)
    return result.scalars().all()

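The returned objects are full Employee instances in the identity map, so relationships behave as usual. Under asyncio, load the ones you need eagerly with loader options such as selectinload() rather than relying on implicit lazy loading, which raises MissingGreenlet.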

Materialized Hints: When and How to Use Them

PostgreSQL 12+ supports MATERIALIZED and NOT MATERIALIZED hints for CTEs. SQLAlchemy 2.0 has no dedicated .cte() parameter for them; instead, add the keyword with CTE.prefix_with(), scoped to the PostgreSQL dialect:

# Force materialization: the CTE becomes an optimization fence, evaluated once
expensive_cte = (
    select(Invoice.tenant_id, func.sum(Invoice.amount).label("total"))
    .group_by(Invoice.tenant_id)
    .cte(name="tenant_totals")
    .prefix_with("MATERIALIZED", dialect="postgresql")
)

# Suppress materialization: let the planner inline the CTE and push predicates down
simple_filter_cte = (
    select(Invoice)
    .where(Invoice.amount > 10_000)
    .cte(name="large_invoices")
    .prefix_with("NOT MATERIALIZED", dialect="postgresql")
)

Heuristic: PostgreSQL 12+ already inlines a non-recursive, side-effect-free CTE that is referenced once and materializes one that is referenced more than once. Add MATERIALIZED when you want a fence the planner would not choose on its own, such as a singly-referenced CTE wrapping an expensive expression; add NOT MATERIALIZED to a CTE referenced several times when predicate pushdown into each reference is worth more than computing it once. Leave both off to let the planner decide, which is correct for most cases.

The dialect="postgresql" argument keeps the keyword out of SQL compiled for other backends: MySQL does not accept it, and SQLite accepts it only from 3.35.0. Without that argument, prefix_with() renders the prefix on every dialect, so guard it if your codebase targets multiple databases.
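A compile-only sketch, using a hypothetical invoices table, of how prefix_with(..., dialect="postgresql") keeps the hint in PostgreSQL SQL and out of SQLite SQL:

```python
from sqlalchemy import Column, Integer, MetaData, Table, func, select
from sqlalchemy.dialects import postgresql, sqlite

metadata = MetaData()
invoices = Table(  # hypothetical table
    "invoices",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("tenant_id", Integer),
    Column("amount_cents", Integer),
)

tenant_totals = (
    select(invoices.c.tenant_id, func.sum(invoices.c.amount_cents).label("total"))
    .group_by(invoices.c.tenant_id)
    .cte(name="tenant_totals")
    # Rendered only when compiling for the PostgreSQL dialect
    .prefix_with("MATERIALIZED", dialect="postgresql")
)
stmt = select(tenant_totals).where(tenant_totals.c.total > 1000)

pg_sql = str(stmt.compile(dialect=postgresql.dialect()))
sqlite_sql = str(stmt.compile(dialect=sqlite.dialect()))
```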

Production Pitfalls & Anti-Patterns

  • Missing recursive=True on the anchor call. SQLAlchemy emits a plain WITH without RECURSIVE, and PostgreSQL rejects the self-reference, reporting that the relation does not exist and hinting that WITH RECURSIVE is needed. Fix: always pass recursive=True to .cte() on the anchor.
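  • Column count or type mismatch between anchor and recursive term. The UNION ALL requires identical column counts and compatible types; a mismatch fails the statement, and type mismatches in particular are reported by the database (PostgreSQL, for example, complains that a column's type in the non-recursive term differs from its type overall). Fix: explicitly cast every column whose type could drift, such as literals and arithmetic, with func.cast() or sqlalchemy.cast(), and use .label() to ensure names align.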
  • Extending the alias instead of the CTE. The recursive term must join against an alias of the CTE returned by .cte(), while .union_all() must be called on that original CTE object, never on the alias. Fix: cte = anchor.cte(recursive=True); alias = cte.alias(); full = cte.union_all(recursive_step).
  • Using union() instead of union_all(). UNION deduplicates rows via an implicit DISTINCT, which adds a sort or hash operation to every recursive iteration. For tree traversal, where rows are already distinct, this is almost always unnecessary and expensive. Use union_all().
  • Streaming without exhausting the async generator. If you open a server-side cursor with AsyncSession.stream() inside session.begin() and yield from it in a FastAPI endpoint, the connection is not returned to the pool until the response body is fully consumed. Fix: ensure the caller exhausts the generator, or drain the result with async for ... before the transaction context exits.
  • Applying ORM relationship loaders to CTE columns. Loader options such as selectinload() and joinedload() act on ORM entities; a select() of plain CTE columns has no entity for them to attach to. If you then re-query the entities and touch an unloaded relationship under asyncio, the implicit lazy load raises MissingGreenlet (or DetachedInstanceError once the session is closed). Fix: map the CTE to an entity with aliased(Employee, cte) and attach the loader options to that alias, or load the related data with explicit select() joins.

Frequently Asked Questions

What is the difference between .cte() and a subquery in SQLAlchemy 2.0? A subquery (.subquery()) is inlined at the point of reference: every reference generates a separate nested SELECT. A CTE (.cte()) is hoisted to a WITH clause and named; multiple references in the same statement reuse the same definition. Whether the database then executes it once or inlines it depends on the database and its version; PostgreSQL 12+ decides by reference count unless you add a MATERIALIZED or NOT MATERIALIZED prefix. Use CTEs when you reference the same derived result more than once or when you need recursive=True.
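A compile-only sketch of the difference, using a hypothetical orders table: referencing a subquery twice embeds two copies of the SELECT, while a CTE referenced through two aliases is defined once:

```python
from sqlalchemy import Column, Integer, MetaData, Table, select

metadata = MetaData()
orders = Table(  # hypothetical table
    "orders",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("total", Integer),
)

big = select(orders.c.id).where(orders.c.total > 500)

# Two subqueries: each reference embeds its own copy of the SELECT
s1, s2 = big.subquery("s1"), big.subquery("s2")
sub_stmt = select(s1.c.id, s2.c.id).select_from(s1.join(s2, s1.c.id == s2.c.id))

# One CTE referenced twice through aliases: defined once in the WITH clause
c = big.cte("big_orders")
c1, c2 = c.alias("c1"), c.alias("c2")
cte_stmt = select(c1.c.id, c2.c.id).select_from(c1.join(c2, c1.c.id == c2.c.id))

sub_sql, cte_sql = str(sub_stmt), str(cte_stmt)
```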

Can I use CTEs inside a FastAPI dependency that uses yield? Yes. Construct the CTE and execute the statement inside the async with session.begin() block within your dependency. If you stream results with AsyncSession.stream(), drain them before yielding control to the endpoint, because the result holds an open cursor. Alternatively, collect into a list inside the dependency and yield the list.

Does SQLAlchemy emit WITH RECURSIVE for all databases, or only PostgreSQL? It emits WITH RECURSIVE, which PostgreSQL, MySQL 8.0+, and SQLite 3.8.3+ all accept; dialects for databases whose syntax has no RECURSIVE keyword, such as SQL Server, render a plain WITH. MySQL 5.7 and below do not support CTEs at all, so SQLAlchemy will produce SQL that the server rejects. Check your minimum supported database version before deploying recursive CTE logic.
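A compile-only sketch of the preamble each dialect renders for the same recursive CTE over a hypothetical nodes table; compiling does not connect, so no database drivers are required:

```python
from sqlalchemy import Column, Integer, MetaData, Table, select
from sqlalchemy.dialects import mssql, mysql, postgresql, sqlite

metadata = MetaData()
nodes = Table(  # hypothetical adjacency-list table
    "nodes",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("parent_id", Integer),
)

anchor = select(nodes.c.id).where(nodes.c.parent_id.is_(None))
tree = anchor.cte(name="tree", recursive=True)
parent = tree.alias()
tree = tree.union_all(
    select(nodes.c.id).join(parent, nodes.c.parent_id == parent.c.id)
)
stmt = select(tree.c.id)

# Compile the same statement once per dialect
rendered = {
    name: str(stmt.compile(dialect=module.dialect()))
    for name, module in [
        ("postgresql", postgresql),
        ("mysql", mysql),
        ("sqlite", sqlite),
        ("mssql", mssql),
    ]
}
```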

How do I reference a CTE in a DELETE or UPDATE statement? Reference it from the WHERE clause: delete(Model).where(Model.id.in_(select(my_cte.c.id))) is the portable approach, and SQLAlchemy renders the CTE's WITH clause as part of the DELETE or UPDATE statement. For PostgreSQL writable CTEs, raw text() is not required either: an insert(), update(), or delete() with .returning() can itself be turned into a CTE via .cte(), and add_cte() attaches a CTE to a statement that does not otherwise reference it.

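What happens if my recursive CTE runs into PostgreSQL's max_stack_depth? Usually it does not: PostgreSQL evaluates WITH RECURSIVE iteratively over a working table, so the number of iterations does not grow the server's stack. A runaway recursive CTE instead keeps producing rows until statement_timeout cancels it or temporary storage runs out. max_stack_depth (default 2MB) limits deeply nested expressions and recursive function calls, which is what ERROR: stack depth limit exceeded points to. Add .where(cte_alias.c.depth < N) in the recursive member as the primary termination condition, and set statement_timeout as a backstop rather than raising max_stack_depth.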