Writing Window Functions for Running Totals in Python
Use func.sum(col).over(partition_by=..., order_by=..., rows=(None, 0)) in a SQLAlchemy 2.0 select() statement. rows=(None, 0) compiles to ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, so each row's total covers exactly the rows up to and including itself. The default RANGE frame behaves differently: it also pulls in every peer row that shares the current ORDER BY value. Add a unique tie-breaker (such as the primary key) to order_by so the sequence among equal timestamps is deterministic. This pattern and its async execution model are covered in the Window Functions and Analytical Queries guide.
Quick Answer
A minimal before/after migration from the legacy Query API to the SQLAlchemy 2.0 style:
```python
# Sync: SQLAlchemy 1.x legacy Query API (still works in 2.0, but avoid in new code)
from sqlalchemy import func
from sqlalchemy.orm import Session


def running_total_legacy(session: Session):
    # Transaction is the mapped class defined in the 2.0 example below
    return (
        session.query(
            Transaction.id,
            Transaction.amount,
            func.sum(Transaction.amount)
            .over(order_by=Transaction.created_at)
            .label("running_total"),
        )
        .all()
    )
```
```python
# Async: SQLAlchemy 2.0 canonical form with an explicit frame clause
from datetime import datetime
from decimal import Decimal

from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


class Base(DeclarativeBase):
    pass


class Transaction(Base):
    __tablename__ = "transactions"

    id: Mapped[int] = mapped_column(primary_key=True)
    account_id: Mapped[int] = mapped_column(nullable=False)
    amount: Mapped[Decimal] = mapped_column(nullable=False)
    created_at: Mapped[datetime] = mapped_column(nullable=False)


async def running_total_2x(session: AsyncSession) -> list:
    stmt = select(
        Transaction.id,
        Transaction.account_id,
        Transaction.amount,
        Transaction.created_at,
        func.sum(Transaction.amount)
        .over(
            partition_by=Transaction.account_id,
            # id breaks ties between rows that share a created_at value
            order_by=[Transaction.created_at, Transaction.id],
            rows=(None, 0),  # ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
        )
        .label("running_total"),
    ).order_by(Transaction.account_id, Transaction.created_at, Transaction.id)
    result = await session.execute(stmt)
    return result.all()
```
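Key differences:
- session.query() is legacy in 2.0. It is still available but is no longer the recommended API. select() is the single query construct for both Core and the ORM.
- The statement is executed with await session.execute(stmt), and .all() is called on the returned Result rather than on a Query object.
- partition_by=Transaction.account_id restarts the total for each account, which the legacy version did not do.
- The rows=(None, 0) argument is not specific to 2.0, but the legacy version omits it. Adding it replaces the implicit RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW frame with ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW. That changes how rows with tied created_at values are summed, and it can be cheaper to evaluate on some engines.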
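You can see the frame difference without a database. The compile-only sketch below uses a hypothetical Core table, transactions, that mirrors the Transaction model. It compiles the same SUM() twice for the PostgreSQL dialect:
- once without a frame argument, as in the legacy example;
- once with rows=(None, 0).

The first version renders no frame clause at all, so the database's default frame applies implicitly.

```python
# Sync sketch: compile-only, no database connection needed
from sqlalchemy import Column, DateTime, Integer, MetaData, Numeric, Table, func, select
from sqlalchemy.dialects import postgresql

metadata = MetaData()
transactions = Table(  # hypothetical Core table mirroring the Transaction model
    "transactions",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("account_id", Integer, nullable=False),
    Column("amount", Numeric(18, 4), nullable=False),
    Column("created_at", DateTime, nullable=False),
)


def frame_sql(**frame) -> str:
    """Compile a cumulative SUM() with optional frame kwargs to PostgreSQL SQL."""
    total = func.sum(transactions.c.amount).over(
        order_by=transactions.c.created_at, **frame
    )
    stmt = select(total.label("running_total"))
    return str(stmt.compile(dialect=postgresql.dialect()))


legacy_sql = frame_sql()              # no frame clause: the database default applies
rows_sql = frame_sql(rows=(None, 0))  # explicit ROWS frame
print(legacy_sql)
print(rows_sql)
```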
Execution Context & Async Workflow Integration
When await session.execute(stmt) runs, the asyncpg driver sends the compiled SQL to PostgreSQL. The database evaluates the window function server-side. It orders the rows by account_id and created_at, either with an explicit sort or by reading an index in that order. It then walks each partition forward, accumulating the sum. No Python-side arithmetic happens.
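AsyncSession.execute() returns a fully buffered Result. Every row has already been fetched into Python memory by the time the await completes, and .all() simply hands back those rows. For small datasets this is fine. For tables with millions of rows, stream instead with AsyncSession.stream() and the yield_per execution option, which fetches rows in batches through a server-side cursor: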
```python
# Async: streaming a large running-total result set
from typing import AsyncIterator

from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession


async def stream_running_totals(
    session: AsyncSession,
    chunk_size: int = 500,
) -> AsyncIterator[list]:
    stmt = select(
        Transaction.account_id,
        Transaction.created_at,
        Transaction.amount,
        func.sum(Transaction.amount)
        .over(
            partition_by=Transaction.account_id,
            order_by=[Transaction.created_at, Transaction.id],
            rows=(None, 0),
        )
        .label("running_total"),
    ).execution_options(yield_per=chunk_size)
    # session.stream() returns an AsyncResult backed by a server-side cursor;
    # session.execute() would buffer the entire result before returning
    result = await session.stream(stmt)
    try:
        # AsyncResult.partitions() is an async iterator over batches of rows
        async for batch in result.partitions(chunk_size):
            yield batch
    finally:
        await result.close()  # release the server-side cursor
```
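The try/finally around the iterator is important. Whenever the generator finishes, result.close() releases the server-side cursor. That covers three cases: the stream is exhausted, an exception propagates through it, or the caller closes it early.

A consumer that simply breaks out of its async for loop does not close the generator immediately. Unless the caller wraps it in contextlib.aclosing(), the generator is closed only when it is garbage-collected.

The connection itself returns to the pool only when the session's transaction ends or the session is closed. An abandoned stream inside a long-lived session therefore keeps its connection checked out, which under load causes pool exhaustion.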
Connection pool interaction: an AsyncSession checks out a connection when its first statement runs. It keeps that connection until the transaction is committed or rolled back, or until the session is closed. With streaming (yield_per), the connection therefore stays checked out across every iteration of the async loop. Size the pool (pool_size, max_overflow) for the number of concurrent streaming queries you expect, because each streaming consumer holds one connection for the entire duration of iteration.
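One way to keep concurrent streams within the pool is to gate them with an asyncio.Semaphore sized to pool_size + max_overflow. Excess consumers then wait for a free slot instead of running into pool_timeout.

The sketch below is hypothetical:
- bounded_stream() only simulates holding a connection across three batch fetches.
- stats records how many consumers were active at the same time.

```python
# Async sketch: cap concurrent streaming consumers at the pool's capacity
import asyncio

stats = {"active": 0, "peak": 0}  # hypothetical bookkeeping for the demo


async def bounded_stream(slots: asyncio.Semaphore, job_id: int) -> int:
    """Hypothetical consumer that holds one 'connection' for its whole iteration."""
    async with slots:  # waits here instead of failing on pool_timeout
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
        try:
            for _ in range(3):  # pretend to fetch three batches
                await asyncio.sleep(0)
            return job_id
        finally:
            stats["active"] -= 1


async def run_jobs(n_jobs: int, capacity: int) -> list[int]:
    # capacity would be pool_size + max_overflow (e.g. 15 + 5) in a real service
    slots = asyncio.Semaphore(capacity)
    return await asyncio.gather(*(bounded_stream(slots, i) for i in range(n_jobs)))


results = asyncio.run(run_jobs(n_jobs=50, capacity=20))
print(len(results), stats["peak"])
```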
One property of running-total queries in async environments is often misunderstood. Because await yields control to the event loop between batches, other coroutines can commit writes to the transactions table while your stream is mid-result.

In PostgreSQL this does not corrupt the running total. A single statement reads from the snapshot taken when the statement started, even under the default READ COMMITTED isolation level. This holds even when its cursor is fetched incrementally.

Isolation matters across statements instead. Suppose you run the running-total query and then a second query, such as a per-account balance check, and need both to see the same data. In that case, run them in one REPEATABLE READ transaction started before the first execute() call. Under READ COMMITTED, each statement takes a fresh snapshot and the two results can disagree.
Result materialization format matters for downstream consumers:
- .all() returns Row objects with attribute access.
- .mappings().all() returns dict-like RowMapping objects keyed by column name.
- .scalars() is only appropriate when the query projects a single column.

Running-total queries almost always project several columns. .scalars() silently discards every column except the first, a common source of bugs when migrating from single-column queries.
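The three access styles can be compared on a tiny in-memory SQLite table. This is a sync sketch, and window functions require SQLite 3.25+. ledger is a hypothetical table, and id stands in for created_at as the ordering column. The Result methods used here are the same ones available on the buffered Result that AsyncSession.execute() returns.

```python
# Sync sketch: Row vs RowMapping vs scalars() on an in-memory SQLite database
from sqlalchemy import (Column, Integer, MetaData, Table, create_engine, func,
                        insert, select)

metadata = MetaData()
ledger = Table(  # hypothetical minimal table; id doubles as the ordering column
    "ledger",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("account_id", Integer, nullable=False),
    Column("amount", Integer, nullable=False),
)

engine = create_engine("sqlite://")  # in-memory database
metadata.create_all(engine)

running = (
    func.sum(ledger.c.amount)
    .over(partition_by=ledger.c.account_id, order_by=ledger.c.id, rows=(None, 0))
    .label("running_total")
)
stmt = select(ledger.c.id, ledger.c.amount, running).order_by(ledger.c.id)

with engine.begin() as conn:
    conn.execute(
        insert(ledger),
        [
            {"id": 1, "account_id": 1, "amount": 10},
            {"id": 2, "account_id": 1, "amount": 5},
        ],
    )
    as_rows = conn.execute(stmt).all()              # Row: row.running_total
    as_dicts = conn.execute(stmt).mappings().all()  # RowMapping: row["running_total"]
    first_col = conn.execute(stmt).scalars().all()  # keeps only the first column (id)

print(as_rows[-1].running_total, as_dicts[-1]["running_total"], first_col)
```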
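```python
# Async: engine tuned for concurrent analytical streaming
from sqlalchemy.ext.asyncio import create_async_engine

engine = create_async_engine(
    "postgresql+asyncpg://user:pass@db/analytics",
    pool_size=15,        # connections kept open in steady state
    max_overflow=5,      # extra connections allowed under burst load
    pool_timeout=30,     # seconds to wait for a free connection before TimeoutError
    pool_pre_ping=True,  # test connections on checkout and replace dead ones
    pool_recycle=3600,   # replace connections older than one hour
)
```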
Resolving Warnings, Errors & Common Mistakes
| Exact error or symptom | Root cause | Production fix |
|---|---|---|
| ProgrammingError: column "running_total" does not exist when adding WHERE running_total > 100 | Window functions are evaluated after WHERE, so a window alias cannot be referenced in the same query's WHERE clause | Wrap the window query in a .cte(), then filter on the CTE column in an outer select().where() |
| Running total does not increment; every row shows the same total, equal to the partition sum | order_by is missing from .over(). Without ordering, SUM() OVER (PARTITION BY ...) uses the whole partition as its frame and repeats the partition total on every row | Always pass order_by= to .over() for cumulative aggregates |
| sqlalchemy.exc.MissingGreenlet: greenlet_spawn has not been called; can't call await_only() here | Synchronous database I/O ran inside an async code path, for example through a sync Session or Connection over an async driver, or through an implicit lazy load on an ORM attribute | Use AsyncSession and await session.execute(); run unavoidable synchronous code through await session.run_sync(fn) instead of calling it directly |
| Slow query on large partitions; EXPLAIN shows a Sort node feeding WindowAgg | No index matches the window's PARTITION BY and ORDER BY columns, so the planner must sort the input (in memory, or spilling to disk once it exceeds work_mem) before WindowAgg can run | CREATE INDEX CONCURRENTLY ON transactions (account_id, created_at), appending id if you order by it as a tie-breaker, so the planner can read rows already in window order and skip the explicit sort |
| Rows with equal created_at values show the same running total, and the total jumps by their combined amount | The default RANGE frame treats rows with equal ORDER BY values as peers and includes all of them in each peer's frame | Specify rows=(None, 0) so the frame advances one physical row at a time, and add a unique tie-breaker such as Transaction.id to order_by; ROWS alone leaves the order among tied rows unspecified, so totals within a tie can differ between runs |
| sqlalchemy.exc.TimeoutError: QueuePool limit of size N overflow M reached during analytical batch jobs, or asyncpg.exceptions.TooManyConnectionsError from the server | Streaming queries hold connections for the full duration of iteration, so many concurrent streams exhaust the pool; TooManyConnectionsError means the combined pools of all processes exceed PostgreSQL's max_connections | Bound the number of concurrent streams (for example with an asyncio.Semaphore sized to pool_size + max_overflow), raise pool_size only while the total across processes stays below max_connections, and retry on TimeoutError with backoff |
| AttributeError: 'RowMapping' object has no attribute 'running_total' | .mappings() returns RowMapping objects, which support dict-style access only | Use row["running_total"], drop .mappings() and use attribute access on the plain Row (row.running_total), or unpack into a dataclass as shown in the parent guide |
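Here is a minimal sketch of the CTE fix from the first row of the table. It runs synchronously against in-memory SQLite with a hypothetical ledger table, where id stands in for created_at.

The window is computed inside the CTE, and the outer query filters on the CTE's running_total column. The filter therefore sees totals computed over each account's full history, not only over the rows it keeps.

```python
# Sync sketch: filter on a running total by wrapping the window in a CTE
from sqlalchemy import (Column, Integer, MetaData, Table, create_engine, func,
                        insert, select)

metadata = MetaData()
ledger = Table(  # hypothetical table; id stands in for created_at
    "ledger",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("account_id", Integer, nullable=False),
    Column("amount", Integer, nullable=False),
)
engine = create_engine("sqlite://")
metadata.create_all(engine)

# Step 1: compute the window inside a CTE; WHERE cannot see window aliases
windowed = select(
    ledger.c.id,
    ledger.c.account_id,
    func.sum(ledger.c.amount)
    .over(partition_by=ledger.c.account_id, order_by=ledger.c.id, rows=(None, 0))
    .label("running_total"),
).cte("windowed")

# Step 2: the outer query can filter on the CTE's running_total column
over_100 = (
    select(windowed.c.id, windowed.c.running_total)
    .where(windowed.c.running_total > 100)
    .order_by(windowed.c.id)
)

with engine.begin() as conn:
    conn.execute(
        insert(ledger),
        [
            {"id": 1, "account_id": 1, "amount": 60},
            {"id": 2, "account_id": 1, "amount": 50},
            {"id": 3, "account_id": 2, "amount": 90},
            {"id": 4, "account_id": 1, "amount": -20},
        ],
    )
    rows = [tuple(r) for r in conn.execute(over_100)]

print(rows)
```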
Advanced Running Total Optimization
Incremental Materialization with INSERT ... SELECT
For reporting tables refreshed on a schedule, avoid recomputing the full window over all historical rows. Instead, compute running totals only over rows added since the last refresh, offset each account's totals by its last cached value, and append the results using INSERT ... SELECT:
```python
# Async: incremental running-total refresh
from datetime import datetime
from decimal import Decimal

from sqlalchemy import (Column, DateTime, Integer, MetaData, Numeric, Table,
                        func, insert, select)
from sqlalchemy.ext.asyncio import AsyncSession

metadata = MetaData()
RunningTotals = Table(
    "running_totals_cache",
    metadata,
    Column("account_id", Integer, nullable=False),
    Column("as_of", DateTime(timezone=True), nullable=False),
    Column("running_total", Numeric(18, 4), nullable=False),
)


async def incremental_refresh(session: AsyncSession, since: datetime) -> None:
    # `since` must be the as_of of the last refresh, or rows are counted twice.
    # Baseline = the running total of each account's LATEST cached row; MAX()
    # would be wrong because debits can leave the latest total below a past one.
    ranked = select(
        RunningTotals.c.account_id,
        RunningTotals.c.running_total,
        func.row_number()
        .over(
            partition_by=RunningTotals.c.account_id,
            order_by=RunningTotals.c.as_of.desc(),
        )
        .label("rn"),
    ).subquery("ranked")
    last_cached = (
        select(ranked.c.account_id, ranked.c.running_total.label("baseline"))
        .where(ranked.c.rn == 1)
        .cte("last_cached")
    )

    # Window over new rows only, offset by the cached baseline (0 for new accounts)
    new_rows = (
        select(
            Transaction.account_id,
            Transaction.created_at,
            (
                func.coalesce(last_cached.c.baseline, Decimal("0"))
                + func.sum(Transaction.amount).over(
                    partition_by=Transaction.account_id,
                    order_by=[Transaction.created_at, Transaction.id],
                    rows=(None, 0),
                )
            ).label("running_total"),
        )
        .join(
            last_cached,
            Transaction.account_id == last_cached.c.account_id,
            isouter=True,
        )
        .where(Transaction.created_at > since)
        .cte("incremental")
    )

    async with session.begin():
        await session.execute(
            insert(RunningTotals).from_select(
                ["account_id", "as_of", "running_total"],
                select(
                    new_rows.c.account_id,
                    new_rows.c.created_at,
                    new_rows.c.running_total,
                ),
            )
        )
```
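This pattern limits the window computation to rows created since the last refresh instead of each account's full history, which is critical for accounts with years of historical data. The baseline lookup still reads the cache table, so an index on running_totals_cache (account_id, as_of) should keep that step cheap. The incremental approach relies on monotonically increasing created_at values. If rows can be backdated, a full recompute is safer.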
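The baseline must be each account's most recent cached total, not its largest. The stdlib sqlite3 sketch below uses hypothetical tables with an integer seq in place of timestamps. It compares a full recompute against the incremental query under two baselines:
- the latest cached row, picked with ROW_NUMBER();
- MAX(running_total).

After a large debit, only the latest-row baseline reproduces the full result.

```python
# Sync sketch (stdlib sqlite3): latest-row baseline vs MAX() baseline
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE transactions (id INTEGER PRIMARY KEY, account_id INTEGER,
                               seq INTEGER, amount INTEGER);
    CREATE TABLE running_totals_cache (account_id INTEGER, as_of INTEGER,
                                       running_total INTEGER);
    -- account 1: a deposit and a large debit (already cached), then two new rows
    INSERT INTO transactions VALUES (1, 1, 1, 100), (2, 1, 2, -80),
                                    (3, 1, 3, 10), (4, 1, 4, 5);
    INSERT INTO running_totals_cache VALUES (1, 1, 100), (1, 2, 20);
    """
)

WINDOW = (
    "SUM(t.amount) OVER (PARTITION BY t.account_id ORDER BY t.seq, t.id "
    "ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)"
)
# Baseline = running_total of each account's most recent cache row
LATEST_BASELINE = """
    SELECT account_id, running_total AS baseline FROM (
        SELECT account_id, running_total,
               ROW_NUMBER() OVER (PARTITION BY account_id ORDER BY as_of DESC) AS rn
        FROM running_totals_cache
    ) AS ranked
    WHERE rn = 1
"""
# Baseline = largest cached total, which is wrong once debits lower the balance
MAX_BASELINE = """
    SELECT account_id, MAX(running_total) AS baseline
    FROM running_totals_cache GROUP BY account_id
"""


def incremental(baseline_sql: str, since: int) -> list[tuple[int, int]]:
    """Window over rows with seq > since, offset by the chosen baseline."""
    sql = f"""
        WITH last_cached AS ({baseline_sql})
        SELECT t.seq, COALESCE(lc.baseline, 0) + {WINDOW}
        FROM transactions AS t
        LEFT JOIN last_cached AS lc ON lc.account_id = t.account_id
        WHERE t.seq > ?
        ORDER BY t.seq
    """
    return conn.execute(sql, (since,)).fetchall()


full = conn.execute(f"SELECT t.seq, {WINDOW} FROM transactions AS t ORDER BY t.seq").fetchall()
print(full)
print(incremental(LATEST_BASELINE, since=2))
print(incremental(MAX_BASELINE, since=2))
```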
Using func.sum().filter() for Conditional Running Totals
When you need a running total that only accumulates certain rows — for example, credits but not debits, or confirmed orders but not cancelled ones — use the FILTER (WHERE ...) aggregate syntax rather than post-processing in Python:
```python
# Async: separate running totals for credits and for debits
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession


async def credit_running_total(session: AsyncSession) -> list:
    stmt = select(
        Transaction.account_id,
        Transaction.created_at,
        Transaction.amount,
        func.sum(Transaction.amount)
        .filter(Transaction.amount > 0)  # credits only
        .over(
            partition_by=Transaction.account_id,
            order_by=Transaction.created_at,
            rows=(None, 0),
        )
        .label("credit_running_total"),
        func.sum(Transaction.amount)
        .filter(Transaction.amount < 0)  # debits only
        .over(
            partition_by=Transaction.account_id,
            order_by=Transaction.created_at,
            rows=(None, 0),
        )
        .label("debit_running_total"),
    )
    result = await session.execute(stmt)
    return result.mappings().all()
```
This compiles to SUM(amount) FILTER (WHERE amount > 0) OVER (...). Because both columns share the same window definition, PostgreSQL can evaluate them in a single pass over the sorted rows. The alternatives are two separate queries or Python-side filtering. The first doubles the I/O and the second moves the work off the database, and both are worse under load.
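SUM() over zero input rows returns NULL, so the debit column above stays NULL until an account's first debit. The compile-only sketch below uses a hypothetical Core transactions table to show how wrapping the filtered window in func.coalesce() renders for PostgreSQL. literal_binds inlines the constants so you can read the SQL directly.

```python
# Compile-only sketch: NULL-safe debit running total for PostgreSQL
from sqlalchemy import Column, DateTime, Integer, MetaData, Numeric, Table, func, select
from sqlalchemy.dialects import postgresql

metadata = MetaData()
transactions = Table(  # hypothetical Core table mirroring the Transaction model
    "transactions",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("account_id", Integer, nullable=False),
    Column("amount", Numeric(18, 4), nullable=False),
    Column("created_at", DateTime, nullable=False),
)

debit_running_total = func.coalesce(
    func.sum(transactions.c.amount)
    .filter(transactions.c.amount < 0)  # debits only
    .over(
        partition_by=transactions.c.account_id,
        order_by=[transactions.c.created_at, transactions.c.id],
        rows=(None, 0),
    ),
    0,  # report 0 instead of NULL before the first debit
).label("debit_running_total")

stmt = select(transactions.c.account_id, debit_running_total)
sql = str(stmt.compile(dialect=postgresql.dialect(), compile_kwargs={"literal_binds": True}))
print(sql)
```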
Resetting a Running Total at a Boundary
A less obvious but frequently requested pattern is resetting the running total at a logical boundary within a partition. For example, you might restart the cumulative sum at the beginning of each calendar month while still partitioning by account. SQL has no native "reset on value change" frame clause. When the boundary can be computed from each row, however, adding that boundary key to partition_by has the same effect: every new key value starts a new partition, and with it a fresh running total.
```python
# Async: running total that resets at the start of each calendar month
from sqlalchemy import extract, func, select
from sqlalchemy.ext.asyncio import AsyncSession


async def monthly_running_totals(session: AsyncSession) -> list:
    # Year-month key (e.g. 202403) returned alongside each row for display;
    # the partition below uses the same year and month parts
    year_month = (
        extract("year", Transaction.created_at) * 100
        + extract("month", Transaction.created_at)
    ).label("year_month")
    stmt = select(
        Transaction.account_id,
        Transaction.created_at,
        Transaction.amount,
        year_month,
        func.sum(Transaction.amount)
        .over(
            # Partition by BOTH account AND calendar month
            partition_by=[
                Transaction.account_id,
                extract("year", Transaction.created_at),
                extract("month", Transaction.created_at),
            ],
            order_by=Transaction.created_at,
            rows=(None, 0),
        )
        .label("monthly_running_total"),
    ).order_by(Transaction.account_id, Transaction.created_at)
    result = await session.execute(stmt)
    return result.mappings().all()
```
By adding the year and month extracts to partition_by, the window resets at every month boundary without any Python-side logic. The database handles the boundary detection entirely through the partition key — each unique (account_id, year, month) tuple starts a fresh frame at UNBOUNDED PRECEDING. This approach scales to any reset boundary: fiscal quarters, billing cycles, or custom date bins, as long as you can express the boundary as a column or derived expression.
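You can check the reset on a small data set. The sync sketch below runs the same account/year/month partitioning against in-memory SQLite through SQLAlchemy, whose SQLite dialect renders extract() with STRFTIME. The ledger table and its rows are hypothetical. The total restarts on February 1.

```python
# Sync sketch: monthly reset verified on in-memory SQLite
from datetime import datetime

from sqlalchemy import (Column, DateTime, Integer, MetaData, Table, create_engine,
                        extract, func, insert, select)

metadata = MetaData()
ledger = Table(  # hypothetical table mirroring Transaction
    "ledger",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("account_id", Integer, nullable=False),
    Column("amount", Integer, nullable=False),
    Column("created_at", DateTime, nullable=False),
)
engine = create_engine("sqlite://")
metadata.create_all(engine)

monthly_total = (
    func.sum(ledger.c.amount)
    .over(
        # each (account, year, month) combination is its own partition
        partition_by=[
            ledger.c.account_id,
            extract("year", ledger.c.created_at),
            extract("month", ledger.c.created_at),
        ],
        order_by=[ledger.c.created_at, ledger.c.id],
        rows=(None, 0),
    )
    .label("monthly_running_total")
)
stmt = select(ledger.c.id, monthly_total).order_by(ledger.c.created_at, ledger.c.id)

with engine.begin() as conn:
    conn.execute(
        insert(ledger),
        [
            {"id": 1, "account_id": 1, "amount": 10, "created_at": datetime(2024, 1, 5)},
            {"id": 2, "account_id": 1, "amount": 20, "created_at": datetime(2024, 1, 20)},
            {"id": 3, "account_id": 1, "amount": 7, "created_at": datetime(2024, 2, 1)},
            {"id": 4, "account_id": 1, "amount": 3, "created_at": datetime(2024, 2, 9)},
        ],
    )
    totals = [tuple(r) for r in conn.execute(stmt)]

print(totals)
```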
Frequently Asked Questions
Why does my running total show the same value on every row?
The order_by argument is missing from .over(). Without it, SUM() OVER (PARTITION BY account_id) computes the full-partition total and stamps it identically on every row in that partition. Add order_by=Transaction.created_at (or whichever column defines the cumulative sequence) to produce a true running total.
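The effect is easy to reproduce with the standard library's sqlite3 module; window functions require SQLite 3.25+. The table t is hypothetical, and the only difference between the two queries is the ORDER BY inside OVER.

```python
# Sync sketch (stdlib sqlite3): SUM() OVER with and without ORDER BY
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, account_id INTEGER, amount INTEGER)")
conn.executemany("INSERT INTO t VALUES (?, ?, ?)", [(1, 1, 10), (2, 1, 20), (3, 1, 30)])

# No ORDER BY inside OVER: the frame is the whole partition
no_order = conn.execute(
    "SELECT id, SUM(amount) OVER (PARTITION BY account_id) FROM t ORDER BY id"
).fetchall()

# ORDER BY inside OVER: the frame grows one row at a time
with_order = conn.execute(
    "SELECT id, SUM(amount) OVER (PARTITION BY account_id ORDER BY id "
    "ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) FROM t ORDER BY id"
).fetchall()

print(no_order)    # every row repeats the partition total
print(with_order)  # a true running total
```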
What is the difference between rows=(None, 0) and range_=(None, 0) in SQLAlchemy?
rows=(None, 0) emits ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, a physical row-count frame. range_=(None, 0) (note the trailing underscore) emits RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, a logical value frame.

With RANGE, if three rows share the same created_at value, they are all included in each other's frames. All three show the same total, which jumps by the sum of every tied row at that timestamp. With ROWS, each row advances the total by exactly its own amount.

For financial running totals, rows= combined with a unique tie-breaker in order_by is almost always the correct choice.
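The tie behaviour can be reproduced with sqlite3 from the standard library, using a hypothetical table t in which rows 2 and 3 share a timestamp:
- The RANGE query gives both tied rows the same total.
- The ROWS query advances one row at a time, and the id tie-breaker in its ORDER BY fixes which tied row comes first.

```python
# Sync sketch (stdlib sqlite3): RANGE vs ROWS frames when ORDER BY values tie
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, ts INTEGER, amount INTEGER)")
# ids 2 and 3 share the same timestamp (a tie)
conn.executemany(
    "INSERT INTO t VALUES (?, ?, ?)", [(1, 1, 10), (2, 2, 5), (3, 2, 7), (4, 3, 1)]
)


def totals(over_clause: str) -> list[tuple[int, int]]:
    """Return (id, running total) pairs for the given OVER (...) body."""
    sql = f"SELECT id, SUM(amount) OVER ({over_clause}) FROM t ORDER BY id"
    return conn.execute(sql).fetchall()


range_totals = totals("ORDER BY ts RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW")
rows_totals = totals("ORDER BY ts, id ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW")
print(range_totals)  # tied rows 2 and 3 share one total
print(rows_totals)   # each row adds exactly its own amount
```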
Can I compute a running total without partition_by?
Yes. Omitting partition_by computes the running total across the entire result set as a single partition, which is correct for a global ledger total. Choose partition_by for its meaning rather than for speed. The database sorts the whole input either way, and PostgreSQL spills large sorts to disk once they exceed work_mem rather than failing. Add partition_by whenever the running total is logically scoped to an account, tenant, product, or similar domain.
How do I test running total queries against an in-memory SQLite database?
SQLite supports window functions from version 3.25.0 (released 2018). Use aiosqlite with create_async_engine("sqlite+aiosqlite:///:memory:"). The rows=(None, 0) argument compiles to the same ROWS BETWEEN clause. Support for the FILTER clause depends on the SQLite version bundled with your Python build. If your test environment rejects it, use a CASE expression inside SUM() as a portable workaround.
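Here is a sketch of the CASE workaround. It runs synchronously against in-memory SQLite through SQLAlchemy with a hypothetical ledger table. Unlike FILTER, the else_=0 branch makes the debit column start at 0 rather than NULL.

```python
# Sync sketch: SUM(CASE ...) as a FILTER-free conditional running total
from sqlalchemy import (Column, Integer, MetaData, Table, case, create_engine, func,
                        insert, select)

metadata = MetaData()
ledger = Table(  # hypothetical table; id stands in for created_at
    "ledger",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("account_id", Integer, nullable=False),
    Column("amount", Integer, nullable=False),
)
engine = create_engine("sqlite://")
metadata.create_all(engine)

window = {"partition_by": ledger.c.account_id, "order_by": ledger.c.id, "rows": (None, 0)}
credit_total = (
    func.sum(case((ledger.c.amount > 0, ledger.c.amount), else_=0))
    .over(**window)
    .label("credit_running_total")
)
debit_total = (
    func.sum(case((ledger.c.amount < 0, ledger.c.amount), else_=0))
    .over(**window)
    .label("debit_running_total")
)
stmt = select(ledger.c.id, credit_total, debit_total).order_by(ledger.c.id)

with engine.begin() as conn:
    conn.execute(
        insert(ledger),
        [
            {"id": 1, "account_id": 1, "amount": 50},
            {"id": 2, "account_id": 1, "amount": -20},
            {"id": 3, "account_id": 1, "amount": 30},
        ],
    )
    rows = [tuple(r) for r in conn.execute(stmt)]

print(rows)
```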
How do I verify the compiled SQL that SQLAlchemy generates for a running total query?
Print the compiled SQL before executing it to confirm the OVER clause is correct:
```python
from sqlalchemy import func, select
from sqlalchemy.dialects import postgresql

# Transaction is the mapped class from the Quick Answer example
stmt = select(
    Transaction.account_id,
    func.sum(Transaction.amount)
    .over(
        partition_by=Transaction.account_id,
        order_by=Transaction.created_at,
        rows=(None, 0),
    )
    .label("running_total"),
)

# Print PostgreSQL-dialect SQL without executing
print(stmt.compile(dialect=postgresql.dialect(), compile_kwargs={"literal_binds": True}))
```
This prints a statement containing sum(transactions.amount) OVER (PARTITION BY transactions.account_id ORDER BY transactions.created_at ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS running_total. You can paste it into psql or a database GUI to verify correctness before running it in production. For local testing, pass sqlite.dialect() (from sqlalchemy.dialects import sqlite) to see the SQLite rendering.
Related
- Window Functions and Analytical Queries: parent guide covering all window function types, frame clauses, combining with GROUP BY, and mapping results.
- Common Table Expressions (CTEs) and Recursive Queries: chaining running total CTEs into multi-stage analytical pipelines.
- High-Performance Bulk Inserts and Updates: persisting incremental running total results using insert().from_select() with minimal roundtrips.
- Advanced Query Patterns and Bulk Data Operations: top-level guide to SQLAlchemy 2.0 query patterns for high-throughput workloads.