Async Python Patterns
Expert guidance for implementing asynchronous Python applications using asyncio, concurrent programming patterns, and async/await for building high-performance, non-blocking systems.
When to Use This Skill
- Building async web APIs (FastAPI, aiohttp, Sanic)
- Implementing concurrent I/O operations (database, file, network)
- Creating web scrapers with concurrent requests
- Developing real-time applications (WebSocket servers, chat systems)
- Processing multiple independent tasks simultaneously
- Optimizing I/O-bound workloads requiring parallelism
- Implementing async background tasks and task queues
Core Patterns
1. Basic Async/Await
Foundation for all async operations:
import asyncio
async def fetch_data(url: str) -> dict:
"""Fetch data asynchronously."""
await asyncio.sleep(1) # Simulate I/O
return {"url": url, "data": "result"}
async def main():
result = await fetch_data("https://api.example.com")
print(result)
asyncio.run(main())
Key concepts:
async defdefines coroutines (pausable functions)awaityields control back to event loopasyncio.run()is the entry point (Python 3.7+)- Single-threaded cooperative multitasking
2. Concurrent Execution with gather()
Execute multiple operations simultaneously:
import asyncio
from typing import List
async def fetch_user(user_id: int) -> dict:
await asyncio.sleep(0.5)
return {"id": user_id, "name": f"User {user_id}"}
async def fetch_all_users(user_ids: List[int]) -> List[dict]:
"""Fetch multiple users concurrently."""
tasks = [fetch_user(uid) for uid in user_ids]
results = await asyncio.gather(*tasks)
return results
# Speed: Sequential = 5s, Concurrent = 0.5s for 10 users
When to use:
- Independent operations that can run in parallel
- I/O-bound tasks (API calls, database queries)
- Need all results before proceeding
- Use
return_exceptions=Trueto handle partial failures
3. Task Creation and Management
Background tasks that run independently:
import asyncio
async def background_task(name: str, delay: int):
print(f"{name} started")
await asyncio.sleep(delay)
return f"Result from {name}"
async def main():
# Create tasks (starts execution immediately)
task1 = asyncio.create_task(background_task("Task 1", 2))
task2 = asyncio.create_task(background_task("Task 2", 1))
# Do other work while tasks run
print("Doing other work")
await asyncio.sleep(0.5)
# Wait for results when needed
result1, result2 = await task1, await task2
Differences:
await coroutine()- Waits immediatelyasyncio.create_task()- Starts background execution- Tasks can be cancelled with
task.cancel()
4. Error Handling
Robust error handling for concurrent operations:
import asyncio
from typing import List, Optional
async def safe_operation(item_id: int) -> Optional[dict]:
try:
await asyncio.sleep(0.1)
if item_id % 3 == 0:
raise ValueError(f"Item {item_id} failed")
return {"id": item_id, "status": "success"}
except ValueError as e:
print(f"Error: {e}")
return None
async def process_items(item_ids: List[int]):
# gather with return_exceptions=True continues on errors
results = await asyncio.gather(
*[safe_operation(iid) for iid in item_ids],
return_exceptions=True
)
successful = [r for r in results if r and not isinstance(r, Exception)]
failed = [r for r in results if isinstance(r, Exception)]
print(f"Success: {len(successful)}, Failed: {len(failed)}")
return successful
5. Timeout Handling
Prevent operations from hanging indefinitely:
import asyncio
async def with_timeout():
try:
result = await asyncio.wait_for(
slow_operation(5),
timeout=2.0
)
print(result)
except asyncio.TimeoutError:
print("Operation timed out")
# Handle timeout (retry, fallback, etc.)
Advanced Patterns
6. Async Context Managers
Proper resource management with async operations:
import asyncio
class AsyncDatabaseConnection:
"""Async database connection with automatic cleanup."""
def __init__(self, dsn: str):
self.dsn = dsn
self.connection = None
async def __aenter__(self):
print("Opening connection")
await asyncio.sleep(0.1) # Simulate connection
self.connection = {"dsn": self.dsn, "connected": True}
return self.connection
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("Closing connection")
await asyncio.sleep(0.1) # Simulate cleanup
self.connection = None
async def query_database():
async with AsyncDatabaseConnection("postgresql://localhost") as conn:
# Connection automatically closed on exit
return await perform_query(conn)
Use cases:
- Database connections (asyncpg, motor)
- HTTP sessions (aiohttp.ClientSession)
- File I/O (aiofiles)
- Locks and semaphores
7. Async Iterators and Generators
Stream data asynchronously:
import asyncio
from typing import AsyncIterator
async def fetch_pages(url: str, max_pages: int) -> AsyncIterator[dict]:
"""Fetch paginated data lazily."""
for page in range(1, max_pages + 1):
await asyncio.sleep(0.2) # API call
yield {
"page": page,
"url": f"{url}?page={page}",
"data": [f"item_{page}_{i}" for i in range(5)]
}
async def process_stream():
async for page_data in fetch_pages("https://api.example.com", 10):
# Process each page as it arrives (memory efficient)
print(f"Processing page {page_data['page']}")
Benefits:
- Memory efficient for large datasets
- Start processing before all data arrives
- Natural backpressure handling
8. Producer-Consumer with Queues
Coordinate work between producers and consumers:
import asyncio
from asyncio import Queue
async def producer(queue: Queue, producer_id: int, num_items: int):
for i in range(num_items):
item = f"Item-{producer_id}-{i}"
await queue.put(item)
await asyncio.sleep(0.1)
await queue.put(None) # Signal completion
async def consumer(queue: Queue, consumer_id: int):
while True:
item = await queue.get()
if item is None:
queue.task_done()
break
print(f"Consumer {consumer_id} processing: {item}")
await asyncio.sleep(0.2)
queue.task_done()
async def run_pipeline():
queue = Queue(maxsize=10)
# 2 producers, 3 consumers
producers = [asyncio.create_task(producer(queue, i, 5)) for i in range(2)]
consumers = [asyncio.create_task(consumer(queue, i)) for i in range(3)]
await asyncio.gather(*producers)
await queue.join() # Wait for all items processed
for c in consumers:
c.cancel()
9. Rate Limiting with Semaphores
Control concurrent operations:
import asyncio
from typing import List
async def api_call(url: str, semaphore: asyncio.Semaphore) -> dict:
async with semaphore: # Only N operations at once
print(f"Calling {url}")
await asyncio.sleep(0.5)
return {"url": url, "status": 200}
async def rate_limited_requests(urls: List[str], max_concurrent: int = 5):
semaphore = asyncio.Semaphore(max_concurrent)
tasks = [api_call(url, semaphore) for url in urls]
return await asyncio.gather(*tasks)
# Limits to 5 concurrent requests regardless of total URLs
Use cases:
- API rate limiting (respect API quotas)
- Database connection limits
- File descriptor limits
- Memory-constrained operations
10. Async Locks for Shared State
Thread-safe operations in async context:
import asyncio
class AsyncCounter:
def __init__(self):
self.va