Software_development_department fastapi-pro
Production FastAPI patterns — async endpoints, SQLAlchemy 2.0 async, Pydantic V2, dependency injection, JWT auth, testing. Use for Python 3.11+ FastAPI backends. NOT for Django (→ `django-patterns`) or Node.js (→ `backend-patterns`).
install
source · Clone the upstream repo
git clone https://github.com/tranhieutt/software_development_department
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/tranhieutt/software_development_department "$T" && mkdir -p ~/.claude/skills && cp -r "$T/.claude/skills/fastapi-pro" ~/.claude/skills/tranhieutt-software-development-department-fastapi-pro && rm -rf "$T"
manifest:
.claude/skills/fastapi-pro/SKILL.md
FastAPI Production Patterns
Critical rules (non-obvious)
- `async def` endpoint making a blocking sync DB call → blocks the entire event loop. Either use an `async` DB driver (asyncpg/aiomysql) throughout OR switch the endpoint to plain `def` (FastAPI runs it in a threadpool).
- Pydantic V2 `model_config = ConfigDict(...)` replaces V1 `class Config`. Forgetting this silently loses settings like `from_attributes=True`, which is needed for ORM → DTO conversion.
- `Depends()` caches per request: the same dependency called twice in one request returns the same instance. Don't rely on this for cross-request state; use app state / Redis instead.
- SQLAlchemy 2.0 async session must not leak across requests: always scope it via `Depends` with `async with AsyncSession(...)`. A raw module-level session gets shared by concurrent requests and raises greenlet/concurrency errors under load.
- `BackgroundTasks` runs AFTER the response is sent, in the same worker process: if the worker dies mid-task, the work is lost. For durable background jobs use Celery / Dramatiq / ARQ.
- Uvicorn `--workers N` starts separate processes, which can't share in-memory state. Use Redis or a DB for any shared state (rate-limit counters, cache).
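The first rule can be reproduced without FastAPI. The sketch below uses only the standard library; `blocking_query`, `bad_handler`, and `good_handler` are hypothetical names standing in for a sync DB call and two endpoint styles. It illustrates the effect rather than FastAPI's internal threadpool handling.

```python
import asyncio
import time


def blocking_query(delay: float = 0.1) -> str:
    """Stand-in for a synchronous DB driver call (hypothetical)."""
    time.sleep(delay)
    return "row"


async def bad_handler() -> str:
    # Blocking call inside a coroutine: the event loop cannot run anything else.
    return blocking_query()


async def good_handler() -> str:
    # Offload to a worker thread, similar in spirit to a plain `def` endpoint.
    return await asyncio.to_thread(blocking_query)


async def timed(handler, n: int = 5) -> float:
    """Run n handlers concurrently and return the elapsed wall time."""
    start = time.perf_counter()
    await asyncio.gather(*(handler() for _ in range(n)))
    return time.perf_counter() - start


def run_demo() -> tuple[float, float]:
    bad = asyncio.run(timed(bad_handler))
    good = asyncio.run(timed(good_handler))
    return bad, good


if __name__ == "__main__":
    bad, good = run_demo()
    print(f"blocking in async def: {bad:.2f}s, offloaded to threads: {good:.2f}s")
```

With five concurrent calls, the blocking variant takes roughly the sum of the delays, while the offloaded variant takes roughly one delay.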
Project layout
    app/
    ├── main.py              # FastAPI() instance + lifespan
    ├── api/
    │   ├── deps.py          # shared Depends (get_db, get_current_user)
    │   └── v1/
    │       ├── users.py     # APIRouter
    │       └── products.py
    ├── core/
    │   ├── config.py        # Pydantic Settings
    │   ├── security.py      # JWT encode/decode, password hashing
    │   └── db.py            # engine + AsyncSession factory
    ├── models/              # SQLAlchemy ORM models
    ├── schemas/             # Pydantic DTOs (Request/Response)
    ├── services/            # business logic (no framework coupling)
    └── tests/
Pydantic V2 settings + config
    # app/core/config.py
    from pydantic import Field
    from pydantic_settings import BaseSettings, SettingsConfigDict


    class Settings(BaseSettings):
        # Reads APP_DATABASE_URL, APP_JWT_SECRET, ... from the environment or .env
        model_config = SettingsConfigDict(env_file=".env", env_prefix="APP_")

        database_url: str = Field(..., description="postgresql+asyncpg://...")
        jwt_secret: str = Field(..., min_length=32)
        jwt_algorithm: str = "HS256"
        jwt_exp_minutes: int = 30
        cors_origins: list[str] = Field(default_factory=list)


    settings = Settings()  # fails fast at import if required vars missing
SQLAlchemy 2.0 async session (per-request)
    # app/core/db.py
    from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine

    from app.core.config import settings

    engine = create_async_engine(
        settings.database_url,
        pool_size=20,
        max_overflow=10,
        pool_pre_ping=True,  # reconnect on stale conns (LB idle timeout)
        echo=False,
    )
    SessionLocal = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

    # app/api/deps.py
    from typing import AsyncIterator

    from fastapi import Depends
    from sqlalchemy.ext.asyncio import AsyncSession

    from app.core.db import SessionLocal


    async def get_db() -> AsyncIterator[AsyncSession]:
        # One session per request: commit on success, roll back on any error.
        async with SessionLocal() as session:
            try:
                yield session
                await session.commit()
            except Exception:
                await session.rollback()
                raise
            # close is automatic via `async with`
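The commit-or-rollback flow of `get_db` can be traced without a database. This is a minimal sketch using a hypothetical `FakeSession` that only records calls, and a hypothetical `session_scope` helper that mirrors the dependency's try/except shape. It is not SQLAlchemy's API.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator


class FakeSession:
    """Hypothetical stand-in for AsyncSession that records what was called."""

    def __init__(self) -> None:
        self.events: list[str] = []

    async def commit(self) -> None:
        self.events.append("commit")

    async def rollback(self) -> None:
        self.events.append("rollback")

    async def close(self) -> None:
        self.events.append("close")


@asynccontextmanager
async def session_scope(session: FakeSession) -> AsyncIterator[FakeSession]:
    # Same shape as get_db: commit after the handler, roll back on error.
    try:
        yield session
        await session.commit()
    except Exception:
        await session.rollback()
        raise
    finally:
        await session.close()


async def run_request(fail: bool) -> list[str]:
    """Simulate one request; return the session calls in order."""
    session = FakeSession()
    try:
        async with session_scope(session) as s:
            s.events.append("handler")
            if fail:
                raise ValueError("boom")
    except ValueError:
        pass  # a real app would turn this into an HTTP error response
    return session.events


if __name__ == "__main__":
    print(asyncio.run(run_request(fail=False)))
    print(asyncio.run(run_request(fail=True)))
```

A successful request records handler, commit, close. A failing one records handler, rollback, close, so no partial write survives the request.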
Lifespan + startup/shutdown
    # app/main.py
    from contextlib import asynccontextmanager

    from fastapi import FastAPI
    from sqlalchemy import text

    from app.core.db import engine


    @asynccontextmanager
    async def lifespan(app: FastAPI):
        # Startup: warm caches, test DB
        async with engine.begin() as conn:
            await conn.execute(text("SELECT 1"))
        yield
        # Shutdown: drain connections
        await engine.dispose()


    app = FastAPI(title="My API", lifespan=lifespan)
JWT auth with OAuth2PasswordBearer
    # app/core/security.py
    from datetime import datetime, timedelta, timezone

    from jose import JWTError, jwt
    from passlib.context import CryptContext

    from app.core.config import settings

    pwd = CryptContext(schemes=["bcrypt"], deprecated="auto")


    def hash_password(p: str) -> str:
        return pwd.hash(p)


    def verify_password(p: str, h: str) -> bool:
        return pwd.verify(p, h)


    def create_access_token(sub: str) -> str:
        exp = datetime.now(timezone.utc) + timedelta(minutes=settings.jwt_exp_minutes)
        return jwt.encode({"sub": sub, "exp": exp}, settings.jwt_secret, algorithm=settings.jwt_algorithm)

    # app/api/deps.py
    from fastapi import HTTPException, status
    from fastapi.security import OAuth2PasswordBearer

    from app.models.user import User  # module path assumed from the project layout

    oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login")


    async def get_current_user(
        token: str = Depends(oauth2_scheme),
        db: AsyncSession = Depends(get_db),
    ) -> User:
        try:
            payload = jwt.decode(token, settings.jwt_secret, algorithms=[settings.jwt_algorithm])
            # "sub" is stored as a string; the endpoints below treat User.id as an int
            user_id = int(payload["sub"])
        except (JWTError, KeyError, ValueError):
            raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Invalid token")
        user = await db.get(User, user_id)
        if not user:
            raise HTTPException(status.HTTP_401_UNAUTHORIZED, "User not found")
        return user
Endpoint pattern (thin controller, service below)
    # app/api/v1/users.py
    from fastapi import APIRouter, Depends, HTTPException, status
    from sqlalchemy.ext.asyncio import AsyncSession

    # Module paths below are assumed from the project layout.
    from app.api.deps import get_current_user, get_db
    from app.models.user import User
    from app.schemas.user import UserCreate, UserOut
    from app.services import user_service
    from app.services.user_service import DuplicateEmailError

    router = APIRouter(prefix="/users", tags=["users"])


    @router.post("", response_model=UserOut, status_code=status.HTTP_201_CREATED)
    async def create_user(
        payload: UserCreate,
        db: AsyncSession = Depends(get_db),
    ) -> UserOut:
        try:
            user = await user_service.create(db, payload)
        except DuplicateEmailError:
            raise HTTPException(status.HTTP_409_CONFLICT, "Email taken")
        return UserOut.model_validate(user)  # V2 from-attributes


    @router.get("/{user_id}", response_model=UserOut)
    async def get_user(
        user_id: int,
        db: AsyncSession = Depends(get_db),
        current: User = Depends(get_current_user),
    ) -> UserOut:
        user = await db.get(User, user_id)
        if not user:
            raise HTTPException(status.HTTP_404_NOT_FOUND)
        # Only the user themselves or an admin may read the record.
        if user.id != current.id and not current.is_admin:
            raise HTTPException(status.HTTP_403_FORBIDDEN)
        return UserOut.model_validate(user)
Pydantic V2 schemas with from_attributes
    from datetime import datetime

    from pydantic import BaseModel, ConfigDict, EmailStr, Field


    class UserBase(BaseModel):
        # from_attributes lets model_validate() read fields off ORM instances
        model_config = ConfigDict(from_attributes=True)

        email: EmailStr
        full_name: str = Field(min_length=1, max_length=100)


    class UserCreate(UserBase):
        password: str = Field(min_length=8, max_length=128)


    class UserOut(UserBase):
        id: int
        created_at: datetime
Global exception handler
    # app/main.py
    import logging

    from fastapi import Request
    from fastapi.responses import JSONResponse

    logger = logging.getLogger(__name__)


    class AppError(Exception):
        def __init__(self, msg: str, status_code: int = 400):
            self.msg, self.status_code = msg, status_code


    @app.exception_handler(AppError)
    async def app_error_handler(req: Request, exc: AppError):
        return JSONResponse(status_code=exc.status_code, content={"error": exc.msg})


    @app.exception_handler(Exception)
    async def unhandled(req: Request, exc: Exception):
        # Log the traceback, but never leak internals to the client.
        logger.exception("Unhandled error", extra={"path": req.url.path})
        return JSONResponse(status_code=500, content={"error": "Internal server error"})
Testing with pytest-asyncio
    # tests/conftest.py
    import pytest_asyncio
    from httpx import ASGITransport, AsyncClient

    from app.core.db import SessionLocal
    from app.main import app


    @pytest_asyncio.fixture
    async def client():
        # In-process ASGI transport: no real network socket needed
        async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as c:
            yield c


    @pytest_asyncio.fixture
    async def db_session():
        async with SessionLocal() as s:
            yield s
            await s.rollback()  # isolate test

    # tests/test_users.py
    import pytest
    from httpx import AsyncClient


    @pytest.mark.asyncio
    async def test_create_user(client: AsyncClient):
        r = await client.post(
            "/api/v1/users",
            json={"email": "a@b.com", "full_name": "A", "password": "pw12345678"},
        )
        assert r.status_code == 201
        assert r.json()["email"] == "a@b.com"
Production deployment (Uvicorn + Gunicorn)
    # Dockerfile CMD, recommended for production
    gunicorn app.main:app \
      --workers 4 \
      --worker-class uvicorn.workers.UvicornWorker \
      --bind 0.0.0.0:8000 \
      --timeout 60 \
      --keep-alive 5 \
      --access-logfile -
Workers = (2 × CPU) + 1 for CPU-bound work; use fewer for IO-heavy async apps, since each async worker already multiplexes many requests on its own event loop.
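The sizing rule above is simple arithmetic. The helper below is a hypothetical sketch: `suggested_workers` is not part of Gunicorn, and "one worker per core" for the IO-heavy case is an assumption, since the guidance only says "fewer".

```python
import os
from typing import Optional


def suggested_workers(cpu_count: Optional[int] = None, io_heavy: bool = False) -> int:
    """Return a starting point for --workers; tune with load tests."""
    cpus = cpu_count or os.cpu_count() or 1
    if io_heavy:
        # Assumption: one async worker per core as the "lower" starting value.
        return cpus
    return 2 * cpus + 1  # (2 × CPU) + 1


if __name__ == "__main__":
    print(suggested_workers(4))                 # 9
    print(suggested_workers(4, io_heavy=True))  # 4
```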
Common pitfalls
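| Pitfall | Fix |
|---|---|
| `async def` + sync HTTP / DB client | Use async clients / drivers OR drop `async` on the endpoint |
| V1 `class Config` pattern | V2 uses `model_config = ConfigDict(...)` |
| `from_attributes=True` missing → validation error from ORM instance | Add to `model_config` on every DTO reading from ORM |
| Forgetting `await` on SQLAlchemy 2.0 async query | Linter: use type stubs + mypy |
| Returning ORM object → leaks relationships (N+1 on serialize) | Always `model_validate` to a scoped Pydantic DTO |
| `BackgroundTasks` for durable work | Switch to Celery / Dramatiq / ARQ |
| `CORSMiddleware` wrapped inside auth middleware (error responses lack CORS headers) | CORS must be OUTERMOST: with `app.add_middleware`, the last middleware added is the outermost, so register CORS last |
| `response_model` + return extra fields → silently stripped | Rely on `response_model` filtering consciously |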
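The missing-`await` pitfall is easy to see in isolation. In this standard-library sketch, `fetch_user` is a hypothetical coroutine standing in for an async query. Calling it without `await` yields a coroutine object, not the data.

```python
import asyncio


async def fetch_user(user_id: int) -> dict:
    """Hypothetical async query; a real one would hit the database."""
    await asyncio.sleep(0)
    return {"id": user_id}


async def demo() -> tuple[bool, dict]:
    pending = fetch_user(1)  # forgot `await`: this is a coroutine object
    forgot = asyncio.iscoroutine(pending)
    user = await pending     # awaiting it produces the actual row
    return forgot, user


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

A type checker catches this because the un-awaited value has a coroutine type rather than `dict`, which is why the table recommends mypy with type stubs.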
Observability hooks
    # Structured logging
    import time

    import structlog
    from fastapi import Request

    logger = structlog.get_logger()


    @app.middleware("http")
    async def log_requests(request: Request, call_next):
        start = time.monotonic()
        response = await call_next(request)
        logger.info(
            "http_request",
            method=request.method,
            path=request.url.path,
            status=response.status_code,
            duration_ms=(time.monotonic() - start) * 1000,
        )
        return response
- Add `prometheus-fastapi-instrumentator` for `/metrics`.
- Health check: GET `/healthz` returns `{"status": "ok"}` + DB `SELECT 1`.
- Request ID: `X-Request-ID` header middleware → bind to contextvars for logging.
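The request-ID binding can be sketched with the standard library alone. The names `request_id_var`, `RequestIdFilter`, and `handle` are hypothetical. In a real app the middleware would read `X-Request-ID` from the incoming request instead of taking it as an argument.

```python
import asyncio
import contextvars
import logging
import uuid
from typing import Optional

# Holds the current request's ID; each asyncio task sees its own value.
request_id_var: contextvars.ContextVar[str] = contextvars.ContextVar(
    "request_id", default="-"
)


class RequestIdFilter(logging.Filter):
    """Attach the bound request ID to every log record."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.request_id = request_id_var.get()
        return True


async def handle(incoming_id: Optional[str]) -> str:
    """Simulate middleware: bind the ID, run the handler, then unbind."""
    rid = incoming_id or uuid.uuid4().hex  # generate one if the header is absent
    token = request_id_var.set(rid)
    try:
        await asyncio.sleep(0)  # let other requests interleave
        return request_id_var.get()
    finally:
        request_id_var.reset(token)


async def demo() -> list[str]:
    # Two concurrent "requests" must not see each other's ID.
    return list(await asyncio.gather(handle("req-a"), handle("req-b")))


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because `asyncio.gather` runs each coroutine in its own task with a copied context, the IDs stay isolated even when the handlers interleave.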