Setting Up an Async Engine from Scratch
To set up a SQLAlchemy async engine from scratch: install sqlalchemy and asyncpg, build a postgresql+asyncpg:// URL, call create_async_engine(), wrap the engine with async_sessionmaker, run your first query, and call await engine.dispose() on shutdown. The parent guide, Configuring Async Engines and Connection Pools, covers the full configuration surface.
Quick Answer
The script in Step 2 is a complete, runnable example that covers every step from URL construction through graceful shutdown. Copy it, substitute your credentials, and run it with python engine_setup.py.
Step 1 — Install dependencies
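pip install "sqlalchemy[asyncio]" asyncpg
asyncpg is an asyncio-native PostgreSQL driver written in Python and Cython. The [asyncio] extra makes sure greenlet, which SQLAlchemy's async layer depends on, is installed alongside it. SQLAlchemy wraps asyncpg through the postgresql+asyncpg dialect string, so no other adapter is needed.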
Step 2 — Complete runnable script
"""engine_setup.py — SQLAlchemy 2.0 async engine, session, query, and shutdown."""
import asyncio
import os
from urllib.parse import quote_plus
from sqlalchemy import Integer, String, select
from sqlalchemy.ext.asyncio import (
    AsyncSession,
    async_sessionmaker,
    create_async_engine,
)
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
# ---------------------------------------------------------------------------
# 1. Build the connection URL from environment variables.
# quote_plus() handles special characters in passwords (@, #, %, etc.).
# ---------------------------------------------------------------------------
DB_HOST = os.environ.get("DB_HOST", "localhost")
DB_PORT = os.environ.get("DB_PORT", "5432")
DB_NAME = os.environ.get("DB_NAME", "storefront")
DB_USER = os.environ.get("DB_USER", "appuser")
DB_PASS = quote_plus(os.environ.get("DB_PASS", "s3cr3t!"))
DATABASE_URL = (
    f"postgresql+asyncpg://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
)
# ---------------------------------------------------------------------------
# 2. Create the async engine.
#    pool_size     - number of persistent connections kept open.
#    max_overflow  - extra connections allowed above pool_size during spikes.
#    pool_timeout  - seconds to wait for a free connection before raising
#                    sqlalchemy.exc.TimeoutError.
#    pool_recycle  - replace connections older than this many seconds on
#                    their next checkout.
#    pool_pre_ping - pings each connection at checkout so stale connections
#                    are replaced automatically.
#    echo          - set True during development to log every SQL statement.
# ---------------------------------------------------------------------------
engine = create_async_engine(
    DATABASE_URL,
    pool_size=10,
    max_overflow=5,
    pool_timeout=30,
    pool_recycle=1800,
    pool_pre_ping=True,
    echo=False,
)
# ---------------------------------------------------------------------------
# 3. Create the session factory.
# expire_on_commit=False keeps ORM attributes readable after commit
# without needing a second round-trip to the database.
# class_=AsyncSession is the default but shown here for clarity.
# ---------------------------------------------------------------------------
AsyncSessionLocal = async_sessionmaker(
    bind=engine,
    class_=AsyncSession,
    expire_on_commit=False,
)
# ---------------------------------------------------------------------------
# 4. Define a minimal ORM model.
# ---------------------------------------------------------------------------
class Base(DeclarativeBase):
    pass


class Order(Base):
    __tablename__ = "orders"

    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    customer_name: Mapped[str] = mapped_column(String(120))
    status: Mapped[str] = mapped_column(String(32), default="pending")
# ---------------------------------------------------------------------------
# 5. A realistic query — fetch an order by primary key.
# ---------------------------------------------------------------------------
async def get_order(order_id: int) -> Order | None:
    async with AsyncSessionLocal() as session:
        result = await session.execute(
            select(Order).where(Order.id == order_id)
        )
        return result.scalars().first()
# ---------------------------------------------------------------------------
# 6. Graceful shutdown. dispose() closes the connections currently checked
#    in to the pool. Connections still checked out are not closed by this
#    call, so finish all sessions first. Await it from your lifespan handler
#    or a finally block so PostgreSQL does not see abrupt disconnects.
# ---------------------------------------------------------------------------
async def shutdown() -> None:
    await engine.dispose()
# ---------------------------------------------------------------------------
# 7. Entry-point for local testing.
# ---------------------------------------------------------------------------
async def main() -> None:
    try:
        # Create tables (dev/test only; use Alembic migrations in production).
        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)
        order = await get_order(order_id=1)
        if order:
            print(f"Order {order.id}: {order.customer_name} - {order.status}")
        else:
            print("Order not found.")
    finally:
        # Dispose the engine even if table creation or the query raises.
        await shutdown()


if __name__ == "__main__":
    asyncio.run(main())
Every import is explicit, the URL is built from environment variables, passwords are percent-encoded, and the engine is disposed before the process exits.
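To see why the password needs percent-encoding, here is a minimal standard-library sketch. The password is hypothetical, and urllib.parse.urlsplit is used only as a stand-in for SQLAlchemy's own URL parser, which differs in its details; the point is just that an unescaped @ or # changes where the host part appears to begin.

```python
from urllib.parse import quote_plus, unquote_plus, urlsplit

# Hypothetical password containing characters that are meaningful in a URL.
RAW_PASSWORD = "p@ss#w%rd"


def build_url(user: str, password: str, host: str, port: int, database: str) -> str:
    """Assemble a postgresql+asyncpg URL with user and password percent-encoded."""
    return (
        f"postgresql+asyncpg://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}:{port}/{database}"
    )


encoded_url = build_url("appuser", RAW_PASSWORD, "localhost", 5432, "storefront")
naive_url = f"postgresql+asyncpg://appuser:{RAW_PASSWORD}@localhost:5432/storefront"

encoded = urlsplit(encoded_url)
naive = urlsplit(naive_url)

# The encoded URL splits cleanly and the password round-trips.
print(encoded.hostname, encoded.port, unquote_plus(encoded.password or ""))
# The naive URL is cut at the wrong "@" and "#", so the host is no longer localhost.
print(naive.hostname)
```

If you would rather not hand-assemble the string at all, SQLAlchemy also provides sqlalchemy.engine.URL.create(), which takes the components as separate arguments.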
Execution Context & Async Workflow Integration
Why asyncpg + SQLAlchemy async works without blocking the event loop
Traditional SQLAlchemy sync drivers (psycopg2, cx_Oracle) are blocking: each call occupies the OS thread until the database responds. To use them in async frameworks, you would need run_in_executor to move the blocking call off the event loop thread, or a greenlet-based shim.
asyncpg is different. It implements the PostgreSQL wire protocol directly on top of asyncio's transport and protocol layer, with the performance-critical parts written in Cython. When your coroutine awaits a query, control returns to the event loop, which can service other coroutines (handling HTTP requests, processing messages, etc.) while the network round-trip completes. No worker threads are involved. SQLAlchemy does use the greenlet library internally to bridge its synchronous-style core to the awaitable driver, which is why a stray lazy load surfaces as MissingGreenlet, but that bridge stays out of sight as long as every database call is awaited.
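The effect of yielding to the event loop can be sketched with the standard library alone. In this illustration fake_query is a hypothetical stand-in for an awaited database call, with asyncio.sleep modelling the network round-trip; no real driver is involved.

```python
import asyncio
import time


async def fake_query(name: str, round_trip: float) -> str:
    """Stand-in for an awaited database call; sleep models network latency."""
    await asyncio.sleep(round_trip)  # control returns to the event loop here
    return name


async def run_concurrently() -> tuple[list[str], float]:
    """Run three simulated queries at once and report the elapsed wall time."""
    started = time.perf_counter()
    results = await asyncio.gather(
        fake_query("orders", 0.2),
        fake_query("customers", 0.2),
        fake_query("invoices", 0.2),
    )
    return list(results), time.perf_counter() - started


if __name__ == "__main__":
    names, elapsed = asyncio.run(run_concurrently())
    print(names, f"{elapsed:.2f}s")
```

Because each simulated query yields while it waits, the three waits overlap and the total time stays close to one round-trip rather than the sum of all three.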
AsyncAdaptedQueuePool
create_async_engine uses AsyncAdaptedQueuePool by default rather than the synchronous QueuePool. It reuses the same queue-based pool management logic but swaps the thread-oriented queue for one built on asyncio.Queue. The result is that waiting for a free connection yields control to the event loop instead of blocking a thread.
When pool_pre_ping=True is set, the pool calls SELECT 1 on each connection before handing it to your code. If the connection is stale (the database restarted, a firewall dropped the TCP session, etc.), asyncpg raises an error, the pool discards that connection, opens a fresh one, and retries — transparently to your application code.
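The checkout, timeout, and pre-ping behaviour described above can be pictured with a toy pool. This is not SQLAlchemy's implementation: ToyPool and FakeConnection are hypothetical names invented for the sketch, and the "ping" is simulated rather than sent to a database.

```python
import asyncio


class FakeConnection:
    """Stand-in for a driver connection; `alive` simulates a dropped TCP session."""

    def __init__(self, ident: int) -> None:
        self.ident = ident
        self.alive = True

    async def ping(self) -> bool:
        await asyncio.sleep(0)  # pretend to do a network round-trip
        return self.alive


class ToyPool:
    """Fixed-size pool backed by asyncio.Queue (illustration only)."""

    def __init__(self, size: int, timeout: float) -> None:
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=size)
        self._timeout = timeout
        self._next_id = 0
        for _ in range(size):
            self._queue.put_nowait(self._new_connection())

    def _new_connection(self) -> FakeConnection:
        self._next_id += 1
        return FakeConnection(self._next_id)

    async def checkout(self, pre_ping: bool = True) -> FakeConnection:
        # Waiting here suspends only this coroutine, not the whole thread.
        conn = await asyncio.wait_for(self._queue.get(), timeout=self._timeout)
        if pre_ping and not await conn.ping():
            conn = self._new_connection()  # discard the stale one, open a fresh one
        return conn

    def checkin(self, conn: FakeConnection) -> None:
        self._queue.put_nowait(conn)


async def demo() -> tuple[bool, int, int]:
    pool = ToyPool(size=1, timeout=0.05)
    first = await pool.checkout()
    try:
        await pool.checkout()  # pool is empty, so this waits and then times out
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    first.alive = False  # simulate a firewall dropping the session
    pool.checkin(first)
    replacement = await pool.checkout()  # ping fails, so a new connection is returned
    return timed_out, first.ident, replacement.ident


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The second checkout mirrors what pool_timeout guards against, and the final checkout mirrors the pre-ping replacement of a stale connection.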
How the session fits in
async_sessionmaker creates a factory that produces AsyncSession objects. Each session borrows a connection from the pool on first use and returns it when the async with block exits. Setting expire_on_commit=False is especially important in async code: with the default expire_on_commit=True, accessing an ORM attribute after commit() would trigger a lazy load, which is not allowed in async sessions and raises MissingGreenlet. Disabling expiry means attributes retain their in-memory values after commit so you can safely return domain objects from service functions without an extra await session.refresh(obj).
For further tuning options, including pool_size guidance for high-traffic services, see Setting Up asyncpg Connection Pool Size for High Concurrency.
The broader async engine and dialect landscape — including when to choose asyncpg versus psycopg3's async mode — is covered in the Async Engines, Dialects, and Connection Pooling section.
Resolving Warnings, Errors & Common Mistakes
| Exact error string | Root cause | Production fix |
|---|---|---|
| ModuleNotFoundError: No module named 'asyncpg' | asyncpg is not installed in the active virtual environment. This is the most common first-run error. | Run pip install asyncpg inside the correct venv. Verify with pip show asyncpg. Pin the version in requirements.txt or pyproject.toml. |
| sqlalchemy.exc.MissingGreenlet: greenlet_spawn has not been called | ORM lazy-loading was triggered inside an async context. Typically caused by accessing a relationship attribute after commit without expire_on_commit=False, or by calling a sync SQLAlchemy function that internally tries to emit SQL. | Set expire_on_commit=False on async_sessionmaker. Use selectinload() or joinedload() for relationships instead of relying on lazy loads. Never call sync SQLAlchemy APIs (Session.execute, session.get) inside async code. |
| sqlalchemy.exc.ArgumentError: Could not parse SQLAlchemy URL from string (older releases: Could not parse rfc1738 URL from string) | The URL is malformed, most often because a special character in the password is not percent-encoded. A well-formed postgresql:// URL without +asyncpg fails differently: SQLAlchemy loads a sync driver and create_async_engine raises InvalidRequestError because that driver is not async. | Use urllib.parse.quote_plus() to encode the password component. Use the exact dialect string postgresql+asyncpg. Validate by logging the URL with the password masked before passing it to create_async_engine. |
| asyncio.TimeoutError or sqlalchemy.exc.TimeoutError: QueuePool limit … overflow … timed out | All connections in the pool (including overflow slots) are checked out, and no connection became available within pool_timeout seconds. Common under sustained high concurrency or when sessions are not closed promptly. | Ensure every AsyncSession is used inside async with. Increase pool_size and max_overflow proportionally to the number of concurrent requests. Add connection lifecycle monitoring (see pool event hooks). |
| sqlalchemy.exc.OperationalError: (asyncpg.exceptions.ConnectionDoesNotExistError) connection was closed or connection refused | The database is unreachable: wrong host/port, PostgreSQL not running, or a firewall rule blocking the connection. | Verify DB_HOST and DB_PORT environment variables. Confirm PostgreSQL is running (pg_isready -h $DB_HOST -p $DB_PORT). Enable pool_pre_ping=True so the pool recovers from transient outages without propagating stale-connection errors to application code. |
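When raising pool_size and max_overflow in response to pool timeouts, it helps to check the worst-case connection count against the server's limit first. The arithmetic below is a rough sketch: PoolBudget is a hypothetical helper, and the worker count, max_connections value, and reserved slots are illustrative numbers rather than recommendations.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PoolBudget:
    """Worst-case connection usage for identical worker processes."""

    pool_size: int
    max_overflow: int
    worker_processes: int

    @property
    def per_process(self) -> int:
        """Most connections one engine can hold open: pool_size + max_overflow."""
        return self.pool_size + self.max_overflow

    @property
    def total(self) -> int:
        """Each process has its own engine and pool, so the limits multiply."""
        return self.per_process * self.worker_processes

    def fits(self, max_connections: int, reserved: int = 0) -> bool:
        """True if the worst case stays within the server limit minus reserved slots."""
        return self.total <= max_connections - reserved


# Illustrative numbers: the engine settings from the script, four workers,
# and a server allowing 100 connections with 10 held back for admin tools.
budget = PoolBudget(pool_size=10, max_overflow=5, worker_processes=4)
print(budget.per_process, budget.total, budget.fits(max_connections=100, reserved=10))
```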
Advanced Engine Initialization Optimization
Pre-warming the connection pool on startup
By default, SQLAlchemy opens connections lazily — the first requests after startup compete for fresh connection slots, which can add noticeable latency to your cold-start response times. In production services where latency SLOs are tight (for example, a B2B tenant management API serving Invoice and Tenant queries), pre-warming the pool during application startup eliminates that first-request spike.
"""pool_warmup.py — pre-warm the asyncpg connection pool at application startup."""
import asyncio

from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncConnection, create_async_engine

engine = create_async_engine(
    "postgresql+asyncpg://appuser:s3cr3t@localhost:5432/storefront",
    pool_size=10,
    max_overflow=5,
    pool_pre_ping=True,
)


async def _ping(conn: AsyncConnection) -> None:
    """Validate a single connection with a lightweight query."""
    await conn.execute(text("SELECT 1"))


async def prewarm_pool(pool_size: int = 10) -> None:
    """
    Check out `pool_size` connections and hold them all at once so the pool
    has to open that many, then ping them concurrently before the first
    real request arrives.
    """
    conns: list[AsyncConnection] = []
    try:
        # Holding every connection forces the pool to create distinct ones.
        for _ in range(pool_size):
            conns.append(await engine.connect())
        # Ping concurrently; a failed ping cancels the remaining tasks.
        async with asyncio.TaskGroup() as tg:
            for conn in conns:
                tg.create_task(_ping(conn))
    finally:
        # Return all connections to the pool, even if a ping failed.
        for conn in conns:
            await conn.close()
    print(f"Pool pre-warmed: {pool_size} connections ready.")
# FastAPI lifespan example:
#
# from contextlib import asynccontextmanager
# from fastapi import FastAPI
#
# @asynccontextmanager
# async def lifespan(app: FastAPI):
#     await prewarm_pool(pool_size=10)
#     yield
#     await engine.dispose()
#
# app = FastAPI(lifespan=lifespan)
asyncio.TaskGroup (Python 3.11+) runs all pings concurrently. If any ping fails, it cancels the remaining tasks and raises an ExceptionGroup containing the failures, so a misconfigured database surfaces at startup rather than as a slow cascade of errors under load.
A related startup-time hook is the async_creator= parameter of create_async_engine (available since SQLAlchemy 2.0.16), which lets you supply an async factory function that the pool awaits whenever it needs a new raw asyncpg connection. This is useful when connecting through a proxy that requires a per-connection handshake (for example, Cloud SQL Auth Proxy or PgBouncer with SCRAM authentication), where the default connection string approach is insufficient. The synchronous creator= parameter expects a blocking callable, so it is not a natural fit for an async driver.
Frequently Asked Questions
Can I reuse the same create_async_engine engine for synchronous SQLAlchemy code?
No. AsyncEngine wraps an async-only connection pool and cannot be passed to synchronous Session or Connection objects. If you need both sync and async access in the same application (for example, during a migration period), create two separate engines — one with create_engine("postgresql+psycopg2://...") for sync code and one with create_async_engine("postgresql+asyncpg://...") for async code. They can point at the same database but manage independent connection pools.
How do I write tests for async database code without a real PostgreSQL instance?
Use aiosqlite as an in-process SQLite driver for unit and integration tests:
# conftest.py
import pytest_asyncio
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine

from myapp.models import Base  # hypothetical import path; point this at your own models


@pytest_asyncio.fixture
async def db_session():
    test_engine = create_async_engine(
        "sqlite+aiosqlite:///:memory:",
        echo=False,
    )
    async with test_engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    factory = async_sessionmaker(bind=test_engine, expire_on_commit=False)
    async with factory() as session:
        yield session
    await test_engine.dispose()
Install aiosqlite alongside pytest-asyncio (pip install aiosqlite pytest-asyncio). Be aware that SQLite does not support all PostgreSQL-specific types and functions; for dialect-specific queries, prefer testing against a real PostgreSQL instance running in Docker.
What happens if I forget to call await engine.dispose() on shutdown?
The OS reclaims all open file descriptors when the process exits, so your application will not hang. However, PostgreSQL sees the socket close without the protocol's Terminate message and treats each connection as an unexpected disconnect. When the process exits normally, the server usually notices the closed socket quickly. If the host or network disappears without closing the socket (a frozen Lambda, a killed node), the server-side sessions can linger until TCP keepalives detect the dead peer (often minutes), which temporarily consumes max_connections slots. In high-restart-frequency environments (Kubernetes rolling deploys, Lambda cold starts) this can contribute to connection exhaustion on the database server. Await engine.dispose() in a lifespan handler, in a finally block around your main coroutine, or from an async-aware signal handler so the driver sends clean disconnects.
Is echo=True safe in production for debugging a specific query?
Enabling echo=True writes every SQL statement and its parameters to the Python logging system at the INFO level. This is fine for short debugging sessions but carries two risks in production: it can expose sensitive data (customer names, invoice amounts, authentication tokens) in log aggregation systems, and it adds measurable I/O overhead under high query volume. Instead, leave echo off and raise the sqlalchemy.engine logger to INFO only while you investigate the specific request or task, then revert. Passing hide_parameters=True to the engine keeps bound parameter values out of those log lines. SQLAlchemy also supports echo="debug" (equivalent to the DEBUG level), which additionally logs result rows; avoid this in production entirely.
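One way to scope that logging window is a small context manager built on the standard logging module. This is a sketch: sql_logging is a hypothetical helper name, it assumes a log handler is already configured, and because the logger level is process-wide it affects every query issued while the block is active, not just one request. SQLAlchemy checks the level when a connection is checked out, so the change applies to connections acquired inside the block.

```python
import contextlib
import logging
from collections.abc import Iterator


@contextlib.contextmanager
def sql_logging(level: int = logging.INFO) -> Iterator[None]:
    """Temporarily raise the verbosity of the sqlalchemy.engine logger."""
    logger = logging.getLogger("sqlalchemy.engine")
    previous = logger.level
    logger.setLevel(level)
    try:
        yield
    finally:
        logger.setLevel(previous)  # always restore, even if the block raises


# Usage (inside a coroutine, with a hypothetical investigate_order helper):
#
#     with sql_logging():
#         await investigate_order(order_id=1)
```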
Related
- Configuring Async Engines and Connection Pools — parent guide covering the full engine configuration surface
- Setting Up asyncpg Connection Pool Size for High Concurrency — production pool sizing formulas and overflow tuning
- Handling Connection Leaks and Pool Exhaustion — diagnosing and fixing pool exhaustion in running services
- Choosing Between asyncpg and psycopg Async Drivers — when to switch from asyncpg to psycopg3's async mode