Advanced Query Patterns and Bulk Data Operations in SQLAlchemy 2.0

Modern Python applications demand a database layer that scales predictably under concurrent load, resolves complex relational graphs without N+1 degradation, and ingests high-velocity datasets without exhausting memory or connection pools. This guide covers the advanced query patterns and bulk data operations that meet those demands.

SQLAlchemy 2.0 delivers a fundamentally restructured execution model that unifies Core and ORM constructs under a single select()/execute()/scalars() pipeline, enforces explicit async boundaries, and exposes first-class primitives for bulk inserts, analytical SQL, and streaming iteration. This guide targets Python developers and backend architects who need production-grade patterns for every layer of that pipeline — from relationship loader selection to multi-tenant schema routing to keyset pagination of hundred-million-row tables.

Figure: SQLAlchemy 2.0 Query/Data-Flow Pipeline. select(), insert(), and update() constructs flow through execute() into a Result object, which then fans out to scalars(), mappings(), partitions(), and stream() for ORM objects, dicts, bulk streaming, and async streaming respectively.

Architectural Foundations

The Unified select()/execute()/scalars() API

SQLAlchemy 2.0's most consequential change is the demotion of session.query() to a legacy API in favour of a single construction/execution pipeline that works identically for Core and ORM. Every query begins with select(), insert(), update(), or delete() from sqlalchemy; these produce statement objects that know nothing about the session or connection. Execution is explicit: you pass the statement to await session.execute(stmt) or await conn.execute(stmt), receive a Result (a CursorResult when executing on a Core connection), and then choose how to consume it.

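A Result does not fix the shape of your data; you choose it when you consume the result. Calling .scalars() extracts the first column of each row (usually an ORM instance), .mappings() produces RowMapping dicts, .all() materialises the full list, and .partitions(N) yields fixed-size batches. Note that AsyncSession.execute() returns a fully buffered Result: every row has already been fetched when the await completes. For async workloads that must not hold the entire result in memory, await session.stream(stmt) returns an AsyncResult with the same consumption methods (awaited, or iterated with async for) but backed by a server-side cursor.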

# Full imports shown — runnable Python 3.11+
from __future__ import annotations

from typing import Sequence
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


class Base(DeclarativeBase):
    pass


class User(Base):
    __tablename__ = "users"
    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str] = mapped_column(unique=True, index=True)
    tenant_id: Mapped[int] = mapped_column(index=True)
    status: Mapped[str] = mapped_column(default="active")


engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/mydb",
    pool_size=10,
    max_overflow=20,
    pool_timeout=30,
)


async def fetch_active_users(tenant: int, limit: int = 100) -> Sequence[User]:
    stmt = (
        select(User)
        .where(User.tenant_id == tenant, User.status == "active")
        .order_by(User.id)
        .limit(limit)
    )
    async with AsyncSession(engine) as session:
        result = await session.execute(stmt)
        return result.scalars().all()  # returns list[User]

Result and Row: What You Actually Get Back

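Executing on a Core Connection returns a CursorResult; Session.execute() returns a Result, and AsyncSession.execute() returns one that is already fully buffered. Only the stream() methods return an AsyncResult. Internally, every result is a sequence of Row objects: named-tuple-like rows whose values are accessible by position or as attributes named after the column label (row.email). When you call .scalars() on an ORM select(Model), SQLAlchemy unwraps each Row to produce the mapped instance. When you call .mappings(), you get RowMapping objects that behave like read-only dicts keyed by column label (row["email"]); unlike Row, they do not support attribute access.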

Understanding Row matters for analytical queries, where you select raw expressions alongside ORM columns:

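from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession

from myapp.models import Order, User  # Order is defined in the loading-strategies section below


async def print_order_counts(session: AsyncSession) -> None:
    # Result rows contain (User, int) pairs, not just User instances
    stmt = (
        select(User, func.count(Order.id).label("order_count"))
        .join(Order, Order.user_id == User.id)
        .group_by(User.id)  # PostgreSQL accepts this because users.id is the primary key
    )
    result = await session.execute(stmt)
    for user, count in result:  # unpack each Row directly
        print(f"{user.email}: {count} orders")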

Core vs ORM for Bulk Operations

The ORM excels at transactional writes on individual or small groups of objects — it maintains the identity map, fires events, and handles relationship cascades. For ingestion workloads operating on thousands to millions of rows, those features become overhead. Core's conn.execute(insert(Model), list_of_dicts) bypasses ORM instrumentation entirely, batches rows via the driver's executemany protocol, and returns a lightweight CursorResult with rowcount and optional RETURNING data.

The boundary is not arbitrary. Use ORM session.add() / session.add_all() when: object identity matters, you need before_flush / after_flush events, or you're writing fewer than ~500 rows per transaction. Use Core conn.execute(insert(...)) when: throughput is the primary constraint, you want ON CONFLICT DO UPDATE, or you need RETURNING to capture generated primary keys in bulk.

Key Component Deep-Dive: Relationship Loading Strategies and N+1

The N+1 problem emerges whenever code triggers one query for a parent collection and then one additional query per parent to resolve a relationship. In SQLAlchemy 2.0 you can set a default loader strategy on the mapping (relationship(lazy=...)), but the more flexible approach is to specify it per statement with options(selectinload(...)), options(joinedload(...)), options(subqueryload(...)), or options(raiseload("*")). These override the mapping default and give you fine-grained control that can differ per endpoint.

Refer to complex joins and relationship loading strategies for the full treatment; this section establishes the decision rules.

selectinload: The Safe Default for Collections

selectinload issues a second SELECT … WHERE parent_id IN (…) after the primary query resolves. The query count grows with the number of IN batches rather than with the number of parents: SQLAlchemy chunks the IN list at 500 primary keys per query by default, so loading the 200 orders below takes exactly two round trips. It is safe with yield_per and with AsyncSession because there is no JOIN that could produce duplicate parent rows.

from sqlalchemy import ForeignKey, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import Mapped, mapped_column, relationship, selectinload


class Order(Base):
    __tablename__ = "orders"
    id: Mapped[int] = mapped_column(primary_key=True)
    user_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
    tenant_id: Mapped[int] = mapped_column(index=True)  # used by the analytical examples below
    amount: Mapped[float] = mapped_column()
    # relationship() infers the join condition from the ForeignKey on OrderItem.order_id
    items: Mapped[list["OrderItem"]] = relationship(back_populates="order")


class OrderItem(Base):
    __tablename__ = "order_items"
    id: Mapped[int] = mapped_column(primary_key=True)
    order_id: Mapped[int] = mapped_column(ForeignKey("orders.id"))
    sku: Mapped[str] = mapped_column()
    order: Mapped[Order] = relationship(back_populates="items")


async def get_orders_with_items(session: AsyncSession) -> list[Order]:
    stmt = (
        select(Order)
        .options(selectinload(Order.items))   # one IN query after primary SELECT
        .order_by(Order.id)
        .limit(200)
    )
    result = await session.execute(stmt)
    return result.scalars().all()

joinedload: When to Use It and When Not To

joinedload emits a LEFT OUTER JOIN so that parent rows and their related rows arrive in a single round trip. This is ideal for many-to-one or one-to-one relationships (user → tenant, order → invoice), where each parent matches at most one related row and there is no cardinality explosion. It is dangerous for one-to-many collections when the parent set is large, because the JOIN multiplies rows: 1000 orders each with 50 items produce 50,000 result rows that must be deduplicated in Python. SQLAlchemy 2.0 makes that deduplication explicit by requiring a call to .unique() on any result that contains joined eager loads against collections.

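Nothing warns you about the row explosion itself; it surfaces as slow queries and memory growth. What you will see, if you consume such a result without calling .unique(), is:

sqlalchemy.exc.InvalidRequestError: The unique() method must be invoked on this Result, as it contains results that include joined eager loads against collections

The similar-sounding SAWarning "SELECT statement has a cartesian product" comes from SQLAlchemy's FROM linter and signals a missing join condition, not an eager-loading problem.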

raiseload("*") is the guard: added alongside the loader options a query does specify, the wildcard applies to every relationship those options do not cover, so any accidental lazy load raises sqlalchemy.exc.InvalidRequestError immediately rather than silently issuing a query.

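from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import joinedload, raiseload


async def get_order_with_invoice(session: AsyncSession, order_id: int) -> Order:
    # Assumes Order also maps a one-to-one `invoice` relationship to an Invoice model
    # (not shown in the Order definition above)
    stmt = (
        select(Order)
        .options(
            joinedload(Order.invoice),   # one-to-one: safe, no row multiplication
            raiseload("*"),              # block all other lazy loads
        )
        .where(Order.id == order_id)
    )
    result = await session.execute(stmt)
    return result.scalars().one()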

The guide on using selectinload vs joinedload for N+1 prevention benchmarks both strategies across payload sizes.

subqueryload and raiseload

subqueryload issues a second query that re-embeds the original statement as a subquery and joins it to the related table. It is rarely preferable to selectinload in 2.0: re-running the original statement inside the second query can be costly and can confuse some query planners, while the IN strategy is generally more predictable. raiseload has no performance cost; it simply registers a guard. Use it unconditionally in APIs where you cannot enumerate every relationship in advance.

Key Component Deep-Dive: Bulk Inserts, Updates, and RETURNING

High-throughput data ingestion is where Core's superiority over ORM is most pronounced. The high-performance bulk inserts and updates guide covers driver-level benchmarks in detail; here we establish the patterns and their rationale.

Core executemany via conn.execute(insert(...), rows)

When you pass a list of dicts as the second argument to conn.execute(), SQLAlchemy runs the statement in executemany mode, handing every parameter set to the driver in one call (or, on dialects that support it, batching them into multi-row INSERTs with its "insertmanyvalues" feature). With asyncpg, the INSERT is prepared once and reused for every parameter set, which sharply reduces per-row overhead compared with issuing individual INSERT statements.

from __future__ import annotations

from sqlalchemy import insert
from sqlalchemy.ext.asyncio import AsyncEngine

from myapp.models import Product  # Product has id, sku, price, tenant_id


async def bulk_insert_products(engine: AsyncEngine, rows: list[dict]) -> int:
    """Insert rows in chunks of 10 000. Returns the number of rows inserted."""
    CHUNK = 10_000
    total = 0
    async with engine.begin() as conn:  # one transaction: all chunks commit or none do
        for start in range(0, len(rows), CHUNK):
            chunk = rows[start : start + CHUNK]
            await conn.execute(insert(Product), chunk)
            # rowcount is not reliable for executemany on every driver;
            # a chunk that did not raise inserted all of its rows
            total += len(chunk)
    return total

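Wrapping all chunks in a single engine.begin() context, as above, commits them all-or-nothing. If idempotency is required, switch to PostgreSQL's INSERT ... ON CONFLICT DO UPDATE through the dialect-specific insert(). The conflict target must correspond to a unique constraint or unique index, here one on (sku, tenant_id):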

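from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.ext.asyncio import AsyncEngine

from myapp.models import Product


async def upsert_products(engine: AsyncEngine, rows: list[dict]) -> None:
    stmt = pg_insert(Product)
    upsert = stmt.on_conflict_do_update(
        index_elements=["sku", "tenant_id"],  # must match a unique constraint or index
        set_={
            "price": stmt.excluded.price,  # the value proposed by the incoming row
        },
    )
    async with engine.begin() as conn:
        # Passing rows separately runs executemany instead of building one huge
        # VALUES list, which can hit the per-statement bind-parameter limit
        await conn.execute(upsert, rows)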

ORM bulk_insert_mappings and bulk_update_mappings (2.0 style)

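SQLAlchemy 2.0 replaces the 1.x bulk_insert_mappings() approach with session.execute(insert(Model), list_of_dicts) through the ORM-enabled insert() path. It accepts dictionaries keyed by mapped attribute names, applies the mapping's Python-side column defaults, and runs inside the Session's transaction (passing through the do_orm_execute session event), while still using executemany. It does not fire per-object mapper events such as before_insert or track object identities, which makes it the right middle ground when you want ORM integration without per-object overhead.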

from sqlalchemy import insert, update
from sqlalchemy.ext.asyncio import AsyncSession

from myapp.models import User


async def bulk_create_users(session: AsyncSession, data: list[dict]) -> None:
    # ORM-routed executemany: fires mapper events, respects column defaults
    await session.execute(insert(User), data)
    await session.commit()


async def bulk_deactivate_users(session: AsyncSession, user_ids: list[int]) -> None:
    stmt = (
        update(User)
        .where(User.id.in_(user_ids))
        .values(status="inactive")
        .execution_options(synchronize_session=False)
    )
    await session.execute(stmt)
    await session.commit()

RETURNING for Generated Keys

PostgreSQL's RETURNING clause lets you capture auto-generated primary keys or computed columns from bulk inserts without a separate SELECT round trip. In SQLAlchemy 2.0, chain .returning(Model.id) onto any DML statement:

from sqlalchemy import insert
from sqlalchemy.ext.asyncio import AsyncEngine

from myapp.models import Invoice


async def insert_invoices_returning_ids(
    engine: AsyncEngine, rows: list[dict]
) -> list[int]:
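    # sort_by_parameter_order=True (SQLAlchemy 2.0.10+) returns ids in the same
    # order as `rows`, so ids[i] belongs to rows[i]
    stmt = insert(Invoice).returning(Invoice.id, sort_by_parameter_order=True)
    async with engine.begin() as conn:
        result = await conn.execute(stmt, rows)
        return list(result.scalars().all())   # generated Invoice.id values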

The batch-inserting guide at batch inserting millions of rows with SQLAlchemy Core execute covers memory-safe chunking at that scale.

Key Component Deep-Dive: Analytical SQL — Window Functions, CTEs, and Recursive Queries

Analytical workloads push SQLAlchemy's expression language to its full extent. The key insight is that func.* and .over() produce SQL expression objects: they compose with select() like any other clause, which means they can be labeled, selected from subqueries, and referenced through CTEs just as ordinary columns can.

Window Functions for Running Totals and Rankings

func.<name>().over(partition_by=..., order_by=...) translates directly to FUNCTION() OVER (PARTITION BY … ORDER BY …). The result column appears in the Row alongside any ORM columns you selected.

from __future__ import annotations

from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession

from myapp.models import Order, User


async def orders_with_running_total(
    session: AsyncSession, tenant_id: int
) -> list[tuple]:
    """Return each order with a running revenue total, partitioned by user."""
    stmt = select(
        Order.id,
        Order.user_id,
        Order.amount,
        func.sum(Order.amount)
        .over(
            partition_by=Order.user_id,
            order_by=Order.id,
            rows=(None, 0),           # ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
        )
        .label("running_total"),
        func.row_number()
        .over(partition_by=Order.user_id, order_by=Order.id)
        .label("row_num"),
    ).where(Order.tenant_id == tenant_id)

    result = await session.execute(stmt)
    return result.all()   # list of Row(id, user_id, amount, running_total, row_num)

The window functions and analytical queries cluster covers LAG, LEAD, NTILE, and FIRST_VALUE with dialect-specific notes. A worked example for time-series data appears in writing window functions for running totals in Python.

Non-Recursive CTEs for Query Decomposition

A CTE (Common Table Expression) names an intermediate result set that subsequent clauses can reference. In SQLAlchemy 2.0, call .cte(name="...") on any Select to produce a CTE object. You then select() from the CTE alias exactly as if it were a table:

from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession

from myapp.models import Order


async def top_customers_by_revenue(session: AsyncSession, n: int = 10) -> list:
    # Step 1: revenue per user as a named CTE
    revenue_cte = (
        select(
            Order.user_id,
            func.sum(Order.amount).label("total_revenue"),
        )
        .group_by(Order.user_id)
        .cte("revenue_summary")
    )

    # Step 2: rank and limit using the CTE
    stmt = (
        select(revenue_cte.c.user_id, revenue_cte.c.total_revenue)
        .order_by(revenue_cte.c.total_revenue.desc())
        .limit(n)
    )
    result = await session.execute(stmt)
    return result.all()

The common table expressions and recursive queries cluster explains how CTEs interact with DISTINCT ON, lateral joins, and multi-step analytical pipelines.

Recursive CTEs for Hierarchical Data

Recursive CTEs traverse trees and graphs by having the CTE reference its own alias. SQLAlchemy 2.0's cte(recursive=True) combined with .union_all() maps cleanly onto the SQL WITH RECURSIVE clause. The anchor query selects the root nodes; the recursive member joins the CTE alias to find children.

from sqlalchemy import Integer, func, select
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy.ext.asyncio import AsyncSession

from myapp.models import Base


class Category(Base):
    __tablename__ = "categories"
    id: Mapped[int] = mapped_column(primary_key=True)
    parent_id: Mapped[int | None] = mapped_column()
    name: Mapped[str] = mapped_column()
    slug: Mapped[str] = mapped_column()


async def fetch_category_tree(session: AsyncSession) -> list:
    # Anchor: root nodes (no parent)
    anchor = select(
        Category.id,
        Category.parent_id,
        Category.name,
        func.cast(0, Integer).label("depth"),
    ).where(Category.parent_id.is_(None))

    tree_cte = anchor.cte(name="category_tree", recursive=True)
    cte_alias = tree_cte.alias()

    # Recursive member: children joined to CTE alias
    recursive_part = select(
        Category.id,
        Category.parent_id,
        Category.name,
        (cte_alias.c.depth + 1).label("depth"),
    ).join(cte_alias, Category.parent_id == cte_alias.c.id)

    full_tree = tree_cte.union_all(recursive_part)

    stmt = select(full_tree).order_by(full_tree.c.depth, full_tree.c.id)
    result = await session.execute(stmt)
    return result.all()

The full implementation guide — including cycle detection with CYCLE clauses and depth guards for unbounded graphs — lives in implementing recursive CTEs for hierarchical data in SQLAlchemy.

Advanced Patterns and Production Configuration

yield_per and stream_results: Choosing Thresholds

yield_per instructs SQLAlchemy to fetch rows from the cursor in fixed-size batches rather than materialising the entire result set. It is applied as an execution option:

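stmt = select(User).order_by(User.id).execution_options(yield_per=5000)
# AsyncSession.stream() is a coroutine: await it to get an AsyncResult
result = await session.stream(stmt)
async for partition in result.scalars().partitions(5000):  # lists of up to 5000 Users
    process(partition)  # process() stands for your own per-batch handler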

The right batch size depends on row width, available heap, and downstream processing speed. For narrow rows (4–6 columns, scalar values), 5000–10 000 rows per partition is a reasonable starting point. For wide rows with large text or JSON columns, drop to 500–1000. Note that yield_per cannot be combined with joinedload on a collection at any batch size: SQLAlchemy refuses the combination outright (see Pitfalls).

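stream_results=True is a lower-level flag that instructs the driver to use a server-side cursor on backends that support it (asyncpg, psycopg). It is set automatically by yield_per and by the stream() methods. On a synchronous Connection you can set it explicitly with conn.execution_options(stream_results=True); on an AsyncConnection, a server-side cursor must be consumed through stream(), because execute() raises AsyncMethodRequired for streaming results:

async with engine.connect() as conn:
    result = await conn.stream(stmt)  # stream() enables stream_results for you
    async for partition in result.partitions(1000):
        process(partition)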

The complete analysis of thresholds and memory profiles is in using yield_per to stream millions of rows in async.

Keyset Pagination for Deep Result Sets

OFFSET-based pagination degrades linearly with depth: OFFSET 500000 LIMIT 100 requires the database to scan and discard half a million rows. Keyset pagination (also called cursor pagination) solves this by filtering on the last-seen key values:

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from myapp.models import Order


async def page_orders(
    session: AsyncSession,
    last_id: int | None = None,
    page_size: int = 100,
) -> list[Order]:
    stmt = select(Order).order_by(Order.id).limit(page_size)
    if last_id is not None:
        stmt = stmt.where(Order.id > last_id)
    result = await session.execute(stmt)
    return result.scalars().all()

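For composite sort keys that share a direction (for example created_at DESC, id DESC), use tuple_() for a row-value comparison such as (created_at, id) < (:last_created_at, :last_id), which PostgreSQL can satisfy with a single scan of a matching composite index. Mixed directions (created_at DESC, id ASC) cannot be expressed as one row-value comparison and need an expanded OR condition instead. Full patterns including bidirectional pagination are covered in paginating large result sets with keyset pagination.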

Multi-Tenant Schema Routing

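PostgreSQL's schema-per-tenant isolation model gives each tenant its own schema containing identical tables. Rather than switching search_path on each connection, SQLAlchemy 2.0 can target the right schema via execution_options(schema_translate_map=...), which rewrites schema names in table references when the statement is executed: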

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from myapp.models import Product


async def get_tenant_products(session: AsyncSession, tenant_slug: str) -> list[Product]:
    # Assumes Product declares __table_args__ = {"schema": "__main__"};
    # the placeholder is rewritten to the tenant's schema at execution time
    stmt = select(Product).execution_options(
        schema_translate_map={"__main__": tenant_slug}
    )
    result = await session.execute(stmt)
    return result.scalars().all()

The schema_translate_map is applied per-statement, so you can mix tenant and shared-schema tables in a single query. Pair this with a FastAPI dependency that extracts the tenant slug from the JWT and wraps the session. The full guide is in dynamic schema and multi-tenant routing.

Compiled SQL Caching

SQLAlchemy 1.4 introduced, and 2.0 retains, a compiled SQL cache that stores the SQL string and bound-parameter positions keyed by the statement object's structure. This avoids re-compiling identical queries on every call; it is separate from the database's own query-plan caching. The cache is enabled by default with query_cache_size=500 on the engine. Values interpolated into SQL text (for example f-strings passed to text() or literal_column()) turn every distinct value into a distinct statement, churning the cache instead of reusing entries; always pass values as bound parameters instead.

engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/mydb",
    pool_size=10,
    max_overflow=20,
    query_cache_size=1000,   # increase for APIs with many distinct query shapes
)

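For asyncpg specifically, the driver maintains a prepared statement cache per connection, and SQLAlchemy's asyncpg dialect keeps its own cache of prepared statements as well (sized by the prepared_statement_cache_size URL parameter). When using PgBouncer in transaction-pooling mode, set statement_cache_size=0 in connect_args to disable the driver's cache, and consider prepared_statement_cache_size=0 for SQLAlchemy's; see the async engines and connection pooling guide for the full PgBouncer configuration.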

Connection Pool Tuning for Bulk Workloads

Bulk insert jobs and streaming queries hold connections for longer than typical OLTP requests. Size the pool accordingly:

engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/mydb",
    pool_size=5,          # baseline connections kept alive
    max_overflow=10,      # burst headroom for concurrent bulk jobs
    pool_timeout=60,      # seconds to wait for a connection (raise TimeoutError)
    pool_recycle=3600,    # recycle connections older than 1 hour
    pool_pre_ping=True,   # validate connection before checkout
)

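For AWS RDS and other managed services, budget connections per process: keep (pool_size + max_overflow) multiplied by the number of application processes (every worker on every host or pod) below the instance's max_connections, leaving headroom for administrative and monitoring connections.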
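The budget arithmetic fits in a few lines. The helper below is hypothetical (not part of SQLAlchemy), and the numbers in the usage lines are illustrative rather than any RDS default. It treats pool_size + max_overflow as each process's ceiling, since overflow connections count against the server's max_connections too.

```python
def max_connections_per_process(max_connections: int, processes: int, reserved: int) -> int:
    """Hypothetical helper: the largest pool_size + max_overflow each process may use.

    processes: every application process that opens its own pool (all workers on all hosts).
    reserved: connections kept free for admin sessions, migrations, and monitoring.
    """
    if processes <= 0:
        raise ValueError("processes must be positive")
    available = max_connections - reserved
    if available <= 0:
        raise ValueError("reserved connections leave nothing for the application")
    return available // processes  # round down so the total never exceeds the limit


# Illustrative numbers: max_connections=200, 8 worker processes, 20 reserved
budget = max_connections_per_process(200, processes=8, reserved=20)  # 180 // 8
pool_size, max_overflow = 10, 12  # split the budget between steady state and bursts
```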

Common Pitfalls and Anti-Patterns

  • Row explosion from joinedload on a large collection. Root cause: joinedload applied to a one-to-many relationship where the parent set is large, producing M×N rows in the result before deduplication. No warning is emitted; SQLAlchemy 2.0 only insists that you call .unique() on the result. Fix: replace with selectinload for collections and use joinedload only for many-to-one or one-to-one sides. (SAWarning: SELECT statement has a cartesian product is a different problem: a FROM element with no join condition.)
  • Silent N+1 from lazy loading. Root cause: iterating over parent objects and touching an unloaded relationship attribute on each one triggers an implicit SELECT per parent under the default lazy="select" strategy. Fix: specify a loader strategy (selectinload or joinedload) for every relationship a statement's callers use, and add raiseload("*") so any remaining lazy load fails immediately instead of silently.
  • sqlalchemy.exc.MissingGreenlet: greenlet_spawn has not been called. Root cause: a lazy-loaded relationship attribute is accessed in async code, so SQLAlchemy would need to perform IO outside the greenlet context that AsyncSession sets up for awaited calls. Fix: eagerly load relationships used after await points; if lazy loading cannot be avoided, run that code through await session.run_sync(fn), or (SQLAlchemy 2.0.13+) use the AsyncAttrs mixin and await the attribute through obj.awaitable_attrs.
  • yield_per with joinedload on a collection. Root cause: yield_per delivers objects in fixed-size partitions, but joinedload on a collection needs all of a parent's rows, plus result uniquing, before that parent's collection is complete, which partitioning cannot guarantee. SQLAlchemy therefore raises InvalidRequestError ("Can't use yield_per with eager loaders that require uniquing or row buffering...") rather than return incomplete data. Fix: use selectinload whenever yield_per is active.
  • Memory blowup from .all() on large result sets. Root cause: calling result.scalars().all() materialises the entire result as a Python list before your code touches a single row. Fix: replace with session.stream(stmt) and iterate via .partitions(N), keeping memory usage bounded to the partition size.
  • Using session.add_all(objects) for bulk ingestion. Root cause: every object is constructed as a full ORM instance, tracked in the identity map, and processed by the unit of work at flush time (state tracking, cascades, per-object events). SQLAlchemy 2.0 batches the resulting INSERTs, but the per-object bookkeeping still dominates at scale. Fix: use conn.execute(insert(Model), list_of_dicts) for write-throughput workloads exceeding ~500 rows per transaction.
  • Forgetting synchronize_session=False on bulk UPDATE/DELETE. Root cause: by default (synchronize_session="auto"), bulk update() and delete() keep in-memory session state consistent with the database, either by evaluating the WHERE criteria in Python against objects already in the session or by fetching affected primary keys (via RETURNING where supported). For large updates this adds work you may not need. Fix: set .execution_options(synchronize_session=False) and expire or refresh affected objects explicitly if needed.

Frequently Asked Questions

How does the 2.0 select() API differ from the legacy session.query() approach? session.query() was a fluent ORM-only API that built queries tightly coupled to the session. select() is a dialect-agnostic SQL expression that compiles independently of execution context: you pass it to session.execute(), conn.execute(), or await session.stream(). This separation lets the same statement object run on a sync connection or an async connection, or be inspected as SQL without touching the database. The query() API remains available in 2.0 as a legacy interface, but new code should use select(); the RemovedIn20Warning / SQLALCHEMY_WARN_20=1 mechanism belongs to the 1.4 migration path, not to 2.0 itself.

When should I use Core conn.execute() vs ORM session.execute() for inserts? Use conn.execute(insert(Model), rows) (Core path) when throughput is paramount and you do not need the Session at all. Use session.execute(insert(Model), rows) (ORM-routed path) when you want the insert to participate in the Session's transaction, accept mapped attribute names, and pass through the do_orm_execute session event while keeping executemany performance; it still does not fire per-object mapper events such as before_insert. Use session.add() / session.add_all() only for individual objects or small batches where identity tracking, cascades, and per-object events have value.

What is the safe batch size for yield_per streaming? There is no universal answer: it depends on row width, per-row processing time in your Python code, and available memory per worker. As a starting heuristic: 5000 rows per partition for narrow rows (integer/float/short-varchar columns), 500–1000 for wide rows containing JSON, TEXT, or BYTEA columns. Measure actual RSS growth during a representative streaming run and halve the batch size if it approaches your pod's memory limit.

Can I mix window functions with ORM-mapped models in a single query? Yes. select(User, func.rank().over(...).label("rank")) returns Row objects that unpack as (User_instance, int). The ORM reconstructs the User object from the mapped columns; the window function result is a plain Python value in the same row and is not stored on the instance, so keep the Row (or copy the value) if you need it later. Identity-map rules still apply: if several rows refer to the same primary key, they yield the same User object, each paired with its own window value.

How do I route different tenants to different PostgreSQL schemas without changing my model definitions? Give your models a placeholder schema once, with __table_args__ = {"schema": "__main__"} (any sentinel string works; tables declared without a schema can be targeted with the key None instead), then pass execution_options(schema_translate_map={"__main__": tenant_slug}) per statement, or set it on the engine or connection the session uses. The rewrite happens at execution time, before the SQL reaches the driver. See dynamic schema and multi-tenant routing for middleware integration patterns.

Why does my recursive CTE run forever or raise a database error? A recursive CTE stops when its recursive member produces no new rows, so on an acyclic tree it terminates on its own; on data containing a cycle it never does. PostgreSQL imposes no recursion limit of its own, so such a query keeps iterating until statement_timeout cancels it or it exhausts memory or temporary disk space (other databases differ: MySQL stops at cte_max_recursion_depth, 1000 by default). Always add a WHERE depth < N guard in the recursive member, or use PostgreSQL 14+'s CYCLE clause to detect repeated node visits. SQLAlchemy does not add any automatic depth limiting; you own the termination logic.