DocsHub
Async Python

Async & Await

Learn how asynchronous programming works in Python using async def, await, and the event loop.

Async & Await

Most of the code you have written so far is synchronous — it runs line by line, top to bottom. Each line waits for the previous one to finish before starting.

That works fine for most things. But what about waiting? Fetching data from an API, reading a large file, querying a database — these operations spend most of their time doing nothing, just waiting for a response. While one request is waiting, your program sits idle.

Asynchronous programming solves this. Instead of waiting, Python can switch to something else useful while the current operation is pending, then come back when it is ready.


The problem — synchronous waiting

import time

def fetch_user(user_id):
    print(f"Fetching user {user_id}...")
    time.sleep(2)   # simulates a network request taking 2 seconds
    return {"id": user_id, "name": f"User {user_id}"}

def main():
    start = time.time()

    user1 = fetch_user(1)   # waits 2 seconds
    user2 = fetch_user(2)   # waits another 2 seconds
    user3 = fetch_user(3)   # waits another 2 seconds

    print(f"Done in {time.time() - start:.1f}s")

main()

Output:

Fetching user 1...
Fetching user 2...
Fetching user 3...
Done in 6.0s

6 seconds — because each request waits for the previous one. But all three requests are independent. There is no reason they cannot happen at the same time.


The solution — async programming

import asyncio
import time

async def fetch_user(user_id):
    print(f"Fetching user {user_id}...")
    await asyncio.sleep(2)   # simulates waiting — but does not block
    return {"id": user_id, "name": f"User {user_id}"}

async def main():
    start = time.time()

    user1, user2, user3 = await asyncio.gather(
        fetch_user(1),
        fetch_user(2),
        fetch_user(3),
    )

    print(f"Done in {time.time() - start:.1f}s")

asyncio.run(main())

Output:

Fetching user 1...
Fetching user 2...
Fetching user 3...
Done in 2.0s

2 seconds — all three requests ran concurrently. The total time is the duration of the longest one, not the sum of all three.


The event loop

The event loop is the engine that makes async work. It is a loop that runs tasks and checks — when a task is waiting for something, switch to another task. When the waiting is done, come back.

Event Loop:
1. Start task A
2. Task A hits await — it is waiting
3. Switch to task B
4. Task B hits await — it is waiting
5. Switch to task C
6. Task C finishes — return result
7. Task A's wait is done — resume it
8. Task A finishes
9. Task B's wait is done — resume it
10. Task B finishes

One thread. One process. But multiple things happening concurrently — because while one task waits, others run.

Async is concurrent, not parallel. Parallel means literally running at the same time on multiple CPU cores. Concurrent means switching between tasks so efficiently it feels simultaneous. For I/O-bound work (network, file, database), async concurrency is just as fast as parallelism.


async def — defining a coroutine

A function defined with async def is a coroutine function. Calling it returns a coroutine object — it does not run the function yet:

async def greet(name: str) -> str:
    return f"Hello, {name}!"

result = greet("Ali")   # does NOT run the function
print(result)           # <coroutine object greet at 0x...>

To actually run a coroutine, you need to await it or run it with asyncio.run():

result = await greet("Ali")   # runs it and gets the result

await — pause and wait for a result

await can only be used inside an async def function. It pauses the current coroutine until the awaited thing completes — but crucially, it does not block the event loop. Other tasks can run while this one waits.

import asyncio

async def step_one():
    print("Step one starting")
    await asyncio.sleep(1)   # wait 1 second — non-blocking
    print("Step one done")
    return "result from step one"

async def main():
    result = await step_one()
    print(result)

asyncio.run(main())

Output:

Step one starting
Step one done
result from step one

asyncio.run() — the entry point

asyncio.run() is how you start the event loop and run a coroutine from synchronous code. You call it once at the top level:

import asyncio

async def main():
    print("Hello from async!")

asyncio.run(main())

Never call asyncio.run() inside an already-running event loop — for example, inside another coroutine. It will raise a RuntimeError. Inside async code, just await coroutines directly.


asyncio.sleep() — non-blocking pause

asyncio.sleep() is the async version of time.sleep(). It pauses the current coroutine without blocking anything else:

import asyncio

async def task(name: str, delay: float):
    print(f"{name} started")
    await asyncio.sleep(delay)
    print(f"{name} finished after {delay}s")

async def main():
    await asyncio.gather(
        task("A", 3),
        task("B", 1),
        task("C", 2),
    )

asyncio.run(main())

Output:

A started
B started
C started
B finished after 1s
C finished after 2s
A finished after 3s

All three started immediately. They finished in order of their delay — not the order they were started. Total time: 3 seconds, not 6.


asyncio.gather() — run multiple coroutines concurrently

gather() runs multiple coroutines at the same time and waits for all of them to finish. Returns a list of results in the same order as the inputs:

import asyncio

async def fetch_data(source: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return f"Data from {source}"

async def main():
    results = await asyncio.gather(
        fetch_data("database", 2),
        fetch_data("API", 1),
        fetch_data("cache", 0.5),
    )

    for result in results:
        print(result)

asyncio.run(main())

Output:

Data from database
Data from API
Data from cache

Results come back in the original order — even though "cache" finished first.

Handling errors in gather:

By default, if one coroutine raises an exception, gather() cancels the others and re-raises the exception. Use return_exceptions=True to collect exceptions as results instead:

async def might_fail(n: int) -> int:
    if n == 2:
        raise ValueError(f"Failed on {n}")
    return n * 10

async def main():
    results = await asyncio.gather(
        might_fail(1),
        might_fail(2),
        might_fail(3),
        return_exceptions=True
    )
    print(results)   # [10, ValueError('Failed on 2'), 30]

asyncio.run(main())

asyncio.create_task() — fire and forget

create_task() schedules a coroutine to run concurrently without waiting for it immediately. You can continue doing other work and collect the result later:

import asyncio

async def background_job(name: str) -> str:
    print(f"{name} started in background")
    await asyncio.sleep(2)
    return f"{name} complete"

async def main():
    # schedule tasks — they start running immediately
    task1 = asyncio.create_task(background_job("Task A"))
    task2 = asyncio.create_task(background_job("Task B"))

    print("Tasks are running in background...")
    print("Doing other work here...")

    # now wait for the results
    result1 = await task1
    result2 = await task2

    print(result1)
    print(result2)

asyncio.run(main())

Output:

Task A started in background
Task B started in background
Tasks are running in background...
Doing other work here...
Task A complete
Task B complete

The tasks start running as soon as they are created — not when you await them.


asyncio.wait_for() — add a timeout

Cancel a coroutine if it takes too long:

import asyncio

async def slow_operation() -> str:
    await asyncio.sleep(10)
    return "finally done"

async def main():
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=3.0)
        print(result)
    except asyncio.TimeoutError:
        print("Operation timed out after 3 seconds")

asyncio.run(main())

Output:

Operation timed out after 3 seconds

asyncio.timeout() — modern timeout (Python 3.11+)

Python 3.11 introduced a cleaner context manager for timeouts:

import asyncio

async def main():
    try:
        async with asyncio.timeout(3.0):
            await asyncio.sleep(10)
    except TimeoutError:
        print("Timed out")

asyncio.run(main())

Async context managers

Some resources need async setup and teardown. Use async with:

import asyncio

class AsyncDatabaseConnection:
    async def __aenter__(self):
        print("Connecting to database...")
        await asyncio.sleep(0.1)   # simulate connection time
        print("Connected")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Closing connection")
        await asyncio.sleep(0.1)
        return False

    async def fetch(self, query: str) -> list:
        await asyncio.sleep(0.2)   # simulate query time
        return [{"id": 1, "name": "Ali"}]


async def main():
    async with AsyncDatabaseConnection() as db:
        results = await db.fetch("SELECT * FROM users")
        print(results)

asyncio.run(main())

Output:

Connecting to database...
Connected
[{'id': 1, 'name': 'Ali'}]
Closing connection

Async iterators and async for

You can iterate over async sources with async for:

import asyncio

class AsyncCounter:
    def __init__(self, start: int, end: int):
        self.current = start
        self.end = end

    def __aiter__(self):
        return self

    async def __anext__(self) -> int:
        if self.current > self.end:
            raise StopAsyncIteration
        await asyncio.sleep(0.1)   # simulate async work
        value = self.current
        self.current += 1
        return value


async def main():
    async for number in AsyncCounter(1, 5):
        print(number)

asyncio.run(main())

Output:

1
2
3
4
5

async generators

Combine async def with yield:

import asyncio

async def fetch_pages(total_pages: int):
    for page in range(1, total_pages + 1):
        await asyncio.sleep(0.5)   # simulate fetching each page
        yield {"page": page, "data": f"Content of page {page}"}


async def main():
    async for page in fetch_pages(4):
        print(f"Page {page['page']}: {page['data']}")

asyncio.run(main())

Output:

Page 1: Content of page 1
Page 2: Content of page 2
Page 3: Content of page 3
Page 4: Content of page 4

When to use async — and when not to

SituationUse async?
Making many HTTP requestsYes
Querying a database many timesYes
Reading/writing many filesYes
Waiting for user inputYes
Heavy CPU computation — sorting, mathNo — use multiprocessing
Simple scripts with one or two operationsNo — sync is simpler
Working with libraries that are not asyncNo — they will block anyway

Async shines for I/O-bound work — anything that involves waiting. It does not help for CPU-bound work — heavy computation that keeps the CPU fully busy. For that, use multiprocessing or concurrent.futures.ProcessPoolExecutor.


A real example — fetching multiple URLs concurrently

Using aiohttp — the standard async HTTP library:

pip install aiohttp
import asyncio
import aiohttp
import time

async def fetch_url(session: aiohttp.ClientSession, url: str) -> dict:
    print(f"Fetching {url}")
    async with session.get(url) as response:
        data = await response.json()
        print(f"Got response from {url}")
        return data

async def main():
    urls = [
        "https://jsonplaceholder.typicode.com/posts/1",
        "https://jsonplaceholder.typicode.com/posts/2",
        "https://jsonplaceholder.typicode.com/posts/3",
        "https://jsonplaceholder.typicode.com/posts/4",
        "https://jsonplaceholder.typicode.com/posts/5",
    ]

    start = time.time()

    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)

    elapsed = time.time() - start
    print(f"\nFetched {len(results)} URLs in {elapsed:.2f}s")
    for result in results:
        print(f"Post {result['id']}: {result['title'][:40]}")

asyncio.run(main())

Output:

Fetching https://jsonplaceholder.typicode.com/posts/1
Fetching https://jsonplaceholder.typicode.com/posts/2
...
Got response from posts/3
Got response from posts/1
...
Fetched 5 URLs in 0.43s
Post 1: sunt aut facere repellat provident occae...
Post 2: qui est esse...

All 5 requests ran concurrently. What would have taken ~2.5 seconds synchronously took 0.43 seconds.


Summary

ConceptExample
Define a coroutineasync def func():
Await a coroutineresult = await func()
Run the event loopasyncio.run(main())
Non-blocking pauseawait asyncio.sleep(n)
Run concurrentlyawait asyncio.gather(a(), b())
Schedule a taskasyncio.create_task(func())
Add a timeoutawait asyncio.wait_for(func(), timeout=5)
Modern timeoutasync with asyncio.timeout(5):
Async context managerasync with resource as r:
Async iterationasync for item in source:

On this page