Claude-skill-registry async-sqlalchemy-patterns
Async SQLAlchemy 2.0+ database patterns for FastAPI including session management, connection pooling, Alembic migrations, relationship loading strategies, and query optimization. Use when implementing database models, configuring async sessions, setting up migrations, optimizing queries, managing relationships, or when user mentions SQLAlchemy, async database, ORM, Alembic, database performance, or connection pooling.
git clone https://github.com/majiayu000/claude-skill-registry
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/async-sqlalchemy-patterns" ~/.claude/skills/majiayu000-claude-skill-registry-async-sqlalchemy-patterns && rm -rf "$T"
skills/data/async-sqlalchemy-patterns/SKILL.mdAsync SQLAlchemy Patterns
Purpose: Implement production-ready async SQLAlchemy 2.0+ patterns in FastAPI with proper session management, migrations, and performance optimization.
Activation Triggers:
- Database model implementation
- Async session configuration
- Alembic migration setup
- Query performance optimization
- Relationship loading issues
- Connection pool configuration
- Transaction management
- Database schema migrations
Key Resources:
- Initialize Alembic with async supportscripts/setup-alembic.sh
- Create migrations from model changesscripts/generate-migration.sh
- Base model with common patternstemplates/base_model.py
- Async session factory and dependencytemplates/session_manager.py
- Alembic configuration for asynctemplates/alembic.ini
- Complete model with relationshipsexamples/user_model.py
- Session usage patternsexamples/async_context_examples.py
Core Patterns
1. Async Engine and Session Setup
Database Configuration:
# app/core/database.py from sqlalchemy.ext.asyncio import ( AsyncSession, create_async_engine, async_sessionmaker ) from sqlalchemy.orm import declarative_base from app.core.config import settings # Create async engine engine = create_async_engine( settings.DATABASE_URL, echo=settings.DEBUG, pool_pre_ping=True, # Verify connections before using pool_size=5, # Base number of connections max_overflow=10, # Additional connections when pool is full pool_recycle=3600, # Recycle connections after 1 hour pool_timeout=30, # Wait 30s for available connection ) # Create async session factory AsyncSessionLocal = async_sessionmaker( engine, class_=AsyncSession, expire_on_commit=False, # Keep objects usable after commit autoflush=False, # Manual flush control autocommit=False, # Explicit commits only ) Base = declarative_base()
Dependency Injection Pattern:
# app/core/deps.py from typing import AsyncGenerator from sqlalchemy.ext.asyncio import AsyncSession from app.core.database import AsyncSessionLocal async def get_db() -> AsyncGenerator[AsyncSession, None]: """ FastAPI dependency for database sessions. Automatically handles cleanup. """ async with AsyncSessionLocal() as session: try: yield session await session.commit() except Exception: await session.rollback() raise finally: await session.close()
2. Base Model Pattern
Use template from
templates/base_model.py with:
- UUID primary keys by default
- Automatic created_at/updated_at timestamps
- Soft delete support
- Common query methods
Essential Mixins:
from datetime import datetime from sqlalchemy import DateTime, Boolean, func from sqlalchemy.orm import Mapped, mapped_column class TimestampMixin: """Auto-managed timestamps""" created_at: Mapped[datetime] = mapped_column( DateTime(timezone=True), server_default=func.now(), nullable=False ) updated_at: Mapped[datetime] = mapped_column( DateTime(timezone=True), server_default=func.now(), onupdate=func.now(), nullable=False ) class SoftDeleteMixin: """Soft delete support""" is_deleted: Mapped[bool] = mapped_column( Boolean, default=False, nullable=False ) deleted_at: Mapped[datetime | None] = mapped_column( DateTime(timezone=True), nullable=True )
3. Relationship Loading Strategies
Lazy Loading (Default - N+1 Problem Risk):
# Bad: Causes N+1 queries users = await session.execute(select(User)) for user in users.scalars(): print(user.posts) # Separate query per user!
Eager Loading (Recommended):
from sqlalchemy.orm import selectinload, joinedload # selectinload: Separate query, good for one-to-many stmt = select(User).options(selectinload(User.posts)) result = await session.execute(stmt) users = result.scalars().all() # joinedload: SQL JOIN, good for many-to-one stmt = select(Post).options(joinedload(Post.author)) result = await session.execute(stmt) posts = result.scalars().unique().all() # unique() required with joins! # subqueryload: Subquery loading stmt = select(User).options(subqueryload(User.posts))
Relationship Configuration:
from sqlalchemy.orm import relationship class User(Base): __tablename__ = "users" # One-to-many with cascade posts: Mapped[list["Post"]] = relationship( back_populates="author", cascade="all, delete-orphan", lazy="selectin" # Default eager loading ) # Many-to-many roles: Mapped[list["Role"]] = relationship( secondary="user_roles", back_populates="users", lazy="selectin" )
4. Query Patterns
Basic CRUD:
from sqlalchemy import select, update, delete from sqlalchemy.ext.asyncio import AsyncSession # Create async def create_user(session: AsyncSession, user_data: dict): user = User(**user_data) session.add(user) await session.commit() await session.refresh(user) return user # Read with filters async def get_users(session: AsyncSession, skip: int = 0, limit: int = 100): stmt = ( select(User) .where(User.is_deleted == False) .offset(skip) .limit(limit) .order_by(User.created_at.desc()) ) result = await session.execute(stmt) return result.scalars().all() # Update async def update_user(session: AsyncSession, user_id: int, updates: dict): stmt = ( update(User) .where(User.id == user_id) .values(**updates) .returning(User) ) result = await session.execute(stmt) await session.commit() return result.scalar_one() # Delete async def delete_user(session: AsyncSession, user_id: int): stmt = delete(User).where(User.id == user_id) await session.execute(stmt) await session.commit()
Complex Queries:
from sqlalchemy import func, and_, or_ # Aggregations stmt = ( select(User.id, func.count(Post.id)) .join(Post) .group_by(User.id) .having(func.count(Post.id) > 5) ) # Subqueries subq = ( select(func.avg(Post.views)) .where(Post.user_id == User.id) .scalar_subquery() ) stmt = select(User).where(User.id.in_( select(Post.user_id).where(Post.views > subq) )) # Window functions from sqlalchemy import over stmt = select( Post, func.row_number().over( partition_by=Post.user_id, order_by=Post.created_at.desc() ).label('row_num') ).where(over.row_num <= 10)
5. Transaction Management
Nested Transactions:
async def transfer_funds( session: AsyncSession, from_account: int, to_account: int, amount: float ): async with session.begin_nested(): # Savepoint # Debit stmt = ( update(Account) .where(Account.id == from_account) .where(Account.balance >= amount) .values(balance=Account.balance - amount) ) result = await session.execute(stmt) if result.rowcount == 0: raise ValueError("Insufficient funds") # Credit stmt = ( update(Account) .where(Account.id == to_account) .values(balance=Account.balance + amount) ) await session.execute(stmt) await session.commit()
Manual Transaction Control:
async def batch_operation(session: AsyncSession, items: list): try: for item in items: session.add(item) if len(session.new) >= 100: await session.flush() # Flush but don't commit await session.commit() except Exception as e: await session.rollback() raise
6. Alembic Migration Setup
# Initialize Alembic with async template ./scripts/setup-alembic.sh # Creates: # - alembic/ directory # - alembic.ini configured for async # - env.py with async support
Generate Migrations:
# Auto-generate from model changes ./scripts/generate-migration.sh "add user table" # Review generated migration in alembic/versions/ # Always review auto-generated migrations!
Migration Best Practices:
- Always review auto-generated migrations
- Use batch operations for large tables
- Add indexes in separate migrations
- Include both upgrade and downgrade
- Test migrations on staging first
Manual Migration Template:
# alembic/versions/xxx_add_index.py from alembic import op import sqlalchemy as sa def upgrade() -> None: # Use batch for SQLite compatibility with op.batch_alter_table('users') as batch_op: batch_op.create_index( 'ix_users_email', ['email'], unique=True ) def downgrade() -> None: with op.batch_alter_table('users') as batch_op: batch_op.drop_index('ix_users_email')
7. Connection Pool Configuration
Production Settings:
# For web applications engine = create_async_engine( DATABASE_URL, pool_size=20, # Concurrent requests max_overflow=40, # Burst capacity pool_recycle=3600, # 1 hour pool_pre_ping=True, # Verify connections pool_timeout=30, # Wait time echo_pool=False, # Pool debug logging ) # For background tasks engine = create_async_engine( DATABASE_URL, pool_size=5, # Fewer connections max_overflow=10, pool_recycle=7200, # 2 hours )
Connection Health Check:
from sqlalchemy import text async def check_database_health(session: AsyncSession) -> bool: try: await session.execute(text("SELECT 1")) return True except Exception: return False
8. Performance Optimization
Bulk Operations:
# Bulk insert async def bulk_create_users(session: AsyncSession, users: list[dict]): stmt = insert(User).values(users) await session.execute(stmt) await session.commit() # Bulk update async def bulk_update_status(session: AsyncSession, user_ids: list[int]): stmt = ( update(User) .where(User.id.in_(user_ids)) .values(status="active") ) await session.execute(stmt) await session.commit()
Query Result Streaming:
async def stream_large_dataset(session: AsyncSession): stmt = select(User).execution_options(yield_per=100) result = await session.stream(stmt) async for partition in result.partitions(100): users = partition.scalars().all() # Process batch yield users
Index Optimization:
from sqlalchemy import Index class User(Base): __tablename__ = "users" email: Mapped[str] = mapped_column(unique=True, index=True) __table_args__ = ( Index('ix_user_status_created', 'status', 'created_at'), Index('ix_user_email_active', 'email', postgresql_where=~is_deleted), )
Troubleshooting
Common Issues
"Object is not bound to session":
# Bad: Object expires after commit user = await create_user(session, data) print(user.email) # Error if expire_on_commit=True # Fix: Refresh or configure session await session.refresh(user) # Or: AsyncSessionLocal(expire_on_commit=False)
"N+1 Query Problem":
# Enable SQL logging to detect engine = create_async_engine(url, echo=True) # Fix with eager loading stmt = select(User).options(selectinload(User.posts))
"DetachedInstanceError":
# Bad: Accessing relationship outside session async with AsyncSessionLocal() as session: user = await session.get(User, user_id) print(user.posts) # Error! # Fix: Load within session or use eager loading stmt = select(User).options(selectinload(User.posts))
"Connection Pool Exhausted":
# Increase pool size or add timeout engine = create_async_engine( url, pool_size=20, max_overflow=40, pool_timeout=30 )
Resources
Scripts:
scripts/ directory contains:
- Initialize Alembic with async configurationsetup-alembic.sh
- Create migrations from model changesgenerate-migration.sh
- Test database connectivitytest-connection.sh
- Analyze and suggest query optimizationsoptimize-queries.sh
Templates:
templates/ directory includes:
- Base model with UUID, timestamps, soft deletebase_model.py
- Complete session factory and dependenciessession_manager.py
- Production-ready Alembic configurationalembic.ini
- Alembic async environment templateenv.py
Examples:
examples/ directory provides:
- Full model with relationships and indexesuser_model.py
- Session patterns and best practicesasync_context_examples.py
- Common query optimization examplesquery_patterns.py
- Manual migration patternsmigration_examples.py
SQLAlchemy Version: 2.0+ Database Support: PostgreSQL, MySQL, SQLite (async drivers) Async Drivers: asyncpg (PostgreSQL), aiomysql (MySQL), aiosqlite (SQLite) Version: 1.0.0