asyncio.Queue — Producer/Consumer Pattern
Think of asyncio.Queue as a conveyor belt with a counter. Each time you put an item on the belt, the counter goes up. queue.join() waits for the counter to return to zero. Taking an item off with get() does not lower the counter; only task_done() does. If you get an item but never call task_done(), it is like taking an item off the belt without signing off on it, so the counter stays stuck above zero.
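A minimal sketch of the counter behavior, using only public asyncio.Queue methods. asyncio.wait_for with a short timeout stands in for a bare join() so the demo cannot hang; the function name counter_demo and the 0.1-second timeout are illustrative choices, not part of the original example.

```python
import asyncio

async def counter_demo() -> list[str]:
    """Show how put(), get(), and task_done() move the unfinished-task counter."""
    events = []
    queue: asyncio.Queue[str] = asyncio.Queue()

    # put() places items on the belt and raises the counter to 2.
    await queue.put("a")
    await queue.put("b")

    # get() removes both items from the belt but leaves the counter at 2.
    await queue.get()
    await queue.get()
    events.append(f"empty after gets: {queue.empty()}")

    # One acknowledgement: the counter drops to 1, so join() still blocks.
    queue.task_done()
    try:
        await asyncio.wait_for(queue.join(), timeout=0.1)
        events.append("join returned")
    except asyncio.TimeoutError:
        events.append("join timed out")

    # Second acknowledgement: the counter reaches 0 and join() returns.
    queue.task_done()
    await asyncio.wait_for(queue.join(), timeout=0.1)
    events.append("join returned")
    return events

results = asyncio.run(counter_demo())
for line in results:
    print(line)
```

The empty belt and the blocked join() appear together: queue.empty() reports True while join() still times out, which is exactly the gap task_done() exists to close.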
The Setup
A pipeline architect designs an async log parser: producer tasks read raw log files, parse them, and enqueue the parsed lines, while a pool of consumer tasks processes the parsed structures and writes them to Elasticsearch.
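A rough sketch of that pipeline shape, assuming several producers and a fixed pool of consumers sharing one bounded queue. RAW_FILES, parse_line, and the indexed list are hypothetical stand-ins: the in-memory dict replaces real log files, and appending to a list replaces the Elasticsearch write.

```python
import asyncio

# Hypothetical in-memory "files"; a real producer would read from disk.
RAW_FILES = {
    "app.log": ["INFO start", "ERROR disk full"],
    "db.log": ["WARN slow query", "INFO checkpoint"],
}

# Hypothetical stand-in for Elasticsearch: consumers append documents here.
indexed: list[dict] = []

def parse_line(source: str, line: str) -> dict:
    """Hypothetical parser: split 'LEVEL message' into a document."""
    level, _, message = line.partition(" ")
    return {"source": source, "level": level, "message": message}

async def producer(queue: asyncio.Queue, source: str, lines: list[str]) -> None:
    for line in lines:
        await queue.put(parse_line(source, line))

async def consumer(queue: asyncio.Queue) -> None:
    while True:
        doc = await queue.get()
        try:
            await asyncio.sleep(0)  # placeholder for an async write to the index
            indexed.append(doc)
        finally:
            queue.task_done()  # acknowledge every item, success or failure

async def run_pipeline(num_consumers: int = 3) -> None:
    queue: asyncio.Queue = asyncio.Queue(maxsize=10)
    workers = [asyncio.create_task(consumer(queue)) for _ in range(num_consumers)]
    # Producers run to completion while the consumer pool drains concurrently.
    await asyncio.gather(
        *(producer(queue, name, lines) for name, lines in RAW_FILES.items())
    )
    await queue.join()  # every parsed line has been acknowledged
    for worker in workers:
        worker.cancel()
    # Wait for the cancelled workers to finish unwinding.
    await asyncio.gather(*workers, return_exceptions=True)

asyncio.run(run_pipeline())
print(len(indexed), "documents indexed")
```

The bounded maxsize is a design choice: if consumers fall behind, put() suspends the producers instead of letting the queue grow without limit.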
What Does This Print?
import asyncio

async def producer(queue):
    for i in range(3):
        print(f"Produced item {i}")
        await queue.put(f"item_{i}")
    # Sending sentinel values to stop consumers
    await queue.put(None)

async def consumer(queue):
    while True:
        item = await queue.get()
        if item is None:
            break
        print(f"Consumed {item}")
        # Missing task_done call!

async def main():
    queue = asyncio.Queue()
    # Run producer and consumer concurrently
    await asyncio.gather(
        producer(queue),
        consumer(queue)
    )
    # Wait for the queue to clear
    await queue.join()
    print("Processing complete!")

asyncio.run(main())
The Output
The program prints the three "Produced" lines and the three "Consumed" lines, then hangs indefinitely on await queue.join(); "Processing complete!" is never printed. The queue maintains an internal counter of unfinished tasks. Every put() increments it, including the put(None) sentinel, so it reaches 4, and it is only decremented when a consumer explicitly calls queue.task_done(). Since the consumer never calls task_done(), the queue believes all four items are still in progress, and join() blocks forever.
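To confirm the hang without actually freezing the interpreter, the sketch below reruns the same producer/consumer pair but wraps join() in asyncio.wait_for with a timeout (the 0.2-second value and the ack_items flag are illustrative additions). It also shows that acknowledging the three real items is not enough: the unacknowledged sentinel alone keeps the counter at 1.

```python
import asyncio

async def producer(queue: asyncio.Queue) -> None:
    for i in range(3):
        await queue.put(f"item_{i}")
    await queue.put(None)  # the sentinel is counted like any other item

async def consumer(queue: asyncio.Queue, ack_items: bool) -> None:
    while True:
        item = await queue.get()
        if item is None:
            break  # sentinel received, but never acknowledged
        if ack_items:
            queue.task_done()

async def joins_in_time(ack_items: bool) -> bool:
    queue: asyncio.Queue = asyncio.Queue()
    await asyncio.gather(producer(queue), consumer(queue, ack_items))
    try:
        # Bounded wait instead of a bare join(), so the demo cannot hang.
        await asyncio.wait_for(queue.join(), timeout=0.2)
        return True
    except asyncio.TimeoutError:
        return False

no_acks = asyncio.run(joins_in_time(ack_items=False))
items_only = asyncio.run(joins_in_time(ack_items=True))
print(no_acks, items_only)  # False False: join() never completes in either case
```

If you keep the sentinel pattern, the consumer has to call task_done() for the sentinel too, just before it breaks out of the loop.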
Why Python Does This
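The asyncio.Queue API is designed to support worker coordination patterns. In CPython, a private counter, _unfinished_tasks, tracks items that have been put but not yet acknowledged: put() increments it, while get() removes an item from the queue without touching the counter. Awaiting join() blocks on an internal asyncio.Event until _unfinished_tasks drops to zero, and only task_done() decrements the counter, setting the event when it reaches zero. The queue has no way to know on its own when a consumer has finished processing an item, so task_done() is the explicit signal (calling it more times than there were items raises ValueError). If that signal never arrives, the event is never set and the coroutine awaiting join() is never woken. No exception is raised: with no other tasks left to run, the event loop simply idles forever, a deadlock inside a single-threaded program.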
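A simplified model of that bookkeeping, written for illustration only: CountingQueue is a hypothetical class, not CPython's implementation, and it omits maxsize, blocking get(), and waiter management. It keeps just the counter and the event so the join()/task_done() handshake is easy to follow.

```python
import asyncio
from collections import deque

class CountingQueue:
    """Hypothetical, simplified model of asyncio.Queue's join()/task_done() bookkeeping."""

    def __init__(self) -> None:
        self._items: deque = deque()
        self._unfinished = 0
        self._finished = asyncio.Event()
        self._finished.set()  # nothing outstanding yet

    def put_nowait(self, item) -> None:
        self._items.append(item)
        self._unfinished += 1
        self._finished.clear()  # join() must now wait

    def get_nowait(self):
        return self._items.popleft()  # removing an item leaves the counter alone

    def task_done(self) -> None:
        if self._unfinished <= 0:
            raise ValueError("task_done() called too many times")
        self._unfinished -= 1
        if self._unfinished == 0:
            self._finished.set()  # wake every coroutine awaiting join()

    async def join(self) -> None:
        if self._unfinished > 0:
            await self._finished.wait()

async def demo() -> tuple[bool, str]:
    q = CountingQueue()
    q.put_nowait("x")
    q.get_nowait()
    try:
        await asyncio.wait_for(q.join(), timeout=0.1)  # counter is still 1
        joined_early = True
    except asyncio.TimeoutError:
        joined_early = False
    q.task_done()
    await q.join()  # counter is 0, returns immediately
    try:
        q.task_done()  # one acknowledgement too many
        extra = "no error"
    except ValueError as exc:
        extra = str(exc)
    return joined_early, extra

joined_early, extra_call = asyncio.run(demo())
print(joined_early, extra_call)
```

The model makes the failure mode visible: nothing ever polls the counter, so if task_done() is skipped, the event stays cleared and join() waits forever.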
The Fix
import asyncio

async def producer(queue):
    for i in range(3):
        print(f"Produced item {i}")
        await queue.put(f"item_{i}")

async def consumer(queue):
    while True:
        item = await queue.get()
        try:
            print(f"Consumed {item}")
        finally:
            # Always call task_done inside a finally block to prevent leaks on failure
            queue.task_done()

async def main():
    queue = asyncio.Queue()
    # Start background consumer
    consumer_task = asyncio.create_task(consumer(queue))
    # Wait for producer to populate
    await producer(queue)
    # Wait until all items are fully processed
    await queue.join()
    # Cancel the background consumer; it is now idle, blocked on queue.get()
    consumer_task.cancel()
    # Let the cancellation finish so the task is not left pending
    try:
        await consumer_task
    except asyncio.CancelledError:
        pass
    print("Processing complete!")

asyncio.run(main())
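queue.task_done() decrements the internal counter that queue.join() monitors. Because task_done() is called once for every item retrieved, queue.join() returns as soon as all items have been processed and acknowledged, and the idle consumer can then be cancelled, so the sentinel is no longer needed. Wrapping the processing in a try...finally block guarantees task_done() is called even if processing raises. Note that queue.get() stays outside the try block: if get() itself is cancelled, no item was retrieved, so there is nothing to acknowledge.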
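A sketch of the try...finally guarantee under failure, assuming a hypothetical handle() step that raises ValueError on a "corrupt" payload (the item names and the processed/failed lists are illustrative). The join() call is bounded with asyncio.wait_for so a missed acknowledgement would surface as a TimeoutError instead of a silent hang.

```python
import asyncio

processed: list[str] = []
failed: list[str] = []

async def handle(item: str) -> None:
    """Hypothetical processing step that fails on a 'corrupt' payload."""
    if "corrupt" in item:
        raise ValueError(f"cannot decode {item}")
    processed.append(item)

async def consumer(queue: asyncio.Queue) -> None:
    while True:
        item = await queue.get()  # outside try: a cancelled get() retrieved nothing
        try:
            await handle(item)
        except ValueError:
            failed.append(item)  # record the failure and keep the worker alive
        finally:
            queue.task_done()  # runs on success, on failure, and on cancellation

async def main() -> None:
    queue: asyncio.Queue = asyncio.Queue()
    for item in ["img_1", "img_corrupt", "img_2"]:
        queue.put_nowait(item)
    worker = asyncio.create_task(consumer(queue))
    # Bounded wait: a skipped task_done() would raise TimeoutError here.
    await asyncio.wait_for(queue.join(), timeout=1)
    worker.cancel()
    await asyncio.gather(worker, return_exceptions=True)

asyncio.run(main())
print(processed, failed)
```

The finally clause and the except clause solve different problems. finally keeps the counter honest; except keeps the worker running. Without the except clause, the single worker would die on the corrupt item, img_2 would never be processed, and join() would still block on it even though every retrieved item had been acknowledged.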
How This Fails in Real Systems
A dynamic image resizing service used asyncio.Queue to buffer incoming requests. During a sudden traffic surge, several consumer tasks crashed on corrupted image payloads. Because the error handler failed to execute task_done(), the internal task counter was never decremented. The service health checks, which depended on queue draining metrics, timed out, causing Kubernetes to repeatedly restart healthy pods, compounding the downtime to several hours.
Key Takeaway
Always call queue.task_done() after processing an item from an asyncio.Queue, ideally in a finally block. Forgetting it leaves the internal counter above zero, so queue.join() hangs indefinitely waiting for a count that never reaches zero.