asyncio
Learn how to use Python's asyncio module in depth — tasks, queues, synchronization, and real patterns.
The previous file covered the basics — async def, await, gather. This file goes deeper into the asyncio module itself — tasks, queues, locks, semaphores, and real patterns you will use when building async applications.
Tasks: the unit of work
A task wraps a coroutine and schedules it to run on the event loop. Creating a task schedules it right away, but it only starts running the next time the current coroutine yields control at an await. You do not have to await the task immediately.
import asyncio

async def worker(name: str, delay: float) -> str:
    print(f"{name} started")
    await asyncio.sleep(delay)
    print(f"{name} finished")
    return f"{name} result"

async def main():
    # create tasks: they are scheduled now and start at the next await
    task1 = asyncio.create_task(worker("A", 3), name="worker-A")
    task2 = asyncio.create_task(worker("B", 1), name="worker-B")
    task3 = asyncio.create_task(worker("C", 2), name="worker-C")

    # main() has not yielded yet, so this prints before any worker starts
    print("Tasks are running...")

    # wait for all tasks; results come back in argument order
    results = await asyncio.gather(task1, task2, task3)
    print(results)

asyncio.run(main())

Output:
Tasks are running...
A started
B started
C started
B finished
C finished
A finished
['A result', 'B result', 'C result']

Task inspection
# reuses worker() from the previous example
async def main():
    # no name= is passed here, so asyncio generates one
    task = asyncio.create_task(worker("A", 2))
    print(task.get_name())   # auto-generated name such as Task-2
    print(task.done())       # False: not finished yet
    print(task.cancelled())  # False
    await asyncio.sleep(1)
    print(task.done())       # False: still running
    await task
    print(task.done())       # True: finished
    print(task.result())     # the return value, "A result"

asyncio.run(main())

Cancelling tasks
You can cancel a running task. Calling task.cancel() raises CancelledError inside the coroutine at the await where it is currently suspended:
import asyncio

async def long_task() -> None:
    try:
        print("Long task started")
        await asyncio.sleep(10)
        print("Long task done")
    except asyncio.CancelledError:
        print("Long task was cancelled, cleaning up")
        raise  # always re-raise CancelledError

async def main():
    task = asyncio.create_task(long_task())
    await asyncio.sleep(2)  # let it run for 2 seconds
    task.cancel()           # request cancellation
    try:
        await task
    except asyncio.CancelledError:
        print("Task cancelled successfully")

asyncio.run(main())

Output:
Long task started
Long task was cancelled, cleaning up
Task cancelled successfully

Always re-raise CancelledError after cleanup. If you swallow it, the task finishes normally instead of being marked as cancelled, so callers, timeouts, and task groups that rely on the cancellation can misbehave or hang.
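The difference between swallowing and re-raising can be observed directly on the task object. The sketch below is illustrative only; the helper names swallows, reraises, and cancel_and_report are made up for this example and are not part of asyncio.

```python
import asyncio

async def swallows() -> str:
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        pass  # bug: the cancellation request is silently dropped
    return "kept going"

async def reraises() -> str:
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        raise  # cleanup would go here, then the error propagates
    return "never reached"

async def cancel_and_report(coro) -> tuple:
    task = asyncio.create_task(coro)
    await asyncio.sleep(0)  # yield once so the task starts and reaches its sleep
    task.cancel()
    try:
        result = await task
    except asyncio.CancelledError:
        result = None
    return task.cancelled(), result

async def main() -> dict:
    return {
        "swallows": await cancel_and_report(swallows()),
        "reraises": await cancel_and_report(reraises()),
    }

print(asyncio.run(main()))
# {'swallows': (False, 'kept going'), 'reraises': (True, None)}
```

The task that swallows the error reports cancelled() as False and even returns a value, which is why code waiting for the cancellation to take effect can be misled.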
asyncio.TaskGroup: modern task management (Python 3.11+)
TaskGroup is the structured way to manage multiple tasks. The async with block waits for every task created in it, and if any of them raises an exception, the remaining tasks are cancelled:
import asyncio

async def fetch(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return f"Data from {name}"

async def main():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(fetch("database", 2))
        task2 = tg.create_task(fetch("API", 1))
        task3 = tg.create_task(fetch("cache", 0.5))
    # all tasks are done by here
    print(task1.result())  # Data from database
    print(task2.result())  # Data from API
    print(task3.result())  # Data from cache

asyncio.run(main())

Prefer TaskGroup over gather() in Python 3.11+. It has better error handling: if one task fails, the others are cancelled, and the exceptions are collected and raised together as an ExceptionGroup.
asyncio.Queue: producer/consumer pattern
Queue is an async queue for passing data between coroutines running on the same event loop. Unlike queue.Queue, it is not thread-safe. The classic use is the producer/consumer pattern, where one coroutine produces work and others consume it:
import asyncio
import random

async def producer(queue: asyncio.Queue, n: int) -> None:
    for i in range(n):
        item = f"item-{i}"
        await queue.put(item)
        print(f"Produced: {item}")
        await asyncio.sleep(random.uniform(0.1, 0.5))
    # signal consumers to stop
    await queue.put(None)

async def consumer(queue: asyncio.Queue, name: str) -> None:
    while True:
        item = await queue.get()
        if item is None:
            # put the sentinel back so the other consumers also stop
            await queue.put(None)
            break
        print(f"  {name} consumed: {item}")
        await asyncio.sleep(random.uniform(0.2, 0.8))
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=5)
    async with asyncio.TaskGroup() as tg:
        tg.create_task(producer(queue, 8))
        tg.create_task(consumer(queue, "Consumer-A"))
        tg.create_task(consumer(queue, "Consumer-B"))

asyncio.run(main())

Output (the exact interleaving varies from run to run because the delays are random):
Produced: item-0
  Consumer-A consumed: item-0
Produced: item-1
  Consumer-B consumed: item-1
Produced: item-2
  Consumer-A consumed: item-2
...

maxsize=5 means queue.put() suspends the producer while the queue already holds 5 items, so it cannot flood the consumers with more work than they can handle.
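To make the effect of maxsize concrete, here is a minimal sketch with a deliberately tiny queue and no consumer. It contrasts put_nowait(), which fails fast, with put(), which waits; the timeout is only there so the demo terminates.

```python
import asyncio

async def main() -> tuple:
    queue = asyncio.Queue(maxsize=2)
    queue.put_nowait(1)
    queue.put_nowait(2)  # the queue is now full

    # The non-waiting variant raises when there is no free slot.
    try:
        queue.put_nowait(3)
        rejected = False
    except asyncio.QueueFull:
        rejected = True

    # The awaiting variant suspends instead. Nobody consumes here,
    # so the put cannot complete and the timeout fires.
    try:
        await asyncio.wait_for(queue.put(3), timeout=0.1)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True

    return rejected, timed_out, queue.qsize()

print(asyncio.run(main()))  # (True, True, 2)
```

In a real pipeline the suspended put() resumes as soon as a consumer calls get(), which is what gives a bounded queue its backpressure.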
asyncio.Queue methods
queue = asyncio.Queue(maxsize=10)
await queue.put(item)      # add item, waits if the queue is full
queue.put_nowait(item)     # add item, raises QueueFull if full
item = await queue.get()   # get item, waits if the queue is empty
item = queue.get_nowait()  # get item, raises QueueEmpty if empty
queue.task_done()          # signal that a previously fetched item was processed
await queue.join()         # wait until task_done() was called for every item
print(queue.qsize())       # current number of items
print(queue.empty())       # True if empty
print(queue.full())        # True if full

asyncio.Lock: prevent concurrent access
A Lock ensures only one coroutine accesses a shared resource at a time:
import asyncio

lock = asyncio.Lock()
shared_counter = 0

async def increment(name: str) -> None:
    global shared_counter
    async with lock:
        # only one coroutine runs this block at a time
        current = shared_counter
        await asyncio.sleep(0.1)  # simulate some work
        shared_counter = current + 1
        print(f"{name}: counter is now {shared_counter}")

async def main():
    await asyncio.gather(*[
        increment(f"Task-{i}")
        for i in range(5)
    ])
    print(f"Final counter: {shared_counter}")

asyncio.run(main())

Output:
Task-0: counter is now 1
Task-1: counter is now 2
Task-2: counter is now 3
Task-3: counter is now 4
Task-4: counter is now 5
Final counter: 5

Without the lock, all five coroutines would read shared_counter (0) before any of them wrote it back, because each one yields at the await between the read and the write. Every coroutine would then write 1, and the final counter would be 1 instead of 5.
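The lost update can be reproduced by running the same read, await, write sequence with and without the lock. This is a sketch under the same assumptions as the example above; run_increments and its use_lock flag are hypothetical names chosen for the comparison.

```python
import asyncio

async def run_increments(use_lock: bool, n: int = 5) -> int:
    counter = 0
    lock = asyncio.Lock()

    async def read_modify_write() -> None:
        nonlocal counter
        current = counter
        await asyncio.sleep(0.01)  # yields between the read and the write
        counter = current + 1

    async def increment() -> None:
        if use_lock:
            async with lock:
                await read_modify_write()
        else:
            await read_modify_write()

    await asyncio.gather(*(increment() for _ in range(n)))
    return counter

print(asyncio.run(run_increments(use_lock=False)))  # 1
print(asyncio.run(run_increments(use_lock=True)))   # 5
```

The race only exists because of the await inside the critical section. Code that reads and writes shared state without awaiting in between cannot be interrupted by another coroutine and needs no lock.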
asyncio.Semaphore: limit concurrency
A Semaphore allows a limited number of coroutines to be inside a guarded section at the same time. Use it to avoid overwhelming a resource, for example by making too many API calls at once:
import asyncio
import time

async def fetch_with_limit(
    semaphore: asyncio.Semaphore,
    url: str
) -> str:
    async with semaphore:  # at most N coroutines are inside this block at once
        print(f"Fetching {url}")
        await asyncio.sleep(1)  # simulate HTTP request
        return f"Data from {url}"

async def main():
    urls = [f"https://api.example.com/item/{i}" for i in range(10)]
    # allow maximum 3 concurrent requests
    semaphore = asyncio.Semaphore(3)
    start = time.perf_counter()
    results = await asyncio.gather(*[
        fetch_with_limit(semaphore, url)
        for url in urls
    ])
    print(f"\nFetched {len(results)} URLs in {time.perf_counter() - start:.1f}s")

asyncio.run(main())

Output:
Fetching https://api.example.com/item/0
Fetching https://api.example.com/item/1
Fetching https://api.example.com/item/2
(waits for one to finish)
Fetching https://api.example.com/item/3
...

Fetched 10 URLs in 4.0s

10 requests, at most 3 at a time, each taking 1 second: that is 4 batches (3 + 3 + 3 + 1), so about 4 seconds in total. Without the semaphore all 10 would run at once, which might trigger rate limiting or overload the API.
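One way to check that the limit really holds is to count how many coroutines are inside the guarded block at any moment. The counters active and peak below are illustrative additions, and the short sleep stands in for the real request.

```python
import asyncio

async def main(limit: int = 3, jobs: int = 10) -> int:
    semaphore = asyncio.Semaphore(limit)
    active = 0  # coroutines currently inside the guarded block
    peak = 0    # highest value of `active` observed

    async def job() -> None:
        nonlocal active, peak
        async with semaphore:
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # stand-in for the real request
            active -= 1

    await asyncio.gather(*(job() for _ in range(jobs)))
    return peak

print(asyncio.run(main()))  # 3
```

All 10 jobs are started by gather(), but the peak never exceeds the semaphore's initial value.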
asyncio.Event: notify coroutines
An Event is a simple signaling mechanism. One coroutine sets the event, others wait for it:
import asyncio

async def waiter(event: asyncio.Event, name: str) -> None:
    print(f"{name} waiting for signal...")
    await event.wait()
    print(f"{name} received signal, proceeding")

async def trigger(event: asyncio.Event) -> None:
    print("Preparing...")
    await asyncio.sleep(2)
    print("Sending signal!")
    event.set()

async def main():
    event = asyncio.Event()
    async with asyncio.TaskGroup() as tg:
        tg.create_task(waiter(event, "Worker A"))
        tg.create_task(waiter(event, "Worker B"))
        tg.create_task(waiter(event, "Worker C"))
        tg.create_task(trigger(event))

asyncio.run(main())

Output:
Worker A waiting for signal...
Worker B waiting for signal...
Worker C waiting for signal...
Preparing...
Sending signal!
Worker A received signal, proceeding
Worker B received signal, proceeding
Worker C received signal, proceeding

All three workers were waiting. When trigger called event.set(), all of them were released at once. The event stays set until clear() is called, so a later wait() returns immediately.
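A waiter often should not wait forever. As a small sketch, event.wait() can be combined with asyncio.wait_for() to give up after a deadline; the helper wait_for_signal is a hypothetical name, and the example also shows the effect of set() and clear().

```python
import asyncio

async def wait_for_signal(event: asyncio.Event, timeout: float) -> bool:
    """Return True if the event was set within `timeout` seconds."""
    try:
        await asyncio.wait_for(event.wait(), timeout)
        return True
    except asyncio.TimeoutError:
        return False

async def main() -> list:
    event = asyncio.Event()
    outcomes = []

    # Nobody sets the event, so the wait times out.
    outcomes.append(await wait_for_signal(event, 0.05))

    # Once set, the event stays set: wait() returns immediately.
    event.set()
    outcomes.append(await wait_for_signal(event, 0.05))

    # clear() resets it, so waiters have to wait again.
    event.clear()
    outcomes.append(await wait_for_signal(event, 0.05))
    return outcomes

print(asyncio.run(main()))  # [False, True, False]
```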
Running blocking code in async
Some libraries are not async — they use blocking I/O. Running them directly in a coroutine would block the event loop. Use run_in_executor to run them in a thread pool:
import asyncio
import time

def blocking_operation(n: int) -> int:
    """A slow, blocking function that is not async aware."""
    time.sleep(2)
    return n * 2

async def main():
    loop = asyncio.get_running_loop()
    # run the blocking function in the default thread pool; the event loop stays free
    result = await loop.run_in_executor(None, blocking_operation, 21)
    print(result)  # 42

asyncio.run(main())

Or using the cleaner asyncio.to_thread() (Python 3.9+):
import asyncio
import time

def blocking_operation(n: int) -> int:
    time.sleep(2)
    return n * 2

async def main():
    result = await asyncio.to_thread(blocking_operation, 21)
    print(result)  # 42

asyncio.run(main())

asyncio.to_thread() is the simpler option for most code. Use run_in_executor when you need a custom executor, such as a process pool for CPU-bound work.
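To see that the event loop really stays responsive, a second coroutine can keep ticking while the blocking call runs in a thread. This is an illustrative sketch; the ticker coroutine and the chosen durations are assumptions made for the demo.

```python
import asyncio
import time

def blocking_operation(seconds: float) -> str:
    time.sleep(seconds)  # blocks the worker thread, not the event loop
    return "done"

async def main() -> tuple:
    ticks = 0

    async def ticker() -> None:
        nonlocal ticks
        while True:
            await asyncio.sleep(0.02)
            ticks += 1

    ticker_task = asyncio.create_task(ticker())
    result = await asyncio.to_thread(blocking_operation, 0.2)

    # stop the ticker and wait for the cancellation to finish
    ticker_task.cancel()
    try:
        await ticker_task
    except asyncio.CancelledError:
        pass
    return result, ticks

result, ticks = asyncio.run(main())
print(result, "with", ticks, "ticks while the thread was busy")
```

If blocking_operation were called directly inside main(), the ticker would not advance at all during the 0.2 seconds, because nothing would yield to the loop.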
A real pattern: rate-limited API client
A complete async pattern that fetches many items from an API with a concurrency limit, retries, and a timeout. It uses the third-party aiohttp package (pip install aiohttp) and asyncio.timeout(), so it needs Python 3.11+:
import asyncio
import time
from typing import Any

import aiohttp

MAX_CONCURRENT = 5
TIMEOUT_SECONDS = 10
MAX_RETRIES = 3

async def fetch_with_retry(
    session: aiohttp.ClientSession,
    semaphore: asyncio.Semaphore,
    url: str,
    retries: int = MAX_RETRIES
) -> dict[str, Any] | None:
    for attempt in range(1, retries + 1):
        try:
            async with semaphore:
                async with asyncio.timeout(TIMEOUT_SECONDS):
                    async with session.get(url) as response:
                        response.raise_for_status()
                        return await response.json()
        except aiohttp.ClientError as e:
            print(f"Attempt {attempt}/{retries} failed for {url}: {e}")
        except TimeoutError:
            print(f"Timeout on attempt {attempt}/{retries} for {url}")
        # the semaphore is released here, so waiting does not hold a slot
        if attempt < retries:
            await asyncio.sleep(2 ** attempt)  # exponential backoff: 2s, then 4s
    return None

async def fetch_all(urls: list[str]) -> list[dict[str, Any] | None]:
    semaphore = asyncio.Semaphore(MAX_CONCURRENT)
    async with aiohttp.ClientSession() as session:
        tasks = [
            fetch_with_retry(session, semaphore, url)
            for url in urls
        ]
        return await asyncio.gather(*tasks)

async def main():
    urls = [
        f"https://jsonplaceholder.typicode.com/posts/{i}"
        for i in range(1, 11)
    ]
    start = time.perf_counter()
    results = await fetch_all(urls)
    elapsed = time.perf_counter() - start
    successful = [r for r in results if r is not None]
    print(f"Fetched {len(successful)}/{len(urls)} successfully in {elapsed:.2f}s")
    for post in successful[:3]:
        print(f"  Post {post['id']}: {post['title'][:40]}")

asyncio.run(main())

This pattern includes:
- Semaphore: at most 5 concurrent requests
- Timeout: cancel any request that takes longer than 10 seconds
- Retry with backoff: up to 3 attempts per URL, with an increasing delay between attempts
- Error handling: a failed URL yields None instead of crashing the whole batch
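The retry, timeout, and backoff logic can also be exercised without a network. The sketch below is not the client above: it swaps aiohttp for a hypothetical FlakyService class and uses asyncio.wait_for() instead of asyncio.timeout() so it also runs on Python versions before 3.11. The retry helper and its parameters are assumptions made for illustration.

```python
import asyncio
from typing import Awaitable, Callable, Optional, TypeVar

T = TypeVar("T")

async def retry(
    make_call: Callable[[], Awaitable[T]],
    attempts: int = 3,
    timeout: float = 0.05,
    base_delay: float = 0.01,
) -> Optional[T]:
    """Call make_call() up to `attempts` times; return None if all fail."""
    for attempt in range(1, attempts + 1):
        try:
            return await asyncio.wait_for(make_call(), timeout)
        except (ConnectionError, asyncio.TimeoutError):
            if attempt < attempts:
                await asyncio.sleep(base_delay * 2 ** attempt)  # backoff
    return None

class FlakyService:
    """Hypothetical stand-in for a remote API: fails, hangs, then succeeds."""

    def __init__(self) -> None:
        self.calls = 0

    async def get(self) -> dict:
        self.calls += 1
        if self.calls == 1:
            raise ConnectionError("connection reset")
        if self.calls == 2:
            await asyncio.sleep(1)  # too slow, will hit the timeout
        return {"id": 1, "call": self.calls}

async def main() -> tuple:
    service = FlakyService()
    result = await retry(service.get)
    return result, service.calls

print(asyncio.run(main()))  # ({'id': 1, 'call': 3}, 3)
```

The first call fails with an error, the second is cancelled by the timeout, and the third succeeds, so the caller sees a single successful result after three attempts.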
asyncio debugging
Enable debug mode to catch common async mistakes, such as coroutines that were never awaited and callbacks that block the event loop:
import asyncio

async def main():
    await asyncio.sleep(1)

asyncio.run(main(), debug=True)

Or set the environment variable:
PYTHONASYNCIODEBUG=1 python3 app.py

Debug mode warns you about:
- Coroutines that were created but never awaited
- Callbacks or task steps that block the event loop for longer than 100 ms (configurable through loop.slow_callback_duration)
- Resources that were not properly closed (reported as ResourceWarning when warnings are enabled, for example with python -W default)
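As a rough illustration of the slow-callback warning, the sketch below blocks the loop on purpose and collects what debug mode writes to the "asyncio" logger. ListHandler, blocks_the_loop, and run_with_debug are hypothetical names; the exact wording of the log message may differ between Python versions, so the example only filters on the word "took".

```python
import asyncio
import logging
import time

class ListHandler(logging.Handler):
    """Collect log messages in a list so they can be inspected."""

    def __init__(self) -> None:
        super().__init__()
        self.messages = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())

async def blocks_the_loop() -> None:
    time.sleep(0.2)  # mistake: synchronous sleep inside a coroutine

def run_with_debug() -> list:
    handler = ListHandler()
    logger = logging.getLogger("asyncio")
    logger.addHandler(handler)
    try:
        asyncio.run(blocks_the_loop(), debug=True)
    finally:
        logger.removeHandler(handler)
    # slow steps are reported as "Executing <...> took N seconds"
    return [m for m in handler.messages if "took" in m]

for message in run_with_debug():
    print(message)
```

Without debug=True the same program runs silently, which is why this kind of mistake tends to go unnoticed until the application feels slow.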
Summary
| Concept | Use for |
|---|---|
| asyncio.create_task() | Schedule a coroutine to run concurrently |
| asyncio.TaskGroup | Manage a group of tasks (Python 3.11+) |
| asyncio.gather() | Run multiple coroutines and collect results |
| asyncio.Queue | Pass data between producer and consumer |
| asyncio.Lock | Prevent concurrent access to shared state |
| asyncio.Semaphore | Limit how many coroutines run at once |
| asyncio.Event | Signal between coroutines |
| asyncio.wait_for() | Apply a timeout to a single awaitable |
| asyncio.timeout() | Timeout context manager (Python 3.11+) |
| asyncio.to_thread() | Run blocking code without blocking the loop (Python 3.9+) |
| asyncio.run(main()) | Start the event loop |