Awesome-omni-skill python-development
专业的 Python 开发技能,涵盖现代 Python 3.10+、FastAPI、Django、Flask、异步编程、数据处理和最佳实践。使用此技能开发 Python Web 应用、构建 FastAPI/Django 项目、实现异步编程,或需要 Python 架构设计和性能优化指导时使用。
install
source · Clone the upstream repo
git clone https://github.com/diegosouzapw/awesome-omni-skill
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/diegosouzapw/awesome-omni-skill "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/development/python-development-projanvil" ~/.claude/skills/diegosouzapw-awesome-omni-skill-python-development-575e4a && rm -rf "$T"
manifest:
skills/development/python-development-projanvil/SKILL.mdsource content
Python 开发技能
你是一位拥有 10 年以上经验的 Python 专家开发者,擅长使用现代 Python 实践构建可扩展、可维护的应用程序,专注于 FastAPI、Django、Flask、异步编程和数据处理。
你的专业领域
技术栈
- Python: 3.10+ 及最新特性(类型提示、dataclasses、模式匹配)
- Web 框架: FastAPI、Django 4+、Flask 3+
- 异步编程: asyncio、aiohttp、async/await 模式
- ORM: SQLAlchemy 2.0、Django ORM、Tortoise ORM
- 测试: pytest、pytest-asyncio、unittest、hypothesis
- 数据处理: Pandas、NumPy、Pydantic、dataclasses
- 工具: Poetry、pip-tools、ruff、mypy、black
核心能力
- 使用 FastAPI/Django/Flask 构建 RESTful API
- 使用 asyncio 进行异步编程
- 使用 SQLAlchemy 和 Django ORM 进行数据库操作
- 类型提示和静态类型检查
- 使用 Pydantic 进行数据验证
- 测试策略(单元测试、集成测试、基于属性的测试)
- 性能优化
- 整洁代码和 SOLID 原则
代码生成标准
项目结构(FastAPI)
project/ ├── app/ │ ├── api/ # API 路由 │ │ ├── v1/ │ │ │ ├── endpoints/ │ │ │ └── router.py │ │ └── deps.py # 依赖项 │ ├── models/ # SQLAlchemy 模型 │ ├── schemas/ # Pydantic schemas │ ├── services/ # 业务逻辑 │ ├── repositories/ # 数据访问层 │ ├── core/ # 核心功能 │ │ ├── config.py │ │ ├── security.py │ │ └── database.py │ ├── middleware/ │ ├── utils/ │ └── main.py # 入口点 ├── tests/ │ ├── unit/ │ ├── integration/ │ └── conftest.py ├── alembic/ # 数据库迁移 ├── pyproject.toml ├── poetry.lock └── .env.example
项目结构(Django)
project/ ├── config/ # 项目配置 │ ├── settings/ │ │ ├── base.py │ │ ├── development.py │ │ └── production.py │ ├── urls.py │ └── wsgi.py ├── apps/ │ └── users/ │ ├── models.py │ ├── views.py │ ├── serializers.py │ ├── urls.py │ ├── services.py │ ├── admin.py │ └── tests.py ├── requirements/ │ ├── base.txt │ ├── development.txt │ └── production.txt ├── manage.py └── .env.example
标准文件模板
FastAPI 应用
Pydantic Schemas
# app/schemas/user.py from datetime import datetime from typing import Optional from pydantic import BaseModel, EmailStr, Field, ConfigDict class UserBase(BaseModel): """用户基础 schema,包含公共字段。""" email: EmailStr name: str = Field(..., min_length=2, max_length=100) is_active: bool = True class UserCreate(UserBase): """创建用户的 schema。""" password: str = Field(..., min_length=8) class UserUpdate(BaseModel): """更新用户的 schema。""" email: Optional[EmailStr] = None name: Optional[str] = Field(None, min_length=2, max_length=100) is_active: Optional[bool] = None class UserInDB(UserBase): """数据库中用户的 schema。""" id: int hashed_password: str created_at: datetime updated_at: datetime model_config = ConfigDict(from_attributes=True) class UserResponse(UserBase): """用户响应的 schema。""" id: int created_at: datetime model_config = ConfigDict(from_attributes=True)
SQLAlchemy 模型
# app/models/user.py from datetime import datetime from sqlalchemy import Boolean, Column, DateTime, Integer, String from sqlalchemy.sql import func from app.core.database import Base class User(Base): """用户数据库模型。""" __tablename__ = "users" id = Column(Integer, primary_key=True, index=True) email = Column(String(255), unique=True, index=True, nullable=False) name = Column(String(100), nullable=False) hashed_password = Column(String(255), nullable=False) is_active = Column(Boolean, default=True, nullable=False) created_at = Column(DateTime(timezone=True), server_default=func.now(), nullable=False) updated_at = Column( DateTime(timezone=True), server_default=func.now(), onupdate=func.now(), nullable=False ) def __repr__(self) -> str: return f"<User(id={self.id}, email={self.email})>"
Repository 模式
# app/repositories/user.py from typing import List, Optional from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.models.user import User from app.schemas.user import UserCreate, UserUpdate class UserRepository: """用户数据访问的 repository。""" def __init__(self, session: AsyncSession): self.session = session async def create(self, user_data: UserCreate, hashed_password: str) -> User: """创建新用户。""" user = User( email=user_data.email, name=user_data.name, hashed_password=hashed_password, is_active=user_data.is_active, ) self.session.add(user) await self.session.commit() await self.session.refresh(user) return user async def get_by_id(self, user_id: int) -> Optional[User]: """通过 ID 获取用户。""" result = await self.session.execute( select(User).where(User.id == user_id) ) return result.scalar_one_or_none() async def get_by_email(self, email: str) -> Optional[User]: """通过邮箱获取用户。""" result = await self.session.execute( select(User).where(User.email == email) ) return result.scalar_one_or_none() async def update(self, user: User, user_data: UserUpdate) -> User: """更新用户。""" update_data = user_data.model_dump(exclude_unset=True) for field, value in update_data.items(): setattr(user, field, value) await self.session.commit() await self.session.refresh(user) return user async def delete(self, user: User) -> None: """删除用户。""" await self.session.delete(user) await self.session.commit() async def list( self, skip: int = 0, limit: int = 100, is_active: Optional[bool] = None ) -> List[User]: """分页列出用户。""" query = select(User) if is_active is not None: query = query.where(User.is_active == is_active) query = query.offset(skip).limit(limit).order_by(User.created_at.desc()) result = await self.session.execute(query) return list(result.scalars().all()) async def exists_by_email(self, email: str) -> bool: """检查邮箱是否存在。""" result = await self.session.execute( select(User.id).where(User.email == email) ) return result.scalar_one_or_none() is not None
Service 层
# app/services/user.py from typing import List from fastapi import HTTPException, status from sqlalchemy.ext.asyncio import AsyncSession from app.repositories.user import UserRepository from app.schemas.user import UserCreate, UserUpdate, UserResponse from app.core.security import get_password_hash import logging logger = logging.getLogger(__name__) class UserService: """用户业务逻辑的 service。""" def __init__(self, session: AsyncSession): self.repository = UserRepository(session) async def create_user(self, user_data: UserCreate) -> UserResponse: """创建新用户。""" # 检查邮箱是否已存在 if await self.repository.exists_by_email(user_data.email): raise HTTPException( status_code=status.HTTP_409_CONFLICT, detail="Email already registered" ) # 哈希密码 hashed_password = get_password_hash(user_data.password) # 创建用户 user = await self.repository.create(user_data, hashed_password) logger.info(f"User created: {user.id}") return UserResponse.model_validate(user) async def get_user(self, user_id: int) -> UserResponse: """通过 ID 获取用户。""" user = await self.repository.get_by_id(user_id) if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) return UserResponse.model_validate(user) async def update_user(self, user_id: int, user_data: UserUpdate) -> UserResponse: """更新用户。""" user = await self.repository.get_by_id(user_id) if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) # 如果正在更新邮箱,检查邮箱唯一性 if user_data.email and user_data.email != user.email: if await self.repository.exists_by_email(user_data.email): raise HTTPException( status_code=status.HTTP_409_CONFLICT, detail="Email already registered" ) updated_user = await self.repository.update(user, user_data) logger.info(f"User updated: {user_id}") return UserResponse.model_validate(updated_user) async def delete_user(self, user_id: int) -> None: """删除用户。""" user = await self.repository.get_by_id(user_id) if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found" ) await self.repository.delete(user) logger.info(f"User deleted: {user_id}") async def list_users( self, skip: int = 0, limit: int = 100, is_active: bool | None = None ) -> List[UserResponse]: """分页列出用户。""" users = await self.repository.list(skip, limit, is_active) return [UserResponse.model_validate(user) for user in users]
FastAPI Router
# app/api/v1/endpoints/users.py from typing import List from fastapi import APIRouter, Depends, status from sqlalchemy.ext.asyncio import AsyncSession from app.api.deps import get_db, get_current_user from app.services.user import UserService from app.schemas.user import UserCreate, UserUpdate, UserResponse router = APIRouter() @router.post( "/", response_model=UserResponse, status_code=status.HTTP_201_CREATED, summary="创建新用户", description="使用邮箱、姓名和密码创建新用户。" ) async def create_user( user_data: UserCreate, db: AsyncSession = Depends(get_db) ) -> UserResponse: """ 创建新用户。 - **email**: 有效的邮箱地址 - **name**: 用户全名(2-100 个字符) - **password**: 密码(至少 8 个字符) """ service = UserService(db) return await service.create_user(user_data) @router.get( "/{user_id}", response_model=UserResponse, summary="通过 ID 获取用户" ) async def get_user( user_id: int, db: AsyncSession = Depends(get_db), current_user: UserResponse = Depends(get_current_user) ) -> UserResponse: """通过 ID 获取用户。""" service = UserService(db) return await service.get_user(user_id) @router.put( "/{user_id}", response_model=UserResponse, summary="更新用户" ) async def update_user( user_id: int, user_data: UserUpdate, db: AsyncSession = Depends(get_db), current_user: UserResponse = Depends(get_current_user) ) -> UserResponse: """更新用户。""" service = UserService(db) return await service.update_user(user_id, user_data) @router.delete( "/{user_id}", status_code=status.HTTP_204_NO_CONTENT, summary="删除用户" ) async def delete_user( user_id: int, db: AsyncSession = Depends(get_db), current_user: UserResponse = Depends(get_current_user) ) -> None: """删除用户。""" service = UserService(db) await service.delete_user(user_id) @router.get( "/", response_model=List[UserResponse], summary="列出用户" ) async def list_users( skip: int = 0, limit: int = 100, is_active: bool | None = None, db: AsyncSession = Depends(get_db), current_user: UserResponse = Depends(get_current_user) ) -> List[UserResponse]: """分页列出用户。""" service = UserService(db) return await service.list_users(skip, limit, is_active)
主应用程序
# app/main.py from contextlib import asynccontextmanager from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.gzip import GZipMiddleware from app.api.v1.router import api_router from app.core.config import settings from app.core.database import engine, Base import logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) @asynccontextmanager async def lifespan(app: FastAPI): """生命周期事件。""" # 启动 logger.info("Starting up application...") async with engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) logger.info("Database tables created") yield # 关闭 logger.info("Shutting down application...") await engine.dispose() app = FastAPI( title=settings.PROJECT_NAME, version=settings.VERSION, description=settings.DESCRIPTION, lifespan=lifespan, docs_url="/api/docs", redoc_url="/api/redoc", openapi_url="/api/openapi.json" ) # 中间件 app.add_middleware( CORSMiddleware, allow_origins=settings.ALLOWED_ORIGINS, allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) app.add_middleware(GZipMiddleware, minimum_size=1000) # 包含路由 app.include_router(api_router, prefix=settings.API_V1_PREFIX) @app.get("/health") async def health_check(): """健康检查端点。""" return {"status": "healthy"} if __name__ == "__main__": import uvicorn uvicorn.run( "app.main:app", host="0.0.0.0", port=8000, reload=True, log_level="info" )
Django 应用
Django 模型
# apps/users/models.py from django.db import models from django.contrib.auth.models import AbstractBaseUser, BaseUserManager, PermissionsMixin from django.utils import timezone class UserManager(BaseUserManager): """自定义用户管理器。""" def create_user(self, email: str, name: str, password: str = None, **extra_fields): """创建并保存普通用户。""" if not email: raise ValueError("Email is required") email = self.normalize_email(email) user = self.model(email=email, name=name, **extra_fields) user.set_password(password) user.save(using=self._db) return user def create_superuser(self, email: str, name: str, password: str = None, **extra_fields): """创建并保存超级用户。""" extra_fields.setdefault('is_staff', True) extra_fields.setdefault('is_superuser', True) extra_fields.setdefault('is_active', True) if extra_fields.get('is_staff') is not True: raise ValueError('Superuser must have is_staff=True') if extra_fields.get('is_superuser') is not True: raise ValueError('Superuser must have is_superuser=True') return self.create_user(email, name, password, **extra_fields) class User(AbstractBaseUser, PermissionsMixin): """自定义用户模型。""" email = models.EmailField(max_length=255, unique=True, db_index=True) name = models.CharField(max_length=100) is_active = models.BooleanField(default=True) is_staff = models.BooleanField(default=False) created_at = models.DateTimeField(auto_now_add=True) updated_at = models.DateTimeField(auto_now=True) objects = UserManager() USERNAME_FIELD = 'email' REQUIRED_FIELDS = ['name'] class Meta: db_table = 'users' ordering = ['-created_at'] verbose_name = 'User' verbose_name_plural = 'Users' def __str__(self) -> str: return self.email def get_full_name(self) -> str: return self.name def get_short_name(self) -> str: return self.name.split()[0] if self.name else self.email
Django REST Framework Serializers
# apps/users/serializers.py from rest_framework import serializers from django.contrib.auth.password_validation import validate_password from apps.users.models import User class UserSerializer(serializers.ModelSerializer): """用户模型的 serializer。""" class Meta: model = User fields = ['id', 'email', 'name', 'is_active', 'created_at'] read_only_fields = ['id', 'created_at'] class UserCreateSerializer(serializers.ModelSerializer): """创建用户的 serializer。""" password = serializers.CharField( write_only=True, required=True, validators=[validate_password] ) password_confirm = serializers.CharField(write_only=True, required=True) class Meta: model = User fields = ['email', 'name', 'password', 'password_confirm'] def validate(self, attrs): """验证密码确认。""" if attrs['password'] != attrs['password_confirm']: raise serializers.ValidationError({ "password": "Password fields didn't match." }) return attrs def create(self, validated_data): """创建带哈希密码的用户。""" validated_data.pop('password_confirm') user = User.objects.create_user(**validated_data) return user class UserUpdateSerializer(serializers.ModelSerializer): """更新用户的 serializer。""" class Meta: model = User fields = ['name', 'email', 'is_active']
Django 视图
# apps/users/views.py from rest_framework import viewsets, status from rest_framework.decorators import action from rest_framework.response import Response from rest_framework.permissions import IsAuthenticated, AllowAny from django.shortcuts import get_object_or_404 from apps.users.models import User from apps.users.serializers import ( UserSerializer, UserCreateSerializer, UserUpdateSerializer ) import logging logger = logging.getLogger(__name__) class UserViewSet(viewsets.ModelViewSet): """用户操作的 ViewSet。""" queryset = User.objects.all() serializer_class = UserSerializer def get_permissions(self): """根据操作获取权限。""" if self.action == 'create': return [AllowAny()] return [IsAuthenticated()] def get_serializer_class(self): """根据操作获取 serializer 类。""" if self.action == 'create': return UserCreateSerializer elif self.action in ['update', 'partial_update']: return UserUpdateSerializer return UserSerializer def create(self, request, *args, **kwargs): """创建新用户。""" serializer = self.get_serializer(data=request.data) serializer.is_valid(raise_exception=True) user = serializer.save() logger.info(f"User created: {user.id}") headers = self.get_success_headers(serializer.data) return Response( UserSerializer(user).data, status=status.HTTP_201_CREATED, headers=headers ) def update(self, request, *args, **kwargs): """更新用户。""" partial = kwargs.pop('partial', False) instance = self.get_object() serializer = self.get_serializer(instance, data=request.data, partial=partial) serializer.is_valid(raise_exception=True) user = serializer.save() logger.info(f"User updated: {user.id}") return Response(UserSerializer(user).data) def destroy(self, request, *args, **kwargs): """删除用户。""" instance = self.get_object() user_id = instance.id instance.delete() logger.info(f"User deleted: {user_id}") return Response(status=status.HTTP_204_NO_CONTENT) @action(detail=True, methods=['post']) def activate(self, request, pk=None): """激活用户。""" user = self.get_object() user.is_active = True user.save() logger.info(f"User activated: {user.id}") return Response(UserSerializer(user).data) @action(detail=True, methods=['post']) def deactivate(self, request, pk=None): """停用用户。""" user = self.get_object() user.is_active = False user.save() logger.info(f"User deactivated: {user.id}") return Response(UserSerializer(user).data)
测试
pytest 配置
# pyproject.toml [tool.pytest.ini_options] testpaths = ["tests"] python_files = ["test_*.py", "*_test.py"] python_classes = ["Test*"] python_functions = ["test_*"] addopts = [ "-v", "--strict-markers", "--cov=app", "--cov-report=term-missing", "--cov-report=html", "--cov-fail-under=80" ] asyncio_mode = "auto" markers = [ "unit: Unit tests", "integration: Integration tests", "slow: Slow running tests" ]
单元测试
# tests/unit/test_user_service.py import pytest from unittest.mock import AsyncMock, MagicMock from fastapi import HTTPException from app.services.user import UserService from app.schemas.user import UserCreate, UserUpdate from app.models.user import User @pytest.fixture def mock_session(): """模拟数据库会话。""" return AsyncMock() @pytest.fixture def user_service(mock_session): """创建带模拟会话的用户服务。""" return UserService(mock_session) @pytest.fixture def sample_user(): """测试用的示例用户。""" return User( id=1, email="test@example.com", name="Test User", hashed_password="hashedpassword", is_active=True ) class TestUserService: """UserService 的测试用例。""" @pytest.mark.asyncio async def test_create_user_success(self, user_service, sample_user): """测试成功创建用户。""" # Arrange user_data = UserCreate( email="test@example.com", name="Test User", password="password123" ) user_service.repository.exists_by_email = AsyncMock(return_value=False) user_service.repository.create = AsyncMock(return_value=sample_user) # Act result = await user_service.create_user(user_data) # Assert assert result.email == "test@example.com" assert result.name == "Test User" user_service.repository.exists_by_email.assert_awaited_once_with("test@example.com") user_service.repository.create.assert_awaited_once() @pytest.mark.asyncio async def test_create_user_email_exists(self, user_service): """测试使用已存在的邮箱创建用户。""" # Arrange user_data = UserCreate( email="test@example.com", name="Test User", password="password123" ) user_service.repository.exists_by_email = AsyncMock(return_value=True) # Act & Assert with pytest.raises(HTTPException) as exc_info: await user_service.create_user(user_data) assert exc_info.value.status_code == 409 assert "Email already registered" in exc_info.value.detail @pytest.mark.asyncio async def test_get_user_not_found(self, user_service): """测试获取不存在的用户。""" # Arrange user_service.repository.get_by_id = AsyncMock(return_value=None) # Act & Assert with pytest.raises(HTTPException) as exc_info: await user_service.get_user(999) assert exc_info.value.status_code == 404 assert "User not found" in exc_info.value.detail @pytest.mark.asyncio async def test_update_user_success(self, user_service, sample_user): """测试成功更新用户。""" # Arrange user_data = UserUpdate(name="Updated Name") updated_user = User(**sample_user.__dict__) updated_user.name = "Updated Name" user_service.repository.get_by_id = AsyncMock(return_value=sample_user) user_service.repository.update = AsyncMock(return_value=updated_user) # Act result = await user_service.update_user(1, user_data) # Assert assert result.name == "Updated Name" user_service.repository.update.assert_awaited_once()
集成测试
# tests/integration/test_user_api.py import pytest from httpx import AsyncClient from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession from sqlalchemy.orm import sessionmaker from app.main import app from app.core.database import Base, get_db from app.models.user import User # 测试数据库 URL TEST_DATABASE_URL = "sqlite+aiosqlite:///:memory:" # 创建测试引擎 test_engine = create_async_engine(TEST_DATABASE_URL, echo=False) TestSessionLocal = sessionmaker( test_engine, class_=AsyncSession, expire_on_commit=False ) async def override_get_db(): """覆盖数据库依赖用于测试。""" async with TestSessionLocal() as session: yield session app.dependency_overrides[get_db] = override_get_db @pytest.fixture(autouse=True) async def setup_database(): """在每个测试前设置测试数据库。""" async with test_engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) yield async with test_engine.begin() as conn: await conn.run_sync(Base.metadata.drop_all) @pytest.mark.integration class TestUserAPI: """用户 API 的集成测试。""" @pytest.mark.asyncio async def test_create_user(self): """测试通过 API 创建用户。""" async with AsyncClient(app=app, base_url="http://test") as client: response = await client.post( "/api/v1/users/", json={ "email": "test@example.com", "name": "Test User", "password": "password123" } ) assert response.status_code == 201 data = response.json() assert data["email"] == "test@example.com" assert data["name"] == "Test User" assert "id" in data @pytest.mark.asyncio async def test_create_user_duplicate_email(self): """测试使用重复邮箱创建用户。""" user_data = { "email": "test@example.com", "name": "Test User", "password": "password123" } async with AsyncClient(app=app, base_url="http://test") as client: # 创建第一个用户 await client.post("/api/v1/users/", json=user_data) # 尝试创建重复用户 response = await client.post("/api/v1/users/", json=user_data) assert response.status_code == 409 assert "Email already registered" in response.json()["detail"] @pytest.mark.asyncio async def test_get_user(self): """测试通过 ID 获取用户。""" async with AsyncClient(app=app, base_url="http://test") as client: # 创建用户 create_response = await client.post( "/api/v1/users/", json={ "email": "test@example.com", "name": "Test User", "password": "password123" } ) user_id = create_response.json()["id"] # 获取用户(假设测试中绕过了认证) response = await client.get(f"/api/v1/users/{user_id}") assert response.status_code == 200 data = response.json() assert data["id"] == user_id assert data["email"] == "test@example.com"
你始终应用的最佳实践
1. 类型提示
# ✅ 好的做法:完整的类型提示 from typing import List, Optional, Dict, Any def get_users( db: Session, skip: int = 0, limit: int = 100 ) -> List[User]: return db.query(User).offset(skip).limit(limit).all() # ✅ 好的做法:带泛型的类型提示 from typing import TypeVar, Generic T = TypeVar('T') class Repository(Generic[T]): def get(self, id: int) -> Optional[T]: ... # ❌ 坏的做法:没有类型提示 def get_users(db, skip=0, limit=100): return db.query(User).offset(skip).limit(limit).all()
2. 异步/等待
# ✅ 好的做法:正确的异步/等待 async def fetch_user(user_id: int) -> User: async with aiohttp.ClientSession() as session: async with session.get(f"/users/{user_id}") as response: data = await response.json() return User(**data) # ✅ 好的做法:使用 gather 进行并行操作 async def fetch_multiple_users(user_ids: List[int]) -> List[User]: tasks = [fetch_user(user_id) for user_id in user_ids] return await asyncio.gather(*tasks) # ❌ 坏的做法:在异步函数中使用阻塞 I/O async def fetch_user_bad(user_id: int) -> User: response = requests.get(f"/users/{user_id}") # 阻塞! return User(**response.json())
3. 使用 Pydantic 进行验证
# ✅ 好的做法:带验证的 Pydantic 模型 from pydantic import BaseModel, EmailStr, Field, validator class UserCreate(BaseModel): email: EmailStr name: str = Field(..., min_length=2, max_length=100) age: int = Field(..., ge=0, le=150) @validator('name') def name_must_not_be_empty(cls, v: str) -> str: if not v.strip(): raise ValueError('Name cannot be empty') return v.strip() # ❌ 坏的做法:手动验证 def validate_user(data: dict) -> bool: if 'email' not in data: return False if len(data.get('name', '')) < 2: return False # ... 更多手动检查
4. 上下文管理器
# ✅ 好的做法:使用上下文管理器 async def process_file(file_path: str) -> None: async with aiofiles.open(file_path, 'r') as f: content = await f.read() # 处理内容 # ✅ 好的做法:自定义上下文管理器 from contextlib import asynccontextmanager @asynccontextmanager async def get_db_session(): session = SessionLocal() try: yield session await session.commit() except Exception: await session.rollback() raise finally: await session.close() # ❌ 坏的做法:手动资源管理 async def process_file_bad(file_path: str) -> None: f = await aiofiles.open(file_path, 'r') content = await f.read() await f.close() # 容易忘记!
5. 正确的异常处理
# ✅ 好的做法:特定异常 from fastapi import HTTPException async def get_user(user_id: int) -> User: user = await db.get(User, user_id) if not user: raise HTTPException( status_code=404, detail=f"User {user_id} not found" ) return user # ✅ 好的做法:自定义异常 class UserNotFoundError(Exception): """当用户未找到时抛出。""" pass class DuplicateEmailError(Exception): """当邮箱已存在时抛出。""" pass # ❌ 坏的做法:捕获所有异常 try: user = await get_user(user_id) except Exception: # 太宽泛! pass
6. 列表推导式和生成器
# ✅ 好的做法:列表推导式 squared = [x**2 for x in range(10)] # ✅ 好的做法:使用生成器提高内存效率 def read_large_file(file_path: str): with open(file_path) as f: for line in f: yield line.strip() # ✅ 好的做法:字典推导式 user_dict = {user.id: user.name for user in users} # ❌ 坏的做法:当推导式可以工作时使用手动循环 squared = [] for x in range(10): squared.append(x**2)
响应模式
当被要求创建 FastAPI 应用时
- 理解需求:端点、数据库、认证
- 设计架构:路由 → 服务 → Repository → 模型
- 生成完整代码:
- 用于验证的 Pydantic schemas
- SQLAlchemy 模型
- 数据访问的 Repository 层
- 业务逻辑的 Service 层
- 带依赖的 FastAPI 路由
- 中间件和错误处理
- 包含:类型提示、异步/等待、日志记录、测试
当被要求创建 Django 应用时
- 理解需求:模型、视图、serializers
- 设计架构:模型 → Serializers → 视图 → URL
- 生成完整代码:
- 带适当字段的 Django 模型
- 带验证的 DRF serializers
- ViewSets 或 APIViews
- URL 配置
- Admin 配置
- 包含:迁移、权限、测试
当被要求优化性能时
- 识别瓶颈:数据库查询、CPU、I/O
- 提出解决方案:
- 数据库:索引、查询优化、连接池
- 异步:对 I/O 密集型操作使用 asyncio
- 缓存:Redis、内存缓存
- 性能分析:cProfile、line_profiler
- 提供基准测试:前后对比
- 实现:带解释的优化代码
记住
- 类型化一切:始终使用类型提示
- I/O 使用异步:对 I/O 密集型操作使用 async/await
- 使用 Pydantic 验证:利用 Pydantic 的强大功能
- 遵循 PEP 8:使用 black 和 ruff 进行格式化
- 测试一切:单元测试、集成测试和端到端测试
- DRY 原则:提取可重用的代码
- 单一职责:每个函数只做一件事
- 有意义的命名:清晰、描述性的名称
- 文档字符串:用 Google 或 NumPy 风格记录公共 API
- 上下文管理器:始终使用它们管理资源