Claude-skill-registry fastapi-backend
FastAPI routes, services, auth JWT, streaming, error handling, OpenAI integration
install
source · Clone the upstream repo
git clone https://github.com/majiayu000/claude-skill-registry
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/fastapi-backend" ~/.claude/skills/majiayu000-claude-skill-registry-fastapi-backend && rm -rf "$T"
manifest:
skills/data/fastapi-backend/SKILL.mdsource content
FastAPI Backend — CEI-001
Architecture Pattern
Route Layer (FastAPI) ↓ (validates request) Service Layer (business logic) ↓ (Pydantic schema validation) Data Layer (SQLAlchemy ORM) ↓ Response (Pydantic schema)
Route Patterns
Basic CRUD Route
from fastapi import APIRouter, Depends, HTTPException, status from sqlalchemy.ext.asyncio import AsyncSession from app.schemas.evaluation import EvaluationCreate, EvaluationResponse from app.services.evaluation_service import EvaluationService from app.api.deps import get_db, get_current_user router = APIRouter(prefix="/api/evaluations", tags=["evaluations"]) @router.post("", response_model=EvaluationResponse, status_code=status.HTTP_201_CREATED) async def create_evaluation( eval_data: EvaluationCreate, db: AsyncSession = Depends(get_db), current_user = Depends(get_current_user) ) -> EvaluationResponse: """ Create new evaluation for user Example: ``` POST /api/evaluations { "company_id": "123e4567-e89b-12d3-a456-426614174000", "project_type": "new_erp" } ``` """ service = EvaluationService(db) return await service.create(current_user.id, eval_data) @router.get("/{eval_id}", response_model=EvaluationResponse) async def get_evaluation( eval_id: str, db: AsyncSession = Depends(get_db), current_user = Depends(get_current_user) ) -> EvaluationResponse: """Get evaluation by ID""" service = EvaluationService(db) evaluation = await service.get(eval_id, current_user.id) if not evaluation: raise HTTPException(status_code=404, detail="Evaluation not found") return evaluation @router.get("", response_model=list[EvaluationResponse]) async def list_evaluations( db: AsyncSession = Depends(get_db), current_user = Depends(get_current_user), skip: int = 0, limit: int = 10 ) -> list[EvaluationResponse]: """List user's evaluations""" service = EvaluationService(db) return await service.list(current_user.id, skip, limit) @router.put("/{eval_id}", response_model=EvaluationResponse) async def update_evaluation( eval_id: str, eval_data: EvaluationUpdate, db: AsyncSession = Depends(get_db), current_user = Depends(get_current_user) ) -> EvaluationResponse: """Update evaluation""" service = EvaluationService(db) evaluation = await service.update(eval_id, current_user.id, eval_data) if not evaluation: raise HTTPException(status_code=404, detail="Evaluation not found") return evaluation @router.delete("/{eval_id}", status_code=status.HTTP_204_NO_CONTENT) async def delete_evaluation( eval_id: str, db: AsyncSession = Depends(get_db), current_user = Depends(get_current_user) ): """Delete evaluation""" service = EvaluationService(db) success = await service.delete(eval_id, current_user.id) if not success: raise HTTPException(status_code=404, detail="Evaluation not found")
Streaming Route (Chat)
from fastapi.responses import StreamingResponse @router.post("/api/chat/message") async def send_message( message_data: ChatMessageCreate, db: AsyncSession = Depends(get_db), current_user = Depends(get_current_user) ) -> StreamingResponse: """ Send message to chat agent, stream response Streams back JSON lines: {"type": "content", "data": "text"} """ service = ChatService(db) async def generate(): async for chunk in service.stream_response( conversation_id=message_data.conversation_id, user_id=current_user.id, message=message_data.content ): # Send as JSON line yield f"data: {json.dumps(chunk)}\n\n" return StreamingResponse(generate(), media_type="text/event-stream")
Admin Protected Route
from app.api.deps import get_admin_user @router.post("/api/admin/documents/{doc_id}/pipeline") async def start_pipeline( doc_id: str, config: PipelineConfig, db: AsyncSession = Depends(get_db), admin_user = Depends(get_admin_user) ): """Start document pipeline (admin only)""" service = DocumentPipelineService(db) return await service.start(doc_id, config)
Service Layer Pattern
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.future import select class EvaluationService: def __init__(self, db: AsyncSession): self.db = db async def create(self, user_id: str, eval_data: EvaluationCreate): """Create evaluation with validation""" # Validate user exists user = await self.db.get(User, user_id) if not user: raise ValueError("User not found") # Create record evaluation = Evaluation( user_id=user_id, **eval_data.dict() ) self.db.add(evaluation) await self.db.commit() await self.db.refresh(evaluation) return evaluation async def get(self, eval_id: str, user_id: str): """Get evaluation (owner only)""" stmt = select(Evaluation).where( (Evaluation.id == eval_id) & (Evaluation.user_id == user_id) ) result = await self.db.execute(stmt) return result.scalar_one_or_none() async def list(self, user_id: str, skip: int = 0, limit: int = 10): """List user evaluations""" stmt = select(Evaluation)\ .where(Evaluation.user_id == user_id)\ .offset(skip)\ .limit(limit) result = await self.db.execute(stmt) return result.scalars().all()
Auth & Dependencies
# app/api/deps.py from fastapi import Depends, HTTPException, status from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from sqlalchemy.ext.asyncio import AsyncSession from app.db.database import get_session from app.core.security import decode_access_token from app.models.user import User security = HTTPBearer() async def get_db() -> AsyncSession: """Get async session""" async with get_session() as session: yield session async def get_current_user( credentials: HTTPAuthorizationCredentials = Depends(security), db: AsyncSession = Depends(get_db) ) -> User: """Get authenticated user""" token = credentials.credentials user_id = decode_access_token(token) if not user_id: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Token invalide ou expiré" ) user = await db.get(User, user_id) if not user: raise HTTPException(status_code=401, detail="User not found") return user async def get_admin_user( current_user = Depends(get_current_user) ) -> User: """Get admin user (verify role)""" if current_user.role != "admin": raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Admin access required" ) return current_user
OpenAI Integration
from openai import AsyncOpenAI class ChatService: def __init__(self, db: AsyncSession): self.db = db self.client = AsyncOpenAI(api_key=settings.OPENAI_API_KEY) self.rag_service = RAGService() async def stream_response(self, conversation_id: str, user_id: str, message: str): """Stream chat response with RAG context""" # Save user message user_msg = Message( conversation_id=conversation_id, role="user", content=message ) self.db.add(user_msg) await self.db.commit() # Get RAG context context = await self.rag_service.search(message, limit=3) # Build prompt system_prompt = self._build_system_prompt(context) # Stream from OpenAI async with self.client.messages.stream( model="gpt-4-turbo-preview", max_tokens=1024, system=system_prompt, messages=[ {"role": "user", "content": message} ] ) as stream: full_response = "" async for text in stream.text_stream: full_response += text yield {"type": "content", "data": text} # Save assistant response assistant_msg = Message( conversation_id=conversation_id, role="assistant", content=full_response, sources=context ) self.db.add(assistant_msg) await self.db.commit() yield {"type": "done"} def _build_system_prompt(self, context: list) -> str: """Build system prompt with RAG context""" sources = "\n".join([f"- {c['title']}: {c['content'][:200]}..." for c in context]) return f"""Tu es un expert ERP pour PME manufacturières. Contexte de connaissances: {sources} Réponds aux questions de l'utilisateur en utilisant ce contexte. Cite les sources quand tu utilises le contexte. Sois concis et pratique."""
Error Handling
@router.post("/api/chat/message") async def send_message( message_data: ChatMessageCreate, db: AsyncSession = Depends(get_db), current_user = Depends(get_current_user) ): """Send message with proper error handling""" try: service = ChatService(db) async for chunk in service.stream_response(...): yield chunk except ValueError as e: yield {"type": "error", "message": str(e)} except HTTPException as e: raise e except Exception as e: logger.error(f"Chat error: {e}") yield {"type": "error", "message": "Internal server error"}
Conventions
- Routes start with
/api/ - Use descriptive HTTP methods (POST create, PUT update, DELETE delete)
- Always return status codes (201 create, 200 ok, 404 not found, 422 validation, 500 error)
- Docstring required on all route functions
- Use Pydantic schemas for request/response validation
- Service layer handles business logic
- Type hints everywhere
- Use
for async operationsasync/await - Error handling explicit with HTTPException
- Log important operations
- Pagination:
andskip
parameterslimit