Async Engines, Dialects, and Connection Pooling in SQLAlchemy 2.0

SQLAlchemy 2.0's async subsystem, built on create_async_engine, AsyncAdaptedQueuePool, and native async DBAPI drivers, gives Python backend teams a production-grade toolkit for high-concurrency database workloads. This page covers every layer of that stack, from greenlet bridge internals to cloud pool tuning numbers, for developers who want to go beyond basic wiring and run async SQLAlchemy correctly under load.

[Figure: SQLAlchemy async request flow. An async HTTP request (FastAPI / Starlette) passes through AsyncSession (await session.execute(stmt)), AsyncEngine (create_async_engine()), AsyncAdaptedQueuePool, the greenlet sync/async bridge, and the asyncpg driver (binary protocol) before reaching PostgreSQL over TCP / TLS.]

Architectural Foundations

How the Async Engine Works

create_async_engine() does not instantiate a fundamentally different engine. It wraps a standard Engine inside an AsyncEngine adapter that translates every synchronous DBAPI operation into awaitable coroutines. The underlying connection pool, dialect stack, and SQL compilation pipeline are unchanged — what changes is the execution path that crosses the Python↔C boundary into the DBAPI layer.

When your code calls await session.execute(stmt), SQLAlchemy compiles the statement normally, then delegates to the async connection's execute() coroutine. The async connection in turn awaits the raw DBAPI cursor operation provided by the driver (asyncpg, psycopg3, aiomysql, aiosqlite). The event loop is never blocked; execution suspends at the await and resumes when the driver signals I/O completion.

# Async — full wiring from engine to result
import asyncio
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy import select
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

class Base(DeclarativeBase):
    pass

class Order(Base):
    __tablename__ = "orders"
    id: Mapped[int] = mapped_column(primary_key=True)
    customer_id: Mapped[int]
    total_cents: Mapped[int]

DATABASE_URL = "postgresql+asyncpg://app:secret@localhost:5432/orders_db"

engine = create_async_engine(DATABASE_URL, echo=False, pool_size=10, max_overflow=5)
async_session = async_sessionmaker(engine, expire_on_commit=False)

async def get_large_orders(min_cents: int) -> list[Order]:
    async with async_session() as session:
        stmt = select(Order).where(Order.total_cents >= min_cents)
        result = await session.execute(stmt)
        # .all() returns a Sequence; convert so the return type matches list[Order]
        return list(result.scalars().all())

asyncio.run(get_large_orders(10_000))

The Greenlet Bridge

SQLAlchemy's async adapter relies on the greenlet library to handle the impedance mismatch between its synchronous ORM internals (attribute loading, lazy-relation traversal, before/after event hooks) and the async execution context.

When the async engine needs to call into synchronous code (for example, ORM event hooks or the connection pool's internal bookkeeping), it runs that code inside a greenlet through SQLAlchemy's internal greenlet_spawn() helper. Whenever the synchronous code reaches a point that needs I/O, the greenlet switches back out so the awaitable can be awaited on the event loop, then resumes with the result. This "sync/async bridge" is invisible during normal await-based workflows, but it surfaces as sqlalchemy.exc.MissingGreenlet the instant you trigger synchronous I/O from outside the bridge (typically lazy-loading a relationship in async context).

The practical rule: any attribute access that would trigger SQL must be eagerly loaded. Use selectinload() or joinedload() at query time, or explicitly await session.refresh(obj, attribute_names=["relationship"]) after the fact. The integrating SQLAlchemy async with FastAPI and Starlette guide shows how to wire this correctly in request-scoped session factories.

AsyncAdaptedQueuePool

The default pool for async engines is AsyncAdaptedQueuePool. It replaces QueuePool's threading primitives with asyncio.Queue and asyncio.Lock, so connection checkout and return are coroutine-safe without OS thread involvement.

Key internal differences from QueuePool:

| Attribute | QueuePool (sync) | AsyncAdaptedQueuePool (async) |
|---|---|---|
| Wait primitive | threading.Lock | asyncio.Lock |
| Overflow queue | queue.Queue | asyncio.Queue |
| Connection check-in | synchronous | await pool.checkin() |
| Pre-ping mechanism | thread-blocking | non-blocking coroutine |

Under the hood, AsyncAdaptedQueuePool is a QueuePool subclass that swaps in an asyncio-compatible queue; its otherwise synchronous methods run through the greenlet bridge. As a result, the pool configuration parameters (pool_size, max_overflow, pool_timeout, pool_recycle, pool_pre_ping) are identical to their synchronous counterparts.
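To make the queue-backed checkout concrete, here is a toy model using only the standard library. It is not SQLAlchemy's implementation; ToyAsyncPool and demo are hypothetical names, and the "connections" are plain integers. It shows the two behaviours described above: a checkout suspends the coroutine rather than a thread, and a saturated pool raises a timeout.

```python
import asyncio


class ToyAsyncPool:
    """Toy bounded pool; illustrates the idea only, not SQLAlchemy's code."""

    def __init__(self, size: int, timeout: float) -> None:
        self._queue = asyncio.Queue(maxsize=size)
        for conn_id in range(size):
            self._queue.put_nowait(conn_id)  # pre-fill with fake connections
        self._timeout = timeout

    async def checkout(self) -> int:
        # Suspends this coroutine (not the thread) until a connection is free.
        return await asyncio.wait_for(self._queue.get(), timeout=self._timeout)

    def checkin(self, conn_id: int) -> None:
        self._queue.put_nowait(conn_id)


async def demo() -> tuple[int, bool]:
    pool = ToyAsyncPool(size=2, timeout=0.05)
    in_use = 0
    peak = 0

    async def worker() -> None:
        nonlocal in_use, peak
        conn = await pool.checkout()
        try:
            in_use += 1
            peak = max(peak, in_use)
            await asyncio.sleep(0.01)  # stand-in for a query
        finally:
            in_use -= 1
            pool.checkin(conn)

    # Four workers share two connections; two of them wait their turn.
    await asyncio.gather(*(worker() for _ in range(4)))

    # Hold both connections, then show that a third checkout times out.
    first = await pool.checkout()
    second = await pool.checkout()
    try:
        await pool.checkout()
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    finally:
        pool.checkin(first)
        pool.checkin(second)
    return peak, timed_out
```

Running asyncio.run(demo()) never has more than two workers inside the critical section, which is the same guarantee pool_size plus max_overflow gives a real engine.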

DBAPI Adaptation Layer

SQLAlchemy does not talk directly to asyncpg or psycopg3. It uses an AsyncAdapt_* shim layer: an adapted connection class (for asyncpg, AsyncAdapt_asyncpg_connection) wraps the driver's native async connection and exposes a synchronous-looking DBAPI 2.0 interface, which the engine's dialect stack expects. This indirection is why you can use all the familiar text(), select(), and insert() constructs without modification: only the execution path changes.

Key Component Deep-Dive 1: Async Driver Selection and create_async_engine

The choice of async DBAPI driver has significant runtime implications. Each driver has distinct wire-protocol behavior, prepared-statement caching defaults, and error surface.

Driver Comparison

| Driver | URL prefix | Protocol | Prepared stmt cache | Notable constraint |
|---|---|---|---|---|
| asyncpg | postgresql+asyncpg:// | binary | On by default | Incompatible with PgBouncer transaction mode (unless statement_cache_size=0) |
| psycopg3 (async) | postgresql+psycopg:// | text + binary | Optional | Requires psycopg[binary] or psycopg[c] for best performance |
| aiomysql | mysql+aiomysql:// | MySQL text | None | Buffers result sets client-side by default; large result sets load into RAM |
| asyncmy | mysql+asyncmy:// | MySQL text | None | Lower latency than aiomysql at high QPS |
| aiosqlite | sqlite+aiosqlite:// | SQLite file | N/A | Runs the sync sqlite3 module on a background thread; suited to tests and local development, not production workloads |

For a full treatment of asyncpg vs psycopg3 trade-offs, see the choosing between asyncpg and psycopg async drivers guide. For SQLite and MySQL driver selection rationale, the selecting async drivers for SQLite, MySQL, and Postgres page covers the decision matrix.

create_async_engine Parameters

from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.pool import AsyncAdaptedQueuePool, NullPool

# Production PostgreSQL engine (asyncpg)
pg_engine = create_async_engine(
    "postgresql+asyncpg://app:secret@db.internal:5432/app_db",
    # Pool sizing
    poolclass=AsyncAdaptedQueuePool,
    pool_size=10,           # persistent connections per process
    max_overflow=5,         # burst connections (total max = 15)
    pool_timeout=30,        # seconds to wait for a connection before TimeoutError
    pool_recycle=1800,      # recycle connections every 30 min (NAT gateways and load balancers can drop idle connections silently)
    pool_pre_ping=True,     # test each connection on checkout and replace it if it is dead
    # Dialect / driver kwargs passed through connect_args
    connect_args={
        "statement_cache_size": 0,   # REQUIRED if behind PgBouncer transaction mode
        "server_settings": {"application_name": "orders-api"},
    },
    echo=False,
)

# Serverless / AWS Lambda — no pooling
lambda_engine = create_async_engine(
    "postgresql+asyncpg://app:secret@db.internal:5432/app_db",
    poolclass=NullPool,   # no pool; each request opens and closes a raw connection
)

# psycopg3 alternative (drop-in for most workloads)
psycopg_engine = create_async_engine(
    "postgresql+psycopg://app:secret@db.internal:5432/app_db",
    pool_size=10,
    max_overflow=5,
    pool_pre_ping=True,
)

# MySQL (aiomysql)
mysql_engine = create_async_engine(
    "mysql+aiomysql://app:secret@db.internal:3306/app_db",
    pool_size=10,
    max_overflow=5,
    pool_recycle=3600,
)

# SQLite for testing and local development
sqlite_engine = create_async_engine(
    "sqlite+aiosqlite:///./test.db",
    connect_args={"check_same_thread": False},
)

Dialect-Specific Gotchas

Every driver surfaces at least one non-obvious runtime behavior in production. asyncpg's prepared-statement cache conflicts with PgBouncer's transaction-mode pooling because consecutive transactions can land on different physical connections, each with its own prepared-statement namespace, so cached statements go missing. psycopg3 starts preparing statements server-side automatically once a query has run several times (its prepare_threshold setting), so it can hit the same PgBouncer problem unless that is disabled. aiomysql buffers result sets client-side by default, so a large query loads every row into RAM unless you explicitly request streaming. Full coverage lives in the dialect-specific gotchas and driver quirks reference.

Key Component Deep-Dive 2: Connection Pool Configuration

Getting pool configuration wrong is the most common cause of production incidents with async SQLAlchemy. Too small and you queue requests until TimeoutError; too large and you exhaust database connection slots.

Parameters and Their Interactions

from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.pool import AsyncAdaptedQueuePool

engine = create_async_engine(
    "postgresql+asyncpg://app:secret@postgres:5432/app_db",
    poolclass=AsyncAdaptedQueuePool,

    # Base pool — connections held open permanently
    pool_size=10,

    # Overflow — created on demand, closed when returned (total cap = pool_size + max_overflow)
    max_overflow=5,

    # How long (seconds) a checkout waits for a free connection before raising
    # sqlalchemy.exc.TimeoutError: "QueuePool limit of size 10 overflow 5 reached, connection timed out, timeout 30"
    pool_timeout=30,

    # Force recycle connections older than N seconds (prevents stale connections from cloud NAT)
    pool_recycle=1800,

    # Issue a lightweight SELECT 1 before returning a connection from the pool
    # Catches connections killed by DB server restart or cloud instance failover
    pool_pre_ping=True,
)

pool_size sets the number of connections kept alive between requests. For an async application, this is per-process. With 4 Gunicorn uvicorn workers, a pool_size=10 means 40 connections to Postgres before overflow.

max_overflow adds burst capacity. Overflow connections are opened on demand and closed immediately when returned to the pool (not recycled). They count against the database's max_connections limit.

pool_timeout is the maximum wait before sqlalchemy.exc.TimeoutError is raised with the message QueuePool limit of size X overflow Y reached, connection timed out, timeout 30. Set it shorter than your HTTP server's request timeout to fail fast and surface saturation.

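pool_recycle forces a connection refresh after N seconds regardless of health. MySQL closes idle connections after wait_timeout, which defaults to 8 hours; PostgreSQL has no idle timeout by default. In both cases, TCP middleboxes (NAT gateways, load balancers) can silently drop idle connections after 5–30 minutes. A value of 1800 (30 minutes) is a reasonable starting point for most cloud setups; lower it if your network path drops idle connections sooner.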

pool_pre_ping tests the connection with a lightweight ping (such as SELECT 1) when it is checked out from the pool. This adds roughly one database round trip per checkout (on the order of 1 ms within a region) but prevents the asyncpg.exceptions.ConnectionDoesNotExistError / OperationalError: server closed the connection unexpectedly that fires after a DB failover event. Enable it by default in cloud environments.

NullPool for Serverless Environments

Lambda functions, scale-to-zero Cloud Run containers, and other ephemeral compute cannot rely on long-lived pooled connections, because the runtime may freeze or discard the process between invocations. The usual solution is NullPool, which skips pooling entirely:

from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.pool import NullPool

# Each call to engine.connect() opens a fresh TCP connection.
# Each call to conn.close() tears it down immediately.
engine = create_async_engine(
    "postgresql+asyncpg://app:secret@db.internal/app_db",
    poolclass=NullPool,
)

For Lambda at scale, pair NullPool with RDS Proxy or PgBouncer in transaction-pool mode upstream. The proxy maintains persistent connections to Postgres on your behalf; Lambda gets a fresh proxy connection per invocation without exhausting max_connections. Full guidance lives in the configuring async engines and connection pools reference and the tuning connection pools for cloud databases deep-dive.

Pool Events for Observability

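import logging

from sqlalchemy import event
from sqlalchemy.ext.asyncio import create_async_engine

logger = logging.getLogger("db.pool")

# Engine whose pool is being observed (same URL as the pool examples above)
engine = create_async_engine("postgresql+asyncpg://app:secret@postgres:5432/app_db")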

@event.listens_for(engine.sync_engine, "connect")
def on_connect(dbapi_conn, connection_record):
    logger.info("Pool: new connection opened")

@event.listens_for(engine.sync_engine, "checkout")
def on_checkout(dbapi_conn, connection_record, connection_proxy):
    logger.debug("Pool: connection checked out")

@event.listens_for(engine.sync_engine, "checkin")
def on_checkin(dbapi_conn, connection_record):
    logger.debug("Pool: connection returned to pool")

Note the engine.sync_engine access: pool events are registered on the underlying synchronous engine, not the async wrapper. This is one of the few places the sync/async boundary is visible in application code. The handling connection leaks and pool exhaustion guide expands on pool monitoring and leak detection with engine.pool.status().
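The same events can be used to measure how long each connection stays checked out. The sketch below is an illustration under stated assumptions: it uses a synchronous in-memory SQLite engine as a stand-in so it runs without a database server, and the hold_times list and listener names are hypothetical. With an async engine, the listeners would be attached to engine.sync_engine instead.

```python
import time

from sqlalchemy import create_engine, event, text
from sqlalchemy.pool import QueuePool

# Stand-in engine: in-memory SQLite with an explicit QueuePool.
engine = create_engine("sqlite://", poolclass=QueuePool, pool_size=2, max_overflow=0)

hold_times: list[float] = []  # seconds each connection spent checked out


@event.listens_for(engine, "checkout")
def _on_checkout(dbapi_conn, connection_record, connection_proxy):
    # connection_record.info is a per-connection dict that survives checkouts.
    connection_record.info["checked_out_at"] = time.monotonic()


@event.listens_for(engine, "checkin")
def _on_checkin(dbapi_conn, connection_record):
    started = connection_record.info.pop("checked_out_at", None)
    if started is not None:
        hold_times.append(time.monotonic() - started)


with engine.connect() as conn:
    conn.execute(text("SELECT 1"))
# Leaving the block returns the connection, which fires the checkin listener.
```

In production the durations would more likely feed a histogram metric than a list; long hold times usually point at slow transactions rather than an undersized pool.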

Key Component Deep-Dive 3: Framework Integration

FastAPI and Starlette: Request-Scoped AsyncSession

The canonical FastAPI pattern creates one engine at startup, one async_sessionmaker factory, and injects a fresh session per request via a dependency:

from contextlib import asynccontextmanager
from typing import AsyncGenerator, Annotated

from fastapi import FastAPI, Depends
from sqlalchemy.ext.asyncio import (
    AsyncSession,
    async_sessionmaker,
    create_async_engine,
)
from sqlalchemy import select
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"
    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str]
    is_active: Mapped[bool] = mapped_column(default=True)

DATABASE_URL = "postgresql+asyncpg://app:secret@postgres:5432/app_db"

engine = create_async_engine(
    DATABASE_URL,
    pool_size=10,
    max_overflow=5,
    pool_pre_ping=True,
    pool_recycle=1800,
)
# expire_on_commit=False prevents DetachedInstanceError when accessing
# ORM attributes after the session has committed and closed
AsyncSessionFactory = async_sessionmaker(engine, expire_on_commit=False)


@asynccontextmanager
async def lifespan(app: FastAPI):
    yield  # engine initialised at module level
    await engine.dispose()  # drain pool on shutdown


app = FastAPI(lifespan=lifespan)


async def get_db() -> AsyncGenerator[AsyncSession, None]:
    async with AsyncSessionFactory() as session:
        yield session
        # Leaving the block closes the session and rolls back uncommitted work;
        # call await session.commit() explicitly to persist writes


DBDep = Annotated[AsyncSession, Depends(get_db)]


@app.get("/users/{user_id}")
async def read_user(user_id: int, db: DBDep) -> dict:
    stmt = select(User).where(User.id == user_id, User.is_active.is_(True))
    result = await db.execute(stmt)
    user = result.scalar_one_or_none()
    if user is None:
        from fastapi import HTTPException
        raise HTTPException(status_code=404, detail="User not found")
    return {"id": user.id, "email": user.email}

expire_on_commit=False is critical in async FastAPI code. By default, SQLAlchemy marks all ORM attributes as "expired" after a commit, requiring a fresh SELECT to access them. In async context, that SELECT fires as a lazy load on attribute access, outside the greenlet bridge, raising sqlalchemy.exc.MissingGreenlet. Setting expire_on_commit=False keeps the in-memory state valid after commit, trading a small amount of stale-data risk for correct async behavior. The dedicated using expire_on_commit=False in FastAPI dependencies page covers the trade-offs in depth.

Celery Workers: Async SQLAlchemy in Sync Task Context

Celery tasks run in synchronous worker processes, not inside an asyncio event loop. The two integration strategies are:

Strategy 1 — Run an event loop per task (simple, safe for low-throughput workers):

import asyncio
from celery import Celery
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy import select, update
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

class Base(DeclarativeBase):
    pass

class Invoice(Base):
    __tablename__ = "invoices"
    id: Mapped[int] = mapped_column(primary_key=True)
    status: Mapped[str]
    total_cents: Mapped[int]

celery_app = Celery("tasks", broker="redis://localhost:6379/0")

engine = create_async_engine(
    "postgresql+asyncpg://app:secret@postgres:5432/app_db",
    pool_size=5,
    max_overflow=2,
    pool_pre_ping=True,
)
AsyncSessionFactory = async_sessionmaker(engine, expire_on_commit=False)


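async def _mark_invoice_paid(invoice_id: int) -> None:
    try:
        async with AsyncSessionFactory() as session:
            async with session.begin():  # commits on success, rolls back on error
                stmt = (
                    update(Invoice)
                    .where(Invoice.id == invoice_id)
                    .values(status="paid")
                )
                await session.execute(stmt)
    finally:
        # asyncio.run() creates a new event loop for every task, and pooled
        # connections are bound to the loop that opened them. Dispose the pool
        # before this loop closes so the next task starts with fresh connections.
        await engine.dispose()


@celery_app.task
def mark_invoice_paid(invoice_id: int) -> None:
    asyncio.run(_mark_invoice_paid(invoice_id))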

Strategy 2 — Shared event loop with gevent or asyncio worker pool (for high-throughput Celery):

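For workers that process many tasks concurrently, create one event loop per worker process with asyncio.new_event_loop() and reuse it through loop.run_until_complete(), so pooled connections stay bound to a live loop. (Calling asyncio.get_event_loop() for this purpose emits a DeprecationWarning on recent Python versions when no loop is running.) Alternatively, switch to a worker pool or task runner that natively supports asyncio. The using SQLAlchemy async with Celery task workers page covers both approaches with benchmark data.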
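A persistent loop per worker process can be sketched with the standard library alone. The helper name run_on_worker_loop is hypothetical, and the sketch assumes a prefork-style worker where each process runs one task at a time on its main thread; it is not safe to call from several threads at once.

```python
import asyncio
from typing import Any, Coroutine, Optional, TypeVar

T = TypeVar("T")

# One loop per worker process, created lazily on first use.
_worker_loop: Optional[asyncio.AbstractEventLoop] = None


def run_on_worker_loop(coro: Coroutine[Any, Any, T]) -> T:
    """Run a coroutine to completion on this process's long-lived loop."""
    global _worker_loop
    if _worker_loop is None or _worker_loop.is_closed():
        _worker_loop = asyncio.new_event_loop()
    # Reusing the same loop keeps pooled connections bound to a live loop.
    return _worker_loop.run_until_complete(coro)
```

A Celery task body would then call run_on_worker_loop(some_coroutine(...)) instead of asyncio.run(...), and the engine's pool can be reused across tasks in that process.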

A key concern for Celery is pool_size. Each Celery worker process spins up its own engine instance. With 8 worker processes and pool_size=5, you consume 40 Postgres connections at rest before overflow. Size accordingly and consider using NullPool + PgBouncer for task workers with sporadic access patterns.

Advanced Patterns & Production Configuration

Real Pool Numbers for Common Cloud Configurations

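PostgreSQL's max_connections is a hard limit shared by every client. A starting formula for pool_size per application process is:

pool_size = (max_connections - reserved_for_superuser) / total_app_processes

Because overflow connections also count against max_connections, check that total_app_processes × (pool_size + max_overflow) still fits under the limit if bursts can coincide across processes.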

Typical cloud configurations:

| Instance | max_connections | Suggested pool_size | max_overflow | Notes |
|---|---|---|---|---|
| RDS db.t4g.medium (4 GB) | 170 | 8 (with 20 replicas) | 4 | Reserve 10 for RDS internal + admin |
| RDS db.r6g.xlarge (32 GB) | 1306 | 25 (with 40 replicas) | 10 | Comfortable headroom |
| Aurora Serverless v2 | Scales with ACU | 10–20 | 5 | Use RDS Proxy to multiplex |
| Cloud SQL (n2-standard-4) | 1000 | 20 (with 40 replicas) | 10 | Cloud SQL Proxy handles TLS termination |
| Supabase (free tier) | 60 | 5 | 2 | Use PgBouncer (transaction mode, port 6543) |

For a detailed derivation and Aurora-specific tuning, see tuning connection pools for cloud databases.
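The sizing formula can be checked with a few lines of arithmetic. The function names and the deployment numbers below are hypothetical; the point is only to show the per-process cap and the worst case when every process fills both its pool and its overflow.

```python
def per_process_cap(max_connections: int, reserved: int, processes: int) -> int:
    """Connections each process may hold if the budget is split evenly."""
    if processes <= 0:
        raise ValueError("processes must be positive")
    return (max_connections - reserved) // processes


def worst_case_connections(processes: int, pool_size: int, max_overflow: int) -> int:
    """Connections opened if every process fills its pool and its overflow."""
    return processes * (pool_size + max_overflow)


# Hypothetical deployment: 500 connection slots, 20 reserved, 16 worker processes.
cap = per_process_cap(max_connections=500, reserved=20, processes=16)

# Split the per-process cap between the persistent pool and burst capacity.
pool_size, max_overflow = 20, 10
fits = worst_case_connections(16, pool_size, max_overflow) <= 500 - 20
```

If fits comes out False for your numbers, either lower max_overflow or put a proxy such as PgBouncer or RDS Proxy in front of the database.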

PgBouncer and RDS Proxy: Transaction-Mode Pooling

When your application pool sits in front of PgBouncer or RDS Proxy in transaction-mode, each SQL transaction may execute on a different physical Postgres connection. This breaks several asyncpg features:

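  1. Prepared statement cache: asyncpg caches prepared statements per physical connection. When PgBouncer routes a later transaction to a different backend, the statement is missing or already defined there, which typically surfaces as asyncpg.exceptions.InvalidSQLStatementNameError (prepared statement does not exist) or DuplicatePreparedStatementError. Fix: pass statement_cache_size=0 in connect_args.
  2. Advisory locks: pg_advisory_lock() is session-scoped, so in transaction mode the lock can end up held by a backend connection that your next transaction no longer uses.
  3. LISTEN/NOTIFY: requires a dedicated, session-mode connection bypassing PgBouncer.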
# Engine configured for PgBouncer transaction mode
engine = create_async_engine(
    "postgresql+asyncpg://app:secret@pgbouncer:6432/app_db",
    pool_size=10,
    max_overflow=5,
    pool_pre_ping=True,
    connect_args={
        "statement_cache_size": 0,           # disable asyncpg's own prepared-statement cache
        "prepared_statement_cache_size": 0,  # disable the SQLAlchemy asyncpg dialect's cache
    },
)

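RDS Proxy generally tolerates prepared statements, so statement_cache_size=0 is not required there, although it may pin a client session to a single backend connection in some cases, which reduces multiplexing. RDS Proxy also adds roughly 1–3 ms of latency per query due to TLS termination and multiplexing overhead.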

Read Replica Routing

For read-heavy workloads, route SELECT statements to a read replica while writes go to the primary. This requires two engines:

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy import select, text

PRIMARY_URL = "postgresql+asyncpg://app:secret@primary.db.internal:5432/app_db"
REPLICA_URL = "postgresql+asyncpg://app:secret@replica.db.internal:5432/app_db"

write_engine = create_async_engine(PRIMARY_URL, pool_size=10, max_overflow=5, pool_pre_ping=True)
read_engine  = create_async_engine(REPLICA_URL, pool_size=20, max_overflow=10, pool_pre_ping=True)

WriteSession = async_sessionmaker(write_engine, expire_on_commit=False)
ReadSession  = async_sessionmaker(read_engine,  expire_on_commit=False)


async def get_product_catalog(category: str) -> list[dict]:
    """Read from the replica; eventual consistency is acceptable here."""
    # The "products" table and its columns are assumed for illustration.
    stmt = text(
        "SELECT id, name, price_cents FROM products WHERE category = :cat"
    )
    async with ReadSession() as session:
        result = await session.execute(stmt, {"cat": category})
        return [dict(row) for row in result.mappings()]


async def create_product(name: str, price_cents: int, category: str) -> int:
    """Write to the primary; strong consistency is required."""
    stmt = text(
        "INSERT INTO products (name, price_cents, category) "
        "VALUES (:name, :price_cents, :category) RETURNING id"
    )
    async with WriteSession() as session:
        async with session.begin():  # commits on success, rolls back on error
            result = await session.execute(
                stmt,
                {"name": name, "price_cents": price_cents, "category": category},
            )
            return result.scalar_one()  # id generated by the primary

The advanced query patterns and bulk data operations pillar covers multi-tenant schema routing and dynamic replica selection in more depth.

Engine Disposal and Graceful Restart

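await engine.dispose() closes every connection currently checked in to the pool. Connections that are still checked out are not waited for; they are detached from the pool and closed when their owner releases them. In containerized environments with SIGTERM-based graceful shutdown windows, call it inside the ASGI lifespan finally block, after in-flight requests have finished: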

@asynccontextmanager
async def lifespan(app: FastAPI):
    try:
        yield
    finally:
        await engine.dispose()
        await read_engine.dispose()

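Pass close=False, as in engine.dispose(close=False), to replace the pool without closing its existing connections; the next request then opens fresh ones. This is useful with forking servers (Gunicorn's prefork model): call it in the child process right after the fork, so the child discards the connections it inherited without closing sockets that the parent process is still using. In a synchronous hook, the same call is available on engine.sync_engine.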

Common Pitfalls & Anti-Patterns

The errors below appear verbatim in production logs. Each has a single, addressable root cause.

  • sqlalchemy.exc.MissingGreenlet: greenlet_spawn has not been called — You accessed a lazy-loaded ORM relationship (or triggered an expired attribute) from outside the SQLAlchemy greenlet bridge, typically by accessing obj.relationship after the session closed. Fix: load all needed relationships eagerly with selectinload() / joinedload() at query time, or call await session.refresh(obj, ["relationship"]) before closing the session. Set expire_on_commit=False to prevent attribute expiry on commit triggering the same error.
  • greenlet.error: cannot switch to a different thread / GreenletSpawnError — An async SQLAlchemy operation was initiated from a thread that did not create the greenlet. Common cause: running asyncio.run() inside a Celery beat thread or inside a concurrent.futures.ThreadPoolExecutor. Fix: ensure all async SQLAlchemy calls originate from the same thread that owns the event loop. See fixing GreenletSpawnError in async SQLAlchemy workflows.
  • sqlalchemy.exc.TimeoutError: QueuePool limit of size 10 overflow 5 reached, connection timed out, timeout 30 — The pool is saturated. All 15 connections (pool_size + max_overflow) are checked out and no new connection was available within pool_timeout seconds. Root causes: long-running transactions holding connections, slow queries, or pool_size too small for the request rate. Fix: increase pool_size or max_overflow, find and fix slow transactions, or add a read replica. Enable pool checkout events to measure wait time.
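  • asyncpg.exceptions.InvalidCachedStatementError: cached statement plan is invalid - asyncpg is executing a cached prepared statement whose plan the server has invalidated, most often after a schema change such as a migration (the full message ends "due to a database schema or configuration change"). Behind PgBouncer in transaction mode, the related failures are usually InvalidSQLStatementNameError or DuplicatePreparedStatementError, because the statement was prepared on a different physical backend. Fix: connect_args={"statement_cache_size": 0} removes the cache in both cases. If you control PgBouncer, switching to session-mode pooling also resolves the PgBouncer variant, at the cost of fewer multiplexed connections.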
  • RuntimeError: Event loop is closed — An await call reached the event loop after it had been closed, often because code runs asyncio.run() at the top level and then attempts to reuse an engine created before the run. Fix: create the engine inside the coroutine passed to asyncio.run(), or use a single persistent loop lifecycle managed by the ASGI server.
  • sqlalchemy.orm.exc.DetachedInstanceError: Instance <User> is not bound to a Session - An ORM object was accessed after its parent session closed, triggering a lazy load on an expired or unloaded attribute. This is the detached-object counterpart of MissingGreenlet. Fix: expire_on_commit=False on the session factory, plus eager-load all relationships you access after session close.
  • asyncpg.exceptions.TooManyConnectionsError: sorry, too many clients already - Your application's aggregate connection count (processes × (pool_size + max_overflow)) exceeds Postgres max_connections. Fix: reduce pool_size, add PgBouncer or RDS Proxy upstream, or upgrade to a larger DB instance. The setting pool_size and max_overflow for AWS RDS guide has instance-specific numbers.

Frequently Asked Questions

Can I use a synchronous DBAPI driver with create_async_engine? No. create_async_engine raises InvalidRequestError at engine creation time if the URL resolves to a driver that is not async (supported async drivers include asyncpg, psycopg3 async, aiomysql, asyncmy, and aiosqlite). Synchronous drivers (psycopg2, cx_Oracle, pymysql) block the OS thread and cannot be awaited. If you need to keep a synchronous driver temporarily during migration, keep a separate synchronous Engine for it and call it from async code through a worker thread, for example with asyncio.to_thread(). run_sync() on AsyncConnection or AsyncSession does not help here, because it still requires an async driver underneath.
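One way to keep a blocking driver off the event loop during a migration is to run it in a worker thread. The sketch below uses the standard library's sqlite3 module as a stand-in for any synchronous driver; the function names and the table t are hypothetical.

```python
import asyncio
import sqlite3


def count_rows_blocking(db_path: str) -> int:
    """Ordinary blocking database code; must not run on the event loop thread."""
    conn = sqlite3.connect(db_path)
    try:
        conn.execute("CREATE TABLE IF NOT EXISTS t (id INTEGER PRIMARY KEY)")
        conn.execute("INSERT INTO t DEFAULT VALUES")
        conn.commit()
        return conn.execute("SELECT COUNT(*) FROM t").fetchone()[0]
    finally:
        conn.close()


async def count_rows(db_path: str) -> int:
    # asyncio.to_thread() runs the blocking call in a worker thread, so the
    # event loop stays free to serve other coroutines in the meantime.
    return await asyncio.to_thread(count_rows_blocking, db_path)
```

The connection is opened and closed inside the same thread, which avoids sharing a synchronous connection across threads. Each call still occupies a thread for its full duration, so this is a stopgap rather than a substitute for an async driver.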

How do I share a single engine across multiple FastAPI workers? You don't — and you shouldn't try. Each Uvicorn/Gunicorn worker process gets its own Python interpreter, its own event loop, and its own engine. The create_async_engine() call and async_sessionmaker factory should be module-level globals initialized at import time. The pool is per-process. This is the correct architecture; just size pool_size accordingly across all your worker processes.

Why does pool_pre_ping=True not prevent all stale-connection errors? pool_pre_ping tests the connection when it is checked out, not during the actual query. If the network drops between the pre-ping and the subsequent query (a very narrow window), the stale-connection error still fires. More importantly, pre-ping adds latency. In ultra-low-latency paths, some teams disable it and instead rely on proper pool_recycle settings plus application-level retry logic on OperationalError.
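Application-level retry can be sketched as a small helper with exponential backoff. The name retry_async and its parameters are hypothetical; in real use, retry_on would likely be (sqlalchemy.exc.OperationalError,), and the wrapped operation should be idempotent or run in a transaction that rolled back cleanly.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry_async(
    op: Callable[[], Awaitable[T]],
    *,
    retry_on: tuple[type[BaseException], ...],
    attempts: int = 3,
    base_delay: float = 0.05,
) -> T:
    """Call op() up to `attempts` times, backing off between failures."""
    for attempt in range(1, attempts + 1):
        try:
            return await op()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: surface the original error
            # Exponential backoff: base_delay, 2 * base_delay, 4 * base_delay, ...
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))
    raise AssertionError("unreachable")
```

The operation is passed as a zero-argument callable so that each attempt creates a fresh coroutine, and ideally checks out a fresh connection.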

Is it safe to call engine.dispose() and then continue using the engine? Yes. After dispose(), the pool is invalidated and all existing connections are closed. The next async with engine.connect() call creates a fresh connection and rebuilds the pool transparently. This behavior is used deliberately in Gunicorn's post_fork hook to ensure child processes do not share connections with the parent.

What is the difference between AsyncSession and AsyncConnection? AsyncSession is the ORM-level interface: it tracks the identity map, manages unit-of-work, and lets you work with mapped classes. AsyncConnection is the Core-level interface: it provides execute() directly against the engine without ORM overhead (pass a list of parameter dictionaries for executemany-style batches), which suits bulk inserts, raw SQL, and administrative commands. For high-throughput bulk operations, AsyncConnection avoids ORM instrumentation overhead; see the advanced query patterns and bulk data operations pillar for benchmarks.

How do I run Alembic migrations with an async engine? Alembic's migration context runs on a synchronous connection. The standard pattern, used by Alembic's async template, is to open async_engine.connect() inside a coroutine in env.py, call await conn.run_sync(do_run_migrations), and drive that coroutine with asyncio.run() from run_migrations_online(). Full wiring is covered in the configuring Alembic with async SQLAlchemy engines guide.