Using SQLAlchemy Async with Celery Task Workers
Use asyncio.run() inside each synchronous Celery task to create an isolated event loop per task invocation, and combine it with a per-process engine (created after os.fork()) that uses NullPool, to avoid asyncpg connection-state corruption. The full pattern is covered in the asyncpg vs psycopg3 driver guide.
Quick Answer
The minimal working pattern: create the engine after forking (in worker_process_init), use NullPool so no pooled connection crosses a fork boundary or outlives the per-task event loop, and wrap async DB work in asyncio.run() inside standard @app.task functions.
```python
# Legacy / incorrect approach: engine created at module import time,
# before Celery forks worker processes. asyncpg connections are not fork-safe.
# BAD: module-level engine creation
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession

engine = create_async_engine("postgresql+asyncpg://user:pass@db:5432/prod")
SessionFactory = async_sessionmaker(engine, class_=AsyncSession)
```
```python
# Correct approach: NullPool + post-fork engine initialisation
import asyncio
import os

from celery import Celery
from celery.signals import worker_process_init
from sqlalchemy import select
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from sqlalchemy.pool import NullPool

from myapp.models import Invoice

app = Celery("tasks", broker="redis://localhost:6379/0")

_engine = None
_SessionFactory = None


@worker_process_init.connect
def init_db_engine(**kwargs):
    """
    Runs once in each worker process AFTER os.fork().
    NullPool ensures no connections are shared across processes.
    """
    global _engine, _SessionFactory
    _engine = create_async_engine(
        os.environ["DATABASE_URL"],  # postgresql+asyncpg://...
        poolclass=NullPool,  # no persistent pool: each task gets a fresh connection
        echo=False,
    )
    _SessionFactory = async_sessionmaker(
        _engine,
        class_=AsyncSession,
        expire_on_commit=False,
    )


@app.task(bind=True, max_retries=3)
def process_invoice(self, invoice_id: int) -> dict:
    async def _run() -> dict:
        async with _SessionFactory() as session:
            # Commits when the block exits normally, rolls back on an exception.
            async with session.begin():
                result = await session.execute(
                    select(Invoice).where(Invoice.id == invoice_id)
                )
                invoice = result.scalar_one_or_none()
                if invoice is None:
                    return {"status": "not_found", "invoice_id": invoice_id}
                invoice.status = "processed"
                return {"status": "ok", "reference": invoice.reference}

    try:
        # Fresh event loop for this invocation; closed before the task returns.
        return asyncio.run(_run())
    except Exception as exc:
        raise self.retry(exc=exc, countdown=30)
```
Execution Context & Async Workflow Integration
Celery's default worker pool (prefork) creates worker processes by calling os.fork() on the parent process. Any resource that was open in the parent (file descriptors, asyncpg TCP connections, internal asyncio state) is inherited by the child: both processes now hold the same underlying socket, but each has its own copy of the in-memory state, and those copies diverge as soon as either side uses them. asyncpg connections are not safe after fork() because the socket now has two owners, while asyncpg's protocol state machine assumes it is the sole reader and writer.
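worker_process_init covers prefork children. If the same module may also run in a process where that signal never fires, a complementary guard is to key the lazily built object on os.getpid(), so an object created before a fork is never reused in the child. This is a minimal standard-library sketch: PerProcess is a hypothetical helper, and the factory stands in for a call such as create_async_engine(...). It still assumes NullPool, so the parent never holds open connections for the discarded copy to touch.

```python
import os
from typing import Callable, Generic, Optional, TypeVar

T = TypeVar("T")


class PerProcess(Generic[T]):
    """Lazily builds one value per OS process (hypothetical helper).

    If the cached value was created under a different PID, for example in the
    parent before os.fork(), it is discarded and rebuilt in the current process.
    """

    def __init__(self, factory: Callable[[], T]) -> None:
        self._factory = factory
        self._pid: Optional[int] = None
        self._value: Optional[T] = None

    def get(self) -> T:
        pid = os.getpid()
        if self._pid != pid:
            # Never reuse an object built in another process.
            self._value = self._factory()
            self._pid = pid
        return self._value  # type: ignore[return-value]
```

In a worker module this would look like _engine = PerProcess(lambda: create_async_engine(os.environ["DATABASE_URL"], poolclass=NullPool)), with _engine.get() called inside each task.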
Why asyncio.run() per task is correct
asyncio.run() creates a brand-new event loop, runs the coroutine to completion, then destroys the loop. This is exactly what you need for a Celery task:
- No loop state leaks between tasks (no lingering callbacks, handles, or pending futures).
- asyncio.run() closes the loop before returning (after cancelling any leftover tasks and shutting down async generators), releasing asyncio-internal resources.
- With NullPool, asyncpg connections acquired during the task are closed when the session context exits, before asyncio.run() returns.
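The per-task isolation can be checked with the standard library alone. In this sketch, running_loop and task_with_leftover_work are illustrative coroutines, not part of any library: two asyncio.run() calls get two distinct loops that are both closed afterwards, and a background task the coroutine leaves running is cancelled by asyncio.run() before it returns.

```python
import asyncio


async def running_loop() -> asyncio.AbstractEventLoop:
    # Report which loop this asyncio.run() call created.
    return asyncio.get_running_loop()


async def task_with_leftover_work(log: list, keep: list) -> None:
    async def lingering() -> None:
        try:
            await asyncio.sleep(3600)  # e.g. a forgotten background coroutine
        except asyncio.CancelledError:
            log.append("lingering task cancelled")
            raise

    # Keep a reference so the caller can inspect the task afterwards.
    keep.append(asyncio.create_task(lingering()))
    # Let the background task start, then return without awaiting it.
    await asyncio.sleep(0)
```

In a Celery task, the same cleanup applies to anything the _run() coroutine leaves behind.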
The alternative, keeping one persistent event loop per worker process and reusing it across tasks, breaks down under thread-based pools (for example --pool=threads): when two tasks run concurrently in different threads, both try to drive the same loop, but an asyncio event loop is not thread-safe and can only be run by one thread at a time.
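The thread-safety constraint is visible with the standard library alone. This sketch runs a loop in one thread, shows that a second thread cannot drive it with run_until_complete(), and then uses asyncio.run_coroutine_threadsafe(), the API asyncio provides for submitting work to a loop owned by another thread. The add coroutine and demo_shared_loop function are illustrative names.

```python
import asyncio
import threading


async def add(a: int, b: int) -> int:
    await asyncio.sleep(0)
    return a + b


def demo_shared_loop() -> tuple:
    loop = asyncio.new_event_loop()
    started = threading.Event()
    loop.call_soon(started.set)  # runs once the loop is actually running
    runner = threading.Thread(target=loop.run_forever, daemon=True)
    runner.start()
    started.wait(timeout=5)

    # Unsafe: this thread tries to drive a loop that another thread is running.
    coro = add(1, 1)
    try:
        loop.run_until_complete(coro)
        error = ""
    except RuntimeError as exc:
        error = str(exc)
    finally:
        coro.close()  # the coroutine never started; close it to avoid a warning

    # Thread-safe: hand the coroutine to the loop's own thread and wait for it.
    future = asyncio.run_coroutine_threadsafe(add(2, 3), loop)
    result = future.result(timeout=5)

    loop.call_soon_threadsafe(loop.stop)
    runner.join(timeout=5)
    loop.close()
    return error, result
```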
NullPool vs QueuePool in worker processes
| Pool class | Behaviour in prefork workers | When to use |
|---|---|---|
| NullPool | Opens a new connection whenever the session needs one and closes it when the connection is released (at the latest when async with session: exits). No idle connections held. | Always use with prefork Celery workers. Avoids fork-safety and event-loop binding issues. |
| AsyncAdaptedQueuePool (default) | Keeps up to pool_size connections open between checkouts. Pooled connections are bound to the event loop that opened them and are not safe to use after os.fork(). | Long-lived event loops such as ASGI servers (FastAPI, Starlette); not Celery prefork. |
| StaticPool | Reuses a single connection; never appropriate for multi-process workers. | In-memory SQLite test fixtures only. |
NullPool does mean a TCP handshake and TLS negotiation per task if the database is remote. For tasks that run many short queries, batch them into a single task invocation rather than firing one task per row. If task throughput is high and connection overhead is measurable, deploy PgBouncer in front of Postgres — PgBouncer's connection pool amortises the handshake cost, and you use NullPool on the SQLAlchemy side (SQLAlchemy pool is redundant behind a pooler anyway).
Native async def tasks
Celery 5.x's built-in worker pools (prefork, threads, solo, gevent, eventlet) call task functions synchronously and do not await the coroutine returned by an async def task, so such a task returns an un-awaited coroutine object instead of running its body. Check the documentation for your Celery version before relying on native async tasks; for prefork deployments, asyncio.run() inside a sync task remains the recommended pattern. If your worker setup does await async def tasks on its own event loop (for example through a third-party integration), the task body takes this shape:
```python
from celery import Celery
from sqlalchemy import select

from myapp.db import get_session_factory
from myapp.models import Order

app = Celery("tasks", broker="redis://localhost:6379/0")


@app.task
async def fulfill_order(order_id: int) -> str:
    """
    Native async task body. Assumes a worker setup that awaits async def
    tasks on its own event loop; the stock Celery 5.x pools do not.
    The coroutine runs inside that loop directly, so no asyncio.run() wrapper.
    """
    SessionFactory = get_session_factory()  # must be created post-fork
    async with SessionFactory() as session:
        async with session.begin():
            result = await session.execute(
                select(Order).where(Order.id == order_id)
            )
            order = result.scalar_one()
            order.fulfillment_status = "dispatched"
    return f"Order {order_id} fulfilled"
```
Resolving Warnings, Errors & Common Mistakes
| Error / Warning | Root Cause | Production Fix |
|---|---|---|
| asyncpg.exceptions.InterfaceError: cannot perform operation: another operation is in progress | Two coroutines using the same asyncpg connection at the same time, e.g. asyncio.gather() over queries on one AsyncSession, or a connection inherited across os.fork() whose protocol state is out of sync. | Give each concurrent coroutine its own AsyncSession; create the engine per worker in worker_process_init with NullPool. |
| RuntimeError: no running event loop | asyncio.get_running_loop(), asyncio.create_task(), or a similar call made from synchronous task code, where no event loop is running. | Run the coroutine with asyncio.run(coroutine()), which creates and closes its own loop. |
| sqlalchemy.exc.MissingGreenlet: greenlet_spawn has not been called; can't call await_only() here | An ORM operation that needs IO (typically a lazy load on attribute access) was triggered from async code outside SQLAlchemy's greenlet bridge. | Use expire_on_commit=False and eager-load all needed relationships with selectinload() before the session closes. |
| QueuePool limit of size N overflow M reached, connection timed out | More concurrent checkouts than pool_size + max_overflow allow, or an engine created before fork so the pool is shared (and corrupted) across processes. | Use NullPool; it removes SQLAlchemy-side pool contention entirely for prefork workers. |
| sqlalchemy.exc.TimeoutError on pool.connect() after a task retry storm | Many tasks retrying at the same moment all request connections; the pool is exhausted. | Switch to NullPool, which has no SQLAlchemy-side pool to exhaust. Each task still opens a server connection, so add backoff and jitter to retries (for example Celery's retry_backoff and retry_jitter with autoretry_for) to stay under Postgres max_connections; if using PgBouncer, size its default_pool_size for the peak. |
| asyncpg.exceptions.InvalidSQLStatementNameError: prepared statement "..." does not exist | asyncpg's prepared-statement cache referred to a statement that no longer exists because PgBouncer (transaction mode) routed the query to a different server connection. | Add connect_args={"statement_cache_size": 0} to the engine when Postgres is behind PgBouncer in transaction mode. |
| psycopg.OperationalError: the connection is closed | Worker process inherited an open psycopg3 connection from the parent; the forked file descriptor is in an undefined state. | Same fix as asyncpg: defer engine creation to worker_process_init. |
| DetachedInstanceError: Instance <User> is not bound to a Session | ORM instance accessed outside the async with session: block (lazy relationship traversal after session close). | Set expire_on_commit=False and load all needed attributes/relationships before the session closes. |
Advanced Celery + Async SQLAlchemy Optimisation
Batching DB operations to amortise connection overhead
With NullPool, every task opens and closes a connection. If you have thousands of small tasks, the connection overhead dominates. Redesign task granularity to process batches:
```python
import asyncio
import os
from datetime import datetime, timezone

from celery import Celery
from celery.signals import worker_process_init
from sqlalchemy import select, update
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from sqlalchemy.pool import NullPool

from myapp.models import User

app = Celery("tasks", broker="redis://localhost:6379/0")
_SessionFactory = None


@worker_process_init.connect
def init_engine(**kwargs):
    global _SessionFactory
    engine = create_async_engine(
        os.environ["DATABASE_URL"],
        poolclass=NullPool,
    )
    _SessionFactory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)


@app.task
def send_digest_emails(user_ids: list[int]) -> dict:
    """
    Processes a batch of users in one task invocation instead of one task
    per user: a single connection open/close, one SELECT and one bulk UPDATE.
    """
    async def _run() -> dict:
        async with _SessionFactory() as session:
            async with session.begin():
                result = await session.execute(
                    select(User)
                    .where(User.id.in_(user_ids))
                    .where(User.email_confirmed.is_(True))
                )
                users = result.scalars().all()
                processed = []
                for user in users:
                    # ... send email via external service ...
                    processed.append(user.id)
                if processed:
                    # Bulk-update in one statement; naive UTC timestamp,
                    # equivalent to the deprecated datetime.utcnow().
                    await session.execute(
                        update(User)
                        .where(User.id.in_(processed))
                        .values(last_digest_sent_at=datetime.now(timezone.utc).replace(tzinfo=None))
                    )
                return {"processed": len(processed), "batch_size": len(user_ids)}

    return asyncio.run(_run())
```
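On the producer side, the IDs have to be split into batches before they are enqueued. A minimal sketch using only itertools; chunked is a hypothetical helper, not a Celery API.

```python
from itertools import islice
from typing import Iterable, Iterator, List


def chunked(ids: Iterable[int], size: int) -> Iterator[List[int]]:
    """Yield consecutive lists of at most `size` IDs; the last may be shorter."""
    if size < 1:
        raise ValueError("size must be at least 1")
    it = iter(ids)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch
```

A dispatcher could then call send_digest_emails.delay(batch) for each batch in chunked(all_user_ids, 500), where all_user_ids and the batch size of 500 are placeholders to tune against per-task connection cost and task runtime.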
Monitoring connection health in Celery workers
Because NullPool opens fresh connections per task, traditional pool-level metrics (QueuePool.checkedin, checkedout) are not available. Instead, instrument at the query level:
```python
import logging
import time

from sqlalchemy import event

logger = logging.getLogger(__name__)


def instrument_engine(engine):
    """Attach timing hooks to an async engine for per-task latency logging.

    Async engines expose their event targets through engine.sync_engine;
    call this right after create_async_engine() in worker_process_init.
    """

    @event.listens_for(engine.sync_engine, "before_cursor_execute")
    def before_execute(conn, cursor, statement, parameters, context, executemany):
        conn.info["query_start"] = time.perf_counter()

    @event.listens_for(engine.sync_engine, "after_cursor_execute")
    def after_execute(conn, cursor, statement, parameters, context, executemany):
        elapsed = time.perf_counter() - conn.info.get("query_start", time.perf_counter())
        logger.debug("Query completed in %.3fs: %.120s", elapsed, statement)
```
Frequently Asked Questions
Can I use a persistent QueuePool inside Celery workers instead of NullPool?
Only if the engine is created after os.fork() (via worker_process_init) and is never shared across processes. Even then, asyncpg connections hold asyncio state tied to the event loop they were opened on. Because asyncio.run() creates a new loop per task, a connection pooled during one task's asyncio.run() call is bound to a loop that has since been closed, and reusing it in the next task typically fails with errors such as RuntimeError: Event loop is closed or "got Future attached to a different loop". NullPool eliminates this entirely by never keeping connections between tasks.
How do I pass the database URL to workers without hardcoding it?
Read os.environ["DATABASE_URL"] inside the worker_process_init handler. Set the variable via your process supervisor (systemd, Docker ENV, Kubernetes envFrom). Never pass database credentials through the broker URL or task arguments: task arguments are serialized into broker messages.
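A small helper for the worker_process_init handler can fail fast when the variable is missing and keep the password out of logs. This is a sketch: load_database_url and redact are hypothetical helper names, and the accepted prefixes assume SQLAlchemy's asyncpg and psycopg (v3) dialect URLs.

```python
import os
from urllib.parse import urlsplit, urlunsplit

ASYNC_PREFIXES = ("postgresql+asyncpg://", "postgresql+psycopg://")


def load_database_url(var: str = "DATABASE_URL") -> str:
    """Read the DB URL from the environment and fail fast with a clear message."""
    url = os.environ.get(var)
    if not url:
        raise RuntimeError(f"{var} is not set in the worker environment")
    if not url.startswith(ASYNC_PREFIXES):
        scheme = urlsplit(url).scheme
        raise RuntimeError(f"{var} must use an async driver URL, got scheme {scheme!r}")
    return url


def redact(url: str) -> str:
    """Replace the password with *** so the URL can be logged safely."""
    parts = urlsplit(url)
    if parts.password is None:
        return url
    netloc = parts.netloc.replace(f":{parts.password}@", ":***@", 1)
    return urlunsplit(parts._replace(netloc=netloc))
```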
Does psycopg3 have the same fork-safety problems as asyncpg in prefork workers?
Yes. psycopg3's async connections, like asyncpg's, wrap a socket that must not be used from both sides of an os.fork(), and they are bound to the asyncio event loop that opened them. The fix is identical for both drivers: defer engine creation to worker_process_init and use NullPool.
What happens if a Celery task times out (SoftTimeLimitExceeded) mid-await?
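In the prefork pool, celery.exceptions.SoftTimeLimitExceeded is raised from a signal handler in the worker process, so it surfaces wherever the interpreter happens to be, usually while the event loop is waiting on I/O for an await session.execute() call. It is an Exception subclass, so a broad except Exception (such as the retry handler in the Quick Answer) catches it too; catch it explicitly first if a soft timeout should not trigger a retry. If the exception is raised inside the coroutine, it propagates through the async with blocks like any other error. If it is raised while the loop is blocked on I/O, it propagates out of the loop, and asyncio.run() cancels the still-pending task before closing the loop, which gives async with session.begin(): and async with session: a chance to roll back and close via CancelledError. Do not call asyncio.run(session.close()) afterwards: the session is local to the coroutine, and its connection belongs to a loop that has already been closed. Catch SoftTimeLimitExceeded around asyncio.run() if you need to log or skip the retry. The hard time limit kills the worker process outright and no cleanup code runs; with NullPool, the server sees the connection drop and rolls back the open transaction.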
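The cleanup path can be checked without Celery. The sketch below is POSIX-only and must run in the main thread: it arms a one-shot SIGALRM timer whose handler raises SimulatedSoftTimeLimit, a hypothetical stand-in for SoftTimeLimitExceeded, while FakeSession, a hypothetical async context manager standing in for an AsyncSession, is open around a long await.

```python
import asyncio
import signal


class SimulatedSoftTimeLimit(Exception):
    """Hypothetical stand-in for celery.exceptions.SoftTimeLimitExceeded."""


class FakeSession:
    """Hypothetical async context manager standing in for an AsyncSession."""

    def __init__(self, log: list) -> None:
        self.log = log

    async def __aenter__(self) -> "FakeSession":
        self.log.append("open")
        return self

    async def __aexit__(self, exc_type, exc, tb) -> bool:
        # A real session would roll back and release its connection here.
        name = exc_type.__name__ if exc_type else "success"
        self.log.append(f"close after {name}")
        return False  # never swallow the exception


def _raise_soft_limit(signum, frame):
    raise SimulatedSoftTimeLimit("soft time limit exceeded")


def run_task_with_soft_limit(limit_s: float = 0.05) -> list:
    log: list = []

    async def _run() -> None:
        async with FakeSession(log):
            await asyncio.sleep(10)  # a query that outlives the soft limit

    previous = signal.signal(signal.SIGALRM, _raise_soft_limit)
    signal.setitimer(signal.ITIMER_REAL, limit_s)  # one-shot timer
    try:
        asyncio.run(_run())
    except SimulatedSoftTimeLimit:
        log.append("soft limit caught around asyncio.run()")
    finally:
        signal.setitimer(signal.ITIMER_REAL, 0)  # cancel any pending timer
        signal.signal(signal.SIGALRM, previous)
    return log
```

Because the exception arrives while the loop is blocked waiting for the sleep, it escapes the loop; asyncio.run() then cancels the pending task, so __aexit__ runs before the exception reaches the caller.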
Related
- Choosing Between asyncpg and psycopg3 Async Drivers — parent guide covering driver selection, PgBouncer compatibility, COPY support, and type adaptation.
- Configuring Async Engines and Connection Pools — pool_size, max_overflow, and pool_recycle reference for ASGI-based applications.
- Handling Connection Leaks and Pool Exhaustion — diagnosing and fixing pool exhaustion in production environments.