High-Performance Bulk Inserts and Updates in SQLAlchemy 2.0
The fastest SQL-level bulk insert path in SQLAlchemy 2.0 is session.execute(insert(Model), list_of_dicts) for ORM workflows and conn.execute(insert(table), list_of_dicts) at the Core level. Both hand the whole parameter list to the driver's executemany path (or to SQLAlchemy's insertmanyvalues batching) and avoid unit-of-work overhead. This guide covers every production-relevant technique within the Advanced Query Patterns and Bulk Data Operations topic area: Core insert().values(), ORM bulk execute, insertmanyvalues, RETURNING, ON CONFLICT upsert, bulk UPDATE/DELETE with synchronize_session, and asyncpg COPY for extreme throughput.
Concept & Execution Model
SQLAlchemy 2.0 provides a unified Connection.execute() / AsyncConnection.execute() surface that accepts both single-row and multi-row parameter sequences. When you pass a list of dictionaries, the engine chooses between the first two execution paths below; the third is a driver-level escape hatch that you invoke explicitly:
Standard executemany: the DBAPI receives one statement and N parameter sets. With asyncpg, the statement is prepared once and the parameter sets are sent in binary format as a sequence of Bind/Execute messages followed by a single Sync, so the SQL text is not re-parsed per row. The prepared statement stays in asyncpg's per-connection statement cache, subject to that cache's size and lifetime limits.
insertmanyvalues — SQLAlchemy 2.0's own optimization layer. When the dialect supports RETURNING (PostgreSQL natively, SQLite ≥ 3.35), and the insert statement includes .returning(), the engine rewrites the statement as batched multi-value INSERT ... VALUES (...), (...) RETURNING ..., grouping up to insertmanyvalues_page_size rows (default 1000) per round-trip. This recovers generated primary keys without N individual SELECT queries and significantly outperforms single-row executemany for workloads that need the returned values.
Driver-level COPY: bypasses SQLAlchemy's statement pipeline entirely. asyncpg exposes copy_records_to_table(), which streams rows over the PostgreSQL wire protocol's COPY FROM STDIN path in binary encoding. Throughput is often 3–6× higher than insertmanyvalues for large datasets, at the cost of bypassing SQLAlchemy's Python-side defaults and event hooks. Database-side triggers and constraints still apply, because COPY writes rows through the normal table-insert machinery.
The ORM's session.execute(insert(Model), list_of_dicts) uses path one or two depending on dialect and whether RETURNING is active. It skips the unit-of-work (no instance construction, no per-object change tracking) but still applies Python-side column defaults and translates mapped attribute names to column names. It does not run @validates validators or attribute events, because no mapped instances are created. By contrast, session.add_all(instances) takes the full ORM path: instantiating mapped objects, intercepting __set__ on every instrumented attribute, and flushing via the unit-of-work. That is roughly 10–20× slower for pure insert workloads at scale.
import asyncio
from decimal import Decimal
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy import Integer, String, Numeric, insert, Table, MetaData, Column

class Base(DeclarativeBase):
    pass

class Order(Base):
    __tablename__ = "orders"

    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    customer_id: Mapped[int] = mapped_column(Integer, nullable=False)
    amount: Mapped[Decimal] = mapped_column(Numeric(12, 4), nullable=False)
    status: Mapped[str] = mapped_column(String(32), default="pending")

# Core view of the same table, kept in its own MetaData collection
metadata = MetaData()
orders_table = Table(
    "orders", metadata,
    Column("id", Integer, primary_key=True),
    Column("customer_id", Integer, nullable=False),
    Column("amount", Numeric(12, 4), nullable=False),
    Column("status", String(32), nullable=False),
)

engine = create_async_engine(
    "postgresql+asyncpg://app:secret@db:5432/orders_db",
    pool_size=10,
    max_overflow=20,
    pool_pre_ping=True,
)
SessionLocal = async_sessionmaker(engine, expire_on_commit=False)
Query Construction & Async Execution Patterns
Core insert().values() with Chunking
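Core execution bypasses the ORM entirely: no identity map and no mapper-level processing. Python-side defaults declared on the Table's Column objects (default=...) are still applied, but orders_table above declares none, so every row must supply status explicitly. Core is the right choice when you control data shaping upstream and want to maximize raw throughput: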
# Async: Core bulk insert with explicit chunking
import asyncio
from itertools import islice
from typing import Any, Iterator
from sqlalchemy import insert
from sqlalchemy.ext.asyncio import AsyncEngine

def _chunks(data: Iterator[dict], size: int) -> Iterator[list[dict]]:
    it = iter(data)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            break
        yield chunk

async def bulk_insert_orders(
    engine: AsyncEngine,
    rows: Iterator[dict[str, Any]],
    chunk_size: int = 2_000,
) -> None:
    stmt = insert(orders_table)
    async with engine.begin() as conn:  # auto-commits on clean exit
        for chunk in _chunks(rows, chunk_size):
            await conn.execute(stmt, chunk)
engine.begin() is preferred over engine.connect() because it issues COMMIT automatically on clean exit and ROLLBACK on exception. This eliminates the most common bulk insert bug: rows inserted but silently rolled back because conn.commit() was omitted.
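To make the handling of Column-level defaults in a Core executemany concrete, here is a minimal sketch. It assumes an in-memory SQLite database, the synchronous API, and a hypothetical demo_orders table (none of which appear in the setup above) so that it can run without PostgreSQL. The insert() construct is the same one the async example passes to conn.execute().

```python
from sqlalchemy import (
    Column, Integer, MetaData, String, Table, create_engine, insert, select,
)

demo_metadata = MetaData()

# Hypothetical table for illustration; "status" carries a Python-side default.
demo_orders = Table(
    "demo_orders", demo_metadata,
    Column("id", Integer, primary_key=True),
    Column("customer_id", Integer, nullable=False),
    Column("status", String(32), nullable=False, default="pending"),
)


def insert_with_core_default() -> list[tuple]:
    """Insert rows that omit "status" and read back what was stored."""
    engine = create_engine("sqlite:///:memory:")
    demo_metadata.create_all(engine)
    rows = [{"customer_id": 1}, {"customer_id": 2}]
    with engine.begin() as conn:
        # One statement plus a list of dicts: an executemany call.
        conn.execute(insert(demo_orders), rows)
        result = conn.execute(
            select(demo_orders.c.customer_id, demo_orders.c.status)
            .order_by(demo_orders.c.customer_id)
        )
        return [tuple(row) for row in result]
```

In an executemany call the first dictionary determines the column list, so every dictionary in the batch should carry the same keys.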
ORM Bulk Execute via session.execute(insert(), [dicts])
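When you want mapped attribute names, Python-side column defaults, and ORM-enabled RETURNING, but not full ORM tracking (note that @validates validators and attribute events do not fire on this path, so validate the input dictionaries beforehand):
# Async: ORM bulk insert (skips unit-of-work, applies column defaults)
from typing import Any

from sqlalchemy import insert
from sqlalchemy.ext.asyncio import AsyncSession

async def bulk_insert_orders_orm(
    session: AsyncSession,
    rows: list[dict[str, Any]],
) -> None:
    # Without .returning(), asyncpg receives this as a plain executemany;
    # add .returning(Order.id) to engage insertmanyvalues batching.
    await session.execute(insert(Order), rows)
    await session.commit()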
This is the ORM bulk INSERT path introduced in SQLAlchemy 2.0. When .returning() is added, the PostgreSQL dialects use insertmanyvalues, rewriting the insert as batched INSERT ... VALUES (...), (...) RETURNING id and delivering the generated PKs without a subsequent SELECT; without it, the asyncpg dialect sends a plain executemany. Either way, throughput is significantly better than add_all() while keeping ORM-level column processing.
insertmanyvalues — Tuning Page Size
The default page size of 1000 rows per rewritten VALUES clause is conservative. On wide rows (many columns, large text fields) reduce it; on narrow rows with fast network you can increase it:
from sqlalchemy import insert
from sqlalchemy.ext.asyncio import AsyncSession

async def bulk_insert_tuned(session: AsyncSession, rows: list[dict]) -> None:
    # Page size only applies when insertmanyvalues is engaged (RETURNING on asyncpg).
    stmt = insert(Order).returning(Order.id).execution_options(
        insertmanyvalues_page_size=500
    )
    await session.execute(stmt, rows)
    await session.commit()
Monitor pg_stat_statements to confirm the rewritten statement shape. The query column should show multi-value INSERT ... VALUES ($1,$2,$3),($4,$5,$6)... rather than single-row forms. For a detailed breakdown of benchmarking methodology across all modes, see the benchmarking Core executemany bulk insert performance guide.
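When choosing a page size, it can help to estimate how many rows actually fit in one rewritten statement and how many round trips a load will take. The sketch below is plain arithmetic rather than a SQLAlchemy API: the function names are hypothetical, and the 32,700 bind-parameter cap is an assumption based on SQLAlchemy's documented default for insertmanyvalues_max_parameters, so check the value your dialect reports.

```python
import math


def rows_per_statement(
    page_size: int,
    columns_per_row: int,
    max_parameters: int = 32_700,  # assumed default cap on bind parameters
) -> int:
    """Rows that fit in one VALUES clause given both limits."""
    if page_size < 1 or columns_per_row < 1:
        raise ValueError("page_size and columns_per_row must be positive")
    by_parameters = max_parameters // columns_per_row
    return max(1, min(page_size, by_parameters))


def estimated_round_trips(
    total_rows: int,
    page_size: int,
    columns_per_row: int,
    max_parameters: int = 32_700,
) -> int:
    """Number of batched INSERT statements needed for total_rows."""
    per_statement = rows_per_statement(page_size, columns_per_row, max_parameters)
    return math.ceil(total_rows / per_statement)
```

For a three-column table the page size is the binding limit; for a fifty-column table the parameter cap takes over and each statement carries fewer rows than the configured page size.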
RETURNING for Post-Insert ID Capture
Avoid the separate SELECT round-trip after insert by appending .returning():
from typing import Any

from sqlalchemy import insert
from sqlalchemy.ext.asyncio import AsyncEngine

async def insert_and_capture_ids(
    engine: AsyncEngine,
    rows: list[dict[str, Any]],
) -> list[int]:
    stmt = insert(orders_table).returning(orders_table.c.id)
    async with engine.begin() as conn:
        result = await conn.execute(stmt, rows)
        return [row.id for row in result]
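RETURNING combined with insertmanyvalues is a particularly strong pattern: all generated PKs arrive in the same network round-trips as the insert itself, at the cost of slightly higher per-row result parsing on the Python side. Returned rows are not guaranteed to follow the order of the input list unless you pass sort_by_parameter_order=True to .returning() (available in recent 2.0 releases). For very large inputs, insert in chunks and consume each chunk's IDs before sending the next, so that all IDs are never materialized at once.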
State Management & Session Boundaries
Transaction Scope and WAL Pressure
Each engine.begin() block maps to a single database transaction. Holding one transaction open across millions of rows generates a long, uninterrupted burst of Write-Ahead Log (WAL) traffic, holds locks for the whole load, and keeps the transaction's snapshot alive, which prevents VACUUM from reclaiming dead tuples on tables that receive deletes and updates concurrently. Any failure also rolls back everything. Chunk your work so that each transaction covers 1,000–5,000 rows for narrow tables, or 500–1,000 rows for tables with many indexes:
async def chunked_transactional_insert(
    engine: AsyncEngine,
    rows: Iterator[dict[str, Any]],
    chunk_size: int = 2_000,
) -> None:
    stmt = insert(orders_table)
    for chunk in _chunks(rows, chunk_size):
        async with engine.begin() as conn:  # separate transaction per chunk
            await conn.execute(stmt, chunk)
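Use pg_stat_bgwriter (pg_stat_checkpointer on PostgreSQL 17 and later) to observe checkpoint frequency during load. If checkpoints_req climbs steeply (checkpoints triggered by WAL volume rather than timeout), increase max_wal_size in postgresql.conf or throttle the load. Smaller chunks spread the work out but do not reduce the total WAL volume.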
Identity Map Contamination
session.execute(insert(Order), rows) does not create ORM instances, so nothing new enters the identity map. The risk lies with objects the session already holds: bulk statements change rows behind those objects' backs, and later queries in the same session return the existing in-memory instances with their old attribute values. Call session.expire_all() or close and reopen the session before subsequent reads. Better yet, use a dedicated short-lived AsyncSession scoped to the bulk operation:
from sqlalchemy import select

async def isolated_bulk_insert(rows: list[dict]) -> None:
    async with SessionLocal() as session:
        await session.execute(insert(Order), rows)
        await session.commit()
    # identity map discarded: subsequent reads open a fresh session

async def read_inserted_orders(customer_id: int) -> list[Order]:
    async with SessionLocal() as session:
        result = await session.execute(
            select(Order).where(Order.customer_id == customer_id)
        )
        return list(result.scalars().all())
This pattern avoids one of the subtlest classes of bugs in bulk workflows: reading ORM instances from a long-lived session and getting attribute values that predate the bulk write. With expire_on_commit=False, as SessionLocal is configured above, commit does not expire loaded attributes, so stale values persist until you expire or refresh them explicitly.
Async Event Loop Safety
Bulk inserts in async contexts must never block the event loop. Two common mistakes block it inadvertently:
- Synchronous file or data source reads inside the async loop: wrap these in asyncio.to_thread() or pre-compute the chunk outside the async context.
- CPU-intensive data transformation inside await calls: validation, type coercion, or encoding heavy payloads synchronously inside the coroutine starves other concurrent tasks. Move heavy CPU work to a ThreadPoolExecutor or preprocess before entering the async context manager.
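The two points above can be sketched with the standard library alone. The example assumes the input arrives as CSV text; parse_rows and load_rows are hypothetical helper names, and the column names simply mirror the orders table used in this guide.

```python
import asyncio
import csv
import io
from decimal import Decimal


def parse_rows(csv_text: str) -> list[dict]:
    """Blocking parse and type coercion; intended to run in a worker thread."""
    reader = csv.DictReader(io.StringIO(csv_text))
    return [
        {
            "customer_id": int(record["customer_id"]),
            "amount": Decimal(record["amount"]),
            "status": record["status"],
        }
        for record in reader
    ]


async def load_rows(csv_text: str) -> list[dict]:
    # to_thread() keeps the event loop free while the parse runs.
    return await asyncio.to_thread(parse_rows, csv_text)


async def main() -> list[dict]:
    text = "customer_id,amount,status\n1,10.50,pending\n2,99.00,pending\n"
    return await load_rows(text)
```

The resulting list of dictionaries can then be passed to any of the insert functions in this guide. For transformation that is genuinely CPU-bound, a process pool may serve better than threads, since threads still share the interpreter lock.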
Advanced Bulk Patterns
ON CONFLICT Upsert
PostgreSQL's INSERT ... ON CONFLICT DO UPDATE (upsert) is the canonical way to handle idempotent data ingestion. SQLAlchemy exposes it through postgresql.insert():
from typing import Any

from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.ext.asyncio import AsyncEngine

async def upsert_orders(engine: AsyncEngine, rows: list[dict[str, Any]]) -> None:
    # .values(rows) renders one multi-row statement; keep rows x columns
    # below the driver's bind-parameter limit by chunking upstream.
    stmt = pg_insert(orders_table).values(rows)
    upsert = stmt.on_conflict_do_update(
        index_elements=["id"],
        set_={
            "amount": stmt.excluded.amount,
            "status": stmt.excluded.status,
        },
        # Only overwrite when the incoming amount is larger.
        where=stmt.excluded.amount > orders_table.c.amount,
    )
    async with engine.begin() as conn:
        await conn.execute(upsert)
stmt.excluded references the row that was proposed for insertion and hit the conflict; it maps directly to PostgreSQL's EXCLUDED pseudo-table. The where= clause filters which conflicts actually trigger an update, implementing "insert or upgrade" semantics without a separate SELECT. To skip conflicting rows entirely, use .on_conflict_do_nothing() instead.
For multi-column unique constraints, specify all constraint columns in index_elements:
# Assumes the table has an order_date column and a unique index on both columns.
upsert = stmt.on_conflict_do_update(
    index_elements=["customer_id", "order_date"],  # composite unique index
    set_={"amount": stmt.excluded.amount},
)
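One way to check what an upsert construct will send, without connecting to a database, is to compile it against the PostgreSQL dialect and inspect the SQL string. The sketch below assumes a cut-down orders_demo table defined only for this illustration; the helper names are hypothetical.

```python
from sqlalchemy import Column, Integer, MetaData, Numeric, String, Table
from sqlalchemy.dialects import postgresql
from sqlalchemy.dialects.postgresql import insert as pg_insert

upsert_metadata = MetaData()

# Hypothetical reduced table, enough to render the statement.
orders_demo = Table(
    "orders", upsert_metadata,
    Column("id", Integer, primary_key=True),
    Column("amount", Numeric(12, 4)),
    Column("status", String(32)),
)


def render_upsert_sql() -> str:
    """Compile an ON CONFLICT DO UPDATE statement to a SQL string."""
    stmt = pg_insert(orders_demo)
    upsert = stmt.on_conflict_do_update(
        index_elements=["id"],
        set_={"amount": stmt.excluded.amount},
        where=stmt.excluded.amount > orders_demo.c.amount,
    )
    return str(upsert.compile(dialect=postgresql.dialect()))


def render_do_nothing_sql() -> str:
    """Compile the skip-on-conflict variant."""
    stmt = pg_insert(orders_demo).on_conflict_do_nothing(index_elements=["id"])
    return str(stmt.compile(dialect=postgresql.dialect()))
```

Printing render_upsert_sql() shows the ON CONFLICT clause and the references to the excluded pseudo-table, which is a quick sanity check before running the statement against real data.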
Bulk UPDATE and DELETE with synchronize_session
SQLAlchemy's ORM update() and delete() constructs accept a synchronize_session parameter that controls how the session's identity map stays consistent after a bulk operation modifies rows that may be cached in memory:
from sqlalchemy import update, delete
from sqlalchemy.ext.asyncio import AsyncSession

async def cancel_pending_orders(
    session: AsyncSession, customer_id: int
) -> int:
    stmt = (
        update(Order)
        .where(Order.customer_id == customer_id, Order.status == "pending")
        .values(status="cancelled")
        .execution_options(synchronize_session="fetch")
    )
    result = await session.execute(stmt)
    await session.commit()
    return result.rowcount

async def purge_old_orders(
    session: AsyncSession, cutoff_id: int
) -> int:
    stmt = (
        delete(Order)
        .where(Order.id < cutoff_id)
        .execution_options(synchronize_session=False)  # skip identity map sync
    )
    result = await session.execute(stmt)
    await session.commit()
    return result.rowcount
synchronize_session="fetch" collects the primary keys of the affected rows, using RETURNING where the backend supports it (as PostgreSQL does) or a SELECT before the UPDATE/DELETE otherwise, and then reconciles in-memory objects. synchronize_session=False skips identity map reconciliation entirely and is correct when no instances from the affected rows are currently in the session. Use it for large purge operations where the session is clean. "evaluate" applies the WHERE criteria in Python to in-memory objects and raises an error for criteria it cannot evaluate. The 2.0 default, "auto", tries "evaluate" first and falls back to "fetch".
asyncpg COPY for Extreme Throughput
For pipelines that must sustain hundreds of thousands of rows per second, asyncpg's native copy_records_to_table() bypasses SQLAlchemy's statement pipeline and streams rows via the PostgreSQL binary COPY protocol. SQLAlchemy does not abstract this: you reach the asyncpg connection through conn.get_raw_connection() and its driver_connection attribute:
import asyncpg
from sqlalchemy.ext.asyncio import AsyncEngine

async def copy_orders_to_table(
    engine: AsyncEngine, records: list[tuple]
) -> None:
    """
    Each tuple must match column order: (customer_id, amount, status).
    Omit id; PostgreSQL generates it via SERIAL/GENERATED ALWAYS.
    """
    async with engine.connect() as conn:
        pooled = await conn.get_raw_connection()
        # get_raw_connection() returns SQLAlchemy's pooled wrapper;
        # the actual asyncpg connection is its driver_connection.
        raw: asyncpg.Connection = pooled.driver_connection
        await raw.copy_records_to_table(
            "orders",
            records=records,
            columns=["customer_id", "amount", "status"],
            timeout=60.0,
        )
        await conn.commit()
COPY bypasses all SQLAlchemy event hooks and Python-side defaults, but not the database: triggers fire, and NOT NULL, check, unique, and foreign-key constraints are enforced as rows are written (deferrable constraints may wait until commit). A single violation aborts the entire COPY with no partial-success option, so verify that the data will satisfy every constraint before sending it. For production memory-safe patterns and driver tuning around this path, see batch inserting millions of rows with SQLAlchemy core.execute.
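Because copy_records_to_table() takes positional tuples rather than dictionaries, a small conversion step usually sits in front of it. The helper below is a hypothetical sketch (to_copy_records and COPY_COLUMNS are illustrative names, not asyncpg or SQLAlchemy APIs); it only checks that every column is present, so referential checks would still need to happen elsewhere.

```python
from typing import Any, Iterable, Sequence

# Assumed column order; it must match the columns= argument given to COPY.
COPY_COLUMNS: tuple[str, ...] = ("customer_id", "amount", "status")


def to_copy_records(
    rows: Iterable[dict[str, Any]],
    columns: Sequence[str] = COPY_COLUMNS,
) -> list[tuple]:
    """Turn dict rows into tuples in a fixed column order, failing early."""
    records: list[tuple] = []
    for index, row in enumerate(rows):
        missing = [name for name in columns if name not in row]
        if missing:
            # Reject before COPY starts, since one bad row aborts the batch.
            raise ValueError(f"row {index} is missing columns: {missing}")
        records.append(tuple(row[name] for name in columns))
    return records
```

The returned list can be passed as the records argument, with COPY_COLUMNS supplied as columns so that the two stay in sync.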
Async RETURNING with Chunked Streaming
When inserting and immediately processing returned rows (for example, publishing events keyed on generated IDs), avoid materializing all IDs in Python at once. AsyncSession.execute() returns a fully buffered result that cannot be consumed with async for, and insertmanyvalues collects RETURNING rows batch by batch, so yield_per does not bound memory for an INSERT in practice. Chunking the input and yielding each chunk's IDs keeps the buffer small:
from sqlalchemy import insert
from sqlalchemy.ext.asyncio import AsyncSession
from typing import AsyncIterator

async def insert_and_stream_ids(
    session: AsyncSession, rows: list[dict], chunk_size: int = 500
) -> AsyncIterator[int]:
    stmt = insert(Order).returning(Order.id)
    async with session.begin():
        for start in range(0, len(rows), chunk_size):
            # Each execute buffers at most chunk_size returned IDs.
            result = await session.execute(stmt, rows[start:start + chunk_size])
            for order_id in result.scalars():
                yield order_id
This pattern is most useful when downstream processing (event publishing, cache warming, downstream service notification) can proceed as each chunk completes and you want to avoid buffering millions of IDs in memory.
Hybrid Architectures & Migration Strategies
Separating the Write Path from the Read Path
A reliable production pattern separates bulk writes (Core) from business-logic reads (ORM). The ORM's relationship loading, lazy-load resolution, and identity map are valuable for navigating entity graphs, but irrelevant and expensive during mass ingestion. Structure services so that bulk data operations use the Core path, while the ORM's AsyncSession handles individual entity lifecycles and relationship traversal:
from sqlalchemy import select
from sqlalchemy.orm import selectinload

# Core write path: maximum throughput, no ORM overhead
async def ingest_orders(engine: AsyncEngine, payload: list[dict]) -> None:
    async with engine.begin() as conn:
        await conn.execute(insert(orders_table), payload)

# ORM read path: relationships, business logic, query composition
async def get_customer_orders(session: AsyncSession, customer_id: int) -> list[Order]:
    result = await session.execute(
        select(Order)
        .where(Order.customer_id == customer_id)
        # Assumes Order defines a line_items relationship (not shown above).
        .options(selectinload(Order.line_items))
    )
    return list(result.scalars().all())
For join-heavy read patterns alongside bulk writes, the complex joins and relationship loading strategies guide covers selectinload vs joinedload trade-offs specifically in async contexts, including how to prevent N+1 queries that counteract bulk insert gains.
CTE-Driven Bulk Updates
When bulk updates depend on computed or staged data, Common Table Expressions allow you to push the computation to the database layer rather than performing it in Python and then issuing row-by-row updates. This pattern composes cleanly with the update patterns above:
from sqlalchemy import select, update, func, Table
from sqlalchemy.ext.asyncio import AsyncEngine

async def promote_high_value_orders(
    engine: AsyncEngine,
    staging_table: Table,
    target_table: Table,
) -> None:
    """Promote orders whose staged amount exceeds a threshold."""
    qualified_cte = (
        select(staging_table)
        .where(staging_table.c.amount > 1000)
        .cte("qualified_orders")
    )
    stmt = (
        update(target_table)
        .where(target_table.c.id == qualified_cte.c.order_id)
        .values(status="vip", processed_at=func.now())
    )
    async with engine.begin() as conn:
        await conn.execute(stmt)
For deeper CTE patterns including recursive dependency resolution before bulk updates, see Common Table Expressions (CTEs) and Recursive Queries.
Migrating from SQLAlchemy 1.4 Bulk Methods
SQLAlchemy 1.4 exposed session.bulk_insert_mappings(), session.bulk_update_mappings(), and session.bulk_save_objects(). In 2.0 these remain available as legacy methods, superseded by the statement-based forms below. The migration is mechanical:
| 1.4 pattern | 2.0 equivalent |
|---|---|
| session.bulk_insert_mappings(Order, rows) | session.execute(insert(Order), rows) |
| session.bulk_update_mappings(Order, rows) | session.execute(update(Order), rows) |
| session.bulk_save_objects(instances) | session.add_all(instances) (then flush) |
| conn.execute(table.insert(), rows) | conn.execute(insert(table), rows) (same, but 2.0 style) |
The 2.0 equivalents also support RETURNING and insertmanyvalues, making them strictly superior in both API clarity and achievable throughput.
Production Pitfalls & Anti-Patterns
- session.add_all() at scale: Each Order instance allocates attribute instrumentation state. At 100,000 rows this adds roughly 300–600 MB of Python object overhead and produces 15–25× slower throughput compared to session.execute(insert(Order), rows). Reserve add_all() for small batches where relationship event hooks are needed.
- Single unbounded transaction: Wrapping a million-row insert in one engine.begin() block generates gigabytes of WAL inside a single transaction. The open transaction holds back VACUUM's cleanup horizon until it commits, and any failure rolls back everything. Chunk to 1,000–5,000 rows per transaction.
- Missing RETURNING with a follow-up SELECT WHERE id IN (...): Post-insert hydration via a separate SELECT adds a round-trip and depends on knowing which IDs the insert produced, which is fragile when other sessions insert concurrently. Use .returning() to capture PKs inline.
- Unsorted rows before upsert: Concurrent upserts targeting overlapping rows in arbitrary order produce deadlocks. Sort input rows by primary key (or the unique constraint columns) before chunking.
- echo=True in production: At 500,000 rows, logging every statement and its bind parameters to stdout serializes I/O and can dominate the runtime of the load. Disable echo or use a filtered logging handler that suppresses parameter output.
- Raw asyncpg COPY without pre-validation: COPY enforces constraints as rows are written, and a single FK violation aborts the entire batch. Validate referential integrity before calling copy_records_to_table().
- Ignoring synchronize_session semantics: Using synchronize_session=False while affected objects are loaded leaves the identity map in an incorrect state, causing subsequent reads from the same session to return deleted or stale objects. Choose "fetch" or False deliberately rather than relying on the default.
- Reusing an AsyncSession across bulk and transactional code: A session created without expire_on_commit=False expires all attributes on commit. Any subsequent attribute access on a previously fetched object triggers a lazy load, which raises MissingGreenlet in async SQLAlchemy when it happens outside an awaited call. Use dedicated sessions for bulk operations and separate sessions for transactional reads.
- Not monitoring pg_stat_activity during load: Long-running bulk transactions appear in pg_stat_activity with wait_event_type = 'Lock' when they clash with other writers or DDL. Always check lock wait states during staging load tests to identify contention before reaching production scale.
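The advice on sorting rows before an upsert can be folded into the chunking step. This is a minimal standard-library sketch; sorted_chunks is a hypothetical helper name, and it assumes the whole input fits in memory, since sorting requires seeing every row first.

```python
from itertools import islice
from typing import Any, Iterable, Iterator, Sequence


def sorted_chunks(
    rows: Iterable[dict[str, Any]],
    key_columns: Sequence[str],
    size: int,
) -> Iterator[list[dict[str, Any]]]:
    """Yield chunks of rows ordered by the conflict-target columns."""
    if size < 1:
        raise ValueError("size must be positive")
    # A consistent global order means concurrent writers lock rows
    # in the same sequence, which avoids lock-order inversions.
    ordered = sorted(rows, key=lambda row: tuple(row[c] for c in key_columns))
    iterator = iter(ordered)
    while chunk := list(islice(iterator, size)):
        yield chunk
```

Each yielded chunk can then be handed to an upsert function in its own transaction, with key_columns matching the index_elements of the ON CONFLICT clause.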
Choosing the Right Strategy: Decision Summary
| Scenario | Recommended strategy |
|---|---|
| < 5 000 rows, need ORM events/validators | session.add_all() |
| Any volume, need generated PKs | session.execute(insert(Model).returning(...), rows) with insertmanyvalues |
| > 10 000 rows, no ORM events needed | conn.execute(insert(table), rows) (Core) |
| Need idempotent ingest | insert().on_conflict_do_update() |
| > 200 000 rows/sec, pre-validated data | asyncpg copy_records_to_table() |
| Bulk UPDATE/DELETE with session in scope | update()/delete() with explicit synchronize_session |
Frequently Asked Questions
When should I use ORM session.execute(insert(Model), rows) vs Core conn.execute(insert(table), rows)?
Use the ORM path when you want mapped attribute names, Python-side column defaults from the mapping, or RETURNING mapped back to ORM objects. Use the Core path when you want maximum throughput with pre-shaped data. Neither path runs @validates validators or attribute events, so validate input before either one. The performance gap is small for batches under 50,000 rows; above 100,000 rows, the ORM path's per-row parameter processing accumulates and Core becomes noticeably faster.
Does insertmanyvalues work with async engines?
Yes. insertmanyvalues is dialect-level and transparent — it activates automatically when PostgreSQL (or SQLite ≥ 3.35) is the backend and the statement includes RETURNING. You can tune page size via execution_options(insertmanyvalues_page_size=N) or disable it engine-wide with use_insertmanyvalues=False on create_async_engine().
How do I capture generated primary keys without a follow-up SELECT?
Append .returning(Model.id) to the insert statement. SQLAlchemy's insertmanyvalues handles the batching transparently and returns a result you can iterate for the IDs. No separate SELECT is needed, and the number of round trips is roughly the total row count divided by the page size.
What is the safest synchronize_session mode for bulk DELETE?
"fetch" is the safest explicit choice: it identifies the affected rows (via RETURNING on PostgreSQL) and removes them from the session. synchronize_session=False is safe only when you are certain no instances from the affected rows are currently loaded in the session, or when you call session.expire_all() after the delete. The 2.0 default, "auto", tries "evaluate" and falls back to "fetch" when the WHERE clause cannot be evaluated in Python.
Can I use asyncpg COPY inside an SQLAlchemy transaction?
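Yes, but with care. Access the pooled connection via conn.get_raw_connection(), take the asyncpg connection from its driver_connection attribute, issue the COPY through it, then commit through the SQLAlchemy connection object (await conn.commit()). Do not commit or roll back through the raw asyncpg connection: SQLAlchemy's transaction tracking will desynchronize and you risk leaving the connection in an unknown state. Because the asyncpg dialect starts its driver-level transaction lazily, a COPY issued before any SQLAlchemy statement on that connection may be committed immediately; execute a statement first if the COPY must be part of a larger transaction, and verify the behavior on your version.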
Related
- Advanced Query Patterns and Bulk Data Operations — parent overview covering query optimization, CTEs, window functions, and bulk operations in context.
- Batch Inserting Millions of Rows with SQLAlchemy Core — memory-safe generator patterns, driver tuning, and error handling for multi-million row ingestion.
- Benchmarking Core Executemany Bulk Insert Performance — methodology and real throughput numbers comparing ORM, Core executemany, and COPY strategies.
- Complex Joins and Relationship Loading Strategies — optimizing the read side of systems that use bulk writes for ingestion.
- Common Table Expressions (CTEs) and Recursive Queries — staging and dependency resolution patterns that complement bulk update workflows.