Async & Await
Learn how asynchronous programming works in Python using async def, await, and the event loop.
Async & Await
Most of the code you have written so far is synchronous — it runs line by line, top to bottom. Each line waits for the previous one to finish before starting.
That works fine for most things. But what about waiting? Fetching data from an API, reading a large file, querying a database — these operations spend most of their time doing nothing, just waiting for a response. While one request is waiting, your program sits idle.
Asynchronous programming solves this. Instead of waiting, Python can switch to something else useful while the current operation is pending, then come back when it is ready.
The problem — synchronous waiting
import time
def fetch_user(user_id):
print(f"Fetching user {user_id}...")
time.sleep(2) # simulates a network request taking 2 seconds
return {"id": user_id, "name": f"User {user_id}"}
def main():
start = time.time()
user1 = fetch_user(1) # waits 2 seconds
user2 = fetch_user(2) # waits another 2 seconds
user3 = fetch_user(3) # waits another 2 seconds
print(f"Done in {time.time() - start:.1f}s")
main()Output:
Fetching user 1...
Fetching user 2...
Fetching user 3...
Done in 6.0s6 seconds — because each request waits for the previous one. But all three requests are independent. There is no reason they cannot happen at the same time.
The solution — async programming
import asyncio
import time
async def fetch_user(user_id):
print(f"Fetching user {user_id}...")
await asyncio.sleep(2) # simulates waiting — but does not block
return {"id": user_id, "name": f"User {user_id}"}
async def main():
start = time.time()
user1, user2, user3 = await asyncio.gather(
fetch_user(1),
fetch_user(2),
fetch_user(3),
)
print(f"Done in {time.time() - start:.1f}s")
asyncio.run(main())Output:
Fetching user 1...
Fetching user 2...
Fetching user 3...
Done in 2.0s2 seconds — all three requests ran concurrently. The total time is the duration of the longest one, not the sum of all three.
The event loop
The event loop is the engine that makes async work. It is a loop that runs tasks and checks — when a task is waiting for something, switch to another task. When the waiting is done, come back.
Event Loop:
1. Start task A
2. Task A hits await — it is waiting
3. Switch to task B
4. Task B hits await — it is waiting
5. Switch to task C
6. Task C finishes — return result
7. Task A's wait is done — resume it
8. Task A finishes
9. Task B's wait is done — resume it
10. Task B finishesOne thread. One process. But multiple things happening concurrently — because while one task waits, others run.
Async is concurrent, not parallel. Parallel means literally running at the same time on multiple CPU cores. Concurrent means switching between tasks so efficiently it feels simultaneous. For I/O-bound work (network, file, database), async concurrency is just as fast as parallelism.
async def — defining a coroutine
A function defined with async def is a coroutine function. Calling it returns a coroutine object — it does not run the function yet:
async def greet(name: str) -> str:
return f"Hello, {name}!"
result = greet("Ali") # does NOT run the function
print(result) # <coroutine object greet at 0x...>To actually run a coroutine, you need to await it or run it with asyncio.run():
result = await greet("Ali") # runs it and gets the resultawait — pause and wait for a result
await can only be used inside an async def function. It pauses the current coroutine until the awaited thing completes — but crucially, it does not block the event loop. Other tasks can run while this one waits.
import asyncio
async def step_one():
print("Step one starting")
await asyncio.sleep(1) # wait 1 second — non-blocking
print("Step one done")
return "result from step one"
async def main():
result = await step_one()
print(result)
asyncio.run(main())Output:
Step one starting
Step one done
result from step oneasyncio.run() — the entry point
asyncio.run() is how you start the event loop and run a coroutine from synchronous code. You call it once at the top level:
import asyncio
async def main():
print("Hello from async!")
asyncio.run(main())Never call asyncio.run() inside an already-running event loop — for example, inside another coroutine. It will raise a RuntimeError. Inside async code, just await coroutines directly.
asyncio.sleep() — non-blocking pause
asyncio.sleep() is the async version of time.sleep(). It pauses the current coroutine without blocking anything else:
import asyncio
async def task(name: str, delay: float):
print(f"{name} started")
await asyncio.sleep(delay)
print(f"{name} finished after {delay}s")
async def main():
await asyncio.gather(
task("A", 3),
task("B", 1),
task("C", 2),
)
asyncio.run(main())Output:
A started
B started
C started
B finished after 1s
C finished after 2s
A finished after 3sAll three started immediately. They finished in order of their delay — not the order they were started. Total time: 3 seconds, not 6.
asyncio.gather() — run multiple coroutines concurrently
gather() runs multiple coroutines at the same time and waits for all of them to finish. Returns a list of results in the same order as the inputs:
import asyncio
async def fetch_data(source: str, delay: float) -> str:
await asyncio.sleep(delay)
return f"Data from {source}"
async def main():
results = await asyncio.gather(
fetch_data("database", 2),
fetch_data("API", 1),
fetch_data("cache", 0.5),
)
for result in results:
print(result)
asyncio.run(main())Output:
Data from database
Data from API
Data from cacheResults come back in the original order — even though "cache" finished first.
Handling errors in gather:
By default, if one coroutine raises an exception, gather() cancels the others and re-raises the exception. Use return_exceptions=True to collect exceptions as results instead:
async def might_fail(n: int) -> int:
if n == 2:
raise ValueError(f"Failed on {n}")
return n * 10
async def main():
results = await asyncio.gather(
might_fail(1),
might_fail(2),
might_fail(3),
return_exceptions=True
)
print(results) # [10, ValueError('Failed on 2'), 30]
asyncio.run(main())asyncio.create_task() — fire and forget
create_task() schedules a coroutine to run concurrently without waiting for it immediately. You can continue doing other work and collect the result later:
import asyncio
async def background_job(name: str) -> str:
print(f"{name} started in background")
await asyncio.sleep(2)
return f"{name} complete"
async def main():
# schedule tasks — they start running immediately
task1 = asyncio.create_task(background_job("Task A"))
task2 = asyncio.create_task(background_job("Task B"))
print("Tasks are running in background...")
print("Doing other work here...")
# now wait for the results
result1 = await task1
result2 = await task2
print(result1)
print(result2)
asyncio.run(main())Output:
Task A started in background
Task B started in background
Tasks are running in background...
Doing other work here...
Task A complete
Task B completeThe tasks start running as soon as they are created — not when you await them.
asyncio.wait_for() — add a timeout
Cancel a coroutine if it takes too long:
import asyncio
async def slow_operation() -> str:
await asyncio.sleep(10)
return "finally done"
async def main():
try:
result = await asyncio.wait_for(slow_operation(), timeout=3.0)
print(result)
except asyncio.TimeoutError:
print("Operation timed out after 3 seconds")
asyncio.run(main())Output:
Operation timed out after 3 secondsasyncio.timeout() — modern timeout (Python 3.11+)
Python 3.11 introduced a cleaner context manager for timeouts:
import asyncio
async def main():
try:
async with asyncio.timeout(3.0):
await asyncio.sleep(10)
except TimeoutError:
print("Timed out")
asyncio.run(main())Async context managers
Some resources need async setup and teardown. Use async with:
import asyncio
class AsyncDatabaseConnection:
async def __aenter__(self):
print("Connecting to database...")
await asyncio.sleep(0.1) # simulate connection time
print("Connected")
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("Closing connection")
await asyncio.sleep(0.1)
return False
async def fetch(self, query: str) -> list:
await asyncio.sleep(0.2) # simulate query time
return [{"id": 1, "name": "Ali"}]
async def main():
async with AsyncDatabaseConnection() as db:
results = await db.fetch("SELECT * FROM users")
print(results)
asyncio.run(main())Output:
Connecting to database...
Connected
[{'id': 1, 'name': 'Ali'}]
Closing connectionAsync iterators and async for
You can iterate over async sources with async for:
import asyncio
class AsyncCounter:
def __init__(self, start: int, end: int):
self.current = start
self.end = end
def __aiter__(self):
return self
async def __anext__(self) -> int:
if self.current > self.end:
raise StopAsyncIteration
await asyncio.sleep(0.1) # simulate async work
value = self.current
self.current += 1
return value
async def main():
async for number in AsyncCounter(1, 5):
print(number)
asyncio.run(main())Output:
1
2
3
4
5async generators
Combine async def with yield:
import asyncio
async def fetch_pages(total_pages: int):
for page in range(1, total_pages + 1):
await asyncio.sleep(0.5) # simulate fetching each page
yield {"page": page, "data": f"Content of page {page}"}
async def main():
async for page in fetch_pages(4):
print(f"Page {page['page']}: {page['data']}")
asyncio.run(main())Output:
Page 1: Content of page 1
Page 2: Content of page 2
Page 3: Content of page 3
Page 4: Content of page 4When to use async — and when not to
| Situation | Use async? |
|---|---|
| Making many HTTP requests | Yes |
| Querying a database many times | Yes |
| Reading/writing many files | Yes |
| Waiting for user input | Yes |
| Heavy CPU computation — sorting, math | No — use multiprocessing |
| Simple scripts with one or two operations | No — sync is simpler |
| Working with libraries that are not async | No — they will block anyway |
Async shines for I/O-bound work — anything that involves waiting. It does not help for CPU-bound work — heavy computation that keeps the CPU fully busy. For that, use multiprocessing or concurrent.futures.ProcessPoolExecutor.
A real example — fetching multiple URLs concurrently
Using aiohttp — the standard async HTTP library:
pip install aiohttpimport asyncio
import aiohttp
import time
async def fetch_url(session: aiohttp.ClientSession, url: str) -> dict:
print(f"Fetching {url}")
async with session.get(url) as response:
data = await response.json()
print(f"Got response from {url}")
return data
async def main():
urls = [
"https://jsonplaceholder.typicode.com/posts/1",
"https://jsonplaceholder.typicode.com/posts/2",
"https://jsonplaceholder.typicode.com/posts/3",
"https://jsonplaceholder.typicode.com/posts/4",
"https://jsonplaceholder.typicode.com/posts/5",
]
start = time.time()
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
elapsed = time.time() - start
print(f"\nFetched {len(results)} URLs in {elapsed:.2f}s")
for result in results:
print(f"Post {result['id']}: {result['title'][:40]}")
asyncio.run(main())Output:
Fetching https://jsonplaceholder.typicode.com/posts/1
Fetching https://jsonplaceholder.typicode.com/posts/2
...
Got response from posts/3
Got response from posts/1
...
Fetched 5 URLs in 0.43s
Post 1: sunt aut facere repellat provident occae...
Post 2: qui est esse...All 5 requests ran concurrently. What would have taken ~2.5 seconds synchronously took 0.43 seconds.
Summary
| Concept | Example |
|---|---|
| Define a coroutine | async def func(): |
| Await a coroutine | result = await func() |
| Run the event loop | asyncio.run(main()) |
| Non-blocking pause | await asyncio.sleep(n) |
| Run concurrently | await asyncio.gather(a(), b()) |
| Schedule a task | asyncio.create_task(func()) |
| Add a timeout | await asyncio.wait_for(func(), timeout=5) |
| Modern timeout | async with asyncio.timeout(5): |
| Async context manager | async with resource as r: |
| Async iteration | async for item in source: |