Integrating SQLAlchemy Async with FastAPI and Starlette

Integrating async SQLAlchemy sessions into FastAPI requires three interlocking pieces: an AsyncEngine created once at startup via the async engines and connection pooling layer, a per-request AsyncSession delivered through FastAPI's Depends yield-dependency, and a commit/rollback strategy that survives unhandled exceptions without leaking connections back to the pool in a broken state.

Concept & Execution Model

FastAPI and Starlette run on the ASGI specification — a single-threaded, cooperative concurrency model driven by asyncio. Every request handler is a coroutine scheduled by the event loop. Blocking that loop with synchronous I/O — a plain psycopg2 query, a synchronous Session.execute(), a time.sleep() — stalls every other in-flight request until the call returns. One blocked request under load translates directly into elevated P99 latency for all concurrent requests sharing that process.
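The effect of one blocking call can be reproduced with the standard library alone. The sketch below is an illustration under assumptions, not FastAPI code: ticker stands in for other in-flight requests, and blocking_handler and cooperative_handler are hypothetical handlers that differ only in how they wait.

```python
import asyncio
import time


async def ticker(stop: asyncio.Event, ticks: list) -> None:
    """Stand-in for other in-flight requests: records a tick every 10 ms."""
    while not stop.is_set():
        ticks.append(time.perf_counter())
        await asyncio.sleep(0.01)


async def blocking_handler() -> None:
    time.sleep(0.2)  # synchronous wait: the event loop cannot run anything else


async def cooperative_handler() -> None:
    await asyncio.sleep(0.2)  # awaited wait: the loop keeps serving other tasks


async def count_ticks_during(handler) -> int:
    """Run one handler next to the ticker and count how often the ticker ran."""
    stop = asyncio.Event()
    ticks: list = []
    task = asyncio.create_task(ticker(stop, ticks))
    await asyncio.sleep(0)  # let the ticker record its first tick
    await handler()
    stop.set()
    await task
    return len(ticks)


async def compare() -> tuple:
    blocked = await count_ticks_during(blocking_handler)
    cooperative = await count_ticks_during(cooperative_handler)
    return blocked, cooperative
```

Running asyncio.run(compare()) should show the ticker starved while the blocking handler runs and ticking freely while the cooperative one waits.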

SQLAlchemy 2.0 eliminates this hazard by providing a fully async execution path through AsyncSession and AsyncEngine. The async engine delegates actual network I/O to an async-capable DBAPI driver (asyncpg, psycopg v3 async, or aiosqlite), which suspends the coroutine during socket waits and immediately yields control back to the event loop. From the event loop's perspective, a database query looks identical to any other awaited I/O operation: it occupies no CPU and blocks no other coroutine while the network round-trip is in flight.

The interaction model for a single request is:

  1. Application starts → create_async_engine registers driver and pool parameters. Connections are not yet opened.
  2. First request arrives → FastAPI resolves Depends(get_db_session). The dependency opens an AsyncSession and calls session.begin(), which lazily checks out a connection from the pool.
  3. Route handler executes → ORM operations are await-ed. The coroutine suspends during each DBAPI round-trip, releasing the event loop to process other requests.
  4. Route handler returns a value → FastAPI serializes the response. When the dependency resumes past its yield, the session.begin() context commits (or rolls back), and the enclosing session context then closes the session and returns the connection to the pool.
  5. Application shuts down → the lifespan's shutdown block calls await engine.dispose(), which drains and closes all idle pooled connections.

This lifecycle guarantees that no connection is held across requests and that exceptions at any point always trigger rollback before the connection is recycled. The session is a short-lived object: one per request, created by the dependency, destroyed when the dependency's context manager exits.
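The checkout, commit-or-rollback, and return steps above can be sketched without a database. FakePool below is a hypothetical stand-in written for this illustration; it is not SQLAlchemy's QueuePool and only mimics the ordering of events.

```python
import asyncio
from contextlib import asynccontextmanager


class FakePool:
    """Hypothetical stand-in for a connection pool (not SQLAlchemy's QueuePool)."""

    def __init__(self, size: int) -> None:
        self._free = asyncio.Queue()
        for n in range(size):
            self._free.put_nowait(f"conn-{n}")
        self.log = []

    @property
    def idle(self) -> int:
        return self._free.qsize()

    @asynccontextmanager
    async def session(self):
        conn = await self._free.get()      # step 2: check out a connection
        try:
            yield conn                     # step 3: the handler runs here
            self.log.append("commit")      # step 4: handler returned normally
        except Exception:
            self.log.append("rollback")    # step 4: handler raised
            raise
        finally:
            self._free.put_nowait(conn)    # the connection always goes back


async def handle_request(pool: FakePool, fail: bool) -> str:
    async with pool.session() as conn:
        await asyncio.sleep(0)  # pretend to await a query
        if fail:
            raise ValueError("handler failed")
        return conn


async def demo() -> tuple:
    pool = FakePool(size=2)  # created inside the running loop
    await handle_request(pool, fail=False)
    try:
        await handle_request(pool, fail=True)
    except ValueError:
        pass
    return pool.log, pool.idle
```

After one successful and one failing request, the log holds one commit and one rollback, and both connections are idle again.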

Figure: FastAPI AsyncSession request lifecycle. An HTTP request flows through Depends(get_db_session), which opens an AsyncSession from the session factory (expire_on_commit=False), checks out a connection from the engine's QueuePool (pool_size=20, plus overflow slots), runs the awaited queries in the route handler, then commits or rolls back and returns the connection to the pool as the HTTP response is produced. The engine is created on startup and disposed on shutdown by the FastAPI lifespan.

Query Construction & Async Execution Patterns

All SQLAlchemy 2.0 queries use the select() construct. Execution always requires an await — there is no implicit synchronous fallback in AsyncSession. The statement object is the same whether you use a sync or async session; only the session type and the await keyword differ.

Engine and session factory setup

# Full production setup: engine, session factory, lifespan
from contextlib import asynccontextmanager
from typing import AsyncGenerator

from fastapi import FastAPI
from sqlalchemy.ext.asyncio import (
    AsyncSession,
    async_sessionmaker,
    create_async_engine,
)

DATABASE_URL = "postgresql+asyncpg://user:password@localhost:5432/appdb"

engine = create_async_engine(
    DATABASE_URL,
    pool_size=20,          # base pool: matches expected concurrent DB-holding coroutines
    max_overflow=10,       # burst headroom above pool_size
    pool_recycle=1800,     # replace connections older than 30 min; keep below the DB idle timeout
    pool_pre_ping=True,    # validate connection before checkout — catches stale TCP
    echo=False,            # never True in production; logs every SQL statement
)

# expire_on_commit=False is critical in async FastAPI dependencies.
# Without it, reading an expired attribute after commit raises MissingGreenlet.
# Full explanation: /async-engines-dialects-and-connection-pooling/
#   integrating-sqlalchemy-async-with-fastapi-and-starlette/
#   using-expire-on-commit-false-in-fastapi-dependencies/
async_session_factory = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,
)


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
    yield  # connections open lazily on first checkout
    await engine.dispose()  # drain pool: close all idle connections on shutdown


app = FastAPI(lifespan=lifespan)

Common query patterns

# Runnable 2.0 query patterns — all require await
from datetime import date

from sqlalchemy import delete, select, update
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload

from myapp.models import Invoice, Order, User


async def get_user_by_email(session: AsyncSession, email: str) -> User | None:
    stmt = select(User).where(User.email == email)
    return await session.scalar(stmt)  # returns first row or None


async def get_pending_orders_with_items(
    session: AsyncSession, user_id: int
) -> list[Order]:
    stmt = (
        select(Order)
        .where(Order.user_id == user_id, Order.status == "pending")
        .options(selectinload(Order.items))  # eager-load to avoid MissingGreenlet
        .order_by(Order.created_at.desc())
        .limit(50)
    )
    result = await session.scalars(stmt)
    return list(result.all())


async def mark_invoices_overdue(
    session: AsyncSession, cutoff_date: date  # a date object: asyncpg rejects str for date columns
) -> int:
    stmt = (
        update(Invoice)
        .where(Invoice.due_date < cutoff_date, Invoice.status == "outstanding")
        .values(status="overdue")
        .execution_options(synchronize_session=False)
    )
    result = await session.execute(stmt)
    return result.rowcount


async def delete_cancelled_orders(session: AsyncSession) -> int:
    stmt = delete(Order).where(Order.status == "cancelled")
    result = await session.execute(stmt)
    return result.rowcount

The contrast with SQLAlchemy 1.x is clear: session.query(User).filter_by(email=email).first() becomes await session.scalar(select(User).where(User.email == email)). The select() construct is composable — you can build it programmatically, pass it between functions, and add .where() clauses dynamically, all before the final await sends it to the database.
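As a small illustration of that composability, the sketch below builds a statement in two functions and renders it without touching a database. The users table and the helper names base_query and apply_filters are assumptions made for this example; the article's real models live in myapp.models.

```python
from typing import Optional

from sqlalchemy import Column, Integer, MetaData, String, Table, select
from sqlalchemy.sql.expression import Select

metadata = MetaData()
# Hypothetical table used only for this illustration.
users = Table(
    "users",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("email", String),
    Column("status", String),
)


def base_query() -> Select:
    return select(users).order_by(users.c.id)


def apply_filters(
    stmt: Select, email: Optional[str] = None, status: Optional[str] = None
) -> Select:
    # Each .where() returns a new statement; nothing is sent to the database here.
    if email is not None:
        stmt = stmt.where(users.c.email == email)
    if status is not None:
        stmt = stmt.where(users.c.status == status)
    return stmt


stmt = apply_filters(base_query(), status="active")
sql = str(stmt)  # renders SQL with bind placeholders; no connection is needed
```

The finished stmt can then be handed to await session.execute(stmt) or await session.scalars(stmt) exactly as in the functions above.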

State Management & Session Boundaries

The yield dependency pattern

The canonical request-scoped session dependency uses a generator function with yield. FastAPI resolves dependencies before calling the route handler and tears them down (by resuming the generator past yield) after the handler returns — even if the handler raises an exception:

from typing import AsyncGenerator

from fastapi import Depends, HTTPException, status
from sqlalchemy.exc import IntegrityError, SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession


async def get_db_session() -> AsyncGenerator[AsyncSession, None]:
    """
    Yields a request-scoped AsyncSession with automatic transaction management.

    Control flow:
    - session.begin() opens a transaction on first DBAPI operation
    - On clean route return: context manager __aexit__ commits
    - On IntegrityError: convert to HTTP 409 (rollback is automatic via session.begin())
    - On other SQLAlchemyError: convert to HTTP 500
    - always: session.close() returns the connection to the pool
    """
    async with async_session_factory() as session:
        try:
            # The try wraps session.begin() so that errors raised by the
            # flush/commit on exit are caught too, not only handler errors.
            async with session.begin():
                yield session
        except IntegrityError as exc:
            # session.begin()'s __aexit__ has already rolled back;
            # we just need to convert to a structured HTTP error.
            # Whether a teardown-time error still reaches the client depends
            # on when your FastAPI version runs dependency exit code.
            raise HTTPException(
                status_code=status.HTTP_409_CONFLICT,
                detail=f"Constraint violation: {exc.orig}",
            ) from exc
        except SQLAlchemyError as exc:
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Database error: operation was rolled back.",
            ) from exc

The async with session.begin() block automatically commits on a clean exit and rolls back on any exception, including exceptions raised after yield when FastAPI's dependency system throws the exception back into the generator. This is Python generator semantics: generator.throw(exc) resumes the generator at the yield with the exception raised there, so your except clauses inside the dependency fire correctly.
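The generator semantics can be checked in isolation with plain Python. This is a sketch of the mechanism only: dependency and ConflictError are hypothetical stand-ins, and run_request drives the generator by hand roughly the way a framework would, which is not FastAPI's actual implementation.

```python
import asyncio


class ConflictError(Exception):
    """Hypothetical stand-in for an HTTP 409 error."""


async def dependency(events: list):
    events.append("open")
    try:
        yield "session"
        events.append("commit")       # reached only when the handler succeeded
    except KeyError as exc:
        events.append("rollback")     # the handler's exception arrives at the yield
        raise ConflictError("duplicate key") from exc
    finally:
        events.append("close")


async def run_request(fail: bool) -> list:
    events: list = []
    gen = dependency(events)
    await gen.__anext__()  # run up to the yield, before the handler is called
    try:
        if fail:
            # The handler raised: throw its exception back in at the yield.
            await gen.athrow(KeyError("users_email_key"))
        else:
            # The handler returned: resume the generator past the yield.
            await gen.__anext__()
    except StopAsyncIteration:
        events.append("done")
    except ConflictError:
        events.append("converted")
    return events
```

In the failing case the except clause inside the generator runs first, the finally clause runs next, and only then does the converted error reach the caller.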

Commit and rollback per request

In most CRUD APIs, one transaction per request is the right default. With the yield dependency above, commit is implicit: the route handler adds objects to the session and returns; the dependency commits:

from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession

from myapp.models import User
from myapp.schemas import UserCreate, UserRead

router = APIRouter()


@router.post("/users/", response_model=UserRead, status_code=201)
async def create_user(
    payload: UserCreate,
    session: AsyncSession = Depends(get_db_session),
) -> User:
    user = User(email=payload.email, display_name=payload.display_name)
    session.add(user)
    await session.flush()  # emit the INSERT now so user.id is set before serialization
    # No explicit commit: session.begin() commits after this coroutine returns
    # and the dependency's context manager exits normally.
    # expire_on_commit=False keeps user.id and user.email readable for Pydantic.
    return user

For operations requiring intermediate PKs (e.g., insert a parent, then insert children referencing the parent's ID), use flush():

from myapp.models import Order, OrderItem
from myapp.schemas import OrderCreate, OrderRead


@router.post("/orders/", response_model=OrderRead, status_code=201)
async def create_order(
    payload: OrderCreate,
    session: AsyncSession = Depends(get_db_session),
) -> Order:
    order = Order(user_id=payload.user_id, total=payload.total)
    session.add(order)
    await session.flush()    # assigns order.id from the DB sequence, within the txn

    # order.id is now populated — safe to use in child rows or side-effects
    for item_data in payload.items:
        session.add(OrderItem(order_id=order.id, **item_data.model_dump()))

    # Commit happens in the dependency's __aexit__
    return order

expire_on_commit and the MissingGreenlet trap

When async_sessionmaker is built without expire_on_commit=False, SQLAlchemy marks every ORM instance attribute as expired after session.commit(). The next attribute access triggers a lazy SQL SELECT. In an AsyncSession there is no synchronous I/O path, so that lazy SELECT raises MissingGreenlet — typically surfacing as a Pydantic ValidationError because Pydantic is the code that tries to read the expired attribute.

The guide on expire_on_commit=False for FastAPI dependencies covers the precise commit timeline, all related error variants, and the trade-offs of disabling attribute expiry.

Advanced FastAPI Patterns

Streaming responses with server-side cursors

For large result sets — millions of invoice rows, full product catalogs — loading everything into memory before streaming wastes RAM and delays first-byte delivery. FastAPI's StreamingResponse paired with SQLAlchemy's stream_scalars solves this:

import json
from collections.abc import AsyncIterator

from fastapi import Depends
from fastapi.responses import StreamingResponse
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from myapp.models import Invoice


async def iter_invoices_ndjson(session: AsyncSession) -> AsyncIterator[str]:
    # stream_scalars() requests a streaming (server-side) cursor;
    # yield_per=500 sets how many rows are fetched per batch
    stmt = (
        select(Invoice)
        .order_by(Invoice.created_at)
        .execution_options(yield_per=500)
    )
    result = await session.stream_scalars(stmt)
    try:
        async for invoice in result:
            yield (
                json.dumps({
                    "id": invoice.id,
                    "total": str(invoice.total),
                    "status": invoice.status,
                    "due_date": invoice.due_date.isoformat(),
                })
                + "\n"
            )
    finally:
        await result.close()  # release the cursor even if the client disconnects


@router.get("/invoices/export")
async def export_invoices(
    session: AsyncSession = Depends(get_db_session),
) -> StreamingResponse:
    return StreamingResponse(
        iter_invoices_ndjson(session),
        media_type="application/x-ndjson",
        headers={"Content-Disposition": "attachment; filename=invoices.ndjson"},
    )

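yield_per=500 caps each fetch at 500 rows, so memory usage stays roughly constant regardless of result set size. The generator yields one JSON line per row, and FastAPI sends each chunk to the client as it is produced. Because the body is generated after the route function has returned, check that your FastAPI version keeps yield-dependencies open until streaming finishes; if it does not, open the session inside the generator instead of injecting it.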

Background tasks with their own sessions

FastAPI's BackgroundTasks run after the HTTP response is sent, outside the route handler. Whether the request-scoped AsyncSession has already been committed and closed at that point depends on when your FastAPI version runs yield-dependency teardown, which has changed between releases. Never pass the request session to a background task: treat it as closed, with its connection already returned to the pool.

from fastapi import BackgroundTasks, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession

from myapp.models import Order


async def send_confirmation_email_task(order_id: int) -> None:
    # Background task MUST open its own session — never reuse the request session
    async with async_session_factory() as bg_session:
        async with bg_session.begin():
            order = await bg_session.get(Order, order_id)
            if order is not None:
                await send_email(order.user_email, order_id)


@router.post("/orders/{order_id}/confirm")
async def confirm_order(
    order_id: int,
    background_tasks: BackgroundTasks,
    session: AsyncSession = Depends(get_db_session),
) -> dict:
    order = await session.get(Order, order_id)
    if order is None:
        raise HTTPException(status_code=404, detail="Order not found")
    order.status = "confirmed"
    # The task is queued here and runs after the response has been sent
    background_tasks.add_task(send_confirmation_email_task, order_id)
    return {"status": "confirmed", "order_id": order_id}

Multiple dependent queries in a single request

When a route needs data from several tables, run every query through the same request session (passing it to helper functions where needed) so they all share one transaction:

from sqlalchemy import func, select

from myapp.models import Product, Tenant, User


@router.get("/dashboard/{tenant_id}")
async def tenant_dashboard(
    tenant_id: int,
    session: AsyncSession = Depends(get_db_session),
) -> dict:
    tenant = await session.get(Tenant, tenant_id)
    if tenant is None:
        raise HTTPException(status_code=404, detail="Tenant not found")

    user_count_result = await session.execute(
        select(func.count()).select_from(User).where(User.tenant_id == tenant_id)
    )
    product_count_result = await session.execute(
        select(func.count()).select_from(Product).where(Product.tenant_id == tenant_id)
    )

    return {
        "tenant": tenant.name,
        "users": user_count_result.scalar_one(),
        "products": product_count_result.scalar_one(),
    }

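All three queries run within the same transaction. Under PostgreSQL's default READ COMMITTED isolation each statement still sees its own snapshot; set REPEATABLE READ on the transaction if the counts must come from a single consistent snapshot.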

Health-check endpoint with minimal pool impact

A /healthz endpoint that opens a full AsyncSession does more work than a Kubernetes liveness probe needs. Under high-frequency probes, that adds avoidable contention for pool connections. Use a short-lived connection from the engine instead:

from fastapi import APIRouter
from sqlalchemy import text

health_router = APIRouter()


@health_router.get("/healthz")
async def health_check() -> dict:
    # Borrow a raw connection without going through the ORM session layer
    async with engine.connect() as conn:
        await conn.execute(text("SELECT 1"))
    return {"status": "ok"}

engine.connect() checks out a connection from the pool and releases it as soon as the async with block exits. No AsyncSession is created and no identity map is populated, so the connection is held only for the duration of the SELECT 1 round-trip. The probe still uses a pool slot briefly, but for as short a time as possible, which keeps the pool free for route handlers during probe bursts.

Read-only routes and autocommit mode

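For routes that never write, such as reporting endpoints that only SELECT, you can mark the transaction read-only so that accidental writes fail immediately. The dependency below does this with SET TRANSACTION READ ONLY on a regular session. AUTOCOMMIT is a separate option, configured with execution_options(isolation_level="AUTOCOMMIT") on the engine or connection, and is not what this dependency uses: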

from typing import AsyncGenerator

from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession


async def get_db_session_readonly() -> AsyncGenerator[AsyncSession, None]:
    """Read-only session: the autobegun transaction is marked READ ONLY."""
    async with async_session_factory() as session:
        # Issue this before any other statement in the transaction
        await session.execute(text("SET TRANSACTION READ ONLY"))
        yield session
        # No commit needed: closing the session ends the read-only transaction

SET TRANSACTION READ ONLY makes PostgreSQL reject data-modifying statements for the rest of the transaction, which documents intent and guards reporting endpoints against accidental writes. Do not use this pattern for routes that mix reads with writes in the same session: the first INSERT will raise a PostgreSQL error.

Hybrid Architectures & Migration Strategies

Migrating synchronous SQLAlchemy to async FastAPI

Typical migration from a Flask + synchronous SQLAlchemy codebase:

  1. Swap create_engine for create_async_engine with a postgresql+asyncpg:// URL.
  2. Replace sessionmaker with async_sessionmaker(expire_on_commit=False).
  3. Convert route functions from def to async def.
  4. Replace session.query(Model).filter(...) with select(Model).where(...) + await session.scalars(...).
  5. Replace bare session.commit() with await session.commit() (or rely on the session.begin() context manager).
  6. Wrap any remaining sync ORM code in await session.run_sync(fn).
  7. Add a FastAPI lifespan with await engine.dispose() on shutdown.
  8. Audit all relationship accesses: add selectinload or joinedload options to every query that touches a relationship in the route or serializer.

The most common migration failures are (a) forgetting await on session.execute() — Python silently returns a coroutine object instead of a result, and (b) missing selectinload on relationships that previously lazy-loaded without issue in the sync session.
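Failure (a) is easy to reproduce with the standard library. In this sketch, execute is a hypothetical stand-in for any awaitable call such as session.execute(); it is not SQLAlchemy's method.

```python
import asyncio
import inspect


async def execute(statement: str) -> list:
    """Hypothetical stand-in for an awaitable call such as session.execute()."""
    await asyncio.sleep(0)
    return [("row", statement)]


async def handler() -> tuple:
    forgotten = execute("SELECT 1")    # bug: no await, so nothing has run yet
    is_coroutine = inspect.iscoroutine(forgotten)
    forgotten.close()                  # discard it to avoid a "never awaited" warning
    rows = await execute("SELECT 1")   # correct: await produces the actual rows
    return is_coroutine, rows
```

The un-awaited call hands back a coroutine object, so code that then calls .scalars() or iterates over it fails with an attribute or type error far from the real mistake.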

Mixing Core and ORM in async FastAPI

Core-level execution is available directly on AsyncSession. Use Core for bulk writes and ORM for reads where the identity map and relationship loading add value:

from sqlalchemy import insert
from sqlalchemy.ext.asyncio import AsyncSession

from myapp.models import Product


async def bulk_create_products(
    session: AsyncSession, rows: list[dict]
) -> list[int]:
    # Bulk INSERT .. RETURNING: only primary keys come back, so no ORM instances are built
    stmt = insert(Product).returning(Product.id)
    result = await session.execute(stmt, rows)
    return list(result.scalars().all())

For the driver choice powering this — asyncpg vs psycopg3 in async FastAPI — see Choosing Between asyncpg and psycopg Async Drivers, which includes empirical benchmarks and connection-overhead comparisons relevant to FastAPI throughput characteristics.

Integrating with Alembic migrations

FastAPI apps managed by Alembic need a hybrid configuration: Alembic runs migrations synchronously (it was built for sync engines), but your app's runtime engine is async. The standard pattern opens a connection on the async engine and runs the synchronous migration function through connection.run_sync(). Full details are in configuring Alembic with async SQLAlchemy engines.

Production Pitfalls & Anti-Patterns

  • Not disposing the engine on shutdown. If await engine.dispose() is omitted from the lifespan shutdown, pooled connections are not closed cleanly when the worker process exits. The database server sees abrupt disconnects, and in some network setups the server-side backends linger and count against the connection limit until the server notices the dead sockets. Fix: always call await engine.dispose() in the lifespan's shutdown path.
  • Sharing an AsyncSession across concurrent coroutines. AsyncSession is not safe for concurrent access from multiple coroutines. Two coroutines using the same session simultaneously interleave operations on one connection and fail with errors such as InvalidRequestError or asyncpg's "cannot perform operation: another operation is in progress". Fix: one session per request, and a separate session for each task passed to asyncio.gather().
  • Accessing expired attributes after commit. If expire_on_commit=True (the default), reading user.email after await session.commit() raises MissingGreenlet because the attribute access attempts a lazy SELECT from plain synchronous code, where AsyncSession has no way to await the I/O. Fix: set expire_on_commit=False in async_sessionmaker. Full diagnosis is in fixing MissingGreenlet / GreenletSpawnError.
  • Lazy-loading relationships without eager loading. Accessing order.items in an async route without specifying .options(selectinload(Order.items)) on the query raises MissingGreenlet. SQLAlchemy's lazy loader is synchronous; it cannot function inside AsyncSession. Fix: always use selectinload, joinedload, or subqueryload for every relationship that will be accessed after the query returns.
  • Using BackgroundTasks with the request session. The request session is closed when the response is sent — before the background task runs. Fix: always open a fresh async_session_factory() context inside the background task function, never accept the request's AsyncSession as a parameter.
  • Not setting pool_pre_ping=True in cloud deployments. Cloud databases (RDS, Cloud SQL, Neon) close idle connections after a timeout (often 5–10 minutes). Without pool_pre_ping, the next request on a stale connection raises OperationalError. Fix: set pool_pre_ping=True and keep pool_recycle below the idle timeout that applies to your database. Handling stale connections comprehensively is covered in handling connection leaks and pool exhaustion.
  • Omitting await session.flush() before using auto-generated PKs. After session.add(obj), the object's id is None until flushed. Passing obj.id to a child row insert or side-effect before flushing inserts NULL for the foreign key. Fix: call await session.flush() after session.add() whenever you need the DB-generated PK within the same transaction.

Frequently Asked Questions

How do I prevent connection pool exhaustion in high-concurrency FastAPI deployments? Set pool_size equal to the number of coroutines that may simultaneously hold an open database connection. This is typically far less than your request concurrency, since most request time is spent in Python, Pydantic serialization, or the network stack, not the DB. Use max_overflow for burst headroom. Configure pool_recycle=1800 to preempt DB-side idle timeouts. Always yield sessions via Depends from inside async with blocks (or try/finally) to guarantee pool return on error. Monitor pool utilization with the pool's status() method or SQLAlchemy pool events such as checkout and checkin, exported to Prometheus, and scale pool_size before scaling horizontally.
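The sizing argument can be made concrete with a small simulation. The sketch below is an assumption-laden model, not SQLAlchemy's pool: a semaphore stands in for the pool_size + max_overflow ceiling, and the sleep durations are made-up proportions of non-DB and DB time. A real QueuePool also gives up with a timeout error after pool_timeout instead of waiting indefinitely.

```python
import asyncio


async def simulate(requests: int, pool_size: int, max_overflow: int) -> int:
    """Return the peak number of simultaneously held 'connections'."""
    capacity = asyncio.Semaphore(pool_size + max_overflow)  # ceiling on checkouts
    held = 0
    peak = 0

    async def handle_request() -> None:
        nonlocal held, peak
        await asyncio.sleep(0.001)      # non-DB work: parsing, validation
        async with capacity:            # waits here when the pool is exhausted
            held += 1
            peak = max(peak, held)
            await asyncio.sleep(0.005)  # the only span that holds a connection
            held -= 1
        await asyncio.sleep(0.001)      # serialization, after the connection is back

    await asyncio.gather(*(handle_request() for _ in range(requests)))
    return peak
```

Calling asyncio.run(simulate(200, 20, 10)) models 200 concurrent requests against a pool of 20 with 10 overflow slots: the peak never exceeds 30, and the remaining requests queue for a slot.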

Can I use session.begin_nested() (SAVEPOINT) inside a FastAPI dependency? Yes. await session.begin_nested() opens a SAVEPOINT inside the outer transaction. This is useful for partial rollback in batch-insert loops — if one item fails a constraint, you roll back only that item's SAVEPOINT and continue with the rest, rather than aborting the entire request transaction. The outermost session.begin() context in the dependency still commits or rolls back the full transaction on request completion.
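The SQL-level behaviour that SAVEPOINT provides can be seen with the standard library's sqlite3 module. This sketch issues the SAVEPOINT statements by hand against an in-memory database; it illustrates the partial-rollback semantics only and does not use AsyncSession or begin_nested(). The items table and the function name are assumptions for the example.

```python
import sqlite3


def insert_skipping_duplicates(skus: list) -> tuple:
    """Insert each SKU in its own SAVEPOINT; skip the ones that violate the key."""
    conn = sqlite3.connect(":memory:", isolation_level=None)  # manual transactions
    conn.execute("CREATE TABLE items (sku TEXT PRIMARY KEY)")
    skipped = []
    conn.execute("BEGIN")                           # outer request transaction
    for sku in skus:
        conn.execute("SAVEPOINT item")              # nested scope for one item
        try:
            conn.execute("INSERT INTO items (sku) VALUES (?)", (sku,))
        except sqlite3.IntegrityError:
            conn.execute("ROLLBACK TO SAVEPOINT item")  # undo only this item
            skipped.append(sku)
        conn.execute("RELEASE SAVEPOINT item")
    conn.execute("COMMIT")                          # the outer transaction still commits
    stored = [row[0] for row in conn.execute("SELECT sku FROM items ORDER BY sku")]
    conn.close()
    return stored, skipped
```

With the input ["a", "b", "a", "c"], the duplicate is rolled back on its own and the other three rows are committed together.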

Why does Pydantic raise ValidationError on fields that are clearly set on the ORM model? Almost always because expire_on_commit=True caused the attribute to be expired after commit, and Pydantic's model_validate(obj) triggered a lazy load that raised MissingGreenlet before Pydantic could read the value. Setting expire_on_commit=False in async_sessionmaker resolves this in the vast majority of cases.

How do I run database migrations (Alembic) in an async FastAPI app? Alembic itself runs synchronously. The recommended approach, used by Alembic's async template, is an env.py that opens a connection on the async engine and passes the synchronous migration function to await connection.run_sync(). The setup for env.py is non-trivial; the full walkthrough is in configuring Alembic with async SQLAlchemy engines.

Is it safe to create the AsyncEngine at module import time, outside the lifespan? Yes. create_async_engine does not open any connections eagerly — connections are checked out lazily on first session.execute(). The engine object is safe to create at module level. What must happen in the lifespan is engine.dispose() on shutdown, to drain idle connections cleanly rather than waiting for OS TCP timeouts.

Can I inject multiple sessions from different databases into a single route? Yes. Define one engine and one async_sessionmaker per database, and one yield-dependency per engine. FastAPI resolves multiple Depends parameters independently, giving you separate, independently-committed sessions in the same route handler. Be aware that there is no distributed transaction coordinator — if one session commits and the other fails, you have a partial write. Use saga patterns or an outbox pattern for cross-database consistency.