Claude-skill-registry fastapi-production
Production-grade FastAPI patterns for async APIs, SQLAlchemy 2.0, Pydantic v2, and robust error handling. Use when building API endpoints, handling database operations, implementing middleware, or optimizing performance.
install
source · Clone the upstream repo
git clone https://github.com/majiayu000/claude-skill-registry
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/fastapi-production" ~/.claude/skills/majiayu000-claude-skill-registry-fastapi-production && rm -rf "$T"
manifest: skills/data/fastapi-production/SKILL.md
source content
FastAPI Production Patterns Skill
Production-ready patterns for FastAPI applications with async SQLAlchemy, Pydantic validation, and enterprise-grade error handling.
When This Skill Activates
- Building new API endpoints
- Database query optimization
- Error handling and validation
- Middleware implementation
- Authentication/authorization patterns
- Background task integration
- Performance optimization
Project Architecture
backend/app/
├── api/
│   ├── deps.py          # Dependency injection
│   └── routes/          # API endpoints
├── controllers/         # Request/response handling
├── services/            # Business logic
├── repositories/        # Data access (optional)
├── models/              # SQLAlchemy ORM
├── schemas/             # Pydantic schemas
├── core/                # Config, security
└── db/                  # Database session
Layer Responsibilities
Route (thin) → Parse request, call controller/service, return response
Controller   → Orchestrate service calls, transform data
Service      → Business logic, validation, transactions
Repository   → Database queries (optional abstraction)
Model        → ORM definitions, relationships
Schema       → Request/response validation
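As a framework-free illustration of the layering above, the sketch below wires a thin route function to a service and an in-memory repository. All names here (`InMemoryScheduleRepository`, this `ScheduleService.create`, `create_schedule_route`) are hypothetical stand-ins rather than code from this project; the only point is where each responsibility lives.

```python
import asyncio
from dataclasses import dataclass
from datetime import date
from typing import Dict, Optional


@dataclass
class Schedule:
    id: str
    name: str
    start_date: date
    end_date: date


class InMemoryScheduleRepository:
    """Repository layer: data access only (a dict stands in for the database)."""

    def __init__(self) -> None:
        self._rows: Dict[str, Schedule] = {}

    async def add(self, schedule: Schedule) -> Schedule:
        self._rows[schedule.id] = schedule
        return schedule

    async def get(self, schedule_id: str) -> Optional[Schedule]:
        return self._rows.get(schedule_id)


class ScheduleService:
    """Service layer: business rules live here, not in the route."""

    def __init__(self, repo: InMemoryScheduleRepository) -> None:
        self.repo = repo

    async def create(self, schedule: Schedule) -> Schedule:
        if schedule.end_date < schedule.start_date:
            raise ValueError("end_date must be on or after start_date")
        if await self.repo.get(schedule.id) is not None:
            raise ValueError(f"Schedule {schedule.id} already exists")
        return await self.repo.add(schedule)


async def create_schedule_route(service: ScheduleService, payload: dict) -> dict:
    """Route layer (thin): parse input, delegate, shape the response."""
    schedule = Schedule(
        id=payload["id"],
        name=payload["name"],
        start_date=date.fromisoformat(payload["start_date"]),
        end_date=date.fromisoformat(payload["end_date"]),
    )
    created = await service.create(schedule)
    return {"id": created.id, "name": created.name}
```

Because the route never touches the repository directly, the duplicate-ID and date-order rules can be tested against the service alone.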
Route Patterns
Standard CRUD Endpoint
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession

from app.api.deps import get_db, get_current_user
from app.schemas.schedule import ScheduleCreate, ScheduleResponse, ScheduleList
from app.services import schedule_service
from app.models.user import User

router = APIRouter(prefix="/schedules", tags=["schedules"])


@router.get("", response_model=ScheduleList)
async def list_schedules(
    skip: int = 0,
    limit: int = 100,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
) -> ScheduleList:
    """
    List all schedules with pagination.

    - **skip**: Number of records to skip
    - **limit**: Maximum number of records to return
    """
    schedules = await schedule_service.get_multi(db, skip=skip, limit=limit)
    total = await schedule_service.count(db)
    return ScheduleList(items=schedules, total=total, skip=skip, limit=limit)


@router.get("/{schedule_id}", response_model=ScheduleResponse)
async def get_schedule(
    schedule_id: str,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
) -> ScheduleResponse:
    """Get a specific schedule by ID."""
    schedule = await schedule_service.get(db, id=schedule_id)
    if not schedule:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Schedule not found",
        )
    return schedule


@router.post("", response_model=ScheduleResponse, status_code=status.HTTP_201_CREATED)
async def create_schedule(
    schedule_in: ScheduleCreate,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
) -> ScheduleResponse:
    """Create a new schedule."""
    schedule = await schedule_service.create(
        db, obj_in=schedule_in, created_by=current_user.id
    )
    return schedule
Bulk Operations
from typing import List


@router.post("/bulk", response_model=List[ScheduleResponse])
async def create_schedules_bulk(
    schedules_in: List[ScheduleCreate],
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
) -> List[ScheduleResponse]:
    """Create multiple schedules in a single transaction."""
    if len(schedules_in) > 100:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Maximum 100 schedules per request",
        )
    # No explicit db.begin() here: get_current_user has already queried through
    # this session, so a transaction is open and begin() would raise. The get_db
    # dependency commits once on success and rolls back on error, which keeps
    # the whole batch atomic.
    schedules = await schedule_service.create_multi(
        db, objs_in=schedules_in, created_by=current_user.id
    )
    return schedules
Pydantic Schema Patterns
Request/Response Separation
from pydantic import BaseModel, Field, field_validator
from datetime import date, datetime
from typing import Optional, List


# Base schema with shared fields
class ScheduleBase(BaseModel):
    name: str = Field(..., min_length=1, max_length=255)
    start_date: date
    end_date: date

    # start_date is declared before end_date, so it is already in info.data
    # here (unless start_date itself failed validation).
    @field_validator("end_date")
    @classmethod
    def end_date_after_start(cls, v: date, info) -> date:
        if "start_date" in info.data and v < info.data["start_date"]:
            raise ValueError("end_date must be on or after start_date")
        return v


# Create schema - fields required for creation
class ScheduleCreate(ScheduleBase):
    description: Optional[str] = None


# Update schema - all fields optional
class ScheduleUpdate(BaseModel):
    name: Optional[str] = Field(None, min_length=1, max_length=255)
    start_date: Optional[date] = None
    end_date: Optional[date] = None
    description: Optional[str] = None


# Response schema - includes DB fields
class ScheduleResponse(ScheduleBase):
    id: str
    description: Optional[str]
    created_at: datetime
    updated_at: Optional[datetime]
    created_by: str

    model_config = {"from_attributes": True}


# List response with pagination
class ScheduleList(BaseModel):
    items: List[ScheduleResponse]
    total: int
    skip: int
    limit: int
Nested Schemas
class AssignmentResponse(BaseModel):
    id: str
    person_id: str
    block_id: str
    rotation_type: str

    model_config = {"from_attributes": True}


class ScheduleWithAssignments(ScheduleResponse):
    assignments: List[AssignmentResponse] = []
Service Layer Patterns
Service Structure
from typing import Optional, List

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func

from app.models.schedule import Schedule
from app.schemas.schedule import ScheduleCreate, ScheduleUpdate


class ScheduleService:
    """Service for schedule business logic."""

    async def get(self, db: AsyncSession, *, id: str) -> Optional[Schedule]:
        """Get a schedule by ID."""
        result = await db.execute(select(Schedule).where(Schedule.id == id))
        return result.scalar_one_or_none()

    async def get_multi(
        self,
        db: AsyncSession,
        *,
        skip: int = 0,
        limit: int = 100,
    ) -> List[Schedule]:
        """Get multiple schedules with pagination."""
        result = await db.execute(
            select(Schedule)
            .order_by(Schedule.created_at.desc())
            .offset(skip)
            .limit(limit)
        )
        return list(result.scalars().all())

    async def count(self, db: AsyncSession) -> int:
        """Count total schedules."""
        result = await db.execute(select(func.count()).select_from(Schedule))
        return result.scalar() or 0

    async def create(
        self,
        db: AsyncSession,
        *,
        obj_in: ScheduleCreate,
        created_by: str,
    ) -> Schedule:
        """Create a new schedule."""
        db_obj = Schedule(
            **obj_in.model_dump(),
            created_by=created_by,
        )
        db.add(db_obj)
        await db.flush()
        await db.refresh(db_obj)
        return db_obj

    async def update(
        self,
        db: AsyncSession,
        *,
        db_obj: Schedule,
        obj_in: ScheduleUpdate,
    ) -> Schedule:
        """Update an existing schedule."""
        update_data = obj_in.model_dump(exclude_unset=True)
        for field, value in update_data.items():
            setattr(db_obj, field, value)
        db.add(db_obj)
        await db.flush()
        await db.refresh(db_obj)
        return db_obj

    async def delete(self, db: AsyncSession, *, id: str) -> bool:
        """Delete a schedule by ID."""
        obj = await self.get(db, id=id)
        if obj:
            await db.delete(obj)
            await db.flush()
            return True
        return False


# Singleton instance
schedule_service = ScheduleService()
Business Logic with Validation
from typing import List

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.exceptions import ACGMEViolationError
from app.models.assignment import Assignment  # assumed module path
from app.schemas.assignment import AssignmentCreate  # assumed module path
from app.scheduling.acgme_validator import ACGMEValidator


class AssignmentService:
    """Service with business logic and validation."""

    def __init__(self):
        self.validator = ACGMEValidator()

    async def create_assignment(
        self,
        db: AsyncSession,
        *,
        obj_in: AssignmentCreate,
        created_by: str,
    ) -> Assignment:
        """Create assignment with ACGME validation."""
        # Check existing assignments for conflicts
        conflicts = await self._check_conflicts(db, obj_in)
        if conflicts:
            raise ValueError(f"Assignment conflicts with: {conflicts}")

        # Validate ACGME compliance
        violations = await self.validator.validate_assignment(db, obj_in)
        if violations:
            raise ACGMEViolationError(violations)

        # Create assignment
        assignment = Assignment(**obj_in.model_dump(), created_by=created_by)
        db.add(assignment)
        await db.flush()
        return assignment

    async def _check_conflicts(
        self,
        db: AsyncSession,
        obj_in: AssignmentCreate,
    ) -> List[Assignment]:
        """Check for conflicting assignments."""
        result = await db.execute(
            select(Assignment)
            .where(Assignment.person_id == obj_in.person_id)
            .where(Assignment.block_id == obj_in.block_id)
        )
        return list(result.scalars().all())
Database Query Patterns
Eager Loading (Prevent N+1)
from sqlalchemy.orm import selectinload


async def get_schedule_with_assignments(
    db: AsyncSession, schedule_id: str
) -> Optional[Schedule]:
    """Get schedule with all assignments eagerly loaded."""
    result = await db.execute(
        select(Schedule)
        .options(
            selectinload(Schedule.assignments).selectinload(Assignment.person),
            selectinload(Schedule.assignments).selectinload(Assignment.block),
        )
        .where(Schedule.id == schedule_id)
    )
    return result.scalar_one_or_none()
Complex Queries
from sqlalchemy import and_, func
from datetime import date, timedelta


async def get_upcoming_assignments(
    db: AsyncSession,
    person_id: str,
    days_ahead: int = 7,
) -> List[Assignment]:
    """Get assignments for the next N days."""
    today = date.today()
    end_date = today + timedelta(days=days_ahead)
    result = await db.execute(
        select(Assignment)
        .join(Block)
        .where(
            and_(
                Assignment.person_id == person_id,
                Block.date >= today,
                Block.date <= end_date,
            )
        )
        .order_by(Block.date, Block.session)
    )
    return list(result.scalars().all())


async def get_coverage_stats(
    db: AsyncSession,
    schedule_id: str,
) -> dict:
    """Get coverage statistics for a schedule."""
    result = await db.execute(
        select(
            Block.date,
            func.count(Assignment.id).label("assigned_count"),
        )
        # Filter inside the ON clause: a WHERE on Assignment would discard
        # blocks with no assignments and turn the outer join into an inner join.
        .outerjoin(
            Assignment,
            and_(
                Assignment.block_id == Block.id,
                Assignment.schedule_id == schedule_id,
            ),
        )
        .group_by(Block.date)
    )
    return {row.date: row.assigned_count for row in result.all()}
Row Locking for Concurrency
from sqlalchemy import select

# with_for_update() is a method on the select() construct;
# sqlalchemy.orm has no such name to import.


async def update_with_lock(
    db: AsyncSession,
    assignment_id: str,
    new_status: str,
) -> Assignment:
    """Update with row lock to prevent race conditions."""
    result = await db.execute(
        select(Assignment)
        .where(Assignment.id == assignment_id)
        .with_for_update()  # Lock the row until the transaction ends
    )
    assignment = result.scalar_one_or_none()
    if not assignment:
        raise ValueError("Assignment not found")
    assignment.status = new_status
    await db.flush()
    return assignment
Error Handling
Custom Exception Classes
# app/core/exceptions.py
from typing import Optional


class AppException(Exception):
    """Base application exception."""

    def __init__(
        self,
        message: str,
        code: str = "APP_ERROR",
        details: Optional[dict] = None,
    ):
        self.message = message
        self.code = code
        self.details = details or {}
        super().__init__(message)


class NotFoundError(AppException):
    """Resource not found."""

    def __init__(self, resource: str, id: str):
        super().__init__(
            message=f"{resource} not found",
            code="NOT_FOUND",
            details={"resource": resource, "id": id},
        )


class ValidationError(AppException):
    """Validation failed."""

    def __init__(self, errors: list):
        super().__init__(
            message="Validation failed",
            code="VALIDATION_ERROR",
            details={"errors": errors},
        )


class ACGMEViolationError(AppException):
    """ACGME compliance violation."""

    def __init__(self, violations: list):
        super().__init__(
            message="ACGME compliance violation",
            code="ACGME_VIOLATION",
            details={"violations": violations},
        )
Global Exception Handler
# app/api/exception_handlers.py
from fastapi import Request, status
from fastapi.responses import JSONResponse

from app.core.exceptions import AppException


async def app_exception_handler(request: Request, exc: AppException) -> JSONResponse:
    """Handle application exceptions."""
    status_map = {
        "NOT_FOUND": status.HTTP_404_NOT_FOUND,
        "VALIDATION_ERROR": status.HTTP_400_BAD_REQUEST,
        "ACGME_VIOLATION": status.HTTP_422_UNPROCESSABLE_ENTITY,
    }
    return JSONResponse(
        status_code=status_map.get(exc.code, status.HTTP_500_INTERNAL_SERVER_ERROR),
        content={
            "error": {
                "code": exc.code,
                "message": exc.message,
                "details": exc.details,
            }
        },
    )


# Register in main.py, where the FastAPI instance `app` is defined:
# app.add_exception_handler(AppException, app_exception_handler)
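To make the mapping above concrete, here is a standalone sketch of the error envelope a client would receive. It re-declares a minimal `AppException` and adds a hypothetical helper, `to_error_response`, so that it runs without FastAPI; the real handler returns a `JSONResponse` rather than a tuple.

```python
from http import HTTPStatus
from typing import Optional, Tuple


class AppException(Exception):
    """Minimal copy of the base exception, for illustration only."""

    def __init__(self, message: str, code: str = "APP_ERROR", details: Optional[dict] = None):
        self.message = message
        self.code = code
        self.details = details or {}
        super().__init__(message)


# Same code-to-status mapping as the handler above.
STATUS_MAP = {
    "NOT_FOUND": HTTPStatus.NOT_FOUND,
    "VALIDATION_ERROR": HTTPStatus.BAD_REQUEST,
    "ACGME_VIOLATION": HTTPStatus.UNPROCESSABLE_ENTITY,
}


def to_error_response(exc: AppException) -> Tuple[int, dict]:
    """Hypothetical helper: return (status_code, JSON body) for an AppException."""
    status_code = int(STATUS_MAP.get(exc.code, HTTPStatus.INTERNAL_SERVER_ERROR))
    body = {"error": {"code": exc.code, "message": exc.message, "details": exc.details}}
    return status_code, body
```

Any code missing from the map falls through to 500, so a new exception subclass should get a map entry at the same time it is introduced.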
Dependency Injection
Database Session
# app/api/deps.py
from typing import AsyncGenerator

from sqlalchemy.ext.asyncio import AsyncSession

from app.db.session import async_session


async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """Dependency for database session."""
    async with async_session() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
Current User
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.security import decode_token
from app.models.user import User
from app.services import user_service  # assumed, mirroring schedule_service

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/login")


async def get_current_user(
    db: AsyncSession = Depends(get_db),
    token: str = Depends(oauth2_scheme),
) -> User:
    """Get current authenticated user."""
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    payload = decode_token(token)
    if payload is None:
        raise credentials_exception
    user_id = payload.get("sub")
    if user_id is None:
        raise credentials_exception
    user = await user_service.get(db, id=user_id)
    if user is None:
        raise credentials_exception
    return user


async def get_current_active_user(
    current_user: User = Depends(get_current_user),
) -> User:
    """Ensure user is active."""
    if not current_user.is_active:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Inactive user",
        )
    return current_user
Role-Based Access
from typing import List


def require_roles(roles: List[str]):
    """Dependency factory for role-based access."""

    async def role_checker(
        current_user: User = Depends(get_current_active_user),
    ) -> User:
        if current_user.role not in roles:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Insufficient permissions",
            )
        return current_user

    return role_checker


# Usage
@router.delete("/{schedule_id}")
async def delete_schedule(
    schedule_id: str,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(require_roles(["ADMIN", "COORDINATOR"])),
) -> dict:
    """Delete a schedule (admin/coordinator only)."""
    # delete() returns False when no such schedule exists
    deleted = await schedule_service.delete(db, id=schedule_id)
    if not deleted:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Schedule not found",
        )
    return {"status": "deleted"}
Background Tasks
FastAPI Background Tasks
from fastapi import BackgroundTasks


async def send_notification(user_id: str, message: str):
    """Background task to send notification."""
    # Implementation
    pass


@router.post("/assignments")
async def create_assignment(
    assignment_in: AssignmentCreate,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Create assignment and notify affected users."""
    # assignment_service is assumed to be an AssignmentService instance,
    # whose create_assignment() also requires created_by.
    assignment = await assignment_service.create_assignment(
        db, obj_in=assignment_in, created_by=current_user.id
    )

    # Add background task (runs after the response is sent)
    background_tasks.add_task(
        send_notification,
        assignment.person_id,
        f"New assignment: {assignment.rotation_type}",
    )
    return assignment
Celery Integration
from app.core.celery import celery_app


@celery_app.task(bind=True, max_retries=3)
def generate_schedule_task(self, schedule_id: str, params: dict):
    """Celery task for schedule generation."""
    try:
        # Long-running operation
        result = generate_schedule(schedule_id, **params)
        return {"status": "success", "result": result}
    except Exception as exc:
        # retry() raises; re-raising makes the control flow explicit
        raise self.retry(exc=exc, countdown=60)


@router.post("/schedules/{schedule_id}/generate")
async def trigger_generation(
    schedule_id: str,
    params: GenerationParams,
    current_user: User = Depends(get_current_user),
):
    """Trigger async schedule generation."""
    task = generate_schedule_task.delay(schedule_id, params.model_dump())
    return {"task_id": task.id, "status": "queued"}
Performance Patterns
Response Caching
# Requires the fastapi-cache2 package; FastAPICache.init(...) must be called
# with a backend at application startup for @cache to work.
from fastapi_cache.decorator import cache


@router.get("/stats")
@cache(expire=300)  # Cache for 5 minutes
async def get_stats(db: AsyncSession = Depends(get_db)):
    """Get cached statistics."""
    return await stats_service.calculate(db)
Streaming Responses
from fastapi.responses import StreamingResponse
import csv
import io


@router.get("/export/csv")
async def export_csv(
    db: AsyncSession = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Stream large CSV export."""

    async def generate_csv():
        output = io.StringIO()
        writer = csv.writer(output)

        def drain() -> str:
            """Return buffered CSV text and reset the buffer for reuse."""
            data = output.getvalue()
            output.seek(0)
            output.truncate(0)
            return data

        # Send the header immediately so it is emitted even with no data rows
        writer.writerow(["ID", "Name", "Date"])
        yield drain()

        # get_data_batches is a project helper that is not shown here
        async for batch in get_data_batches(db):
            for row in batch:
                writer.writerow([row.id, row.name, row.date])
            yield drain()

    return StreamingResponse(
        generate_csv(),
        media_type="text/csv",
        headers={"Content-Disposition": "attachment; filename=export.csv"},
    )
Commands
cd /home/user/Autonomous-Assignment-Program-Manager/backend

# Development
uvicorn app.main:app --reload

# Testing
pytest
pytest --cov=app

# Linting
ruff check app/
ruff format app/

# Type checking
mypy app/

# Database
alembic upgrade head
alembic revision --autogenerate -m "description"
Integration with Other Skills
With database-migration
For schema changes:
- Modify model
- Use database-migration skill for Alembic
- Update Pydantic schemas to match
With security-audit
For auth/security code:
- Defer to security-audit for review
- Follow OWASP guidelines
- Validate HIPAA compliance
With test-writer
After creating endpoints:
- test-writer generates API tests
- Cover happy path and errors
- Test auth/permissions
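As a rough, framework-free sketch of what "happy path, errors, and permissions" coverage can look like for the role check: `User`, `Forbidden`, the `RESIDENT` role, and this `require_roles` are simplified stand-ins for the FastAPI versions earlier in this skill. Against the real app, the same cases would typically go through `fastapi.testclient.TestClient`, with `app.dependency_overrides` replacing `get_current_user`.

```python
import unittest
from dataclasses import dataclass
from typing import Awaitable, Callable, List


class Forbidden(Exception):
    """Stand-in for HTTPException(status_code=403)."""


@dataclass
class User:
    id: str
    role: str


def require_roles(roles: List[str]) -> Callable[[User], Awaitable[User]]:
    """Simplified role check with the same accept/reject rule as the dependency."""

    async def role_checker(current_user: User) -> User:
        if current_user.role not in roles:
            raise Forbidden("Insufficient permissions")
        return current_user

    return role_checker


class RoleCheckTests(unittest.IsolatedAsyncioTestCase):
    async def test_allowed_role_passes(self) -> None:
        check = require_roles(["ADMIN", "COORDINATOR"])
        user = User(id="u1", role="ADMIN")
        self.assertIs(await check(user), user)

    async def test_other_role_is_rejected(self) -> None:
        check = require_roles(["ADMIN", "COORDINATOR"])
        with self.assertRaises(Forbidden):
            await check(User(id="u2", role="RESIDENT"))
```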
Escalation Rules
Escalate to human when:
- Authentication/authorization changes
- Database schema modifications
- ACGME compliance logic changes
- Rate limiting configuration
- External API integrations
- Performance-critical optimizations