Python asyncio Fundamentals: Coroutines, Tasks, and the Event Loop

You've learned about threading (which lets one process fake parallelism) and multiprocessing (which gives you real parallelism with separate processes). Now we're stepping into a third concurrency model: asyncio, Python's lightweight, single-threaded cooperative multitasking framework.
Here's the appeal: asyncio lets you run thousands of concurrent operations on a single thread without the overhead of thread management or the complexity of process spawning. It's the Goldilocks zone for I/O-bound work at scale: think web servers handling thousands of connections, API clients making hundreds of concurrent requests, or any scenario where you're waiting a lot.
The catch? It requires rethinking how you write code. No more requests.get() blocking the whole program. Instead, you'll use async/await syntax to write code that yields control when it waits. This article builds your mental model from the ground up: the event loop, coroutines, tasks, and how to orchestrate them all.
Understanding asyncio is particularly important in the AI/ML world because virtually every modern AI framework, from OpenAI's Python SDK to LangChain to FastAPI-based model servers, is built on async foundations. When you're streaming tokens from a language model, batching inference requests, or building a high-throughput API wrapper, asyncio is the engine doing the heavy lifting. Whether you're building the next chatbot backend or just want your data-fetching scripts to stop crawling, the patterns we cover here will apply directly to your daily work. By the end of this article, you'll understand not just the syntax but why asyncio was designed the way it was, and that deeper understanding is what separates developers who use async code from developers who truly master it.
Table of Contents
- The Event Loop: Your Program's Traffic Director
- async def: Defining Coroutines
- asyncio.run(): Your Entry Point
- await: Yielding Control
- await vs Just Calling
- asyncio.Task and asyncio.create_task(): Concurrent Execution
- The Mental Model of Task Concurrency
- Task vs Coroutine: Understanding the Difference
- asyncio.gather(): Coordinating Multiple Coroutines
- asyncio.gather() vs asyncio.wait()
- asyncio.sleep(): Non-Blocking Delays
- Timeouts and Cancellation
- asyncio.timeout() for Timeouts
- Task.cancel() for Manual Cancellation
- Bridging Sync and Async: run_in_executor()
- Event Loop Internals
- Putting It Together: A Real-World Example
- Common Asyncio Mistakes
- Common Pitfalls
- Pitfall 1: Forgetting to await
- Pitfall 2: Mixing async and sync
- Pitfall 3: Blocking in a coroutine
- The Event Loop Under the Hood
- Advanced: Understanding Task States and Transitions
- Race Conditions and Task Ordering
- Exception Handling in Async Code
- Async Context Managers: Cleaner Resource Management
- Async Context and asyncio.get_running_loop()
- Performance Considerations: When to Use Asyncio
- Debugging and Testing Async Code
- Using asyncio.create_task() for Better Tracebacks
- Testing Async Functions
- Mocking Async Functions
- Comparing the Three Concurrency Models
- Asyncio Patterns: Producer-Consumer
- Asyncio and Event Loop Policies
- Key Takeaways
- Coroutine Best Practices
- 1. Keep Coroutines Focused
- 2. Avoid Nested create_task()
- 3. Use Type Hints
- 4. Document Your Async API
- Real-World Scenario: Building an Async Web Crawler
- The Mental Shift: From Thinking Sequentially to Thinking Concurrently
The Event Loop: Your Program's Traffic Director
The event loop is the heart of asyncio. It's a single-threaded control center that runs your code and manages I/O.
Here's the mental model:
┌─────────────────────────────────────────┐
│ Event Loop (Single Thread) │
│ │
│ ┌───────────────────────────────────┐ │
│ │ Queue of Pending Coroutines/Tasks │ │
│ │ [task_a, task_b, task_c, ...] │ │
│ └───────────────────────────────────┘ │
│ │
│ ┌───────────────────────────────────┐ │
│ │ Pick next task → Run until │ │
│ │ it hits 'await' → Switch to next │ │
│ └───────────────────────────────────┘ │
│ │
│ ┌───────────────────────────────────┐ │
│ │ Monitor I/O (Network, File, │ │
│ │ Timers, etc.) │ │
│ │ When I/O completes → │ │
│ │ Resume the waiting task │ │
│ └───────────────────────────────────┘ │
└─────────────────────────────────────────┘
The event loop runs one piece of code at a time. When that code hits an await statement, it says: "I'm waiting for something. I'll give up control now." The loop then picks the next task and runs it. When the I/O completes (a network response arrives, a file loads, a timer expires), the loop resumes the waiting task.
This is cooperative multitasking: tasks voluntarily yield control at await points. No preemption means a task can't be interrupted mid-statement, which eliminates most of the locking headaches of threads, though shared state touched on both sides of an await can still interleave. It's elegant.
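That nuance is easy to demonstrate. In this minimal sketch (the counter and unsafe_increment names are made up for illustration), every task reads the shared counter, yields at an await, and only then writes back, so all ten updates collapse into one:

```python
import asyncio

counter = 0

async def unsafe_increment():
    """Read-modify-write with an await in the middle: a classic lost update."""
    global counter
    current = counter        # read the shared value
    await asyncio.sleep(0)   # yield: every other task gets to read too
    counter = current + 1    # write back a now-stale value

async def main():
    await asyncio.gather(*[unsafe_increment() for _ in range(10)])

asyncio.run(main())
print(counter)  # 1, not 10: all ten tasks read 0 before any wrote
```

An asyncio.Lock around the read-modify-write section would restore correctness; the point is that await points are exactly where interleaving happens.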
Let's see it in action.
async def: Defining Coroutines
A coroutine is a function that uses async def. Here's the simplest possible coroutine:
async def greet():
    return "Hello, asyncio!"

# This doesn't run the coroutine, it creates a coroutine object
coro = greet()
print(coro)
# Output: <coroutine object greet at 0x...>

This is one of the first things that trips people up: calling an async def function doesn't execute its body immediately. Instead, Python hands you back a coroutine object, a suspended computation waiting to be scheduled. Think of it like building a recipe card versus actually cooking the meal. The coroutine object describes what will happen, but nothing runs until you hand it to the event loop.
Notice: just calling greet() doesn't execute the function. It returns a coroutine object, not the result. To actually run it, you need the event loop.
asyncio.run(): Your Entry Point
asyncio.run() creates an event loop, runs your top-level coroutine, and cleans up afterward.
import asyncio

async def greet():
    return "Hello, asyncio!"

result = asyncio.run(greet())
print(result)  # Output: Hello, asyncio!

Under the hood, asyncio.run() does several things: it creates a fresh event loop, sets it as the current running loop, executes your coroutine to completion, then cancels any remaining tasks and tears down the loop before returning. This careful lifecycle management is why you should always use asyncio.run() at your program's entry point rather than manually creating and managing loops: it handles the cleanup details that are easy to get wrong on your own.
What's happening:
- asyncio.run() creates a new event loop
- Runs your coroutine on that loop
- Returns the result
- Closes the loop
This is the entry point for asyncio programs. You'll typically have one asyncio.run() call in your main code.
await: Yielding Control
await is where the magic happens. It tells the event loop: "I'm waiting for something. Run other tasks while I wait."
import asyncio

async def fetch_data(url):
    print(f"Fetching {url}...")
    await asyncio.sleep(2)  # Simulate network delay
    print(f"Done fetching {url}")
    return f"Data from {url}"

async def main():
    result = await fetch_data("https://api.example.com")
    print(result)

asyncio.run(main())

Expected output:
Fetching https://api.example.com...
Done fetching https://api.example.com
Data from https://api.example.com
Nothing surprising here: the code runs sequentially. But notice the power: await asyncio.sleep(2) doesn't block the thread. It yields control, letting other tasks run. If we had multiple concurrent tasks, we'd see dramatic speedups.
await vs Just Calling
Here's a critical distinction:
async def task_a():
    await asyncio.sleep(1)
    print("A done")

async def task_b():
    await asyncio.sleep(1)
    print("B done")

async def main():
    # Sequential: takes 2 seconds
    await task_a()
    await task_b()

asyncio.run(main())

Both tasks run sequentially. Task B waits for A to finish. Total time: ~2 seconds.
But what if we want them concurrent? That's where tasks come in.
asyncio.Task and asyncio.create_task(): Concurrent Execution
A Task wraps a coroutine and schedules it on the event loop. Multiple tasks run concurrently. The key insight here is the timing: when you call asyncio.create_task(), the coroutine is immediately registered with the event loop and begins running as soon as control is yielded. You're not queuing something for "later"; you're starting it right now, concurrently with everything else.
import asyncio

async def task_a():
    await asyncio.sleep(1)
    print("A done")

async def task_b():
    await asyncio.sleep(1)
    print("B done")

async def main():
    # Create tasks (they start running immediately)
    ta = asyncio.create_task(task_a())
    tb = asyncio.create_task(task_b())
    # Wait for both to complete
    await ta
    await tb

asyncio.run(main())

Expected output:
A done
B done
Total time: ~1 second, not 2! Both tasks run concurrently.
Here's what's happening:
- asyncio.create_task() schedules the coroutine on the event loop
- Control returns immediately (the task is running in the background)
- await ta waits for task A; await tb waits for task B
- The event loop runs both concurrently
The Mental Model of Task Concurrency
Without Tasks (Sequential):
┌─────────┐ ┌─────────┐
│ Task A │ ──→ │ Task B │
│ (1 sec) │ │ (1 sec) │
└─────────┘ └─────────┘
Total: 2 seconds
With Tasks (Concurrent):
┌─────────┐
│ Task A │ (1 sec)
├─────────┤ ← Both run at once!
│ Task B │ (1 sec)
└─────────┘
Total: 1 second
Task vs Coroutine: Understanding the Difference
This distinction matters more than it might seem, and it's the source of a lot of subtle bugs in async code. A coroutine is a paused computation, a function defined with async def that hasn't started executing yet. A Task is a coroutine that has been handed to the event loop and is actively being managed. When you await a coroutine directly, you run it inline and the current coroutine pauses until it completes. When you wrap it in asyncio.create_task(), you hand it to the event loop independently, and both your current code and the new task can progress simultaneously.
The practical difference: if you have three API calls to make and you await each one in sequence, you're waiting for one at a time, even in async code. If you create tasks for all three and then await them (or use gather()), all three network requests fly out simultaneously. This is the difference between "I wrote async code" and "I wrote concurrent async code", and it's the mistake that makes beginners wonder why their async code isn't any faster than sync code.
There's one more nuance worth knowing: a bare coroutine that you forget to either await or pass to create_task() simply never runs. Python will warn you about never-awaited coroutines, but it won't stop you from creating them. A Task, by contrast, is self-driving: once created, it is scheduled to run to completion (or cancellation). One caveat: the event loop holds only a weak reference to its tasks, so keep your own reference (or await it, or hand it to gather()) to guarantee it isn't garbage-collected mid-flight.
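A quick timing sketch makes the distinction concrete (call_api and the 0.1-second delay are illustrative stand-ins for real I/O):

```python
import asyncio
import time

async def call_api(i):
    await asyncio.sleep(0.1)  # stand-in for a network round trip
    return i

async def sequential():
    # Awaiting coroutines one by one: each waits for the previous
    return [await call_api(i) for i in range(3)]

async def concurrent():
    # create_task() starts all three before we await any of them
    tasks = [asyncio.create_task(call_api(i)) for i in range(3)]
    return [await t for t in tasks]

t0 = time.perf_counter()
seq = asyncio.run(sequential())
seq_time = time.perf_counter() - t0

t0 = time.perf_counter()
conc = asyncio.run(concurrent())
conc_time = time.perf_counter() - t0

print(f"sequential: {seq_time:.2f}s, concurrent: {conc_time:.2f}s")
```

Same results, roughly a third of the wall-clock time: that gap is the difference this section describes.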
asyncio.gather(): Coordinating Multiple Coroutines
asyncio.gather() is cleaner for running multiple coroutines concurrently and collecting their results.
import asyncio

async def fetch(url):
    await asyncio.sleep(1)
    return f"Data from {url}"

async def main():
    urls = ["api1.com", "api2.com", "api3.com"]
    results = await asyncio.gather(*[fetch(url) for url in urls])
    print(results)

asyncio.run(main())

Expected output:
['Data from api1.com', 'Data from api2.com', 'Data from api3.com']
Time taken: ~1 second (all concurrent).
gather() wraps each coroutine in a task, runs them all concurrently, and returns a list of results in the same order. Notice that even though the individual fetches might complete in arbitrary order internally, gather() preserves the original input order in the result list, which is a quality-of-life feature that makes downstream processing much cleaner.
asyncio.gather() vs asyncio.wait()
gather() is simple and returns results directly. wait() is more flexible, giving you fine-grained control over how tasks complete.
import asyncio

async def task(n):
    await asyncio.sleep(n)
    return f"Task {n} done"

async def main():
    # gather: simple, returns results directly
    results = await asyncio.gather(task(1), task(2), task(3))
    print(results)

    # wait: more control over task completion
    tasks = {asyncio.create_task(task(i)) for i in [1, 2, 3]}
    done, pending = await asyncio.wait(tasks)
    for t in done:
        print(t.result())

asyncio.run(main())

When to use each:
- gather(): You want results in order, all at once. Cleaner syntax.
- wait(): You want to handle tasks as they finish, put a timeout on the whole batch, or use return_when to get partial results.
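As a sketch of that finer control, here's a hypothetical "first response wins" pattern using return_when=asyncio.FIRST_COMPLETED (the racer coroutine and its delays are made up for illustration):

```python
import asyncio

async def racer(name, delay):
    await asyncio.sleep(delay)
    return name

async def first_result():
    tasks = {
        asyncio.create_task(racer("fast", 0.1)),
        asyncio.create_task(racer("slow", 1.0)),
    }
    # Return as soon as ANY task finishes, not all of them
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for t in pending:
        t.cancel()  # stop the losers so they don't linger
    return done.pop().result()

winner = asyncio.run(first_result())
print(winner)  # fast
```

This is a common shape for hedged requests: query two replicas, keep whichever answers first.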
asyncio.sleep(): Non-Blocking Delays
asyncio.sleep() yields control, unlike time.sleep() which blocks the entire thread.
import asyncio
import time

# WRONG: Blocks the thread, defeats the purpose of asyncio
async def bad_delay():
    time.sleep(2)  # Blocks! Other tasks can't run
    print("Done waiting")

# RIGHT: Yields control
async def good_delay():
    await asyncio.sleep(2)  # Yields! Other tasks can run
    print("Done waiting")

This is the core difference. time.sleep(2) makes the thread stop for 2 seconds. await asyncio.sleep(2) tells the event loop: "I'm waiting 2 seconds; run other tasks." One call freezes your entire program; the other hands control back gracefully. This applies to any blocking operation, not just sleep. File reads with the standard open(), synchronous database queries, CPU-heavy calculations: they all have the same problem of holding the thread hostage while the event loop's other tasks sit idle waiting for their turn.
Timeouts and Cancellation
asyncio.timeout() for Timeouts
Control how long a coroutine can run:
import asyncio

async def slow_task():
    await asyncio.sleep(10)
    return "Done"

async def main():
    try:
        result = await asyncio.wait_for(slow_task(), timeout=2)
    except asyncio.TimeoutError:
        print("Task took too long!")

asyncio.run(main())

Expected output:
Task took too long!
The task was cancelled because it exceeded 2 seconds.
Note: asyncio.wait_for() takes the awaitable and the timeout as arguments and cancels the task when time runs out. Modern Python (3.11+) adds a cleaner context-manager syntax with asyncio.timeout():
async def main():
    try:
        async with asyncio.timeout(2):
            await slow_task()
    except asyncio.TimeoutError:
        print("Task took too long!")

The 3.11+ syntax is the one you should default to in new code: it reads more naturally and makes the scope of the timeout explicit at a glance. When working with external APIs, always wrap your calls in a timeout. Network calls that hang indefinitely will eventually exhaust your event loop's capacity and bring your entire service to a crawl.
Task.cancel() for Manual Cancellation
Cancel a task programmatically:
import asyncio

async def long_running():
    try:
        while True:
            print("Still running...")
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        print("Task was cancelled!")
        raise  # Re-raise to complete cancellation

async def main():
    task = asyncio.create_task(long_running())
    await asyncio.sleep(2)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("Cleanup complete")

asyncio.run(main())

Expected output:
Still running...
Still running...
Task was cancelled!
Cleanup complete
When you call task.cancel(), it raises CancelledError in the task. The task can catch it for cleanup (closing files, releasing resources), then re-raise to complete the cancellation. Notice the raise inside the except asyncio.CancelledError block: this is not optional. If you swallow the CancelledError without re-raising, the task carries on as if nothing happened, cancellation appears to succeed but doesn't, and you'll have a very hard-to-debug situation on your hands. Always re-raise cancellation errors unless you have an extremely specific reason not to.
Bridging Sync and Async: run_in_executor()
Real life is messy. Sometimes you have blocking code you can't change (legacy libraries, CPU-intensive operations). run_in_executor() lets you run blocking code without freezing the event loop.
import asyncio
import requests  # Blocking HTTP library

async def main():
    loop = asyncio.get_running_loop()
    # Run blocking code in a thread pool
    response = await loop.run_in_executor(
        None,  # Use default ThreadPoolExecutor
        requests.get,
        "https://httpbin.org/delay/2"
    )
    print(f"Status: {response.status_code}")

asyncio.run(main())

run_in_executor() runs requests.get() in a background thread pool, so it doesn't block the event loop. Other tasks keep running. The None executor argument tells asyncio to use its default ThreadPoolExecutor; you can also pass a custom ProcessPoolExecutor if you need true CPU parallelism for compute-heavy tasks. Keep in mind that run_in_executor() is a bridge, not a free pass: you're still using threads under the hood, so the usual thread safety concerns apply for any shared state.
Common use cases:
- Blocking HTTP requests (use aiohttp for true async)
- CPU-intensive operations (better to use multiprocessing, but an executor works for small tasks)
- Database queries (use async database drivers like asyncpg, but an executor bridges old code)
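To keep the idea dependency-free, here's a self-contained sketch: a blocking call (simulated with time.sleep) runs in the default thread pool while a heartbeat coroutine keeps ticking, proving the loop stayed responsive. The blocking_io and heartbeat names are illustrative:

```python
import asyncio
import time

def blocking_io():
    time.sleep(0.3)  # stands in for legacy blocking code you can't change
    return "blocking result"

async def heartbeat(ticks):
    for _ in range(3):
        await asyncio.sleep(0.1)
        ticks.append("tick")  # only possible if the loop isn't frozen

async def main():
    loop = asyncio.get_running_loop()
    ticks = []
    result, _ = await asyncio.gather(
        loop.run_in_executor(None, blocking_io),  # worker thread
        heartbeat(ticks),                          # stays on the event loop
    )
    return result, ticks

result, ticks = asyncio.run(main())
print(result, len(ticks))
```

If you replaced run_in_executor with a direct call to blocking_io(), the heartbeat would record nothing until the sleep finished: that contrast is the whole point of the bridge.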
Event Loop Internals
To understand asyncio deeply, you need to know what the event loop is actually doing between your await points. At its core, the loop is running a variation of the classic select/poll/epoll pattern that operating systems expose for monitoring multiple file descriptors simultaneously. On Linux, asyncio uses epoll; on macOS, kqueue; on Windows, IOCP. These are the OS-level primitives that make it possible to watch thousands of network sockets simultaneously on a single thread.
Each iteration of the event loop follows a predictable cycle: it first runs all callbacks that are already ready (tasks that have been woken up), then calls the OS's I/O selector to ask "what I/O events have completed since I last checked?" with a timeout equal to the time until the next scheduled callback. The OS returns a list of file descriptors that are ready for reading or writing, the loop wakes up the corresponding tasks, and the cycle repeats.
This architecture has a critical implication: any code that runs without hitting an await holds the entire event loop hostage. If you spend 500ms doing a CPU-intensive calculation inside an async function without yielding, every other task in your program is frozen for those 500ms. The event loop cannot preempt you the way a thread scheduler can. This is why the rule "async code should only do I/O, not heavy computation" is so important in practice, and why compute-heavy workloads belong in run_in_executor() or a separate process pool, not inline in your coroutines.
Understanding this also explains why await asyncio.sleep(0) is a useful trick: it yields control back to the event loop for exactly one iteration without actually waiting any time, letting other pending tasks get a turn. You'll see this pattern in CPU-bound loops that can't be moved to an executor but need to play nicely with the rest of the async system.
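Here's a tiny sketch of that trick: two chunked workers (the cpu_chunks name and list appends are illustrative stand-ins for real computation) take turns because each chunk ends with await asyncio.sleep(0):

```python
import asyncio

events = []

async def cpu_chunks(name, chunks):
    for i in range(chunks):
        events.append(f"{name}:{i}")  # one "slice" of CPU work
        await asyncio.sleep(0)        # yield for exactly one loop iteration

async def main():
    await asyncio.gather(cpu_chunks("a", 3), cpu_chunks("b", 3))

asyncio.run(main())
print(events)  # the a-chunks and b-chunks interleave
```

Without the sleep(0), worker "a" would finish all of its chunks before "b" ran a single one.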
Putting It Together: A Real-World Example
Let's build a web scraper that fetches multiple URLs concurrently.
import asyncio

async def fetch_url(url, delay=1):
    """Simulate fetching a URL with a delay"""
    print(f"Fetching {url}...")
    await asyncio.sleep(delay)  # Simulate network request
    return f"Content from {url}"

async def main():
    urls = [
        "https://example.com/page1",
        "https://example.com/page2",
        "https://example.com/page3",
    ]
    # Fetch all URLs concurrently
    results = await asyncio.gather(
        *[fetch_url(url) for url in urls]
    )
    for result in results:
        print(result)

asyncio.run(main())

Expected output:
Fetching https://example.com/page1...
Fetching https://example.com/page2...
Fetching https://example.com/page3...
Content from https://example.com/page1
Content from https://example.com/page2
Content from https://example.com/page3
Time taken: ~1 second (all concurrent).
If we did this sequentially with time.sleep(), it would take ~3 seconds. Asyncio does it in 1. Notice the output pattern: all three fetch messages appear immediately because gather() starts all tasks before any of them finish. The results appear afterward in input order, not completion order; that predictability is one of the things that makes gather() so pleasant to work with compared to raw task management.
Common Asyncio Mistakes
Even experienced developers hit the same async pitfalls repeatedly. Here are the ones that will cost you the most debugging time.
Mistake 1: Calling blocking I/O inside async functions. Using requests.get(), open() with standard file I/O, or any synchronous database driver inside a coroutine will freeze your entire event loop for the duration of that call. Every other task stops making progress. The fix is always the same: either switch to an async equivalent (aiohttp instead of requests, aiofiles for files, asyncpg for PostgreSQL) or wrap the call in run_in_executor(). This mistake is insidious because the code works correctly in isolation; it just silently destroys your concurrency.
Mistake 2: Forgetting that create_task() is required for true concurrency. A lot of beginners write async code where every call is await some_coroutine() in sequence, then wonder why they see no performance improvement. If you're not creating tasks or using gather(), you're writing sequential code with extra ceremony. Concurrency only happens when multiple tasks are running simultaneously, and that only happens when you explicitly create them.
Mistake 3: Fire-and-forget tasks that silently fail. When you call asyncio.create_task() and don't store a reference to the result, the task runs independently. If it raises an exception, that exception is silently swallowed; you'll get a warning log message if you're lucky, nothing if you're not. Always either await the task, pass it to gather(), or attach a .add_done_callback() that inspects the result. The pattern of creating tasks and storing them in a set (removing them in a done callback) is the standard idiom for managed fire-and-forget.
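That idiom looks like this in practice; the background_tasks set and _on_done callback are illustrative names, not a standard API:

```python
import asyncio

background_tasks = set()  # strong references keep tasks alive
errors = []

def _on_done(task):
    background_tasks.discard(task)  # let finished tasks be collected
    if not task.cancelled() and task.exception() is not None:
        errors.append(task.exception())  # surface failures instead of losing them

async def flaky():
    raise ValueError("boom")

async def main():
    task = asyncio.create_task(flaky())
    background_tasks.add(task)
    task.add_done_callback(_on_done)
    await asyncio.sleep(0.1)  # give the background task time to run

asyncio.run(main())
print(errors)
```

The set serves two purposes: it keeps a strong reference so the task can't be garbage-collected mid-flight, and the done callback both prunes the set and retrieves the exception so nothing fails silently.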
Mistake 4: Swallowing CancelledError. If you catch asyncio.CancelledError for cleanup purposes, you must re-raise it. Swallowing it causes task.cancel() to appear to work but the coroutine silently continues, leading to tasks that won't stop and await task calls that hang indefinitely.
Mistake 5: Running asyncio inside asyncio. If you're already inside a running event loop (common in Jupyter notebooks, some test frameworks, and certain web frameworks), calling asyncio.run() will raise a RuntimeError. The fix depends on context: in Jupyter you can use await directly or nest_asyncio; in production code, restructure so there's only one asyncio.run() at the top level.
Common Pitfalls
Pitfall 1: Forgetting to await
async def task():
    return "Done"

async def main():
    result = task()  # BUG: Creates coroutine, doesn't run it
    print(result)    # Prints: <coroutine object...>

asyncio.run(main())

Fix: Use await:

result = await task()

Pitfall 2: Mixing async and sync
import asyncio
import requests

async def main():
    response = requests.get("https://example.com")  # BLOCKING!
    # Event loop is frozen until response arrives
    # Other tasks can't run
    print(response.status_code)

asyncio.run(main())

Fix: Use async libraries or run_in_executor():
import aiohttp

async def main():
    async with aiohttp.ClientSession() as session:
        async with session.get("https://example.com") as response:
            status = response.status

Pitfall 3: Blocking in a coroutine
import asyncio
import time

async def task():
    time.sleep(5)  # WRONG: Blocks the entire event loop
    print("Done")

asyncio.run(task())

Fix: Use await asyncio.sleep():

await asyncio.sleep(5)

The Event Loop Under the Hood
To deepen your mental model, here's what the event loop actually does on each iteration:
- Check for new tasks: Are there tasks scheduled?
- Run ready tasks: Execute until the next await
- Resume tasks: Re-queue tasks whose I/O is complete
- Repeat: Loop until all tasks are done
This is single-threaded, but because tasks yield at await points, it creates the illusion of concurrency.
import asyncio

async def demo():
    print("1. Start")
    await asyncio.sleep(0)  # Yield, let other tasks run
    print("2. Middle")
    await asyncio.sleep(0)  # Yield again
    print("3. End")

async def main():
    task1 = asyncio.create_task(demo())
    task2 = asyncio.create_task(demo())
    await asyncio.gather(task1, task2)

asyncio.run(main())

Expected output:
1. Start
1. Start
2. Middle
2. Middle
3. End
3. End
Both tasks start, yield, resume. The order might vary, but the pattern is clear: task1 runs until await, task2 runs until await, task1 resumes, task2 resumes. This interleaving behavior is the fundamental mechanism of cooperative concurrency, and once you can visualize it clearly, reading complex async code becomes much more intuitive. You're essentially tracing a path through a state machine where each await is a potential context switch point.
Advanced: Understanding Task States and Transitions
Every task has a lifecycle. Understanding task states helps you debug and design robust async systems.
A task can be in one of these states:
- Pending: Created but not yet started
- Running: Currently executing
- Done: Completed (either with result or exception)
- Cancelled: Manually cancelled or timed out
You can check a task's state using methods:
import asyncio

async def slow_task():
    await asyncio.sleep(5)
    return "Complete"

async def main():
    task = asyncio.create_task(slow_task())

    # Check states
    print(f"Task done? {task.done()}")            # False
    print(f"Task cancelled? {task.cancelled()}")  # False

    # Cancel it
    task.cancel()
    try:
        result = await task
    except asyncio.CancelledError:
        print(f"Task done? {task.done()}")            # True
        print(f"Task cancelled? {task.cancelled()}")  # True

asyncio.run(main())

This is crucial for monitoring long-running operations and handling edge cases. A "done" task might be cancelled, succeeded, or failed; you need to check which. In production systems, monitoring task states is how you implement health checks, graceful shutdown sequences, and circuit breakers. A task that shows as "done" but holds an exception is a fundamentally different situation from one that completed successfully, and conflating them leads to silent failures that are painful to diagnose after the fact.
Race Conditions and Task Ordering
One subtle point: when multiple tasks complete around the same time, their order isn't guaranteed. This matters if you rely on ordering.
import asyncio

async def task(name, delay):
    await asyncio.sleep(delay)
    return f"{name} done"

async def main():
    tasks = [
        asyncio.create_task(task("A", 0.5)),
        asyncio.create_task(task("B", 0.3)),
        asyncio.create_task(task("C", 0.7)),
    ]
    results = await asyncio.gather(*tasks)
    print(results)  # Always: ['A done', 'B done', 'C done'] (input order)

asyncio.run(main())

The results list preserves the order you passed in, not completion order. Task B finishes first, but it's still in position 1 of the results list.
If you need to process tasks as they complete, use asyncio.as_completed():
import asyncio

async def task(name, delay):
    await asyncio.sleep(delay)
    return f"{name} done"

async def main():
    tasks = [
        asyncio.create_task(task("A", 0.5)),
        asyncio.create_task(task("B", 0.3)),
        asyncio.create_task(task("C", 0.7)),
    ]
    # Process as they complete, not in submission order
    for future in asyncio.as_completed(tasks):
        result = await future
        print(result)  # B, then A, then C

asyncio.run(main())

Now B prints first (finishes at 0.3s), then A (0.5s), then C (0.7s).
Exception Handling in Async Code
Exceptions in async code behave similarly to sync code, but with a twist: exceptions in concurrent tasks don't automatically bubble up. You need to capture them.
import asyncio

async def failing_task():
    await asyncio.sleep(0.5)
    raise ValueError("Something went wrong!")

async def good_task():
    await asyncio.sleep(1)
    return "Success!"

async def main():
    tasks = [
        asyncio.create_task(failing_task()),
        asyncio.create_task(good_task()),
    ]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    print(results)
    # Output: [ValueError('Something went wrong!'), 'Success!']

asyncio.run(main())

Without return_exceptions=True, the first exception propagates out of the await immediately; note that the other tasks are not cancelled and keep running in the background. With return_exceptions=True, exceptions are returned as results, letting you handle them individually. Which behavior you want depends on your use case: if you're fetching 100 URLs and one fails, you probably want to keep the other 99 results. If you're running a sequence of dependent operations where any failure should abort the whole thing, letting the exception propagate naturally (without return_exceptions) makes more sense, though you should then cancel the still-pending tasks yourself.
Async Context Managers: Cleaner Resource Management
Coroutines can use async with to manage resources (file handles, network connections, database cursors). This ensures cleanup happens even if exceptions occur.
import asyncio

class AsyncFile:
    def __init__(self, name):
        self.name = name

    async def __aenter__(self):
        print(f"Opening {self.name}")
        await asyncio.sleep(0.1)  # Simulate async open
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print(f"Closing {self.name}")
        await asyncio.sleep(0.1)  # Simulate async close
        return False

async def main():
    async with AsyncFile("data.txt") as f:
        print(f"Working with {f.name}")

asyncio.run(main())

Expected output:
Opening data.txt
Working with data.txt
Closing data.txt
The async with ensures cleanup happens automatically, just like synchronous context managers. This pattern is used everywhere in the async ecosystem: aiohttp.ClientSession(), asyncpg connection pools, aiofiles.open(), and most database connection managers all use async context managers. Getting comfortable with async with is essential because it's how async libraries enforce proper resource lifecycle management (open, use, close) even when exceptions interrupt the middle step.
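You rarely need to write __aenter__/__aexit__ by hand: the standard library's contextlib.asynccontextmanager turns an async generator into an async context manager. A small sketch (the managed name and log list are illustrative):

```python
import asyncio
from contextlib import asynccontextmanager

log = []

@asynccontextmanager
async def managed(name):
    log.append(f"open {name}")       # runs on __aenter__
    try:
        yield name                   # body of the async with runs here
    finally:
        log.append(f"close {name}")  # runs on __aexit__, even on error

async def main():
    async with managed("conn") as name:
        log.append(f"use {name}")

asyncio.run(main())
print(log)
```

The try/finally around the yield is what guarantees cleanup: if the body raises, the exception is thrown into the generator at the yield and the finally still runs.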
Async Context and asyncio.get_running_loop()
Sometimes you need access to the current event loop inside a coroutine. Use asyncio.get_running_loop():
import asyncio

async def get_loop_info():
    loop = asyncio.get_running_loop()
    print(f"Running loop: {loop}")
    print(f"Loop type: {type(loop)}")

asyncio.run(get_loop_info())

This is useful when you need to schedule callbacks or check loop properties. Don't confuse it with asyncio.get_event_loop(), which may create a new loop if none exists (deprecated in recent Python versions).
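One common use of the running loop is scheduling a plain (non-async) callback for the next loop iteration with call_soon; a minimal sketch (the calls list just records ordering):

```python
import asyncio

calls = []

async def main():
    loop = asyncio.get_running_loop()
    loop.call_soon(calls.append, "callback ran")  # queued for the next iteration
    await asyncio.sleep(0)  # yield so the queued callback gets its turn
    calls.append("coroutine resumed")

asyncio.run(main())
print(calls)
```

call_soon takes a regular callable plus its arguments; the callback runs before our coroutine resumes because it was queued first.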
Performance Considerations: When to Use Asyncio
Asyncio shines for I/O-bound concurrency at scale:
- Web servers handling thousands of connections (frameworks like FastAPI, aiohttp)
- API clients making hundreds of concurrent requests
- Database pooling with async drivers
- Real-time systems like WebSocket servers
Asyncio struggles with:
- CPU-bound work: Use multiprocessing instead
- Heavy computation: Async doesn't help; the GIL still applies
- Simple scripts: Threading or sync code is clearer for modest concurrency
Here's a benchmark showing asyncio's strength:
import asyncio
import time

# Simulating 100 I/O operations, each taking 0.1 seconds
async def io_task():
    await asyncio.sleep(0.1)

async def async_approach():
    tasks = [asyncio.create_task(io_task()) for _ in range(100)]
    await asyncio.gather(*tasks)

start = time.time()
asyncio.run(async_approach())
print(f"Asyncio: {time.time() - start:.2f}s")  # ~0.1s (all concurrent)

# Sequential approach would take ~10s
# Threading would take ~0.1s too, but with thread overhead
# Asyncio is lightweight and scales to thousands of tasks

Asyncio's magic: 100 tasks, 100 overlapping I/O waits, all on a single thread in ~0.1 seconds. The comparison to threading is worth dwelling on: threading achieves similar wall-clock time for I/O-bound work, but each thread carries its own stack (on Linux, typically ~8 MB of reserved address space by default) and coordinating thousands of threads introduces significant scheduler overhead. Asyncio tasks are cheap enough that running 10,000 of them simultaneously is entirely practical; an equivalent thread-per-request server would buckle under the memory and scheduling pressure long before that.
Debugging and Testing Async Code
Async code introduces new debugging challenges. Here are practical strategies:
Using asyncio.create_task() for Better Tracebacks
When a task fails silently, you might miss the error. Use proper task handling:
```python
import asyncio

async def failing():
    raise ValueError("Oops!")

async def main():
    # DON'T: Just create and forget
    asyncio.create_task(failing())
    await asyncio.sleep(1)  # Task fails silently!

    # DO: Track the task
    task = asyncio.create_task(failing())
    try:
        await task
    except ValueError as e:
        print(f"Caught error: {e}")

asyncio.run(main())
```

Tasks that fail silently are hard to debug. Always capture and await them, or use callbacks:
```python
async def main():
    task = asyncio.create_task(failing())
    # result() re-raises the task's exception so it isn't swallowed
    task.add_done_callback(lambda t: t.result() if not t.cancelled() else None)
```

Testing Async Functions
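A lambda works, but a named done-callback that logs the failure is easier to reuse. This is a sketch; `log_task_errors` and the `errors` list are names introduced here:

```python
import asyncio

errors = []

def log_task_errors(task: asyncio.Task) -> None:
    # Runs when the task finishes. Reading .exception() marks the error
    # as retrieved, suppressing the "Task exception was never retrieved"
    # warning at loop shutdown.
    if task.cancelled():
        return
    exc = task.exception()
    if exc is not None:
        errors.append(exc)
        print(f"Task failed: {exc!r}")

async def failing():
    raise ValueError("Oops!")

async def main():
    task = asyncio.create_task(failing())
    task.add_done_callback(log_task_errors)
    await asyncio.sleep(0.1)  # give the task a chance to run and fail

asyncio.run(main())
```

Attaching the same callback to every background task you spawn gives you a single chokepoint for error reporting.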
Use pytest with pytest-asyncio to test async code:
```python
# test_async.py
import asyncio
import pytest

async def fetch(url):
    await asyncio.sleep(0.1)
    return f"Data from {url}"

@pytest.mark.asyncio
async def test_fetch():
    result = await fetch("api.example.com")
    assert result == "Data from api.example.com"

@pytest.mark.asyncio
async def test_concurrent_fetch():
    results = await asyncio.gather(
        fetch("api1.com"),
        fetch("api2.com"),
        fetch("api3.com"),
    )
    assert len(results) == 3
```

Async unit tests must use async test functions and runners. The pytest-asyncio plugin handles this automatically.
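One practical note: pytest-asyncio is a separate install (`pip install pytest-asyncio`). If decorating every test gets tedious, its `asyncio_mode = auto` setting makes the plugin pick up async test functions without the marker. A minimal config sketch:

```ini
# pytest.ini
[pytest]
asyncio_mode = auto
```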
Mocking Async Functions
```python
import pytest
from unittest.mock import AsyncMock, patch

async def fetch(url):
    # Real implementation
    pass

@pytest.mark.asyncio
async def test_with_mock():
    # AsyncMock produces an awaitable mock. Patch the name where it's
    # looked up (here 'module.fetch'), then exercise the patched name:
    # calling the original local fetch would bypass the patch entirely.
    with patch("module.fetch", new_callable=AsyncMock) as mock_fetch:
        mock_fetch.return_value = "Mocked data"
        result = await mock_fetch("test.com")
        assert result == "Mocked data"
```

Comparing the Three Concurrency Models
Now that you've learned threading, multiprocessing, and asyncio, here's when to use each:
| Model | Best For | Overhead | Scale | Complexity |
|---|---|---|---|---|
| Threading | I/O-bound, small scale | Low, but GIL-limited | ~10s-100s of tasks | Medium |
| Multiprocessing | CPU-bound, high compute | High, separate processes | ~10s of processes | High |
| Asyncio | I/O-bound, high scale | Ultra-low, cooperative | 1000s of tasks | Medium |
Real-world decision tree:
- Are you doing CPU-bound work? → Use `multiprocessing`
- Are you doing I/O-bound work? → Ask: "Do I need 100s of concurrent operations?"
  - Yes? → Use `asyncio`
  - No? → Use `threading` (simpler)
- Unsure? → Start with `asyncio`. It scales better and forces you to think about concurrency properly.
Asyncio Patterns: Producer-Consumer
Here's a classic pattern for coordinating tasks:
```python
import asyncio

async def producer(queue, n):
    """Generate items and put them in a queue"""
    for i in range(n):
        print(f"Producing {i}")
        await queue.put(i)
        await asyncio.sleep(0.1)
    await queue.put(None)  # Signal end

async def consumer(queue):
    """Consume items from the queue"""
    while True:
        item = await queue.get()
        if item is None:
            break
        print(f"Consuming {item}")
        await asyncio.sleep(0.2)

async def main():
    queue = asyncio.Queue()
    # Run producer and consumer concurrently
    await asyncio.gather(
        producer(queue, 5),
        consumer(queue),
    )

asyncio.run(main())
```

Expected output:
```
Producing 0
Consuming 0
Producing 1
Producing 2
Consuming 1
...
```
Producer and consumer run concurrently, communicating through an async queue. This decouples them: the producer doesn't care when the consumer processes items, and vice versa.
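The `None` sentinel works fine for one consumer, but with several consumers each needs its own sentinel. A common variant, sketched below, uses `Queue.task_done()` with `Queue.join()` and cancels the idle workers once everything is processed:

```python
import asyncio

async def produce(queue: asyncio.Queue, n: int) -> None:
    for i in range(n):
        await queue.put(i)

async def worker(name: str, queue: asyncio.Queue, seen: list) -> None:
    while True:
        item = await queue.get()
        seen.append((name, item))
        queue.task_done()  # mark this item as fully processed

async def main() -> list:
    queue = asyncio.Queue()
    seen: list = []
    # Three consumers share one queue; no sentinels needed
    workers = [asyncio.create_task(worker(f"c{i}", queue, seen)) for i in range(3)]
    await produce(queue, 10)
    await queue.join()  # resumes once every put() item has been task_done()'d
    for w in workers:
        w.cancel()  # workers are idle in queue.get(); cancel them cleanly
    await asyncio.gather(*workers, return_exceptions=True)
    return seen

print(len(asyncio.run(main())))  # 10 items consumed across three workers
```

The trade-off: `join()`/`task_done()` tracks completion per item, whereas the sentinel only signals "no more input", so this variant also guarantees all in-flight items are finished before shutdown.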
Asyncio and Event Loop Policies
Advanced note: Python lets you configure the event loop policy (which loop type to use). On Windows, the default is ProactorEventLoop (for proper async I/O support); on Unix, it's SelectorEventLoop. Most of the time you don't need to care: asyncio.run() picks the right one.
But if you need a specific policy:
```python
import asyncio

# Set Windows to use ProactorEventLoop
asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
asyncio.run(main())
```

This is rarely needed, but it's good to know it exists.
Key Takeaways
- Event loop: Single-threaded control center that schedules and runs coroutines
- `async def`: Defines a coroutine (requires `await` to run)
- `await`: Yields control, letting other tasks run while waiting
- `asyncio.run()`: Creates the event loop and runs your top-level coroutine
- `asyncio.create_task()`: Schedules a coroutine as a task (runs concurrently)
- `asyncio.gather()`: Runs multiple coroutines concurrently and collects results
- `asyncio.sleep()`: Non-blocking delay (yields control)
- `asyncio.timeout()` and `task.cancel()`: Control task duration and abort execution
- `run_in_executor()`: Bridges to blocking code without freezing the loop
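The last two takeaways combine naturally: bound how long you'll wait for blocking work that's been pushed off the loop. A sketch using `asyncio.wait_for()` (on Python 3.11+ the equivalent `async with asyncio.timeout(...):` block is also available); `blocking_io` is a stand-in name introduced here:

```python
import asyncio
import time

def blocking_io() -> str:
    time.sleep(0.2)  # stands in for a blocking library call (file, DNS, legacy SDK)
    return "done"

async def main() -> str:
    loop = asyncio.get_running_loop()
    try:
        # run_in_executor pushes the blocking call onto a thread pool so the
        # event loop stays free; wait_for bounds how long we're willing to wait
        return await asyncio.wait_for(
            loop.run_in_executor(None, blocking_io),
            timeout=1.0,
        )
    except asyncio.TimeoutError:
        return "timed out"

print(asyncio.run(main()))  # done
```

Passing `None` as the executor uses the loop's default thread pool; for CPU-bound work you could pass a `concurrent.futures.ProcessPoolExecutor` instead.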
Coroutine Best Practices
Now that you understand the mechanics, here are practical guidelines for writing good async code:
1. Keep Coroutines Focused
A coroutine should do one thing well. Don't mix I/O with processing:
```python
import asyncio

# GOOD: Separated concerns
async def fetch_user(user_id):
    """Just fetch the data"""
    await asyncio.sleep(0.1)  # Simulate API call
    return {"id": user_id, "name": f"User {user_id}"}

async def process_users(user_ids):
    """Fetch all, then process all"""
    users = await asyncio.gather(*[fetch_user(uid) for uid in user_ids])
    return [u["name"].upper() for u in users]

# BAD: Mixed concerns
async def fetch_and_process(user_id):
    """Mixing I/O and CPU work"""
    user = await fetch_user(user_id)
    # Now we're doing CPU work in an async function
    processed = user["name"].upper() * 1000000  # Expensive
    return processed
```

Mixing concerns makes code harder to test and reason about.
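If you genuinely must run CPU-heavy work from a coroutine, offload it instead of blocking the loop. A sketch using `asyncio.to_thread()` (Python 3.9+); note the GIL still limits pure-Python CPU gains, so a process pool may be needed for real speedups:

```python
import asyncio

def expensive(name: str) -> str:
    # CPU-bound work; running this inline in a coroutine would stall the loop
    return (name.upper() * 100_000)[:10]

async def fetch_user(user_id: int) -> dict:
    await asyncio.sleep(0.1)  # simulate API call
    return {"id": user_id, "name": f"User {user_id}"}

async def fetch_and_process(user_id: int) -> str:
    user = await fetch_user(user_id)
    # to_thread runs the function in a worker thread, keeping the
    # event loop responsive while the CPU work happens
    return await asyncio.to_thread(expensive, user["name"])

print(asyncio.run(fetch_and_process(1)))  # USER 1USER
```

This keeps the coroutine itself thin: it awaits I/O, then awaits the offloaded computation, and never hogs the loop.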
2. Avoid Nested create_task()
Deep nesting gets confusing. Flatten your task creation:
```python
# GOOD: Flat structure
async def main():
    tasks = [asyncio.create_task(fetch(url)) for url in urls]
    results = await asyncio.gather(*tasks)

# OKAY: If nesting is minimal
async def main():
    task = asyncio.create_task(fetch_and_process())
    result = await task
```

3. Use Type Hints
Async functions benefit from explicit type annotations:
```python
import asyncio
from typing import List, Optional

async def fetch(url: str) -> str:
    """Fetch a single URL, return content."""
    await asyncio.sleep(0.1)
    return f"Content from {url}"

async def fetch_all(urls: List[str]) -> List[str]:
    """Fetch multiple URLs concurrently."""
    return await asyncio.gather(*[fetch(url) for url in urls])

async def maybe_fetch(url: Optional[str]) -> Optional[str]:
    """Fetch only if URL is provided."""
    if url:
        return await fetch(url)
    return None
```

Type hints make async code clearer and enable better IDE support.
4. Document Your Async API
Be explicit about what can run concurrently:
```python
async def fetch_and_cache(url: str) -> str:
    """
    Fetch a URL and cache the result.

    Safe to call concurrently: Multiple callers can await this
    simultaneously without issue.

    Args:
        url: The URL to fetch.

    Returns:
        The fetched content.

    Raises:
        asyncio.TimeoutError: If fetch exceeds timeout.
    """
    # Implementation...
    pass
```

Real-World Scenario: Building an Async Web Crawler
Let's build a practical web crawler that fetches multiple pages concurrently with error handling:
```python
import asyncio
from typing import List, Dict

class SimpleCrawler:
    def __init__(self, max_concurrent: int = 5, timeout: float = 10):
        self.max_concurrent = max_concurrent
        self.timeout = timeout
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def fetch_page(self, url: str) -> Dict[str, str]:
        """Fetch a single page with rate limiting."""
        async with self.semaphore:  # Limit concurrency
            try:
                print(f"Fetching {url}")
                # Simulate an async HTTP request, bounded by the timeout
                await asyncio.wait_for(asyncio.sleep(1), timeout=self.timeout)
                return {
                    "url": url,
                    "status": "success",
                    "content": f"Content from {url}"
                }
            except asyncio.TimeoutError:
                return {"url": url, "status": "timeout"}
            except Exception as e:
                return {"url": url, "status": "error", "error": str(e)}

    async def crawl(self, urls: List[str]) -> List[Dict[str, str]]:
        """Crawl multiple URLs concurrently."""
        tasks = [self.fetch_page(url) for url in urls]
        return await asyncio.gather(*tasks, return_exceptions=True)

async def main():
    crawler = SimpleCrawler(max_concurrent=3)
    urls = [f"https://example.com/page{i}" for i in range(10)]
    results = await crawler.crawl(urls)
    # With return_exceptions=True, results may contain exception objects,
    # so guard with isinstance before treating entries as dicts
    successes = [r for r in results
                 if isinstance(r, dict) and r.get("status") == "success"]
    print(f"Successfully fetched {len(successes)} pages")

asyncio.run(main())
```

This example shows:
- A Semaphore to limit concurrent requests (avoiding overwhelming the target)
- Error handling with `return_exceptions=True`
- Structured concurrency with a class wrapper
- Real-world scalability patterns
This crawler can handle thousands of URLs efficiently, thanks to asyncio's lightweight task model. The Semaphore pattern here is particularly worth studying: without it, fetching 1,000 URLs would open 1,000 connections simultaneously, potentially getting you rate-limited or blacklisted, or simply overwhelming both the target server and your own system's file descriptor limits. A semaphore lets you say "no more than 10 at a time" while the remaining 990 tasks wait patiently in the queue, ready to proceed as slots open up.
The Mental Shift: From Thinking Sequentially to Thinking Concurrently
The hardest part of asyncio isn't the syntax; it's the mindset shift. You're no longer writing "do A, then do B, then do C." Instead, you're saying: "Start A, start B, start C; they all run concurrently; wait for all to finish."
This requires thinking differently about control flow. But once it clicks, you'll write code that's both simpler and more scalable than threading-based approaches.
Asyncio is powerful for I/O-bound concurrency at scale. Master these fundamentals, and you'll build responsive, scalable systems without the threading complexity or multiprocessing overhead. You now understand the mental models, the syntax, the pitfalls, and the patterns. More importantly, you understand the why: why the event loop is single-threaded, why you must re-raise CancelledError, why create_task() is different from await, why blocking calls are catastrophic. That conceptual foundation is what makes the difference between someone who writes async code and someone who writes correct async code. The ecosystem of async Python (FastAPI, aiohttp, asyncpg, LangChain's async chain execution) is built on exactly these primitives. Every async with, every gather(), every semaphore you encounter in a real codebase is just a more elaborate version of the patterns you practiced here.
Ready to go deeper? Next, we'll explore real-world asyncio patterns: HTTP clients with aiohttp, async generators, semaphores for rate limiting, and advanced coordination techniques that let you build production-grade async systems.