Window Functions and Analytical Queries in SQLAlchemy 2.0

Window functions give you partition-aware aggregations (rank(), lag(), lead(), running sums) without collapsing rows the way GROUP BY does. As part of the Advanced Query Patterns and Bulk Data Operations guide, this section covers SQLAlchemy 2.0's func.<name>().over(partition_by=, order_by=, rows=) API, which compiles directly to standard SQL OVER clauses.

Concept & Execution Model

Traditional GROUP BY collapses a set of rows into one summary row per group. Every column outside the aggregate must appear in the grouping list. Window functions invert this contract: the result set retains every original row, and the computed column carries a value derived from a window of peer rows defined by PARTITION BY, ORDER BY, and an optional frame clause.
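The contrast is easiest to see side by side. The following is a minimal sketch that uses Python's built-in sqlite3 module (not SQLAlchemy) purely to show the SQL semantics; the three-row orders table and the function name are assumptions made for illustration.

```python
import sqlite3


def compare_group_by_and_window():
    """Run the same SUM as a GROUP BY aggregate and as a window function."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE orders (id INTEGER PRIMARY KEY, customer_id INTEGER, amount INTEGER)"
    )
    conn.executemany(
        "INSERT INTO orders (id, customer_id, amount) VALUES (?, ?, ?)",
        [(1, 1, 20), (2, 1, 35), (3, 2, 50)],
    )
    # GROUP BY: one summary row per customer
    grouped = conn.execute(
        "SELECT customer_id, SUM(amount) FROM orders "
        "GROUP BY customer_id ORDER BY customer_id"
    ).fetchall()
    # Window: every original row survives and carries its partition's total
    windowed = conn.execute(
        "SELECT id, customer_id, amount, "
        "SUM(amount) OVER (PARTITION BY customer_id) "
        "FROM orders ORDER BY id"
    ).fetchall()
    conn.close()
    return grouped, windowed


grouped_rows, windowed_rows = compare_group_by_and_window()
```

Three input rows produce two grouped rows but three windowed rows, each annotated with its customer's total.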

SQLAlchemy 2.0 models this through sqlalchemy.func combined with the .over() method. The ORM makes no special distinction between window functions and plain scalar functions — they are first-class expression objects that compose with select(), where(), subqueries, and CTEs identically.

Understanding SQL's logical processing order clarifies when window functions apply. A SQL SELECT is logically processed in this sequence: FROM → WHERE → GROUP BY → HAVING → window functions → DISTINCT → ORDER BY → LIMIT. Because window functions run after WHERE and HAVING, they see the filtered, grouped result. For the same reason, you cannot reference a window alias in the same statement's WHERE clause: the value does not exist yet when WHERE is evaluated. Filtering on a window result requires a CTE or subquery wrapper.
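The ordering rule can be checked directly against a database. This sketch again uses the stdlib sqlite3 module as a stand-in engine (an assumption; PostgreSQL reports a different error message but rejects the same pattern), with a hypothetical orders table. It tries to filter on the window alias in WHERE and then repeats the filter from an outer query.

```python
import sqlite3


def filter_on_window_alias():
    """Return (direct_filter_failed, rows_from_wrapped_query)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE orders (id INTEGER PRIMARY KEY, customer_id INTEGER, amount INTEGER)"
    )
    conn.executemany(
        "INSERT INTO orders (id, customer_id, amount) VALUES (?, ?, ?)",
        [(1, 1, 20), (2, 1, 35), (3, 2, 50)],
    )
    window_sql = (
        "SELECT id, customer_id, "
        "ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY amount DESC) AS rn "
        "FROM orders"
    )
    try:
        # WHERE runs before the window step, so the alias cannot be filtered here
        conn.execute(window_sql + " WHERE rn = 1").fetchall()
        direct_filter_failed = False
    except sqlite3.OperationalError:
        direct_filter_failed = True
    # Wrapping the window query lets the outer WHERE see the computed column
    wrapped = conn.execute(
        f"SELECT id, customer_id FROM ({window_sql}) WHERE rn = 1 ORDER BY customer_id"
    ).fetchall()
    conn.close()
    return direct_filter_failed, wrapped


direct_filter_failed, top_rows = filter_on_window_alias()
```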

Window functions are the right tool when you need: the original rows preserved alongside a derived statistic (e.g., each sale with the customer's rank); running or cumulative metrics (totals, counts, averages over time); offset comparisons between adjacent rows (month-over-month delta); or statistical distribution buckets (quartiles, percentiles). They are not the right tool when you only need summary statistics per group with no row-level output — in that case, GROUP BY with plain aggregates is simpler and potentially faster because it produces fewer output rows.

# Async — SQLAlchemy 2.0 with asyncpg
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase, mapped_column, Mapped
from datetime import datetime
from decimal import Decimal


class Base(DeclarativeBase):
    pass


class Order(Base):
    __tablename__ = "orders"

    id: Mapped[int] = mapped_column(primary_key=True)
    customer_id: Mapped[int] = mapped_column(nullable=False)
    product_id: Mapped[int] = mapped_column(nullable=False)
    amount: Mapped[Decimal] = mapped_column(nullable=False)
    created_at: Mapped[datetime] = mapped_column(nullable=False)


engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/db",
    pool_size=20,
    max_overflow=10,
    pool_pre_ping=True,
)
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)


async def fetch_ranked_orders(session: AsyncSession) -> list:
    stmt = select(
        Order.id,
        Order.customer_id,
        Order.amount,
        func.row_number()
        .over(
            partition_by=Order.customer_id,
            order_by=Order.created_at.desc(),
        )
        .label("recency_rank"),
    )
    result = await session.execute(stmt)
    return result.all()

The key architectural point: in SQLAlchemy 2.0, select() combined with session.execute() (awaited on an AsyncSession) is the recommended execution path for both ORM and Core queries. The session.query() API from 1.x still exists on the synchronous Session as a legacy interface, but AsyncSession does not provide it, so any window function tutorial that calls session.query(func.row_number().over(...)) cannot be used as-is in async code.

[Figure: Window partition and frame clause anatomy. Rows are grouped into two partitions by customer_id, with ORDER BY created_at applied within each partition. With the frame ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, Partition A (id=1; rows of $20, $35, $10) yields running sums of $20, $55, $65, and Partition B (id=2; rows of $50, $15, $80) yields $50, $65, $145. Rank and sum restart independently at each partition boundary.]

Query Construction & Async Execution Patterns

Ranking Functions: row_number, rank, dense_rank

func.row_number() assigns a unique sequential integer to every row within its partition with no gaps or ties. func.rank() and func.dense_rank() allow ties but differ in how they handle subsequent ranks: rank() skips numbers after a tie (1, 2, 2, 4), while dense_rank() does not (1, 2, 2, 3).

# Async — ranking orders per customer by amount descending
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession


async def top_orders_per_customer(session: AsyncSession) -> list:
    stmt = select(
        Order.id,
        Order.customer_id,
        Order.amount,
        func.row_number()
        .over(
            partition_by=Order.customer_id,
            order_by=Order.amount.desc(),
        )
        .label("row_num"),
        func.rank()
        .over(
            partition_by=Order.customer_id,
            order_by=Order.amount.desc(),
        )
        .label("rank"),
        func.dense_rank()
        .over(
            partition_by=Order.customer_id,
            order_by=Order.amount.desc(),
        )
        .label("dense_rank"),
    )
    result = await session.execute(stmt)
    return result.mappings().all()
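To make the tie behaviour described above concrete, here is a small sketch using the stdlib sqlite3 module rather than SQLAlchemy. The four sample amounts are invented for illustration, and the id column is added as a tie-breaker so that row_number() is deterministic.

```python
import sqlite3


def ranking_with_ties():
    """Return (amount, row_number, rank, dense_rank) for amounts 50, 40, 40, 10."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, amount INTEGER)")
    conn.executemany(
        "INSERT INTO orders (id, amount) VALUES (?, ?)",
        [(1, 50), (2, 40), (3, 40), (4, 10)],
    )
    rows = conn.execute(
        "SELECT amount, "
        "ROW_NUMBER() OVER (ORDER BY amount DESC, id), "  # unique, ties broken by id
        "RANK() OVER (ORDER BY amount DESC), "            # leaves a gap after the tie
        "DENSE_RANK() OVER (ORDER BY amount DESC) "       # no gap after the tie
        "FROM orders ORDER BY amount DESC, id"
    ).fetchall()
    conn.close()
    return rows


ranked = ranking_with_ties()
```

The two orders of 40 share rank 2 under both rank() and dense_rank(); the following order is ranked 4 by rank() and 3 by dense_rank().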

To retrieve only the top-N rows per partition, wrap the window expression in a CTE (or subquery) and filter on the rank column in the outer query. Filtering directly on a window expression in the same statement's WHERE clause does not work: window functions are computed after WHERE and before ORDER BY, so PostgreSQL rejects the statement, and the failure surfaces as a database error at execution time rather than as a CompileError. Filtering must happen at a higher level.

from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession


async def latest_order_per_customer(session: AsyncSession) -> list:
    # Step 1: CTE wraps the window expression
    ranked_cte = (
        select(
            Order.id,
            Order.customer_id,
            Order.amount,
            Order.created_at,
            func.row_number()
            .over(
                partition_by=Order.customer_id,
                order_by=Order.created_at.desc(),
            )
            .label("rn"),
        )
        .cte("ranked_orders")
    )

    # Step 2: filter on the CTE
    stmt = select(
        ranked_cte.c.id,
        ranked_cte.c.customer_id,
        ranked_cte.c.amount,
        ranked_cte.c.created_at,
    ).where(ranked_cte.c.rn == 1)

    result = await session.execute(stmt)
    return result.all()

Offset Functions: lag and lead

func.lag() and func.lead() access the value from a preceding or following row within the same partition. They accept up to three arguments: the column expression, the offset (default 1), and a default value returned when no row exists at that offset.

# Async: revenue delta between consecutive orders per product
# (aggregate to monthly totals first for a true month-over-month figure)
from decimal import Decimal
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession


async def revenue_delta(session: AsyncSession) -> list:
    stmt = select(
        Order.product_id,
        Order.created_at,
        Order.amount,
        func.lag(Order.amount, 1, Decimal("0"))
        .over(
            partition_by=Order.product_id,
            order_by=Order.created_at,
        )
        .label("prev_amount"),
        (
            Order.amount
            - func.lag(Order.amount, 1, Decimal("0")).over(
                partition_by=Order.product_id,
                order_by=Order.created_at,
            )
        ).label("delta"),
    )
    result = await session.execute(stmt)
    return result.mappings().all()

The default value on lag() and lead() is essential for the first (or last) row in each partition where no preceding (or following) row exists. Without it, those positions return NULL, which propagates silently through arithmetic expressions.
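The NULL propagation is easy to miss, so here is a minimal sketch of it using the stdlib sqlite3 module in place of a real PostgreSQL session. The sales table and its three values are assumptions for illustration only.

```python
import sqlite3


def lag_with_and_without_default():
    """Return (amount, prev, prev_with_default, delta, delta_with_default) per row."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE sales (day INTEGER PRIMARY KEY, amount INTEGER)")
    conn.executemany(
        "INSERT INTO sales (day, amount) VALUES (?, ?)",
        [(1, 10), (2, 25), (3, 15)],
    )
    rows = conn.execute(
        "SELECT amount, "
        "LAG(amount) OVER (ORDER BY day), "                # NULL on the first row
        "LAG(amount, 1, 0) OVER (ORDER BY day), "          # 0 on the first row
        "amount - LAG(amount) OVER (ORDER BY day), "       # NULL propagates
        "amount - LAG(amount, 1, 0) OVER (ORDER BY day) "  # first delta = amount
        "FROM sales ORDER BY day"
    ).fetchall()
    conn.close()
    return rows


lag_rows = lag_with_and_without_default()
```

Note the trade-off: a default of 0 avoids NULL but makes the first row's delta equal to its full amount. If "no previous value" should stay distinguishable, keeping the NULL and handling it explicitly may be the better choice.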

Aggregate Window Functions: sum, avg, count

Any standard aggregate function (sum, avg, count, min, max) becomes a window function when you append .over(). Unlike ranking functions, aggregate windows do not require order_by — but you almost always want it for running totals and moving averages.

# Async — running total and 3-row moving average, partitioned by customer
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession


async def running_metrics(session: AsyncSession) -> list:
    stmt = select(
        Order.customer_id,
        Order.created_at,
        Order.amount,
        func.sum(Order.amount)
        .over(
            partition_by=Order.customer_id,
            order_by=Order.created_at,
            rows=(None, 0),  # ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
        )
        .label("running_total"),
        func.avg(Order.amount)
        .over(
            partition_by=Order.customer_id,
            order_by=Order.created_at,
            rows=(-2, 0),  # ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
        )
        .label("moving_avg_3"),
        func.count(Order.id)
        .over(partition_by=Order.customer_id)
        .label("total_orders"),
    )
    result = await session.execute(stmt)
    return result.all()

The rows parameter in .over() maps to ROWS BETWEEN ... AND .... A None in the first position means UNBOUNDED PRECEDING, a None in the second position means UNBOUNDED FOLLOWING, and 0 means CURRENT ROW. Negative integers count preceding rows; positive integers count following rows. Getting this tuple right is the key to writing window functions for running totals in Python with correct frame semantics.
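As a reading aid, the tuple convention can be written out as a tiny pure-Python function. describe_rows_frame is a hypothetical helper invented for this sketch; it is not part of SQLAlchemy and only mirrors the mapping described above.

```python
from typing import Optional, Tuple


def describe_rows_frame(rows: Tuple[Optional[int], Optional[int]]) -> str:
    """Translate a (start, end) tuple into the ROWS frame text it stands for.

    Hypothetical illustration only; SQLAlchemy performs its own rendering.
    """
    start, end = rows

    def bound(value: Optional[int], unbounded: str) -> str:
        if value is None:
            return unbounded
        if value == 0:
            return "CURRENT ROW"
        if value < 0:
            return f"{-value} PRECEDING"
        return f"{value} FOLLOWING"

    return (
        f"ROWS BETWEEN {bound(start, 'UNBOUNDED PRECEDING')} "
        f"AND {bound(end, 'UNBOUNDED FOLLOWING')}"
    )


running_total_frame = describe_rows_frame((None, 0))
moving_avg_frame = describe_rows_frame((-2, 0))
centered_frame = describe_rows_frame((-1, 1))
```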

Distribution Functions: ntile, percent_rank, cume_dist

Beyond ranking and aggregation, SQLAlchemy exposes distribution functions that segment partitions into statistical buckets. func.ntile(n) divides the ordered partition into n roughly equal groups — identical to quartile or decile scoring in data science pipelines. func.percent_rank() returns a value between 0 and 1 representing the relative rank of a row within its partition. func.cume_dist() returns the proportion of rows with values less than or equal to the current row's ORDER BY value.

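These functions always operate over the full partition defined by PARTITION BY and ORDER BY. PostgreSQL ignores a frame clause on them, so passing rows= or range= has no effect there, while some other databases reject a frame on ranking and distribution functions outright. Either way, leave the frame off.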

# Async — order scoring: quartile, percentile rank, cumulative distribution
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession


async def order_distribution_metrics(session: AsyncSession) -> list:
    stmt = select(
        Order.customer_id,
        Order.amount,
        func.ntile(4)
        .over(
            partition_by=Order.customer_id,
            order_by=Order.amount,
        )
        .label("quartile"),           # 1=bottom 25%, 4=top 25%
        func.percent_rank()
        .over(
            partition_by=Order.customer_id,
            order_by=Order.amount,
        )
        .label("pct_rank"),           # 0.0 to 1.0
        func.cume_dist()
        .over(
            partition_by=Order.customer_id,
            order_by=Order.amount,
        )
        .label("cumulative_dist"),    # fraction of rows <= current row
    )
    result = await session.execute(stmt)
    return result.mappings().all()

A common production use of ntile(10) is to segment customers into spending deciles for targeted marketing. Combine it with a CASE expression in the outer select to map bucket numbers to human-readable labels without a Python-side loop. percent_rank() is useful for SLA reporting — "what fraction of orders processed faster than this one?" — and compiles to a single database pass rather than a correlated subquery.
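The bucket-to-label mapping might look like the sketch below, which uses the stdlib sqlite3 module and quartiles instead of deciles to keep the data small. The spend_totals table, its eight rows, and the label names are all assumptions for illustration.

```python
import sqlite3


def label_spending_quartiles():
    """Bucket eight customers into quartiles and label them inside the database."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE spend_totals (customer_id INTEGER PRIMARY KEY, spend INTEGER)")
    conn.executemany(
        "INSERT INTO spend_totals (customer_id, spend) VALUES (?, ?)",
        [(i, i * 10) for i in range(1, 9)],  # customers 1..8 spending 10..80
    )
    rows = conn.execute(
        "SELECT customer_id, bucket, "
        "CASE bucket WHEN 4 THEN 'top' WHEN 1 THEN 'bottom' ELSE 'middle' END "
        "FROM ("
        "  SELECT customer_id, NTILE(4) OVER (ORDER BY spend) AS bucket"
        "  FROM spend_totals"
        ") ORDER BY customer_id"
    ).fetchall()
    conn.close()
    return rows


labelled = label_spending_quartiles()
```

The CASE expression sits in the outer query because the bucket number, like any window result, only exists after the inner SELECT has been evaluated.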

State Management & Session Boundaries

Window functions are purely read-side operations. They do not modify session state, trigger lazy-load side effects, or interact with the identity map. The critical concern is result hydration: how rows are materialized from the database into Python.

For small result sets, .all() materializes everything immediately into a list of Row objects. For large analytical scans that might return millions of rows, use yield_per to stream in chunks and keep the async event loop responsive.

# Async — streaming large window result sets with yield_per
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from typing import AsyncIterator


async def stream_ranked_orders(
    session: AsyncSession,
    chunk_size: int = 1000,
) -> AsyncIterator[list]:
    stmt = select(
        Order.customer_id,
        Order.amount,
        Order.created_at,
        func.rank()
        .over(
            partition_by=Order.customer_id,
            order_by=Order.amount.desc(),
        )
        .label("rank"),
    ).execution_options(yield_per=chunk_size)

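    # session.stream() returns an AsyncResult that supports "async for";
    # the buffered Result from session.execute() does not.
    result = await session.stream(stmt)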
    async for batch in result.partitions(chunk_size):
        yield batch

One subtlety with yield_per and window functions: the server-side cursor must remain open for the duration of streaming, so keep the session alive inside the streaming loop. Avoid issuing a second query on the same session while a streaming cursor is still being consumed; an AsyncSession is not designed for concurrent use. Close or fully consume the first result before running the next statement.

Transaction isolation matters here too. Under PostgreSQL's default READ COMMITTED level, each statement takes its own snapshot, so if a long-lived transaction runs several analytical queries while other sessions commit DML, successive queries can observe different data. For snapshot consistency across a multi-statement analytical workload, use REPEATABLE READ or SERIALIZABLE isolation:

# Async: analytical query with explicit snapshot isolation
from sqlalchemy import func, select, text
from sqlalchemy.ext.asyncio import AsyncSession


async def snapshot_analytics(session: AsyncSession) -> list:
    # SET TRANSACTION must be the first statement issued in the transaction
    await session.execute(
        text("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ")
    )
    stmt = select(
        Order.customer_id,
        Order.amount,
        func.sum(Order.amount)
        .over(
            partition_by=Order.customer_id,
            order_by=Order.created_at,
            rows=(None, 0),
        )
        .label("running_total"),
    )
    result = await session.execute(stmt)
    return result.all()

Advanced Window Function Patterns

Combining Window Functions with GROUP BY

Window functions and GROUP BY can coexist in the same query, but they occupy different logical layers of SQL execution. GROUP BY runs first and collapses rows; window functions then operate on the already-collapsed groups. This is the correct way to compute a percentage-of-total or share-of-segment:

# Async — revenue share per product within each customer's total
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession


async def revenue_share(session: AsyncSession) -> list:
    # First GROUP BY to get per-customer, per-product totals
    grouped = (
        select(
            Order.customer_id,
            Order.product_id,
            func.sum(Order.amount).label("product_total"),
        )
        .group_by(Order.customer_id, Order.product_id)
        .cte("product_totals")
    )

    # Then window function over the grouped result
    stmt = select(
        grouped.c.customer_id,
        grouped.c.product_id,
        grouped.c.product_total,
        (
            grouped.c.product_total
            / func.sum(grouped.c.product_total).over(
                partition_by=grouped.c.customer_id
            )
        ).label("revenue_share"),
        func.rank()
        .over(
            partition_by=grouped.c.customer_id,
            order_by=grouped.c.product_total.desc(),
        )
        .label("product_rank"),
    )

    result = await session.execute(stmt)
    return result.mappings().all()
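The same share-of-total can also be written as a single statement, because the window step runs after GROUP BY and can therefore aggregate over the grouped sums. The sketch below shows that form in plain SQL through the stdlib sqlite3 module; the sample rows are invented, and the nested SUM(SUM(...)) OVER form is an alternative to the CTE shown above, not a replacement the source prescribes.

```python
import sqlite3


def revenue_share_single_query():
    """Return (customer_id, product_id, product_total, share_of_customer_total)."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE orders (customer_id INTEGER, product_id INTEGER, amount INTEGER)")
    conn.executemany(
        "INSERT INTO orders (customer_id, product_id, amount) VALUES (?, ?, ?)",
        [(1, 1, 30), (1, 1, 20), (1, 2, 50), (2, 1, 40)],
    )
    rows = conn.execute(
        "SELECT customer_id, product_id, SUM(amount), "
        # inner SUM is the GROUP BY aggregate; outer SUM ... OVER windows across groups
        "SUM(amount) * 1.0 / SUM(SUM(amount)) OVER (PARTITION BY customer_id) "
        "FROM orders GROUP BY customer_id, product_id "
        "ORDER BY customer_id, product_id"
    ).fetchall()
    conn.close()
    return rows


shares = revenue_share_single_query()
```

The CTE version is usually easier to read and to extend; the single-statement form mainly demonstrates the evaluation order.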

Mapping Window Results to Python Dataclasses

When you need structured access rather than raw tuple rows, map the window result using a dataclass or TypedDict. SQLAlchemy 2.0's .mappings() returns RowMapping objects that behave like read-only dicts, compatible with **unpacking:

# Async — mapping window results to a typed structure
from dataclasses import dataclass
from decimal import Decimal
from datetime import datetime
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession


@dataclass
class RankedOrder:
    customer_id: int
    amount: Decimal
    created_at: datetime
    running_total: Decimal
    rank: int


async def get_ranked_orders(session: AsyncSession) -> list[RankedOrder]:
    stmt = select(
        Order.customer_id,
        Order.amount,
        Order.created_at,
        func.sum(Order.amount)
        .over(
            partition_by=Order.customer_id,
            order_by=Order.created_at,
            rows=(None, 0),
        )
        .label("running_total"),
        func.rank()
        .over(
            partition_by=Order.customer_id,
            order_by=Order.amount.desc(),
        )
        .label("rank"),
    )
    result = await session.execute(stmt)
    return [RankedOrder(**dict(row)) for row in result.mappings()]

FILTER Clause on Aggregate Windows

PostgreSQL supports a FILTER (WHERE ...) clause on aggregate window functions to compute conditional aggregations without subqueries. SQLAlchemy exposes this via .filter() chained before .over():

# Async — conditional window aggregate: sum only positive amounts
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession


async def conditional_running_total(session: AsyncSession) -> list:
    stmt = select(
        Order.customer_id,
        Order.amount,
        Order.created_at,
        func.sum(Order.amount)
        .filter(Order.amount > 0)
        .over(
            partition_by=Order.customer_id,
            order_by=Order.created_at,
            rows=(None, 0),
        )
        .label("positive_running_total"),
    )
    result = await session.execute(stmt)
    return result.all()

This is particularly useful for financial dashboards where refunds (negative amounts) should not reduce cumulative revenue in certain views.
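A quick way to see the effect is to compute the plain and the filtered running totals next to each other. This sketch uses the stdlib sqlite3 module, which also supports FILTER on aggregate window functions; the three sample amounts, including one refund, are assumptions for illustration.

```python
import sqlite3


def running_totals_with_filter():
    """Return (amount, plain_running_total, positive_only_running_total) per row."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE orders (day INTEGER PRIMARY KEY, amount INTEGER)")
    conn.executemany(
        "INSERT INTO orders (day, amount) VALUES (?, ?)",
        [(1, 100), (2, -30), (3, 50)],  # day 2 is a refund
    )
    frame = "ORDER BY day ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"
    rows = conn.execute(
        "SELECT amount, "
        f"SUM(amount) OVER ({frame}), "
        f"SUM(amount) FILTER (WHERE amount > 0) OVER ({frame}) "
        "FROM orders ORDER BY day"
    ).fetchall()
    conn.close()
    return rows


filtered_totals = running_totals_with_filter()
```

The refund lowers the plain running total to 70 but leaves the filtered total at 100.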

Chaining Window Results into Bulk Operations

Window functions rarely end at reporting. A common production pattern is to compute rankings, filter the top results, and persist them to a summary table entirely inside the database, without pulling rows into Python. This combines window functions with the high-performance bulk inserts and updates approach:

# Async — rank orders, keep top 3 per customer, bulk-insert to summary table
from sqlalchemy import func, select, insert, Table, Column, Integer, Numeric, MetaData
from sqlalchemy.ext.asyncio import AsyncSession

metadata = MetaData()
TopOrders = Table(
    "top_orders",
    metadata,
    Column("customer_id", Integer, nullable=False),
    Column("amount", Numeric(12, 2), nullable=False),
    Column("rank", Integer, nullable=False),
)


async def refresh_top_orders(session: AsyncSession) -> None:
    ranked_cte = (
        select(
            Order.customer_id,
            Order.amount,
            func.row_number()
            .over(
                partition_by=Order.customer_id,
                order_by=Order.amount.desc(),
            )
            .label("rn"),
        )
        .cte("ranked")
    )

    top3 = (
        select(
            ranked_cte.c.customer_id,
            ranked_cte.c.amount,
            ranked_cte.c.rn.label("rank"),
        )
        .where(ranked_cte.c.rn <= 3)
    )

    async with session.begin():
        await session.execute(TopOrders.delete())
        await session.execute(insert(TopOrders).from_select(
            ["customer_id", "amount", "rank"],
            top3,
        ))

Hybrid Architectures & Migration Strategies

Migrating from SQLAlchemy 1.4 Window Syntax

In SQLAlchemy 1.x, over() was often called as a standalone function: over(func.row_number(), partition_by=..., order_by=...). In 2.0, the method form func.row_number().over(...) is canonical. Both still compile to identical SQL, but the method form integrates more cleanly with type-checking and modern IDE inference.

The other common 1.x pattern used Query.from_self() to filter window results. That method is removed entirely in 2.0. Replace it with the CTE-based approach shown in this guide.

# Sync — SQLAlchemy 1.4 legacy style (do not use in 2.0)
# session.query(func.row_number().over(...)).from_self().filter(...)

# Sync — SQLAlchemy 2.0 equivalent
from sqlalchemy import func, select
from sqlalchemy.orm import Session

def top_per_customer_sync(session: Session) -> list:
    ranked_cte = (
        select(
            Order.customer_id,
            Order.amount,
            func.row_number()
            .over(
                partition_by=Order.customer_id,
                order_by=Order.amount.desc(),
            )
            .label("rn"),
        )
        .cte("ranked")
    )
    stmt = select(
        ranked_cte.c.customer_id,
        ranked_cte.c.amount,
    ).where(ranked_cte.c.rn == 1)
    return session.execute(stmt).all()

Mixing Core and ORM Window Queries

Some analytical pipelines bypass the ORM entirely for performance. SQLAlchemy Core Table objects support all the same window function expressions. When working with common table expressions (CTEs) and recursive queries on raw Core tables, the expression API is identical — only the column access syntax changes from ORM attributes to Table.c.column_name.

For mixed workloads — where some tables are mapped ORM models and others are raw Core tables — you can freely join across both in the same select() statement. The window function .over() call operates on whatever column expression you provide, ORM-mapped or not.

Materialized Views as Window Pre-computation

For dashboards serving millions of users, materializing expensive window computations as PostgreSQL materialized views and querying them via SQLAlchemy is often more practical than recomputing windows on every request. Refresh the view on a schedule or on-write using REFRESH MATERIALIZED VIEW CONCURRENTLY. Map the view to an ORM class with __table__ = Table(...) and treat it as a read-only model.

The complex joins and relationship loading strategies guide covers how to join materialized view-backed models against live tables without triggering unnecessary relationship loading.

Production Pitfalls & Anti-Patterns

  • Default RANGE UNBOUNDED PRECEDING on tied timestamps. When you pass order_by but omit rows=, the default frame is RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, which includes every peer row that shares the current row's ORDER BY value. On time-series with duplicate timestamps, all tied rows therefore report the same running total, and the extra peer handling can add cost on dense data. Fix: always pass rows=(None, 0) for cumulative aggregates, ideally with a unique tie-breaker column in order_by.
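  • Filtering on a window label in WHERE. In raw SQL, SELECT ..., row_number() OVER (...) AS rn ... WHERE rn = 1 fails with ProgrammingError: column "rn" does not exist because window functions are evaluated after WHERE. Referencing the labeled SQLAlchemy expression in .where() renders the window function inline instead, which PostgreSQL also rejects because window functions are not allowed in WHERE. Fix: wrap the window query in a subquery or CTE, then filter on the outer query.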
  • Forgetting order_by in lag() and lead(). Without an ORDER BY, the offset function has no meaningful sequence and returns an arbitrary row. PostgreSQL will not raise an error — it silently produces non-deterministic results. Fix: always supply order_by when using lag(), lead(), first_value(), or last_value().
  • ORM hydration of window results into mapped instances. Calling .scalars() on a multi-column window query returns only the first column of each row, so the computed window labels are silently dropped. Fix: use .all() for tuples or .mappings().all() for dict-like rows.
  • Async session outliving the streaming cursor. When using yield_per with an async streaming generator, if the calling coroutine is cancelled mid-iteration, the server-side cursor leaks. Fix: wrap the generator in a try/finally that calls await result.close().
  • Missing index on PARTITION BY + ORDER BY columns. A window function on (customer_id, created_at) with no composite index forces the database to sort the entire input before the window step. On a 100M-row table, that sort can exceed work_mem, spill to disk, and dominate query time. Fix: CREATE INDEX CONCURRENTLY ON orders (customer_id, created_at) before deploying.
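The try/finally fix for the streaming-cursor pitfall can be sketched without a database. FakeStreamingResult below is a hypothetical stand-in that only imitates the two methods the pattern relies on (an async partitions() iterator and an awaitable close()); with a real AsyncResult the generator body would be the same.

```python
import asyncio
from typing import AsyncIterator, List


class FakeStreamingResult:
    """Hypothetical stand-in for a streaming result; not a SQLAlchemy class."""

    def __init__(self, rows: List[int]) -> None:
        self._rows = rows
        self.closed = False

    async def partitions(self, size: int) -> AsyncIterator[List[int]]:
        for start in range(0, len(self._rows), size):
            yield self._rows[start:start + size]

    async def close(self) -> None:
        self.closed = True


async def stream_batches(result, chunk_size: int) -> AsyncIterator[List[int]]:
    try:
        async for batch in result.partitions(chunk_size):
            yield batch
    finally:
        # Runs on normal exhaustion, on aclose(), and on cancellation
        await result.close()


async def consume_first_batch():
    result = FakeStreamingResult(list(range(10)))
    gen = stream_batches(result, 4)
    first = await gen.__anext__()  # take one batch, then abandon the stream
    await gen.aclose()             # triggers the finally block
    return first, result.closed
```

Calling asyncio.run(consume_first_batch()) abandons the stream after one batch and still reports the result as closed, which is the behaviour the fix is meant to guarantee.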

Frequently Asked Questions

How do I filter on a window function result without a subquery? You cannot filter on a window expression in the same WHERE clause, because window functions execute after filtering. Wrap your window query in a CTE using .cte(), then write an outer select().where() that references the CTE's computed column. This is the standard SQL approach and compiles to an efficient plan in PostgreSQL.

Does func.rank().over() work identically in sync and async sessions? Yes. The expression is pure Python object construction — no I/O occurs until you call session.execute() or await session.execute(). The generated SQL is identical. The only difference is that async requires await before session.execute() and uses AsyncSession instead of Session.

What is the difference between rows= and range= in .over()? rows= specifies a physical row offset: rows=(-2, 0) means the current row plus up to 2 preceding rows, regardless of column values. range= specifies a logical value offset: range=(-10, 0) means all rows whose ORDER BY value lies within 10 units below the current row's value, up to and including the current row's peers. For running totals on time-series data, rows= is almost always correct, because range= treats rows that share a timestamp as peers and gives all of them the same total.
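The difference shows up as soon as two rows share an ORDER BY value. The sketch below uses the stdlib sqlite3 module with an invented events table in which two rows have the same ts; the first running sum relies on the default RANGE frame, while the second uses an explicit ROWS frame with id as a tie-breaker.

```python
import sqlite3


def range_vs_rows_on_ties():
    """Return (id, default_range_sum, explicit_rows_sum) per row."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE events (id INTEGER PRIMARY KEY, ts INTEGER, amount INTEGER)")
    conn.executemany(
        "INSERT INTO events (id, ts, amount) VALUES (?, ?, ?)",
        [(1, 1, 10), (2, 1, 20), (3, 2, 5)],  # ids 1 and 2 share ts=1
    )
    rows = conn.execute(
        "SELECT id, "
        # default frame: RANGE UNBOUNDED PRECEDING, so tied rows are peers
        "SUM(amount) OVER (ORDER BY ts), "
        # explicit ROWS frame: strictly row-by-row accumulation
        "SUM(amount) OVER (ORDER BY ts, id "
        "ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) "
        "FROM events ORDER BY id"
    ).fetchall()
    conn.close()
    return rows


tie_rows = range_vs_rows_on_ties()
```

With the default frame both ts=1 rows report 30; with the ROWS frame they report 10 and then 30.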

Can I use window functions with SQLAlchemy's ORM relationship loading? Window functions return raw tuple rows, not ORM-mapped instances. They do not trigger relationship loading (selectinload, joinedload, etc.). If you need related data alongside window results, join the related table explicitly in the select() statement rather than relying on lazy-loading attributes.

How do I debug slow window function queries? Run EXPLAIN (ANALYZE, BUFFERS, FORMAT TEXT) on the compiled SQL. Look for WindowAgg nodes preceded by Sort nodes — a Sort indicates the planner couldn't use an index and is materializing the sort in memory or on disk. Adding a composite index on (partition_by_col, order_by_col) typically converts the Sort node into an Index Scan.