BackgroundTasks in FastAPI — Resource Lifetimes
Think of a FastAPI background task as delayed work that runs after the response has been sent, which can be after the request-scoped resources have already been cleaned up. Hand it a postcard of plain data, not a live object tied to the request.
The Setup
You want to process audits asynchronously without making your client wait. You write an endpoint that accepts a database connection dependency, saves a main record, and registers a background task to write log data using that same connection.
What Does This Print?
from fastapi import FastAPI, Depends, BackgroundTasks

app = FastAPI()


class DBConnection:
    def __init__(self):
        self.is_closed = False

    def close(self):
        self.is_closed = True

    def execute(self, query: str):
        if self.is_closed:
            raise RuntimeError("Session closed")
        return f"Ran: {query}"


def get_db():
    db = DBConnection()
    try:
        yield db
    finally:
        db.close()  # Clean up resource post-request


def write_audit_log(db: DBConnection, text: str):
    db.execute(f"INSERT INTO audits VALUES ('{text}')")


@app.post("/action")
async def trigger_action(
    text: str,
    background_tasks: BackgroundTasks,
    db: DBConnection = Depends(get_db)
):
    db.execute("INSERT INTO actions VALUES ('payload')")
    background_tasks.add_task(write_audit_log, db, text)
    return {"status": "success"}
Assume a client sends a POST request to /action?text=user_login.
The Output
The HTTP client receives a 200 OK response with the {"status": "success"} payload. The background task then fails, and the server console shows a traceback ending in RuntimeError: Session closed. The audit log write crashes because it runs after the database connection has been torn down.
Why Python Does This
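FastAPI runs BackgroundTasks after the response has been sent to the client, outside the request-response lifecycle. In the behavior this example relies on, FastAPI executes the cleanup phase of any generator-based dependency (here, the finally block in get_db) once the endpoint function returns. By the time write_audit_log runs (in a worker thread, because it is a plain def function), the database session has been closed. The exact ordering of dependency cleanup and background tasks has changed between FastAPI releases (0.106.0, for example, stopped dependency exit code from waiting for background tasks), so check the version you deploy. Either way, sharing a request-scoped resource with a background task is a lifecycle mismatch: the resource belongs to the request, and the task outlives it.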
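The ordering described above can be reproduced without the framework. The following is a minimal sketch, not FastAPI's actual implementation: simulate_request is a hypothetical stand-in that drives the get_db generator by hand, closes it once the "endpoint" is done, and only then runs the queued task.

```python
from typing import Callable, Iterator, List


class DBConnection:
    """Toy connection mirroring the one in the example above."""

    def __init__(self) -> None:
        self.is_closed = False

    def close(self) -> None:
        self.is_closed = True

    def execute(self, query: str) -> str:
        if self.is_closed:
            raise RuntimeError("Session closed")
        return f"Ran: {query}"


def get_db() -> Iterator[DBConnection]:
    db = DBConnection()
    try:
        yield db
    finally:
        db.close()


def simulate_request() -> List[str]:
    """Hypothetical stand-in for the framework: endpoint, cleanup, then tasks."""
    events: List[str] = []
    queued: List[Callable[[], str]] = []

    dependency = get_db()
    db = next(dependency)  # run get_db up to its yield

    # "Endpoint body": use the connection and queue a task that captures it.
    events.append(db.execute("INSERT INTO actions VALUES ('payload')"))
    queued.append(lambda: db.execute("INSERT INTO audits VALUES ('user_login')"))

    # The endpoint has returned: closing the generator runs its finally block.
    dependency.close()
    events.append("dependency closed")

    # Only now do the queued tasks run, against a closed connection.
    for task in queued:
        try:
            events.append(task())
        except RuntimeError as exc:
            events.append(f"task failed: {exc}")
    return events


if __name__ == "__main__":
    for event in simulate_request():
        print(event)
```

The task holds a reference to the same db object the endpoint used, so nothing stops it from being called. It simply finds the connection already closed.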
The Fix
from fastapi import FastAPI, Depends, BackgroundTasks

app = FastAPI()


class DBConnection:
    def __init__(self):
        self.is_closed = False

    def close(self):
        self.is_closed = True

    def execute(self, query: str):
        if self.is_closed:
            raise RuntimeError("Session closed")
        return f"Ran: {query}"


def get_db():
    db = DBConnection()
    try:
        yield db
    finally:
        db.close()


def write_audit_log(text: str):
    # Instantiate an isolated connection lifecycle context directly inside the background task
    db = DBConnection()
    try:
        db.execute(f"INSERT INTO audits VALUES ('{text}')")
    finally:
        db.close()


@app.post("/action")
async def trigger_action(
    text: str,
    background_tasks: BackgroundTasks,
    db: DBConnection = Depends(get_db)
):
    db.execute("INSERT INTO actions VALUES ('payload')")
    background_tasks.add_task(write_audit_log, text)  # Pass data parameters, not connection instances
    return {"status": "success"}
Instead of passing the db object, pass only the text the task needs. write_audit_log then opens its own DBConnection and closes it in a finally block, so the connection's lifetime matches the task's. Depends is resolved only for path operations and their dependencies; a function registered with add_task is called as a plain function, so it has to acquire its resources itself, for example from a shared connection pool or session factory.
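If several tasks need a connection, the open-and-close boilerplate can be moved into a context manager. This is a sketch under assumptions: task_db and the opened list are hypothetical names introduced for illustration, and DBConnection is the toy class from the example, not a real driver.

```python
from contextlib import contextmanager
from typing import Iterator, List


class DBConnection:
    """Toy connection mirroring the one in the example above."""

    def __init__(self) -> None:
        self.is_closed = False

    def close(self) -> None:
        self.is_closed = True

    def execute(self, query: str) -> str:
        if self.is_closed:
            raise RuntimeError("Session closed")
        return f"Ran: {query}"


# Kept only so the example can inspect connection lifetimes afterwards.
opened: List[DBConnection] = []


@contextmanager
def task_db() -> Iterator[DBConnection]:
    """Hypothetical helper: a connection that lives exactly as long as one task."""
    db = DBConnection()
    opened.append(db)
    try:
        yield db
    finally:
        db.close()


def write_audit_log(text: str) -> str:
    # The task owns its connection from open to close.
    with task_db() as db:
        return db.execute(f"INSERT INTO audits VALUES ('{text}')")


if __name__ == "__main__":
    print(write_audit_log("user_login"))
    print(opened[-1].is_closed)
```

The endpoint would still register the task with background_tasks.add_task(write_audit_log, text), exactly as in the fix. Only the place where the connection is created changes.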
How This Fails in Real Systems
An e-commerce company sent transactional emails from a background task and reused the request's SQLAlchemy session to mark each notification as complete. Under high API load, every one of those email tasks crashed with closed-connection errors.
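One way to avoid that failure is to hand the task an identifier and let it open its own connection. The sketch below is illustrative only: it uses the standard-library sqlite3 module in place of the SQLAlchemy setup in the story, and the notifications table, init_db, mark_notification_sent, and get_status are hypothetical names.

```python
import os
import sqlite3
import tempfile
from contextlib import closing


def init_db(db_path: str) -> int:
    """Create the demo table and return the id of one pending notification."""
    with closing(sqlite3.connect(db_path)) as conn:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "CREATE TABLE notifications ("
                "id INTEGER PRIMARY KEY, status TEXT NOT NULL)"
            )
            cur = conn.execute(
                "INSERT INTO notifications (status) VALUES ('pending')"
            )
            return cur.lastrowid


def mark_notification_sent(db_path: str, notification_id: int) -> None:
    """Background task body: takes plain values, owns its own connection."""
    with closing(sqlite3.connect(db_path)) as conn:
        with conn:
            conn.execute(
                "UPDATE notifications SET status = 'sent' WHERE id = ?",
                (notification_id,),
            )


def get_status(db_path: str, notification_id: int) -> str:
    with closing(sqlite3.connect(db_path)) as conn:
        row = conn.execute(
            "SELECT status FROM notifications WHERE id = ?",
            (notification_id,),
        ).fetchone()
        return row[0]


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "demo.db")
        notification_id = init_db(path)
        mark_notification_sent(path, notification_id)
        print(get_status(path, notification_id))
```

In an endpoint, the registration would look like background_tasks.add_task(mark_notification_sent, db_path, notification_id): two plain values cross the boundary, and no session does.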
Key Takeaway
BackgroundTasks runs your function without regard for the lifetime of any dependency you handed to it. Pass plain data to a background task, and let the task acquire and release its own resources.