Using yield_per to Stream Millions of Rows in Async SQLAlchemy

Call AsyncSession.stream() with .execution_options(yield_per=N) (or the shorthand stream_scalars()) to iterate millions of rows without loading them all into memory at once — this technique is covered in full on the Streaming Large Result Sets with yield_per cluster.

Quick Answer

Before — the legacy buffered approach loads every matching row into Python memory in a single round-trip:

# BEFORE: buffered fetch — dangerous for large tables
import asyncio
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
import sqlalchemy as sa

engine = create_async_engine("postgresql+asyncpg://user:password@localhost/db")

class Base(DeclarativeBase):
    pass

class Order(Base):
    __tablename__ = "orders"
    id: Mapped[int] = mapped_column(primary_key=True)
    user_id: Mapped[int]
    total_cents: Mapped[int]
    status: Mapped[str]

async def export_all_orders_buffered(session: AsyncSession) -> list[Order]:
    # Loads every row into memory — will OOM on millions of rows
    result = await session.execute(select(Order))
    return list(result.scalars().all())

After — streaming with yield_per keeps memory flat regardless of result set size:

# AFTER: async streaming with yield_per
import asyncio
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
import sqlalchemy as sa

engine = create_async_engine("postgresql+asyncpg://user:password@localhost/db")
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)

class Base(DeclarativeBase):
    pass

class Order(Base):
    __tablename__ = "orders"
    id: Mapped[int] = mapped_column(primary_key=True)
    user_id: Mapped[int]
    total_cents: Mapped[int]
    status: Mapped[str]

async def export_all_orders_streamed(session: AsyncSession) -> None:
    stmt = select(Order).where(Order.status == "completed").order_by(Order.id)

    # Await the coroutine to obtain the streaming result, then close it on every exit path
    stream = await session.stream_scalars(stmt.execution_options(yield_per=1000))
    try:
        async for order in stream:
            # process one Order ORM object at a time; peak memory stays near 1 000 rows
            await process_order(order)
    finally:
        await stream.close()

async def process_order(order: Order) -> None:
    print(f"Processing order {order.id}, total: {order.total_cents / 100:.2f}")

Execution Context & Async Workflow Integration

AsyncSession.stream() vs stream_scalars()

SQLAlchemy exposes two coroutines for async streaming:

  • AsyncSession.stream(stmt) returns an AsyncResult. Each iteration yields a Row object whose columns are accessed by name or index; call .mappings() on the result if you want dictionary-style rows. Use this when you have multi-column projections (select(Order.id, Order.total_cents)) or when you need access to the raw row tuple.
  • AsyncSession.stream_scalars(stmt) is a convenience wrapper that calls stream() and then applies .scalars(), returning an AsyncScalarResult. Each iteration yields the first column of the row, which for select(Order) is a fully hydrated ORM instance. Use this for single-entity queries where you want Order objects directly.

Both accept yield_per either on the statement, via stmt.execution_options(yield_per=N), or per call, via the execution_options argument: await session.stream(stmt, execution_options={"yield_per": N}).

import asyncio
from datetime import datetime
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
import sqlalchemy as sa

engine = create_async_engine("postgresql+asyncpg://user:password@localhost/db")
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)

class Base(DeclarativeBase):
    pass

class Invoice(Base):
    __tablename__ = "invoices"
    id: Mapped[int] = mapped_column(primary_key=True)
    user_id: Mapped[int]
    amount_cents: Mapped[int]
    # Mapped[] takes the Python type; the SQL type goes in mapped_column()
    issued_at: Mapped[datetime] = mapped_column(sa.DateTime(timezone=True))

async def stream_invoice_summaries(session: AsyncSession) -> None:
    # stream() for multi-column projections
    stmt = (
        select(Invoice.id, Invoice.user_id, Invoice.amount_cents)
        .order_by(Invoice.id)
        .execution_options(yield_per=500)
    )
    stream = await session.stream(stmt)
    try:
        async for row in stream:
            print(f"invoice={row.id} user={row.user_id} amount={row.amount_cents}")
    finally:
        # close the result so the cursor is released even if the loop raises
        await stream.close()

Event-Loop Safety and Connection Held Open

The most important operational fact about yield_per streaming is that the database connection stays busy for the entire duration of the async-for loop. A buffered await session.execute() call drains the cursor before it returns, so the transaction can end and the connection can go back to the pool right away. A streaming cursor keeps the connection occupied until iteration finishes, the result is closed, and the transaction ends.

This has a direct impact on connection pool sizing. If your application streams 10 concurrent exports, you need at least 10 connections in the pool. A pool sized for normal OLTP request/response patterns will exhaust quickly under heavy streaming load — see the guidance in Handling Connection Leaks and Pool Exhaustion for tuning strategies.
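
One way to stop streaming jobs from starving request traffic is to cap the number of concurrent exports below the pool size. The sketch below uses only asyncio. MAX_STREAMS, run_export, and the sleep that stands in for the streaming loop are illustrative assumptions, not SQLAlchemy APIs.

```python
import asyncio

MAX_STREAMS = 3  # hypothetical budget: connections reserved for streaming


async def run_export(export_id: int, gate: asyncio.Semaphore, stats: dict) -> None:
    """Stand-in for one streaming export that holds a connection while it runs."""
    async with gate:  # queue here instead of checking out another connection
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
        await asyncio.sleep(0.01)  # placeholder for the async-for loop over the stream
        stats["active"] -= 1


async def run_all(n_exports: int) -> int:
    """Start n_exports at once and return the peak number that overlapped."""
    gate = asyncio.Semaphore(MAX_STREAMS)
    stats = {"active": 0, "peak": 0}
    await asyncio.gather(*(run_export(i, gate, stats) for i in range(n_exports)))
    return stats["peak"]
```

With this gate in place, ten requested exports never hold more than MAX_STREAMS connections at a time; the rest wait on the semaphore rather than on the pool's checkout timeout.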

Result hydration happens in batches of up to N rows (the yield_per value; the final batch is usually smaller). asyncpg fetches those rows from the server-side cursor, hands them to SQLAlchemy's result processor, and the async for loop delivers them one ORM object at a time. After all objects in a batch are consumed, the next iteration triggers another fetchmany(N) call over the same cursor.
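
The batch-then-iterate behavior can be modeled without a database. The following is a simplified sketch, not SQLAlchemy's actual implementation: FakeCursor is a hypothetical stand-in for a server-side cursor, and iter_rows mimics fetching yield_per rows at a time while handing them to the caller one by one.

```python
import asyncio
from typing import AsyncIterator


class FakeCursor:
    """Hypothetical stand-in for a server-side cursor; counts fetchmany() calls."""

    def __init__(self, total_rows: int) -> None:
        self._rows = list(range(total_rows))
        self._pos = 0
        self.fetch_calls = 0

    async def fetchmany(self, size: int) -> list[int]:
        self.fetch_calls += 1
        batch = self._rows[self._pos : self._pos + size]
        self._pos += len(batch)
        return batch


async def iter_rows(cursor: FakeCursor, yield_per: int) -> AsyncIterator[int]:
    """Yield rows one at a time while fetching them yield_per at a time."""
    while True:
        batch = await cursor.fetchmany(yield_per)
        if not batch:  # an empty batch means the cursor is exhausted
            return
        for row in batch:
            yield row


async def count_rows(total_rows: int, yield_per: int) -> tuple[int, int]:
    """Return (rows seen, fetchmany calls made) for one full pass."""
    cursor = FakeCursor(total_rows)
    seen = 0
    async for _ in iter_rows(cursor, yield_per):
        seen += 1
    return seen, cursor.fetch_calls
```

In this model, 2,500 rows with yield_per=1000 take three data-bearing fetches plus one final empty fetch, and at most 1,000 rows are buffered at any moment.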

asyncpg Server-Side Cursor Behavior

asyncpg implements server-side cursors as portals in PostgreSQL's extended query protocol rather than by issuing a SQL-level DECLARE … CURSOR statement. It binds a prepared statement to a portal and fetches from that portal in batches. A portal exists only for the duration of its transaction and is destroyed when the transaction ends, which is why asyncpg requires cursors to be used inside a transaction.

The cursor lives inside a transaction. SQLAlchemy automatically wraps the streaming session in an implicit transaction if one is not already open. This is important for two reasons:

  1. The cursor is only valid within its originating transaction — you cannot pass it across connection boundaries.
  2. The connection cannot be returned to the pool until both the cursor is closed and the transaction is committed or rolled back.

PgBouncer in Transaction Pooling Mode

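If your infrastructure routes connections through PgBouncer in transaction pooling mode, server-side cursors are fragile. Transaction pooling hands the underlying PostgreSQL connection back to PgBouncer's pool at the end of each transaction, so a cursor survives only while the stream's single transaction stays open. If that transaction ends mid-stream (for example under an autocommit isolation level, or after a commit inside the loop), the next asyncpg fetch may reach a different backend on which the cursor, and the prepared statement asyncpg built it from, no longer exist.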

The symptom is typically asyncpg.exceptions.InterfaceError or a silent empty result after the first batch.

Your options:

  • Switch PgBouncer to session pooling mode for the connections used by streaming queries. This keeps the same backend connection for the life of the application connection, allowing cursors to persist.
  • Use a dedicated streaming engine whose URL points directly at PostgreSQL instead of PgBouncer, with NullPool (create_async_engine(..., poolclass=NullPool)) so that it opens one connection per stream and holds no idle ones. This is the most reliable approach when you cannot reconfigure PgBouncer.
  • Avoid yield_per entirely for this pool and instead use keyset pagination to simulate streaming through sequential ranged queries that each complete in a single round-trip.

import asyncio
from sqlalchemy.pool import NullPool
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
import sqlalchemy as sa

# Dedicated streaming engine that bypasses PgBouncer
streaming_engine = create_async_engine(
    "postgresql+asyncpg://user:password@postgres-direct:5432/db",
    poolclass=NullPool,
)
StreamingSession = async_sessionmaker(streaming_engine, expire_on_commit=False)

class Base(DeclarativeBase):
    pass

class Order(Base):
    __tablename__ = "orders"
    id: Mapped[int] = mapped_column(primary_key=True)
    user_id: Mapped[int]
    total_cents: Mapped[int]
    status: Mapped[str]

async def safe_stream_for_pgbouncer(session: AsyncSession) -> None:
    stmt = select(Order).order_by(Order.id).execution_options(yield_per=2000)
    stream = await session.stream_scalars(stmt)
    try:
        async for order in stream:
            await process_order(order)
    finally:
        # close the result so the cursor is released even if the loop raises
        await stream.close()

async def process_order(order: Order) -> None:
    pass
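
The keyset pagination option listed above can be sketched as follows. To stay runnable without a PostgreSQL server, the example uses the standard-library sqlite3 module; the orders table, its columns, and the iter_orders_keyset helper are assumptions for illustration. The same WHERE id > :last_id ORDER BY id LIMIT :n shape should carry over to a select() issued through AsyncSession, with each page running as its own short query.

```python
import sqlite3
from typing import Iterator


def iter_orders_keyset(conn: sqlite3.Connection, page_size: int) -> Iterator[tuple]:
    """Yield (id, total_cents) rows in id order, one bounded query per page."""
    last_id = 0  # assumes ids are positive integers
    while True:
        page = conn.execute(
            "SELECT id, total_cents FROM orders WHERE id > ? ORDER BY id LIMIT ?",
            (last_id, page_size),
        ).fetchall()
        if not page:  # no rows beyond last_id: the table is exhausted
            return
        yield from page
        last_id = page[-1][0]  # resume after the last id seen


def demo() -> list[tuple]:
    """Build a small in-memory table and read it back three rows per page."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, total_cents INTEGER)")
    conn.executemany(
        "INSERT INTO orders (id, total_cents) VALUES (?, ?)",
        [(i, i * 100) for i in range(1, 8)],
    )
    rows = list(iter_orders_keyset(conn, page_size=3))
    conn.close()
    return rows
```

Because every page is an independent query, no cursor has to survive between round-trips. The trade-off is that the pages are not read from a single snapshot, so rows inserted or deleted during the export may or may not be seen.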

Resolving Warnings, Errors & Common Mistakes

Warning / Error: sqlalchemy.exc.InvalidRequestError: Can't use yield_per with eager loaders that require uniquing or row buffering, e.g. joinedload() against collections or subqueryload().
Root Cause: joinedload() against a collection generates a JOIN that repeats each parent row once per child, so SQLAlchemy's de-duplication logic needs the rows buffered before it can reassemble the collections. subqueryload() has a similar buffering requirement. yield_per bypasses this buffering.
Production Fix: Replace the eager loader with selectinload(), which fires a separate query for the related objects of each batch and is compatible with yield_per. joinedload() on a many-to-one relationship needs no uniquing and is unaffected. See selectinload vs joinedload for the trade-offs.

Warning / Error: asyncpg.exceptions.InterfaceError: cannot use a connection that has been closed
Root Cause: The connection was returned to the pool (or closed) before the async-for loop finished consuming the cursor. Common causes: PgBouncer transaction pooling reclaiming the connection, or a timeout closing the underlying socket mid-stream.
Production Fix: For PgBouncer: use session pooling or a direct NullPool engine. For timeouts: raise asyncpg's command_timeout (passed through connect_args on create_async_engine) and ensure your PostgreSQL statement_timeout is set generously for the streaming user/role.

Warning / Error: sqlalchemy.exc.ResourceClosedError: This result object is closed.
Root Cause: The streaming result was closed before iteration was complete, typically because it was stored and iterated later, after its owning scope or the session had already been closed.
Production Fix: Obtain the result with await session.stream_scalars(stmt), complete the iteration in the same scope, and close the result there. Do not store the result to iterate later.

Warning / Error: asyncpg.exceptions.InvalidSQLStatementNameError: prepared statement "__asyncpg_X" does not exist
Root Cause: PgBouncer in transaction pooling mode does not guarantee that prepared statements persist across transactions. asyncpg caches prepared statements by default.
Production Fix: Disable the statement caches in the asyncpg connect arguments: create_async_engine("postgresql+asyncpg://...", connect_args={"prepared_statement_cache_size": 0, "statement_cache_size": 0}). The first key is the SQLAlchemy dialect's cache and the second is asyncpg's own. Also consider switching PgBouncer to session pooling for the streaming pool.

Fixing the joinedload Incompatibility

import asyncio
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship, selectinload
import sqlalchemy as sa

engine = create_async_engine("postgresql+asyncpg://user:password@localhost/db")
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"
    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str]
    orders: Mapped[list["Order"]] = relationship(back_populates="user")

class Order(Base):
    __tablename__ = "orders"
    id: Mapped[int] = mapped_column(primary_key=True)
    user_id: Mapped[int] = mapped_column(sa.ForeignKey("users.id"))
    total_cents: Mapped[int]
    status: Mapped[str]
    user: Mapped["User"] = relationship(back_populates="orders")

async def stream_orders_with_users(session: AsyncSession) -> None:
    # WRONG: joinedload against a collection + yield_per raises InvalidRequestError
    # stmt = select(User).options(joinedload(User.orders)).execution_options(yield_per=1000)

    # CORRECT: selectinload fires a second query per batch, compatible with yield_per
    stmt = (
        select(Order)
        .options(selectinload(Order.user))
        .where(Order.status == "pending")
        .order_by(Order.id)
        .execution_options(yield_per=1000)
    )
    stream = await session.stream_scalars(stmt)
    try:
        async for order in stream:
            print(f"Order {order.id} placed by {order.user.email}")
    finally:
        # close the result so the cursor is released even if the loop raises
        await stream.close()

Advanced Async Streaming Optimization

Choosing the yield_per Batch Size

The right N depends on row width. A practical rule of thumb:

  • Narrow rows (5–10 integer/string columns, ~200 bytes each): N = 2 000–5 000
  • Medium rows (~1 KB each, with text fields): N = 500–1 000
  • Wide rows (JSONB blobs, long text, ~10 KB each): N = 50–200

The goal is to keep each batch at roughly 0.5 MB to 2 MB of raw data, which is what the ranges above work out to. Too small and you pay excessive round-trip overhead; too large and you spike memory on each batch and delay the first async for iteration.
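
The rule of thumb can be turned into a small helper. This is a sketch under stated assumptions: suggest_yield_per is a hypothetical function, the 1 MB default target is one reasonable point in the suggested range, and the 50 and 5,000 bounds simply mirror the smallest and largest values in the list above. Measure the average row size for your own table before relying on it.

```python
def suggest_yield_per(
    avg_row_bytes: int,
    target_batch_bytes: int = 1_000_000,  # assumed target of about 1 MB per batch
    min_rows: int = 50,
    max_rows: int = 5_000,
) -> int:
    """Return a yield_per value that keeps one batch near target_batch_bytes."""
    if avg_row_bytes <= 0:
        raise ValueError("avg_row_bytes must be positive")
    rows = target_batch_bytes // avg_row_bytes
    # Clamp so very narrow or very wide rows still get a sensible batch size
    return max(min_rows, min(max_rows, rows))
```

For the three row widths listed above (about 200 bytes, 1 KB, and 10 KB) this returns 5000, 1000, and 100 respectively, each inside the corresponding range.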

Parallel Post-Processing with partitions() and asyncio.gather

SQLAlchemy's async stream objects expose a partitions(size) method that yields lists of up to N objects at once instead of one at a time. This is the key to overlapping I/O-bound post-processing with streaming from the database. CPU-bound work still blocks the event loop and belongs in an executor.

The pattern: collect a partition (a list of N ORM objects), then dispatch the batch to an async worker. Multiple partitions can be processed concurrently using asyncio.gather while the next partition is being fetched.

import asyncio
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
import sqlalchemy as sa

engine = create_async_engine(
    "postgresql+asyncpg://user:password@localhost/db",
    pool_size=20,
    max_overflow=10,
)
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)

class Base(DeclarativeBase):
    pass

class Invoice(Base):
    __tablename__ = "invoices"
    id: Mapped[int] = mapped_column(primary_key=True)
    user_id: Mapped[int]
    amount_cents: Mapped[int]
    status: Mapped[str]

async def send_invoice_batch(invoices: list[Invoice]) -> None:
    """Simulate async I/O work — e.g. calling an external billing API."""
    await asyncio.sleep(0)  # replace with real async I/O
    print(f"Dispatched {len(invoices)} invoices")

async def stream_and_process_invoices_parallel(session: AsyncSession) -> None:
    stmt = (
        select(Invoice)
        .where(Invoice.status == "ready_to_send")
        .order_by(Invoice.id)
        .execution_options(yield_per=500)
    )

    in_flight: list[asyncio.Task] = []
    max_concurrent = 4  # tune based on external API rate limits

    stream = await session.stream_scalars(stmt)
    try:
        async for partition in stream.partitions(500):
            # Launch processing task without awaiting it immediately
            task = asyncio.create_task(send_invoice_batch(list(partition)))
            in_flight.append(task)

            # Keep at most max_concurrent tasks running at once
            if len(in_flight) >= max_concurrent:
                await asyncio.gather(*in_flight)
                in_flight.clear()
    finally:
        # close the result so the cursor is released even if a batch fails
        await stream.close()

    # Drain any remaining tasks after the stream is exhausted
    if in_flight:
        await asyncio.gather(*in_flight)

This pattern overlaps I/O rather than running work in parallel: while the loop awaits the next partition from PostgreSQL, the event loop runs the send_invoice_batch coroutines for partitions that were already fetched. Fetching pauses whenever the loop awaits asyncio.gather, so at most max_concurrent batches are in flight. The database connection stays open throughout (one connection consumed for the duration).
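
A variation on the batch-and-gather approach is a sliding window: a semaphore lets the next partition start as soon as any earlier one finishes, instead of waiting for a whole group. The sketch below uses only asyncio. fake_partitions is a hypothetical stand-in for stream.partitions(size), and process_bounded is an illustrative helper, not a SQLAlchemy API.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable


async def fake_partitions(total: int, size: int) -> AsyncIterator[list[int]]:
    """Hypothetical stand-in for stream.partitions(size)."""
    for start in range(0, total, size):
        await asyncio.sleep(0)  # placeholder for the network fetch
        yield list(range(start, min(start + size, total)))


async def process_bounded(
    partitions: AsyncIterator[list[int]],
    handler: Callable[[list[int]], Awaitable[None]],
    max_concurrent: int,
) -> None:
    """Run handler on each partition with at most max_concurrent in flight."""
    gate = asyncio.Semaphore(max_concurrent)
    tasks: list[asyncio.Task] = []

    async def run(batch: list[int]) -> None:
        try:
            await handler(batch)
        finally:
            gate.release()  # free a slot as soon as this batch finishes

    async for batch in partitions:
        await gate.acquire()  # pauses fetching only while the window is full
        tasks.append(asyncio.create_task(run(batch)))

    # Await everything so that handler exceptions propagate to the caller
    await asyncio.gather(*tasks)


async def demo() -> tuple[int, int]:
    """Process 2,300 fake rows in partitions of 500; return (rows, peak concurrency)."""
    stats = {"active": 0, "peak": 0, "rows": 0}

    async def handler(batch: list[int]) -> None:
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
        await asyncio.sleep(0.001)  # placeholder for external I/O
        stats["rows"] += len(batch)
        stats["active"] -= 1

    await process_bounded(fake_partitions(2300, 500), handler, max_concurrent=2)
    return stats["rows"], stats["peak"]
```

The task list grows by one small entry per partition, which is acceptable for thousands of partitions. For much longer streams, pruning finished tasks periodically would keep it bounded.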

Applying execution_options on the Statement vs on the Call

yield_per can be set in two places:

  1. On the statement (recommended): stmt.execution_options(yield_per=N). This is scoped to this query only and composes with query builders.
  2. On the call: await session.stream(stmt, execution_options={"yield_per": N}). This is useful when you receive a pre-built statement from a helper and cannot modify it.

Avoid setting yield_per at the engine or session level. It would apply to every statement executed there, including writes and DDL, which return no rows and gain nothing from batched fetching.

Frequently Asked Questions

Does yield_per work with synchronous SQLAlchemy sessions?

Yes. With the synchronous Session, pass the same option: session.execute(stmt.execution_options(yield_per=N)), or session.scalars(...) for single-entity queries. The legacy Query API also has a .yield_per(N) method, but select() statements use execution_options. The behavior matches the async version: rows are fetched in batches of up to N, through a server-side cursor where the driver supports one, keeping memory flat.

How does yield_per interact with SQLAlchemy's identity map?

Each hydrated ORM object is registered in the session's identity map as normal. The identity map holds weak references, so an unmodified object can be garbage-collected once your code drops its last reference to it, and streaming by itself does not make the map grow without bound. Memory does grow when you keep references to streamed objects (for example by appending them to a list) or modify them, because the session holds objects with pending changes strongly until they are flushed. If you do not need change-tracking on streamed objects, call session.expunge_all() periodically inside the loop, or select individual columns and stream them with .mappings() to get plain dictionary-style rows instead of ORM instances.

Can I use yield_per inside a Celery task?

Celery workers are synchronous by default. Using async SQLAlchemy inside a Celery task requires running the coroutine via asyncio.run() or a persistent event loop per worker. A simpler approach for Celery is to use the synchronous Session with yield_per directly. If you are running Celery with gevent or eventlet concurrency, use the synchronous path — asyncpg is not compatible with those monkey-patching concurrency models. For details on integrating async SQLAlchemy with task workers, see Using SQLAlchemy Async with Celery Task Workers.
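
The asyncio.run() route can be sketched without Celery itself. export_orders and export_orders_task are hypothetical names: the first stands in for an async streaming export, the second for the synchronous body of a task. In a real worker, note that an engine whose pooled connections were opened under one event loop should not be reused under another, so creating (or disposing) the engine inside the coroutine is the safer assumption.

```python
import asyncio


async def export_orders(batch_size: int) -> int:
    """Hypothetical coroutine standing in for an async streaming export."""
    processed = 0
    for _ in range(3):  # pretend three batches arrive from the database
        await asyncio.sleep(0)
        processed += batch_size
    return processed


def export_orders_task(batch_size: int = 1000) -> int:
    """Synchronous entry point, shaped like the body of a Celery task."""
    # asyncio.run() creates a fresh event loop, runs the coroutine, then closes the loop
    return asyncio.run(export_orders(batch_size))
```

Each call pays the cost of starting and tearing down an event loop, which is usually negligible next to a long export but adds up for many tiny tasks.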

What happens if an exception is raised inside the async-for loop?

Cleanup happens in two steps. Closing the streaming result releases the cursor, so make sure the result is closed on every exit path, including exceptions. The transaction is rolled back and the connection is returned to the pool when the session itself is closed, for example when the async with AsyncSessionLocal() block exits. Avoid storing the stream object and iterating it later outside that scope.

import asyncio
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
import sqlalchemy as sa

engine = create_async_engine("postgresql+asyncpg://user:password@localhost/db")
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)

class Base(DeclarativeBase):
    pass

class Order(Base):
    __tablename__ = "orders"
    id: Mapped[int] = mapped_column(primary_key=True)
    user_id: Mapped[int]
    total_cents: Mapped[int]
    status: Mapped[str]

async def stream_with_error_handling() -> None:
    async with AsyncSessionLocal() as session:
        stmt = select(Order).order_by(Order.id).execution_options(yield_per=1000)
        stream = await session.stream_scalars(stmt)
        try:
            async for order in stream:
                if order.total_cents < 0:
                    raise ValueError(f"Negative total on order {order.id}")
                await process_order(order)
        except ValueError as exc:
            print(f"Stream aborted: {exc}")
        finally:
            # close the result on every exit path so the cursor is released
            await stream.close()
        # the session's transaction is rolled back when the async with block exits

async def process_order(order: Order) -> None:
    pass