DocsHub
Async Python

asyncio

Learn how to use Python's asyncio module in depth — tasks, queues, synchronization, and real patterns.

asyncio

The previous file covered the basics — async def, await, gather. This file goes deeper into the asyncio module itself — tasks, queues, locks, semaphores, and real patterns you will use when building async applications.


Tasks — the unit of work

A task wraps a coroutine and schedules it to run on the event loop. Creating a task starts it immediately — you do not have to await it right away.

import asyncio

async def worker(name: str, delay: float) -> str:
    print(f"{name} started")
    await asyncio.sleep(delay)
    print(f"{name} finished")
    return f"{name} result"

async def main():
    # create tasks — they start running immediately
    task1 = asyncio.create_task(worker("A", 3), name="worker-A")
    task2 = asyncio.create_task(worker("B", 1), name="worker-B")
    task3 = asyncio.create_task(worker("C", 2), name="worker-C")

    # do other work here while tasks run...
    print("Tasks are running...")

    # wait for all tasks
    results = await asyncio.gather(task1, task2, task3)
    print(results)

asyncio.run(main())

Output:

A started
B started
C started
Tasks are running...
B finished
C finished
A finished
['A result', 'B result', 'C result']

Task inspection

async def main():
    task = asyncio.create_task(worker("A", 2))

    print(task.get_name())      # worker-A or Task-1
    print(task.done())          # False — still running
    print(task.cancelled())     # False

    await asyncio.sleep(1)
    print(task.done())          # False — still running

    await task
    print(task.done())          # True — finished
    print(task.result())        # the return value

asyncio.run(main())

Cancelling tasks

You can cancel a running task. It raises CancelledError inside the coroutine:

import asyncio

async def long_task() -> None:
    try:
        print("Long task started")
        await asyncio.sleep(10)
        print("Long task done")
    except asyncio.CancelledError:
        print("Long task was cancelled — cleaning up")
        raise   # always re-raise CancelledError

async def main():
    task = asyncio.create_task(long_task())

    await asyncio.sleep(2)   # let it run for 2 seconds

    task.cancel()            # cancel it

    try:
        await task
    except asyncio.CancelledError:
        print("Task cancelled successfully")

asyncio.run(main())

Output:

Long task started
Long task was cancelled — cleaning up
Task cancelled successfully

Always re-raise CancelledError after cleanup. If you swallow it, the task appears to cancel but the cancellation does not propagate correctly — this can cause hard-to-debug hangs.


asyncio.TaskGroup — modern task management (Python 3.11+)

TaskGroup is the modern, cleaner way to manage multiple tasks. All tasks in the group are cancelled if any one of them raises an exception:

import asyncio

async def fetch(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return f"Data from {name}"

async def main():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(fetch("database", 2))
        task2 = tg.create_task(fetch("API", 1))
        task3 = tg.create_task(fetch("cache", 0.5))

    # all tasks are done by here
    print(task1.result())   # Data from database
    print(task2.result())   # Data from API
    print(task3.result())   # Data from cache

asyncio.run(main())

Prefer TaskGroup over gather() in Python 3.11+. It has better error handling — if one task fails, all others are cancelled and all exceptions are collected and reported together.


asyncio.Queue — producer/consumer pattern

Queue is a thread-safe async queue for passing data between coroutines. The classic use is the producer/consumer pattern — one coroutine produces work, another consumes it:

import asyncio
import random

async def producer(queue: asyncio.Queue, n: int) -> None:
    for i in range(n):
        item = f"item-{i}"
        await queue.put(item)
        print(f"Produced: {item}")
        await asyncio.sleep(random.uniform(0.1, 0.5))

    # signal consumers to stop
    await queue.put(None)

async def consumer(queue: asyncio.Queue, name: str) -> None:
    while True:
        item = await queue.get()

        if item is None:
            # put it back for other consumers
            await queue.put(None)
            break

        print(f"  {name} consumed: {item}")
        await asyncio.sleep(random.uniform(0.2, 0.8))
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=5)

    async with asyncio.TaskGroup() as tg:
        tg.create_task(producer(queue, 8))
        tg.create_task(consumer(queue, "Consumer-A"))
        tg.create_task(consumer(queue, "Consumer-B"))

asyncio.run(main())

Output:

Produced: item-0
  Consumer-A consumed: item-0
Produced: item-1
  Consumer-B consumed: item-1
Produced: item-2
  Consumer-A consumed: item-2
...

maxsize=5 means the producer blocks when the queue has 5 items — it cannot flood the consumers with more work than they can handle.


asyncio.Queue methods

queue = asyncio.Queue(maxsize=10)

await queue.put(item)       # add item — waits if queue is full
queue.put_nowait(item)      # add item — raises QueueFull if full

item = await queue.get()    # get item — waits if queue is empty
item = queue.get_nowait()   # get item — raises QueueEmpty if empty

queue.task_done()           # signal that a gotten item was processed
await queue.join()          # wait until all items are processed

print(queue.qsize())        # current number of items
print(queue.empty())        # True if empty
print(queue.full())         # True if full

asyncio.Lock — prevent concurrent access

A Lock ensures only one coroutine accesses a shared resource at a time:

import asyncio

lock = asyncio.Lock()
shared_counter = 0

async def increment(name: str) -> None:
    global shared_counter

    async with lock:
        # only one coroutine runs this block at a time
        current = shared_counter
        await asyncio.sleep(0.1)   # simulate some work
        shared_counter = current + 1
        print(f"{name}: counter is now {shared_counter}")

async def main():
    await asyncio.gather(*[
        increment(f"Task-{i}")
        for i in range(5)
    ])
    print(f"Final counter: {shared_counter}")

asyncio.run(main())

Output:

Task-0: counter is now 1
Task-1: counter is now 2
Task-2: counter is now 3
Task-3: counter is now 4
Task-4: counter is now 5
Final counter: 5

Without the lock, multiple coroutines could read shared_counter at the same time, all see the same value, all increment it, and write back — giving wrong results.


asyncio.Semaphore — limit concurrency

A Semaphore allows a limited number of coroutines to run concurrently. Use it to avoid overwhelming a resource — like making too many API calls at once:

import asyncio
import time

async def fetch_with_limit(
    semaphore: asyncio.Semaphore,
    url: str
) -> str:
    async with semaphore:   # only N coroutines here at once
        print(f"Fetching {url}")
        await asyncio.sleep(1)   # simulate HTTP request
        return f"Data from {url}"

async def main():
    urls = [f"https://api.example.com/item/{i}" for i in range(10)]

    # allow maximum 3 concurrent requests
    semaphore = asyncio.Semaphore(3)

    start = time.time()
    results = await asyncio.gather(*[
        fetch_with_limit(semaphore, url)
        for url in urls
    ])

    print(f"\nFetched {len(results)} URLs in {time.time() - start:.1f}s")

asyncio.run(main())

Output:

Fetching https://api.example.com/item/0
Fetching https://api.example.com/item/1
Fetching https://api.example.com/item/2
(waits for one to finish)
Fetching https://api.example.com/item/3
...
Fetched 10 URLs in 4.0s

10 requests, max 3 at a time, each taking 1 second → 4 batches → ~4 seconds. Without the semaphore all 10 would run at once — which might rate-limit or crash the API.


asyncio.Event — notify coroutines

An Event is a simple signaling mechanism. One coroutine sets the event, others wait for it:

import asyncio

async def waiter(event: asyncio.Event, name: str) -> None:
    print(f"{name} waiting for signal...")
    await event.wait()
    print(f"{name} received signal — proceeding")

async def trigger(event: asyncio.Event) -> None:
    print("Preparing...")
    await asyncio.sleep(2)
    print("Sending signal!")
    event.set()

async def main():
    event = asyncio.Event()

    async with asyncio.TaskGroup() as tg:
        tg.create_task(waiter(event, "Worker A"))
        tg.create_task(waiter(event, "Worker B"))
        tg.create_task(waiter(event, "Worker C"))
        tg.create_task(trigger(event))

asyncio.run(main())

Output:

Worker A waiting for signal...
Worker B waiting for signal...
Worker C waiting for signal...
Preparing...
Sending signal!
Worker A received signal — proceeding
Worker B received signal — proceeding
Worker C received signal — proceeding

All three workers were waiting. When trigger called event.set(), all of them were released at once.


Running blocking code in async

Some libraries are not async — they use blocking I/O. Running them directly in a coroutine would block the event loop. Use run_in_executor to run them in a thread pool:

import asyncio
import time

def blocking_operation(n: int) -> int:
    """A slow, blocking function — not async aware."""
    time.sleep(2)
    return n * 2

async def main():
    loop = asyncio.get_event_loop()

    # run blocking function in a thread — does not block event loop
    result = await loop.run_in_executor(None, blocking_operation, 21)
    print(result)   # 42

asyncio.run(main())

Or using the cleaner asyncio.to_thread() (Python 3.9+):

import asyncio
import time

def blocking_operation(n: int) -> int:
    time.sleep(2)
    return n * 2

async def main():
    result = await asyncio.to_thread(blocking_operation, 21)
    print(result)   # 42

asyncio.run(main())

asyncio.to_thread() is the modern, recommended way.


A real pattern — rate-limited API client

A complete async pattern — fetch many items from an API with rate limiting, retries, and a timeout:

import asyncio
import aiohttp
import time
from typing import Any

MAX_CONCURRENT = 5
TIMEOUT_SECONDS = 10
MAX_RETRIES = 3

async def fetch_with_retry(
    session: aiohttp.ClientSession,
    semaphore: asyncio.Semaphore,
    url: str,
    retries: int = MAX_RETRIES
) -> dict[str, Any] | None:

    for attempt in range(1, retries + 1):
        try:
            async with semaphore:
                async with asyncio.timeout(TIMEOUT_SECONDS):
                    async with session.get(url) as response:
                        response.raise_for_status()
                        return await response.json()

        except aiohttp.ClientError as e:
            print(f"Attempt {attempt}/{retries} failed for {url}: {e}")
            if attempt < retries:
                await asyncio.sleep(2 ** attempt)   # exponential backoff

        except TimeoutError:
            print(f"Timeout on attempt {attempt} for {url}")

    return None


async def fetch_all(urls: list[str]) -> list[dict | None]:
    semaphore = asyncio.Semaphore(MAX_CONCURRENT)

    async with aiohttp.ClientSession() as session:
        tasks = [
            fetch_with_retry(session, semaphore, url)
            for url in urls
        ]
        return await asyncio.gather(*tasks)


async def main():
    urls = [
        f"https://jsonplaceholder.typicode.com/posts/{i}"
        for i in range(1, 11)
    ]

    start = time.time()
    results = await fetch_all(urls)
    elapsed = time.time() - start

    successful = [r for r in results if r is not None]
    print(f"Fetched {len(successful)}/{len(urls)} successfully in {elapsed:.2f}s")

    for post in successful[:3]:
        print(f"  Post {post['id']}: {post['title'][:40]}")

asyncio.run(main())

This pattern includes:

  • Semaphore — max 5 concurrent requests
  • Timeout — cancel requests taking over 10 seconds
  • Retry with backoff — retry up to 3 times with increasing delay
  • Error handling — collect failures without crashing everything

asyncio debugging

Enable debug mode to find common async mistakes — forgetting to await, slow callbacks, coroutines that were never awaited:

import asyncio

async def main():
    await asyncio.sleep(1)

asyncio.run(main(), debug=True)

Or set the environment variable:

PYTHONASYNCIODEBUG=1 python3 app.py

Debug mode warns you about:

  • Coroutines that were created but never awaited
  • Callbacks that block the event loop for too long
  • Resources that were not properly closed

Summary

ConceptUse for
asyncio.create_task()Schedule a coroutine to run concurrently
asyncio.TaskGroupManage a group of tasks — Python 3.11+
asyncio.gather()Run multiple coroutines and collect results
asyncio.QueuePass data between producer and consumer
asyncio.LockPrevent concurrent access to shared state
asyncio.SemaphoreLimit how many coroutines run at once
asyncio.EventSignal between coroutines
asyncio.wait_for()Timeout a coroutine
asyncio.timeout()Modern timeout context manager (3.11+)
asyncio.to_thread()Run blocking code without blocking the loop
asyncio.run(main())Start the event loop

On this page