Claude-skill-registry fastapi-async-patterns
Use FastAPI async patterns when building high-performance APIs, handling concurrent requests, and running async operations.
git clone https://github.com/majiayu000/claude-skill-registry
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/fastapi-async-patterns" ~/.claude/skills/majiayu000-claude-skill-registry-fastapi-async-patterns && rm -rf "$T"
FastAPI Async Patterns
Master async patterns in FastAPI for building high-performance, concurrent APIs with optimal resource usage.
Basic Async Route Handlers
Understanding async vs sync endpoints in FastAPI.

    import asyncio
    import time

    import httpx
    from fastapi import FastAPI

    app = FastAPI()

    # Sync endpoint: FastAPI runs plain `def` handlers in a threadpool, so
    # time.sleep() ties up one worker thread rather than the event loop
    @app.get('/sync')
    def sync_endpoint():
        time.sleep(1)
        return {'message': 'Completed after 1 second'}

    # Async endpoint (non-blocking)
    @app.get('/async')
    async def async_endpoint():
        await asyncio.sleep(1)  # Other requests can be handled meanwhile
        return {'message': 'Completed after 1 second'}

    # CPU-bound work: keep it in a sync handler so it runs in the threadpool
    # and does not stall the event loop
    @app.get('/cpu-intensive')
    def cpu_intensive():
        result = sum(i * i for i in range(10000000))
        return {'result': result}

    # I/O-bound work (use async)
    @app.get('/io-intensive')
    async def io_intensive():
        async with httpx.AsyncClient() as client:
            response = await client.get('https://api.example.com/data')
            return response.json()

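The difference between blocking and yielding can be measured without a web server. The sketch below is a stdlib-only illustration, not part of the skill itself: the handler names and the `DELAY` value are made up for the demo. It runs three copies of each handler concurrently and times them. It also shows `asyncio.to_thread` as one way to call blocking code from an async function.

```python
import asyncio
import time

DELAY = 0.2  # seconds; hypothetical value, kept short so the demo is quick


async def blocking_handler() -> None:
    time.sleep(DELAY)  # blocks the event loop: no other coroutine can run


async def non_blocking_handler() -> None:
    await asyncio.sleep(DELAY)  # yields to the event loop while waiting


async def offloaded_handler() -> None:
    # The blocking call runs in a worker thread, so the loop stays free
    await asyncio.to_thread(time.sleep, DELAY)


async def time_three(handler) -> float:
    """Run three copies of `handler` concurrently and return elapsed seconds."""
    start = time.perf_counter()
    await asyncio.gather(handler(), handler(), handler())
    return time.perf_counter() - start


async def main() -> dict:
    results = {}
    for handler in (blocking_handler, non_blocking_handler, offloaded_handler):
        results[handler.__name__] = await time_three(handler)
    return results


if __name__ == '__main__':
    for name, elapsed in asyncio.run(main()).items():
        print(f'{name}: {elapsed:.2f}s')
```

The blocking variant takes roughly three times `DELAY` because the calls run one after another, while the other two finish in about one `DELAY`.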
Async Database Operations
Async database patterns with popular ORMs and libraries.

    from typing import AsyncGenerator

    import asyncpg
    from bson import ObjectId
    from fastapi import Depends, FastAPI, HTTPException
    from motor.motor_asyncio import AsyncIOMotorClient
    from sqlalchemy import select
    from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
    from sqlalchemy.orm import sessionmaker
    from tortoise import fields
    from tortoise.contrib.fastapi import register_tortoise
    from tortoise.models import Model

    app = FastAPI()

    # SQLAlchemy async setup (`User` is assumed to be a mapped model defined elsewhere)
    DATABASE_URL = 'postgresql+asyncpg://user:pass@localhost/db'
    engine = create_async_engine(DATABASE_URL, echo=True, future=True)
    AsyncSessionLocal = sessionmaker(
        engine, class_=AsyncSession, expire_on_commit=False
    )

    async def get_db() -> AsyncGenerator[AsyncSession, None]:
        async with AsyncSessionLocal() as session:
            try:
                yield session
                await session.commit()
            except Exception:
                await session.rollback()
                raise

    @app.get('/users/{user_id}')
    async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
        result = await db.execute(select(User).where(User.id == user_id))
        user = result.scalar_one_or_none()
        if not user:
            raise HTTPException(status_code=404, detail='User not found')
        return user

    # Direct asyncpg (lower level, faster)
    # Caution: as a per-request dependency this opens and closes a whole pool
    # on every request. In a real app, create the pool once at startup (see
    # Async Context Managers) and only acquire connections per request.
    async def get_asyncpg_pool():
        pool = await asyncpg.create_pool(
            'postgresql://user:pass@localhost/db',
            min_size=10,
            max_size=20
        )
        try:
            yield pool
        finally:
            await pool.close()

    @app.get('/users-fast/{user_id}')
    async def get_user_fast(user_id: int, pool = Depends(get_asyncpg_pool)):
        async with pool.acquire() as conn:
            row = await conn.fetchrow(
                'SELECT * FROM users WHERE id = $1', user_id
            )
            if not row:
                raise HTTPException(status_code=404, detail='User not found')
            return dict(row)

    # MongoDB with Motor
    mongo_client = AsyncIOMotorClient('mongodb://localhost:27017')
    mongo_db = mongo_client.mydatabase

    @app.get('/documents/{doc_id}')
    async def get_document(doc_id: str):
        # insert_one() below generates ObjectId keys, so look up by ObjectId
        if not ObjectId.is_valid(doc_id):
            raise HTTPException(status_code=400, detail='Invalid document id')
        document = await mongo_db.collection.find_one({'_id': ObjectId(doc_id)})
        if not document:
            raise HTTPException(status_code=404, detail='Document not found')
        document['_id'] = str(document['_id'])  # ObjectId is not JSON serializable
        return document

    @app.post('/documents')
    async def create_document(data: dict):
        result = await mongo_db.collection.insert_one(data)
        return {'id': str(result.inserted_id)}

    # Tortoise ORM async
    class UserModel(Model):
        id = fields.IntField(pk=True)
        name = fields.CharField(max_length=255)
        email = fields.CharField(max_length=255)

    register_tortoise(
        app,
        db_url='postgres://user:pass@localhost/db',
        modules={'models': ['app.models']},
        generate_schemas=True,
        add_exception_handlers=True,
    )

    @app.get('/tortoise-users/{user_id}')
    async def get_tortoise_user(user_id: int):
        user = await UserModel.get_or_none(id=user_id)
        if not user:
            raise HTTPException(status_code=404, detail='User not found')
        # Return plain data rather than the ORM object
        return {'id': user.id, 'name': user.name, 'email': user.email}

Background Tasks
Fire-and-forget tasks without blocking the response.

    import asyncio
    import os
    from datetime import datetime
    from pathlib import Path

    from fastapi import BackgroundTasks, FastAPI, UploadFile

    app = FastAPI()

    # Simple background task
    async def send_email(email: str, message: str):
        await asyncio.sleep(2)  # Simulate email sending
        print(f'Email sent to {email}: {message}')

    @app.post('/send-email')
    async def send_email_endpoint(
        email: str,
        message: str,
        background_tasks: BackgroundTasks
    ):
        background_tasks.add_task(send_email, email, message)
        return {'status': 'Email will be sent in background'}

    # Multiple background tasks
    async def log_activity(user_id: int, action: str):
        await asyncio.sleep(0.5)
        print(f'[{datetime.now()}] User {user_id} performed: {action}')

    async def update_analytics(action: str):
        await asyncio.sleep(1)
        print(f'Analytics updated for action: {action}')

    @app.post('/users/{user_id}/action')
    async def perform_action(
        user_id: int,
        action: str,
        background_tasks: BackgroundTasks
    ):
        # Add multiple tasks
        background_tasks.add_task(log_activity, user_id, action)
        background_tasks.add_task(update_analytics, action)
        return {'status': 'Action logged'}

    # Background cleanup
    async def cleanup_temp_files(file_path: str):
        await asyncio.sleep(60)  # Wait before cleanup
        if os.path.exists(file_path):
            os.remove(file_path)
            print(f'Cleaned up: {file_path}')

    @app.post('/upload')
    async def upload_file(
        file: UploadFile,
        background_tasks: BackgroundTasks
    ):
        # Keep only the base name so a crafted filename cannot escape /tmp
        safe_name = Path(file.filename or 'upload').name
        temp_path = os.path.join('/tmp', safe_name)
        content = await file.read()
        with open(temp_path, 'wb') as f:
            f.write(content)

        # Schedule cleanup
        background_tasks.add_task(cleanup_temp_files, temp_path)
        return {'filename': safe_name, 'path': temp_path}

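A background task runs after the response has been sent, so the client never sees a failure inside it. One possible guard, sketched here with the standard library only, is a small wrapper that logs any exception. The names `run_logged` and the validating `send_email` are hypothetical and exist only for this illustration.

```python
import asyncio
import logging

logger = logging.getLogger('background')


async def run_logged(func, *args) -> bool:
    """Await func(*args); log any exception instead of letting it go unnoticed."""
    try:
        await func(*args)
    except Exception:
        logger.exception('Background task %s failed', func.__name__)
        return False
    return True


async def send_email(email: str, message: str) -> None:
    # Hypothetical task that fails on obviously bad input
    if '@' not in email:
        raise ValueError(f'invalid address: {email}')
    await asyncio.sleep(0.01)  # stand-in for the real send


if __name__ == '__main__':
    print(asyncio.run(run_logged(send_email, 'user@example.com', 'hi')))
    print(asyncio.run(run_logged(send_email, 'not-an-address', 'hi')))
```

In a route handler the wrapper could be registered as `background_tasks.add_task(run_logged, send_email, email, message)`, since `add_task` forwards positional arguments to the callable.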
WebSocket Handling
Real-time bidirectional communication patterns.

    from typing import List

    from fastapi import (
        Depends,
        FastAPI,
        WebSocket,
        WebSocketDisconnect,
        WebSocketException,
        status,
    )

    app = FastAPI()

    # Simple WebSocket
    @app.websocket('/ws')
    async def websocket_endpoint(websocket: WebSocket):
        await websocket.accept()
        try:
            while True:
                data = await websocket.receive_text()
                await websocket.send_text(f'Echo: {data}')
        except WebSocketDisconnect:
            print('Client disconnected')

    # WebSocket with authentication
    # verify_token and decode_token are assumed helpers defined elsewhere
    async def get_current_user_ws(websocket: WebSocket):
        token = websocket.query_params.get('token')
        if not token or not verify_token(token):
            # HTTPException does not apply to WebSockets; raise
            # WebSocketException to close with a policy-violation code
            raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION)
        return decode_token(token)

    @app.websocket('/ws/authenticated')
    async def authenticated_websocket(
        websocket: WebSocket,
        user = Depends(get_current_user_ws)
    ):
        await websocket.accept()
        try:
            await websocket.send_text(f'Welcome {user["name"]}')
            while True:
                data = await websocket.receive_text()
                await websocket.send_text(f'{user["name"]}: {data}')
        except WebSocketDisconnect:
            print(f'User {user["name"]} disconnected')

    # Broadcasting to multiple connections
    class ConnectionManager:
        def __init__(self):
            self.active_connections: List[WebSocket] = []

        async def connect(self, websocket: WebSocket):
            await websocket.accept()
            self.active_connections.append(websocket)

        def disconnect(self, websocket: WebSocket):
            self.active_connections.remove(websocket)

        async def send_personal_message(self, message: str, websocket: WebSocket):
            await websocket.send_text(message)

        async def broadcast(self, message: str):
            # Iterate over a copy: the list can change while a send is awaited
            for connection in list(self.active_connections):
                await connection.send_text(message)

    manager = ConnectionManager()

    @app.websocket('/ws/chat/{client_id}')
    async def chat_endpoint(websocket: WebSocket, client_id: str):
        await manager.connect(websocket)
        await manager.broadcast(f'Client {client_id} joined the chat')
        try:
            while True:
                data = await websocket.receive_text()
                await manager.broadcast(f'Client {client_id}: {data}')
        except WebSocketDisconnect:
            manager.disconnect(websocket)
            await manager.broadcast(f'Client {client_id} left the chat')

    # WebSocket with JSON messages
    @app.websocket('/ws/json')
    async def json_websocket(websocket: WebSocket):
        await websocket.accept()
        try:
            while True:
                data = await websocket.receive_json()
                message_type = data.get('type')
                if message_type == 'ping':
                    await websocket.send_json({'type': 'pong'})
                elif message_type == 'message':
                    await websocket.send_json({
                        'type': 'response',
                        'data': f'Received: {data.get("content")}'
                    })
        except WebSocketDisconnect:
            print('Client disconnected')

Server-Sent Events (SSE)
One-way streaming from server to client.

    import asyncio
    import json
    from datetime import datetime

    from fastapi import FastAPI
    from sse_starlette.sse import EventSourceResponse

    app = FastAPI()

    @app.get('/sse')
    async def sse_endpoint():
        async def event_generator():
            for i in range(10):
                await asyncio.sleep(1)
                yield {
                    'event': 'message',
                    'data': f'Message {i}'
                }
        return EventSourceResponse(event_generator())

    # SSE with real-time updates
    # fetch_latest_update is an assumed helper defined elsewhere
    @app.get('/sse/updates')
    async def sse_updates():
        async def update_generator():
            while True:
                # Simulate fetching updates
                await asyncio.sleep(2)
                update = await fetch_latest_update()
                yield {
                    'event': 'update',
                    'data': json.dumps(update)
                }
        return EventSourceResponse(update_generator())

    # SSE with heartbeat
    @app.get('/sse/heartbeat')
    async def sse_heartbeat():
        async def heartbeat_generator():
            try:
                while True:
                    await asyncio.sleep(30)
                    yield {
                        'event': 'heartbeat',
                        'data': datetime.now().isoformat()
                    }
            except asyncio.CancelledError:
                print('SSE connection closed')
                raise  # Re-raise so the cancellation is not swallowed
        return EventSourceResponse(heartbeat_generator())

Streaming Responses
Stream large files or generated content.

    import asyncio
    import csv
    import io

    from fastapi import FastAPI
    from fastapi.responses import StreamingResponse

    app = FastAPI()

    # fetch_users_batch and fetch_section_data are assumed helpers defined elsewhere

    # Stream large file
    @app.get('/download/{filename}')
    async def download_file(filename: str):
        # A plain (sync) generator: StreamingResponse iterates it in a
        # threadpool, so the blocking file reads stay off the event loop
        def file_stream():
            with open(f'/data/{filename}', 'rb') as f:
                while chunk := f.read(8192):
                    yield chunk
        return StreamingResponse(
            file_stream(),
            media_type='application/octet-stream',
            headers={'Content-Disposition': f'attachment; filename={filename}'}
        )

    # Stream generated CSV
    @app.get('/export/users')
    async def export_users():
        async def csv_stream():
            output = io.StringIO()
            writer = csv.writer(output)

            # Write header
            writer.writerow(['ID', 'Name', 'Email'])
            yield output.getvalue()
            output.truncate(0)
            output.seek(0)

            # Stream users in batches
            offset = 0
            batch_size = 100
            while True:
                users = await fetch_users_batch(offset, batch_size)
                if not users:
                    break
                for user in users:
                    writer.writerow([user.id, user.name, user.email])
                yield output.getvalue()
                output.truncate(0)
                output.seek(0)
                offset += batch_size
        return StreamingResponse(
            csv_stream(),
            media_type='text/csv',
            headers={'Content-Disposition': 'attachment; filename=users.csv'}
        )

    # Stream generated content
    @app.get('/generate/report')
    async def generate_report():
        async def report_stream():
            yield b'<html><body><h1>Report</h1>'
            for section in ['users', 'orders', 'analytics']:
                await asyncio.sleep(0.5)  # Simulate processing
                data = await fetch_section_data(section)
                yield f'<h2>{section.title()}</h2>'.encode()
                yield f'<pre>{data}</pre>'.encode()
            yield b'</body></html>'
        return StreamingResponse(report_stream(), media_type='text/html')

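The batched CSV pattern can be tried on its own, without FastAPI or a database. The following is a minimal sketch using only the standard library. `ROWS` and `fetch_batch` are made-up stand-ins for a real data source, and `csv_chunks` yields one text chunk per batch in the same way a streaming response body would consume it.

```python
import asyncio
import csv
import io

# Hypothetical in-memory data standing in for a database table
ROWS = [
    (1, 'Ada', 'ada@example.com'),
    (2, 'Linus', 'linus@example.com'),
    (3, 'Grace', 'grace@example.com'),
]


async def fetch_batch(offset: int, size: int) -> list:
    await asyncio.sleep(0)  # stand-in for an awaited database query
    return ROWS[offset:offset + size]


async def csv_chunks(batch_size: int = 2):
    """Yield CSV text one batch at a time, reusing a single buffer."""
    buffer = io.StringIO()
    writer = csv.writer(buffer)

    def drain() -> str:
        # Return what has been written so far and reset the buffer
        data = buffer.getvalue()
        buffer.seek(0)
        buffer.truncate(0)
        return data

    writer.writerow(['ID', 'Name', 'Email'])
    yield drain()

    offset = 0
    while batch := await fetch_batch(offset, batch_size):
        writer.writerows(batch)
        yield drain()
        offset += batch_size


async def collect() -> list:
    return [chunk async for chunk in csv_chunks()]


if __name__ == '__main__':
    for chunk in asyncio.run(collect()):
        print(repr(chunk))
```

Because the buffer is drained after every batch, memory use stays proportional to the batch size rather than to the full export.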
Concurrent Request Handling
Parallel processing patterns for multiple operations.

    import asyncio

    import httpx
    from fastapi import FastAPI
    from sqlalchemy import func, select

    app = FastAPI()

    # Parallel API calls
    @app.get('/aggregate/user/{user_id}')
    async def aggregate_user_data(user_id: int):
        async with httpx.AsyncClient() as client:
            # Fetch from multiple sources in parallel
            profile_task = client.get(f'https://api.example.com/users/{user_id}')
            posts_task = client.get(f'https://api.example.com/users/{user_id}/posts')
            comments_task = client.get(f'https://api.example.com/users/{user_id}/comments')
            profile, posts, comments = await asyncio.gather(
                profile_task, posts_task, comments_task
            )
            return {
                'profile': profile.json(),
                'posts': posts.json(),
                'comments': comments.json()
            }

    # Parallel database queries
    # An AsyncSession must not be shared by concurrent tasks, so each query
    # opens its own session. AsyncSessionLocal, User and Order are assumed to
    # be defined as in the database section.
    async def fetch_scalars(stmt):
        async with AsyncSessionLocal() as session:
            result = await session.execute(stmt)
            return result.scalars().all()

    @app.get('/dashboard')
    async def get_dashboard():
        users, orders, counts = await asyncio.gather(
            fetch_scalars(select(User).limit(10)),
            fetch_scalars(select(Order).limit(10)),
            fetch_scalars(select(func.count(User.id))),
        )
        return {
            'users': users,
            'orders': orders,
            'total_users': counts[0]
        }

    # Racing requests (first to complete wins)
    @app.get('/fastest-price/{product_id}')
    async def get_fastest_price(product_id: str):
        async with httpx.AsyncClient() as client:
            # asyncio.wait() needs Task objects, not bare coroutines
            tasks = [
                asyncio.create_task(client.get(f'https://store1.com/price/{product_id}')),
                asyncio.create_task(client.get(f'https://store2.com/price/{product_id}')),
                asyncio.create_task(client.get(f'https://store3.com/price/{product_id}'))
            ]
            done, pending = await asyncio.wait(
                tasks, return_when=asyncio.FIRST_COMPLETED
            )
            # Cancel pending requests
            for task in pending:
                task.cancel()
            result = done.pop().result()  # Re-raises if the fastest request failed
            return result.json()

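By default `asyncio.gather` raises the first exception it sees, so one failing source fails the whole aggregate. Where partial results are acceptable, `return_exceptions=True` returns exceptions as values instead. The sketch below shows this with simulated calls; `fetch_section` and its `fail` flag are hypothetical stand-ins for real HTTP requests.

```python
import asyncio


async def fetch_section(name: str, fail: bool = False) -> dict:
    await asyncio.sleep(0.05)  # stand-in for an HTTP request
    if fail:
        raise RuntimeError(f'{name} service unavailable')
    return {'section': name}


async def aggregate() -> dict:
    names = ['profile', 'posts', 'comments']
    results = await asyncio.gather(
        fetch_section('profile'),
        fetch_section('posts', fail=True),
        fetch_section('comments'),
        return_exceptions=True,  # failures come back as exception objects
    )
    combined = {}
    for name, result in zip(names, results):
        if isinstance(result, Exception):
            combined[name] = {'error': str(result)}
        else:
            combined[name] = result
    return combined


if __name__ == '__main__':
    print(asyncio.run(aggregate()))
```

Results come back in the order the awaitables were passed, which is why zipping them with `names` is safe.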
Async Context Managers
Resource management with async context managers.

    from contextlib import asynccontextmanager

    from fastapi import FastAPI

    # create_db_pool and create_redis_client are assumed helpers defined
    # elsewhere; the pool is assumed to behave like an asyncpg pool

    # Async context manager for lifespan events
    @asynccontextmanager
    async def lifespan(app: FastAPI):
        # Startup
        print('Starting up...')
        db_pool = await create_db_pool()
        redis_client = await create_redis_client()

        # Store in app state
        app.state.db_pool = db_pool
        app.state.redis = redis_client

        yield

        # Shutdown
        print('Shutting down...')
        await db_pool.close()
        await redis_client.close()

    app = FastAPI(lifespan=lifespan)

    # Custom async context manager
    class AsyncDatabaseSession:
        def __init__(self, pool):
            self.pool = pool
            self.conn = None
            self.tx = None

        async def __aenter__(self):
            self.conn = await self.pool.acquire()
            self.tx = self.conn.transaction()
            await self.tx.start()
            return self.conn

        async def __aexit__(self, exc_type, exc_val, exc_tb):
            try:
                # Finish the transaction before the connection is released
                if exc_type is None:
                    await self.tx.commit()
                else:
                    await self.tx.rollback()
            finally:
                await self.pool.release(self.conn)
            return False  # Do not suppress the exception

    @app.get('/data')
    async def get_data():
        async with AsyncDatabaseSession(app.state.db_pool) as conn:
            rows = await conn.fetch('SELECT * FROM data')
            return [dict(row) for row in rows]

Connection Pooling
Efficient connection management for databases and HTTP clients.

    from typing import AsyncGenerator

    import asyncpg
    import httpx
    from fastapi import Depends, FastAPI

    app = FastAPI()

    # Database connection pool
    class DatabasePool:
        def __init__(self):
            self.pool = None

        async def create_pool(self):
            self.pool = await asyncpg.create_pool(
                'postgresql://user:pass@localhost/db',
                min_size=10,
                max_size=20,
                command_timeout=60,
                max_queries=50000,
                max_inactive_connection_lifetime=300
            )

        async def close_pool(self):
            await self.pool.close()

        async def get_connection(self):
            async with self.pool.acquire() as connection:
                yield connection

    db_pool = DatabasePool()

    # HTTP client pool
    class HTTPClientPool:
        def __init__(self):
            self.client = None

        async def get_client(self) -> AsyncGenerator[httpx.AsyncClient, None]:
            if self.client is None:
                self.client = httpx.AsyncClient(
                    limits=httpx.Limits(max_keepalive_connections=20, max_connections=100),
                    timeout=httpx.Timeout(10.0)
                )
            yield self.client

        async def close(self):
            if self.client:
                await self.client.aclose()

    http_pool = HTTPClientPool()

    # on_event still works but is deprecated in recent FastAPI releases in
    # favor of the lifespan handler shown under Async Context Managers
    @app.on_event('startup')
    async def startup():
        await db_pool.create_pool()

    @app.on_event('shutdown')
    async def shutdown():
        await db_pool.close_pool()
        await http_pool.close()  # Close the shared HTTP client as well

    @app.get('/users')
    async def get_users(conn = Depends(db_pool.get_connection)):
        rows = await conn.fetch('SELECT * FROM users')
        return [dict(row) for row in rows]

    @app.get('/external-api')
    async def call_external_api(client: httpx.AsyncClient = Depends(http_pool.get_client)):
        response = await client.get('https://api.example.com/data')
        return response.json()

Performance Optimization
Async patterns for optimal performance.

    import asyncio
    from typing import List

    from aiocache import Cache
    from aiocache.serializers import JsonSerializer
    from fastapi import Depends, FastAPI, HTTPException
    from sqlalchemy import select

    app = FastAPI()

    # get_db, the ORM models (User, Post, Comment), the UserCreate schema and
    # compute_expensive_result are assumed to be defined elsewhere

    # Cache expensive async operations
    cache = Cache(Cache.MEMORY, serializer=JsonSerializer())

    @app.get('/expensive-data/{key}')
    async def get_expensive_data(key: str):
        # Check cache first (compare with None so falsy values still count as hits)
        cached = await cache.get(key)
        if cached is not None:
            return {'data': cached, 'cached': True}

        # Expensive operation
        await asyncio.sleep(2)
        data = compute_expensive_result(key)

        # Store in cache
        await cache.set(key, data, ttl=300)
        return {'data': data, 'cached': False}

    # Batch operations
    @app.post('/users/batch')
    async def create_users_batch(users: List[UserCreate], db = Depends(get_db)):
        # Create users in batch (more efficient than one-by-one)
        # model_dump() is the Pydantic v2 spelling; on v1 use user.dict()
        user_objects = [User(**user.model_dump()) for user in users]
        db.add_all(user_objects)
        await db.flush()
        return user_objects

    # Debouncing with asyncio
    class Debouncer:
        def __init__(self, delay: float):
            self.delay = delay
            self.task = None

        async def debounce(self, func, *args):
            # Take a coroutine function plus arguments, so a superseded call
            # never leaves an un-awaited coroutine object behind
            if self.task and not self.task.done():
                self.task.cancel()

            async def delayed():
                await asyncio.sleep(self.delay)
                await func(*args)

            self.task = asyncio.create_task(delayed())

    debouncer = Debouncer(delay=1.0)

    # Fetching related data
    # An AsyncSession cannot run queries concurrently, so these run in
    # sequence on the request's session
    @app.get('/posts/{post_id}')
    async def get_post_with_relations(post_id: int, db = Depends(get_db)):
        post = await db.get(Post, post_id)
        if post is None:
            raise HTTPException(status_code=404, detail='Post not found')
        comments_result = await db.execute(
            select(Comment).where(Comment.post_id == post_id)
        )
        author = await db.get(User, post.author_id)
        return {
            'post': post,
            'comments': comments_result.scalars().all(),
            'author': author
        }

When to Use This Skill
Use fastapi-async-patterns when:
- Building high-throughput APIs that handle many concurrent requests
- Working with I/O-bound operations (database, external APIs, file operations)
- Implementing real-time features (WebSockets, SSE)
- Processing multiple operations in parallel
- Streaming large datasets or files
- Building microservices that communicate with other services
- Optimizing API response times and resource usage
- Handling background tasks without blocking responses
FastAPI Async Best Practices
- Use async for I/O - Use async drivers and clients for database and HTTP calls; standard file I/O is blocking, so offload it to a thread or use an async file library
- Avoid blocking calls - Never use blocking calls in async functions (time.sleep, requests library)
- Connection pooling - Use connection pools for databases and HTTP clients
- Proper cleanup - Always clean up resources with try/finally or async context managers
- Concurrent operations - Use asyncio.gather to run independent operations concurrently
- Background tasks - Use BackgroundTasks for fire-and-forget operations
- Stream large data - Use StreamingResponse for large files or generated content
- Timeout handling - Set timeouts on all external calls to prevent hanging
- Error propagation - Handle exceptions properly in async code
- Monitor performance - Use tools like aiomonitor to debug async issues
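The timeout advice above can be illustrated with `asyncio.wait_for`, which cancels the awaited call once the deadline passes. This is a minimal sketch, with `slow_call` as a hypothetical stand-in for any external request and the fallback string chosen only for the demo.

```python
import asyncio


async def slow_call(delay: float) -> str:
    await asyncio.sleep(delay)  # stand-in for an external request
    return 'ok'


async def call_with_timeout(delay: float, timeout: float) -> str:
    try:
        # wait_for cancels slow_call if it does not finish in time
        return await asyncio.wait_for(slow_call(delay), timeout=timeout)
    except asyncio.TimeoutError:
        return 'timed out'


if __name__ == '__main__':
    print(asyncio.run(call_with_timeout(0.01, 0.5)))
    print(asyncio.run(call_with_timeout(0.5, 0.05)))
```

HTTP and database clients usually accept their own timeout settings as well; `wait_for` is a general fallback for awaitables that do not.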
FastAPI Async Common Pitfalls
- Blocking the event loop - Using synchronous I/O in async functions kills performance
- Missing await - Calling an async function without await only creates a coroutine object: the call never runs, and Python emits a "coroutine was never awaited" warning
- Creating too many tasks - Spawning unlimited tasks can exhaust resources
- Not closing connections - Resource leaks from unclosed database/HTTP connections
- Mixing sync and async - Blocking sync code called from an async handler stalls the event loop, and calling asyncio.run() inside a running loop raises RuntimeError
- Race conditions - Shared state in async code without proper locking
- Timeout issues - No timeouts on external calls can hang the server
- Memory leaks - Background tasks that never complete accumulate
- Error swallowing - Silent failures in background tasks and event handlers
- Deadlocks - Circular waits in async dependencies or locks
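For the "creating too many tasks" pitfall, one common remedy is to cap concurrency with `asyncio.Semaphore`. The sketch below uses only the standard library; `MAX_CONCURRENT`, `worker` and the `state` counters are hypothetical names chosen for the demo. It launches ten jobs but lets at most three run at once.

```python
import asyncio

MAX_CONCURRENT = 3  # hypothetical limit for the demo


async def worker(sem: asyncio.Semaphore, state: dict) -> None:
    async with sem:  # waits here while MAX_CONCURRENT workers are active
        # No await sits between these reads and writes, so the counter
        # updates cannot interleave with other coroutines
        state['active'] += 1
        state['peak'] = max(state['peak'], state['active'])
        await asyncio.sleep(0.01)  # stand-in for real I/O
        state['active'] -= 1
        state['done'] += 1


async def run_jobs(n: int) -> dict:
    sem = asyncio.Semaphore(MAX_CONCURRENT)
    state = {'active': 0, 'peak': 0, 'done': 0}
    await asyncio.gather(*(worker(sem, state) for _ in range(n)))
    return state


if __name__ == '__main__':
    print(asyncio.run(run_jobs(10)))
```

When shared state is read and written across an `await`, an `asyncio.Lock` around that section would be needed in addition to the semaphore.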