Claude-skill-registry DB Schema Surgeon

Gestión del esquema PostgreSQL, Auto-Healing y arquitectura RAG híbrida.

install
source · Clone the upstream repo
git clone https://github.com/majiayu000/claude-skill-registry
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/db-evolution" ~/.claude/skills/majiayu000-claude-skill-registry-db-schema-surgeon && rm -rf "$T"
manifest: skills/data/db-evolution/SKILL.md
source content

DB Schema Surgeon - Platform AI Solutions

1. Filosofía Auto-Healing (NO Alembic)

El Protocolo

Platform AI Solutions usa Self-Healing: el esquema se auto-repara en cada startup.

# orchestrator_service/main.py
from app.models import Base
from app.db import engine

@app.on_event("startup")
async def startup_event():
    async with engine.begin() as conn:
        # Crea SOLO las tablas que no existen
        await conn.run_sync(Base.metadata.create_all)
    
    # Ejecutar migraciones de esquema
    await run_migration_steps()

Ventajas

  • Sin historial de migraciones complejo
  • Desarrollo rápido (cambios inmediatos)
  • Funciona igual en local, staging y producción
  • Idempotente (
    create_all
    solo crea lo que falta)

2. Crear Nuevas Tablas

Paso 1: Definir Modelo SQLAlchemy

# app/models/business_asset.py
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy import String, Integer, JSONB, ForeignKey
from datetime import datetime
from .base import Base

class BusinessAsset(Base):
    __tablename__ = "business_assets"
    
    id: Mapped[str] = mapped_column(String, primary_key=True)  # UUID
    tenant_id: Mapped[str] = mapped_column(String(50), nullable=False)
    asset_type: Mapped[str] = mapped_column(String(50))  # branding, scripts, roi
    content: Mapped[dict] = mapped_column(JSONB, nullable=False)
    created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)
    is_active: Mapped[bool] = mapped_column(default=True)

Paso 2: Importar en main.py

# main.py
from app.models import (
    Base,
    Tenant,
    Agent,
    BusinessAsset,  # ← Agregar aquí
    Credential
)

# Esto asegura que SQLAlchemy conozca todos los modelos

Paso 3: Reiniciar

Al reiniciar el servicio,

create_all
ejecutará y la tabla se creará automáticamente.

3. Modificar Tablas Existentes

El Problema

create_all
NO altera tablas existentes.

Para agregar columnas, cambiar tipos, o renombrar: Migration Script

Solución: Migration Steps

# orchestrator_service/scripts/migration_steps.py
from sqlalchemy import text

async def run_migration_steps(engine):
    """Migraciones idempotentes"""
    
    async with engine.begin() as conn:
        # Migración 1: Agregar columna
        await conn.execute(text("""
            ALTER TABLE agents 
            ADD COLUMN IF NOT EXISTS template_type VARCHAR(50) DEFAULT 'custom'
        """))
        
        # Migración 2: Crear índice
        await conn.execute(text("""
            CREATE INDEX IF NOT EXISTS idx_agents_tenant_active 
            ON agents(tenant_id, is_active)
        """))
        
        # Migración 3: Agregar columna JSONB
        await conn.execute(text("""
            ALTER TABLE tenants 
            ADD COLUMN IF NOT EXISTS tool_config JSONB DEFAULT '{}'::jsonb
        """))

Invocar en Startup

# main.py
from scripts.migration_steps import run_migration_steps

@app.on_event("startup")
async def startup_event():
    # 1. Crear tablas nuevas
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    
    # 2. Ejecutar migraciones
    await run_migration_steps(engine)

4. Arquitectura RAG Híbrida

Separación de Responsabilidades

AlmacenamientoResponsabilidadTecnología
PostgreSQL LocalMetadata (filename, collection, file_path)
rag_documents
table
Supabase RemoteVectores (embeddings)pgvector extension

Modelo PostgreSQL

# app/models/rag_document.py
class RAGDocument(Base):
    __tablename__ = "rag_documents"
    
    id: Mapped[str] = mapped_column(String, primary_key=True)  # UUID
    tenant_id: Mapped[int] = mapped_column(
        Integer, 
        ForeignKey("tenants.id"),
        index=True
    )
    filename: Mapped[str] = mapped_column(String(255))
    collection: Mapped[str] = mapped_column(String(100))  # General, ADN Personal, Shadow RAG
    file_type: Mapped[str] = mapped_column(String(50))  # pdf, txt, docx
    file_path: Mapped[str] = mapped_column(String(500), nullable=True)
    created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)

Flujo de Creación

# 1. Guardar metadata en PostgreSQL
doc = RAGDocument(
    id=str(uuid.uuid4()),
    tenant_id=tenant_id,
    filename=filename,
    collection="General",
    file_type=file_extension,
    file_path=storage_path
)
session.add(doc)
await session.flush()  # Obtener ID sin commit final

# 2. Procesar documento
chunks = process_document(file_content)

# 3. Vectorizar y guardar en Supabase
from app.services.rag.vector_store import SupabaseVectorStore

vector_store = SupabaseVectorStore(tenant_id)
await vector_store.add_documents(
    chunks,
    metadata={
        "tenant_id": tenant_id,
        "source_id": doc.id,
        "collection": collection
    }
)

# 4. Commit
await session.commit()

Flujo de Eliminación (Dual Delete Protocol)

CRÍTICO: Mantener coherencia entre PostgreSQL y Supabase

# Paso 1: Surgical Strike (Remoto) - Eliminar vectores de Supabase
await supabase.from_("documents").delete().eq(
    "metadata->>source_id", doc_id
).execute()

# Paso 2: Metadata Cleanup (Local) - Eliminar de PostgreSQL
stmt = delete(RAGDocument).where(
    RAGDocument.id == doc_id,
    RAGDocument.tenant_id == tenant_id
)
await session.execute(stmt)

# Paso 3: Physical Sweep (Disco) - Best effort
try:
    os.remove(file_path)
except FileNotFoundError:
    logger.warning(f"File {file_path} not found, skipping physical delete")

await session.commit()

5. Tablas Core (Schema Reference)

credentials (The Vault)

CREATE TABLE IF NOT EXISTS credentials (
    id_uuid UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id INTEGER REFERENCES tenants(id),
    name TEXT NOT NULL,
    value TEXT NOT NULL,  -- Encrypted AES-256
    category TEXT DEFAULT 'general',  -- openai, google, smtp
    scope TEXT DEFAULT 'global',  -- global o tenant
    created_at TIMESTAMPTZ DEFAULT NOW(),
    UNIQUE(name, tenant_id)  -- Unicidad multi-tenant
);

agents (AI Configuration)

CREATE TABLE IF NOT EXISTS agents (
    id SERIAL PRIMARY KEY,
    tenant_id INTEGER REFERENCES tenants(id),
    name TEXT NOT NULL,
    role TEXT DEFAULT 'sales',
    model_provider TEXT DEFAULT 'openai',
    model_version TEXT DEFAULT 'gpt-5-mini',
    temperature FLOAT DEFAULT 0.7,
    system_prompt_template TEXT NOT NULL,
    enabled_tools JSONB DEFAULT '[]',
    channels JSONB DEFAULT '["whatsapp", "instagram", "facebook", "web"]',
    config JSONB DEFAULT '{}',
    template_type VARCHAR(50) DEFAULT 'custom',
    is_active BOOLEAN DEFAULT TRUE,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

chat_conversations (Omnichannel)

CREATE TABLE IF NOT EXISTS chat_conversations (
    id UUID PRIMARY KEY,
    tenant_id INTEGER REFERENCES tenants(id),
    channel VARCHAR(32) NOT NULL,  -- whatsapp, instagram, facebook, web
    channel_source VARCHAR(32) DEFAULT 'whatsapp',
    display_name VARCHAR(255),
    meta JSONB DEFAULT '{}',
    last_message_preview TEXT,
    last_message_at TIMESTAMPTZ,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

tools (Brain Extensions)

CREATE TABLE IF NOT EXISTS tools (
    id SERIAL PRIMARY KEY,
    tenant_id INTEGER REFERENCES tenants(id),  -- NULL para Global
    name VARCHAR(255) NOT NULL,
    type VARCHAR(32) NOT NULL,  -- http, internal
    description TEXT,
    prompt_injection TEXT,
    response_guide TEXT,
    config JSONB DEFAULT '{}',
    service_url TEXT,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    UNIQUE(tenant_id, name)
);

6. Tipos de Datos Comunes

from sqlalchemy import String, Integer, Boolean, DateTime, Text, JSONB, ARRAY

class ExampleModel(Base):
    __tablename__ = "example"
    
    # IDs
    id_uuid: Mapped[str] = mapped_column(String, primary_key=True)
    id_int: Mapped[int] = mapped_column(Integer, primary_key=True)
    
    # Texto
    short_text: Mapped[str] = mapped_column(String(255))
    long_text: Mapped[str] = mapped_column(Text)
    
    # JSON
    metadata: Mapped[dict] = mapped_column(JSONB, default={})
    
    # Arrays
    tags: Mapped[list] = mapped_column(JSONB, default=[])
    
    # Booleanos
    is_active: Mapped[bool] = mapped_column(Boolean, default=True)
    
    # Timestamps
    created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow)

7. Identity Resolution (UUID vs INTEGER)

El Problema Crítico

  • Tenants:
    id
    es INTEGER
  • Users:
    id
    es UUID,
    tenant_id
    es INTEGER
  • Agents, Tools, Credentials:
    tenant_id
    es INTEGER (foreign key)

La Solución

# NUNCA asumir que current_user.tenant_id es UUID
# SIEMPRE resolver desde tabla users

user_row = await db.pool.fetchrow(
    "SELECT tenant_id FROM users WHERE id = $1",
    current_user.id  # UUID
)
real_tenant_int = user_row['tenant_id']  # INTEGER

# Usar en queries
stmt = delete(Agent).where(Agent.tenant_id == real_tenant_int)

8. Índices y Performance

Crear Índices

from sqlalchemy import Index

class Message(Base):
    __tablename__ = "messages"
    
    # ... columnas ...
    
    __table_args__ = (
        Index('idx_messages_conversation', 'conversation_id', 'created_at'),
        Index('idx_messages_tenant', 'tenant_id'),
    )

Eager Loading (Evitar N+1)

from sqlalchemy.orm import selectinload

# ❌ MAL - N+1 queries
conversations = await session.execute(select(Conversation))
for conv in conversations:
    messages = conv.messages  # Query adicional

# ✅ BIEN - Single query
stmt = select(Conversation).options(
    selectinload(Conversation.messages)
)
conversations = await session.execute(stmt)

9. Colecciones RAG

Tipos de Colecciones

  • General: Manuales técnicos, políticas (PDF/DOCX)
  • ADN Personal: Historiales de conversación (.txt) para clonación de estilo
  • Shadow RAG: Chats vectorizados automáticamente (memoria largo plazo)

WhatsApp Parser (Identity Engine)

# Al subir .txt a "ADN Personal", se activa parser
if collection == "ADN Personal" and file_ext == ".txt":
    from app.services.parsers import WhatsAppParser
    
    parser = WhatsAppParser(hero_name=hero_name)
    parsed = parser.parse(file_content)
    
    # Genera pares context/response para stored patterns

10. Reset Industrial (Development Only)

-- Limpiar plataforma completa
TRUNCATE TABLE 
    users, tenants, credentials, agents, tools, 
    business_assets, chat_conversations, chat_messages,
    rag_documents
RESTART IDENTITY CASCADE;

11. Checklist Pre-Deploy

  • ¿El modelo está importado en
    main.py
    ?
  • ¿Las columnas tienen tipos explícitos (
    Mapped[type]
    )?
  • ¿Las foreign keys tienen índices?
  • ¿
    tenant_id
    está indexado?
  • ¿Las migraciones usan
    IF NOT EXISTS
    ?
  • ¿En RAG, se sincroniza PostgreSQL + Supabase?
  • ¿Los scripts son idempotentes?
  • ¿Se resuelve UUID→INT para tenant_id?