Claude-initial-setup async-fastapi
install
source · Clone the upstream repo
git clone https://github.com/VersoXBT/claude-initial-setup
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/VersoXBT/claude-initial-setup "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/fastapi/async-fastapi" ~/.claude/skills/versoxbt-claude-initial-setup-async-fastapi && rm -rf "$T"
manifest:
skills/fastapi/async-fastapi/SKILL.md — source content
Async FastAPI
Build high-performance async APIs with FastAPI. Async endpoints handle concurrent requests efficiently without blocking the event loop, which is critical for I/O-bound workloads like database queries, HTTP calls, and file operations.
When to Use
- User creates or modifies FastAPI endpoints
- User needs concurrent I/O operations
- User implements WebSockets or streaming
- User asks about background processing
- User encounters event loop blocking or performance issues
Core Patterns
Async Endpoints
Use `async def` for I/O-bound endpoints. Use plain `def` for CPU-bound work (FastAPI runs sync handlers in a threadpool automatically).
from fastapi import FastAPI
import httpx
import asyncio

app = FastAPI()


# Async -- for I/O-bound operations (DB, HTTP, file)
@app.get("/users/{user_id}")
async def get_user(user_id: int):
    """Proxy a user lookup to an upstream HTTP API without blocking the loop."""
    async with httpx.AsyncClient() as client:
        response = await client.get(f"https://api.example.com/users/{user_id}")
        return response.json()


# Sync -- for CPU-bound operations (FastAPI runs in threadpool)
@app.get("/compute/{n}")
def compute_heavy(n: int):
    """CPU-bound handler: plain `def` so FastAPI offloads it to a thread."""
    return {"result": sum(i * i for i in range(n))}


# Concurrent async operations
@app.get("/dashboard/{user_id}")
async def get_dashboard(user_id: int):
    """Fan out three independent queries and await them concurrently."""
    user_task = get_user_from_db(user_id)
    orders_task = get_orders_from_db(user_id)
    notifications_task = get_notifications(user_id)
    user, orders, notifications = await asyncio.gather(
        user_task, orders_task, notifications_task
    )
    return {"user": user, "orders": orders, "notifications": notifications}
Lifespan Events
Use the `lifespan` context manager to handle startup and shutdown. This replaces the deprecated `on_event` decorators.
from contextlib import asynccontextmanager

from fastapi import FastAPI
import httpx


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create shared clients on startup; tear them down on shutdown."""
    # Startup: initialize shared resources
    app.state.http_client = httpx.AsyncClient(timeout=30.0)
    app.state.db_pool = await create_db_pool()
    yield
    # Shutdown: clean up resources
    await app.state.http_client.aclose()
    await app.state.db_pool.close()


app = FastAPI(lifespan=lifespan)


@app.get("/fetch")
async def fetch_data(url: str):
    """Reuse the shared client stored on app.state instead of a per-request one."""
    response = await app.state.http_client.get(url)
    return response.json()
Background Tasks
Use `BackgroundTasks` for fire-and-forget operations that should not block the response.
from fastapi import BackgroundTasks


async def send_welcome_email(email: str, name: str) -> None:
    """Post a welcome email to the external email service."""
    async with httpx.AsyncClient() as client:
        await client.post(
            "https://email-service.example.com/send",
            json={"to": email, "template": "welcome", "name": name},
        )


async def log_signup(user_id: int) -> None:
    """Record the signup in the audit log table."""
    async with get_db_session() as session:
        await session.execute(
            insert(AuditLog).values(action="signup", user_id=user_id)
        )


@app.post("/signup")
async def signup(user: UserCreate, background_tasks: BackgroundTasks):
    """Create the user, then queue email + audit work to run after the response."""
    new_user = await create_user(user)
    background_tasks.add_task(send_welcome_email, new_user.email, new_user.name)
    background_tasks.add_task(log_signup, new_user.id)
    return {"id": new_user.id, "status": "created"}
Middleware
import time

from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware


class TimingMiddleware(BaseHTTPMiddleware):
    """Attach request duration to every response as X-Process-Time."""

    async def dispatch(self, request: Request, call_next) -> Response:
        start = time.perf_counter()
        response = await call_next(request)
        duration = time.perf_counter() - start
        response.headers["X-Process-Time"] = f"{duration:.4f}"
        return response


app.add_middleware(TimingMiddleware)


# Pure ASGI middleware (more performant, no BaseHTTPMiddleware overhead)
from starlette.types import ASGIApp, Receive, Scope, Send


class CORSHeaderMiddleware:
    """Inject a permissive CORS header directly at the ASGI layer."""

    def __init__(self, app: ASGIApp):
        self.app = app

    async def __call__(self, scope: Scope, receive: Receive, send: Send):
        if scope["type"] == "http":
            # Wrap `send` so we can edit headers on the response-start message.
            async def send_with_cors(message):
                if message["type"] == "http.response.start":
                    headers = dict(message.get("headers", []))
                    headers[b"access-control-allow-origin"] = b"*"
                    message["headers"] = list(headers.items())
                await send(message)

            await self.app(scope, receive, send_with_cors)
        else:
            await self.app(scope, receive, send)
WebSockets
from fastapi import WebSocket, WebSocketDisconnect


class ConnectionManager:
    """Track open WebSocket connections and broadcast messages to all of them."""

    def __init__(self):
        self.active_connections: list[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def broadcast(self, message: str):
        for connection in self.active_connections:
            await connection.send_text(message)


manager = ConnectionManager()


@app.websocket("/ws/{room_id}")
async def websocket_endpoint(websocket: WebSocket, room_id: str):
    """Echo each received message to every connected client in the room."""
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            await manager.broadcast(f"Room {room_id}: {data}")
    except WebSocketDisconnect:
        manager.disconnect(websocket)
Streaming Responses
from fastapi.responses import StreamingResponse


async def generate_report_rows(query_params: dict):
    """Stream large dataset row by row."""
    async with get_db_session() as session:
        result = await session.stream(build_query(query_params))
        async for row in result:
            yield f"{row.id},{row.name},{row.value}\n"


@app.get("/export/csv")
async def export_csv():
    """Return the report as a CSV download without materializing it in memory."""
    return StreamingResponse(
        generate_report_rows({"status": "active"}),
        media_type="text/csv",
        headers={"Content-Disposition": "attachment; filename=report.csv"},
    )


# Server-sent events
async def event_stream():
    """Yield one SSE frame per second with the latest event payload."""
    while True:
        data = await get_latest_event()
        yield f"data: {data}\n\n"
        await asyncio.sleep(1)


@app.get("/events")
async def sse():
    return StreamingResponse(event_stream(), media_type="text/event-stream")
Anti-Patterns
- Blocking the event loop: Never call synchronous I/O (e.g., `requests.get`, `time.sleep`, blocking DB drivers) inside `async def`. Use async libraries or `asyncio.to_thread()` for unavoidable sync code.
- Creating new event loops: Never call `asyncio.run()` or `loop.run_until_complete()` inside an async handler. The event loop is already running.
- Using `on_event` decorators: These are deprecated. Use the `lifespan` context manager instead.
- Shared mutable state without locks: If multiple async tasks access shared state, use `asyncio.Lock()` to prevent race conditions.
- Not closing async clients: Always use `async with` or clean up in lifespan shutdown. Leaked connections cause resource exhaustion.
Quick Reference
| Pattern | Use Case |
|---|---|
| `async def` endpoint | I/O-bound request handlers |
| `def` endpoint | CPU-bound (auto-threadpooled) |
| `asyncio.gather()` | Concurrent async operations |
| `BackgroundTasks` | Fire-and-forget after response |
| `lifespan` context manager | App startup/shutdown |
| `StreamingResponse` | Large files, SSE, CSV export |
| `WebSocket` | Real-time bidirectional comms |
| `asyncio.to_thread()` | Run sync code without blocking |