Claude-skill-registry database-ops
SQLModel async database operations with Neon PostgreSQL, migrations, user isolation, and proper indexing. Use when defining models, queries, or database operations.
install
source · Clone the upstream repo
git clone https://github.com/majiayu000/claude-skill-registry
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/database-ops" ~/.claude/skills/majiayu000-claude-skill-registry-database-ops && rm -rf "$T"
manifest:
skills/data/database-ops/SKILL.mdsource content
SQLModel Database Operations
Model Definition
from sqlmodel import SQLModel, Field from datetime import datetime class Task(SQLModel, table=True): __tablename__ = "tasks" id: int | None = Field(default=None, primary_key=True) user_id: str = Field(index=True) # Always indexed title: str = Field(max_length=200) description: str | None = Field(default=None, max_length=1000) completed: bool = Field(default=False) created_at: datetime = Field(default_factory=datetime.utcnow) updated_at: datetime = Field(default_factory=datetime.utcnow)
Connection Setup
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession from sqlalchemy.orm import sessionmaker import os engine = create_async_engine(os.getenv("DATABASE_URL")) async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) async def get_db_session(): async with async_session() as session: yield session
CRUD with User Isolation (CRITICAL)
from sqlmodel import select # Always filter by user_id async def get_tasks(session: AsyncSession, user_id: str, status: str = "all"): stmt = select(Task).where(Task.user_id == user_id) if status == "pending": stmt = stmt.where(Task.completed == False) result = await session.execute(stmt) return result.scalars().all() # Update with user isolation async def update_task(session: AsyncSession, user_id: str, task_id: int, **kwargs): stmt = select(Task).where(Task.id == task_id, Task.user_id == user_id) result = await session.execute(stmt) task = result.scalar_one_or_none() if not task: return None for key, value in kwargs.items(): setattr(task, key, value) task.updated_at = datetime.utcnow() await session.commit() return task
Testing
@pytest.fixture async def db_session(): engine = create_async_engine("sqlite+aiosqlite:///:memory:") async with engine.begin() as conn: await conn.run_sync(SQLModel.metadata.create_all) async_session = sessionmaker(engine, class_=AsyncSession) async with async_session() as session: yield session