Awesome-omni-skill fastapi-development
Modern Python API development with FastAPI covering async patterns, Pydantic validation, dependency injection, and production deployment
git clone https://github.com/diegosouzapw/awesome-omni-skill
T=$(mktemp -d) && git clone --depth=1 https://github.com/diegosouzapw/awesome-omni-skill "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/development/fastapi-development-alchimie-di-circe" ~/.claude/skills/diegosouzapw-awesome-omni-skill-fastapi-development-73bc85 && rm -rf "$T"
skills/development/fastapi-development-alchimie-di-circe/SKILL.mdFastAPI Development
A comprehensive skill for building modern, high-performance Python APIs with FastAPI. Master async/await patterns, Pydantic data validation, dependency injection, authentication, database integration, and production-ready deployment strategies.
When to Use This Skill
Use this skill when:
- Building RESTful APIs with Python for web, mobile, or microservices
- Developing high-performance, asynchronous backend services
- Creating APIs with automatic interactive documentation (OpenAPI/Swagger)
- Implementing OAuth2, JWT authentication, or other security patterns
- Integrating with SQL or NoSQL databases in Python applications
- Building APIs that require strong data validation and type safety
- Developing microservices with automatic request/response validation
- Creating APIs with WebSocket support for real-time features
- Migrating from Flask, Django REST Framework, or other Python frameworks
- Building production-ready APIs with proper error handling and testing
- create FastAPI endpoint with SSE streaming for agent communication
- implement Unix socket IPC for osxphotos sandbox integration
- add circuit breaker for sidecar management and crash recovery
Core Concepts
FastAPI Philosophy
FastAPI is built on three foundational principles:
- Fast to Code: Reduce development time with automatic validation and documentation
- Fast to Run: High performance comparable to NodeJS and Go (via Starlette and Pydantic)
- Fewer Bugs: Automatic validation reduces human errors by about 40%
- Standards-Based: Built on OpenAPI and JSON Schema standards
- Editor Support: Full autocomplete, type checking, and inline documentation
Key FastAPI Features
- Type Hints: Python 3.6+ type hints for validation and documentation
- Async Support: Native async/await for high-performance I/O operations
- Pydantic Models: Automatic request/response validation and serialization
- Dependency Injection: Elegant system for sharing logic across endpoints
- OpenAPI Docs: Automatic interactive API documentation
- Security: Built-in support for OAuth2, JWT, API keys, and more
- Testing: Easy to test with TestClient and async test support
Core Architecture Components
- FastAPI App: The main application instance
- Path Operations: Endpoint definitions with HTTP methods
- Pydantic Models: Data validation and serialization schemas
- Dependencies: Reusable logic for authentication, database, etc.
- Routers: Organize endpoints into modules
- Middleware: Process requests/responses globally
- Background Tasks: Execute code after returning responses
Getting Started
Installation
# Basic installation pip install fastapi # With ASGI server for production pip install "fastapi[all]" # Or install separately pip install fastapi uvicorn[standard] # Additional dependencies pip install python-multipart # For form data pip install python-jose[cryptography] # For JWT pip install passlib[bcrypt] # For password hashing pip install sqlalchemy # For SQL databases pip install databases # For async database support
Minimal FastAPI Application
from fastapi import FastAPI app = FastAPI() @app.get("/") async def root(): return {"message": "Hello World"} # Run with: uvicorn main:app --reload
Pydantic Models for Data Validation
Basic Model Definition
from pydantic import BaseModel, Field, EmailStr, HttpUrl from typing import Optional, List from datetime import datetime class User(BaseModel): id: int username: str = Field(..., min_length=3, max_length=50) email: EmailStr full_name: Optional[str] = None is_active: bool = True created_at: datetime = Field(default_factory=datetime.utcnow) class UserCreate(BaseModel): username: str = Field(..., min_length=3, max_length=50) email: EmailStr password: str = Field(..., min_length=8) full_name: Optional[str] = None class UserResponse(BaseModel): id: int username: str email: EmailStr full_name: Optional[str] is_active: bool class Config: orm_mode = True # For SQLAlchemy models
TRAE_Extractor-app: Agent Configuration Models
from pydantic import BaseModel, Field, validator from typing import Optional, List, Literal, Dict, Any class AgentToolConfig(BaseModel): """Configuration for a single agent tool""" name: str = Field(..., description="Tool name") type: Literal["mcp", "native", "api", "think", "memory", "filesystem"] = Field( ..., description="Tool type" ) config: Dict[str, Any] = Field( default_factory=dict, description="Tool-specific configuration" ) command: Optional[str] = Field(None, description="Command for MCP tools") args: Optional[List[str]] = Field(None, description="Command arguments") env: Optional[Dict[str, str]] = Field(None, description="Environment variables") class AgentConfig(BaseModel): """Configuration for a single agent in Cagent team""" name: str = Field(..., min_length=1, max_length=50) model: str = Field(..., description="Model reference from models section") description: str = Field(..., min_length=10, description="Agent purpose") instruction: str = Field(..., min_length=50, description="Agent system prompt") toolsets: List[AgentToolConfig] = Field( default_factory=list, description="List of tool configurations" ) rag: Optional[List[str]] = Field( None, description="List of RAG knowledge bases to use" ) sub_agents: Optional[List[str]] = Field( None, description="List of sub-agent names this agent can delegate to" ) add_prompt_files: Optional[List[str]] = Field( None, description="Additional prompt files to include" ) class CagentTeamConfig(BaseModel): """Complete Cagent team configuration""" version: str = Field(..., pattern=r'^\d+\.\d+$') models: Dict[str, Any] = Field( ..., description="Model definitions (provider, model name, max_tokens)" ) agents: Dict[str, AgentConfig] = Field( ..., description="Agent configurations keyed by agent name" ) rag: Optional[Dict[str, Any]] = Field( None, description="RAG knowledge base configurations" ) metadata: Optional[Dict[str, str]] = Field( None, description="Team metadata (author, license, version)" ) # Example usage: config = CagentTeamConfig( version="1.0", models={ "sonnet": { "provider": "anthropic", "model": "claude-sonnet-4-5", "max_tokens": 64000 } }, agents={ "captioning": AgentConfig( name="captioning", model="sonnet", description="Generates captions and social media copy", instruction="You are a captioning specialist...", toolsets=[ AgentToolConfig( type="mcp", command="npx", args=["-y", "@perplexity-ai/mcp-server"], env={"PERPLEXITY_API_KEY": "${PERPLEXITY_API_KEY}"} ) ], rag=["brand_guidelines", "platform_specs"] ) } )
TRAE_Extractor-app: Brand Knowledge Schemas
from pydantic import BaseModel, Field, validator from typing import Optional, List, Dict from enum import Enum class ToneStyle(str, Enum): """Brand tone styles""" INFORMAL_CONSCIOUS = "informal_conscious" PASSIONATE = "passionate" EDUCATIONAL = "educational" DIRECT = "direct" class BrandGuideline(BaseModel): """Single brand guideline entry""" category: str = Field(..., description="Guideline category") content: str = Field(..., min_length=10, description="Guideline content") priority: Literal["high", "medium", "low"] = Field("medium") examples: Optional[List[str]] = Field(None, description="Example messages") class BrandTone(BaseModel): """Brand tone of voice configuration""" style: ToneStyle = Field(..., description="Primary tone style") characteristics: List[str] = Field( ..., min_items=1, description="Tone characteristics" ) do_examples: List[str] = Field( ..., min_items=1, description="Correct tone examples" ) dont_examples: List[str] = Field( ..., min_items=1, description="Incorrect tone examples to avoid" ) class BrandKnowledge(BaseModel): """Complete brand knowledge base""" brand_name: str = Field(..., min_length=1) mission: str = Field(..., min_length=50) core_values: List[str] = Field(..., min_items=1) tone_guidelines: BrandTone visual_identity: Optional[Dict[str, str]] = Field(None) communication_principles: List[str] = Field(..., min_items=1) key_messages: List[str] = Field(..., min_items=1) audience_segments: Optional[Dict[str, Dict[str, str]]] = Field(None) hashtags: Dict[str, List[str]] = Field( default_factory=dict, description="Hashtag categories and lists" ) # Example usage: slow_food_knowledge = BrandKnowledge( brand_name="Slow Food", mission="Slow Food promuove il diritto al piacere e difende il diritto al cibo buono, pulito e giusto per tutti.", core_values=[ "Qualità e Autenticità", "Sostenibilità Ambientale", "Equità Economica", "Comunità e Cultura" ], tone_guidelines=BrandTone( style=ToneStyle.INFORMAL_CONSCIOUS, characteristics=["informale ma consapevole", "appassionato ma non predicatorio"], do_examples=[ "Un caffè buono non nasce al bar freddo, ma dalle mani di chi coltiva e torrefà con cura ogni chicco" ], dont_examples=[ "Compra locale o sei nemico dell'ambiente", "I nostri prodotti hanno un coefficiente di sostenibilità biocertificato standardizzato secondo ISO 14001" ] ), communication_principles=[ "Trasparenza", "Autenticità", "Educazione", "Inclusività", "Comunità" ], key_messages=[ "Il cibo buono non è un lusso, è un diritto", "Dietro ogni prodotto c'è una storia di territorio" ], hashtags={ "primary": ["#SlowFood", "#BuonoLimpoGiusto"], "sustainability": ["#SoilHealth", "#BiodiversityMatters"], "community": ["#FoodCommunity", "#AgriculturalPride"] } )
TRAE_Extractor-app: Media Metadata Validation
from pydantic import BaseModel, Field, validator from typing import Optional, List, Tuple from datetime import datetime from enum import Enum class MediaType(str, Enum): """Media file types""" IMAGE = "image" VIDEO = "video" class EXIFData(BaseModel): """EXIF metadata from photos""" camera_make: Optional[str] = Field(None, max_length=50) camera_model: Optional[str] = Field(None, max_length=50) date_taken: Optional[datetime] = Field(None) gps_coordinates: Optional[Tuple[float, float]] = Field(None) iso: Optional[int] = Field(None, ge=100, le=6400) aperture: Optional[float] = Field(None, ge=1.0, le=32.0) shutter_speed: Optional[str] = Field(None, max_length=20) focal_length: Optional[int] = Field(None, ge=1, le=1000) class MediaMetadata(BaseModel): """Metadata for extracted media files""" filename: str = Field(..., pattern=r'^[\w\-\.]+\.(jpg|png|mp4|mov|jpeg)$') file_size_bytes: int = Field(..., gt=0) media_type: MediaType = Field(..., description="Type of media") exif_data: Optional[EXIFData] = Field(None) tags: List[str] = Field( default_factory=list, max_length=20, description="User-defined tags" ) album: Optional[str] = Field(None, max_length=100) date_imported: datetime = Field(default_factory=datetime.utcnow) cloudinary_public_id: Optional[str] = Field(None) cloudinary_url: Optional[str] = Field(None, regex=r'^https://res\.cloudinary\.com/.*') @validator('filename') def validate_filename(cls, v): """Ensure filename has valid extension""" valid_extensions = ['.jpg', '.jpeg', '.png', '.mp4', '.mov'] if not any(v.lower().endswith(ext) for ext in valid_extensions): raise ValueError(f'Invalid file extension. Must be one of: {valid_extensions}') return v class MediaExtractionResult(BaseModel): """Result of media extraction operation""" album: str = Field(..., description="Source album name") export_path: str = Field(..., description="Export directory") media_count: int = Field(..., ge=0, description="Number of files extracted") media_files: List[MediaMetadata] = Field(..., description="Extracted media metadata") extraction_time: float = Field(..., gt=0, description="Extraction duration in seconds") success: bool = Field(..., description="Whether extraction succeeded") # Example usage: extraction_result = MediaExtractionResult( album="Summer 2024", export_path="~/Exports/summer-2024", media_count=42, media_files=[ MediaMetadata( filename="sunset-beach.jpg", file_size_bytes=2048576, media_type=MediaType.IMAGE, exif_data=EXIFData( camera_make="Apple", camera_model="iPhone 15 Pro", date_taken=datetime(2024, 7, 15, 18, 30, 0), iso=100, aperture=2.8 ), tags=["sunset", "beach", "summer"], album="Summer 2024" ) ], extraction_time=12.5, success=True )
TRAE_Extractor-app: Post Scheduling Data Structures
from pydantic import BaseModel, Field, validator from typing import Optional, List from datetime import datetime from enum import Enum class SocialPlatform(str, Enum): """Supported social media platforms""" INSTAGRAM = "instagram" FACEBOOK = "facebook" LINKEDIN = "linkedin" TWITTER = "twitter" TIKTOK = "tiktok" class PostStatus(str, Enum): """Post scheduling status""" PENDING = "pending" SCHEDULED = "scheduled" PUBLISHED = "published" FAILED = "failed" CANCELLED = "cancelled" class ScheduledPost(BaseModel): """Scheduled social media post""" post_id: Optional[str] = Field(None, description="Post ID from platform") media_id: str = Field(..., description="Cloudinary public ID or media file ID") caption: str = Field(..., min_length=10, max_length=2200, description="Post caption") platforms: List[SocialPlatform] = Field(..., min_items=1, description="Target platforms") scheduled_at: datetime = Field(..., description="Scheduled publish time") hashtags: List[str] = Field( default_factory=list, max_length=30, description="Hashtags to include" ) status: PostStatus = Field(default=PostStatus.PENDING) created_at: datetime = Field(default_factory=datetime.utcnow) published_at: Optional[datetime] = Field(None) error_message: Optional[str] = Field(None, description="Error if publishing failed") @validator('scheduled_at') def validate_scheduled_time(cls, v): """Ensure scheduled time is in the future""" if v <= datetime.utcnow(): raise ValueError('Scheduled time must be in the future') return v class CaptionVariation(BaseModel): """Caption variation for different platforms""" platform: SocialPlatform = Field(..., description="Target platform") caption: str = Field(..., min_length=10, max_length=2200) hashtags: List[str] = Field(default_factory=list, max_length=30) character_limit: Optional[int] = Field(None, description="Platform-specific character limit") class CaptionGenerationResult(BaseModel): """Result of caption generation by agent""" media_id: str = Field(..., description="Media file ID") variations: List[CaptionVariation] = Field(..., min_items=1) suggested_hashtags: List[str] = Field(default_factory=list, max_length=30) tone_analysis: Optional[Dict[str, str]] = Field(None, description="Tone analysis results") generation_time: float = Field(..., gt=0) # Example usage: scheduled_post = ScheduledPost( media_id="cloudinary/summer-beach-abc123", caption="Beautiful sunset at the beach. The colors of nature never disappoint! 🌅 #sunset #beach #nature", platforms=[SocialPlatform.INSTAGRAM, SocialPlatform.FACEBOOK], scheduled_at=datetime(2024, 8, 1, 18, 0, 0), hashtags=["#sunset", "#beach", "#nature", "#summer"] )
Nested Models
class Image(BaseModel): url: HttpUrl name: str class Item(BaseModel): name: str description: Optional[str] = None price: float = Field(..., gt=0) tax: Optional[float] = None tags: List[str] = [] images: Optional[List[Image]] = None # Request example: # { # "name": "Laptop", # "price": 999.99, # "tags": ["electronics", "computers"], # "images": [ # {"url": "http://example.com/img1.jpg", "name": "Front view"} # ] # }
Model Validation and Examples
from pydantic import BaseModel, Field, validator class Product(BaseModel): name: str = Field(..., example="MacBook Pro") price: float = Field(..., gt=0, example=1999.99) discount: Optional[float] = Field(None, ge=0, le=100, example=10.0) @validator('discount') def discount_check(cls, v, values): if v and 'price' in values: discounted = values['price'] * (1 - v/100) if discounted < 0: raise ValueError('Discounted price cannot be negative') return v class Config: schema_extra = { "example": { "name": "MacBook Pro 16", "price": 2499.99, "discount": 15.0 } }
Path Operations and Routing
HTTP Methods and Path Parameters
from fastapi import FastAPI, Path, Query, Body from typing import Optional app = FastAPI() # GET with path parameter @app.get("/items/{item_id}") async def read_item( item_id: int = Path(..., title="The ID of the item", ge=1), q: Optional[str] = Query(None, max_length=50) ): return {"item_id": item_id, "q": q} # POST with request body @app.post("/items/") async def create_item(item: Item): return {"item": item, "message": "Item created"} # PUT for updates @app.put("/items/{item_id}") async def update_item( item_id: int, item: Item = Body(...), ): return {"item_id": item_id, "item": item} # DELETE @app.delete("/items/{item_id}") async def delete_item(item_id: int): return {"message": f"Item {item_id} deleted"} # PATCH for partial updates @app.patch("/items/{item_id}") async def partial_update_item( item_id: int, item: dict = Body(...) ): return {"item_id": item_id, "updated_fields": item}
Query Parameters with Validation
from fastapi import Query from typing import List, Optional @app.get("/search/") async def search_items( q: str = Query(..., min_length=3, max_length=50), skip: int = Query(0, ge=0), limit: int = Query(10, ge=1, le=100), sort_by: Optional[str] = Query(None, regex="^(name|price|date)$"), tags: List[str] = Query([], description="Filter by tags") ): return { "q": q, "skip": skip, "limit": limit, "sort_by": sort_by, "tags": tags }
Response Models
from typing import List @app.post("/users/", response_model=UserResponse) async def create_user(user: UserCreate): # Hash password, save to DB db_user = { "id": 1, "username": user.username, "email": user.email, "full_name": user.full_name, "is_active": True } return db_user @app.get("/users/", response_model=List[UserResponse]) async def list_users(skip: int = 0, limit: int = 100): users = [...] # Fetch from database return users # Exclude fields from response class UserInDB(User): hashed_password: str @app.get("/users/{user_id}", response_model=UserResponse) async def get_user(user_id: int): user = get_user_from_db(user_id) # Returns UserInDB return user # Password excluded automatically
Async/Await Patterns
When to Use async vs def
# Use async def when: # - Making database queries with async driver # - Calling external APIs with httpx/aiohttp # - Using async I/O operations # - Working with async libraries @app.get("/async-example") async def async_endpoint(): # Can use await inside result = await async_database_query() external_data = await async_http_call() return {"result": result, "external": external_data} # Use def when: # - Working with synchronous libraries # - Performing CPU-bound operations # - No async operations needed @app.get("/sync-example") def sync_endpoint(): # Regular synchronous code result = synchronous_database_query() return {"result": result}
Async Database Operations
import asyncio from databases import Database DATABASE_URL = "postgresql://user:password@localhost/dbname" database = Database(DATABASE_URL) @app.on_event("startup") async def startup(): await database.connect() @app.on_event("shutdown") async def shutdown(): await database.disconnect() @app.get("/users/{user_id}") async def get_user(user_id: int): query = "SELECT * FROM users WHERE id = :user_id" user = await database.fetch_one(query, {"user_id": user_id}) return user @app.post("/users/") async def create_user(user: UserCreate): query = """ INSERT INTO users (username, email, hashed_password) VALUES (:username, :email, :password) RETURNING * """ hashed_password = hash_password(user.password) new_user = await database.fetch_one( query, { "username": user.username, "email": user.email, "password": hashed_password } ) return new_user
Concurrent Operations
import asyncio import httpx async def fetch_user(user_id: int): async with httpx.AsyncClient() as client: response = await client.get(f"https://api.example.com/users/{user_id}") return response.json() @app.get("/users/batch") async def get_multiple_users(user_ids: List[int] = Query(...)): # Fetch all users concurrently users = await asyncio.gather(*[fetch_user(uid) for uid in user_ids]) return {"users": users}
Dependency Injection
Basic Dependencies
from fastapi import Depends from typing import Optional # Simple dependency function async def common_parameters( q: Optional[str] = None, skip: int = 0, limit: int = 100 ): return {"q": q, "skip": skip, "limit": limit} @app.get("/items/") async def read_items(commons: dict = Depends(common_parameters)): return commons @app.get("/users/") async def read_users(commons: dict = Depends(common_parameters)): return commons
Class-Based Dependencies
class Pagination: def __init__( self, skip: int = Query(0, ge=0), limit: int = Query(100, ge=1, le=100) ): self.skip = skip self.limit = limit @app.get("/items/") async def list_items(pagination: Pagination = Depends()): return { "skip": pagination.skip, "limit": pagination.limit, "items": [] # Fetch with pagination }
Database Session Dependency
from sqlalchemy.orm import Session from typing import Generator def get_db() -> Generator[Session, None, None]: db = SessionLocal() try: yield db finally: db.close() @app.get("/users/{user_id}") async def get_user( user_id: int, db: Session = Depends(get_db) ): user = db.query(User).filter(User.id == user_id).first() if not user: raise HTTPException(status_code=404, detail="User not found") return user
Sub-Dependencies
from fastapi import Header, HTTPException async def verify_token(x_token: str = Header(...)): if x_token != "secret-token": raise HTTPException(status_code=400, detail="Invalid token") return x_token async def verify_key(x_key: str = Header(...)): if x_key != "secret-key": raise HTTPException(status_code=400, detail="Invalid key") return x_key async def verify_credentials( token: str = Depends(verify_token), key: str = Depends(verify_key) ): return {"token": token, "key": key} @app.get("/protected/") async def protected_route(credentials: dict = Depends(verify_credentials)): return {"message": "Access granted", "credentials": credentials}
Global Dependencies
async def log_requests(): print("Request received") app = FastAPI(dependencies=[Depends(log_requests)]) # This dependency runs for ALL endpoints
Authentication and Security
OAuth2 Password Bearer with JWT
from fastapi import Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from jose import JWTError, jwt from passlib.context import CryptContext from datetime import datetime, timedelta from typing import Optional SECRET_KEY = "your-secret-key-here" ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 30 pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") class Token(BaseModel): access_token: str token_type: str class TokenData(BaseModel): username: Optional[str] = None def verify_password(plain_password, hashed_password): return pwd_context.verify(plain_password, hashed_password) def get_password_hash(password): return pwd_context.hash(password) def create_access_token(data: dict, expires_delta: Optional[timedelta] = None): to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=15) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt async def get_current_user(token: str = Depends(oauth2_scheme)): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") if username is None: raise credentials_exception token_data = TokenData(username=username) except JWTError: raise credentials_exception user = get_user_from_db(username=token_data.username) if user is None: raise credentials_exception return user async def get_current_active_user( current_user: User = Depends(get_current_user) ): if not current_user.is_active: raise HTTPException(status_code=400, detail="Inactive user") return current_user @app.post("/token", response_model=Token) async def login(form_data: OAuth2PasswordRequestForm = Depends()): user = authenticate_user(form_data.username, form_data.password) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( data={"sub": user.username}, expires_delta=access_token_expires ) return {"access_token": access_token, "token_type": "bearer"} @app.get("/users/me", response_model=User) async def read_users_me(current_user: User = Depends(get_current_active_user)): return current_user
API Key Authentication
from fastapi import Security from fastapi.security import APIKeyHeader API_KEY = "your-api-key" api_key_header = APIKeyHeader(name="X-API-Key") async def verify_api_key(api_key: str = Security(api_key_header)): if api_key != API_KEY: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Invalid API Key" ) return api_key @app.get("/secure-data") async def get_secure_data(api_key: str = Depends(verify_api_key)): return {"data": "This is secure data"}
OAuth2 with Scopes
from fastapi.security import OAuth2PasswordBearer, SecurityScopes oauth2_scheme = OAuth2PasswordBearer( tokenUrl="token", scopes={ "items:read": "Read items", "items:write": "Create and update items", "users:read": "Read user information" } ) async def get_current_user_with_scopes( security_scopes: SecurityScopes, token: str = Depends(oauth2_scheme) ): # Verify token and check scopes if security_scopes.scopes: authenticate_value = f'Bearer scope="{security_scopes.scope_str}"' else: authenticate_value = "Bearer" credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": authenticate_value}, ) # Decode JWT and verify scopes... return user @app.get("/items/", dependencies=[Security(get_current_user_with_scopes, scopes=["items:read"])]) async def read_items(): return [{"item": "Item 1"}, {"item": "Item 2"}] @app.post("/items/", dependencies=[Security(get_current_user_with_scopes, scopes=["items:write"])]) async def create_item(item: Item): return {"item": item}
Database Integration
SQLAlchemy Setup
from sqlalchemy import create_engine, Column, Integer, String, Boolean from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker SQLALCHEMY_DATABASE_URL = "postgresql://user:password@localhost/dbname" engine = create_engine(SQLALCHEMY_DATABASE_URL) SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) Base = declarative_base() # Models class UserModel(Base): __tablename__ = "users" id = Column(Integer, primary_key=True, index=True) username = Column(String, unique=True, index=True) email = Column(String, unique=True, index=True) hashed_password = Column(String) is_active = Column(Boolean, default=True) Base.metadata.create_all(bind=engine)
CRUD Operations
from sqlalchemy.orm import Session # Create @app.post("/users/", response_model=UserResponse) async def create_user(user: UserCreate, db: Session = Depends(get_db)): db_user = UserModel( username=user.username, email=user.email, hashed_password=get_password_hash(user.password) ) db.add(db_user) db.commit() db.refresh(db_user) return db_user # Read @app.get("/users/{user_id}", response_model=UserResponse) async def read_user(user_id: int, db: Session = Depends(get_db)): user = db.query(UserModel).filter(UserModel.id == user_id).first() if not user: raise HTTPException(status_code=404, detail="User not found") return user # Update @app.put("/users/{user_id}", response_model=UserResponse) async def update_user( user_id: int, user_update: UserCreate, db: Session = Depends(get_db) ): user = db.query(UserModel).filter(UserModel.id == user_id).first() if not user: raise HTTPException(status_code=404, detail="User not found") user.username = user_update.username user.email = user_update.email if user_update.password: user.hashed_password = get_password_hash(user_update.password) db.commit() db.refresh(user) return user # Delete @app.delete("/users/{user_id}") async def delete_user(user_id: int, db: Session = Depends(get_db)): user = db.query(UserModel).filter(UserModel.id == user_id).first() if not user: raise HTTPException(status_code=404, detail="User not found") db.delete(user) db.commit() return {"message": "User deleted successfully"}
Background Tasks
from fastapi import BackgroundTasks def send_email(email: str, message: str): # Simulate sending email print(f"Sending email to {email}: {message}") def process_file(filename: str): # Simulate file processing print(f"Processing file: {filename}") @app.post("/send-notification/") async def send_notification( email: str, background_tasks: BackgroundTasks ): background_tasks.add_task(send_email, email, "Welcome!") return {"message": "Notification scheduled"} @app.post("/upload/") async def upload_file( file: str, background_tasks: BackgroundTasks ): # Save file first background_tasks.add_task(process_file, file) return {"message": "File uploaded, processing in background"}
SSE Streaming for Agent Communication
Server-Sent Events (SSE) enable real-time streaming of agent updates from the Python sidecar to the Electron renderer. This pattern is essential for providing live feedback during long-running agent operations.
Basic SSE Endpoint
from fastapi import APIRouter from sse_starlette.sse import EventSourceResponse import asyncio from typing import AsyncGenerator agent_router = APIRouter(prefix="/agent", tags=["agent"]) # Global event queues for SSE streams event_queues: dict[str, asyncio.Queue] = {} class StreamEvent(BaseModel): """Event streamed via SSE""" event_type: str # 'thinking', 'tool_call', 'result', 'error', 'keepalive' data: dict timestamp: float async def agent_event_generator(request_id: str) -> AsyncGenerator: """Generate events from agent event queue for SSE streaming""" # Create queue if not exists if request_id not in event_queues: event_queues[request_id] = asyncio.Queue() queue = event_queues[request_id] try: while True: try: # Wait for event with timeout (30s) event = await asyncio.wait_for(queue.get(), timeout=30.0) yield { "event": event.event_type, "data": event.json(), } # Stop streaming on terminal events if event.event_type in ("result", "error"): break except asyncio.TimeoutError: # Send keepalive event to prevent connection timeout yield { "event": "keepalive", "data": "{}", } except asyncio.CancelledError: pass finally: # Cleanup queue if request_id in event_queues: del event_queues[request_id] @agent_router.get("/stream/{request_id}") async def stream_events(request_id: str): """ Stream real-time events from agent execution via Server-Sent Events. Usage: - Client connects to /agent/stream/{request_id} - Server sends events as they occur - Stream ends when terminal event (result/error) is sent """ return EventSourceResponse( agent_event_generator(request_id), media_type="text/event-stream", )
Sending Events to Stream
async def send_agent_event(request_id: str, event_type: str, data: dict): """Send event to agent stream""" if request_id in event_queues: event = StreamEvent( event_type=event_type, data=data, timestamp=time.time() ) await event_queues[request_id].put(event) # Example: Stream agent thinking process async def execute_agent_with_streaming(agent_id: str, request_id: str, input_data: dict): await send_agent_event(request_id, "thinking", {"agent": agent_id, "status": "starting"}) # Execute agent logic result = await run_agent(agent_id, input_data) await send_agent_event(request_id, "result", {"agent": agent_id, "result": result})
Client-Side SSE Consumption
// In Electron renderer const eventSource = new EventSource('http://localhost:8000/agent/stream/req-123'); eventSource.addEventListener('thinking', (e) => { const data = JSON.parse(e.data); console.log('Agent thinking:', data); }); eventSource.addEventListener('result', (e) => { const data = JSON.parse(e.data); console.log('Agent result:', data); eventSource.close(); }); eventSource.onerror = () => { console.error('SSE connection error'); eventSource.close(); };
Unix Socket IPC with osxphotos Sandbox
Unix socket IPC enables secure communication between the Python sidecar and the osxphotos sandbox process. This pattern ensures osxphotos runs in isolation with no network access and strict path whitelisting.
Security Considerations
- NO network access: osxphotos process cannot make HTTP requests
- Read-only on Photos Library: osxphotos can only read from Apple Photos
- Write whitelist: Export paths must be pre-approved directories
- Sandbox isolation: Process runs in restricted environment
Unix Socket Client
import socket import json import os from typing import Optional, Dict, Any class OsxphotosSandboxClient: """Client for communicating with osxphotos sandbox via Unix socket""" def __init__(self, socket_path: str): self.socket_path = socket_path self.timeout = 30 # seconds async def extract_photos( self, album: str, export_path: str, options: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: """ Extract photos from album via osxphotos sandbox. Args: album: Photos album name export_path: Destination directory (must be in whitelist) options: Additional extraction options Returns: Dict with extraction results Raises: ValueError: If export path is not in whitelist ConnectionError: If socket connection fails """ # Validate export path is in whitelist if not self._is_path_allowed(export_path): raise ValueError( f"Export path not in whitelist: {export_path}. " f"Allowed paths: {self._get_allowed_paths()}" ) request = { "method": "extract", "params": { "album": album, "export_path": export_path, "options": options or {} } } try: with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s: s.settimeout(self.timeout) s.connect(self.socket_path) # Send request s.sendall(json.dumps(request).encode('utf-8')) # Receive response response_data = b"" while True: chunk = s.recv(4096) if not chunk: break response_data += chunk response = json.loads(response_data.decode('utf-8')) if response.get("status") == "error": raise RuntimeError(response.get("message", "Unknown error")) return response except socket.timeout: raise ConnectionError(f"Socket timeout after {self.timeout}s") except FileNotFoundError: raise ConnectionError(f"Socket not found: {self.socket_path}") except Exception as e: raise ConnectionError(f"Socket communication failed: {e}") def _is_path_allowed(self, path: str) -> bool: """Check if path is in allowed whitelist""" expanded_path = os.path.expanduser(path) allowed_paths = self._get_allowed_paths() return any(expanded_path.startswith(p) for p in allowed_paths) def _get_allowed_paths(self) -> list[str]: """Get list of allowed export paths""" return [ os.path.expanduser("~/Exports/"), os.path.expanduser("~/Documents/TraeExports/"), os.path.expanduser("~/Desktop/TraeExports/"), ]
Unix Socket Server (osxphotos Side)
import socket import json import os from pathlib import Path class OsxphotosSandboxServer: """Unix socket server for osxphotos sandbox""" def __init__(self, socket_path: str): self.socket_path = socket_path self.running = False def start(self): """Start the Unix socket server""" # Remove existing socket if present if os.path.exists(self.socket_path): os.unlink(self.socket_path) server_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) server_socket.bind(self.socket_path) server_socket.listen(5) self.running = True print(f"osxphotos sandbox server listening on {self.socket_path}") while self.running: try: conn, _ = server_socket.accept() self._handle_request(conn) except KeyboardInterrupt: break except Exception as e: print(f"Error handling connection: {e}") server_socket.close() def _handle_request(self, conn: socket.socket): """Handle incoming request""" try: # Receive request request_data = b"" while True: chunk = conn.recv(4096) if not chunk: break request_data += chunk request = json.loads(request_data.decode('utf-8')) # Process request if request["method"] == "extract": response = self._extract_photos(request["params"]) else: response = { "status": "error", "message": f"Unknown method: {request['method']}" } # Send response conn.sendall(json.dumps(response).encode('utf-8')) except Exception as e: error_response = { "status": "error", "message": str(e) } conn.sendall(json.dumps(error_response).encode('utf-8')) finally: conn.close() def _extract_photos(self, params: dict) -> dict: """Extract photos using osxphotos (sandboxed)""" # Import osxphotos here to ensure it's loaded in sandbox import osxphotos album = params["album"] export_path = params["export_path"] options = params.get("options", {}) # Validate export path if not self._is_path_allowed(export_path): return { "status": "error", "message": f"Export path not allowed: {export_path}" } # Extract photos photos = osxphotos.export( album, export_path, **options ) return { "status": "success", "photos": photos, "count": len(photos) } def _is_path_allowed(self, path: str) -> bool: """Check if path is in allowed whitelist""" expanded_path = os.path.expanduser(path) allowed_paths = [ os.path.expanduser("~/Exports/"), os.path.expanduser("~/Documents/TraeExports/"), os.path.expanduser("~/Desktop/TraeExports/"), ] return any(expanded_path.startswith(p) for p in allowed_paths) def stop(self): """Stop the server""" self.running = False if os.path.exists(self.socket_path): os.unlink(self.socket_path)
Integration with FastAPI
from fastapi import FastAPI, HTTPException app = FastAPI() # Initialize osxphotos client osxphotos_client = OsxphotosSandboxClient("/tmp/osxphotos.sock") @app.post("/photos/extract") async def extract_photos(album: str, export_path: str): """Extract photos from album via osxphotos sandbox""" try: result = await osxphotos_client.extract_photos(album, export_path) return result except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) except ConnectionError as e: raise HTTPException(status_code=503, detail="osxphotos sandbox not available")
Circuit Breaker for Sidecar Lifecycle
Circuit breaker pattern prevents cascading failures when the Python sidecar crashes repeatedly. It tracks failures and temporarily stops restart attempts after a threshold.
Circuit Breaker Implementation
from datetime import datetime, timedelta from typing import List, Optional import logging logger = logging.getLogger(__name__) class CircuitBreaker: """Circuit breaker for sidecar lifecycle management""" def __init__( self, max_failures: int = 3, window_minutes: int = 5, cooldown_minutes: int = 10 ): """ Initialize circuit breaker. Args: max_failures: Maximum failures before opening circuit window_minutes: Time window to count failures cooldown_minutes: Time to wait before retrying after opening """ self.max_failures = max_failures self.window_minutes = window_minutes self.cooldown_minutes = cooldown_minutes self.failures: List[datetime] = [] self.state = "closed" # closed, open, half-open self.last_failure_time: Optional[datetime] = None def record_failure(self): """Record a failure and update circuit state""" now = datetime.now() self.failures.append(now) self.last_failure_time = now # Remove failures outside window self.failures = [ f for f in self.failures if now - f < timedelta(minutes=self.window_minutes) ] # Check if threshold exceeded if len(self.failures) >= self.max_failures: self.state = "open" logger.warning( f"Circuit breaker OPEN: {len(self.failures)} failures in " f"{self.window_minutes} minutes. Cooldown for {self.cooldown_minutes} minutes." ) def record_success(self): """Record a success and reset circuit state""" self.failures.clear() self.state = "closed" self.last_failure_time = None logger.info("Circuit breaker CLOSED: Sidecar is healthy") def can_execute(self) -> bool: """ Check if operation can be executed. Returns: True if circuit is closed or half-open, False if open """ if self.state == "closed": return True if self.state == "open": # Check if cooldown period has passed if self.last_failure_time: time_since_failure = datetime.now() - self.last_failure_time if time_since_failure >= timedelta(minutes=self.cooldown_minutes): self.state = "half-open" logger.info("Circuit breaker HALF-OPEN: Attempting recovery") return True return False if self.state == "half-open": return True return False def get_status(self) -> dict: """Get current circuit breaker status""" return { "state": self.state, "failures_in_window": len(self.failures), "max_failures": self.max_failures, "last_failure": self.last_failure_time.isoformat() if self.last_failure_time else None, "can_execute": self.can_execute() }
Integration with Sidecar Manager
from typing import Optional class SidecarManager: """Manages Python sidecar lifecycle with circuit breaker""" def __init__(self): self.circuit_breaker = CircuitBreaker( max_failures=3, window_minutes=5, cooldown_minutes=10 ) self.process: Optional[subprocess.Popen] = None async def start(self) -> bool: """Start sidecar if circuit breaker allows""" if not self.circuit_breaker.can_execute(): logger.warning( f"Cannot start sidecar: Circuit breaker is {self.circuit_breaker.state}" ) return False try: # Start sidecar process self.process = subprocess.Popen( ["python", "main.py"], cwd="python/", stdout=subprocess.PIPE, stderr=subprocess.PIPE ) # Wait for health check await self._wait_for_health() # Record success self.circuit_breaker.record_success() return True except Exception as e: logger.error(f"Failed to start sidecar: {e}") self.circuit_breaker.record_failure() return False async def _wait_for_health(self, timeout: int = 30): """Wait for sidecar to become healthy""" import httpx start_time = time.time() while time.time() - start_time < timeout: try: response = await httpx.get("http://localhost:8000/health") if response.status_code == 200: return except Exception: pass await asyncio.sleep(1) raise TimeoutError("Sidecar health check timeout") def stop(self): """Stop sidecar process""" if self.process: self.process.terminate() self.process.wait() self.process = None def get_status(self) -> dict: """Get sidecar status including circuit breaker""" return { "running": self.process is not None, "circuit_breaker": self.circuit_breaker.get_status() }
Usage Example
# Initialize sidecar manager sidecar_manager = SidecarManager() # Try to start sidecar if await sidecar_manager.start(): print("Sidecar started successfully") else: print("Sidecar cannot start (circuit breaker open)") print(f"Status: {sidecar_manager.get_status()}") # Check status status = sidecar_manager.get_status() print(f"Circuit breaker state: {status['circuit_breaker']['state']}") print(f"Can execute: {status['circuit_breaker']['can_execute']}")
Error Handling
Custom Exception Handlers
from fastapi import Request, status from fastapi.responses import JSONResponse from fastapi.exceptions import RequestValidationError class CustomException(Exception): def __init__(self, name: str): self.name = name @app.exception_handler(CustomException) async def custom_exception_handler(request: Request, exc: CustomException): return JSONResponse( status_code=418, content={"message": f"Oops! {exc.name} did something wrong."}, ) @app.exception_handler(RequestValidationError) async def validation_exception_handler(request: Request, exc: RequestValidationError): return JSONResponse( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, content={"detail": exc.errors()}, ) @app.get("/items/{item_id}") async def read_item(item_id: str): if item_id == "error": raise CustomException(name="Item") return {"item_id": item_id}
Testing
Basic Tests with TestClient
from fastapi.testclient import TestClient client = TestClient(app) def test_read_main(): response = client.get("/") assert response.status_code == 200 assert response.json() == {"message": "Hello World"} def test_create_item(): response = client.post( "/items/", json={"name": "Test Item", "price": 10.5} ) assert response.status_code == 200 assert response.json()["name"] == "Test Item" def test_authentication(): response = client.post( "/token", data={"username": "testuser", "password": "testpass"} ) assert response.status_code == 200 assert "access_token" in response.json()
Async Tests
import pytest from httpx import AsyncClient @pytest.mark.anyio async def test_read_items(): async with AsyncClient(app=app, base_url="http://test") as ac: response = await ac.get("/items/") assert response.status_code == 200 assert isinstance(response.json(), list) @pytest.mark.anyio async def test_create_user(): async with AsyncClient(app=app, base_url="http://test") as ac: response = await ac.post( "/users/", json={ "username": "newuser", "email": "new@example.com", "password": "securepass123" } ) assert response.status_code == 200 assert response.json()["username"] == "newuser"
Routers and Organization
APIRouter for Modular Code
# routers/users.py from fastapi import APIRouter, Depends router = APIRouter( prefix="/users", tags=["users"], dependencies=[Depends(verify_token)], responses={404: {"description": "Not found"}}, ) @router.get("/") async def list_users(): return [{"username": "user1"}, {"username": "user2"}] @router.get("/{user_id}") async def get_user(user_id: int): return {"user_id": user_id} # main.py from routers import users app = FastAPI() app.include_router(users.router)
Best Practices
1. Project Structure
my_fastapi_project/ ├── app/ │ ├── __init__.py │ ├── main.py │ ├── config.py │ ├── models/ │ │ ├── __init__.py │ │ ├── user.py │ │ └── item.py │ ├── schemas/ │ │ ├── __init__.py │ │ ├── user.py │ │ └── item.py │ ├── routers/ │ │ ├── __init__.py │ │ ├── users.py │ │ └── items.py │ ├── dependencies/ │ │ ├── __init__.py │ │ ├── auth.py │ │ └── database.py │ └── utils/ │ ├── __init__.py │ └── security.py ├── tests/ │ ├── __init__.py │ ├── test_users.py │ └── test_items.py ├── requirements.txt ├── .env └── README.md
2. Configuration Management
from pydantic import BaseSettings class Settings(BaseSettings): app_name: str = "My FastAPI App" database_url: str secret_key: str algorithm: str = "HS256" access_token_expire_minutes: int = 30 class Config: env_file = ".env" settings = Settings()
3. Documentation
app = FastAPI( title="My API", description="This is a very custom API", version="1.0.0", openapi_tags=[ { "name": "users", "description": "Operations with users.", }, { "name": "items", "description": "Manage items.", }, ] ) @app.post( "/items/", response_model=Item, tags=["items"], summary="Create an item", description="Create an item with all the information", response_description="The created item", ) async def create_item(item: Item): return item
Production Deployment
Docker Setup
# Dockerfile FROM python:3.11-slim WORKDIR /app COPY requirements.txt . RUN pip install --no-cache-dir -r requirements.txt COPY ./app /app CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
Run with Gunicorn and Uvicorn Workers
gunicorn main:app --workers 4 --worker-class uvicorn.workers.UvicornWorker --bind 0.0.0.0:8000
Skill Version: 1.0.0 Last Updated: October 2025 Skill Category: Backend Development, API Development, Python Compatible With: FastAPI 0.100+, Python 3.7+, Pydantic 2.0+