Streaming Large Result Sets with yield_per in SQLAlchemy 2.0
SQLAlchemy's yield_per keeps memory flat when processing millions of rows — part of the broader Advanced Query Patterns and Bulk Data Operations toolkit.
Concept & Execution Model
Every database query produces a result set. The simplest approach is to fetch the entire result into client memory before your application code touches a single row. For queries returning thousands of records that approach is invisible — rows arrive quickly, Python allocates a flat list, your loop iterates, and the list is garbage-collected. Scale that to ten million rows and the picture changes: the database streams hundreds of megabytes across the wire, the DBAPI buffers them, SQLAlchemy hydrates ORM instances for each row, and the Python heap swells proportionally. On a typical 4 GB container that sequence ends in an OOM kill long before any business logic runs.
yield_per solves the problem by requesting a server-side cursor where the driver supports one (psycopg2 and asyncpg on PostgreSQL, for example). A server-side cursor tells the database to hold the result set open and deliver rows in fixed-size batches on demand. Rather than one massive transfer, the client issues a series of FETCH N commands, processes the batch, discards it, then requests the next. Peak memory becomes a function of batch size, not total row count, which is a budget you control explicitly.
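The fetch, process, discard loop can be sketched with the standard library alone. This is an illustration of the client-side batching pattern only: SQLite has no server-side cursors, and the `events` table and the `iter_batches` and `batch_sizes` helpers are invented for the example.

```python
import sqlite3
from typing import Iterator


def iter_batches(cursor: sqlite3.Cursor, batch_size: int) -> Iterator[list]:
    """Yield lists of at most batch_size rows until the cursor is exhausted."""
    while True:
        batch = cursor.fetchmany(batch_size)
        if not batch:
            return
        yield batch


def batch_sizes(total_rows: int, batch_size: int) -> list:
    """Load total_rows rows into an in-memory table, then read them back in batches."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE events (id INTEGER PRIMARY KEY, payload TEXT)")
    conn.executemany(
        "INSERT INTO events (payload) VALUES (?)",
        [(f"event-{i}",) for i in range(total_rows)],
    )
    cursor = conn.execute("SELECT id, payload FROM events ORDER BY id")
    # Only one batch is referenced at a time; each is dropped after its length is taken.
    sizes = [len(batch) for batch in iter_batches(cursor, batch_size)]
    conn.close()
    return sizes
```

Only the current batch is alive inside the loop, which is the property yield_per provides at the ORM level.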
SQLAlchemy 2.0 integrates this behaviour at two levels. The execution_options(yield_per=N) call on a Select statement works identically on the synchronous Session and the asynchronous AsyncSession. Underneath, the ORM delegates to the DBAPI cursor's fetchmany() method (or the asyncpg-native equivalent), so the streaming guarantee holds regardless of whether your application uses await or plain function calls. The identity map, relationship loading, and type coercion layers all remain active — rows are fully hydrated ORM instances, not raw tuples — which distinguishes yield_per from bypassing the ORM entirely.
The minimal synchronous form:
from sqlalchemy import select
from sqlalchemy.orm import Session, DeclarativeBase, Mapped, mapped_column


class Base(DeclarativeBase):
    pass


class Invoice(Base):
    __tablename__ = "invoices"

    id: Mapped[int] = mapped_column(primary_key=True)
    tenant_id: Mapped[int] = mapped_column(index=True)
    total_cents: Mapped[int] = mapped_column()
    status: Mapped[str] = mapped_column()


def stream_invoices_sync(session: Session, tenant_id: int) -> None:
    stmt = (
        select(Invoice)
        .where(Invoice.tenant_id == tenant_id)
        .order_by(Invoice.id)
        .execution_options(yield_per=2000)
    )
    for invoice in session.scalars(stmt):
        process(invoice)  # process() is the application's own per-row handler
The identical operation on an AsyncSession:
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession


async def stream_invoices_async(session: AsyncSession, tenant_id: int) -> None:
    stmt = (
        select(Invoice)
        .where(Invoice.tenant_id == tenant_id)
        .order_by(Invoice.id)
        .execution_options(yield_per=2000)
    )
    # AsyncSession.stream_scalars() is a coroutine: await it, then iterate.
    result = await session.stream_scalars(stmt)
    async for invoice in result:
        await process(invoice)
Both forms request a server-side cursor. The synchronous version blocks its thread while each batch is fetched; the async version awaits each fetch inside the async for loop, so the event loop stays free to run other tasks between batches.
Query Construction & Async Execution Patterns
SQLAlchemy 2.0's unified select() construct is the entry point for all streaming queries, whether you reach for the ORM or Core. The four execution surfaces most relevant to large result sets are:
| Surface | Returns | When to use |
|---|---|---|
| session.execute(stmt) | Result[Row] | Row tuples, Core queries |
| session.scalars(stmt) | ScalarResult[T] | ORM instances, single entity |
| await session.stream(stmt) | AsyncResult[Row] | Async row tuples |
| await session.stream_scalars(stmt) | AsyncScalarResult[T] | Async ORM instances |
The stream() and stream_scalars() methods on AsyncSession exist specifically for streaming. Both are coroutines: await them to obtain an AsyncResult or AsyncScalarResult, then consume it with async for while the session and its transaction are still open. (At the Core level, AsyncConnection.stream() can additionally be used as an async with context manager.) Once the session is closed the cursor is gone, and further iteration fails, typically with ResourceClosedError.
A complete async pattern that feeds an external queue:
import asyncio

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


class Base(DeclarativeBase):
    pass


class Order(Base):
    __tablename__ = "orders"

    id: Mapped[int] = mapped_column(primary_key=True)
    tenant_id: Mapped[int] = mapped_column(index=True)
    amount_cents: Mapped[int] = mapped_column()
    state: Mapped[str] = mapped_column()


engine = create_async_engine(
    "postgresql+asyncpg://user:secret@localhost/billing",
    pool_size=10,
    max_overflow=5,
    pool_pre_ping=True,
)
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)


async def enqueue_pending_orders(
    queue: asyncio.Queue,
    tenant_id: int,
    batch_size: int = 1000,
) -> None:
    stmt = (
        select(Order)
        .where(Order.tenant_id == tenant_id, Order.state == "pending")
        .order_by(Order.id)
        .execution_options(yield_per=batch_size)
    )
    async with AsyncSessionLocal() as session:
        # stream_scalars() is a coroutine; the result stays usable while the session is open.
        result = await session.stream_scalars(stmt)
        async for order in result:
            await queue.put(order)
The synchronous counterpart uses Result.partitions() for explicit chunking — useful when you need to send batches to a downstream consumer rather than individual rows:
from sqlalchemy import select
from sqlalchemy.orm import Session


def export_orders_in_chunks(
    session: Session,
    tenant_id: int,
    batch_size: int = 1000,
) -> None:
    stmt = (
        select(Order)
        .where(Order.tenant_id == tenant_id)
        .order_by(Order.id)
        .execution_options(yield_per=batch_size)
    )
    result = session.execute(stmt)
    for chunk in result.partitions(batch_size):
        write_to_parquet(chunk)  # chunk is a list of Row objects
Result.partitions(N) groups consecutive rows into lists of length N, which maps cleanly to file chunking, message broker batches, or database micro-transactions.
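As a mental model of that grouping, here is a plain-Python sketch. It is not SQLAlchemy's implementation, and the `partition` helper name is made up; it only shows the shape of the output, including the final chunk, which may be shorter than N.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def partition(rows: Iterable[T], size: int) -> Iterator[List[T]]:
    """Group consecutive items into lists of at most `size` elements."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(rows)
    while True:
        chunk = list(islice(iterator, size))  # pulls at most `size` items
        if not chunk:
            return
        yield chunk
```

Because the helper consumes its input lazily, it never holds more than one chunk, which is what makes the same idea suitable for file chunking or broker batches.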
State Management & Session Boundaries
Streaming keeps a database connection checked out for the entire duration of the iteration. This has two important consequences for session and transaction design.
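Identity map accumulation. The SQLAlchemy session maintains an identity map, a dictionary keyed by primary key that tracks every loaded ORM instance. The map is weak-referencing, so an unmodified instance is released once your code drops its last reference to it. Memory still grows linearly when something keeps the instances alive: collecting them in a list, modifying them (objects with pending changes are strongly referenced until flushed), or linking them to each other through loaded relationships. In those cases yield_per caps the in-flight DBAPI buffer but not the Python heap. The fix is to avoid retaining instances and, where that is not enough, to call session.expunge_all() after each batch. Neither expire_on_commit nor session.expire(instance) evicts anything from the session; expiring only discards loaded attribute state.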
from sqlalchemy import select
from sqlalchemy.orm import Session


def stream_and_evict(session: Session, batch_size: int = 5000) -> None:
    stmt = (
        select(Invoice)
        .order_by(Invoice.id)
        .execution_options(yield_per=batch_size)
    )
    result = session.execute(stmt)
    for chunk in result.partitions(batch_size):
        for row in chunk:
            (invoice,) = row
            process(invoice)
        session.expunge_all()  # detach the finished batch from the session
Transaction scope. Server-side cursors in PostgreSQL live inside a transaction. SQLAlchemy's default "autobegin" behaviour opens one on the first statement, so streaming without an explicit session.begin() still works. Driver-level autocommit (isolation_level="AUTOCOMMIT" on the engine or connection) removes that transaction; the Session-level autocommit=True flag from 1.x no longer exists in 2.0. Use the default transactional mode for any connection that streams, and wrap the stream in an explicit async with session.begin() block when you want the transaction boundary to be visible in the code.
Connection checkout duration. A streaming query holds its connection for as long as iteration takes. If your event loop processes each row slowly — for instance, making additional database calls per row — you may exhaust the connection pool before the stream finishes. Design streaming consumers to be as fast as possible; push slow work to a background task or queue rather than performing it inline inside the iteration loop.
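One way to decouple the stream from slow work is a bounded asyncio.Queue between a producer and a consumer. The sketch below uses integers in place of database rows and asyncio.sleep(0) in place of real work; `produce`, `consume`, and `run_pipeline` are hypothetical names.

```python
import asyncio


async def produce(queue: asyncio.Queue, row_count: int) -> None:
    """Stand-in for the streaming loop: hand rows off as fast as the queue allows."""
    for row_id in range(row_count):
        await queue.put(row_id)  # waits while the queue is full (backpressure)
    await queue.put(None)  # sentinel: no more rows


async def consume(queue: asyncio.Queue, handled: list) -> None:
    """Stand-in for the slow side: take rows off the queue one at a time."""
    while True:
        row_id = await queue.get()
        if row_id is None:
            return
        await asyncio.sleep(0)  # placeholder for slow per-row work
        handled.append(row_id)


async def run_pipeline(row_count: int, max_pending: int = 100) -> list:
    queue: asyncio.Queue = asyncio.Queue(maxsize=max_pending)
    handled: list = []
    await asyncio.gather(produce(queue, row_count), consume(queue, handled))
    return handled
```

The maxsize is a trade-off: a bounded queue caps memory but makes the producer (and therefore the open connection) wait when the consumer falls behind, while a larger bound releases the connection sooner at the cost of buffering more rows.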
Advanced Streaming Patterns
execution_options(yield_per=N) vs stream_results=True
execution_options(yield_per=N) is the high-level option. It sets stream_results=True, sets max_row_buffer to N, and makes the result deliver rows in batches of N. If you are working at the Core level you can set stream_results and max_row_buffer independently and call fetchmany() yourself. On an AsyncConnection, use stream() rather than execute(): execute() buffers the whole result and rejects a server-side cursor.
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncEngine


async def core_streaming(engine: AsyncEngine) -> None:
    async with engine.connect() as conn:
        # stream() enables stream_results itself; it is repeated here for clarity.
        result = await conn.stream(
            text("SELECT id, total_cents FROM invoices ORDER BY id"),
            execution_options={"stream_results": True, "max_row_buffer": 2000},
        )
        while True:
            rows = await result.fetchmany(2000)
            if not rows:
                break
            for row in rows:
                process_row(row)
max_row_buffer caps the row buffer that SQLAlchemy keeps in front of the server-side cursor. The buffer grows with successive fetches up to that limit, which defaults to 1000. Raising it reduces round trips at the cost of memory, so keep it in line with your batch size.
AsyncSession.stream() and stream_scalars()
stream() returns an AsyncResult over Row objects — useful when selecting multiple columns or mixing ORM columns with aggregate expressions. stream_scalars() unwraps the first column automatically, which suits single-entity queries:
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession


async def revenue_by_tenant(session: AsyncSession) -> None:
    stmt = (
        select(Invoice.tenant_id, func.sum(Invoice.total_cents).label("revenue"))
        .group_by(Invoice.tenant_id)
        .order_by(Invoice.tenant_id)
        .execution_options(yield_per=500)
    )
    # stream() for multi-column results; await it, then iterate in partitions
    result = await session.stream(stmt)
    async for partition in result.partitions(500):
        for row in partition:
            record_revenue(row.tenant_id, row.revenue)
Why yield_per is Incompatible with joinedload
This is the most common pitfall. When joinedload is applied to a one-to-many relationship, SQLAlchemy generates a JOIN that produces one row per child entity. A parent with twenty children appears as twenty rows in the raw result. SQLAlchemy collapses those rows back into one parent with one collection by "uniquing" the result, which only works if every row for a given parent is available before that parent is yielded. (joinedload on a many-to-one relationship adds no rows and is unaffected.)
yield_per conflicts directly with this behaviour. SQLAlchemy cannot know whether the next batch contains more rows for the current parent without looking ahead, so it raises InvalidRequestError when the statement is executed:
sqlalchemy.exc.InvalidRequestError: Can't use yield_per with eager loaders that require uniquing or row buffering, e.g. joinedload() against collections or subqueryload(). Consider the selectinload() strategy for better flexibility in loading objects.
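To see why fixed-size batches and a collection JOIN clash, the following plain-Python sketch cuts a joined result into batches and reports which parents end up with their child rows spread over more than one batch. The data and the `parents_split_across_batches` helper are invented for illustration; this is a model of the problem, not SQLAlchemy's internals.

```python
from itertools import islice

# (parent_id, child_id) pairs as a JOIN would return them, ordered by parent.
joined_rows = [(1, "a"), (1, "b"), (1, "c"), (2, "d"), (3, "e"), (3, "f")]


def parents_split_across_batches(rows, batch_size):
    """Return parent ids whose child rows land in more than one batch."""
    iterator = iter(rows)
    batches_per_parent = {}  # parent_id -> set of batch indexes it appears in
    batch_index = 0
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            break
        for parent_id, _child_id in batch:
            batches_per_parent.setdefault(parent_id, set()).add(batch_index)
        batch_index += 1
    return sorted(
        parent_id
        for parent_id, batches in batches_per_parent.items()
        if len(batches) > 1
    )
```

Whether a parent is split depends entirely on where the batch boundary happens to fall, which is why a loader that must assemble complete collections cannot safely yield per batch.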
The fix is to replace joinedload with selectinload, which issues a second SELECT ... WHERE parent_id IN (...) query for each batch of parents rather than joining at the row level. This is compatible with yield_per and often faster for large collections because it avoids the row-multiplication overhead of the JOIN.
from typing import List

from sqlalchemy import ForeignKey, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import Mapped, mapped_column, relationship, selectinload


class Product(Base):
    __tablename__ = "products"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column()


class OrderLine(Base):
    __tablename__ = "order_lines"

    id: Mapped[int] = mapped_column(primary_key=True)
    # The foreign key lets relationship() work out the join condition.
    order_id: Mapped[int] = mapped_column(ForeignKey("orders.id"), index=True)
    product_id: Mapped[int] = mapped_column()
    quantity: Mapped[int] = mapped_column()


class Order(Base):
    __tablename__ = "orders"

    id: Mapped[int] = mapped_column(primary_key=True)
    tenant_id: Mapped[int] = mapped_column(index=True)
    amount_cents: Mapped[int] = mapped_column()
    state: Mapped[str] = mapped_column()
    lines: Mapped[List[OrderLine]] = relationship("OrderLine", lazy="select")


async def stream_orders_with_lines(session: AsyncSession) -> None:
    stmt = (
        select(Order)
        .options(selectinload(Order.lines))  # CORRECT: compatible with yield_per
        # .options(joinedload(Order.lines))  # WRONG: raises InvalidRequestError
        .order_by(Order.id)
        .execution_options(yield_per=500)
    )
    result = await session.stream_scalars(stmt)
    async for order in result:
        for line in order.lines:
            process_line(order, line)
The relationship-loading deep-dive at Complex Joins and Relationship Loading Strategies covers when subqueryload is preferable to selectinload and how to diagnose N+1 patterns in streaming contexts.
Choosing the Right Batch Size
The optimal yield_per value balances memory consumption against round-trip overhead. Each batch requires one FETCH N command, so extremely small values (yield_per=1) generate chatty traffic. Very large values (yield_per=100000) approach full-buffering behaviour and defeat the purpose.
A practical formula: estimate your target peak memory budget for the streaming operation (say, 256 MB for a data export worker), divide by the average row size in bytes, and use that quotient as your starting point. pg_column_size() gives the stored size of a value on the database side; Python's sys.getsizeof() reports only the shallow size of an object, so treat it as a lower bound for a sample instance. Then validate by measuring actual heap usage with tracemalloc under realistic load.
For rows averaging 500 bytes each and a 256 MB budget:
256 * 1024 * 1024 / 500 ≈ 536,870 rows
Account for ORM hydration overhead (typically 3–5x the raw row bytes for instrumented instances) and you arrive at roughly 107,000 to 179,000 rows, which is still too large for most use cases once the identity map and Python object graph are counted. A practical starting range for typical ORM-hydrated rows is 1,000–10,000, with 5,000 as a reasonable default for rows under 1 KB each. For wide rows (many TEXT or JSONB columns), start at 500.
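The arithmetic above can be wrapped in a small helper. This is a sketch under stated assumptions: `estimate_yield_per` is a hypothetical name, the default hydration factor of 5 is the pessimistic end of the 3–5x range, and the floor and ceiling are illustrative clamps rather than library limits.

```python
def estimate_yield_per(
    memory_budget_bytes: int,
    avg_row_bytes: int,
    hydration_factor: float = 5.0,
    floor: int = 100,
    ceiling: int = 10_000,
) -> int:
    """Estimate a starting yield_per value from a memory budget.

    The raw quotient budget / (row bytes * hydration factor) is clamped
    into [floor, ceiling] so that extreme inputs still give a usable value.
    """
    if avg_row_bytes <= 0 or hydration_factor <= 0:
        raise ValueError("avg_row_bytes and hydration_factor must be positive")
    raw = memory_budget_bytes / (avg_row_bytes * hydration_factor)
    return max(floor, min(ceiling, int(raw)))
```

For the 256 MB budget and 500-byte rows used here, the unclamped quotient is about 107,000, so the helper returns the ceiling; treat the result as a starting point to validate with tracemalloc, not a final answer.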
The detailed async implementation walkthrough at Using yield_per to Stream Millions of Rows in Async benchmarks batch sizes against wall-clock throughput and memory profiles across asyncpg and psycopg3 drivers.
Memory Profile Comparison
The diagram below illustrates the difference in client-side memory usage between a buffered full-fetch and yield_per streaming over the same ten-million-row result set.
The buffered approach accumulates memory proportionally to total row count, peaking just before your code processes the first row. yield_per keeps the footprint constant because each batch is discarded before the next is fetched.
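The same contrast can be reproduced in miniature with tracemalloc. The sketch below uses synthetic tuples in place of database rows, so the absolute numbers say nothing about ORM instances; `make_rows`, `buffered`, `streamed`, and `peak_bytes` are hypothetical helpers.

```python
import tracemalloc


def make_rows(count):
    """Generate synthetic rows lazily, standing in for a database result."""
    for i in range(count):
        yield (i, f"customer-{i}", i * 100)


def buffered(rows):
    """Materialize every row before doing any work."""
    all_rows = list(rows)
    return sum(row[2] for row in all_rows)


def streamed(rows, batch_size=1_000):
    """Hold at most batch_size rows at a time."""
    total = 0
    batch = []
    for row in rows:
        batch.append(row)
        if len(batch) >= batch_size:
            total += sum(r[2] for r in batch)
            batch.clear()  # discard the batch before reading the next one
    total += sum(r[2] for r in batch)  # leftover partial batch
    return total


def peak_bytes(consume, row_count):
    """Return the peak traced allocation while consume() processes row_count rows."""
    tracemalloc.start()
    try:
        consume(make_rows(row_count))
        _current, peak = tracemalloc.get_traced_memory()
    finally:
        tracemalloc.stop()
    return peak
```

Calling peak_bytes(buffered, n) and peak_bytes(streamed, n) for growing n shows the first value scaling with n while the second stays near the cost of one batch.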
Keyset Pagination as a Complement
Server-side cursors are not the only tool for large result traversal. Cursor-based keyset pagination issues separate queries, each starting after the last primary key of the previous batch, rather than holding a cursor open. This avoids long-lived connections and works across stateless HTTP services where a single request cannot hold a database connection for minutes. The trade-offs between these approaches — and when to combine them — are explored in Paginating Large Result Sets with Keyset Pagination.
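A minimal keyset loop, sketched with the standard library's sqlite3 module so it runs anywhere. The table layout and the `fetch_page`, `iterate_keyset`, and `demo` helpers are assumptions for the example, and the loop assumes positive integer ids so that 0 can serve as the starting key.

```python
import sqlite3


def fetch_page(conn, last_id, page_size):
    """Return up to page_size rows whose id is greater than last_id."""
    return conn.execute(
        "SELECT id, total_cents FROM invoices WHERE id > ? ORDER BY id LIMIT ?",
        (last_id, page_size),
    ).fetchall()


def iterate_keyset(conn, page_size):
    """Yield pages until the table is exhausted; each page is a separate query."""
    last_id = 0
    while True:
        page = fetch_page(conn, last_id, page_size)
        if not page:
            return
        yield page
        last_id = page[-1][0]  # resume after the last key seen


def demo(total_rows=7, page_size=3):
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE invoices (id INTEGER PRIMARY KEY, total_cents INTEGER)")
    conn.executemany(
        "INSERT INTO invoices (id, total_cents) VALUES (?, ?)",
        [(i, i * 10) for i in range(1, total_rows + 1)],
    )
    pages = [[row[0] for row in page] for page in iterate_keyset(conn, page_size)]
    conn.close()
    return pages
```

Because each page is an independent query, nothing stays open between pages; the price is that every page re-enters the index at the last key.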
Hybrid Architectures & Migration Strategies
Mixing Core and ORM in Streaming Pipelines
Some pipelines benefit from ORM-level streaming for the initial traversal (to leverage relationship loading and type coercion) but switch to Core for downstream writes (to bypass ORM overhead during bulk mutations). This hybrid pattern is fully supported in SQLAlchemy 2.0:
from sqlalchemy import bindparam, select, update
from sqlalchemy.ext.asyncio import AsyncSession, AsyncEngine


async def recalculate_invoice_totals(
    session: AsyncSession,
    engine: AsyncEngine,
    tenant_id: int,
) -> None:
    stmt = (
        select(Invoice)
        .where(Invoice.tenant_id == tenant_id, Invoice.status == "draft")
        .order_by(Invoice.id)
        .execution_options(yield_per=2000)
    )
    # A Core executemany needs an explicit WHERE; without one, every parameter
    # set would update all rows. Bind names are prefixed so they do not
    # collide with column names.
    invoices = Invoice.__table__
    bulk_update = (
        update(invoices)
        .where(invoices.c.id == bindparam("b_id"))
        .values(total_cents=bindparam("b_total_cents"))
    )
    pending_updates: list[dict] = []
    result = await session.stream_scalars(stmt)
    async for invoice in result:
        new_total = compute_total(invoice)
        pending_updates.append({"b_id": invoice.id, "b_total_cents": new_total})
        if len(pending_updates) >= 2000:
            # Flush the accumulated batch via Core to avoid ORM overhead
            async with engine.begin() as conn:
                await conn.execute(bulk_update, pending_updates)
            pending_updates.clear()
    # Flush remainder
    if pending_updates:
        async with engine.begin() as conn:
            await conn.execute(bulk_update, pending_updates)
This pattern reads with the ORM (gaining type safety and relationship traversal) and writes with Core (gaining bulk-execute throughput). For the bulk write half of this pattern, the performance guidance at High-Performance Bulk Inserts and Updates covers transaction batching, RETURNING clause usage, and deadlock avoidance.
Migrating from SQLAlchemy 1.4 to 2.0
In 1.4, the equivalent of yield_per was Query.yield_per(N) on the legacy session.query() API. The 2.0 migration path is mechanical:
| 1.4 pattern | 2.0 equivalent |
|---|---|
| session.query(Invoice).yield_per(1000) | session.scalars(select(Invoice).execution_options(yield_per=1000)) |
| session.query(Invoice).with_for_update().yield_per(1000) | session.scalars(select(Invoice).with_for_update().execution_options(yield_per=1000)) |
| for invoice in query.yield_per(1000) (sync) | async for invoice in await session.stream_scalars(stmt) (async) |
The execution_options() approach is more composable: you can pass the Select through a helper function that constructs it, or apply the option conditionally based on an estimated row count, without restructuring the query. The option takes effect on the statement that is actually executed, so set it on the outermost Select rather than on a subquery. The legacy Query.yield_per() was a method on the session-bound query object and could not be composed this way.
One important 2.0 change: scalars() and stream_scalars() return ScalarResult / AsyncScalarResult, which iterate directly over the first column of each row. If you were iterating over Row objects in 1.4 and accessing columns by name, switch to session.execute() / session.stream() and unpack row.Invoice or row[0] explicitly.
Production Pitfalls & Anti-Patterns
| Pitfall | Exception or Symptom | Fix |
|---|---|---|
| Using joinedload on a collection with yield_per | InvalidRequestError: Can't use yield_per with eager loaders that require uniquing or row buffering | Replace joinedload with selectinload for eager-loaded collections; subqueryload is rejected for the same reason |
| Streaming on a connection in AUTOCOMMIT isolation level | No enclosing transaction for the server-side cursor; the driver may reject the cursor or close it early | Use the default transactional mode for streaming connections; optionally make the boundary explicit with async with session.begin() |
| Retaining or modifying instances across the full stream | Monotonically growing heap; eventual MemoryError | Drop references after each batch and call session.expunge_all(); expire_on_commit has no effect on this |
| Slow per-row processing blocking the connection checkout | Pool exhaustion; TimeoutError: QueuePool limit reached | Move per-row side effects to an async queue; process batches, not individual rows, inside the stream |
| Missing order_by on the streaming query | Non-deterministic result ordering; duplicate or skipped rows on cursor resume | Always add an order_by clause tied to an indexed column (typically the primary key) |
| Assuming yield_per adds ORM de-duplication to DISTINCT or GROUP BY queries | Column-level results are plain Row tuples; batches are cut purely by row count and nothing is merged across batch boundaries | Select the columns you need and stream them with execute() / stream(), or use Core-level streaming (stream_results=True) for aggregated or deduplicated result sets |
Frequently Asked Questions
Does yield_per work with AsyncSession?
Yes. Apply .execution_options(yield_per=N) to the Select statement before passing it to await session.stream_scalars(stmt) or await session.stream(stmt). Both are coroutines that return an async result, which keeps the cursor open while you iterate with async for. With a driver that supports server-side cursors, such as asyncpg, the memory savings match the synchronous path: rows are delivered in batches of N without the full result ever being materialized on the client.
What happens if I use joinedload with yield_per?
SQLAlchemy raises sqlalchemy.exc.InvalidRequestError at execution time with a message that begins "Can't use yield_per with eager loaders that require uniquing or row buffering". This is a deliberate guard: joined eager loading of a collection multiplies rows at the SQL level and requires the ORM to see all rows for a given parent entity before yielding it. That requirement is fundamentally incompatible with the fixed-batch guarantee that yield_per provides. The correct replacement is selectinload, which issues a separate SELECT ... IN (...) query per batch of parent IDs and is fully compatible with streaming.
How do I choose the right N for yield_per?
Start from your memory budget for the streaming operation, divide by the estimated per-row cost including ORM instrumentation (typically 3–5x raw row bytes), and round down to the nearest power of ten. Then validate empirically: stream 100,000 rows from production data with tracemalloc enabled and observe peak allocation. Adjust N until peak allocation fits your budget with a 30% safety margin. For most applications processing rows under 1 KB each, yield_per=5000 is a reasonable starting point. For wide rows with large text or JSON columns, start at 500. Avoid values below 100 (too many round trips) or above 50,000 (approaches full-buffer behaviour).
Can I use yield_per with SQLAlchemy Core?
Yes. Set stream_results=True in execution_options on any Core Select statement executed against a Connection, or call AsyncConnection.stream(), which enables it for you. You then pull rows with result.fetchmany() (awaited on the async side). The yield_per shorthand sets both stream_results=True and max_row_buffer=N, which is why it is preferred at the ORM level: it encapsulates both settings in a single call. At the Core level you may want separate control over the two, for example a larger max_row_buffer to reduce round trips while keeping your application batch size smaller.
Does yield_per prevent timeouts on long streams?
yield_per itself does not manage statement timeouts. PostgreSQL's statement_timeout applies to each statement separately, so the statement that opens the cursor and every subsequent FETCH are timed individually rather than the stream as a whole. If your database has a 30-second statement_timeout and producing the first batch of a 100-million-row query takes 45 seconds (for example because the whole result must be sorted first), the query is cancelled regardless of yield_per; asyncpg reports this as QueryCanceledError. To handle long streams safely: raise or disable statement_timeout for the connection used for streaming (via connect_args), use an indexed ORDER BY so PostgreSQL can begin returning rows without sorting the whole result, and implement application-level checkpointing (storing the last processed primary key) so a failed stream can resume rather than restart from row zero.
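The checkpointing idea can be sketched without a database. In the example below, ROWS stands in for a table ordered by id, `rows_after` stands in for a query with WHERE id > :last_id, and the checkpoint is an in-memory dict; all of these names are hypothetical, and a real system would persist the checkpoint durably.

```python
ROWS = [(i, f"payload-{i}") for i in range(1, 11)]  # stand-in for a table ordered by id


def rows_after(last_id):
    """Stand-in for: SELECT ... WHERE id > :last_id ORDER BY id."""
    return (row for row in ROWS if row[0] > last_id)


def run_stream(checkpoint, handle):
    """Process rows after the checkpoint, advancing it after each success."""
    for row_id, payload in rows_after(checkpoint["last_id"]):
        handle(row_id, payload)
        checkpoint["last_id"] = row_id  # only reached if handle() did not raise


def demo():
    checkpoint = {"last_id": 0}
    processed = []
    already_failed = set()

    def flaky_handler(row_id, payload):
        # Fail exactly once, on row 6, to simulate a dropped connection.
        if row_id == 6 and row_id not in already_failed:
            already_failed.add(row_id)
            raise ConnectionError("simulated dropped connection")
        processed.append(row_id)

    try:
        run_stream(checkpoint, flaky_handler)
    except ConnectionError:
        pass  # first attempt stops at row 6
    resumed_from = checkpoint["last_id"]
    run_stream(checkpoint, flaky_handler)  # second attempt resumes after row 5
    return resumed_from, processed
```

Advancing the checkpoint per row keeps the example simple; advancing it once per batch is cheaper but means a resumed stream may reprocess part of the last batch, so the handler should be idempotent.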
Related
- Advanced Query Patterns and Bulk Data Operations — the parent topic covering the full spectrum of SQLAlchemy 2.0 query and bulk-data techniques, including CTEs, window functions, and connection pool tuning.
- Using yield_per to Stream Millions of Rows in Async — a step-by-step walkthrough benchmarking batch sizes and memory profiles across asyncpg and psycopg3 for production async streaming.
- Paginating Large Result Sets with Keyset Pagination — covers cursor-based keyset pagination as a stateless complement to server-side cursor streaming, with comparisons of trade-offs by use case.
- High-Performance Bulk Inserts and Updates — details the write side of large data pipelines: Core-level bulk inserts, upsert conflict resolution, and chunked transaction strategies.
- Complex Joins and Relationship Loading Strategies — explains when to use selectinload vs joinedload vs subqueryload, directly relevant to avoiding the joinedload/yield_per incompatibility.