Advanced Query Patterns and Bulk Data Operations in SQLAlchemy 2.0
Modern Python applications demand a database layer that scales predictably under concurrent load, resolves complex relational graphs without N+1 degradation, and ingests high-velocity datasets without exhausting memory or connection pools — all covered in this guide to advanced query patterns and bulk data operations.
SQLAlchemy 2.0 delivers a fundamentally restructured execution model that unifies Core and ORM constructs under a single select()/execute()/scalars() pipeline, enforces explicit async boundaries, and exposes first-class primitives for bulk inserts, analytical SQL, and streaming iteration. This guide targets Python developers and backend architects who need production-grade patterns for every layer of that pipeline — from relationship loader selection to multi-tenant schema routing to keyset pagination of hundred-million-row tables.
Architectural Foundations
The Unified select()/execute()/scalars() API
SQLAlchemy 2.0's most consequential change is the retirement of session.query() in favour of a single construction/execution pipeline that works identically for Core and ORM. Every query begins with select(), insert(), update(), or delete() from sqlalchemy — these produce statement objects that know nothing about the session or connection. Execution is explicit: you pass the statement to await session.execute(stmt) or await conn.execute(stmt), receive a Result (or CursorResult for Core), and then choose how to consume it.
The Result object is lazy by default. Calling .scalars() extracts the first column of each row (usually an ORM instance), .mappings() produces RowMapping dicts, .all() materialises the full list, and .partitions(N) yields fixed-size batches. For async workloads that must not block the event loop during iteration, await session.stream(stmt) returns an AsyncResult with the same API surface but backed by server-side cursors.
# Full imports shown — runnable Python 3.11+
from __future__ import annotations
from typing import Sequence
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
class Base(DeclarativeBase):
pass
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
email: Mapped[str] = mapped_column(unique=True, index=True)
tenant_id: Mapped[int] = mapped_column(index=True)
status: Mapped[str] = mapped_column(default="active")
engine = create_async_engine(
"postgresql+asyncpg://user:pass@localhost/mydb",
pool_size=10,
max_overflow=20,
pool_timeout=30,
)
async def fetch_active_users(tenant: int, limit: int = 100) -> Sequence[User]:
stmt = (
select(User)
.where(User.tenant_id == tenant, User.status == "active")
.order_by(User.id)
.limit(limit)
)
async with AsyncSession(engine) as session:
result = await session.execute(stmt)
return result.scalars().all() # returns list[User]
Result and Row: What You Actually Get Back
Every execute() call returns a CursorResult (sync) or wraps one in an AsyncResult (async). The internal representation is a sequence of Row objects — named tuples keyed by column label. When you call .scalars() on an ORM select(Model), SQLAlchemy unwraps each Row to produce the mapped instance. When you call .mappings(), you get RowMapping objects that behave like read-only dicts with both attribute and key access.
Understanding Row matters for analytical queries, where you select raw expressions alongside ORM columns:
from sqlalchemy import func, select
# Result rows contain (User, int) pairs — not just User instances
stmt = select(User, func.count(Order.id).label("order_count")).join(
Order, Order.user_id == User.id
).group_by(User.id)
async with AsyncSession(engine) as session:
result = await session.execute(stmt)
for user, count in result: # unpack the Row directly
print(f"{user.email}: {count} orders")
Core vs ORM for Bulk Operations
The ORM excels at transactional writes on individual or small groups of objects — it maintains the identity map, fires events, and handles relationship cascades. For ingestion workloads operating on thousands to millions of rows, those features become overhead. Core's conn.execute(insert(Model), list_of_dicts) bypasses ORM instrumentation entirely, batches rows via the driver's executemany protocol, and returns a lightweight CursorResult with rowcount and optional RETURNING data.
The boundary is not arbitrary. Use ORM session.add() / session.add_all() when: object identity matters, you need before_flush / after_flush events, or you're writing fewer than ~500 rows per transaction. Use Core conn.execute(insert(...)) when: throughput is the primary constraint, you want ON CONFLICT DO UPDATE, or you need RETURNING to capture generated primary keys in bulk.
Key Component Deep-Dive: Relationship Loading Strategies and N+1
The N+1 problem emerges whenever code triggers one query for a parent collection and then one additional query per parent to resolve a relationship. In SQLAlchemy 2.0, loader strategies are specified per-statement rather than on the mapping — options(selectinload(...)), options(joinedload(...)), options(subqueryload(...)), and options(raiseload("*")) — giving you fine-grained control that changes per endpoint.
Refer to complex joins and relationship loading strategies for the full treatment; this section establishes the decision rules.
selectinload: The Safe Default for Collections
selectinload issues a second SELECT … WHERE parent_id IN (…) after the primary query resolves. The entire collection population happens in two round trips regardless of how many parents were loaded, making it O(1) in terms of query count. It is safe with yield_per and with AsyncSession because there is no JOIN that could produce duplicate parent rows.
from sqlalchemy import select
from sqlalchemy.orm import selectinload, Mapped, mapped_column, relationship
class Order(Base):
__tablename__ = "orders"
id: Mapped[int] = mapped_column(primary_key=True)
user_id: Mapped[int] = mapped_column()
amount: Mapped[float] = mapped_column()
items: Mapped[list["OrderItem"]] = relationship(back_populates="order")
class OrderItem(Base):
__tablename__ = "order_items"
id: Mapped[int] = mapped_column(primary_key=True)
order_id: Mapped[int] = mapped_column()
sku: Mapped[str] = mapped_column()
order: Mapped[Order] = relationship(back_populates="items")
async def get_orders_with_items(session: AsyncSession) -> list[Order]:
stmt = (
select(Order)
.options(selectinload(Order.items)) # one IN query after primary SELECT
.order_by(Order.id)
.limit(200)
)
result = await session.execute(stmt)
return result.scalars().all()
joinedload: When to Use It and When Not To
joinedload emits a LEFT OUTER JOIN so that parent rows and their related rows arrive in a single round trip. This is ideal for many-to-one or one-to-one relationships (user → tenant, order → invoice), where each parent matches exactly one related row and there is no cardinality explosion. It is dangerous for one-to-many collections when the parent set is large, because the JOIN multiplies rows — 1000 orders each with 50 items produces 50,000 result rows that SQLAlchemy deduplicates in Python.
The exact warning you will see when this goes wrong is:
SAWarning: SELECT statement has a cartesian product
raiseload("*") is the guard: put it on every relationship except the ones you explicitly load in a given query, and any accidental lazy load raises sqlalchemy.exc.InvalidRequestError immediately rather than silently issuing a query.
from sqlalchemy.orm import joinedload, raiseload
async def get_order_with_invoice(session: AsyncSession, order_id: int) -> Order:
stmt = (
select(Order)
.options(
joinedload(Order.invoice), # one-to-one: safe
raiseload("*"), # block all other lazy loads
)
.where(Order.id == order_id)
)
result = await session.execute(stmt)
return result.scalars().one()
The guide on using selectinload vs joinedload for N+1 prevention benchmarks both strategies across payload sizes.
subqueryload and raiseload
subqueryload embeds the relationship query as a correlated subquery. It is rarely preferable to selectinload in 2.0 — the subquery approach can confuse some query planners and the IN strategy is generally more predictable. raiseload has no performance cost; it simply registers a guard. Use it unconditionally in APIs where you cannot enumerate every relationship in advance.
Key Component Deep-Dive: Bulk Inserts, Updates, and RETURNING
High-throughput data ingestion is where Core's superiority over ORM is most pronounced. The high-performance bulk inserts and updates guide covers driver-level benchmarks in detail; here we establish the patterns and their rationale.
Core executemany via conn.execute(insert(...), rows)
When you pass a list of dicts as the second argument to conn.execute(), SQLAlchemy delegates to the driver's native executemany implementation. With asyncpg, this compiles to a server-side prepared statement executed once per batch — dramatically reducing per-row overhead compared to individual INSERT statements.
from __future__ import annotations
import asyncio
from sqlalchemy import insert
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
from myapp.models import Base, Product # Product has id, sku, price, tenant_id
async def bulk_insert_products(engine: AsyncEngine, rows: list[dict]) -> int:
"""Insert rows in chunks of 10 000. Returns total inserted count."""
CHUNK = 10_000
total = 0
async with engine.begin() as conn:
for start in range(0, len(rows), CHUNK):
chunk = rows[start : start + CHUNK]
result = await conn.execute(insert(Product), chunk)
total += result.rowcount
return total
Wrap chunks in a single engine.begin() context to commit all-or-nothing. If idempotency is required, switch to the PostgreSQL-specific dialect:
from sqlalchemy.dialects.postgresql import insert as pg_insert
async def upsert_products(engine: AsyncEngine, rows: list[dict]) -> None:
stmt = pg_insert(Product).values(rows)
upsert = stmt.on_conflict_do_update(
index_elements=["sku", "tenant_id"],
set_={
"price": stmt.excluded.price,
},
)
async with engine.begin() as conn:
await conn.execute(upsert)
ORM bulk_insert_mappings and bulk_update_mappings (2.0 style)
SQLAlchemy 2.0 formalised the 1.x bulk_insert_mappings approach into session.execute(insert(Model), list_of_dicts) via the ORM insert() path — which triggers before_bulk_insert events and respects column defaults while still using executemany. This is the right middle ground when you need ORM events but not per-object identity tracking.
from sqlalchemy import insert, update
from sqlalchemy.ext.asyncio import AsyncSession
async def bulk_create_users(session: AsyncSession, data: list[dict]) -> None:
# ORM-routed executemany: fires mapper events, respects column defaults
await session.execute(insert(User), data)
await session.commit()
async def bulk_deactivate_users(session: AsyncSession, user_ids: list[int]) -> None:
stmt = (
update(User)
.where(User.id.in_(user_ids))
.values(status="inactive")
.execution_options(synchronize_session=False)
)
await session.execute(stmt)
await session.commit()
RETURNING for Generated Keys
PostgreSQL's RETURNING clause lets you capture auto-generated primary keys or computed columns from bulk inserts without a separate SELECT round trip. In SQLAlchemy 2.0, chain .returning(Model.id) onto any DML statement:
from sqlalchemy import insert
from sqlalchemy.ext.asyncio import AsyncEngine
from myapp.models import Invoice
async def insert_invoices_returning_ids(
engine: AsyncEngine, rows: list[dict]
) -> list[int]:
stmt = insert(Invoice).returning(Invoice.id)
async with engine.begin() as conn:
result = await conn.execute(stmt, rows)
return result.scalars().all() # list of generated Invoice.id values
The batch-inserting guide at batch inserting millions of rows with SQLAlchemy Core execute covers memory-safe chunking at that scale.
Key Component Deep-Dive: Analytical SQL — Window Functions, CTEs, and Recursive Queries
Analytical workloads push SQLAlchemy's expression language to its full extent. The key insight is that func.* and .over() produce SQL expression objects — they compose with select() like any other clause, which means they participate in subqueries, CTEs, and RETURNING just as scalar columns do.
Window Functions for Running Totals and Rankings
func.<name>().over(partition_by=..., order_by=...) translates directly to FUNCTION() OVER (PARTITION BY … ORDER BY …). The result column appears in the Row alongside any ORM columns you selected.
from __future__ import annotations
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from myapp.models import Order, User
async def orders_with_running_total(
session: AsyncSession, tenant_id: int
) -> list[tuple]:
"""Return each order with a running revenue total, partitioned by user."""
stmt = select(
Order.id,
Order.user_id,
Order.amount,
func.sum(Order.amount)
.over(
partition_by=Order.user_id,
order_by=Order.id,
rows=(None, 0), # ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
)
.label("running_total"),
func.row_number()
.over(partition_by=Order.user_id, order_by=Order.id)
.label("row_num"),
).where(Order.tenant_id == tenant_id)
result = await session.execute(stmt)
return result.all() # list of Row(id, user_id, amount, running_total, row_num)
The window functions and analytical queries cluster covers LAG, LEAD, NTILE, and FIRST_VALUE with dialect-specific notes. A worked example for time-series data appears in writing window functions for running totals in Python.
Non-Recursive CTEs for Query Decomposition
A CTE (Common Table Expression) names an intermediate result set that subsequent clauses can reference. In SQLAlchemy 2.0, call .cte(name="...") on any Select to produce a CTE object. You then select() from the CTE alias exactly as if it were a table:
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from myapp.models import Order
async def top_customers_by_revenue(session: AsyncSession, n: int = 10) -> list:
# Step 1: revenue per user as a named CTE
revenue_cte = (
select(
Order.user_id,
func.sum(Order.amount).label("total_revenue"),
)
.group_by(Order.user_id)
.cte("revenue_summary")
)
# Step 2: rank and limit using the CTE
stmt = (
select(revenue_cte.c.user_id, revenue_cte.c.total_revenue)
.order_by(revenue_cte.c.total_revenue.desc())
.limit(n)
)
result = await session.execute(stmt)
return result.all()
The common table expressions and recursive queries cluster explains how CTEs interact with DISTINCT ON, lateral joins, and multi-step analytical pipelines.
Recursive CTEs for Hierarchical Data
Recursive CTEs traverse trees and graphs by having the CTE reference its own alias. SQLAlchemy 2.0's cte(recursive=True) combined with .union_all() maps cleanly onto the SQL WITH RECURSIVE clause. The anchor query selects the root nodes; the recursive member joins the CTE alias to find children.
from sqlalchemy import Integer, func, select
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy.ext.asyncio import AsyncSession
from myapp.models import Base
class Category(Base):
__tablename__ = "categories"
id: Mapped[int] = mapped_column(primary_key=True)
parent_id: Mapped[int | None] = mapped_column()
name: Mapped[str] = mapped_column()
slug: Mapped[str] = mapped_column()
async def fetch_category_tree(session: AsyncSession) -> list:
# Anchor: root nodes (no parent)
anchor = select(
Category.id,
Category.parent_id,
Category.name,
func.cast(0, Integer).label("depth"),
).where(Category.parent_id.is_(None))
tree_cte = anchor.cte(name="category_tree", recursive=True)
cte_alias = tree_cte.alias()
# Recursive member: children joined to CTE alias
recursive_part = select(
Category.id,
Category.parent_id,
Category.name,
(cte_alias.c.depth + 1).label("depth"),
).join(cte_alias, Category.parent_id == cte_alias.c.id)
full_tree = tree_cte.union_all(recursive_part)
stmt = select(full_tree).order_by(full_tree.c.depth, full_tree.c.id)
result = await session.execute(stmt)
return result.all()
The full implementation guide — including cycle detection with CYCLE clauses and depth guards for unbounded graphs — lives in implementing recursive CTEs for hierarchical data in SQLAlchemy.
Advanced Patterns and Production Configuration
yield_per and stream_results: Choosing Thresholds
yield_per instructs SQLAlchemy to fetch rows from the cursor in fixed-size batches rather than materialising the entire result set. It is applied as an execution option:
stmt = select(User).order_by(User.id).execution_options(yield_per=5000)
async with session.stream(stmt) as result:
async for partition in result.partitions(5000):
process(partition)
The right batch size depends on row width, available heap, and downstream processing speed. For narrow rows (4-6 columns, scalar values), 5000–10 000 rows per partition is a reasonable starting point. For wide rows with large text or JSON columns, drop to 500–1000. Setting yield_per too high with joinedload is a correctness hazard (see Pitfalls), not just a memory one.
stream_results=True is a lower-level flag that instructs the driver to use a server-side cursor on backends that support it (asyncpg, psycopg). It is set automatically when you call session.stream(), but you can set it explicitly on a Core connection for fine-grained control:
async with engine.connect() as conn:
result = await conn.execution_options(stream_results=True).execute(stmt)
The complete analysis of thresholds and memory profiles is in using yield_per to stream millions of rows in async.
Keyset Pagination for Deep Result Sets
OFFSET-based pagination degrades linearly with depth: OFFSET 500000 LIMIT 100 requires the database to scan and discard half a million rows. Keyset pagination (also called cursor pagination) solves this by filtering on the last-seen key values:
from sqlalchemy import and_, select, tuple_
from sqlalchemy.ext.asyncio import AsyncSession
from myapp.models import Order
async def page_orders(
session: AsyncSession,
last_id: int | None = None,
page_size: int = 100,
) -> list[Order]:
stmt = select(Order).order_by(Order.id).limit(page_size)
if last_id is not None:
stmt = stmt.where(Order.id > last_id)
result = await session.execute(stmt)
return result.scalars().all()
For composite sort keys (e.g., created_at DESC, id ASC), use tuple_() for row-value comparison, which PostgreSQL evaluates with a single index scan. Full patterns including bidirectional pagination are covered in paginating large result sets with keyset pagination.
Multi-Tenant Schema Routing
PostgreSQL's schema-per-tenant isolation model routes each request to a different search_path. SQLAlchemy 2.0 supports this via execution_options(schema_translate_map=...), which rewrites table references at compile time:
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from myapp.models import Product
async def get_tenant_products(session: AsyncSession, tenant_slug: str) -> list[Product]:
# Rewrite __main__ schema to the tenant's actual schema at execute time
stmt = select(Product).execution_options(
schema_translate_map={"__main__": tenant_slug}
)
result = await session.execute(stmt)
return result.scalars().all()
The schema_translate_map is applied per-statement, so you can mix tenant and shared-schema tables in a single query. Pair this with a FastAPI dependency that extracts the tenant slug from the JWT and wraps the session. The full guide is in dynamic schema and multi-tenant routing.
Query Plan Caching
SQLAlchemy 2.0 introduced a compiled query cache that stores the string SQL and bound parameter positions keyed by the statement object's structure. This avoids re-compiling identical queries on every call. The cache is enabled by default with query_cache_size=500 on the engine. Queries that use literal_column() or Python-side string interpolation bypass the cache; always use bound parameters instead.
engine = create_async_engine(
"postgresql+asyncpg://user:pass@localhost/mydb",
pool_size=10,
max_overflow=20,
query_cache_size=1000, # increase for APIs with many distinct query shapes
)
For asyncpg specifically, the driver also maintains a prepared statement cache per connection. When using PgBouncer in transaction-pooling mode, you must set statement_cache_size=0 in the connect_args to disable it — see the async engines and connection pooling guide for the full pgbouncer configuration.
Connection Pool Tuning for Bulk Workloads
Bulk insert jobs and streaming queries hold connections for longer than typical OLTP requests. Size the pool accordingly:
engine = create_async_engine(
"postgresql+asyncpg://user:pass@localhost/mydb",
pool_size=5, # baseline connections kept alive
max_overflow=10, # burst headroom for concurrent bulk jobs
pool_timeout=60, # seconds to wait for a connection (raise TimeoutError)
pool_recycle=3600, # recycle connections older than 1 hour
pool_pre_ping=True, # validate connection before checkout
)
For AWS RDS and other managed services, set pool_size to match the instance's max_connections divided by the number of application workers, leaving headroom for administrative connections.
Common Pitfalls and Anti-Patterns
SAWarning: SELECT statement has a cartesian product— Root cause:joinedloadapplied to a one-to-many relationship where the parent set is large, producing M×N rows in the result before deduplication. Fix: replace withselectinloadfor collections. Usejoinedloadonly for many-to-one or one-to-one sides.- Silent N+1 from lazy loading — Root cause: accessing a relationship attribute outside the session context (or without a loader option) triggers an implicit
SELECTper parent object. Fix: always specify loader strategies on every statement —selectinload,joinedload, orraiseload("*")to catch the remaining cases immediately rather than silently. sqlalchemy.exc.MissingGreenlet: greenlet_spawn has not been called— Root cause: code that accesses a lazy-loaded relationship attribute inside anasync defcontext without the greenlet executor thatAsyncSessionrequires. Fix: always eagerly load relationships used afterawaitpoints, or access them within a sync event (run_sync) if lazy loading cannot be avoided.yield_perwithjoinedloadon a collection — Root cause:yield_perslices the cursor into fixed-size partitions, butjoinedloadon a collection can split a single parent's rows across partition boundaries, leaving the ORM unable to reconstruct the full collection for that object. The query silently returns incomplete data. Fix: useselectinloadwheneveryield_peris active.- Memory blowup from
.all()on large result sets — Root cause: callingresult.scalars().all()materialises the entire result as a Python list before your code touches a single row. Fix: replace withsession.stream(stmt)and iterate via.partitions(N), keeping memory usage bounded to the partition size. - Using
session.add_all(objects)for bulk ingestion — Root cause: each call toadd()registers the object in the identity map, firesinitevents, and will issue individualINSERTstatements unless the session flushes in bulk. Even with bulk flushing, the ORM overhead is significant. Fix: useconn.execute(insert(Model), list_of_dicts)for write-throughput workloads exceeding ~500 rows per transaction. - Forgetting
synchronize_session=Falseon bulk UPDATE/DELETE — Root cause: by default, bulkupdate()anddelete()attempt to synchronise the in-memory session state with the database changes, which requires fetching affected primary keys. For large updates this is expensive. Fix: set.execution_options(synchronize_session=False)and expire or refresh affected objects explicitly if needed.
Frequently Asked Questions
How does the 2.0 select() API differ from the legacy session.query() approach?session.query() was a fluent ORM-only API that compiled queries tightly coupled to the session. select() is a dialect-agnostic SQL expression that compiles independently of execution context — you pass it to any session.execute(), conn.execute(), or await session.stream(). This separation enables the same statement object to be executed on a sync connection, an async connection, or inspected as SQL without touching the database. The query() API is still present in 2.0 for backwards compatibility but raises RemovedIn20Warning under SQLALCHEMY_WARN_20=1.
When should I use Core conn.execute() vs ORM session.execute() for inserts?
Use conn.execute(insert(Model), rows) (Core path) when throughput is paramount and you do not need the identity map, mapper events, or relationship cascades. Use session.execute(insert(Model), rows) (ORM-routed path) when you want mapper-level before_bulk_insert / after_bulk_insert events to fire and column defaults to be applied, but you still need executemany performance. Use session.add() / session.add_all() only for individual objects or very small batches where per-object identity tracking has value.
What is the safe batch size for yield_per streaming?
There is no universal answer — it depends on row width, JVM-side processing time, and available memory per worker. As a starting heuristic: 5000 rows per partition for narrow rows (integer/float/short-varchar columns), 500–1000 for wide rows containing JSON, TEXT, or BYTEA columns. Measure actual RSS growth during a representative streaming run and halve the batch size if it climbs beyond your pod's memory limit.
Can I mix window functions with ORM-mapped models in a single query?
Yes. select(User, func.rank().over(...).label("rank")) returns Row objects that unpack as (User_instance, int). The ORM reconstructs the User object from the mapped columns; the window function result is a plain Python value in the same row. Be aware that adding a window function changes the query semantics — the ORM cannot use the result for identity-map deduplication on the window column.
How do I route different tenants to different PostgreSQL schemas without changing my model definitions?
Use execution_options(schema_translate_map={"__main__": tenant_slug}) per statement or per session. Map __main__ (or any sentinel string you set as __table_args__ = {"schema": "__main__"} on your models) to the runtime tenant schema. The rewrite happens at compile time, before the SQL reaches the driver. See dynamic schema and multi-tenant routing for middleware integration patterns.
Why does my recursive CTE return an infinite loop or raise a database error?
Recursive CTEs without a termination condition will loop until the database's max_recursive_iterations limit is hit (PostgreSQL default: none — it will run until the query times out or exhausts stack). Always add a WHERE depth < N guard in the recursive member, or use PostgreSQL 14+'s CYCLE clause to detect repeated node visits. SQLAlchemy does not add any automatic depth limiting; you own the termination logic.
Related
- Complex Joins and Relationship Loading Strategies — In-depth coverage of selectinload, joinedload, subqueryload, and raiseload with N+1 diagnostics.
- High-Performance Bulk Inserts and Updates — Core executemany, ORM bulk paths, RETURNING, and driver-level benchmarks.
- Window Functions and Analytical Queries — LAG, LEAD, NTILE, and FIRST_VALUE with dialect notes.
- Common Table Expressions (CTEs) and Recursive Queries — Non-recursive and recursive CTEs, cycle detection, lateral joins.
- Streaming Large Result Sets with yield_per — Server-side cursors, partition thresholds, and keyset pagination.
- Dynamic Schema and Multi-Tenant Routing — schema_translate_map, read-replica routing, and per-request schema switching.
- Async Engines, Dialects, and Connection Pooling — Pool sizing, driver selection, and PgBouncer compatibility for the execution layer that all query patterns depend on.