Implementing Recursive CTEs for Hierarchical Data in SQLAlchemy
Call .cte(recursive=True) on your anchor Select, alias the result, join the alias in a recursive term, extend with .union_all(), then execute against an AsyncSession — that is the complete pattern, covered in depth in the Common Table Expressions and Recursive Queries guide.
Quick Answer
Before/after: the 1.4-era pattern built the CTE from session.query(), which AsyncSession does not provide. SQLAlchemy 2.0 style builds the anchor from an explicit select() instead:
# Legacy 1.4 — broken in 2.0 async contexts
# cte = session.query(Category).filter(Category.parent_id == None).cte(recursive=True)
# recursive_q = session.query(Category).join(cte, Category.parent_id == cte.c.id)
# full = cte.union_all(recursive_q)
# SQLAlchemy 2.0 — works in sync and async
from sqlalchemy import select, Integer, String, ForeignKey, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
from sqlalchemy.ext.asyncio import AsyncSession
class Base(DeclarativeBase):
    pass

class Category(Base):
    __tablename__ = "categories"
    id: Mapped[int] = mapped_column(primary_key=True)
    parent_id: Mapped[int | None] = mapped_column(ForeignKey("categories.id"))
    name: Mapped[str] = mapped_column(String(120))
    children: Mapped[list["Category"]] = relationship(
        back_populates="parent", foreign_keys="[Category.parent_id]"
    )
    parent: Mapped["Category | None"] = relationship(
        back_populates="children",
        foreign_keys="[Category.parent_id]",
        remote_side="Category.id",
    )

async def fetch_category_tree(session: AsyncSession, root_id: int, max_depth: int = 10):
    # Anchor term: the root row at depth 0
    anchor = select(
        Category.id,
        Category.parent_id,
        Category.name,
        func.cast(0, Integer).label("depth"),
    ).where(Category.id == root_id)
    tree_cte = anchor.cte(name="category_tree", recursive=True)
    alias = tree_cte.alias()
    # Recursive term: children of rows already in the CTE, one level deeper
    recursive_term = (
        select(
            Category.id,
            Category.parent_id,
            Category.name,
            (alias.c.depth + 1).label("depth"),
        )
        .join(alias, Category.parent_id == alias.c.id)
        .where(alias.c.depth < max_depth)
    )
    full_cte = tree_cte.union_all(recursive_term)
    stmt = select(full_cte).order_by(full_cte.c.depth, full_cte.c.name)
    result = await session.execute(stmt)
    return result.mappings().all()
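To see what this pattern asks the database to do, the following sketch runs hand-written SQL of roughly the shape the statement should compile to. It uses the standard-library sqlite3 module and an in-memory table rather than SQLAlchemy and PostgreSQL, and the sample rows are invented for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE categories (
        id INTEGER PRIMARY KEY,
        parent_id INTEGER REFERENCES categories(id),
        name TEXT NOT NULL
    );
    INSERT INTO categories (id, parent_id, name) VALUES
        (1, NULL, 'Electronics'),
        (2, 1, 'Computers'),
        (3, 2, 'Laptops'),
        (4, 1, 'Audio'),
        (5, NULL, 'Books');
    """
)

TREE_SQL = """
WITH RECURSIVE category_tree (id, parent_id, name, depth) AS (
    -- anchor term: the root row at depth 0
    SELECT id, parent_id, name, 0 FROM categories WHERE id = :root_id
    UNION ALL
    -- recursive term: children of rows found so far, one level deeper
    SELECT c.id, c.parent_id, c.name, category_tree.depth + 1
    FROM categories AS c
    JOIN category_tree ON c.parent_id = category_tree.id
    WHERE category_tree.depth < :max_depth
)
SELECT id, name, depth FROM category_tree ORDER BY depth, name
"""

rows = conn.execute(TREE_SQL, {"root_id": 1, "max_depth": 10}).fetchall()
for row in rows:
    print(row)
```

The unrelated root ("Books") never appears because the recursive term only follows parent_id links that lead back to the anchor row.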
Execution Context & Async Workflow Integration
Why the database handles this better than Python. A Python loop that fetches children one level at a time issues one SELECT per depth level — O(d) round trips for a tree of depth d, with each blocking on network latency. A recursive CTE issues one statement; the database engine evaluates it using an internal working table, fully within a single transaction cursor. For an org chart with 6 management levels and 10 000 employees, this is the difference between 6 network round trips and 1.
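The round-trip difference can be counted directly. This is a minimal sketch using the standard-library sqlite3 module as a stand-in for a networked database; the table contents and the helper names fetch_by_levels and fetch_with_cte are made up for the illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE employees (id INTEGER PRIMARY KEY, manager_id INTEGER, name TEXT)"
)

# Build six management levels: a root plus five levels of two employees each.
rows = [(1, None, "ceo")]
next_id, parent = 2, 1
for _level in range(1, 6):
    first = next_id
    for _ in range(2):
        rows.append((next_id, parent, f"emp{next_id}"))
        next_id += 1
    parent = first
conn.executemany("INSERT INTO employees VALUES (?, ?, ?)", rows)


def fetch_by_levels(conn, root_id):
    """One SELECT per tree level; returns (sorted ids, number of queries)."""
    found, frontier, queries = [root_id], [root_id], 0
    while frontier:
        placeholders = ",".join("?" * len(frontier))
        cur = conn.execute(
            f"SELECT id FROM employees WHERE manager_id IN ({placeholders})",
            frontier,
        )
        queries += 1
        frontier = [row[0] for row in cur]
        found.extend(frontier)
    return sorted(found), queries


def fetch_with_cte(conn, root_id):
    """A single recursive statement; returns (sorted ids, number of queries)."""
    cur = conn.execute(
        """
        WITH RECURSIVE org (id) AS (
            SELECT id FROM employees WHERE id = ?
            UNION ALL
            SELECT e.id FROM employees AS e JOIN org ON e.manager_id = org.id
        )
        SELECT id FROM org
        """,
        (root_id,),
    )
    return sorted(row[0] for row in cur), 1


level_ids, level_queries = fetch_by_levels(conn, 1)
cte_ids, cte_queries = fetch_with_cte(conn, 1)
print(level_queries, cte_queries)
```

Both functions return the same ids; only the number of statements sent differs, and over a real network each extra statement adds a full round trip of latency.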
Under asyncio with asyncpg, the execution is:
- await session.execute(stmt) suspends the coroutine and yields to the event loop.
- asyncpg sends the compiled SQL to PostgreSQL over its native binary protocol.
- PostgreSQL executes the recursive CTE internally, then streams rows back.
- asyncpg buffers rows; the coroutine resumes when the cursor is ready.
- For large trees, stream_results=True fetches rows in configurable server-side chunks rather than materializing the entire result in Python memory.
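The first step in the list, the coroutine suspending while other work proceeds, can be observed without a database. In this sketch fake_execute is a hypothetical stand-in for await session.execute(stmt) that simply sleeps, and heartbeat represents any other task sharing the event loop.

```python
import asyncio


async def fake_execute(delay: float) -> list:
    """Hypothetical stand-in for a database call: suspends, then returns rows."""
    await asyncio.sleep(delay)
    return [1, 2, 3]


async def heartbeat(stop: asyncio.Event, counter: dict) -> None:
    """Other work on the same event loop; it only runs while others are suspended."""
    while not stop.is_set():
        counter["ticks"] += 1
        await asyncio.sleep(0.001)


async def main():
    stop = asyncio.Event()
    counter = {"ticks": 0}
    beat = asyncio.create_task(heartbeat(stop, counter))
    rows = await fake_execute(0.05)  # main() is suspended here
    stop.set()
    await beat
    return rows, counter["ticks"]


rows, ticks = asyncio.run(main())
print(rows, ticks > 0)
```

The heartbeat counter advances only because awaiting the simulated query hands control back to the event loop; a blocking call in the same place would leave it at zero.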
# Async streaming variant for large org charts
from typing import AsyncGenerator

from sqlalchemy import Integer, func, select
from sqlalchemy.ext.asyncio import AsyncSession

# Employee is the self-referential model shown in the index strategy section.

async def stream_org_chart(
    session: AsyncSession, root_id: int
) -> AsyncGenerator[dict, None]:
    anchor = select(
        Employee.id,
        Employee.manager_id,
        Employee.name,
        Employee.department,
        func.cast(0, Integer).label("depth"),
    ).where(Employee.id == root_id)
    org_cte = anchor.cte(name="org_chart", recursive=True)
    alias = org_cte.alias()
    recursive_term = (
        select(
            Employee.id,
            Employee.manager_id,
            Employee.name,
            Employee.department,
            (alias.c.depth + 1).label("depth"),
        )
        .join(alias, Employee.manager_id == alias.c.id)
        .where(alias.c.depth < 15)
    )
    full_cte = org_cte.union_all(recursive_term)
    stmt = select(full_cte).order_by(full_cte.c.depth)
    async with session.begin():
        # session.stream() returns an AsyncResult that supports "async for";
        # session.execute() on an AsyncSession returns a fully buffered Result.
        result = await session.stream(
            stmt.execution_options(stream_results=True, yield_per=500)
        )
        async for partition in result.partitions(500):
            for row in partition:
                yield dict(row._mapping)
The yield_per=500 execution option tells asyncpg to fetch 500 rows per server round trip. For a 50 000-node org chart, this produces 100 fetches instead of loading all rows into a single Python list — keeping peak memory bounded to roughly 500 × row_size.
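The batch arithmetic can be checked with a small sketch. It uses sqlite3's cursor.fetchmany() as a stand-in for a server-side cursor (an assumption: the real chunking is done by the driver and SQLAlchemy), and iter_partitions is a hypothetical helper that mimics what result.partitions(500) yields.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE org_chart (id INTEGER PRIMARY KEY, depth INTEGER)")
conn.executemany(
    "INSERT INTO org_chart (id, depth) VALUES (?, ?)",
    ((i, i % 15) for i in range(1, 50_001)),
)


def iter_partitions(cursor, size):
    """Yield lists of at most `size` rows until the cursor is exhausted."""
    while True:
        batch = cursor.fetchmany(size)
        if not batch:
            return
        yield batch


cursor = conn.execute("SELECT id, depth FROM org_chart ORDER BY depth")
fetches = 0
largest_batch = 0
total_rows = 0
for batch in iter_partitions(cursor, 500):
    fetches += 1
    largest_batch = max(largest_batch, len(batch))
    total_rows += len(batch)

print(fetches, largest_batch, total_rows)
```

At no point does the loop hold more than one 500-row batch, which is the property that keeps peak memory flat as the tree grows.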
Connection pool interaction. The async with session.begin() context holds one connection from the pool for the lifetime of the generator. If the caller stops iterating early, the connection is returned only once the generator is closed: either explicitly, by awaiting its aclose() method, or later, when the event loop finalizes the abandoned generator. Closing raises GeneratorExit at the suspended yield, which triggers the __aexit__ of session.begin() and returns the connection. Never leave a streaming generator suspended without a timeout when connections are scarce.
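A caller that may stop early can make the cleanup deterministic by closing the generator itself. The sketch below uses no database: stream_rows is a hypothetical stand-in for a streaming generator, and the events list records when its cleanup code runs.

```python
import asyncio

events = []


async def stream_rows():
    """Stand-in for a streaming generator that holds a pooled connection."""
    events.append("connection checked out")
    try:
        for i in range(1000):
            yield i
            await asyncio.sleep(0)
    finally:
        # Runs when the generator is exhausted or closed.
        events.append("connection returned")


async def take_first(n):
    gen = stream_rows()
    taken = []
    try:
        async for row in gen:
            taken.append(row)
            if len(taken) == n:
                break  # early exit leaves the generator suspended
    finally:
        await gen.aclose()  # close it now instead of waiting for finalization
    return taken


first_three = asyncio.run(take_first(3))
print(first_three, events)
```

On Python 3.10 and later, contextlib.aclosing() wraps the same try/finally in a context manager.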
Resolving Warnings, Errors & Common Mistakes
| Error / Warning | Root Cause | Production Fix |
|---|---|---|
| ArgumentError: UNION types must match | Anchor and recursive term have different column types (e.g., Integer vs BigInteger) or counts | Use func.cast(col, TargetType).label("name") in both terms; verify column count parity |
| ProgrammingError: syntax error at or near "RECURSIVE" | Driver targeting MySQL 5.7 or MariaDB below 10.2, which do not support recursive CTEs | Gate on dialect version; use adjacency list + application-side recursion as fallback |
| RecursionError (Python-side) | Calling tree_cte.union_all() inside a Python recursive function that itself recurses | CTEs are built by calling Python methods in sequence, not by recursing in Python; build anchor then recursive_term linearly |
| Query never finishes or exhausts temporary storage (PostgreSQL) | No depth guard; cyclic data makes UNION ALL re-emit the same rows indefinitely | Add .where(alias.c.depth < N) in recursive term; set statement_timeout as a backstop; verify with EXPLAIN ANALYZE |
| MissingGreenlet: greenlet_spawn has not been called | Accessing a lazy-loaded ORM relationship attribute on a CTE-mapped row outside async context | Use result.mappings().all() to consume plain dicts; avoid accessing .children or .parent on results |
| DetachedInstanceError after async session close | Returning ORM instances from a CTE query and accessing unloaded attributes outside the session | Use .mappings() or explicitly selectinload before the session closes |
| RemovedIn20Warning: Query.cte() is deprecated | Using session.query(Model).cte() from 1.4 code | Replace with select(Model).cte(recursive=True) |
| CompileError: Unconsumed column names | Passing wrong column names to from_select() when inserting from a CTE | Ensure the name list matches CTE .c attributes exactly, including aliases |
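The "gate on dialect version" fix with an application-side fallback might look like the sketch below. The minimum versions in the table are assumptions to verify against each vendor's release notes, and both function names are hypothetical; with SQLAlchemy, the inputs could come from engine.dialect.name and engine.dialect.server_version_info.

```python
from collections import deque

# Assumed minimum versions with recursive CTE support; verify before relying on them.
MIN_RECURSIVE_CTE_VERSION = {
    "postgresql": (8, 4),
    "mysql": (8, 0, 1),
    "mariadb": (10, 2, 2),
    "sqlite": (3, 8, 3),
}


def supports_recursive_cte(dialect_name, server_version):
    """True when the (assumed) minimum version for this dialect is met."""
    minimum = MIN_RECURSIVE_CTE_VERSION.get(dialect_name)
    return minimum is not None and tuple(server_version) >= minimum


def descendants_in_python(edges, root_id, max_depth=10):
    """Fallback: breadth-first walk over (child_id, parent_id) pairs."""
    children = {}
    for child_id, parent_id in edges:
        children.setdefault(parent_id, []).append(child_id)
    result = [(root_id, 0)]
    queue = deque(result)
    seen = {root_id}  # also protects against cyclic data
    while queue:
        node, depth = queue.popleft()
        if depth >= max_depth:
            continue
        for child in children.get(node, []):
            if child not in seen:
                seen.add(child)
                result.append((child, depth + 1))
                queue.append((child, depth + 1))
    return result


print(supports_recursive_cte("mysql", (5, 7, 30)))
print(descendants_in_python([(2, 1), (3, 2), (4, 1)], root_id=1))
```

The fallback expects the adjacency list to be loaded with one plain SELECT of (id, parent_id) pairs, so it trades database work for Python memory.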
Advanced Hierarchical Query Optimization
Path Materialization for Breadcrumb Rendering
Many UI applications need not just depth but the full ancestor path (e.g., "Electronics > Computers > Laptops"). Compute this inside the CTE using string concatenation, avoiding a second query:
# Async: category tree with materialized path for breadcrumb rendering
from sqlalchemy import select, cast, Integer, Text, func
from sqlalchemy.ext.asyncio import AsyncSession

async def fetch_tree_with_paths(session: AsyncSession, root_id: int):
    anchor = select(
        Category.id,
        Category.parent_id,
        Category.name,
        # Seed the path with the root name. Cast to Text so the anchor column
        # type matches the concatenated, unbounded path in the recursive term.
        cast(Category.name, Text).label("path"),
        func.cast(0, Integer).label("depth"),
    ).where(Category.id == root_id)
    path_cte = anchor.cte(name="category_path", recursive=True)
    alias = path_cte.alias()
    recursive_term = (
        select(
            Category.id,
            Category.parent_id,
            Category.name,
            (alias.c.path + " > " + Category.name).label("path"),
            (alias.c.depth + 1).label("depth"),
        )
        .join(alias, Category.parent_id == alias.c.id)
        .where(alias.c.depth < 8)
    )
    full_cte = path_cte.union_all(recursive_term)
    stmt = select(full_cte).order_by(full_cte.c.depth, full_cte.c.name)
    result = await session.execute(stmt)
    return result.mappings().all()
# Each row: {id, parent_id, name, path: "Electronics > Computers > Laptops", depth: 2}
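The path column grows by concatenation at each recursion level. On PostgreSQL, SQLAlchemy compiles + between string expressions to the || operator, and the anchor's path column must have the same type as the recursive term's; cast it to an unbounded text type if PostgreSQL reports a type mismatch between the non-recursive and recursive terms. On MySQL, the same + may need func.concat(); test against your target dialect.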
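The concatenation step is easiest to follow in raw SQL. This sketch runs an equivalent query through the standard-library sqlite3 module with invented sample rows; it illustrates the technique rather than the exact SQL SQLAlchemy emits.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE categories (id INTEGER PRIMARY KEY, parent_id INTEGER, name TEXT);
    INSERT INTO categories (id, parent_id, name) VALUES
        (1, NULL, 'Electronics'),
        (2, 1, 'Computers'),
        (3, 2, 'Laptops'),
        (4, 1, 'Audio');
    """
)

PATH_SQL = """
WITH RECURSIVE category_path (id, name, path, depth) AS (
    -- anchor: the path starts as the root's own name
    SELECT id, name, name, 0 FROM categories WHERE id = ?
    UNION ALL
    -- recursive term: append ' > child' to the parent's accumulated path
    SELECT c.id, c.name, category_path.path || ' > ' || c.name, category_path.depth + 1
    FROM categories AS c
    JOIN category_path ON c.parent_id = category_path.id
    WHERE category_path.depth < 8
)
SELECT path, depth FROM category_path ORDER BY depth, name
"""

breadcrumbs = conn.execute(PATH_SQL, (1,)).fetchall()
for path, depth in breadcrumbs:
    print(depth, path)
```

Each row carries its full breadcrumb, so the UI can render it without walking the tree again.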
Partial Tree Queries: Subtree and Ancestors
Rather than always starting from the root, production applications commonly need a subtree (all descendants below a given node) or an ancestor chain (all parents up to the root). Both use the same recursive CTE structure: a subtree query changes only the anchor condition, while an ancestor query also reverses the join:
# Ancestors: walk up from a leaf to the root
async def fetch_ancestors(session: AsyncSession, leaf_id: int):
    anchor = select(
        Employee.id,
        Employee.manager_id,
        Employee.name,
        func.cast(0, Integer).label("depth"),
    ).where(Employee.id == leaf_id)
    anc_cte = anchor.cte(name="ancestors", recursive=True)
    alias = anc_cte.alias()
    # Join in reverse: pick the employee whose id is the manager_id
    # of a row already in the CTE
    recursive_term = (
        select(
            Employee.id,
            Employee.manager_id,
            Employee.name,
            (alias.c.depth + 1).label("depth"),
        )
        .join(alias, Employee.id == alias.c.manager_id)
        .where(alias.c.depth < 20)
    )
    full_cte = anc_cte.union_all(recursive_term)
    stmt = select(full_cte).order_by(full_cte.c.depth)
    result = await session.execute(stmt)
    return result.mappings().all()
Note the reversed join condition: Employee.id == alias.c.manager_id selects the manager of each row already in the CTE, whereas the downward Employee.manager_id == alias.c.id selects its direct reports. Both directions use identical CTE syntax; only the join predicate changes.
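The upward walk can be traced on a tiny table. This is a sketch in raw SQL through the standard-library sqlite3 module, with made-up employees, showing only the reversed join predicate in action.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE employees (id INTEGER PRIMARY KEY, manager_id INTEGER, name TEXT);
    INSERT INTO employees (id, manager_id, name) VALUES
        (1, NULL, 'Ada'),
        (2, 1, 'Grace'),
        (3, 2, 'Linus'),
        (4, 1, 'Ken');
    """
)

ANCESTORS_SQL = """
WITH RECURSIVE ancestors (id, manager_id, name, depth) AS (
    -- anchor: the leaf we start from
    SELECT id, manager_id, name, 0 FROM employees WHERE id = ?
    UNION ALL
    -- recursive term: the employee whose id equals the previous row's manager_id
    SELECT e.id, e.manager_id, e.name, ancestors.depth + 1
    FROM employees AS e
    JOIN ancestors ON e.id = ancestors.manager_id
    WHERE ancestors.depth < 20
)
SELECT name, depth FROM ancestors ORDER BY depth
"""

chain = conn.execute(ANCESTORS_SQL, (3,)).fetchall()
print(chain)
```

The recursion stops on its own at the root because a NULL manager_id matches no employee id; the depth guard is there for corrupt data.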
Index Strategy for Recursive CTE Performance
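Without an index on the self-referential foreign key (manager_id here, parent_id for categories), PostgreSQL typically falls back to scanning the whole employees table on every recursive iteration. For a 100 000-row table with 10 recursion levels, that is up to 10 full scans, or about 1 000 000 rows read to return a single subtree.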
-- Run once at migration time; the async engine never needs to know
CREATE INDEX ix_employees_manager_id ON employees (manager_id);
CREATE INDEX ix_categories_parent_id ON categories (parent_id);
In SQLAlchemy model definitions, add index=True to the self-referential foreign key:
class Employee(Base):
    __tablename__ = "employees"
    id: Mapped[int] = mapped_column(primary_key=True)
    manager_id: Mapped[int | None] = mapped_column(
        ForeignKey("employees.id"), index=True  # critical for recursive CTE performance
    )
    name: Mapped[str] = mapped_column(String(120))
    department: Mapped[str] = mapped_column(String(80))
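With the index in place, each iteration of the recursive term performs index lookups on manager_id instead of a full scan. Total work drops from roughly O(n × d), for n table rows and d levels, to roughly O(m log n), where m is the number of rows in the returned subtree (each node contributes its k direct reports, with k typically 5-20 for real org charts).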
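Whether the planner actually uses the index is worth checking rather than assuming. The sketch below does this with SQLite's EXPLAIN QUERY PLAN through the standard-library sqlite3 module, as a stand-in for PostgreSQL's EXPLAIN ANALYZE; the plan wording is SQLite-specific and plan_details is a hypothetical helper.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE employees (id INTEGER PRIMARY KEY, manager_id INTEGER, name TEXT)"
)

# The per-iteration lookup performed by the recursive term.
LOOKUP = "SELECT id, name FROM employees WHERE manager_id = ?"


def plan_details(conn, sql, params):
    """Return the human-readable detail column of EXPLAIN QUERY PLAN."""
    return [row[3] for row in conn.execute("EXPLAIN QUERY PLAN " + sql, params)]


plan_before = plan_details(conn, LOOKUP, (1,))
conn.execute("CREATE INDEX ix_employees_manager_id ON employees (manager_id)")
plan_after = plan_details(conn, LOOKUP, (1,))

print(plan_before)
print(plan_after)
```

Before the index the plan reports a full table scan; afterwards it names ix_employees_manager_id. On PostgreSQL the equivalent check is to look for an index scan node on the recursive term's side of the join.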
Frequently Asked Questions
Do I need recursive=True if my CTE does not reference itself?
No. recursive=True is only required when the CTE references its own name in the body. For non-recursive staged CTEs, omit it — passing recursive=True on a non-recursive CTE compiles to WITH RECURSIVE which is valid SQL but wastes a keyword and may confuse readers.
Is .alias() on the CTE required before joining it in the recursive term?
Not strictly. The CTE object returned by .cte() represents the named CTE declaration, and it can usually be joined directly in the recursive term. The examples here follow the pattern used in SQLAlchemy's own documentation, which takes a separate alias so that "the CTE being defined" and "the CTE being referenced as an input to this iteration" are distinct Python objects. That keeps the self-join unambiguous to read, especially when the CTE variable is reassigned to the result of .union_all().
Can I use selectinload or joinedload on a recursive CTE result?
Not directly. Relationship loaders work against the ORM's identity map and primary key, not arbitrary CTE columns. If you need related objects (e.g., each Employee's Department), either include the related columns in the CTE's anchor/recursive select (via a join inside the CTE), or issue a second query using Employee.id.in_(cte_id_subquery) with selectinload.
Is there a maximum recursion depth enforced by the database?
PostgreSQL does not impose a recursion-depth limit on recursive CTEs: the query is evaluated iteratively against a working table, so a runaway recursion keeps running until it is cancelled, hits statement_timeout, or exhausts temporary storage. MySQL 8+ enforces cte_max_recursion_depth (default 1000). SQLite has no built-in cap on recursive CTE iterations either; its documentation recommends a LIMIT clause as a safeguard. Always add an application-level depth guard regardless of database, and document the maximum expected tree depth in your schema comments.
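The value of the depth guard shows up most clearly on corrupt, cyclic data. This sketch uses the standard-library sqlite3 module and a deliberately broken three-row table; without the WHERE clause the same UNION ALL query would keep producing rows indefinitely.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE categories (id INTEGER PRIMARY KEY, parent_id INTEGER, name TEXT);
    -- Corrupt data: 1 -> 2 -> 3 -> 1 forms a cycle
    INSERT INTO categories (id, parent_id, name) VALUES
        (1, 3, 'A'),
        (2, 1, 'B'),
        (3, 2, 'C');
    """
)

GUARDED_SQL = """
WITH RECURSIVE tree (id, depth) AS (
    SELECT id, 0 FROM categories WHERE id = ?
    UNION ALL
    SELECT c.id, tree.depth + 1
    FROM categories AS c
    JOIN tree ON c.parent_id = tree.id
    WHERE tree.depth < ?   -- depth guard: the only thing that stops the cycle
)
SELECT id, depth FROM tree ORDER BY depth
"""

rows = conn.execute(GUARDED_SQL, (1, 5)).fetchall()
print(rows)
```

The guard bounds the output but does not remove the duplicates a cycle produces, so repeated ids in the result are a useful signal that the data needs repair.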
Related
- Common Table Expressions and Recursive Queries — full coverage of CTE mechanics, materialized hints, chained CTEs, and async execution patterns.
- Window Functions and Analytical Queries — staging recursive results behind window functions for ranking and running totals over hierarchical data.
- Complex Joins and Relationship Loading Strategies — loading related objects alongside CTE results without N+1 queries.
- Advanced Query Patterns and Bulk Data Operations — parent guide with the complete map of SQLAlchemy 2.0 query techniques.