Stack and Queue Patterns in Python
Think of standard library queue objects as a perfectly managed, thread-safe waiting room. Tasks enter and exit in an orderly fashion, with built-in locks and signaling to prevent collisions and ensure that operations like push and pop are atomic.
queue.Queue or queue.PriorityQueue instead of manual list abstractions.The Setup
You are designing an asynchronous task manager that processes tasks based on urgency (priority) in a multi-threaded system. You decide to write a clean priority queue pattern.
What Does This Print?
import threading
import time
# Simple priority queue using standard sorted lists
class UnsafePriorityQueue:
def __init__(self):
self.tasks = []
def push(self, priority, task):
self.tasks.append((priority, task))
self.tasks.sort(key=lambda x: x[0]) # Re-sort to maintain priority
def pop(self):
if self.tasks:
return self.tasks.pop(0) # Pop highest priority
return None
pq = UnsafePriorityQueue()
# Multiple threads push tasks simultaneously
def worker(thread_id):
for i in range(5):
pq.push(i, f"Task {i} from thread {thread_id}")
time.sleep(0.001)
threads = [threading.Thread(target=worker, args=(j,)) for j in range(5)]
for t in threads: t.start()
for t in threads: t.join()
print(f"Queue size expected: 25, Actual: {len(pq.tasks)}")
The Output
Though the size might happen to be correct in this small sample, the list operations are completely unsafe. The elements will suffer from race conditions under load, and the O(N log N) sorting on every push completely tanks performance for large queues.
Why Python Does This
Python's standard list operations are not atomic when nested together (e.g., append + sort + pop). If thread context switching happens midway through sorting or popping, the data will corrupt. Furthermore, lists are poorly suited for dynamic priority queues due to sorting costs. The standard library provides optimized, thread-safe primitives like queue.PriorityQueue and heapq which are written in C, leveraging binary heaps for efficient log-time insertions and thread-safety locks.
The Fix
import queue
import threading
# Use thread-safe queue.PriorityQueue from the standard library
# It uses heapq under the hood for O(log N) pushes/pops
pq = queue.PriorityQueue()
def worker(thread_id):
for i in range(5):
# PriorityQueue takes tuples: (priority, data)
pq.put((i, f"Task {i} from thread {thread_id}"))
threads = [threading.Thread(target=worker, args=(j,)) for j in range(5)]
for t in threads: t.start()
for t in threads: t.join()
print(f"Queue size: {pq.qsize()}")
# Pops are completely thread-safe and sorted
Standard library queue modules (like queue.PriorityQueue) are designed with internal locking mechanisms to ensure thread-safe access. This prevents race conditions and data corruption when multiple threads interact with the queue concurrently.
How This Fails in Real Systems
A high-frequency trading platform built a custom order-matching engine using raw sorted lists. Under volatile market conditions, rapid concurrent inserts led to out-of-order execution of limit orders, resulting in a $45,000 slippage loss before being replaced with a proper lock-based heap queue.
Key Takeaway
queue.Queue or queue.PriorityQueue instead of manual list abstractions.