Batch Inserting Millions of Rows with SQLAlchemy Core
To batch insert millions of rows with SQLAlchemy 2.0 Core, pass a list of dicts to await conn.execute(insert(table), chunk) inside engine.begin() — the engine delegates to the driver's native executemany path, which on asyncpg uses binary protocol encoding and a single server-side prepared statement for the entire batch. Full technique coverage lives in the High-Performance Bulk Inserts and Updates guide.
Quick Answer
# Legacy 1.4: session.bulk_insert_mappings (still present in 2.0, but a legacy API)
session.bulk_insert_mappings(Order, list_of_dicts)
# Modern 2.0 — Core async bulk insert with explicit chunking
import asyncio
from itertools import islice
from sqlalchemy import insert, Table, MetaData, Column, Integer, String, Numeric
from sqlalchemy.ext.asyncio import create_async_engine, AsyncEngine
metadata = MetaData()
orders_table = Table(
    "orders", metadata,
    Column("id", Integer, primary_key=True),
    Column("customer_id", Integer, nullable=False),
    Column("amount", Numeric(12, 4), nullable=False),
    Column("status", String(32), nullable=False),
)

async def batch_insert(engine: AsyncEngine, rows: list[dict], chunk_size: int = 2_000) -> None:
    stmt = insert(orders_table)
    it = iter(rows)
    async with engine.begin() as conn:
        while chunk := list(islice(it, chunk_size)):
            await conn.execute(stmt, chunk)
engine.begin() commits automatically when its block exits without an exception and rolls back otherwise, so no conn.commit() call is required. To commit per chunk (for very large datasets where one long-running transaction is impractical), move engine.begin() inside the loop.
Execution Context & Async Workflow Integration
How executemany Maps to the Driver
When conn.execute(stmt, list_of_dicts) receives a sequence, SQLAlchemy compiles the insert statement once and hands the parameter list to the underlying DBAPI's executemany() method. On asyncpg this translates to:
- A single Parse message: the server prepares the statement once.
- N Bind + Execute messages, each carrying one row's parameters in efficient binary format.
- A single Sync message to flush and confirm.
No SQL string is re-parsed per row. The binary encoding avoids text-to-value coercion overhead on both the Python and PostgreSQL sides. The net effect is that asyncpg sustains 60,000–100,000 rows per second on typical cloud hardware for narrow tables, compared to 8,000–15,000 for ORM add_all() with the same row shape.
Generator-Based Streaming for Multi-Million Row Sources
Loading an entire CSV or upstream API response into a Python list before inserting causes MemoryError at scale. Use a generator that yields row dicts on demand:
import csv
from decimal import Decimal
from typing import Iterator

def stream_orders_from_csv(filepath: str) -> Iterator[dict]:
    """Yield one dict per CSV row; never holds more than one row in memory."""
    with open(filepath, encoding="utf-8", newline="") as fh:
        for row in csv.DictReader(fh):
            yield {
                "customer_id": int(row["customer_id"]),
                "amount": Decimal(row["amount"]),  # exact value for the Numeric column
                "status": row.get("status") or "pending",  # default when missing or empty
            }
Pair this with the chunking pattern from the Quick Answer section. islice materializes only chunk_size dicts at a time, keeping Python heap usage flat regardless of source size.
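As a minimal sketch of that pairing, the helper below wraps the islice loop in a reusable generator. The name chunked is illustrative and not part of SQLAlchemy; it accepts any iterable, including the CSV generator above, and yields lists of at most size rows.

```python
from itertools import islice
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def chunked(source: Iterable[T], size: int) -> Iterator[list[T]]:
    """Yield successive lists of at most `size` items from any iterable."""
    if size < 1:
        raise ValueError("size must be at least 1")
    it = iter(source)
    # islice pulls only `size` items per pass, so the source is never fully materialized.
    while chunk := list(islice(it, size)):
        yield chunk


# Works with a generator: only one chunk of rows is held at a time.
rows = ({"customer_id": i, "status": "pending"} for i in range(5))
chunk_lengths = [len(chunk) for chunk in chunked(rows, 2)]
```

Each list yielded by chunked can be passed directly as the parameter list of conn.execute(stmt, chunk).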
Per-Chunk Transaction Boundaries
For data sets larger than ~500,000 rows, a single transaction is impractical because:
- PostgreSQL writes WAL for every inserted row, so a single 10 M row transaction can generate 10–40 GB of WAL with no intermediate commit point.
- A long-running open transaction holds back the cleanup horizon, so autovacuum cannot remove dead tuples until it ends.
- Any failure forces a full rollback of all previously inserted rows.
Commit per chunk and track a resumption checkpoint to support safe restarts:
import asyncio
from itertools import islice
from sqlalchemy.ext.asyncio import AsyncEngine
from sqlalchemy import Table, insert
async def resumable_bulk_insert(
    engine: AsyncEngine,
    table: Table,
    data_stream,
    chunk_size: int = 2_000,
    start_offset: int = 0,
) -> int:
    """Returns total rows inserted; supports restart from start_offset."""
    stmt = insert(table)
    total = 0
    it = islice(data_stream, start_offset, None)  # skip already-inserted rows
    while chunk := list(islice(it, chunk_size)):
        async with engine.begin() as conn:  # one transaction per chunk
            await conn.execute(stmt, chunk)
        total += len(chunk)
    return total
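The function above takes start_offset but leaves the question of where that offset is stored to the caller. One possible approach, sketched here under assumptions, keeps it in a small JSON file. The helpers load_offset, save_offset, and insert_with_checkpoint are hypothetical names, and insert_chunk stands in for any coroutine that inserts and commits one chunk.

```python
import asyncio
import json
from itertools import islice
from pathlib import Path
from typing import Awaitable, Callable, Iterable


def load_offset(path: Path) -> int:
    """Return the last committed row offset, or 0 when no checkpoint exists."""
    if not path.exists():
        return 0
    return int(json.loads(path.read_text())["offset"])


def save_offset(path: Path, offset: int) -> None:
    """Write to a temp file, then rename, so a crash never leaves a half-written file."""
    tmp = path.with_suffix(".tmp")
    tmp.write_text(json.dumps({"offset": offset}))
    tmp.replace(path)


async def insert_with_checkpoint(
    insert_chunk: Callable[[list[dict]], Awaitable[None]],  # hypothetical: commits one chunk
    data_stream: Iterable[dict],
    checkpoint: Path,
    chunk_size: int = 2_000,
) -> int:
    """Insert chunk by chunk, recording progress only after each chunk commits."""
    offset = load_offset(checkpoint)
    it = islice(data_stream, offset, None)  # skip rows committed by an earlier run
    while chunk := list(islice(it, chunk_size)):
        await insert_chunk(chunk)  # raises on failure, leaving the checkpoint untouched
        offset += len(chunk)
        save_offset(checkpoint, offset)
    return offset
```

The file write and the database commit are not atomic together: a crash between them replays one chunk on restart. Storing the offset in a table inside the same transaction as the chunk, or making the insert tolerate duplicates, closes that gap.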
Async Concurrency: Parallel Chunk Workers
For I/O-bound pipelines that read from several independent data sources, run chunk insertions concurrently using asyncio.gather(). Use a semaphore to cap database connections:
import asyncio
from sqlalchemy.ext.asyncio import AsyncEngine
from sqlalchemy import Table, insert
async def parallel_bulk_insert(
    engine: AsyncEngine,
    table: Table,
    chunks: list[list[dict]],
    max_concurrency: int = 5,
) -> None:
    sem = asyncio.Semaphore(max_concurrency)
    stmt = insert(table)

    async def insert_chunk(chunk: list[dict]) -> None:
        async with sem:
            async with engine.begin() as conn:
                await conn.execute(stmt, chunk)

    await asyncio.gather(*[insert_chunk(c) for c in chunks])
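Keep max_concurrency at or below pool_size to prevent connection starvation. In pg_stat_activity, a spike of sessions with wait_event = 'ClientRead' means the server is idle while waiting for the client to send its next command, which points to a client-side bottleneck rather than a slow database.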
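To see the cap in action without a database, the sketch below swaps the real insert for a stand-in coroutine (fake_insert, a hypothetical name) that sleeps briefly, and records how many calls are in flight at once.

```python
import asyncio


async def run_capped(chunks: list[list[dict]], max_concurrency: int) -> int:
    """Run one stand-in insert per chunk and return the peak number in flight."""
    sem = asyncio.Semaphore(max_concurrency)
    in_flight = 0
    peak = 0

    async def fake_insert(chunk: list[dict]) -> None:
        # Stand-in for: async with engine.begin() as conn: await conn.execute(stmt, chunk)
        nonlocal in_flight, peak
        async with sem:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # simulated network round-trip
            in_flight -= 1

    await asyncio.gather(*(fake_insert(c) for c in chunks))
    return peak


chunks = [[{"customer_id": i}] for i in range(20)]
peak = asyncio.run(run_capped(chunks, max_concurrency=5))
```

All 20 tasks are created up front, but the semaphore never lets more than max_concurrency of them hold a connection at the same time.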
Resolving Warnings, Errors & Common Mistakes
| Warning / Error | Root Cause | Production Fix |
|---|---|---|
| InterfaceError: connection already closed | Awaiting on a connection whose underlying asyncpg socket closed due to server timeout or pool recycle. | Set pool_pre_ping=True on create_async_engine() and ensure command_timeout in connect_args is greater than your longest expected chunk insert time. |
| ProgrammingError: can't operate on a closed result set | Iterating a CursorResult after the transaction that produced it has committed or rolled back. | Consume the result set (e.g., result.fetchall()) inside the engine.begin() block, or use .returning() and collect IDs before the context exits. |
| asyncpg.exceptions.TooManyConnectionsError | The server's max_connections limit is reached: the combined pool_size + max_overflow of every engine and process connecting to the database exceeds what PostgreSQL allows. | Reduce max_concurrency, pool_size, and max_overflow so the total across all processes stays below max_connections, or raise max_connections on the server. |
| sqlalchemy.exc.StatementError: could not convert value to type int4 | Python value types don't match column types (e.g., string "123" for an Integer column). | Cast values explicitly before inserting: int(row["customer_id"]). SQLAlchemy does not silently coerce in executemany paths. |
| MemoryError | Materializing millions of row dicts into a single list before passing to execute. | Stream from source using a generator and chunk with islice; never accumulate the full dataset. |
| OperationalError: server closed the connection unexpectedly | Long-running transaction exceeds idle_in_transaction_session_timeout on the server. | Chunk to shorter transactions, or set server_settings={"idle_in_transaction_session_timeout": "0"} in connect_args to disable the timeout for ingestion connections. |
| DataError: value too long for type character varying(N) | Source data contains strings wider than the column definition. | Validate and truncate upstream; use Text, or String without a length (unbounded VARCHAR on PostgreSQL), if variable length is expected. |
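Several of the fixes in the table are engine settings. The sketch below gathers them into one keyword-argument dict; the function name and the choice of max_overflow=0 are assumptions made for illustration, not recommendations from the SQLAlchemy documentation.

```python
def ingestion_engine_kwargs(max_concurrency: int, chunk_timeout_s: float = 60.0) -> dict:
    """Build keyword arguments for create_async_engine() on an ingestion workload."""
    return {
        "pool_size": max_concurrency,  # one pooled connection per concurrent insert task
        "max_overflow": 0,             # assumption: never open more connections than the cap
        "pool_pre_ping": True,         # detect connections closed by the server before use
        "connect_args": {
            # asyncpg option: per-command timeout in seconds; keep it above the slowest chunk
            "command_timeout": chunk_timeout_s,
            # asyncpg option: session settings applied at connect time; "0" disables the timeout
            "server_settings": {"idle_in_transaction_session_timeout": "0"},
        },
    }


kwargs = ingestion_engine_kwargs(max_concurrency=5)
# Hypothetical DSN; needs SQLAlchemy and asyncpg installed:
# engine = create_async_engine("postgresql+asyncpg://user:pass@host/db", **kwargs)
```

Building the settings in one place keeps the concurrency cap and the pool size from drifting apart.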
Advanced Bulk Insert Optimization
insertmanyvalues vs Standard executemany
SQLAlchemy 2.0 introduced insertmanyvalues: when the backend supports RETURNING (PostgreSQL, SQLite ≥ 3.35), and the statement includes .returning(), SQLAlchemy rewrites the insert as batched multi-value INSERT ... VALUES (...), (...) RETURNING ... grouped in pages of up to insertmanyvalues_page_size rows (default 1000).
This is measurably faster than standard executemany for workloads that need to capture generated PKs, because it reduces round-trips from N (one per row) to ceil(N / page_size). For workloads that do not need RETURNING, standard executemany with asyncpg is comparably fast.
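The round-trip arithmetic is easy to check directly. This small sketch (helper names are illustrative) computes ceil(N / page_size) for one million rows and the number of bound parameters a single batched statement carries.

```python
import math


def round_trips(total_rows: int, page_size: int = 1000) -> int:
    """Number of batched INSERT statements needed: ceil(N / page_size)."""
    if total_rows < 0 or page_size < 1:
        raise ValueError("total_rows must be >= 0 and page_size >= 1")
    return math.ceil(total_rows / page_size)


def bound_params_per_statement(page_size: int, columns: int) -> int:
    """Each row contributes one bound parameter per inserted column."""
    return page_size * columns


default_pages = round_trips(1_000_000)         # default page size of 1000
larger_pages = round_trips(1_000_000, 2_000)   # page size of 2,000
params = bound_params_per_statement(2_000, 3)  # customer_id, amount, status
```

Doubling the page size halves the number of statements but doubles the parameters each one carries, so wide tables call for smaller pages.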
from sqlalchemy import insert
from sqlalchemy.ext.asyncio import AsyncEngine
async def insert_with_returning(
    engine: AsyncEngine, rows: list[dict]
) -> list[int]:
    stmt = (
        insert(orders_table)
        .returning(orders_table.c.id)
        .execution_options(insertmanyvalues_page_size=2_000)
    )
    async with engine.begin() as conn:
        result = await conn.execute(stmt, rows)
        # Read the returned IDs while the connection is still open.
        return [row.id for row in result]
asyncpg COPY for Maximum Throughput
When throughput requirements exceed 200,000 rows per second, abandon the SQL insert path entirely and use asyncpg's binary COPY protocol. conn.get_raw_connection() returns SQLAlchemy's pooled connection wrapper; the underlying asyncpg connection is its driver_connection attribute:
import asyncpg
from sqlalchemy.ext.asyncio import AsyncEngine

async def copy_orders(engine: AsyncEngine, records: list[tuple]) -> None:
    """
    records: list of (customer_id: int, amount: Decimal, status: str)
    Column 'id' is omitted; PostgreSQL generates it via SERIAL/GENERATED.
    """
    async with engine.connect() as conn:
        pooled = await conn.get_raw_connection()
        raw: asyncpg.Connection = pooled.driver_connection
        # Calls on the driver connection bypass SQLAlchemy's transaction handling,
        # so the transaction is opened through asyncpg directly.
        async with raw.transaction():
            await raw.copy_records_to_table(
                "orders",
                records=records,
                columns=["customer_id", "amount", "status"],
                timeout=120.0,
            )
COPY bypasses row-by-row statement processing and streams data in PostgreSQL's binary COPY format. It is 3–6× faster than insertmanyvalues for large batches. Constraints are still checked while the COPY runs, and a single violation aborts the whole batch, so validate FK integrity before calling COPY to avoid full-batch rollbacks.
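COPY takes positional tuples rather than dicts, and the binary format expects each value to already have the right Python type. A minimal sketch of the conversion, assuming the three-column layout passed as columns= above (the names COPY_COLUMNS and to_copy_records are illustrative):

```python
from decimal import Decimal
from typing import Iterable, Iterator

COPY_COLUMNS = ("customer_id", "amount", "status")  # must match the columns= argument order


def to_copy_records(rows: Iterable[dict]) -> Iterator[tuple]:
    """Convert row dicts into tuples ordered like COPY_COLUMNS, with explicit types."""
    for row in rows:
        yield (
            int(row["customer_id"]),
            Decimal(str(row["amount"])),
            str(row["status"]),
        )


records = list(to_copy_records([
    {"customer_id": "7", "amount": "19.9900", "status": "pending"},
]))
```

Because the function is a generator, it can be fed from a streaming source and materialized one chunk at a time before each COPY call.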
Advanced Batch Insert Patterns
Backpressure and Rate Limiting in Long-Running Pipelines
In production ingestion pipelines, inserting as fast as the database will accept data without any pacing causes two problems: concurrent workers exhaust the connection pool, and WAL is generated faster than PostgreSQL can flush it to disk. Adding a concurrency semaphore provides implicit backpressure:
import asyncio
from sqlalchemy import insert, Table
from sqlalchemy.ext.asyncio import AsyncEngine
async def paced_bulk_insert(
    engine: AsyncEngine,
    table: Table,
    chunks: list[list[dict]],
    max_concurrent: int = 4,
) -> None:
    sem = asyncio.Semaphore(max_concurrent)
    stmt = insert(table)

    async def insert_one(chunk: list[dict]) -> None:
        async with sem:
            async with engine.begin() as conn:
                await conn.execute(stmt, chunk)

    await asyncio.gather(*[insert_one(c) for c in chunks])
Set max_concurrent to match pool_size on the engine. Running more concurrent insert coroutines than available pool slots causes them to queue inside SQLAlchemy's connection pool, which is fine for throughput but creates long pool-wait latencies that mask the actual database-side bottleneck.
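The semaphore pattern still needs every chunk in memory, because chunks is a list. When the source is a stream, a bounded queue is one alternative: the producer blocks once the queue is full, so only a few chunks exist at any moment. The sketch below is an assumption-laden illustration; bounded_pipeline is a hypothetical name and insert_chunk stands in for a coroutine that inserts and commits one chunk.

```python
import asyncio
from typing import Awaitable, Callable, Iterable

Chunk = list[dict]


async def bounded_pipeline(
    chunks: Iterable[Chunk],
    insert_chunk: Callable[[Chunk], Awaitable[None]],  # hypothetical: inserts one chunk
    workers: int = 4,
) -> None:
    """Feed chunks to a fixed set of workers through a bounded queue."""
    # maxsize bounds how many chunks wait in memory; put() blocks while the queue is full.
    queue: asyncio.Queue = asyncio.Queue(maxsize=workers * 2)
    errors: list[Exception] = []

    async def worker() -> None:
        while (chunk := await queue.get()) is not None:
            try:
                await insert_chunk(chunk)
            except Exception as exc:
                # Record the failure and keep draining so the producer never blocks forever.
                errors.append(exc)

    tasks = [asyncio.create_task(worker()) for _ in range(workers)]
    for chunk in chunks:
        if errors:
            break                # stop reading the source after the first failure
        await queue.put(chunk)   # waits here while all workers are busy
    for _ in tasks:
        await queue.put(None)    # one stop marker per worker
    await asyncio.gather(*tasks)
    if errors:
        raise errors[0]
```

Setting workers to the pool size keeps every connection busy without queueing inside the pool, and the queue bound keeps heap usage flat.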
Validating Row Schema Before Insert
Silent type coercion failures are the hardest bulk insert bugs to diagnose: PostgreSQL may accept a string "123" for an INTEGER column through the text protocol, but asyncpg's binary protocol will reject it with DataError: invalid input for query argument. Pre-validate dict keys and types before passing to execute:
from sqlalchemy import Table
from typing import Any
def validate_rows(table: Table, rows: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Ensure all rows have only known column keys and no None in NOT NULL columns."""
    valid_keys = {col.name for col in table.c}
    # NOT NULL columns the caller must supply; "id" is generated by the database.
    required = [col.name for col in table.c if not col.nullable and col.name != "id"]
    errors = []
    for i, row in enumerate(rows):
        unknown = set(row) - valid_keys
        if unknown:
            errors.append(f"Row {i}: unknown keys {unknown}")
        for name in required:
            if row.get(name) is None:
                errors.append(f"Row {i}: NULL in NOT NULL column '{name}'")
    if errors:
        raise ValueError("Row validation failed:\n" + "\n".join(errors[:10]))
    return rows
Run this validation on a sample of the first chunk during development, then disable it or move it to a separate offline validation step for production throughput.
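Key and NULL checks do not catch the string-for-integer case described at the start of this section. One way to handle it, sketched here with a hypothetical per-column converter map, is to coerce each value to the Python type the driver expects before the row reaches execute.

```python
from decimal import Decimal
from typing import Any, Callable

# Hypothetical mapping for the orders table: column name -> converter to the
# Python type the driver expects for that column.
ORDER_CONVERTERS: dict[str, Callable[[Any], Any]] = {
    "customer_id": int,
    "amount": lambda v: Decimal(str(v)),
    "status": str,
}


def coerce_row(
    row: dict[str, Any], converters: dict[str, Callable[[Any], Any]]
) -> dict[str, Any]:
    """Return a copy of `row` with each known column converted; raise on bad values."""
    out: dict[str, Any] = {}
    for key, value in row.items():
        if key not in converters:
            raise ValueError(f"unknown column {key!r}")
        try:
            # None passes through unchanged; NULL checks are a separate concern.
            out[key] = None if value is None else converters[key](value)
        except (TypeError, ValueError, ArithmeticError) as exc:
            raise ValueError(f"column {key!r}: cannot convert {value!r}") from exc
    return out


clean = coerce_row(
    {"customer_id": "123", "amount": "10.50", "status": "paid"}, ORDER_CONVERTERS
)
```

A failed conversion surfaces as a ValueError naming the column, which is easier to trace than a driver error raised in the middle of a batch.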
Frequently Asked Questions
Does engine.begin() issue one transaction for all chunks or one per chunk?
One per engine.begin() block. If you write the chunking loop inside a single engine.begin(), all chunks share one transaction. If you place engine.begin() inside the loop, each chunk gets its own transaction with its own commit. Use the inner placement for large datasets to limit WAL growth and allow autovacuum to operate on committed data between chunks.
How do I handle duplicate key violations during bulk insert?
Use the PostgreSQL dialect's insert() construct (from sqlalchemy.dialects.postgresql import insert), which adds on_conflict_do_nothing() to silently skip conflicting rows and on_conflict_do_update() to upsert; the generic sqlalchemy.insert() has neither method. Both work with executemany and with insertmanyvalues. Catching IntegrityError at the Python level requires re-inserting the entire chunk row by row, which is impractical at scale and should be avoided.
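A minimal sketch of both statements, assuming the orders table from the Quick Answer and a conflict on the primary key. The statements are compiled against the PostgreSQL dialect so the generated SQL can be inspected without a database connection.

```python
from sqlalchemy import Column, Integer, MetaData, Numeric, String, Table
from sqlalchemy.dialects import postgresql
from sqlalchemy.dialects.postgresql import insert as pg_insert

metadata = MetaData()
orders = Table(
    "orders", metadata,
    Column("id", Integer, primary_key=True),
    Column("customer_id", Integer, nullable=False),
    Column("amount", Numeric(12, 4), nullable=False),
    Column("status", String(32), nullable=False),
)

# Skip rows whose primary key already exists.
skip_stmt = pg_insert(orders).on_conflict_do_nothing(index_elements=["id"])

# Upsert: on a primary-key conflict, overwrite amount and status with the incoming values.
base = pg_insert(orders)
upsert_stmt = base.on_conflict_do_update(
    index_elements=["id"],
    set_={"amount": base.excluded.amount, "status": base.excluded.status},
)

# Compile against the PostgreSQL dialect to inspect the SQL without connecting.
skip_sql = str(skip_stmt.compile(dialect=postgresql.dialect()))
upsert_sql = str(upsert_stmt.compile(dialect=postgresql.dialect()))
```

Either statement can replace the plain insert in the chunk loop, for example await conn.execute(skip_stmt, chunk).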
Can I use this pattern with SQLite in async tests?
Yes, with aiosqlite. Replace the DSN with sqlite+aiosqlite:///test.db. Note that aiosqlite does not support the binary protocol optimizations asyncpg provides, so throughput is lower. For test databases, use check_same_thread=False in connect_args and avoid concurrent writers — SQLite's write lock is file-level and concurrent inserts will produce OperationalError: database is locked.
Is it safe to share a single AsyncEngine across multiple async tasks doing bulk inserts?
Yes — AsyncEngine is designed for concurrent use. Each engine.begin() or engine.connect() call checks out a separate connection from the pool. Set pool_size and max_overflow to accommodate your expected concurrency level. As a rule of thumb, set pool_size equal to the number of concurrent insert coroutines you expect to run simultaneously.
What happens if a chunk insert fails partway through a multi-chunk pipeline?
If each chunk has its own engine.begin() block, only the failing chunk rolls back — all previously committed chunks are durable. Implement a checkpoint (e.g., writing the last committed offset to a status table) so you can resume from the failure point without re-inserting already-committed data.
Related
- High-Performance Bulk Inserts and Updates: parent guide covering upsert, RETURNING, synchronize_session, and asyncpg COPY in full context.
- Benchmarking Core Executemany Bulk Insert Performance: measuring and comparing ORM, Core executemany, and COPY with real throughput numbers.
- Configuring Async Engines and Connection Pools: pool sizing and connect_args tuning that directly affects bulk insert throughput.
- Streaming Large Result Sets with yield_per: the read-side complement to bulk inserts for processing large datasets incrementally.