Using aiosqlite for Async Tests and Local Development

Use sqlite+aiosqlite:// with poolclass=StaticPool and connect_args={"check_same_thread": False} to run fast async SQLAlchemy tests that need no external database service. This guide, part of the reference on selecting async drivers for SQLite, MySQL, and Postgres, walks through the exact setup.

Quick Answer

The minimum viable async SQLite test engine, shown next to its sync equivalent (the pytest-asyncio fixtures that use it appear further down):

# Before (sync, legacy SQLAlchemy 1.4 style)
from sqlalchemy import create_engine
from sqlalchemy.orm import Session, sessionmaker

engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)

SessionLocal = sessionmaker(bind=engine)
with SessionLocal() as session:
    session.add(User(email="test@example.com", tenant_id=1))
    session.commit()

# After (async, SQLAlchemy 2.0 + aiosqlite)
import asyncio
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from sqlalchemy.pool import StaticPool

engine = create_async_engine(
    "sqlite+aiosqlite://",                      # in-memory
    connect_args={"check_same_thread": False},   # turn off sqlite3's same-thread check
    poolclass=StaticPool,                        # share one connection across sessions
)

async def setup_and_test():
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

    factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
    async with factory() as session:
        async with session.begin():
            session.add(User(email="test@example.com", tenant_id=1))

asyncio.run(setup_and_test())

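StaticPool is the critical piece. Every new connection to an in-memory SQLite database opens its own empty database, so if the pool ever hands out a second connection, create_all runs on one connection, your test session gets another, and you see OperationalError: no such table. SQLAlchemy's aiosqlite dialect already selects StaticPool for in-memory URLs when no poolclass is given; passing it explicitly documents the requirement and protects against a different pool being configured elsewhere.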

Execution Context & Async Workflow Integration

Why aiosqlite uses a background thread

aiosqlite is not a truly non-blocking async driver. Python's sqlite3 module wraps a C library that performs synchronous file I/O. aiosqlite runs all sqlite3 operations for a connection on a single dedicated background thread, fed through a request queue, and exposes them as asyncio coroutines by resolving futures on the event loop (it does not go through loop.run_in_executor()). Every await session.execute(stmt) hands work to that thread, suspends the coroutine, and resumes it when the thread signals completion.
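The thread-plus-future handoff can be sketched with the standard library alone. This is a toy illustration of the general pattern under simplifying assumptions, not aiosqlite's actual code; ThreadedSQLite and its methods are hypothetical names invented for the example.

```python
import asyncio
import queue
import sqlite3
import threading


class ThreadedSQLite:
    """Toy wrapper (hypothetical): one worker thread owns the sqlite3 connection."""

    def __init__(self, database: str) -> None:
        self._database = database
        self._requests: queue.Queue = queue.Queue()
        self._thread = threading.Thread(target=self._worker, daemon=True)
        self._thread.start()

    def _worker(self) -> None:
        # The connection is created and used only on this thread.
        conn = sqlite3.connect(self._database)
        while True:
            item = self._requests.get()
            if item is None:  # shutdown sentinel
                conn.close()
                return
            future, loop, sql, params = item
            try:
                rows = conn.execute(sql, params).fetchall()
                conn.commit()
            except Exception as exc:
                # Futures must be resolved on the loop's own thread.
                loop.call_soon_threadsafe(future.set_exception, exc)
            else:
                loop.call_soon_threadsafe(future.set_result, rows)

    async def execute(self, sql: str, params: tuple = ()) -> list:
        loop = asyncio.get_running_loop()
        future = loop.create_future()
        self._requests.put((future, loop, sql, params))
        return await future  # suspends until the worker resolves the future

    def close(self) -> None:
        self._requests.put(None)
        self._thread.join()


async def demo() -> list:
    db = ThreadedSQLite(":memory:")
    await db.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT)")
    await db.execute("INSERT INTO users (email) VALUES (?)", ("test@example.com",))
    rows = await db.execute("SELECT id, email FROM users")
    db.close()
    return rows


rows = asyncio.run(demo())
```

The event loop never blocks on SQLite, but statements still run one at a time on the worker thread, which is why this design adds concurrency-friendliness rather than parallelism.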

For local development and test workloads, this is invisible — SQLite operations complete in microseconds. For production services where database I/O latency could spike (network-attached storage, NFS-backed containers), the thread-dispatch overhead becomes measurable and the write-lock bottleneck (SQLite's database-wide write lock) dominates throughput at any meaningful concurrency level.

StaticPool and shared connection semantics

StaticPool from sqlalchemy.pool maintains exactly one connection and hands the same connection to every checkout. This is the correct choice for in-memory SQLite because:

  1. Each sqlite3.connect(":memory:") call creates a distinct database instance.
  2. A pool that opens connections on demand (QueuePool and its async variant, or NullPool) would hand out connections that each point at a different memory database.
  3. StaticPool ensures that engine.begin() (used for schema creation) and subsequent async_sessionmaker checkouts all touch the same in-memory database.
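The first point in the list is easy to confirm with the stdlib sqlite3 module, without SQLAlchemy in the picture. The sketch below opens two in-memory connections and shows that a table created on one is invisible to the other; table_names is a helper written for this example.

```python
import sqlite3

first = sqlite3.connect(":memory:")
second = sqlite3.connect(":memory:")  # a separate, empty database

first.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT)")
first.commit()


def table_names(conn: sqlite3.Connection) -> list:
    """Return the names of all tables visible on this connection."""
    rows = conn.execute(
        "SELECT name FROM sqlite_master WHERE type = 'table'"
    ).fetchall()
    return [name for (name,) in rows]


tables_first = table_names(first)
tables_second = table_names(second)

# Querying the table through the second connection reproduces the familiar error.
try:
    second.execute("SELECT * FROM users")
    error_message = None
except sqlite3.OperationalError as exc:
    error_message = str(exc)
```

Here tables_first contains users, tables_second is empty, and error_message carries the same "no such table" text that surfaces through SQLAlchemy when a second connection is opened.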

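check_same_thread=False is best read as a defensive setting. aiosqlite creates the sqlite3 connection on its background thread and runs every call there, so the same-thread check is normally satisfied even though the asyncio thread that calls await is a different OS thread. Disabling the check removes the risk of ProgrammingError: SQLite objects created in a thread can only be used in that same thread if anything ever touches the raw connection from another thread.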
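The same-thread rule itself can be seen in isolation with the stdlib sqlite3 module. In this sketch, query_from_other_thread is a helper written for the example: it uses a connection from a thread other than the one that created it and records what happens.

```python
import sqlite3
import threading


def query_from_other_thread(conn: sqlite3.Connection) -> dict:
    """Run a query on a fresh thread and report the rows or the error type."""
    outcome = {}

    def worker() -> None:
        try:
            outcome["rows"] = conn.execute("SELECT 1").fetchall()
        except sqlite3.ProgrammingError as exc:
            outcome["error"] = type(exc).__name__

    thread = threading.Thread(target=worker)
    thread.start()
    thread.join()
    return outcome


# Both connections are created on the main thread.
strict = sqlite3.connect(":memory:")  # default: check_same_thread=True
relaxed = sqlite3.connect(":memory:", check_same_thread=False)

strict_outcome = query_from_other_thread(strict)
relaxed_outcome = query_from_other_thread(relaxed)
```

The strict connection refuses the cross-thread call with ProgrammingError, while the relaxed one answers the query.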

Installing aiosqlite

pip install aiosqlite "sqlalchemy[asyncio]" pytest-asyncio

Minimum versions: aiosqlite>=0.19.0, SQLAlchemy>=2.0, pytest-asyncio>=0.21. Older aiosqlite versions had internal thread pool inconsistencies that caused sporadic RuntimeError: Event loop is closed failures in long test sessions.

pytest-asyncio fixture patterns

The recommended fixture layout uses scope="session" for the engine and schema creation (expensive, runs once) and function-level scope for sessions with a rollback after each test:

# conftest.py
import asyncio
import pytest
import pytest_asyncio
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from sqlalchemy.pool import StaticPool

from myapp.models import Base, User, Order


DATABASE_URL = "sqlite+aiosqlite://"


@pytest.fixture(scope="session")
def event_loop():
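    """
    Create a single event loop for the entire test session.
    Required when using session-scoped async fixtures with pytest-asyncio 0.21.
    Newer pytest-asyncio releases deprecate overriding event_loop (1.0 removes
    the fixture); there, use loop_scope="session" on the fixtures instead.
    """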
    loop = asyncio.new_event_loop()
    yield loop
    loop.close()


@pytest_asyncio.fixture(scope="session")
async def engine():
    """
    Session-scoped async engine. StaticPool keeps one connection alive
    so all fixtures and tests see the same in-memory database.
    """
    _engine = create_async_engine(
        DATABASE_URL,
        connect_args={"check_same_thread": False},
        poolclass=StaticPool,
    )
    async with _engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

    yield _engine

    async with _engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)
    await _engine.dispose()


@pytest_asyncio.fixture
async def session(engine):
    """
    Function-scoped session. Each test gets a fresh session;
    all writes are rolled back after the test completes.
    """
    factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
    async with factory() as _session:
        yield _session
        await _session.rollback()   # undo any changes made during the test

You also need the asyncio_mode setting in pytest.ini (or pyproject.toml) to avoid having to mark every async test individually:

# pytest.ini
[pytest]
asyncio_mode = auto

Or in pyproject.toml:

[tool.pytest.ini_options]
asyncio_mode = "auto"

With asyncio_mode = "auto", pytest-asyncio detects async def test functions automatically. You no longer need @pytest.mark.asyncio on every test.

With this setup, individual tests are straightforward:

# test_users.py
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from myapp.models import User


# No @pytest.mark.asyncio marker: asyncio_mode = auto picks up async tests.
async def test_create_and_retrieve_user(session: AsyncSession):
    user = User(email="alice@example.com", tenant_id=1)
    session.add(user)
    await session.flush()   # assigns id without committing

    result = await session.execute(
        select(User).where(User.email == "alice@example.com")
    )
    retrieved = result.scalar_one()
    assert retrieved.id is not None
    assert retrieved.tenant_id == 1


async def test_multiple_users_same_tenant(session: AsyncSession):
    for i in range(5):
        session.add(User(email=f"user{i}@example.com", tenant_id=42))
    await session.flush()

    result = await session.execute(
        select(User).where(User.tenant_id == 42)
    )
    users = result.scalars().all()
    assert len(users) == 5

Resolving Warnings, Errors & Common Mistakes

Error / Warning | Root Cause | Production Fix
sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) no such table: users | The pool opened more than one connection (for example NullPool or a QueuePool variant); each new in-memory connection starts with an empty database. | Add poolclass=StaticPool to create_async_engine().
sqlite3.ProgrammingError: SQLite objects created in a thread can only be used in that same thread | A sqlite3 connection was used from a thread other than the one that created it; Python's sqlite3 enforces per-thread ownership by default. | Add connect_args={"check_same_thread": False} to create_async_engine().
ScopeMismatch: You tried to access the function scoped fixture ... with a session scoped request object | Using a function-scoped fixture inside a session-scoped one in pytest-asyncio. | Promote the fixture to scope="session" or restructure so session-scoped fixtures only depend on session-scoped ones.
DeprecationWarning: There is no current event loop (pytest-asyncio >= 0.21) | Using a bare asyncio.get_event_loop() without a custom event_loop fixture at session scope. | Define an explicit event_loop fixture at scope="session" in conftest.py (see example above).
sqlalchemy.exc.InvalidRequestError: Can't operate on a closed transaction | Accessing session after await session.rollback() or outside the async with block. | Obtain a fresh session for each test via the fixture; never share session instances across tests.
RuntimeError: Task attached to a different loop | Mixing session-scoped engine fixture and function-scoped event loop in pytest-asyncio. | Use the same scope for event_loop, engine, and any fixture that creates asyncio objects.
aiofiles.threadpool.sync_to_async warning from aiosqlite internals | aiosqlite version mismatch with the installed asyncio runtime. | Pin aiosqlite>=0.19.0; this version unified the internal thread pool implementation.

Advanced aiosqlite Optimization

Seeding reference data once at session scope

Test suites that need reference data (e.g., product categories, permission roles, tenant records) should seed that data in a session-scoped fixture rather than per-test. Combined with function-scoped rollback, this gives each test a clean slate for its own writes while avoiding the cost of re-inserting reference data on every test function:

# conftest.py (continued from above)
from myapp.models import Tenant, Role


@pytest_asyncio.fixture(scope="session")
async def reference_data(engine):
    """
    Insert reference data once. Because StaticPool keeps one connection,
    this data persists for the entire test session and is visible to all tests.
    Each test's session fixture rolls back only its own writes, not these rows.
    """
    factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
    async with factory() as seed_session:
        async with seed_session.begin():
            seed_session.add_all([
                Tenant(id=1, name="Acme Corp", plan="enterprise"),
                Tenant(id=2, name="Beta Ltd", plan="starter"),
                Role(id=1, name="admin"),
                Role(id=2, name="viewer"),
            ])
    return  # data is now committed in the shared in-memory DB


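# test_orders.py (tests must live in a test module; pytest does not collect them from conftest.py)
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from myapp.models import Order


async def test_order_for_existing_tenant(session: AsyncSession, reference_data):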
    """reference_data fixture ensures Tenant(id=1) exists before this runs."""
    order = Order(tenant_id=1, reference="ORD-001", status="pending")
    session.add(order)
    await session.flush()

    result = await session.execute(
        select(Order).where(Order.reference == "ORD-001")
    )
    assert result.scalar_one().tenant_id == 1

The key insight: because the per-test session fixture calls await _session.rollback() at teardown, the Order inserted in the test is undone — but the Tenant rows inserted by reference_data (committed in a separate session before the rollback) survive across tests. This mirrors the standard Django test-case pattern but in fully async form.
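The commit-then-rollback behaviour does not depend on SQLAlchemy and can be checked against a single stdlib sqlite3 connection, which here stands in for the one connection StaticPool holds. The table layout is a simplified assumption, not the schema of myapp.models.

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # plays the role of the shared StaticPool connection
conn.execute("CREATE TABLE tenants (id INTEGER PRIMARY KEY, name TEXT)")
conn.execute(
    "CREATE TABLE orders (id INTEGER PRIMARY KEY, tenant_id INTEGER, reference TEXT)"
)
conn.commit()

# Session-scoped seeding: insert reference rows and commit once.
conn.execute("INSERT INTO tenants (id, name) VALUES (1, 'Acme Corp')")
conn.commit()

# Test body: write without committing.
conn.execute("INSERT INTO orders (tenant_id, reference) VALUES (1, 'ORD-001')")
orders_during_test = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]

# Test teardown: roll back the uncommitted work.
conn.rollback()

orders_after = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
tenants_after = conn.execute("SELECT COUNT(*) FROM tenants").fetchone()[0]
```

The order is visible inside the test and gone after the rollback, while the committed tenant row is still there for the next test.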

Using aiosqlite for local CLI tooling

Beyond tests, aiosqlite is useful in CLI tools and data-pipeline scripts where spinning up a Postgres instance is overkill. A local SQLite file is driven through the same ORM session interface as any other backend:

#!/usr/bin/env python3
"""
Local development seed script — populates a SQLite file for offline work.
Switch DATABASE_URL to postgresql+asyncpg:// for staging/production runs.
"""
import asyncio
import os
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from sqlalchemy.pool import StaticPool

from myapp.models import Base, User, Product, Tenant

DATABASE_URL = os.environ.get("DATABASE_URL", "sqlite+aiosqlite:///./dev.db")


async def seed_local_db() -> None:
    is_memory = DATABASE_URL == "sqlite+aiosqlite://"
    engine = create_async_engine(
        DATABASE_URL,
        connect_args={"check_same_thread": False} if "sqlite" in DATABASE_URL else {},
        poolclass=StaticPool if is_memory else None,
        echo=True,
    )

    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

    factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
    async with factory() as session:
        async with session.begin():
            session.add_all([
                Tenant(id=1, name="Dev Tenant", plan="developer"),
                User(email="dev@example.com", tenant_id=1),
                Product(sku="DEV-001", name="Widget Alpha", price=9.99, tenant_id=1),
                Product(sku="DEV-002", name="Widget Beta", price=19.99, tenant_id=1),
            ])
    print(f"Seeded development database at {DATABASE_URL}")
    await engine.dispose()


if __name__ == "__main__":
    asyncio.run(seed_local_db())

Running this with DATABASE_URL=sqlite+aiosqlite:///./dev.db python seed.py creates a persistent local file. Deleting dev.db and re-running the script resets the database, giving you a reproducible starting state without touching any shared database.

Differences from Postgres to watch in tests

SQLite is not a drop-in substitute for Postgres, and aiosqlite is only the driver in front of it. Tests that pass against SQLite can fail against Postgres (and vice versa) for the following reasons. Treat this list as a checklist when writing tests you want to run against both backends:

  • Case sensitivity: SQLite's LIKE operator is case-insensitive for ASCII text by default, while Postgres LIKE is case-sensitive (ILIKE is the case-insensitive form). WHERE email LIKE 'ALICE@example.com' matches alice@example.com in SQLite but not in Postgres. Plain equality (=) is case-sensitive in both under their default collations.
  • RETURNING clause: SQLAlchemy 2.0 uses RETURNING on Postgres to retrieve server-generated values at flush time. SQLite supports RETURNING from version 3.35.0 (2021). On an older Python installation with an older bundled SQLite, SQLAlchemy falls back to cursor.lastrowid, which still populates integer primary keys after flush(), but statements that call .returning() explicitly will fail.
  • Foreign key enforcement: SQLite does not enforce foreign key constraints by default. Rows that would fail a FOREIGN KEY constraint on Postgres insert silently in SQLite unless you explicitly run PRAGMA foreign_keys = ON on the connection.
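  • ON CONFLICT DO UPDATE (upserts): Supported in SQLite 3.24+ and Postgres, but the SQLAlchemy API for Postgres upserts (insert().on_conflict_do_update()) uses sqlalchemy.dialects.postgresql.insert, which is Postgres-only and will fail against SQLite. The SQLite dialect ships its own sqlalchemy.dialects.sqlite.insert with a matching on_conflict_do_update(), so code that must run on both backends has to pick the construct per dialect.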
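A quick way to see how the bundled SQLite actually behaves on the comparison and RETURNING points is to probe it directly with the stdlib sqlite3 module. The users table and the count helper below exist only for this check.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (email TEXT)")
conn.execute("INSERT INTO users VALUES ('alice@example.com')")


def count(sql: str, value: str) -> int:
    """Return the single COUNT(*) value produced by the query."""
    return conn.execute(sql, (value,)).fetchone()[0]


# Equality uses the default BINARY collation; LIKE folds ASCII case by default.
equals_matches = count(
    "SELECT COUNT(*) FROM users WHERE email = ?", "ALICE@example.com"
)
like_matches = count(
    "SELECT COUNT(*) FROM users WHERE email LIKE ?", "ALICE@example.com"
)

# RETURNING needs SQLite 3.35.0 or newer; this reports the linked library version.
supports_returning = sqlite3.sqlite_version_info >= (3, 35, 0)
```

With default settings the equality query finds no rows and the LIKE query finds one, so a test that relies on LIKE being case-insensitive is the kind that passes on SQLite and fails on Postgres.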

Frequently Asked Questions

Why must I use StaticPool and not NullPool for in-memory SQLite? NullPool creates a new connection for every checkout and closes it immediately after release, which destroys the in-memory database on each session close. StaticPool maintains exactly one persistent connection. For file-based SQLite (sqlite+aiosqlite:///path/to/file.db), NullPool works correctly because the database is persisted on disk, but StaticPool is still safer in test contexts to avoid the overhead of opening and closing the file on every fixture checkout.

Can I run pytest-xdist parallel tests with an aiosqlite in-memory database? Yes, but not against one shared database. Each pytest-xdist worker is a separate process, and a StaticPool connection cannot be shared across process boundaries, so every worker builds its own in-memory database and session-scoped fixtures run once per worker. If the workers need a database on disk, give each one its own file using pytest-xdist's worker_id fixture, for example sqlite+aiosqlite:////tmp/test_{worker_id}.db (four slashes, because the path is absolute).
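A per-worker file URL can be built with a small helper. This is a minimal sketch: worker_database_url is a hypothetical function written for this example, and the worker_id values follow pytest-xdist's naming ("gw0", "gw1", or "master" when tests are not distributed).

```python
import tempfile
from pathlib import Path
from typing import Optional


def worker_database_url(worker_id: str, base_dir: Optional[str] = None) -> str:
    """Build a distinct SQLite file URL for one pytest-xdist worker (hypothetical helper)."""
    directory = Path(base_dir or tempfile.gettempdir())
    db_path = directory / f"test_{worker_id}.db"
    # "sqlite+aiosqlite:///" followed by an absolute POSIX path yields four slashes.
    return f"sqlite+aiosqlite:///{db_path.as_posix()}"


url_gw0 = worker_database_url("gw0")
url_gw1 = worker_database_url("gw1")
```

In conftest.py, a session-scoped engine fixture could accept worker_id as an argument and pass worker_database_url(worker_id) to create_async_engine, so no two workers ever open the same file.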

Should I test against aiosqlite or run my tests against a real Postgres instance? Both. Use aiosqlite for unit tests and fast CI feedback loops — they run without any network dependency and complete in milliseconds. Run your integration and migration tests against a real Postgres instance (a Docker container in CI is sufficient) to catch Postgres-specific behaviour: case sensitivity, constraint enforcement, and dialect-specific SQL. The cost of maintaining both fixture modes is low compared to the production confidence a real-Postgres integration suite provides.

How do I enable foreign key enforcement in my aiosqlite test engine? Listen to the connect event and issue PRAGMA foreign_keys = ON on each new connection:

from sqlalchemy import event
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.pool import StaticPool


engine = create_async_engine(
    "sqlite+aiosqlite://",
    connect_args={"check_same_thread": False},
    poolclass=StaticPool,
)


@event.listens_for(engine.sync_engine, "connect")
def enable_foreign_keys(dbapi_connection, connection_record):
    cursor = dbapi_connection.cursor()
    cursor.execute("PRAGMA foreign_keys=ON")
    cursor.close()

This runs on every physical connection establishment. With StaticPool that means once per engine lifetime — exactly what you want.
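The effect of the pragma can be checked on raw connections with the stdlib sqlite3 module. The two-table schema and the insert_orphan helper are assumptions made for this sketch, not part of myapp.models.

```python
import sqlite3

SCHEMA = """
CREATE TABLE tenants (id INTEGER PRIMARY KEY);
CREATE TABLE users (
    id INTEGER PRIMARY KEY,
    tenant_id INTEGER NOT NULL REFERENCES tenants(id)
);
"""


def insert_orphan(enable_fk: bool) -> str:
    """Insert a user whose tenant does not exist and report the outcome."""
    conn = sqlite3.connect(":memory:")
    if enable_fk:
        # The pragma applies to this connection only, so it is set right after connect.
        conn.execute("PRAGMA foreign_keys = ON")
    conn.executescript(SCHEMA)
    try:
        conn.execute("INSERT INTO users (tenant_id) VALUES (999)")
        return "inserted"
    except sqlite3.IntegrityError:
        return "rejected"
    finally:
        conn.close()


without_pragma = insert_orphan(enable_fk=False)
with_pragma = insert_orphan(enable_fk=True)
```

Without the pragma the orphan row is accepted silently; with it, SQLite raises IntegrityError, which matches what Postgres would do for the same insert.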