Handling Connection Leaks and Pool Exhaustion in SQLAlchemy Async

Connection leaks and pool exhaustion are the most operationally damaging failure modes in async SQLAlchemy applications — and they share a deceptive trait: they build silently before erupting as a QueuePool limit of size X overflow Y reached, connection timed out, timeout 30 error under load. This guide covers the Async Engines, Dialects, and Connection Pooling layer's most critical operational concern: diagnosing why connections are not returning to the pool, fixing the async context-manager discipline that prevents leaks, and monitoring pool health with events and structured logging.

Concept & Execution Model

SQLAlchemy 2.0's async pool — AsyncAdaptedQueuePool — wraps an asyncio.Queue to manage a bounded set of real database connections. The lifecycle is simple in theory: your code checks out a connection, runs queries, and checks it back in. Pool exhaustion happens when more checkouts accumulate than the pool's pool_size + max_overflow ceiling allows, and the next checkout waits past pool_timeout seconds (default: 30) without a slot becoming free.

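What makes async applications uniquely prone to exhaustion is the gap between when a connection is checked out and when it is checked in. In synchronous code, try/finally blocks and, as a last resort, garbage collection eventually reclaim connections. Async code has no comparable safety net, because returning an async connection to the pool requires an await that a finalizer cannot perform. A coroutine that acquires a session without a context manager and then aborts on an unhandled exception, or a fire-and-forget background task that holds a session, can keep the connection checked out for the life of the process. pool_timeout does not reclaim anything; it only bounds how long the next caller waits before failing.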

Figure: Pool checkout/checkin lifecycle and leak points. An AsyncAdaptedQueuePool (pool_size=10, max_overflow=5, pool_timeout=30) hands out a connection via async with engine.connect() and receives it back on checkin (normal flow, solid teal). Three leak paths (dashed red) never reach checkin: an unhandled exception or cancellation, a missing await session.close(), and a background task holding a session. Leaked slots accumulate until the pool reports "QueuePool limit reached, connection timed out".

The pool's three tunable dials control the envelope:

  • pool_size — persistent connections kept alive between requests. Default: 5.
  • max_overflow — additional connections allowed above pool_size during spikes. Default: 10.
  • pool_timeout — seconds a checkout waits before raising sqlalchemy.exc.TimeoutError. Default: 30.

When pool_size + max_overflow connections are all checked out and a new checkout arrives, SQLAlchemy blocks in an asyncio wait for up to pool_timeout seconds. If no connection is returned in time, the distinctive error fires:

sqlalchemy.exc.TimeoutError: QueuePool limit of size 10 overflow 5 reached,
connection timed out, timeout 30.00

Understanding which of the three leak vectors above is responsible determines the fix.
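The ceiling-and-timeout behaviour described above can be illustrated with a small standard-library model. This is only a sketch: ToyPool is a hypothetical class that imitates the pool_size + max_overflow ceiling with a semaphore, and it is not how SQLAlchemy implements its pool.

```python
import asyncio


class ToyPool:
    """Stdlib-only model of a bounded pool (hypothetical, not SQLAlchemy code)."""

    def __init__(self, pool_size: int, max_overflow: int, timeout: float) -> None:
        self.capacity = pool_size + max_overflow
        self.timeout = timeout
        self._slots = asyncio.Semaphore(self.capacity)
        self.checked_out = 0

    async def checkout(self) -> None:
        # Wait for a free slot, but give up after `timeout` seconds.
        try:
            await asyncio.wait_for(self._slots.acquire(), timeout=self.timeout)
        except asyncio.TimeoutError:
            raise TimeoutError(
                f"limit of {self.capacity} reached, timed out after {self.timeout}s"
            ) from None
        self.checked_out += 1

    def checkin(self) -> None:
        self.checked_out -= 1
        self._slots.release()


async def demo() -> str:
    pool = ToyPool(pool_size=2, max_overflow=1, timeout=0.05)
    for _ in range(3):
        await pool.checkout()  # three "leaked" checkouts, never checked in
    try:
        await pool.checkout()  # the fourth caller waits, then times out
    except TimeoutError as exc:
        return str(exc)
    return "no timeout"
```

Calling asyncio.run(demo()) returns the timeout message, because the three earlier checkouts never call checkin(). Adding a single pool.checkin() before the fourth checkout lets it succeed, which is the whole difference between a healthy pool and a leaking one.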

Distinguishing Exhaustion from Leaks

Pool exhaustion and connection leaks are often conflated but require different diagnoses:

  • True exhaustion — all connections are legitimately in use handling concurrent requests. pool.checkedout() tracks close to pool_size + max_overflow, but connections are being returned as requests complete. The fix is to tune the pool ceiling upward (if the database can absorb it) or reduce query latency.
  • Slow leaks — connections are acquired but not returned due to application bugs. pool.checkedout() climbs monotonically across a deployment's lifetime. Eventually no slots remain and every new request times out. The fix is code — context-manager discipline, not pool tuning.
  • Burst leaks — a specific code path (a problematic endpoint, a batch job, a cron task) acquires connections without releasing them. The pool exhausts under that specific traffic pattern. The fix requires identifying which coroutine pathway does not close its session.
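One way to tell these cases apart is to sample pool.checkedout() at a fixed interval and look at the trend. The function below is a rough heuristic under stated assumptions: the name classify_checkedout_trend, the half-window comparison, and the labels are all illustrative, and the thresholds would need tuning against real traffic.

```python
from typing import Sequence


def classify_checkedout_trend(samples: Sequence[int], ceiling: int) -> str:
    """Heuristic label for a series of pool.checkedout() samples."""
    if len(samples) < 4:
        return "insufficient data"
    half = len(samples) // 2
    early_floor = min(samples[:half])
    late_floor = min(samples[half:])
    # A leak shows up as a rising floor: slots go out and never come back.
    if late_floor > early_floor and samples[-1] >= max(samples[:half]):
        return "possible leak"
    # Genuine load touches the ceiling but still drains between peaks.
    if max(samples) >= ceiling:
        return "possible true exhaustion"
    return "healthy"
```

For example, a series such as [1, 2, 2, 3, 4, 5, 6, 7] is labelled a possible leak, while [3, 15, 4, 15, 2, 15, 3, 14] with a ceiling of 15 is labelled possible true exhaustion because the count keeps returning to a low floor.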

The fastest diagnostic: enable echo_pool="debug" (plain echo_pool=True logs at INFO level, which does not include individual checkouts and checkins), tail the application log during the incident, and count checkout messages against checkin messages over a rolling window. A growing deficit confirms a leak.
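The counting step can be scripted. The sketch below assumes the pool's debug messages contain the phrases "checked out from pool" and "being returned to pool"; that wording is an assumption to verify against the log output of your SQLAlchemy version, and checkout_deficit is a hypothetical helper name.

```python
from typing import Iterable


def checkout_deficit(
    log_lines: Iterable[str],
    checkout_marker: str = "checked out from pool",    # assumed log wording
    checkin_marker: str = "being returned to pool",    # assumed log wording
) -> int:
    """Return checkouts minus checkins seen in the given log lines."""
    checkouts = 0
    checkins = 0
    for line in log_lines:
        if checkout_marker in line:
            checkouts += 1
        elif checkin_marker in line:
            checkins += 1
    return checkouts - checkins
```

Run it over successive log windows: a deficit that hovers around the number of in-flight requests is normal, while a deficit that grows from window to window points at a leak.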

Query Construction & Async Execution Patterns

Before diagnosing leaks, the correct baseline pattern for every database interaction must be established. The non-negotiable rule: every connection and session must be obtained via async with, never via bare await engine.connect() without a context manager.

The async with guarantee works because SQLAlchemy's AsyncSession and AsyncConnection implement Python's async context manager protocol (__aenter__ / __aexit__). The __aexit__ method is called regardless of whether the block exits normally, via an exception, or via asyncio.CancelledError. Inside __aexit__, SQLAlchemy rolls back any open transaction and places the underlying connection back into the pool queue. Without async with, you must manually call await session.close(), and if an exception interrupts the coroutine before that call, the connection stays checked out. engine.dispose() does not reclaim it, because dispose() only closes connections that are idle in the pool; the slot is typically lost until the session object is garbage collected or the process exits.

# Correct — connection is guaranteed to return to the pool
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy import select, text

engine = create_async_engine(
    "postgresql+asyncpg://user:pass@db.internal/app",
    pool_size=10,
    max_overflow=5,
    pool_timeout=30,
    pool_recycle=1800,
    echo_pool="debug",  # logs every checkout/checkin; disable in production
)

AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)


async def fetch_order(order_id: int) -> dict | None:
    async with AsyncSessionLocal() as session:
        stmt = select(Order).where(Order.id == order_id)
        result = await session.execute(stmt)
        order = result.scalar_one_or_none()
        return {"id": order.id, "total": str(order.total)} if order else None

The async with AsyncSessionLocal() as session: block is the checkin guarantee. When the block exits — normally, via exception, or via asyncio.CancelledError — SQLAlchemy's __aexit__ rolls back any open transaction and returns the underlying connection to the pool.
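The exit guarantee is a property of Python's async context manager protocol, so it can be observed without a database. The following sketch uses a hypothetical TrackedResource class as a stand-in for a session; it only records when __aenter__ and __aexit__ run and involves no SQLAlchemy code.

```python
import asyncio


class TrackedResource:
    """Stand-in for a session: records checkout and checkin in a shared log."""

    def __init__(self, log: list) -> None:
        self.log = log

    async def __aenter__(self) -> "TrackedResource":
        self.log.append("checkout")
        return self

    async def __aexit__(self, exc_type, exc, tb) -> bool:
        reason = exc_type.__name__ if exc_type else "normal exit"
        self.log.append(f"checkin after {reason}")
        return False  # never swallow the exception


async def demo() -> list:
    log: list = []

    async def worker(fail: bool) -> None:
        async with TrackedResource(log):
            if fail:
                raise ValueError("boom")
            await asyncio.sleep(10)

    try:
        await worker(True)  # exits the block via an exception
    except ValueError:
        pass

    task = asyncio.create_task(worker(False))
    await asyncio.sleep(0)  # let the task enter the async with block
    task.cancel()           # exits the block via CancelledError
    try:
        await task
    except asyncio.CancelledError:
        pass
    return log
```

asyncio.run(demo()) returns ["checkout", "checkin after ValueError", "checkout", "checkin after CancelledError"]: every checkout is paired with a checkin, whether the block ended through an exception or a cancellation.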

The pattern that causes most leaks is acquiring a session or connection without async with:

# WRONG — connection leaks if an exception is raised before .close()
session = AsyncSessionLocal()
result = await session.execute(select(Order))
# ... if this raises, session.close() is never called
await session.close()

For raw Core connections the same discipline applies:

# Correct — Core connection with explicit transaction
async with engine.connect() as conn:
    async with conn.begin():
        result = await conn.execute(
            select(Order).where(Order.status == "pending")
        )
        rows = result.fetchall()
# connection returns to pool here, even on exception

State Management & Session Boundaries

Long Transactions Hold Connections

A connection checked out to an AsyncSession is held for the duration of the session's transaction: it goes back to the pool when the transaction ends with commit() or rollback(), or when the session is closed. This means a long-running transaction, such as a multi-step business process that keeps work uncommitted for seconds or minutes, holds a pool slot for its entire duration.

import asyncio
from sqlalchemy.ext.asyncio import AsyncSession

# Anti-pattern: session spans an expensive external call
async def process_invoice_with_external_call(session: AsyncSession, invoice_id: int) -> None:
    invoice = await session.get(Invoice, invoice_id)
    invoice.status = "processing"
    await session.flush()

    # PROBLEM: connection is held during the entire HTTP call
    external_result = await call_payment_gateway(invoice)  # may take 2-10 seconds

    invoice.status = "paid" if external_result.success else "failed"
    await session.commit()

The fix is to commit and release the connection before the slow external call, then reopen a session for the final state update:

async def process_invoice_correctly(invoice_id: int) -> None:
    # Phase 1: mark as processing and release connection
    async with AsyncSessionLocal() as session:
        async with session.begin():
            invoice = await session.get(Invoice, invoice_id)
            invoice.status = "processing"
    # connection returned to pool here

    # External call without holding a connection
    external_result = await call_payment_gateway(invoice_id)

    # Phase 2: write final state with a fresh connection
    async with AsyncSessionLocal() as session:
        async with session.begin():
            invoice = await session.get(Invoice, invoice_id)
            invoice.status = "paid" if external_result.success else "failed"
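The cost of holding a slot across the gateway call can be estimated with simple arithmetic: if every request holds a connection for hold_seconds, the pool can sustain at most (pool_size + max_overflow) / hold_seconds requests per second. The helper below is a back-of-the-envelope sketch; max_requests_per_second is a hypothetical name, and the model ignores queueing effects and variance in hold time.

```python
def max_requests_per_second(
    pool_size: int, max_overflow: int, hold_seconds: float
) -> float:
    """Upper bound on throughput when each request holds one connection."""
    if hold_seconds <= 0:
        raise ValueError("hold_seconds must be positive")
    return (pool_size + max_overflow) / hold_seconds


# Holding the connection across a 5-second gateway call:
slow = max_requests_per_second(pool_size=10, max_overflow=5, hold_seconds=5.0)

# Holding it only for two short transactions of about 25 ms each:
fast = max_requests_per_second(pool_size=10, max_overflow=5, hold_seconds=0.05)
```

With the pool settings used in this guide, the first case caps out at 3 requests per second, while the second allows roughly 300. Releasing the connection before the external call raises the ceiling by two orders of magnitude without touching the pool configuration.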

Background Tasks and Fire-and-Forget Patterns

A common pitfall in FastAPI applications is passing a request-scoped session into a BackgroundTasks task or asyncio.create_task(). The session was created inside a request dependency, which calls close() on it when the request lifecycle ends, but the background task may still be running and using it.

from fastapi import BackgroundTasks, Depends
from sqlalchemy.ext.asyncio import AsyncSession

# WRONG — session closed by FastAPI dependency before background task finishes
async def send_confirmation_email(session: AsyncSession, order_id: int) -> None:
    order = await session.get(Order, order_id)  # session may already be closed
    await email_service.send(order.customer_email, order.id)

@app.post("/orders/")
async def create_order(background_tasks: BackgroundTasks, session: AsyncSession = Depends(get_session)):
    order = Order(status="new")
    session.add(order)
    await session.commit()
    background_tasks.add_task(send_confirmation_email, session, order.id)  # DANGER
    return {"id": order.id}

The correct pattern gives the background task its own session scope:

# Correct — background task owns its own session lifecycle
async def send_confirmation_email_safe(order_id: int) -> None:
    async with AsyncSessionLocal() as session:
        order = await session.get(Order, order_id)
        if order:
            await email_service.send(order.customer_email, order.id)

@app.post("/orders/")
async def create_order(background_tasks: BackgroundTasks, session: AsyncSession = Depends(get_session)):
    order = Order(status="new")
    session.add(order)
    await session.commit()
    await session.refresh(order)
    order_id = order.id
    background_tasks.add_task(send_confirmation_email_safe, order_id)  # safe
    return {"id": order_id}

Advanced Pool Monitoring Patterns

echo_pool and Structured Event Logging

SQLAlchemy provides two complementary ways to observe the pool in real time. The echo_pool engine parameter writes pool activity to Python's logging system under the sqlalchemy.pool logger: echo_pool=True logs at INFO level (events such as connection creation, invalidation, and recycling), while echo_pool="debug" also logs every checkout and checkin:

import logging

# Route pool events to your structured logger
logging.getLogger("sqlalchemy.pool").setLevel(logging.DEBUG)

engine = create_async_engine(
    "postgresql+asyncpg://user:pass@db.internal/app",
    pool_size=10,
    max_overflow=5,
    echo_pool="debug",  # "debug" is required to see checkouts and checkins
)

In production, prefer attaching event listeners that emit structured metrics over parsing raw pool logs. The checkout, checkin, connect, and close pool events fire synchronously and are registered on the pool of the engine's sync_engine:

from sqlalchemy import event
from sqlalchemy.pool import Pool
import time
import logging

logger = logging.getLogger("app.pool")

_checkout_times: dict[int, float] = {}


def attach_pool_telemetry(engine) -> None:
    sync_pool = engine.sync_engine.pool

    @event.listens_for(sync_pool, "checkout")
    def on_checkout(dbapi_conn, conn_record, conn_proxy) -> None:
        conn_id = id(dbapi_conn)
        _checkout_times[conn_id] = time.monotonic()
        logger.debug(
            "pool.checkout",
            extra={
                "pool_size": sync_pool.size(),
                "checked_out": sync_pool.checkedout(),
                "overflow": sync_pool.overflow(),
            },
        )

    @event.listens_for(sync_pool, "checkin")
    def on_checkin(dbapi_conn, conn_record) -> None:
        conn_id = id(dbapi_conn)
        held_for = time.monotonic() - _checkout_times.pop(conn_id, time.monotonic())
        logger.debug(
            "pool.checkin",
            extra={
                "held_seconds": round(held_for, 3),
                "pool_size": sync_pool.size(),
                "checked_out": sync_pool.checkedout(),
            },
        )

    @event.listens_for(sync_pool, "connect")
    def on_connect(dbapi_conn, conn_record) -> None:
        logger.info("pool.new_connection — pool growing")

    @event.listens_for(sync_pool, "close")
    def on_close(dbapi_conn, conn_record) -> None:
        logger.info("pool.connection_closed — pool shrinking")

The held_seconds metric is the most useful signal: a connection held for much longer than your median query time (say, >5 seconds when queries average 50 ms) almost certainly represents a leak.
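One limitation of measuring at checkin is that a connection which never returns never produces a held_seconds value. A periodic scan of the checkout timestamps covers that gap. The sketch below assumes a mapping shaped like the _checkout_times dictionary above (connection id to time.monotonic() value); find_suspected_leaks and its default threshold are illustrative choices.

```python
import time
from typing import Dict, Optional


def find_suspected_leaks(
    checkout_times: Dict[int, float],
    threshold_seconds: float = 5.0,
    now: Optional[float] = None,
) -> Dict[int, float]:
    """Return {conn_id: seconds_held} for connections still out past the threshold."""
    current = time.monotonic() if now is None else now
    return {
        conn_id: round(current - started, 3)
        for conn_id, started in checkout_times.items()
        if current - started > threshold_seconds
    }
```

Calling this from a periodic task (for example every 30 seconds) and logging a non-empty result surfaces connections that are still checked out, which the checkin listener alone cannot report.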

Prometheus Integration

For production Prometheus scraping, expose the pool status on a health endpoint:

from fastapi import Depends, FastAPI
from sqlalchemy.ext.asyncio import AsyncEngine

app = FastAPI()

# Must match the max_overflow passed to create_async_engine. pool.overflow()
# reports the current overflow count, not the configured ceiling.
MAX_OVERFLOW = 5


def get_pool_metrics(engine: AsyncEngine) -> dict:
    pool = engine.sync_engine.pool
    return {
        "pool_size": pool.size(),
        "checked_out": pool.checkedout(),
        "overflow": pool.overflow(),  # negative until pool_size connections exist
        "checkedin": pool.checkedin(),  # idle connections sitting in the pool
    }


# get_engine is assumed to be an application-defined dependency that
# returns the shared AsyncEngine.
@app.get("/health/pool")
async def pool_health(engine: AsyncEngine = Depends(get_engine)):
    metrics = get_pool_metrics(engine)
    # Warn if more than 80% of the configured ceiling is consumed
    capacity = metrics["pool_size"] + MAX_OVERFLOW
    utilization = metrics["checked_out"] / max(capacity, 1)
    return {
        **metrics,
        "utilization_pct": round(utilization * 100, 1),
        "status": "warn" if utilization > 0.8 else "ok",
    }
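A JSON health endpoint is convenient for humans, but a Prometheus scraper expects the text exposition format. The sketch below renders a metrics dictionary like the one returned by get_pool_metrics as gauge lines. The sqlalchemy_pool prefix and the function name render_prometheus are assumptions for illustration; a real deployment would more likely use an established Prometheus client library.

```python
from typing import Dict


def render_prometheus(metrics: Dict[str, int], prefix: str = "sqlalchemy_pool") -> str:
    """Render integer gauges in the Prometheus text exposition format."""
    lines = []
    for name, value in sorted(metrics.items()):
        metric = f"{prefix}_{name}"
        lines.append(f"# TYPE {metric} gauge")
        lines.append(f"{metric} {value}")
    return "\n".join(lines) + "\n"


sample = render_prometheus({"checked_out": 7, "pool_size": 10})
```

The resulting string can be returned from a /metrics route with a text/plain content type, so the alerting threshold lives in Prometheus rules rather than in application code.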

Detecting Stale Connections Alongside Leaks

Leaks and stale connections are distinct problems that manifest similarly. A stale connection is not "leaked" — it was checked in correctly — but the underlying TCP socket was closed by the database or a network device while it sat idle in the pool. The next checkout hands out a dead socket, producing asyncpg.exceptions.ConnectionDoesNotExistError or OperationalError: server closed the connection unexpectedly.

The dedicated fix for stale connections is pool_pre_ping=True, which validates the connection before handing it out. See the guide to configuring pool pre-ping to handle stale connections for the full treatment including pool_recycle as a complement.

The key observable distinction: stale-connection errors appear on the first request after an idle period (the pool had connections sitting unused during the idle window). Leak-induced exhaustion appears under concurrent load (many requests in flight simultaneously). If your error traces always include a long idle gap before the first error, stale connections are the culprit. If the errors appear during peak traffic and pool.checkedout() is near the ceiling, you have a leak or genuine concurrency demand.

Hybrid Architectures & Migration Strategies

Transitioning from Synchronous Code to Async Context Managers

The most common source of connection leaks in applications being migrated from SQLAlchemy 1.4 is old synchronous cleanup patterns that don't translate to async:

# 1.4 sync pattern: try/finally guarantees close(). Even when it is omitted,
# CPython's reference counting usually returns the connection on dealloc.
session = Session(engine)
try:
    result = session.execute(select(User))
finally:
    session.close()

# 2.0 async: there is no GC safety net, because returning an async connection
# requires an await that a finalizer cannot perform.
# try/finally is safe but verbose, and easy to forget; use async with instead.
session = AsyncSession(engine)
try:
    result = await session.execute(select(User))
finally:
    await session.close()

The canonical 2.0 async replacement:

# 2.0 async — correct
async with AsyncSessionLocal() as session:
    result = await session.execute(select(User))
    users = result.scalars().all()
# session.close() is called automatically, connection returns to pool

Mixing Core and ORM in the Same Pool

When using Core engine.connect() alongside ORM AsyncSession in the same application, both draw from the same pool. A Core connection that starts a transaction but is not closed before a session checkout can cause deadlocks if both operate on the same rows. Keep session and connection contexts short and non-overlapping:

from sqlalchemy import func, select

async def bulk_update_then_fetch(product_ids: list[int]) -> list[Product]:
    # Core bulk update — short-lived, committed before ORM fetch
    async with engine.connect() as conn:
        await conn.execute(
            Product.__table__.update()
            .where(Product.id.in_(product_ids))
            .values(updated_at=func.now())
        )
        await conn.commit()
    # Core connection returned to pool before ORM session opens
    async with AsyncSessionLocal() as session:
        result = await session.execute(
            select(Product).where(Product.id.in_(product_ids))
        )
        return result.scalars().all()

NullPool for Serverless and Short-Lived Workers

Celery workers, AWS Lambda, and Cloud Run functions should use NullPool to avoid holding connections across invocations. The guide on using SQLAlchemy async with Celery task workers explains the per-task engine lifecycle in detail.

from sqlalchemy.pool import NullPool
from sqlalchemy.ext.asyncio import create_async_engine

# For Celery workers or Lambda — no persistent pool
worker_engine = create_async_engine(
    "postgresql+asyncpg://user:pass@db.internal/app",
    poolclass=NullPool,
)

Graceful Shutdown and Connection Draining

When deploying a new version of your application or responding to a SIGTERM from Kubernetes or ECS, dispose of the engine cleanly so that every pooled connection is closed on the database server. If the process or its host disappears without closing its sockets, the database may keep the sessions open until TCP keepalive (for example, Postgres's tcp_keepalives_idle) detects the dead peer, which can take minutes and temporarily reduces the connections available to the new deployment.

import asyncio
import signal
from sqlalchemy.ext.asyncio import AsyncEngine


def install_shutdown_handler(engine: AsyncEngine, loop: asyncio.AbstractEventLoop) -> None:
    """Register SIGTERM and SIGINT handlers to gracefully drain the connection pool."""

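    async def _shutdown() -> None:
        # Allow in-flight requests up to 10 seconds to return their connections
        pool = engine.sync_engine.pool
        deadline = loop.time() + 10
        while pool.checkedout() > 0 and loop.time() < deadline:
            await asyncio.sleep(0.5)
        await engine.dispose()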

    def _handle_signal() -> None:
        loop.create_task(_shutdown())

    loop.add_signal_handler(signal.SIGTERM, _handle_signal)
    loop.add_signal_handler(signal.SIGINT, _handle_signal)

In FastAPI, the ASGI lifespan context manager is the canonical hook:

from contextlib import asynccontextmanager
from fastapi import FastAPI
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker

engine = create_async_engine(
    "postgresql+asyncpg://user:pass@db.internal/app",
    pool_size=10,
    max_overflow=5,
    pool_pre_ping=True,
    pool_recycle=1800,
)
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup — engine already created at module level
    yield
    # Shutdown — return all pooled connections to the database
    await engine.dispose()


app = FastAPI(lifespan=lifespan)

engine.dispose() closes every idle connection in the pool immediately. Connections that are currently checked out continue to function until their coroutine returns them, at which point they are closed rather than returned to the pool. This gives in-flight requests a clean finish without stranding their connections.

Production Pitfalls & Anti-Patterns

  • Bare session = AsyncSessionLocal() without async with — if an exception interrupts the coroutine before await session.close(), the connection is never returned. Use async with AsyncSessionLocal() as session: unconditionally.
  • asyncio.create_task() capturing a session from outer scope: the task runs concurrently and may outlive the session's owning context. If the parent is cancelled or returns, its context manager closes the session while the child task is mid-query, and the child fails with an error such as sqlalchemy.exc.InvalidRequestError: Can't reconnect until invalid transaction is rolled back. Give every task its own session scope.
  • Ignoring pool_timeout as a diagnosis tool — setting pool_timeout=0 to "fix" exhaustion makes the TimeoutError raise immediately and masks the real problem. Instead, lower it temporarily to 5–10 seconds to surface leaks faster in staging.
  • pool_size set to the database's max_connections limit — leaves no connections for DBA tools, migration runners, or monitoring. Leave at least 10–20% headroom: if Postgres is configured with max_connections=100, set pool_size * replicas + max_overflow * replicas ≤ 80.
  • Long transactions spanning external HTTP calls or message queue waits — a single slow external API call can monopolize a pool slot for 5–30 seconds. Commit and release the connection before the external call; reopen after. This is the pattern shown in the session boundaries section above.
  • expire_on_commit=True (the default) causing lazy-load attempts after commit: in async code, accessing an expired attribute after session.commit() triggers implicit IO and raises sqlalchemy.exc.MissingGreenlet ("greenlet_spawn has not been called"). Use expire_on_commit=False in async session factories, or explicitly refresh the attributes you need before closing the session.

Frequently Asked Questions

What is the exact error message for pool exhaustion and what triggers it? The full error is sqlalchemy.exc.TimeoutError: QueuePool limit of size X overflow Y reached, connection timed out, timeout 30.00 (Background on this error at: ...). It fires when pool_size + max_overflow connections are all checked out simultaneously and the pool_timeout wait expires. The root cause is almost always connections not being returned, not the pool being too small.

How do I tell the difference between a leak and genuine high concurrency? Sample pool.checkedout() and pool.checkedin() during and after the incident. checkedin() reports the number of idle connections currently sitting in the pool, not a running count of returns. If checkedout() climbs steadily and stays high even after traffic drops, while checkedin() sits near zero, you have a leak: connections are leaving the pool but not returning. If checkedout() rises to the ceiling under load and falls again as requests complete, you have genuine concurrency demand and need to either add replicas, increase pool_size (if the DB can handle it), or reduce query latency.

Can I use pool_pre_ping to fix connection leaks? No. pool_pre_ping detects stale connections (sockets closed by the database while idle in the pool) and discards them before checkout. It has no effect on connections that are checked out and never returned — those are leaks, and they require fixing the application's context-manager discipline.

Why does my FastAPI app leak connections only under high load, not during local testing? It almost certainly leaks in both; local testing just never accumulates enough leaked connections to matter. Under low concurrency only a handful of sessions are open at any moment, so even improperly scoped sessions rarely push the pool to its ceiling before a restart resets it. Under high load, many coroutines hold connections concurrently and each additional leaked connection brings the pool closer to the limit. The code was always wrong; load exposes it.

How do I safely increase pool_size without destabilizing the database? Calculate the per-replica budget as (DB max_connections − DBA/monitoring headroom) ÷ number of application replicas, then split that budget so that pool_size + max_overflow fits inside it, with max_overflow at roughly 20–30% of pool_size. For cloud databases (RDS, Cloud SQL), also set pool_recycle to a value below the shortest idle timeout enforced by the database, proxy, or load balancer in the connection path.
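The sizing arithmetic can be written down as a small helper. This sketch treats the per-replica budget as covering pool_size and max_overflow together, so the worst case across all replicas stays inside the headroom. The function name per_replica_pool_budget and the default overflow ratio of 0.25 are illustrative assumptions.

```python
from typing import Tuple


def per_replica_pool_budget(
    max_connections: int,
    reserved: int,
    replicas: int,
    overflow_ratio: float = 0.25,
) -> Tuple[int, int]:
    """Return (pool_size, max_overflow) so the fleet never exceeds the budget."""
    if replicas < 1:
        raise ValueError("replicas must be at least 1")
    if reserved >= max_connections:
        raise ValueError("reserved headroom leaves no connections for the app")
    per_replica = (max_connections - reserved) // replicas
    # Split the budget so that max_overflow is about overflow_ratio of pool_size.
    pool_size = int(per_replica / (1 + overflow_ratio))
    max_overflow = per_replica - pool_size
    return pool_size, max_overflow


pool_size, max_overflow = per_replica_pool_budget(
    max_connections=100, reserved=20, replicas=4
)
```

For a Postgres server with max_connections=100, 20 connections reserved for tooling, and 4 replicas, this yields pool_size=16 and max_overflow=4, so the fleet peaks at (16 + 4) × 4 = 80 connections.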