asyncio.gather() — Running Coroutines Concurrently
asyncio.gather() is like a team project where everyone works on their part simultaneously. If one team member hits a critical, unhandled error, it is reported to the project lead immediately. Nobody tells the rest of the team to stop, though, so they keep working unsupervised and their results are never collected.
The Setup
A cloud engineer writes a utility to push log archives to multiple storage buckets concurrently. If one bucket upload fails, they expect the whole operation to abort and report the error, but they overlook that the other uploads keep running, unmanaged.
What Does This Print?
```python
import asyncio

async def upload_bucket(name, delay, fail=False):
    print(f"Starting upload to {name}")
    await asyncio.sleep(delay)
    if fail:
        raise IOError(f"Failed uploading to {name}")
    print(f"Finished upload to {name}")
    return f"Success {name}"

async def main():
    # If bucket B fails, we want to know, but watch what happens to A and C
    try:
        results = await asyncio.gather(
            upload_bucket("Bucket-A", 1.5),
            upload_bucket("Bucket-B", 0.5, fail=True),
            upload_bucket("Bucket-C", 1.0)
        )
        print(results)
    except Exception as e:
        print(f"Caught exception: {e}")
    # Give outstanding tasks a moment to run to show if they completed
    await asyncio.sleep(1.5)

asyncio.run(main())
```
The Output
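The program prints the three "Starting upload" lines, then "Caught exception: Failed uploading to Bucket-B" after about 0.5 seconds, and only then "Finished upload to Bucket-C" (about 1.0 s) and "Finished upload to Bucket-A" (about 1.5 s). When Bucket-B raises an IOError, asyncio.gather() immediately propagates the exception to the caller's try-except block, but it does not cancel the remaining tasks. Bucket-A and Bucket-C keep running in the background until they complete, potentially consuming network and compute resources. Because gather() created their Tasks internally, main() holds no references with which to inspect their results or clean them up. The final asyncio.sleep(1.5) is what lets them finish: asyncio.run() cancels any still-pending tasks when main() returns, so without it they would be cut off at shutdown instead.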
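To check that the leftover uploads really do keep running after gather() has raised, the sketch below replaces the print calls with a shared events list and shortens the delays. Both changes are illustrative assumptions, not part of the original utility. The list lets us verify the ordering after the run instead of reading it off the console.

```python
import asyncio

# Shared log so the order of events can be checked after the run
events = []

async def upload_bucket(name, delay, fail=False):
    events.append(f"start {name}")
    await asyncio.sleep(delay)
    if fail:
        raise IOError(f"Failed uploading to {name}")
    events.append(f"finish {name}")
    return f"Success {name}"

async def main():
    try:
        await asyncio.gather(
            upload_bucket("Bucket-A", 0.3),
            upload_bucket("Bucket-B", 0.1, fail=True),
            upload_bucket("Bucket-C", 0.2),
        )
    except IOError as e:
        events.append(f"caught {e}")
    # gather() has already raised, yet A and C are still scheduled on the loop
    await asyncio.sleep(0.4)

asyncio.run(main())
print(events)
```

The "caught" entry lands before both "finish" entries. gather() has already handed the error to main(), and Bucket-C and Bucket-A still run to completion afterwards.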
Why Python Does This
asyncio.gather() is not designed around structured concurrency principles. The coroutines you pass to gather() are wrapped in asyncio.Task objects and scheduled on the event loop. When one of them raises, gather() propagates that exception to the caller right away, but it does not cancel the other tasks. They keep running unless you hold references to them and cancel them yourself. (The reverse does hold: cancelling the gather() call itself cancels every child that has not finished yet.) This lack of automatic teardown is a frequent source of resource leaks, orphaned database connections, and incomplete API state transitions.
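The manual cleanup described above can be sketched as follows, assuming you want fail-fast behavior. The helper name upload_all_or_cancel is hypothetical. The key change is creating the Tasks yourself with asyncio.create_task() so you keep references to them. When gather() raises, the helper cancels the unfinished siblings and waits for the cancellations to complete before re-raising.

```python
import asyncio

async def upload_bucket(name, delay, fail=False):
    await asyncio.sleep(delay)
    if fail:
        raise IOError(f"Failed uploading to {name}")
    return f"Success {name}"

async def upload_all_or_cancel(specs):
    # Hypothetical helper: create the Tasks explicitly so we keep references
    tasks = [asyncio.create_task(upload_bucket(*spec)) for spec in specs]
    try:
        return await asyncio.gather(*tasks)
    except BaseException:
        # gather() leaves the other tasks running, so cancel them ourselves
        for task in tasks:
            if not task.done():
                task.cancel()
        # Wait until the cancellations take effect; return_exceptions=True
        # stops the resulting CancelledErrors from being raised here
        await asyncio.gather(*tasks, return_exceptions=True)
        raise

async def main():
    specs = [("Bucket-A", 1.5), ("Bucket-B", 0.5, True), ("Bucket-C", 1.0)]
    try:
        await upload_all_or_cancel(specs)
    except IOError as e:
        print(f"Caught exception: {e}")
    # Any task other than main() that is still pending would be a leak
    current = asyncio.current_task()
    return [t for t in asyncio.all_tasks() if t is not current]

leftovers = asyncio.run(main())
print(f"Tasks still pending: {len(leftovers)}")
```

Without the cancellation loop, Bucket-A and Bucket-C would still appear in asyncio.all_tasks() at the point where main() checks for leftovers.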
The Fix
```python
import asyncio

async def upload_bucket(name, delay, fail=False):
    print(f"Starting upload to {name}")
    await asyncio.sleep(delay)
    if fail:
        raise IOError(f"Failed uploading to {name}")
    print(f"Finished upload to {name}")
    return f"Success {name}"

async def main():
    # These are coroutine objects; gather() wraps each one in a Task
    uploads = [
        upload_bucket("Bucket-A", 1.5),
        upload_bucket("Bucket-B", 0.5, fail=True),
        upload_bucket("Bucket-C", 1.0)
    ]
    # return_exceptions=True prevents early propagation, returning errors as results
    results = await asyncio.gather(*uploads, return_exceptions=True)
    for res in results:
        # BaseException also covers asyncio.CancelledError, which is not an
        # Exception subclass since Python 3.8
        if isinstance(res, BaseException):
            print(f"Handling task error: {res}")
        else:
            print(f"Task result: {res}")

asyncio.run(main())
```
Setting return_exceptions=True tells asyncio.gather() to treat exceptions from individual tasks as return values instead of propagating the first one immediately. gather() then waits for every task to finish and returns a list, in the same order as its inputs, holding each task's result or exception object. Note that this does not abort anything: the remaining uploads still run to completion. Alternatively, catching exceptions inside each sub-task keeps them from reaching gather() at all.
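The per-task alternative mentioned above can be sketched with a wrapper coroutine. The name safe_upload and the ("ok", ...)/("error", ...) tuple shape are hypothetical choices for illustration. Each upload converts its own failure into an ordinary value, so gather() never sees an exception from it.

```python
import asyncio

async def upload_bucket(name, delay, fail=False):
    await asyncio.sleep(delay)
    if fail:
        raise IOError(f"Failed uploading to {name}")
    return f"Success {name}"

async def safe_upload(name, delay, fail=False):
    # Hypothetical wrapper: handle the failure inside the sub-task itself.
    # IOError is an alias of OSError in Python 3, so this catches it.
    try:
        return ("ok", await upload_bucket(name, delay, fail))
    except OSError as e:
        return ("error", str(e))

async def main():
    return await asyncio.gather(
        safe_upload("Bucket-A", 0.3),
        safe_upload("Bucket-B", 0.1, fail=True),
        safe_upload("Bucket-C", 0.2),
    )

results = asyncio.run(main())
for status, detail in results:
    print(f"{status}: {detail}")
```

Catching only OSError is deliberate in this sketch: unexpected bugs such as a TypeError still propagate instead of being silently folded into the results.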
How This Fails in Real Systems
A real-time notification engine used asyncio.gather() to deliver push notifications, text messages, and emails concurrently. When the push notification provider's API failed, the resulting exception skipped the rest of the delivery flow. The emails and text messages were still sent, but because the exception propagated out of gather(), the database status was never updated to mark the notifications as sent. Since the records still looked unsent, the notifications were resent again and again in an endless loop, bombarding thousands of customers with duplicate messages.
Key Takeaway
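By default, asyncio.gather() propagates the first exception immediately but does not cancel the other tasks, leaving them running in an unknown state. Developers usually expect either a clean abort or a complete set of results and get neither. Use return_exceptions=True (or handle errors inside each sub-task) when you need every outcome, and cancel outstanding tasks explicitly when you need fail-fast behavior.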