asyncio.TaskGroup — Structured Concurrency in Python 3.11+

Mental Model

asyncio.TaskGroup is like a safe, well-managed sandbox for your asynchronous tasks. When you enter the sandbox, you launch tasks, and when you exit, the sandbox ensures that all tasks have either completed successfully or been explicitly cancelled, preventing any tasks from escaping and running indefinitely.

Rule: Always use asyncio.TaskGroup in Python 3.11+ to enforce structured concurrency and avoid orphaned background tasks.

The Setup

A pipeline engineer develops a real-time data ingestion system that fetches sensor data from three distinct feeds. They want to ensure that if any feed connection drops, the entire collection process halts cleanly without leaving orphaned tasks.

What Does This Print?

Broken code
Python
import asyncio

async def read_sensor(name, duration, fail=False):
    try:
        print(f"Reading from {name}")
        await asyncio.sleep(duration)
        if fail:
            raise RuntimeError(f"Sensor {name} failed!")
        print(f"Successfully read from {name}")
    except asyncio.CancelledError:
        print(f"Cleanup initiated for {name}")
        raise

async def main():
    # Attempting to use older patterns instead of structured TaskGroups
    try:
        task1 = asyncio.create_task(read_sensor("Sensor-A", 2.0))
        task2 = asyncio.create_task(read_sensor("Sensor-B", 0.5, fail=True))
        
        # Manually awaiting sequentially leaves task1 orphaned on task2's failure
        await task2
        await task1
    except Exception as e:
        print(f"Main caught: {e}")
        # Allow remaining orphaned tasks to run to demonstrate the leak
        await asyncio.sleep(2.0)

asyncio.run(main())
Predict what happens to Sensor-A when Sensor-B fails while main() is awaiting the two tasks one after the other.

The Output

What actually happens
Reading from Sensor-A
Reading from Sensor-B
Main caught: Sensor Sensor-B failed!
Successfully read from Sensor-A

Sensor-B raises RuntimeError, not CancelledError, so its except asyncio.CancelledError cleanup block never fires. More importantly, Sensor-A was never cancelled: it kept running to completion even after main() had caught the exception from Sensor-B. This is the orphaned-task problem. main() still holds a reference to task1, but nothing on the failure path cancels or awaits it, so it keeps consuming resources after the operation it belongs to has already failed.
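
The leak can be observed directly by inspecting the surviving task at the moment the failure is caught. The following is a minimal sketch rather than part of the original example: slow_sensor, failing_sensor, and observe_orphan are hypothetical names, and the delays are shortened so the script finishes quickly. It also shows the explicit cancel-and-await cleanup that the broken code omits.

```python
import asyncio

async def slow_sensor(log):
    # Stand-in for Sensor-A: would finish long after its sibling has failed.
    await asyncio.sleep(0.2)
    log.append("slow sensor finished")

async def failing_sensor():
    # Stand-in for Sensor-B: fails quickly.
    await asyncio.sleep(0.05)
    raise RuntimeError("sensor failed")

async def observe_orphan():
    log = []
    slow = asyncio.create_task(slow_sensor(log))
    failing = asyncio.create_task(failing_sensor())
    try:
        await failing
        await slow
    except RuntimeError:
        # The sibling is neither finished nor cancelled at this point.
        log.append(f"after failure: done={slow.done()}, cancelled={slow.cancelled()}")
        # Explicit cleanup: request cancellation, then wait for it to complete.
        slow.cancel()
        try:
            await slow
        except asyncio.CancelledError:
            pass
        log.append(f"after cleanup: cancelled={slow.cancelled()}")
    return log

if __name__ == "__main__":
    for line in asyncio.run(observe_orphan()):
        print(line)
```

Because the sibling is cancelled before its sleep ends, "slow sensor finished" never appears in the log.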

Why Python Does This

Prior to Python 3.11, managing the lifecycle of concurrent tasks meant manual bookkeeping: keeping track of every task (for example via asyncio.all_tasks()), cancelling the survivors when one failed, and then awaiting those cancellations. Without structured concurrency, a task created via asyncio.create_task is not tied to the lifetime of the coroutine that created it; it is scheduled independently on the event loop. If the parent stops waiting because of an exception, the child tasks keep running as "orphaned" executions.

Python 3.11 introduced asyncio.TaskGroup, an asynchronous context manager that binds task lifetimes to an async with block. If a task inside a TaskGroup raises an exception other than CancelledError, the group cancels all other tasks that are still running, waits for them to finish, and then raises an ExceptionGroup containing the errors raised by its tasks.
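
To make the manual bookkeeping concrete, here is one way the cancel-siblings-on-failure behaviour might be approximated without TaskGroup, using asyncio.wait with return_when=asyncio.FIRST_EXCEPTION. This is a sketch under assumptions, not a full equivalent: worker and run_all_or_cancel are hypothetical names, only the first error is re-raised, and it does not cancel the children if the caller itself is cancelled while waiting.

```python
import asyncio

async def worker(name, delay, fail=False):
    # Hypothetical unit of work: sleeps, then either fails or returns its name.
    await asyncio.sleep(delay)
    if fail:
        raise RuntimeError(f"{name} failed")
    return name

async def run_all_or_cancel(coros):
    # Start every coroutine as a task and keep references to all of them.
    tasks = [asyncio.create_task(c) for c in coros]
    # Return as soon as any task raises, or when all of them have finished.
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
    # Cancel whatever is still running, then wait for the cancellations to land.
    for task in pending:
        task.cancel()
    if pending:
        await asyncio.gather(*pending, return_exceptions=True)
    # Re-raise the first failure found among the finished tasks.
    errors = [t.exception() for t in done if not t.cancelled() and t.exception()]
    if errors:
        raise errors[0]
    return [t.result() for t in tasks]

async def main():
    try:
        await run_all_or_cancel(
            [worker("Sensor-A", 0.2), worker("Sensor-B", 0.05, fail=True)]
        )
    except RuntimeError as exc:
        print(f"Main caught: {exc}")

if __name__ == "__main__":
    asyncio.run(main())
```

Every step in run_all_or_cancel is something TaskGroup performs on exit from the async with block, which is the main argument for preferring it where Python 3.11+ is available.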

The Fix

Corrected pattern
Python
import asyncio

async def read_sensor(name, duration, fail=False):
    try:
        print(f"Reading from {name}")
        await asyncio.sleep(duration)
        if fail:
            raise RuntimeError(f"Sensor {name} failed!")
        print(f"Successfully read from {name}")
    except asyncio.CancelledError:
        # Runs when the TaskGroup cancels this task after a sibling fails
        print(f"Cleanup initiated for {name}")
        raise

async def main():
    try:
        # TaskGroup automatically handles concurrent execution and cancellation
        async with asyncio.TaskGroup() as tg:
            task1 = tg.create_task(read_sensor("Sensor-A", 2.0))
            task2 = tg.create_task(read_sensor("Sensor-B", 0.5, fail=True))
    except* RuntimeError as eg:
        # Python 3.11+ except* syntax handles ExceptionGroups
        print(f"Caught exception group containing: {eg.exceptions}")

asyncio.run(main())

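asyncio.TaskGroup enforces structured concurrency. When any task managed by the group raises an exception, the group cancels the other tasks still running in its scope and waits for them to finish. The async with block then exits by raising an ExceptionGroup that wraps the failures, which is why main() handles it with except* rather than a plain except RuntimeError. Here Sensor-A prints its cleanup message and never reaches "Successfully read".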

How This Fails in Real Systems

A distributed microservice used dynamically created worker tasks to run database syncs. When a sync task failed because of a deadlocked connection, the outstanding read tasks kept running in the background and held on to their slots in the database connection pool. Within 24 hours, the pool was exhausted and the service was completely starved of connections. The issue was solved by refactoring the task orchestration to use Python 3.11 TaskGroup, so that a failing sync task automatically cancels its sibling tasks and gives them the chance to release their connections.

Key Takeaway

Always use asyncio.TaskGroup in Python 3.11+ to enforce structured concurrency and avoid orphaned background tasks.
Common mistake: Developers often manage groups of asynchronous tasks manually with asyncio.create_task() and await, without ensuring that every task is awaited or cancelled when an error occurs, which leads to orphaned background tasks.