install
source · Clone the upstream repo
git clone https://github.com/MacPhobos/research-mind
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/MacPhobos/research-mind "$T" && mkdir -p ~/.claude/skills && cp -r "$T/.claude/skills/toolchains-python-async-asyncio" ~/.claude/skills/macphobos-research-mind-toolchains-python-async-asyncio && rm -rf "$T"
manifest:
.claude/skills/toolchains-python-async-asyncio/skill.mdsource content
Python asyncio - Async/Await Concurrency
Overview
Python's asyncio library enables writing concurrent code using async/await syntax. It's ideal for I/O-bound operations like HTTP requests, database queries, file operations, and WebSocket connections. asyncio provides non-blocking execution without the complexity of threading or multiprocessing.
Key Features:
- async/await syntax for readable concurrent code
- Event loop for managing concurrent operations
- Tasks for running multiple coroutines concurrently
- Primitives: locks, semaphores, events, queues
- HTTP client/server with aiohttp
- Database async support (asyncpg, aiomysql, motor)
- FastAPI async endpoints
- WebSocket support
- Background task management
Installation:
# asyncio is built-in (Python 3.7+) # Async HTTP client pip install aiohttp # Async HTTP requests (alternative) pip install httpx # Async database drivers pip install asyncpg aiomysql motor # PostgreSQL, MySQL, MongoDB # FastAPI with async support pip install fastapi uvicorn[standard] # Async testing pip install pytest-asyncio
Basic Async/Await Patterns
1. Simple Async Function
import asyncio async def hello(): """Basic async function (coroutine).""" print("Hello") await asyncio.sleep(1) # Async sleep (non-blocking) print("World") return "Done" # Run async function result = asyncio.run(hello()) print(result) # "Done"
Key Points:
defines a coroutine functionasync def
suspends execution until awaitable completesawait
is the entry point for async programsasyncio.run()- Coroutines must be awaited or scheduled
2. Multiple Concurrent Tasks
import asyncio import time async def task(name, duration): """Simulate async task.""" print(f"{name}: Starting (duration: {duration}s)") await asyncio.sleep(duration) print(f"{name}: Complete") return f"{name} result" async def run_concurrent(): """Run multiple tasks concurrently.""" start = time.time() # Sequential (slow) - 6 seconds total # result1 = await task("Task 1", 3) # result2 = await task("Task 2", 2) # result3 = await task("Task 3", 1) # Concurrent (fast) - 3 seconds total results = await asyncio.gather( task("Task 1", 3), task("Task 2", 2), task("Task 3", 1) ) elapsed = time.time() - start print(f"Total time: {elapsed:.2f}s") print(f"Results: {results}") asyncio.run(run_concurrent()) # Output: Total time: 3.00s (tasks ran concurrently)
3. Task Creation and Management
import asyncio async def background_task(name): """Long-running background task.""" for i in range(5): print(f"{name}: iteration {i}") await asyncio.sleep(1) return f"{name} complete" async def main(): # Create task (starts immediately) task1 = asyncio.create_task(background_task("Task-1")) task2 = asyncio.create_task(background_task("Task-2")) # Do other work while tasks run print("Main: doing other work") await asyncio.sleep(2) # Wait for tasks to complete result1 = await task1 result2 = await task2 print(f"Results: {result1}, {result2}") asyncio.run(main())
4. Error Handling in Async Code
import asyncio async def risky_operation(fail=False): """Operation that might fail.""" await asyncio.sleep(1) if fail: raise ValueError("Operation failed") return "Success" async def handle_errors(): # Individual try/except try: result = await risky_operation(fail=True) except ValueError as e: print(f"Caught error: {e}") result = "Fallback value" # Gather with error handling results = await asyncio.gather( risky_operation(fail=False), risky_operation(fail=True), risky_operation(fail=False), return_exceptions=True # Return exceptions instead of raising ) for i, result in enumerate(results): if isinstance(result, Exception): print(f"Task {i} failed: {result}") else: print(f"Task {i} succeeded: {result}") asyncio.run(handle_errors())
Event Loop Fundamentals
1. Event Loop Lifecycle
import asyncio # Modern approach (Python 3.7+) async def main(): print("Main coroutine") await asyncio.sleep(1) asyncio.run(main()) # Creates loop, runs main, closes loop # Manual loop management (advanced use cases) async def manual_example(): loop = asyncio.get_event_loop() # Schedule coroutine task = loop.create_task(some_coroutine()) # Schedule callback loop.call_later(5, callback_function) # Run until complete result = await task return result # Get current event loop async def get_current_loop(): loop = asyncio.get_running_loop() print(f"Loop: {loop}") # Schedule callback in event loop loop.call_soon(lambda: print("Callback executed")) await asyncio.sleep(0) # Let callback execute
2. Loop Scheduling and Callbacks
import asyncio from datetime import datetime def callback(name, loop): """Callback function (not async).""" print(f"{datetime.now()}: {name} callback executed") # Stop loop after callback # loop.stop() async def schedule_callbacks(): loop = asyncio.get_running_loop() # Schedule immediate callback loop.call_soon(callback, "Immediate", loop) # Schedule callback after delay loop.call_later(2, callback, "Delayed 2s", loop) # Schedule callback at specific time loop.call_at(loop.time() + 3, callback, "Delayed 3s", loop) # Wait for callbacks to execute await asyncio.sleep(5) asyncio.run(schedule_callbacks())
3. Running Blocking Code
import asyncio import time def blocking_io(): """CPU-intensive or blocking I/O operation.""" print("Blocking operation started") time.sleep(2) # Blocks thread print("Blocking operation complete") return "Blocking result" async def run_in_executor(): """Run blocking code in thread pool.""" loop = asyncio.get_running_loop() # Run in default executor (thread pool) result = await loop.run_in_executor( None, # Use default executor blocking_io ) print(f"Result: {result}") # Run blocking operations concurrently async def concurrent_blocking(): loop = asyncio.get_running_loop() # These run in thread pool, don't block event loop results = await asyncio.gather( loop.run_in_executor(None, blocking_io), loop.run_in_executor(None, blocking_io), loop.run_in_executor(None, blocking_io) ) print(f"All results: {results}") asyncio.run(concurrent_blocking())
Asyncio Primitives
1. Locks for Mutual Exclusion
import asyncio # Shared resource counter = 0 lock = asyncio.Lock() async def increment_with_lock(name): """Increment counter with lock protection.""" global counter async with lock: # Critical section - only one task at a time print(f"{name}: acquired lock") current = counter await asyncio.sleep(0.1) # Simulate processing counter = current + 1 print(f"{name}: released lock, counter={counter}") async def increment_without_lock(name): """Increment without lock - race condition!""" global counter current = counter await asyncio.sleep(0.1) # Race condition window counter = current + 1 print(f"{name}: counter={counter}") async def test_locks(): global counter # Without lock (race condition) counter = 0 await asyncio.gather( increment_without_lock("Task-1"), increment_without_lock("Task-2"), increment_without_lock("Task-3") ) print(f"Without lock: {counter}") # Often wrong (< 3) # With lock (correct) counter = 0 await asyncio.gather( increment_with_lock("Task-1"), increment_with_lock("Task-2"), increment_with_lock("Task-3") ) print(f"With lock: {counter}") # Always 3 asyncio.run(test_locks())
2. Semaphores for Resource Limiting
import asyncio # Limit concurrent operations semaphore = asyncio.Semaphore(2) # Max 2 concurrent async def limited_operation(name): """Operation limited by semaphore.""" print(f"{name}: waiting for semaphore") async with semaphore: print(f"{name}: acquired semaphore") await asyncio.sleep(2) # Simulate work print(f"{name}: releasing semaphore") async def test_semaphore(): # Create 5 tasks, but only 2 run concurrently await asyncio.gather( limited_operation("Task-1"), limited_operation("Task-2"), limited_operation("Task-3"), limited_operation("Task-4"), limited_operation("Task-5") ) asyncio.run(test_semaphore()) # Only 2 tasks hold semaphore at any time
3. Events for Signaling
import asyncio event = asyncio.Event() async def waiter(name): """Wait for event to be set.""" print(f"{name}: waiting for event") await event.wait() # Block until event is set print(f"{name}: event received!") async def setter(): """Set event after delay.""" await asyncio.sleep(2) print("Setter: setting event") event.set() # Wake up all waiters async def test_event(): # Create waiters await asyncio.gather( waiter("Waiter-1"), waiter("Waiter-2"), waiter("Waiter-3"), setter() ) asyncio.run(test_event())
4. Queues for Task Distribution
import asyncio import random async def producer(queue, name): """Produce items and add to queue.""" for i in range(5): item = f"{name}-item-{i}" await queue.put(item) print(f"{name}: produced {item}") await asyncio.sleep(random.uniform(0.1, 0.5)) # Signal completion await queue.put(None) async def consumer(queue, name): """Consume items from queue.""" while True: item = await queue.get() # Block until item available if item is None: # Shutdown signal queue.task_done() break print(f"{name}: consumed {item}") await asyncio.sleep(random.uniform(0.2, 0.8)) queue.task_done() async def test_queue(): queue = asyncio.Queue(maxsize=10) # Create producers and consumers await asyncio.gather( producer(queue, "Producer-1"), producer(queue, "Producer-2"), consumer(queue, "Consumer-1"), consumer(queue, "Consumer-2"), consumer(queue, "Consumer-3") ) # Wait for all items to be processed await queue.join() print("All tasks complete") asyncio.run(test_queue())
5. Condition Variables
import asyncio condition = asyncio.Condition() items = [] async def consumer(name): """Wait for items to be available.""" async with condition: # Wait until items are available await condition.wait_for(lambda: len(items) > 0) item = items.pop(0) print(f"{name}: consumed {item}") async def producer(name): """Add items and notify consumers.""" async with condition: item = f"{name}-item" items.append(item) print(f"{name}: produced {item}") # Notify one waiting consumer condition.notify(n=1) # Or notify all: condition.notify_all() async def test_condition(): await asyncio.gather( consumer("Consumer-1"), consumer("Consumer-2"), producer("Producer-1"), producer("Producer-2") ) asyncio.run(test_condition())
Async HTTP with aiohttp
1. Basic HTTP Client
import asyncio import aiohttp async def fetch_url(session, url): """Fetch single URL.""" async with session.get(url) as response: status = response.status text = await response.text() return {"url": url, "status": status, "length": len(text)} async def fetch_multiple_urls(): """Fetch multiple URLs concurrently.""" urls = [ "https://httpbin.org/delay/1", "https://httpbin.org/delay/2", "https://httpbin.org/json", ] async with aiohttp.ClientSession() as session: # Concurrent requests tasks = [fetch_url(session, url) for url in urls] results = await asyncio.gather(*tasks) for result in results: print(f"{result['url']}: {result['status']} ({result['length']} bytes)") asyncio.run(fetch_multiple_urls())
2. HTTP Client with Error Handling
import asyncio import aiohttp from typing import Dict, Any async def fetch_with_retry( session: aiohttp.ClientSession, url: str, max_retries: int = 3 ) -> Dict[str, Any]: """Fetch URL with retry logic.""" for attempt in range(max_retries): try: async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response: response.raise_for_status() # Raise for 4xx/5xx data = await response.json() return {"success": True, "data": data} except aiohttp.ClientError as e: print(f"Attempt {attempt + 1} failed: {e}") if attempt == max_retries - 1: return {"success": False, "error": str(e)} # Exponential backoff await asyncio.sleep(2 ** attempt) except asyncio.TimeoutError: print(f"Attempt {attempt + 1} timed out") if attempt == max_retries - 1: return {"success": False, "error": "Timeout"} await asyncio.sleep(2 ** attempt) async def parallel_api_calls(): """Make parallel API calls with error handling.""" urls = [ "https://httpbin.org/json", "https://httpbin.org/status/500", # Will fail "https://httpbin.org/delay/1", ] async with aiohttp.ClientSession() as session: results = await asyncio.gather( *[fetch_with_retry(session, url) for url in urls], return_exceptions=True # Don't stop on errors ) for url, result in zip(urls, results): if isinstance(result, Exception): print(f"{url}: Exception - {result}") elif result["success"]: print(f"{url}: Success") else: print(f"{url}: Failed - {result['error']}") asyncio.run(parallel_api_calls())
3. HTTP Server with aiohttp
from aiohttp import web import asyncio async def handle_hello(request): """Simple GET handler.""" name = request.query.get("name", "World") return web.json_response({"message": f"Hello, {name}!"}) async def handle_post(request): """POST handler with JSON body.""" data = await request.json() # Simulate async processing await asyncio.sleep(1) return web.json_response({ "received": data, "status": "processed" }) async def handle_stream(request): """Streaming response.""" response = web.StreamResponse() await response.prepare(request) for i in range(10): await response.write(f"Chunk {i}\n".encode()) await asyncio.sleep(0.5) await response.write_eof() return response # Create application app = web.Application() app.router.add_get("/hello", handle_hello) app.router.add_post("/process", handle_post) app.router.add_get("/stream", handle_stream) # Run server if __name__ == "__main__": web.run_app(app, host="0.0.0.0", port=8080)
4. WebSocket Client
import asyncio import aiohttp async def websocket_client(): """Connect to WebSocket server.""" url = "wss://echo.websocket.org" async with aiohttp.ClientSession() as session: async with session.ws_connect(url) as ws: # Send messages await ws.send_str("Hello WebSocket") await ws.send_json({"type": "greeting", "data": "test"}) # Receive messages async for msg in ws: if msg.type == aiohttp.WSMsgType.TEXT: print(f"Received: {msg.data}") if msg.data == "close": await ws.close() break elif msg.type == aiohttp.WSMsgType.ERROR: print(f"Error: {ws.exception()}") break asyncio.run(websocket_client())
Async Database Operations
1. PostgreSQL with asyncpg
import asyncio import asyncpg async def database_operations(): """Async PostgreSQL operations.""" # Create connection pool pool = await asyncpg.create_pool( host="localhost", database="mydb", user="user", password="password", min_size=5, max_size=20 ) try: # Acquire connection from pool async with pool.acquire() as conn: # Execute query rows = await conn.fetch( "SELECT id, name, email FROM users WHERE active = $1", True ) for row in rows: print(f"User: {row['name']} ({row['email']})") # Insert data await conn.execute( "INSERT INTO users (name, email) VALUES ($1, $2)", "Alice", "alice@example.com" ) # Transaction async with conn.transaction(): await conn.execute("UPDATE users SET active = $1 WHERE id = $2", False, 1) await conn.execute("INSERT INTO audit_log (action) VALUES ($1)", "deactivate") finally: await pool.close() asyncio.run(database_operations())
2. MongoDB with motor
import asyncio from motor.motor_asyncio import AsyncIOMotorClient async def mongodb_operations(): """Async MongoDB operations.""" # Create client client = AsyncIOMotorClient("mongodb://localhost:27017") db = client.mydb collection = db.users try: # Insert document result = await collection.insert_one({ "name": "Alice", "email": "alice@example.com", "age": 30 }) print(f"Inserted ID: {result.inserted_id}") # Find documents cursor = collection.find({"age": {"$gte": 25}}) async for document in cursor: print(f"User: {document['name']}") # Update document await collection.update_one( {"name": "Alice"}, {"$set": {"age": 31}} ) # Aggregation pipeline pipeline = [ {"$match": {"age": {"$gte": 25}}}, {"$group": {"_id": None, "avg_age": {"$avg": "$age"}}} ] async for result in collection.aggregate(pipeline): print(f"Average age: {result['avg_age']}") finally: client.close() asyncio.run(mongodb_operations())
3. Connection Pool Pattern
import asyncio import asyncpg from typing import Optional class DatabasePool: """Async database connection pool manager.""" def __init__(self, dsn: str): self.dsn = dsn self.pool: Optional[asyncpg.Pool] = None async def connect(self): """Create connection pool.""" self.pool = await asyncpg.create_pool(self.dsn, min_size=5, max_size=20) async def close(self): """Close connection pool.""" if self.pool: await self.pool.close() async def execute(self, query: str, *args): """Execute query.""" async with self.pool.acquire() as conn: return await conn.execute(query, *args) async def fetch(self, query: str, *args): """Fetch multiple rows.""" async with self.pool.acquire() as conn: return await conn.fetch(query, *args) async def fetchrow(self, query: str, *args): """Fetch single row.""" async with self.pool.acquire() as conn: return await conn.fetchrow(query, *args) # Usage async def use_pool(): db = DatabasePool("postgresql://user:pass@localhost/mydb") await db.connect() try: # Execute operations rows = await db.fetch("SELECT * FROM users") for row in rows: print(row) finally: await db.close() asyncio.run(use_pool())
FastAPI Async Patterns
1. Async Endpoints
from fastapi import FastAPI, HTTPException import asyncio import httpx app = FastAPI() @app.get("/") async def root(): """Simple async endpoint.""" return {"message": "Hello World"} @app.get("/delay/{seconds}") async def delayed_response(seconds: int): """Endpoint with async delay.""" await asyncio.sleep(seconds) return {"message": f"Waited {seconds} seconds"} @app.get("/fetch") async def fetch_external(): """Fetch data from external API.""" async with httpx.AsyncClient() as client: response = await client.get("https://httpbin.org/json") return response.json() @app.get("/parallel") async def parallel_requests(): """Make parallel API calls.""" async with httpx.AsyncClient() as client: responses = await asyncio.gather( client.get("https://httpbin.org/delay/1"), client.get("https://httpbin.org/delay/2"), client.get("https://httpbin.org/json") ) return { "results": [r.json() for r in responses] }
2. Background Tasks
from fastapi import FastAPI, BackgroundTasks import asyncio app = FastAPI() async def send_email(email: str, message: str): """Simulate sending email.""" print(f"Sending email to {email}") await asyncio.sleep(5) # Simulate slow email service print(f"Email sent to {email}: {message}") @app.post("/send-notification") async def send_notification( email: str, message: str, background_tasks: BackgroundTasks ): """Send notification in background.""" # Add task to background background_tasks.add_task(send_email, email, message) # Return immediately return {"status": "notification queued"} # Alternative: manual task creation @app.post("/send-notification-manual") async def send_notification_manual(email: str, message: str): """Create background task manually.""" asyncio.create_task(send_email(email, message)) return {"status": "notification queued"}
3. Async Dependencies
from fastapi import FastAPI, Depends import asyncpg app = FastAPI() # Database pool (global) db_pool = None async def get_db(): """Dependency: database connection.""" async with db_pool.acquire() as conn: yield conn @app.on_event("startup") async def startup(): """Initialize database pool on startup.""" global db_pool db_pool = await asyncpg.create_pool( "postgresql://user:pass@localhost/mydb" ) @app.on_event("shutdown") async def shutdown(): """Close database pool on shutdown.""" await db_pool.close() @app.get("/users/{user_id}") async def get_user(user_id: int, conn=Depends(get_db)): """Get user with async database dependency.""" user = await conn.fetchrow( "SELECT * FROM users WHERE id = $1", user_id ) if not user: raise HTTPException(status_code=404, detail="User not found") return dict(user)
4. WebSocket Endpoints
from fastapi import FastAPI, WebSocket, WebSocketDisconnect from typing import List import asyncio app = FastAPI() # Active connections active_connections: List[WebSocket] = [] @app.websocket("/ws") async def websocket_endpoint(websocket: WebSocket): """WebSocket endpoint.""" await websocket.accept() active_connections.append(websocket) try: while True: # Receive message data = await websocket.receive_text() # Broadcast to all connections for connection in active_connections: await connection.send_text(f"Broadcast: {data}") except WebSocketDisconnect: active_connections.remove(websocket) print("Client disconnected") # Background task to send periodic updates async def broadcast_updates(): """Send periodic updates to all clients.""" while True: await asyncio.sleep(5) for connection in active_connections: try: await connection.send_text("Periodic update") except: active_connections.remove(connection) @app.on_event("startup") async def startup(): """Start background update task.""" asyncio.create_task(broadcast_updates())
Common Patterns and Best Practices
1. Timeout Handling
import asyncio async def slow_operation(): """Slow operation.""" await asyncio.sleep(10) return "Result" async def with_timeout(): """Run operation with timeout.""" try: result = await asyncio.wait_for(slow_operation(), timeout=5.0) print(f"Result: {result}") except asyncio.TimeoutError: print("Operation timed out") asyncio.run(with_timeout())
2. Cancellation Handling
import asyncio async def cancellable_task(): """Task that can be cancelled.""" try: for i in range(10): print(f"Working: {i}") await asyncio.sleep(1) return "Complete" except asyncio.CancelledError: print("Task was cancelled") # Cleanup raise # Re-raise to propagate cancellation async def cancel_example(): """Example of task cancellation.""" task = asyncio.create_task(cancellable_task()) # Let it run for a bit await asyncio.sleep(3) # Cancel the task task.cancel() try: await task except asyncio.CancelledError: print("Confirmed: task was cancelled") asyncio.run(cancel_example())
3. Resource Cleanup with Context Managers
import asyncio class AsyncResource: """Async context manager for resource management.""" async def __aenter__(self): """Async setup.""" print("Acquiring resource") await asyncio.sleep(1) # Simulate async setup self.connection = "connected" return self async def __aexit__(self, exc_type, exc_val, exc_tb): """Async cleanup.""" print("Releasing resource") await asyncio.sleep(1) # Simulate async cleanup self.connection = None async def use_resource(): """Use async resource.""" async with AsyncResource() as resource: print(f"Using resource: {resource.connection}") await asyncio.sleep(1) # Resource automatically cleaned up asyncio.run(use_resource())
4. Debouncing and Throttling
import asyncio from datetime import datetime class Debouncer: """Debounce async function calls.""" def __init__(self, delay: float): self.delay = delay self.task = None async def call(self, func, *args, **kwargs): """Debounced function call.""" # Cancel previous task if self.task: self.task.cancel() # Create new task async def delayed_call(): await asyncio.sleep(self.delay) await func(*args, **kwargs) self.task = asyncio.create_task(delayed_call()) async def api_call(query: str): """Simulated API call.""" print(f"{datetime.now()}: API call with query: {query}") async def debounce_example(): """Example of debouncing.""" debouncer = Debouncer(delay=1.0) # Rapid calls - only last one executes await debouncer.call(api_call, "query1") await asyncio.sleep(0.1) await debouncer.call(api_call, "query2") await asyncio.sleep(0.1) await debouncer.call(api_call, "query3") # Wait for debounced call await asyncio.sleep(2) asyncio.run(debounce_example()) # Output: Only "query3" API call executes
5. Rate Limiting
import asyncio from datetime import datetime class RateLimiter: """Limit rate of async operations.""" def __init__(self, max_calls: int, period: float): self.max_calls = max_calls self.period = period self.semaphore = asyncio.Semaphore(max_calls) self.calls = [] async def __aenter__(self): """Acquire rate limit slot.""" await self.semaphore.acquire() now = asyncio.get_event_loop().time() # Remove old calls outside period self.calls = [t for t in self.calls if now - t < self.period] if len(self.calls) >= self.max_calls: # Wait until oldest call expires sleep_time = self.period - (now - self.calls[0]) if sleep_time > 0: await asyncio.sleep(sleep_time) self.calls.append(now) return self async def __aexit__(self, *args): """Release semaphore.""" self.semaphore.release() async def rate_limited_operation(limiter, name): """Operation with rate limiting.""" async with limiter: print(f"{datetime.now()}: {name}") await asyncio.sleep(0.1) async def rate_limit_example(): """Example of rate limiting.""" # Max 3 calls per 2 seconds limiter = RateLimiter(max_calls=3, period=2.0) # Try to make 6 calls await asyncio.gather(*[ rate_limited_operation(limiter, f"Call-{i}") for i in range(6) ]) asyncio.run(rate_limit_example())
Debugging Async Code
1. Enable Debug Mode
import asyncio import logging # Enable asyncio debug mode asyncio.run(main(), debug=True) # Or set environment variable: # PYTHONASYNCIODEBUG=1 python script.py # Configure logging logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger(__name__) async def debug_example(): logger.debug("Starting operation") await asyncio.sleep(1) logger.debug("Operation complete")
2. Detect Blocking Code
import asyncio import time async def problematic_code(): """Code with blocking operation.""" print("Starting") # BAD: Blocking sleep time.sleep(2) # This blocks the event loop! print("Complete") # Run with debug mode to detect blocking asyncio.run(problematic_code(), debug=True) # Warning: Executing <Task> took 2.001 seconds
3. Track Pending Tasks
import asyncio async def track_tasks(): """Track all pending tasks.""" # Get all tasks tasks = asyncio.all_tasks() print(f"Total tasks: {len(tasks)}") for task in tasks: print(f" - {task.get_name()}: {task}") # Check if task is done if task.done(): try: result = task.result() print(f" Result: {result}") except Exception as e: print(f" Exception: {e}") # Create some tasks async def main(): task1 = asyncio.create_task(asyncio.sleep(5), name="sleep-task") task2 = asyncio.create_task(track_tasks(), name="tracking") await task2 task1.cancel() asyncio.run(main())
Testing Async Code
1. pytest-asyncio Setup
# test_async.py import pytest import asyncio # Mark test as async @pytest.mark.asyncio async def test_async_function(): """Test async function.""" result = await some_async_function() assert result == "expected" @pytest.mark.asyncio async def test_async_http(): """Test async HTTP client.""" async with aiohttp.ClientSession() as session: async with session.get("https://httpbin.org/json") as response: assert response.status == 200 data = await response.json() assert "slideshow" in data # Async fixture @pytest.fixture async def async_client(): """Async test fixture.""" client = await create_async_client() yield client await client.close() @pytest.mark.asyncio async def test_with_fixture(async_client): """Test using async fixture.""" result = await async_client.fetch_data() assert result is not None
2. Mocking Async Functions
import pytest from unittest.mock import AsyncMock, patch @pytest.mark.asyncio async def test_with_mock(): """Test with async mock.""" # Create async mock mock_func = AsyncMock(return_value="mocked result") result = await mock_func() assert result == "mocked result" mock_func.assert_called_once() @pytest.mark.asyncio @patch("module.async_function", new_callable=AsyncMock) async def test_with_patch(mock_async): """Test with patched async function.""" mock_async.return_value = {"status": "success"} result = await some_function_that_calls_async() assert result["status"] == "success" mock_async.assert_called_once()
Performance Optimization
1. Use asyncio.gather() for Parallelism
import asyncio import time async def slow_task(n): await asyncio.sleep(1) return n * 2 async def optimized(): """Parallel execution.""" start = time.time() # Sequential (slow) - 5 seconds # results = [] # for i in range(5): # result = await slow_task(i) # results.append(result) # Parallel (fast) - 1 second results = await asyncio.gather(*[slow_task(i) for i in range(5)]) elapsed = time.time() - start print(f"Time: {elapsed:.2f}s, Results: {results}") asyncio.run(optimized())
2. Connection Pooling
import asyncio import aiohttp # BAD: Create new session for each request async def bad_pattern(): for i in range(10): async with aiohttp.ClientSession() as session: async with session.get("https://httpbin.org/json") as response: await response.json() # GOOD: Reuse session with connection pool async def good_pattern(): async with aiohttp.ClientSession() as session: tasks = [ session.get("https://httpbin.org/json") for i in range(10) ] responses = await asyncio.gather(*tasks) for response in responses: await response.json()
3. Avoid Blocking Operations
import asyncio # BAD: Blocking I/O in async function async def bad_file_read(): with open("large_file.txt") as f: # Blocks event loop! data = f.read() return data # GOOD: Use async file I/O or run in executor async def good_file_read(): loop = asyncio.get_running_loop() # Run blocking operation in thread pool data = await loop.run_in_executor( None, lambda: open("large_file.txt").read() ) return data # BETTER: Use aiofiles for async file I/O import aiofiles async def better_file_read(): async with aiofiles.open("large_file.txt") as f: data = await f.read() return data
Common Pitfalls
❌ Anti-Pattern 1: Not Awaiting Coroutines
# WRONG async def bad(): result = async_function() # Returns coroutine, doesn't execute! print(result) # Prints: <coroutine object> # CORRECT async def good(): result = await async_function() # Actually executes print(result)
❌ Anti-Pattern 2: Blocking the Event Loop
# WRONG import time async def bad(): time.sleep(5) # Blocks entire event loop! # CORRECT async def good(): await asyncio.sleep(5) # Non-blocking
❌ Anti-Pattern 3: Not Handling Cancellation
# WRONG async def bad(): await asyncio.sleep(10) # No cleanup if cancelled # CORRECT async def good(): try: await asyncio.sleep(10) except asyncio.CancelledError: # Cleanup resources await cleanup() raise # Re-raise to propagate
❌ Anti-Pattern 4: Creating Event Loop Incorrectly
# WRONG (Python 3.7+) loop = asyncio.get_event_loop() loop.run_until_complete(main()) # CORRECT (Python 3.7+) asyncio.run(main())
❌ Anti-Pattern 5: Not Closing Resources
# WRONG async def bad(): session = aiohttp.ClientSession() response = await session.get(url) # Session never closed - resource leak! # CORRECT async def good(): async with aiohttp.ClientSession() as session: response = await session.get(url) # Session automatically closed
Best Practices
- Use asyncio.run() for entry point (Python 3.7+)
- Always await coroutines - don't forget await
- Use async context managers for resource cleanup
- Connection pooling for HTTP and database clients
- Handle CancelledError for graceful shutdown
- Use asyncio.gather() for parallel operations
- Avoid blocking operations in async functions
- Use timeouts to prevent hanging operations
- Debug mode during development to catch issues
- Test async code with pytest-asyncio
Quick Reference
Common Commands
# Run async script python script.py # Run with debug mode PYTHONASYNCIODEBUG=1 python script.py # Run tests pytest -v --asyncio-mode=auto # Install async dependencies pip install aiohttp asyncpg motor pytest-asyncio
Essential Imports
import asyncio import aiohttp import asyncpg from typing import List, Dict, Any
Resources
- Official Documentation: https://docs.python.org/3/library/asyncio.html
- aiohttp: https://docs.aiohttp.org/
- asyncpg: https://magicstack.github.io/asyncpg/
- FastAPI Async: https://fastapi.tiangolo.com/async/
- pytest-asyncio: https://pytest-asyncio.readthedocs.io/
Related Skills
When using asyncio, consider these complementary skills:
- fastapi-local-dev: FastAPI async server patterns and production deployment
- pytest: Testing async code with pytest-asyncio and fixtures
- systematic-debugging: Debugging async race conditions and deadlocks
Quick FastAPI Async Patterns (Inlined for Standalone Use)
# FastAPI async endpoint pattern from fastapi import FastAPI, Depends from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine from sqlalchemy.orm import sessionmaker app = FastAPI() # Async database setup engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db") AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) async def get_db(): async with AsyncSessionLocal() as session: yield session @app.get("/users/{user_id}") async def get_user(user_id: int, db: AsyncSession = Depends(get_db)): # Async database query result = await db.execute(select(User).where(User.id == user_id)) user = result.scalar_one_or_none() if not user: raise HTTPException(status_code=404, detail="User not found") return user # Background tasks with asyncio @app.post("/send-email") async def send_email_endpoint(email: EmailSchema): # Non-blocking background task asyncio.create_task(send_email_async(email)) return {"status": "email queued"}
Quick pytest-asyncio Patterns (Inlined for Standalone Use)
# Testing async functions with pytest import pytest import pytest_asyncio from httpx import AsyncClient # Async test fixture @pytest_asyncio.fixture async def async_client(): async with AsyncClient(app=app, base_url="http://test") as client: yield client # Async test function @pytest.mark.asyncio async def test_get_user(async_client): response = await async_client.get("/users/1") assert response.status_code == 200 assert response.json()["id"] == 1 # Testing concurrent operations @pytest.mark.asyncio async def test_concurrent_requests(): async with AsyncClient(app=app, base_url="http://test") as client: # Run 10 requests concurrently responses = await asyncio.gather( *[client.get(f"/users/{i}") for i in range(1, 11)] ) assert all(r.status_code == 200 for r in responses) # Mock async dependencies @pytest_asyncio.fixture async def mock_db(): # Setup mock database db = AsyncMock() yield db # Cleanup
Quick Async Debugging Reference (Inlined for Standalone Use)
Common Async Pitfalls:
-
Blocking the Event Loop
# ❌ WRONG: Blocking call in async function async def bad_function(): time.sleep(5) # Blocks entire event loop! return "done" # ✅ CORRECT: Use asyncio.sleep async def good_function(): await asyncio.sleep(5) # Releases event loop return "done" -
Debugging Race Conditions
# Add logging to track execution order import logging logging.basicConfig(level=logging.DEBUG) async def debug_task(name): logging.debug(f"{name}: Starting") await asyncio.sleep(1) logging.debug(f"{name}: Finished") return name # Run with detailed tracing asyncio.run(asyncio.gather(debug_task("A"), debug_task("B")), debug=True) -
Deadlock Detection
# Use timeout to detect deadlocks try: result = await asyncio.wait_for(some_async_function(), timeout=5.0) except asyncio.TimeoutError: logging.error("Deadlock detected: operation timed out") # Investigate what's blocking -
Inspecting Running Tasks
# Check all pending tasks tasks = asyncio.all_tasks() for task in tasks: print(f"Task: {task.get_name()}, Done: {task.done()}") if not task.done(): print(f" Current coro: {task.get_coro()}")
[Full FastAPI, pytest, and debugging patterns available in respective skills if deployed together]
Python Version Compatibility: This skill covers asyncio in Python 3.7+ and reflects current best practices for async programming in 2025.