Writing Window Functions for Running Totals in Python

Use func.sum(col).over(partition_by=..., order_by=..., rows=(None, 0)) in a SQLAlchemy 2.0 select() statement. rows=(None, 0) compiles to ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, so each frame ends at the current physical row instead of extending to every row that ties with it on the ORDER BY value, which is what the default RANGE frame does. This pattern and its async execution model are covered in the Window Functions and Analytical Queries guide.

Quick Answer

The minimal before/after replacement from legacy ORM to modern 2.0:

# Sync — SQLAlchemy 1.x legacy (do not use in 2.0 codebases)
from sqlalchemy.orm import Session
from sqlalchemy import func

def running_total_legacy(session: Session):
    return (
        session.query(
            Transaction.id,
            Transaction.amount,
            func.sum(Transaction.amount)
            .over(order_by=Transaction.created_at)
            .label("running_total"),
        )
        .all()
    )

# Async: SQLAlchemy 2.0 canonical form with frame clause
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import DeclarativeBase, mapped_column, Mapped
from datetime import datetime
from decimal import Decimal


class Base(DeclarativeBase):
    pass


class Transaction(Base):
    __tablename__ = "transactions"

    id: Mapped[int] = mapped_column(primary_key=True)
    account_id: Mapped[int] = mapped_column(nullable=False)
    amount: Mapped[Decimal] = mapped_column(nullable=False)
    created_at: Mapped[datetime] = mapped_column(nullable=False)


async def running_total_2x(session: AsyncSession) -> list:
    stmt = select(
        Transaction.id,
        Transaction.account_id,
        Transaction.amount,
        Transaction.created_at,
        func.sum(Transaction.amount)
        .over(
            partition_by=Transaction.account_id,
            order_by=Transaction.created_at,
            rows=(None, 0),  # ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
        )
        .label("running_total"),
    ).order_by(Transaction.account_id, Transaction.created_at)

    result = await session.execute(stmt)
    return result.all()

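Key differences: session.query() still works in 2.0 but is a legacy API; select() is the unified construct for Core and ORM, and the only one AsyncSession accepts. await session.execute(stmt) followed by result.all() replaces calling .all() on the query object. The rows=(None, 0) argument is the other change: it replaces the implicit RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW frame with ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, which is typically cheaper to evaluate because the database does not have to look ahead for peer rows that tie on the ORDER BY value.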

Execution Context & Async Workflow Integration

When await session.execute(stmt) runs, asyncpg sends the compiled SQL to PostgreSQL. The database evaluates the window function server-side: it sorts the rows within each partition by created_at, then walks forward computing the cumulative sum. No Python-side arithmetic happens.

On an AsyncSession, await session.execute() returns a buffered Result: every row has already been fetched into Python memory by the time the call returns, and .all() hands them over as a list. For small datasets this is fine. For tables with millions of rows, use await session.stream() together with yield_per, which returns an AsyncResult backed by a server-side cursor:

# Async: streaming a large running total result set
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from typing import AsyncIterator


async def stream_running_totals(
    session: AsyncSession,
    chunk_size: int = 500,
) -> AsyncIterator[list]:
    stmt = select(
        Transaction.account_id,
        Transaction.created_at,
        Transaction.amount,
        func.sum(Transaction.amount)
        .over(
            partition_by=Transaction.account_id,
            order_by=Transaction.created_at,
            rows=(None, 0),
        )
        .label("running_total"),
    ).execution_options(yield_per=chunk_size)

    # session.stream() returns an AsyncResult; session.execute() would return
    # a buffered, synchronous Result that supports neither "async for" nor
    # "await result.close()".
    result = await session.stream(stmt)
    try:
        async for batch in result.partitions(chunk_size):
            yield batch
    finally:
        await result.close()

The try/finally around the iterator is important: if the caller cancels the coroutine or raises before exhausting the stream, result.close() releases the server-side cursor and returns the connection to the pool. Without it, the connection remains checked out until garbage collection, which under load causes pool exhaustion.

Connection pool interaction: an AsyncSession checks out a connection from the pool on its first execute() or stream() call and keeps it until the transaction ends (commit, rollback, or close). With streaming (yield_per), that means the connection stays checked out across every iteration of the async loop. Size your pool (pool_size, max_overflow) to account for concurrent streaming queries, since a single streaming consumer holds a connection for the entire duration of iteration.
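
One way to keep concurrent streams below the pool limit is to gate them with a semaphore. The following is a minimal sketch using only asyncio; run_with_limit and ConcurrencyProbe are hypothetical names, and the probe's job merely sleeps to imitate a streaming query holding a connection:

```python
import asyncio


async def run_with_limit(jobs, max_concurrent):
    """Run coroutine factories with at most max_concurrent in flight."""
    gate = asyncio.Semaphore(max_concurrent)

    async def guarded(job):
        async with gate:
            return await job()

    return await asyncio.gather(*(guarded(job) for job in jobs))


class ConcurrencyProbe:
    """Hypothetical stand-in for a streaming query; tracks peak overlap."""

    def __init__(self):
        self.active = 0
        self.peak = 0

    async def job(self):
        self.active += 1
        self.peak = max(self.peak, self.active)
        await asyncio.sleep(0.01)  # imitates time spent iterating a stream
        self.active -= 1
        return "done"


probe = ConcurrencyProbe()
results = asyncio.run(run_with_limit([probe.job] * 10, max_concurrent=3))
```

Choosing max_concurrent below pool_size + max_overflow leaves headroom for short, non-streaming queries that share the same engine.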

One important property of running total queries in async environments: because await yields control back to the event loop between iterations of an async generator, other coroutines can run concurrently, and one of them may commit a write to the transactions table while your streaming query is mid-result. On PostgreSQL this does not corrupt a single streamed SELECT: under the default READ COMMITTED isolation, each statement reads from a snapshot taken when the statement starts, and a server-side cursor keeps using that snapshot for every fetch. The risk is across statements. If the running total must agree with another query in the same unit of work (a balance check, a row count, a follow-up page), run them all in one explicit REPEATABLE READ transaction started before the first execute() call, because under READ COMMITTED each new statement sees whatever has been committed by then.

Result materialization format matters for downstream consumers. Use .all() for tuple access, .mappings().all() for dict-like access by column name, or chain .scalars() only when the query projects a single column. For running totals — which always project multiple columns — .scalars() silently discards all columns except the first, a common source of bugs when migrating from single-column queries.
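
The three access styles can be compared side by side. This is a small synchronous sketch against an in-memory SQLite database rather than the async PostgreSQL setup used elsewhere; the tx table is a hypothetical, simplified stand-in for Transaction:

```python
from sqlalchemy import (
    Column, Integer, MetaData, Table, create_engine, func, insert, select,
)

metadata = MetaData()
# Hypothetical, simplified stand-in for the Transaction model.
tx = Table(
    "tx",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("account_id", Integer, nullable=False),
    Column("amount", Integer, nullable=False),
)

stmt = select(
    tx.c.account_id,
    tx.c.amount,
    func.sum(tx.c.amount)
    .over(partition_by=tx.c.account_id, order_by=tx.c.id, rows=(None, 0))
    .label("running_total"),
).order_by(tx.c.account_id, tx.c.id)

engine = create_engine("sqlite:///:memory:")
metadata.create_all(engine)

with engine.begin() as conn:
    conn.execute(
        insert(tx),
        [
            {"account_id": 1, "amount": 10},
            {"account_id": 1, "amount": 20},
            {"account_id": 2, "amount": 5},
        ],
    )
    rows = conn.execute(stmt).all()                  # Row objects: row.running_total
    mappings = conn.execute(stmt).mappings().all()   # RowMapping: row["running_total"]
    first_column = conn.execute(stmt).scalars().all()  # only account_id survives
```

Here first_column contains nothing but account_id values, which is the silent data loss described above.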

# Async — engine tuned for concurrent analytical streaming
from sqlalchemy.ext.asyncio import create_async_engine

engine = create_async_engine(
    "postgresql+asyncpg://user:pass@db/analytics",
    pool_size=15,
    max_overflow=5,
    pool_timeout=30,
    pool_pre_ping=True,
    pool_recycle=3600,
)

Resolving Warnings, Errors & Common Mistakes

| Exact error or symptom | Root cause | Production fix |
|---|---|---|
| ProgrammingError: column "running_total" does not exist when adding WHERE running_total > 100 | Window expressions are evaluated after WHERE; you cannot reference a window alias in the same WHERE clause | Wrap the window query in a .cte(), then filter on the CTE column in an outer select().where() |
| Running total does not increment: every row shows the same total, equal to the partition sum | order_by is missing from .over(). Without ordering, SUM() OVER (PARTITION BY ...) computes the full-partition aggregate for every row | Always pass order_by= to .over() for cumulative aggregates |
| RuntimeError: no running event loop or greenlet_spawn error when calling .all() on a sync session inside an async context | Mixed sync/async: a synchronous Session method was called from an async coroutine without run_sync() | Use AsyncSession and await session.execute(); never mix sync session calls into async code paths |
| Slow query on large partition; EXPLAIN shows a Sort node before WindowAgg | No composite index on (account_id, created_at); the database materializes and sorts each partition | CREATE INDEX CONCURRENTLY ON transactions (account_id, created_at) to let the planner stream sorted rows without an explicit sort step |
| Rows with equal created_at all show the same running total, or their totals change order between runs | The default RANGE frame treats rows with equal ORDER BY values as peers, so every tied row reports the total through the last peer; with ROWS framing and no tie-breaker, the order among tied rows is unspecified | Specify rows=(None, 0) to use ROWS framing, which advances exactly one physical row at a time, and add a unique column such as Transaction.id to order_by so ties are ordered deterministically |
| asyncpg.exceptions.TooManyConnectionsError during analytical batch jobs | Streaming queries hold connections for the full duration of iteration; many concurrent streams across workers open more connections than the PostgreSQL server allows | Reduce concurrency, keep pool_size + max_overflow (summed over all processes) below the server's max_connections, and set pool_timeout with backpressure logic to retry on sqlalchemy.exc.TimeoutError |
| AttributeError: 'RowMapping' object has no attribute 'running_total' | Accessing a labeled column with attribute syntax on a RowMapping; it requires dict-style access | Use row["running_total"], or drop .mappings() and read row.running_total from the Row object, or use Python dataclass unpacking as shown in the parent guide |
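
The CTE fix from the first row of the table can be sketched as follows. The example runs synchronously against in-memory SQLite for brevity, and the tx table is a hypothetical, simplified stand-in for Transaction:

```python
from sqlalchemy import (
    Column, Integer, MetaData, Table, create_engine, func, insert, select,
)

metadata = MetaData()
# Hypothetical, simplified stand-in for the Transaction model.
tx = Table(
    "tx",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("account_id", Integer, nullable=False),
    Column("amount", Integer, nullable=False),
)

# Step 1: compute the window expression inside a CTE.
windowed = select(
    tx.c.id,
    tx.c.account_id,
    func.sum(tx.c.amount)
    .over(partition_by=tx.c.account_id, order_by=tx.c.id, rows=(None, 0))
    .label("running_total"),
).cte("windowed")

# Step 2: filter on the CTE column in the outer query, where it is an
# ordinary column and WHERE may reference it.
stmt = (
    select(windowed.c.id, windowed.c.running_total)
    .where(windowed.c.running_total > 100)
    .order_by(windowed.c.id)
)

engine = create_engine("sqlite:///:memory:")
metadata.create_all(engine)

with engine.begin() as conn:
    conn.execute(
        insert(tx),
        [{"account_id": 1, "amount": amount} for amount in (50, 60, 70)],
    )
    over_100 = [tuple(row) for row in conn.execute(stmt)]
```

The running totals are 50, 110 and 180, so only the second and third rows pass the outer filter.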

Advanced Running Total Optimization

Incremental Materialization with INSERT ... SELECT

For reporting tables refreshed on a schedule, avoid recomputing the full window over all historical rows. Instead, compute running totals only over new rows since the last refresh and append using INSERT ... SELECT:

# Async — incremental running total refresh
from datetime import datetime
from decimal import Decimal

from sqlalchemy import Column, DateTime, Integer, MetaData, Numeric, Table, func, insert, select
from sqlalchemy.ext.asyncio import AsyncSession

metadata = MetaData()

RunningTotals = Table(
    "running_totals_cache",
    metadata,
    Column("account_id", Integer, nullable=False),
    Column("as_of", DateTime(timezone=True), nullable=False),
    Column("running_total", Numeric(18, 4), nullable=False),
)


async def incremental_refresh(session: AsyncSession, since: datetime) -> None:
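    # Find the most recent cached total per account to use as baseline.
    # max(running_total) would be wrong whenever debits lower the balance,
    # so rank cached rows by as_of and keep the newest one instead.
    ranked = select(
        RunningTotals.c.account_id,
        RunningTotals.c.running_total,
        func.row_number()
        .over(
            partition_by=RunningTotals.c.account_id,
            order_by=RunningTotals.c.as_of.desc(),
        )
        .label("rn"),
    ).subquery("ranked")
    last_cached = (
        select(ranked.c.account_id, ranked.c.running_total.label("baseline"))
        .where(ranked.c.rn == 1)
        .cte("last_cached")
    )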

    # Compute incremental window over new rows only
    new_rows = select(
        Transaction.account_id,
        Transaction.created_at,
        (
            func.coalesce(last_cached.c.baseline, Decimal("0"))
            + func.sum(Transaction.amount).over(
                partition_by=Transaction.account_id,
                order_by=Transaction.created_at,
                rows=(None, 0),
            )
        ).label("running_total"),
    ).join(
        last_cached,
        Transaction.account_id == last_cached.c.account_id,
        isouter=True,
    ).where(
        Transaction.created_at > since
    ).cte("incremental")

    async with session.begin():
        await session.execute(
            insert(RunningTotals).from_select(
                ["account_id", "as_of", "running_total"],
                select(
                    new_rows.c.account_id,
                    new_rows.c.created_at,
                    new_rows.c.running_total,
                ),
            )
        )

This pattern reduces computation from O(all rows) to O(new rows since last refresh), which is critical for accounts with years of historical data. The incremental approach relies on monotonically increasing created_at values — if rows can be backdated, a full recompute is safer.
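
The arithmetic behind the incremental approach can be checked in plain Python. The sketch below uses made-up amounts for a single account and assumes rows arrive in created_at order; note that the baseline is the most recent cached total, which differs from the largest cached total as soon as an amount is negative:

```python
from itertools import accumulate

# Made-up history for one account, already in created_at order.
amounts = [100, -30, -20, 50, 10]
cached_count = 3  # rows materialized by an earlier refresh

full = list(accumulate(amounts))      # what a full recompute produces
cached = full[:cached_count]          # what the cache table already holds

baseline = cached[-1]                 # most recent cached total
incremental = [baseline + s for s in accumulate(amounts[cached_count:])]

# Using the largest cached total instead gives a different, wrong series.
wrong_baseline = max(cached)
wrong = [wrong_baseline + s for s in accumulate(amounts[cached_count:])]
```

The incremental series matches the tail of the full recompute, while the series built on max(cached) drifts by the size of the intervening debits.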

Using func.sum().filter() for Conditional Running Totals

When you need a running total that only accumulates certain rows — for example, credits but not debits, or confirmed orders but not cancelled ones — use the FILTER (WHERE ...) aggregate syntax rather than post-processing in Python:

# Async — running total of credits only, debits excluded
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession


async def credit_running_total(session: AsyncSession) -> list:
    stmt = select(
        Transaction.account_id,
        Transaction.created_at,
        Transaction.amount,
        func.sum(Transaction.amount)
        .filter(Transaction.amount > 0)  # credits only
        .over(
            partition_by=Transaction.account_id,
            order_by=Transaction.created_at,
            rows=(None, 0),
        )
        .label("credit_running_total"),
        func.sum(Transaction.amount)
        .filter(Transaction.amount < 0)  # debits only
        .over(
            partition_by=Transaction.account_id,
            order_by=Transaction.created_at,
            rows=(None, 0),
        )
        .label("debit_running_total"),
    )
    result = await session.execute(stmt)
    return result.mappings().all()

This compiles to SUM(amount) FILTER (WHERE amount > 0) OVER (...) — a single database pass that produces both columns simultaneously. The alternative — two separate queries or Python-side filtering — doubles I/O or shifts work off the database, both of which are worse under load.
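
One detail to plan for: until the first qualifying row appears in a partition, the filtered SUM has nothing to add up and returns NULL. A possible way to report zero instead is to wrap the expression in coalesce. The sketch below only compiles the statement for the PostgreSQL dialect without executing it; the tx table is a hypothetical, simplified stand-in for Transaction:

```python
from sqlalchemy import Column, Integer, MetaData, Numeric, Table, func, select
from sqlalchemy.dialects import postgresql

metadata = MetaData()
# Hypothetical, simplified stand-in for the Transaction model.
tx = Table(
    "tx",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("account_id", Integer, nullable=False),
    Column("amount", Numeric(18, 4), nullable=False),
)

# coalesce() turns the NULL produced before the first credit into 0.
credit_total = func.coalesce(
    func.sum(tx.c.amount)
    .filter(tx.c.amount > 0)
    .over(partition_by=tx.c.account_id, order_by=tx.c.id, rows=(None, 0)),
    0,
).label("credit_running_total")

stmt = select(tx.c.id, credit_total)
sql = str(
    stmt.compile(
        dialect=postgresql.dialect(),
        compile_kwargs={"literal_binds": True},
    )
)
print(sql)
```

Whether NULL or zero is the right value before the first credit depends on the report; the coalesce is a presentation choice, not a correctness requirement.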

Resetting a Running Total at a Boundary

A less obvious but frequently requested pattern is resetting the running total at a logical boundary within a partition — for example, restarting the cumulative sum at the beginning of each calendar month while still partitioning by account. SQL does not have a native "reset on value change" frame clause, but you can solve this cleanly by composing the window function on a group-keyed expression rather than a raw timestamp column.

# Async — running total that resets at the start of each calendar month
from sqlalchemy import func, select, extract
from sqlalchemy.ext.asyncio import AsyncSession


async def monthly_running_totals(session: AsyncSession) -> list:
    # Derive a year-month key to use as a secondary partition dimension
    year_month = (
        extract("year", Transaction.created_at) * 100
        + extract("month", Transaction.created_at)
    ).label("year_month")

    stmt = select(
        Transaction.account_id,
        Transaction.created_at,
        Transaction.amount,
        year_month,
        func.sum(Transaction.amount)
        .over(
            # Partition by BOTH account AND calendar month
            partition_by=[
                Transaction.account_id,
                extract("year", Transaction.created_at),
                extract("month", Transaction.created_at),
            ],
            order_by=Transaction.created_at,
            rows=(None, 0),
        )
        .label("monthly_running_total"),
    ).order_by(Transaction.account_id, Transaction.created_at)

    result = await session.execute(stmt)
    return result.mappings().all()

By adding the year and month extracts to partition_by, the window resets at every month boundary without any Python-side logic. The database handles the boundary detection entirely through the partition key — each unique (account_id, year, month) tuple starts a fresh frame at UNBOUNDED PRECEDING. This approach scales to any reset boundary: fiscal quarters, billing cycles, or custom date bins, as long as you can express the boundary as a column or derived expression.
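
The reset behaviour is easy to confirm on a few rows. This sketch uses the standard-library sqlite3 module and raw SQL, with SQLite's strftime('%Y-%m', ...) standing in for the year and month extracts; it assumes the bundled SQLite is 3.25.0 or newer, and the table and data are made up for illustration:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE tx ("
    "id INTEGER PRIMARY KEY, account_id INTEGER, created_at TEXT, amount INTEGER)"
)
conn.executemany(
    "INSERT INTO tx (account_id, created_at, amount) VALUES (?, ?, ?)",
    [
        (1, "2024-01-05", 10),
        (1, "2024-01-20", 15),
        (1, "2024-02-03", 7),
        (1, "2024-02-10", 3),
    ],
)

# The month key is part of PARTITION BY, so the sum restarts each month.
rows = conn.execute(
    """
    SELECT created_at,
           SUM(amount) OVER (
               PARTITION BY account_id, strftime('%Y-%m', created_at)
               ORDER BY created_at, id
               ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
           ) AS monthly_running_total
    FROM tx
    ORDER BY created_at, id
    """
).fetchall()
conn.close()
```

January accumulates to 10 and then 25, and the first February row starts again at 7.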

Frequently Asked Questions

Why does my running total show the same value on every row? The order_by argument is missing from .over(). Without it, SUM() OVER (PARTITION BY account_id) computes the full-partition total and stamps it identically on every row in that partition. Add order_by=Transaction.created_at (or whichever column defines the cumulative sequence) to produce a true running total.

What is the difference between rows=(None, 0) and range_=(None, 0) in SQLAlchemy? rows=(None, 0) emits ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, a physical row count frame. range_=(None, 0) (the keyword carries a trailing underscore) emits RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, a logical value frame. With RANGE, if three rows share the same created_at value, they are peers and are all included in each other's frames, so each of them shows the total through the last tied row and the running total jumps by the sum of all tied rows at that timestamp. With ROWS, each row advances the total by exactly its own amount; add a unique tie-breaker such as Transaction.id to order_by so the order among tied rows is stable. For financial running totals, rows= is almost always the correct choice.
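
The difference shows up as soon as two rows tie. The following sketch uses the standard-library sqlite3 module with raw SQL (assuming a bundled SQLite of 3.25.0 or newer) and a made-up table in which rows 2 and 3 share the same created_at value:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE tx (id INTEGER PRIMARY KEY, created_at INTEGER, amount INTEGER)"
)
conn.executemany(
    "INSERT INTO tx (id, created_at, amount) VALUES (?, ?, ?)",
    [(1, 1, 10), (2, 2, 20), (3, 2, 30), (4, 3, 40)],  # ids 2 and 3 tie
)

rows = conn.execute(
    """
    SELECT id,
           SUM(amount) OVER (
               ORDER BY created_at
               RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
           ) AS range_total,
           SUM(amount) OVER (
               ORDER BY created_at, id
               ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
           ) AS rows_total
    FROM tx
    ORDER BY created_at, id
    """
).fetchall()
conn.close()
```

With RANGE, ids 2 and 3 both report 60 because each frame includes the other tied row; with ROWS and the id tie-breaker they report 30 and 60.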

Can I compute a running total without partition_by? Yes. Omitting partition_by computes the running total across the entire result set as a single partition. This is correct for a global ledger total, but expensive on large tables: the entire table is sorted as one partition, and on PostgreSQL a sort that exceeds work_mem spills to temporary files on disk. Always add partition_by when the logical domain divides naturally (by account, tenant, product, etc.).

How do I test running total queries against an in-memory SQLite database? SQLite supports window functions from version 3.25.0 (released 2018). Use aiosqlite with create_async_engine("sqlite+aiosqlite:///:memory:"). The rows=(None, 0) syntax compiles identically. The FILTER clause on aggregate window functions is supported from the same release (FILTER on ordinary, non-window aggregates arrived in 3.30.0), so the conditional running totals above should run unchanged; a CASE WHEN inside SUM() is an equivalent form if you prefer to avoid FILTER in test environments.
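
Because the SQLite version is fixed by whatever library the Python build links against, a test suite may want to check it before running window-function tests. A minimal sketch using only the standard library; supports_window_functions is a hypothetical helper name:

```python
import sqlite3

# Window functions were added in SQLite 3.25.0.
MIN_WINDOW_VERSION = (3, 25, 0)


def supports_window_functions(version_info=sqlite3.sqlite_version_info):
    """Return True if the given SQLite version can run window functions."""
    return tuple(version_info) >= MIN_WINDOW_VERSION


if not supports_window_functions():
    print(f"SQLite {sqlite3.sqlite_version} is too old for window functions")
```

A test runner could use the result to skip the running total tests instead of failing with a syntax error.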

How do I verify the compiled SQL that SQLAlchemy generates for a running total query? Print the compiled SQL before executing it to confirm the OVER clause is correct:

from sqlalchemy import func, select
from sqlalchemy.dialects import postgresql

stmt = select(
    Transaction.account_id,
    func.sum(Transaction.amount)
    .over(
        partition_by=Transaction.account_id,
        order_by=Transaction.created_at,
        rows=(None, 0),
    )
    .label("running_total"),
)
# Print PostgreSQL-dialect SQL without executing
print(stmt.compile(dialect=postgresql.dialect(), compile_kwargs={"literal_binds": True}))

This outputs a complete SELECT whose window expression reads sum(transactions.amount) OVER (PARTITION BY transactions.account_id ORDER BY transactions.created_at ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), which you can paste directly into psql or a database GUI to verify correctness before running it in production. It also works with sqlite.dialect() (imported from sqlalchemy.dialects) when testing locally.