Window Functions and Analytical Queries in SQLAlchemy 2.0
Window functions give you partition-aware calculations (rank(), lag(), lead(), running sums) without collapsing rows the way GROUP BY does. As part of the Advanced Query Patterns and Bulk Data Operations guide, this section covers SQLAlchemy 2.0's func.<name>().over(partition_by=, order_by=, rows=) API, which compiles directly to standard SQL OVER clauses.
Concept & Execution Model
Traditional GROUP BY collapses a set of rows into one summary row per group. Every column outside the aggregate must appear in the grouping list. Window functions invert this contract: the result set retains every original row, and the computed column carries a value derived from a window of peer rows defined by PARTITION BY, ORDER BY, and an optional frame clause.
SQLAlchemy 2.0 models this through sqlalchemy.func combined with the .over() method. The ORM makes no special distinction between window functions and plain scalar functions — they are first-class expression objects that compose with select(), where(), subqueries, and CTEs identically.
Understanding SQL's logical processing order clarifies when window functions apply. A SQL SELECT is logically processed in this sequence: FROM → WHERE → GROUP BY → HAVING → window functions → DISTINCT → ORDER BY → LIMIT. Because window functions run after WHERE and HAVING, they see the filtered, grouped result. For the same reason, you cannot reference a window alias in the same statement's WHERE or HAVING clause: the value has not been computed yet when those clauses are evaluated. Filtering on it requires a CTE or subquery wrapper. ORDER BY runs later, so it can sort by a window alias directly.
Window functions are the right tool when you need: the original rows preserved alongside a derived statistic (e.g., each sale with the customer's rank); running or cumulative metrics (totals, counts, averages over time); offset comparisons between adjacent rows (month-over-month delta); or statistical distribution buckets (quartiles, percentiles). They are not the right tool when you only need summary statistics per group with no row-level output — in that case, GROUP BY with plain aggregates is simpler and potentially faster because it produces fewer output rows.
# Async — SQLAlchemy 2.0 with asyncpg
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase, mapped_column, Mapped
from datetime import datetime
from decimal import Decimal
class Base(DeclarativeBase):
    pass
class Order(Base):
    __tablename__ = "orders"
    id: Mapped[int] = mapped_column(primary_key=True)
    customer_id: Mapped[int] = mapped_column(nullable=False)
    product_id: Mapped[int] = mapped_column(nullable=False)
    amount: Mapped[Decimal] = mapped_column(nullable=False)
    created_at: Mapped[datetime] = mapped_column(nullable=False)
engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/db",
    pool_size=20,
    max_overflow=10,
    pool_pre_ping=True,
)
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)
async def fetch_ranked_orders(session: AsyncSession) -> list:
    stmt = select(
        Order.id,
        Order.customer_id,
        Order.amount,
        func.row_number()
        .over(
            partition_by=Order.customer_id,
            order_by=Order.created_at.desc(),
        )
        .label("recency_rank"),
    )
    result = await session.execute(stmt)
    return result.all()
The key architectural point: in SQLAlchemy 2.0, select() executed through await session.execute() is the standard execution path for both ORM and Core queries. The session.query() API from 1.x is still present on the synchronous Session, but it is considered legacy and is not available on AsyncSession, so any window function tutorial that calls session.query(func.row_number().over(...)) will not carry over to async code.
Query Construction & Async Execution Patterns
Ranking Functions: row_number, rank, dense_rank
func.row_number() assigns a unique sequential integer to every row within its partition with no gaps or ties. func.rank() and func.dense_rank() allow ties but differ in how they handle subsequent ranks: rank() skips numbers after a tie (1, 2, 2, 4), while dense_rank() does not (1, 2, 2, 3).
# Async — ranking orders per customer by amount descending
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
async def top_orders_per_customer(session: AsyncSession) -> list:
    stmt = select(
        Order.id,
        Order.customer_id,
        Order.amount,
        func.row_number()
        .over(
            partition_by=Order.customer_id,
            order_by=Order.amount.desc(),
        )
        .label("row_num"),
        func.rank()
        .over(
            partition_by=Order.customer_id,
            order_by=Order.amount.desc(),
        )
        .label("rank"),
        func.dense_rank()
        .over(
            partition_by=Order.customer_id,
            order_by=Order.amount.desc(),
        )
        .label("dense_rank"),
    )
    result = await session.execute(stmt)
    return result.mappings().all()
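To make the tie behaviour described above concrete, here is a minimal sketch that runs the equivalent raw SQL against an in-memory SQLite database using the standard-library sqlite3 module. SQLite stands in for PostgreSQL purely so the example is self-contained (it assumes a SQLite build with window function support, 3.25 or later); the table layout and the ranking_demo name are illustrative and not part of the guide's schema.

```python
import sqlite3


def ranking_demo() -> list:
    """Return (amount, row_number, rank, dense_rank) for one customer's orders."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE orders (id INTEGER PRIMARY KEY, customer_id INTEGER, amount INTEGER)"
    )
    # Two orders tie at 200 so the three functions diverge.
    conn.executemany(
        "INSERT INTO orders (customer_id, amount) VALUES (?, ?)",
        [(1, 300), (1, 200), (1, 200), (1, 100)],
    )
    rows = conn.execute(
        """
        SELECT amount,
               row_number() OVER w AS row_num,
               rank()       OVER w AS rnk,
               dense_rank() OVER w AS dense_rnk
        FROM orders
        WINDOW w AS (PARTITION BY customer_id ORDER BY amount DESC)
        ORDER BY row_num
        """
    ).fetchall()
    conn.close()
    return rows


for row in ranking_demo():
    print(row)
```

The two tied rows share a rank and a dense_rank but receive different row_number values; which of the two gets the lower row_number is arbitrary unless order_by includes a unique tiebreaker such as the primary key.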
To retrieve only the top-N rows per partition, wrap the window expression in a CTE (or subquery) and filter on the rank column in an outer select. Filtering on the window expression in the same statement's WHERE clause fails: window functions are computed after WHERE, so PostgreSQL rejects the query and SQLAlchemy surfaces the failure as a ProgrammingError. The filter has to happen one level up.
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
async def latest_order_per_customer(session: AsyncSession) -> list:
    # Step 1: CTE wraps the window expression
    ranked_cte = (
        select(
            Order.id,
            Order.customer_id,
            Order.amount,
            Order.created_at,
            func.row_number()
            .over(
                partition_by=Order.customer_id,
                order_by=Order.created_at.desc(),
            )
            .label("rn"),
        )
        .cte("ranked_orders")
    )
    # Step 2: filter on the CTE
    stmt = select(
        ranked_cte.c.id,
        ranked_cte.c.customer_id,
        ranked_cte.c.amount,
        ranked_cte.c.created_at,
    ).where(ranked_cte.c.rn == 1)
    result = await session.execute(stmt)
    return result.all()
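The same rule can be observed outside SQLAlchemy. The sketch below, again using the standard-library sqlite3 module as a stand-in database (an assumption made for portability; error messages and exception types differ from PostgreSQL), first tries to filter on the window alias directly and then repeats the query with a CTE wrapper. The function name and sample data are hypothetical.

```python
import sqlite3


def latest_per_customer():
    """Return (direct_filter_failed, rows) for a same-level vs. CTE-level filter."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE orders (id INTEGER PRIMARY KEY, customer_id INTEGER, created_at TEXT)"
    )
    conn.executemany(
        "INSERT INTO orders (customer_id, created_at) VALUES (?, ?)",
        [(1, "2024-01-01"), (1, "2024-02-01"), (2, "2024-01-15")],
    )
    windowed = (
        "SELECT customer_id, created_at, "
        "row_number() OVER (PARTITION BY customer_id ORDER BY created_at DESC) AS rn "
        "FROM orders"
    )
    # Attempt 1: filter in the same SELECT that computes the window value.
    try:
        conn.execute(windowed + " WHERE rn = 1").fetchall()
        direct_filter_failed = False
    except sqlite3.Error:
        direct_filter_failed = True
    # Attempt 2: compute the window in a CTE, filter in the outer query.
    rows = conn.execute(
        f"WITH ranked AS ({windowed}) "
        "SELECT customer_id, created_at FROM ranked WHERE rn = 1 ORDER BY customer_id"
    ).fetchall()
    conn.close()
    return direct_filter_failed, rows


failed, rows = latest_per_customer()
print(failed, rows)
```

Only the CTE form returns the latest order per customer; the direct form is rejected before any rows are produced.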
Offset Functions: lag and lead
func.lag() and func.lead() access the value from a preceding or following row within the same partition. They accept up to three arguments: the column expression, the offset (default 1), and a default value returned when no row exists at that offset.
# Async — computing month-over-month revenue delta per product
from decimal import Decimal
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
async def revenue_delta(session: AsyncSession) -> list:
    stmt = select(
        Order.product_id,
        Order.created_at,
        Order.amount,
        func.lag(Order.amount, 1, Decimal("0"))
        .over(
            partition_by=Order.product_id,
            order_by=Order.created_at,
        )
        .label("prev_amount"),
        (
            Order.amount
            - func.lag(Order.amount, 1, Decimal("0")).over(
                partition_by=Order.product_id,
                order_by=Order.created_at,
            )
        ).label("delta"),
    )
    result = await session.execute(stmt)
    return result.mappings().all()
The default value on lag() and lead() is essential for the first (or last) row in each partition where no preceding (or following) row exists. Without it, those positions return NULL, which propagates silently through arithmetic expressions.
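A small sketch of that NULL propagation, run as raw SQL through the standard-library sqlite3 module (used here only to keep the example self-contained; the lag() semantics shown are the standard ones PostgreSQL follows as well). The table and the lag_deltas name are illustrative.

```python
import sqlite3


def lag_deltas() -> list:
    """Return (amount, delta_without_default, delta_with_default) per order."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE orders (product_id INTEGER, created_at TEXT, amount INTEGER)"
    )
    conn.executemany(
        "INSERT INTO orders VALUES (?, ?, ?)",
        [(1, "2024-01-01", 100), (1, "2024-02-01", 150), (1, "2024-03-01", 120)],
    )
    rows = conn.execute(
        """
        SELECT amount,
               amount - lag(amount)       OVER w AS delta_no_default,
               amount - lag(amount, 1, 0) OVER w AS delta_with_default
        FROM orders
        WINDOW w AS (PARTITION BY product_id ORDER BY created_at)
        ORDER BY created_at
        """
    ).fetchall()
    conn.close()
    return rows


for row in lag_deltas():
    print(row)
```

The first row's delta is None without a default and equals the full amount with a default of 0. Whether 0 is the right default depends on the report: if "no previous period" should stay distinguishable from "previous period was zero", leaving the NULL in place and handling it explicitly may be the better choice.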
Aggregate Window Functions: sum, avg, count
Any standard aggregate function (sum, avg, count, min, max) becomes a window function when you append .over(). Unlike ranking functions, which are only meaningful with an ordering, aggregate windows work without order_by, although you almost always want it for running totals and moving averages.
# Async — running total and 3-row moving average, partitioned by customer
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
async def running_metrics(session: AsyncSession) -> list:
    stmt = select(
        Order.customer_id,
        Order.created_at,
        Order.amount,
        func.sum(Order.amount)
        .over(
            partition_by=Order.customer_id,
            order_by=Order.created_at,
            rows=(None, 0),  # ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
        )
        .label("running_total"),
        func.avg(Order.amount)
        .over(
            partition_by=Order.customer_id,
            order_by=Order.created_at,
            rows=(-2, 0),  # ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
        )
        .label("moving_avg_3"),
        func.count(Order.id)
        .over(partition_by=Order.customer_id)
        .label("total_orders"),
    )
    result = await session.execute(stmt)
    return result.all()
The rows parameter in .over() maps to ROWS BETWEEN ... AND .... None means UNBOUNDED, and 0 means CURRENT ROW. Positive integers look forward; negative integers look backward. Getting the frame right is the key to correct running totals, a topic the guide Writing Window Functions for Running Totals in Python covers in more depth.
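To show why the explicit frame matters, the following sketch compares a ROWS frame with the default frame (RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) when two rows share an ORDER BY value. It uses raw SQL through the standard-library sqlite3 module as a stand-in for PostgreSQL, which is an assumption made for portability; the sample table and the running_totals name are illustrative. The ROWS variant adds id as a tiebreaker so its output is deterministic.

```python
import sqlite3


def running_totals() -> list:
    """Return (id, amount, rows_total, default_total) with a tie on created_at."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE orders (id INTEGER PRIMARY KEY, created_at TEXT, amount INTEGER)"
    )
    # Orders 2 and 3 share the same timestamp.
    conn.executemany(
        "INSERT INTO orders VALUES (?, ?, ?)",
        [(1, "2024-01-01", 10), (2, "2024-01-02", 20), (3, "2024-01-02", 30)],
    )
    rows = conn.execute(
        """
        SELECT id,
               amount,
               sum(amount) OVER (
                   ORDER BY created_at, id
                   ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
               ) AS rows_total,
               sum(amount) OVER (ORDER BY created_at) AS default_total
        FROM orders
        ORDER BY id
        """
    ).fetchall()
    conn.close()
    return rows


for row in running_totals():
    print(row)
```

With the ROWS frame the total grows one row at a time (10, 30, 60). With the default frame both tied rows report 60, because the frame extends through every peer of the current row.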
Distribution Functions: ntile, percent_rank, cume_dist
Beyond ranking and aggregation, SQLAlchemy exposes distribution functions that segment partitions into statistical buckets. func.ntile(n) divides the ordered partition into n roughly equal groups — identical to quartile or decile scoring in data science pipelines. func.percent_rank() returns a value between 0 and 1 representing the relative rank of a row within its partition. func.cume_dist() returns the proportion of rows with values less than or equal to the current row's ORDER BY value.
These functions do not use a frame clause: they always operate over the full partition defined by PARTITION BY and ORDER BY. PostgreSQL ignores rows= or range= on them, while some other databases reject the query, so leave the frame out.
# Async — order scoring: quartile, percentile rank, cumulative distribution
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
async def order_distribution_metrics(session: AsyncSession) -> list:
    stmt = select(
        Order.customer_id,
        Order.amount,
        func.ntile(4)
        .over(
            partition_by=Order.customer_id,
            order_by=Order.amount,
        )
        .label("quartile"),  # 1=bottom 25%, 4=top 25%
        func.percent_rank()
        .over(
            partition_by=Order.customer_id,
            order_by=Order.amount,
        )
        .label("pct_rank"),  # 0.0 to 1.0
        func.cume_dist()
        .over(
            partition_by=Order.customer_id,
            order_by=Order.amount,
        )
        .label("cumulative_dist"),  # fraction of rows <= current row
    )
    result = await session.execute(stmt)
    return result.mappings().all()
A common production use of ntile(10) is to segment customers into spending deciles for targeted marketing. Combine it with a CASE expression in the outer select to map bucket numbers to human-readable labels without a Python-side loop. percent_rank() is useful for SLA reporting — "what fraction of orders processed faster than this one?" — and compiles to a single database pass rather than a correlated subquery.
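The ntile-plus-CASE pattern mentioned above might look like the sketch below, written as raw SQL against the standard-library sqlite3 module so it runs without a PostgreSQL server. Everything specific here is an assumption for illustration: the customer_spend table, the use of quartiles rather than deciles to keep the sample data small, and the low/mid/top labels.

```python
import sqlite3


def spend_segments() -> list:
    """Return (customer_id, quartile, segment) using ntile(4) and an outer CASE."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE customer_spend (customer_id INTEGER PRIMARY KEY, total_spend INTEGER)"
    )
    conn.executemany(
        "INSERT INTO customer_spend VALUES (?, ?)",
        [(i, i * 10) for i in range(1, 9)],  # 8 customers, spend 10..80
    )
    rows = conn.execute(
        """
        WITH bucketed AS (
            SELECT customer_id,
                   ntile(4) OVER (ORDER BY total_spend) AS quartile
            FROM customer_spend
        )
        SELECT customer_id,
               quartile,
               CASE quartile
                   WHEN 1 THEN 'low'
                   WHEN 4 THEN 'top'
                   ELSE 'mid'
               END AS segment
        FROM bucketed
        ORDER BY customer_id
        """
    ).fetchall()
    conn.close()
    return rows


for row in spend_segments():
    print(row)
```

The labelling happens in the database, so the application receives ready-to-display segments. In SQLAlchemy the outer CASE would typically be built with sqlalchemy.case() over the CTE's quartile column.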
State Management & Session Boundaries
Window functions are purely read-side operations. They do not modify session state, trigger lazy-load side effects, or interact with the identity map. The critical concern is result hydration: how rows are materialized from the database into Python.
For small result sets, .all() materializes everything immediately into a list of Row objects. For large analytical scans that might return millions of rows, use session.stream() together with the yield_per execution option to fetch rows in chunks and keep the async event loop responsive. With AsyncSession, session.execute() buffers the whole result before returning, so it cannot be iterated with async for.
# Async — streaming large window result sets with yield_per
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from typing import AsyncIterator
async def stream_ranked_orders(
    session: AsyncSession,
    chunk_size: int = 1000,
) -> AsyncIterator[list]:
    stmt = select(
        Order.customer_id,
        Order.amount,
        Order.created_at,
        func.rank()
        .over(
            partition_by=Order.customer_id,
            order_by=Order.amount.desc(),
        )
        .label("rank"),
    ).execution_options(yield_per=chunk_size)
    # session.stream() returns an AsyncResult backed by a server-side cursor
    result = await session.stream(stmt)
    async for batch in result.partitions(chunk_size):
        yield batch
One subtlety with yield_per and window functions: the server-side cursor must remain open for the duration of streaming. Keep the session alive inside the streaming loop. Do not rely on issuing a second query on the same session while a streaming cursor is still being consumed; close or fully consume the first cursor before running anything else on that session.
Transaction isolation matters here too. Under READ COMMITTED, each statement takes a fresh snapshot, so if you run several analytical queries inside one transaction while other sessions commit DML, successive queries can see different data. For snapshot consistency across a multi-statement analytical workload, use REPEATABLE READ or SERIALIZABLE isolation:
# Async — analytical query with explicit snapshot isolation
from sqlalchemy import func, select, text
from sqlalchemy.ext.asyncio import AsyncSession
async def snapshot_analytics(session: AsyncSession) -> list:
    # SET TRANSACTION must run before any other statement in the transaction
    await session.execute(
        text("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ")
    )
    stmt = select(
        Order.customer_id,
        Order.amount,
        func.sum(Order.amount)
        .over(
            partition_by=Order.customer_id,
            order_by=Order.created_at,
            rows=(None, 0),
        )
        .label("running_total"),
    )
    result = await session.execute(stmt)
    return result.all()
Advanced Window Function Patterns
Combining Window Functions with GROUP BY
Window functions and GROUP BY can coexist in the same query, but they occupy different logical layers of SQL execution. GROUP BY runs first and collapses rows; window functions then operate on the already-collapsed groups. This is the correct way to compute a percentage-of-total or share-of-segment:
# Async — revenue share per product within each customer's total
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
async def revenue_share(session: AsyncSession) -> list:
    # First GROUP BY to get per-customer, per-product totals
    grouped = (
        select(
            Order.customer_id,
            Order.product_id,
            func.sum(Order.amount).label("product_total"),
        )
        .group_by(Order.customer_id, Order.product_id)
        .cte("product_totals")
    )
    # Then window function over the grouped result
    stmt = select(
        grouped.c.customer_id,
        grouped.c.product_id,
        grouped.c.product_total,
        (
            grouped.c.product_total
            / func.sum(grouped.c.product_total).over(
                partition_by=grouped.c.customer_id
            )
        ).label("revenue_share"),
        func.rank()
        .over(
            partition_by=grouped.c.customer_id,
            order_by=grouped.c.product_total.desc(),
        )
        .label("product_rank"),
    )
    result = await session.execute(stmt)
    return result.mappings().all()
Mapping Window Results to Python Dataclasses
When you need structured access rather than raw tuple rows, map the window result using a dataclass or TypedDict. SQLAlchemy 2.0's .mappings() returns RowMapping objects that behave like read-only dicts, compatible with **unpacking:
# Async — mapping window results to a typed structure
from dataclasses import dataclass
from decimal import Decimal
from datetime import datetime
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
@dataclass
class RankedOrder:
    customer_id: int
    amount: Decimal
    created_at: datetime
    running_total: Decimal
    rank: int
async def get_ranked_orders(session: AsyncSession) -> list[RankedOrder]:
    stmt = select(
        Order.customer_id,
        Order.amount,
        Order.created_at,
        func.sum(Order.amount)
        .over(
            partition_by=Order.customer_id,
            order_by=Order.created_at,
            rows=(None, 0),
        )
        .label("running_total"),
        func.rank()
        .over(
            partition_by=Order.customer_id,
            order_by=Order.amount.desc(),
        )
        .label("rank"),
    )
    result = await session.execute(stmt)
    return [RankedOrder(**dict(row)) for row in result.mappings()]
FILTER Clause on Aggregate Windows
PostgreSQL supports a FILTER (WHERE ...) clause on aggregate window functions to compute conditional aggregations without subqueries. SQLAlchemy exposes this via .filter() chained before .over():
# Async — conditional window aggregate: sum only positive amounts
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
async def conditional_running_total(session: AsyncSession) -> list:
    stmt = select(
        Order.customer_id,
        Order.amount,
        Order.created_at,
        func.sum(Order.amount)
        .filter(Order.amount > 0)
        .over(
            partition_by=Order.customer_id,
            order_by=Order.created_at,
            rows=(None, 0),
        )
        .label("positive_running_total"),
    )
    result = await session.execute(stmt)
    return result.all()
This is particularly useful for financial dashboards where refunds (negative amounts) should not reduce cumulative revenue in certain views.
Chaining Window Results into Bulk Operations
Window functions rarely end at reporting. A common production pattern is to compute rankings, filter the top results, and persist them to a summary table in a single database roundtrip. This combines window functions with the high-performance bulk inserts and updates approach:
# Async — rank orders, keep top 3 per customer, bulk-insert to summary table
from sqlalchemy import func, select, insert, Table, Column, Integer, Numeric, MetaData
from sqlalchemy.ext.asyncio import AsyncSession
metadata = MetaData()
TopOrders = Table(
    "top_orders",
    metadata,
    Column("customer_id", Integer, nullable=False),
    Column("amount", Numeric(12, 2), nullable=False),
    Column("rank", Integer, nullable=False),
)
async def refresh_top_orders(session: AsyncSession) -> None:
    ranked_cte = (
        select(
            Order.customer_id,
            Order.amount,
            func.row_number()
            .over(
                partition_by=Order.customer_id,
                order_by=Order.amount.desc(),
            )
            .label("rn"),
        )
        .cte("ranked")
    )
    top3 = (
        select(
            ranked_cte.c.customer_id,
            ranked_cte.c.amount,
            ranked_cte.c.rn.label("rank"),
        )
        .where(ranked_cte.c.rn <= 3)
    )
    async with session.begin():
        await session.execute(TopOrders.delete())
        await session.execute(insert(TopOrders).from_select(
            ["customer_id", "amount", "rank"],
            top3,
        ))
Hybrid Architectures & Migration Strategies
Migrating from SQLAlchemy 1.4 Window Syntax
In SQLAlchemy 1.x, over() was often called as a standalone function: over(func.row_number(), partition_by=..., order_by=...). In 2.0, the method form func.row_number().over(...) is canonical. Both still compile to identical SQL, but the method form integrates more cleanly with type-checking and modern IDE inference.
The other common 1.x pattern used Query.from_self() to filter window results. That method is removed entirely in 2.0. Replace it with the CTE-based approach shown in this guide.
# Sync — SQLAlchemy 1.4 legacy style (do not use in 2.0)
# session.query(func.row_number().over(...)).from_self().filter(...)
# Sync — SQLAlchemy 2.0 equivalent
from sqlalchemy import func, select
from sqlalchemy.orm import Session
def top_per_customer_sync(session: Session) -> list:
    ranked_cte = (
        select(
            Order.customer_id,
            Order.amount,
            func.row_number()
            .over(
                partition_by=Order.customer_id,
                order_by=Order.amount.desc(),
            )
            .label("rn"),
        )
        .cte("ranked")
    )
    stmt = select(
        ranked_cte.c.customer_id,
        ranked_cte.c.amount,
    ).where(ranked_cte.c.rn == 1)
    return session.execute(stmt).all()
Mixing Core and ORM Window Queries
Some analytical pipelines bypass the ORM entirely for performance. SQLAlchemy Core Table objects support all the same window function expressions. When working with common table expressions (CTEs) and recursive queries on raw Core tables, the expression API is identical — only the column access syntax changes from ORM attributes to Table.c.column_name.
For mixed workloads — where some tables are mapped ORM models and others are raw Core tables — you can freely join across both in the same select() statement. The window function .over() call operates on whatever column expression you provide, ORM-mapped or not.
Materialized Views as Window Pre-computation
For dashboards serving millions of users, materializing expensive window computations as PostgreSQL materialized views and querying them via SQLAlchemy is often more practical than recomputing windows on every request. Refresh the view on a schedule or on-write using REFRESH MATERIALIZED VIEW CONCURRENTLY. Map the view to an ORM class with __table__ = Table(...) and treat it as a read-only model.
The complex joins and relationship loading strategies guide covers how to join materialized view-backed models against live tables without triggering unnecessary relationship loading.
Production Pitfalls & Anti-Patterns
- Default RANGE UNBOUNDED PRECEDING on tied timestamps. When order_by contains duplicate values and you omit rows=, PostgreSQL uses RANGE framing, which extends the frame through every row that ties with the current row. Tied rows then all report the same cumulative value, and the peer comparisons add overhead on dense time series with many rows per second. Fix: always pass rows=(None, 0) for cumulative aggregates.
- Filtering on a window label in WHERE. SELECT ..., func.row_number().over(...).label("rn") ... WHERE rn = 1 raises ProgrammingError: column "rn" does not exist because window functions are evaluated after WHERE. Fix: wrap the window query in a subquery or CTE, then filter on the outer query.
- Forgetting order_by in lag() and lead(). Without an ORDER BY, the offset function has no meaningful sequence and returns an arbitrary row. PostgreSQL will not raise an error; it silently produces non-deterministic results. Fix: always supply order_by when using lag(), lead(), first_value(), or last_value().
- ORM hydration of window results into mapped instances. Calling .scalars() on a multi-column window query returns only the first column of each row and silently drops the computed labels. Fix: use .all() for tuples or .mappings().all() for dict-like rows.
- Async session outliving the streaming cursor. When using yield_per with an async streaming generator, if the calling coroutine is cancelled mid-iteration, the server-side cursor leaks. Fix: wrap the generator in a try/finally that calls await result.close().
- Missing index on PARTITION BY + ORDER BY columns. A window function on (customer_id, created_at) with no composite index forces a sort of the full input. On a 100M-row table, this can exhaust memory or spill to disk. Fix: CREATE INDEX CONCURRENTLY ON orders (customer_id, created_at) before deploying.
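The try/finally fix for the streaming-cursor pitfall can be sketched with plain asyncio. FakeStreamingResult below is a hypothetical stand-in for the object returned by a streaming execute; it only records whether close() was awaited, so the example runs without a database. The real result object's partitions() takes a chunk size, which is omitted here.

```python
import asyncio


class FakeStreamingResult:
    """Hypothetical stand-in for a streaming result; tracks whether close() ran."""

    def __init__(self, batches):
        self._batches = batches
        self.closed = False

    async def partitions(self):
        for batch in self._batches:
            await asyncio.sleep(0)  # yield control, as a real fetch would
            yield batch

    async def close(self):
        self.closed = True


async def stream_batches(result):
    """Yield batches and always release the cursor, even on early exit."""
    try:
        async for batch in result.partitions():
            yield batch
    finally:
        await result.close()


async def consume_first_batch(result):
    """Simulate a consumer that stops after one batch."""
    gen = stream_batches(result)
    try:
        async for batch in gen:
            return batch
    finally:
        # Closing the generator runs its finally block, which closes the result.
        await gen.aclose()


fake = FakeStreamingResult([[1, 2], [3, 4]])
first = asyncio.run(consume_first_batch(fake))
print(first, fake.closed)
```

The consumer side matters as much as the generator side: an async generator that is abandoned without aclose() only runs its finally block when the event loop finalizes it, so explicit closing keeps the cursor's lifetime predictable.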
Frequently Asked Questions
How do I filter on a window function result without a subquery?
You cannot filter on a window expression in the same WHERE clause, because window functions execute after filtering. Wrap your window query in a CTE using .cte(), then write an outer select().where() that references the CTE's computed column. This is the standard SQL approach and compiles to an efficient plan in PostgreSQL.
Does func.rank().over() work identically in sync and async sessions?
Yes. The expression is pure Python object construction — no I/O occurs until you call session.execute() or await session.execute(). The generated SQL is identical. The only difference is that async requires await before session.execute() and uses AsyncSession instead of Session.
What is the difference between rows= and range= in .over()?
rows= specifies a physical row offset: rows=(-2, 0) means exactly the 2 preceding rows and the current row, regardless of column values. range= specifies a logical value offset: range=(-10, 0) means all rows where the ORDER BY column value is within 10 units of the current row's value. For running totals on time-series data, rows= is almost always correct, because range= treats rows that share a timestamp as peers and includes all of them in the frame at once.
Can I use window functions with SQLAlchemy's ORM relationship loading?
Window functions return raw tuple rows, not ORM-mapped instances. They do not trigger relationship loading (selectinload, joinedload, etc.). If you need related data alongside window results, join the related table explicitly in the select() statement rather than relying on lazy-loading attributes.
How do I debug slow window function queries?
Run EXPLAIN (ANALYZE, BUFFERS, FORMAT TEXT) on the compiled SQL. Look for WindowAgg nodes preceded by Sort nodes — a Sort indicates the planner couldn't use an index and is materializing the sort in memory or on disk. Adding a composite index on (partition_by_col, order_by_col) typically converts the Sort node into an Index Scan.
Related
- Advanced Query Patterns and Bulk Data Operations — parent guide covering the full range of SQLAlchemy 2.0 query optimization techniques.
- Writing Window Functions for Running Totals in Python — step-by-step implementation with frame clause details and async error handling.
- Common Table Expressions (CTEs) and Recursive Queries — how to chain window function output into multi-stage CTE pipelines.
- Complex Joins and Relationship Loading Strategies — joining window-computed results against ORM-mapped relationships without N+1 issues.
- High-Performance Bulk Inserts and Updates — persisting ranked window results to summary tables using insert().from_select().