Sessions — commit, rollback, and AsyncSession
Imagine an SQLAlchemy session as a single ongoing transaction to the database. If an error occurs, the transaction is marked as failed, like a broken lock. You must explicitly unlock it with rollback() before you can start a new transaction or reuse the session.
session.rollback() inside the exception handler to avoid poisoning subsequent transactions.The Setup
You are running a worker service. An exception is caught during a database write operation, but the session is not rolled back. The worker thread then attempts to reuse the same session object for its next task.
What Does This Print?
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import declarative_base, sessionmaker
Base = declarative_base()
class AuditLog(Base):
__tablename__ = 'audit_logs'
id = Column(Integer, primary_key=True)
event = Column(String, unique=True)
engine = create_engine('sqlite:///:memory:')
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
try:
session.add(AuditLog(event="START"))
session.add(AuditLog(event="START")) # Triggers a unique constraint failure
session.commit()
except Exception as e:
print(f"First operation failed: {e}")
# We do NOT call session.rollback() here
try:
session.add(AuditLog(event="SHUTDOWN"))
session.commit()
except Exception as e:
print(f"Second operation failed: {e}")
The Output
The second write fails with a PendingRollbackError. When an exception occurs during flush/commit, SQLAlchemy marks the internal transaction as inactive/failed. A poisoned transaction cannot execute any further commands until you explicitly call session.rollback(). This design prevents developers from accidentally saving partial data sets.
Why Python Does This
SQLAlchemy's Session acts as a facade over a DBAPI-level transaction. In Python, db drivers hold transaction states. When a database raises an integrity constraint, the active database transaction is aborted by the engine. SQLAlchemy tracks this transactional state inside its internal state machine (represented by the SessionTransaction class). If the transaction fails, SQLAlchemy transition's the transaction state to "inactive". Any attempt to use the connection in an inactive state raises PendingRollbackError. Calling rollback() resets the internal state machine, discards pending changes from the session's flush list, and releases the DBAPI transaction so a new one can begin.
The Fix
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import declarative_base, sessionmaker
Base = declarative_base()
class AuditLog(Base):
__tablename__ = 'audit_logs'
id = Column(Integer, primary_key=True)
event = Column(String, unique=True)
engine = create_engine('sqlite:///:memory:')
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
try:
session.add(AuditLog(event="START"))
session.add(AuditLog(event="START"))
session.commit()
except Exception as e:
print(f"First operation failed: {e}")
session.rollback() # FIX: Reset transaction state properly
try:
session.add(AuditLog(event="SHUTDOWN"))
session.commit() # This will now succeed cleanly
print("Second operation succeeded!")
except Exception as e:
session.rollback()
print(f"Second operation failed: {e}")
Calling session.rollback() explicitly clears the pending rollback state of the session and discards any uncommitted changes, allowing the session to be reused for new transactions or safely closed.
How This Fails in Real Systems
A long-running Celery worker handled API webhooks. A single duplicate webhook threw an IntegrityError, which was caught. Because the developer didn't call session.rollback(), the worker stayed alive but raised PendingRollbackError for every subsequent task, failing 50,000 requests over a weekend.
Key Takeaway
session.rollback() inside the exception handler to avoid poisoning subsequent transactions.rollback() or close() is called.