Claude-skill-registry-data Magic Onboarding Orchestrator
Especialista en el proceso 'Hacer Magia': orquestación de agentes IA, SSE streaming y generación de assets de negocio.
install
source · Clone the upstream repo
git clone https://github.com/majiayu000/claude-skill-registry-data
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry-data "$T" && mkdir -p ~/.claude/skills && cp -r "$T/data/magic-onboarding-orchestrator-adriangmrraa-multiagents-platform" ~/.claude/skills/majiayu000-claude-skill-registry-data-magic-onboarding-orchestrator && rm -rf "$T"
manifest:
data/magic-onboarding-orchestrator-adriangmrraa-multiagents-platform/SKILL.mdsource content
Magic Onboarding Orchestrator - Platform AI Solutions
1. Concepto: "Hacer Magia" (The Ignition)
Filosofía
"Hacer Magia" es el proceso de Onboarding Automatizado que transforma una tienda conectada en una operación lista para vender en minutos.
Lo que hace:
- Analiza catálogo de Tienda Nube
- Extrae Brand DNA (misión, visión, tono)
- Vectoriza conocimiento para RAG
- Genera scripts de venta (AIDA, PAS)
- Calcula proyecciones ROI
- Crea visuales publicitarios (Google Imagen 3)
Arquitectura
Frontend (MagicOnboarding.tsx) ↓ POST /admin/onboarding/magic → Background Task (202 Accepted) ↓ SSE Stream (/engine/stream/v2/{tenant_id}) ↓ Orchestrator → 7 Agent Pipeline ↓ Business Assets (DB) + RAG Vectors (Supabase)
2. Los 7 Agentes Especializados
Protocolo Omega (Sequential Pipeline)
# agent_service/app/core/magic_orchestrator.py MAGIC_PIPELINE = [ { "name": "Catalog Analyzer", "role": "Analizar productos y categorías", "output": "product_catalog.json" }, { "name": "Brand DNA Extractor", "role": "Identificar identidad de marca", "output": "branding.json" }, { "name": "Knowledge Vectorizer", "role": "Crear embeddings para RAG", "output": "rag_vectors (Supabase)" }, { "name": "Sales Script Generator", "role": "Crear copys de venta (AIDA, PAS)", "output": "scripts.json" }, { "name": "ROI Projector", "role": "Proyecciones financieras", "output": "roi.json" }, { "name": "Visual Artist", "role": "Generar imágenes publicitarias", "output": "visuals.json" }, { "name": "Compliance Guardian", "role": "Validar coherencia y legalidad", "output": "validation_report.json" } ]
3. Frontend: Iniciar Magia
MagicOnboarding Component
const MagicOnboarding: React.FC = () => { const [loading, setLoading] = useState(false); const [logs, setLogs] = useState<string[]>([]); const [progress, setProgress] = useState(0); const handleIgnite = async () => { setLoading(true); // Iniciar proceso (background task) const response = await useApi({ method: 'POST', url: '/admin/onboarding/magic', data: { mode: 'full', // 'full', 'catalog_only', 'branding_only' options: { generate_visuals: true, vectorize_catalog: true } } }); if (response.status === 'started') { // Conectar a SSE stream listenToStream(response.tenant_id); } }; const listenToStream = (tenantId: number) => { const eventSource = new EventSource( `/api/admin/engine/stream/v2/${tenantId}` ); eventSource.addEventListener('log', (e) => { const data = JSON.parse(e.data); setLogs(prev => [...prev, data.message]); }); eventSource.addEventListener('progress', (e) => { const data = JSON.parse(e.data); setProgress(data.percentage); }); eventSource.addEventListener('asset_generated', (e) => { const data = JSON.parse(e.data); console.log('Asset generated:', data.type, data.id); }); eventSource.addEventListener('done', (e) => { eventSource.close(); setLoading(false); // Redirigir a Business Forge navigate('/forge'); }); eventSource.addEventListener('error', (e) => { console.error('SSE Error:', e); eventSource.close(); setLoading(false); }); }; return ( <div className="magic-container"> <button onClick={handleIgnite} disabled={loading} className="ignite-button" > {loading ? 'Creando Magia...' : 'Hacer Magia ✨'} </button> {/* Progress bar */} <div className="progress-bar"> <div style={{ width: `${progress}%` }} /> </div> {/* Live logs */} <div className="log-console"> {logs.map((log, i) => ( <div key={i}>{log}</div> ))} </div> </div> ); };
4. Backend: Orchestrator
Endpoint de Inicio
# orchestrator_service/app/api/v1/endpoints/magic.py @router.post("/onboarding/magic", status_code=202) async def start_magic_onboarding( payload: MagicOnboardingRequest, current_user = Depends(verify_admin_token), background_tasks: BackgroundTasks, session: AsyncSession = Depends(get_session) ): """ Inicia el proceso de Hacer Magia en background """ tenant_id = await resolve_tenant(current_user.id) # Validar credenciales necesarias await validate_prerequisites(tenant_id, session) # Crear task en background background_tasks.add_task( execute_magic_pipeline, tenant_id=tenant_id, mode=payload.mode, options=payload.options ) return { "status": "started", "tenant_id": tenant_id, "stream_url": f"/api/admin/engine/stream/v2/{tenant_id}" }
Validación de Prerequisites
async def validate_prerequisites( tenant_id: int, session: AsyncSession ): """ Verifica que estén configuradas las credenciales necesarias """ # 1. Tienda Nube conectada tn_token = await get_tenant_credential( tenant_id=tenant_id, category="tiendanube" ) if not tn_token: raise HTTPException( status_code=400, detail="Tienda Nube not connected. Connect your store first." ) # 2. OpenAI API Key openai_key = await get_tenant_credential( tenant_id=tenant_id, category="openai" ) if not openai_key: raise HTTPException( status_code=400, detail="OpenAI API key not configured" ) # 3. Google API Key (para imágenes) google_key = await get_tenant_credential( tenant_id=tenant_id, category="google" ) if not google_key: raise HTTPException( status_code=400, detail="Google API key not configured (required for visuals)" )
5. SSE Stream (Server-Sent Events)
Endpoint SSE
# orchestrator_service/app/api/v1/endpoints/sse.py from sse_starlette.sse import EventSourceResponse @router.get("/engine/stream/v2/{tenant_id}") async def stream_magic_progress( tenant_id: int, current_user = Depends(verify_admin_token) ): """ Stream de eventos en tiempo real del proceso de Magia """ async def event_generator(): # Suscribirse a Redis channel pubsub = redis_client.pubsub() channel = f"magic_progress:{tenant_id}" await pubsub.subscribe(channel) try: async for message in pubsub.listen(): if message['type'] == 'message': data = json.loads(message['data']) # Enviar evento al frontend yield { "event": data.get('event', 'log'), "data": json.dumps(data) } # Si terminó, cerrar stream if data.get('event') == 'done': break finally: await pubsub.unsubscribe(channel) return EventSourceResponse(event_generator())
Redis Broadcast (desde Pipeline)
async def broadcast_progress( tenant_id: int, event: str, data: dict ): """ Envía evento a todos los clientes suscritos """ channel = f"magic_progress:{tenant_id}" payload = { "event": event, **data } await redis_client.publish( channel, json.dumps(payload) )
6. Ejecución del Pipeline
execute_magic_pipeline (Background Task)
async def execute_magic_pipeline( tenant_id: int, mode: str, options: dict ): """ Ejecuta los 7 agentes secuencialmente """ try: # 1. Catalog Analyzer await broadcast_progress( tenant_id, "log", {"message": "📦 Analizando catálogo de productos..."} ) catalog = await analyze_catalog(tenant_id) await broadcast_progress( tenant_id, "progress", {"percentage": 14, "step": "catalog"} ) # 2. Brand DNA Extractor await broadcast_progress( tenant_id, "log", {"message": "🧬 Extrayendo ADN de marca..."} ) branding = await extract_brand_dna(tenant_id, catalog) await save_asset( tenant_id=tenant_id, type="branding", content=branding ) await broadcast_progress( tenant_id, "asset_generated", {"type": "branding", "id": branding_asset.id} ) await broadcast_progress( tenant_id, "progress", {"percentage": 28, "step": "branding"} ) # 3. Knowledge Vectorizer if options.get('vectorize_catalog', True): await broadcast_progress( tenant_id, "log", {"message": "🧠 Vectorizando conocimiento para RAG..."} ) await vectorize_catalog(tenant_id, catalog) await broadcast_progress( tenant_id, "progress", {"percentage": 42, "step": "vectorization"} ) # 4. Sales Script Generator await broadcast_progress( tenant_id, "log", {"message": "📝 Generando scripts de venta..."} ) scripts = await generate_sales_scripts( tenant_id, catalog, branding ) await save_asset( tenant_id=tenant_id, type="scripts", content=scripts ) await broadcast_progress( tenant_id, "progress", {"percentage": 56, "step": "scripts"} ) # 5. ROI Projector await broadcast_progress( tenant_id, "log", {"message": "💰 Calculando proyecciones ROI..."} ) roi = await calculate_roi(tenant_id, catalog) await save_asset( tenant_id=tenant_id, type="roi", content=roi ) await broadcast_progress( tenant_id, "progress", {"percentage": 70, "step": "roi"} ) # 6. Visual Artist if options.get('generate_visuals', True): await broadcast_progress( tenant_id, "log", {"message": "🎨 Generando visuales publicitarios..."} ) visuals = await generate_visuals( tenant_id, catalog, branding ) await save_asset( tenant_id=tenant_id, type="visuals", content=visuals ) await broadcast_progress( tenant_id, "progress", {"percentage": 85, "step": "visuals"} ) # 7. Compliance Guardian await broadcast_progress( tenant_id, "log", {"message": "✅ Validando coherencia..."} ) validation = await validate_compliance(tenant_id) await broadcast_progress( tenant_id, "progress", {"percentage": 100, "step": "validation"} ) # Finalizar await broadcast_progress( tenant_id, "done", {"message": "🎉 Magia completada!"} ) except Exception as e: await broadcast_progress( tenant_id, "error", {"message": f"Error: {str(e)}"} ) raise
7. Agentes Específicos
Catalog Analyzer
async def analyze_catalog(tenant_id: int) -> dict: """ Descarga y analiza catálogo de Tienda Nube """ # Obtener credenciales tn_token = await get_tenant_credential( tenant_id=tenant_id, category="tiendanube" ) tn_user_id = await get_tenant_credential( tenant_id=tenant_id, category="tiendanube", name="user_id" ) # Llamar API de Tienda Nube response = await httpx.get( f"https://api.tiendanube.com/v1/{tn_user_id}/products", headers={"Authorization": f"Bearer {tn_token}"}, params={"per_page": 200} ) products = response.json() # Análisis estructural categories = extract_categories(products) price_range = calculate_price_range(products) top_products = identify_top_products(products) return { "products": products, "categories": categories, "price_range": price_range, "top_products": top_products, "total_products": len(products) }
Brand DNA Extractor
async def extract_brand_dna( tenant_id: int, catalog: dict ) -> dict: """ Usa IA para analizar la identidad de marca """ # Llamar a agent_service response = await httpx.post( "http://agent_service:8004/analyze-brand", json={ "tenant_id": tenant_id, "catalog": catalog, "prompt": """ Analiza este catálogo de productos y extrae: 1. MISIÓN: ¿Qué problema resuelve esta tienda? 2. VISIÓN: ¿Qué aspiración tiene? 3. VOZ: ¿Cómo habla la marca? (formal, casual, técnica) 4. VALORES: ¿Qué principios transmite? Formato JSON. """ } ) brand_dna = response.json() return { "mission": brand_dna.get('mission'), "vision": brand_dna.get('vision'), "voice": brand_dna.get('voice'), "values": brand_dna.get('values'), "color_palette": brand_dna.get('color_palette'), "typography_vibe": brand_dna.get('typography_vibe') }
Visual Artist
async def generate_visuals( tenant_id: int, catalog: dict, branding: dict ) -> dict: """ Genera imágenes publicitarias con Google Imagen 3 """ google_key = await get_tenant_credential( tenant_id=tenant_id, category="google" ) # Seleccionar top 3 productos top_products = catalog['top_products'][:3] visuals = [] for product in top_products: # Construir prompt prompt = f""" Professional advertising photography of {product['name']}. Brand vibe: {branding['voice']}. Colors: {branding['color_palette']}. High-end commercial lighting, cinematic composition. """ # Llamar a Google Imagen API response = await httpx.post( "https://generativelanguage.googleapis.com/v1beta/models/imagen-3.0:generate", headers={"Authorization": f"Bearer {google_key}"}, json={ "prompt": prompt, "num_images": 1, "aspect_ratio": "1:1" } ) image_url = response.json()['images'][0]['url'] visuals.append({ "product_id": product['id'], "product_name": product['name'], "image_url": image_url, "prompt_used": prompt }) return { "social_posts": visuals, "generated_at": datetime.utcnow().isoformat() }
8. Persistencia de Assets
business_assets Table
CREATE TABLE business_assets ( id SERIAL PRIMARY KEY, tenant_id INTEGER REFERENCES tenants(id), type VARCHAR(50) NOT NULL, -- 'branding', 'scripts', 'roi', 'visuals' content JSONB NOT NULL, created_at TIMESTAMPTZ DEFAULT NOW() );
Guardar Asset
async def save_asset( tenant_id: int, type: str, content: dict ) -> BusinessAsset: """ Persiste asset generado en DB """ asset = BusinessAsset( tenant_id=tenant_id, type=type, content=content ) session.add(asset) await session.commit() await session.refresh(asset) return asset
9. Troubleshooting
"Stream se desconecta"
Causa: Timeout de SSE (servidor cierra conexión) Solución: Enviar "heartbeat" cada 15 segundos desde backend
"Error: Tienda Nube API 401"
Causa: Token expirado o inválido Solución: Re-autenticar Tienda Nube en Settings
"No se generan imágenes"
Causa: Google API key faltante o quota excedida Solución: Configurar credencial 'google' en Vault
"RAG vectorization falla"
Causa: Supabase URL incorrecta o pgvector no habilitado Solución: Verificar SUPABASE_DB_URL y extensión pgvector
10. Checklist de Implementación
Frontend
- Botón "Hacer Magia" funcional
- SSE listener conectado
- Progress bar actualizada en tiempo real
- Log console visible
- Redirección a Forge al completar
- Error handling (stream disconnect)
Backend
- Background task ejecutándose
- SSE endpoint con Redis pubsub
- 7 agentes implementados
- Assets guardados en business_assets
- Validación de prerequisites
- Broadcast de eventos correcto
Credenciales Requeridas
- Tienda Nube (access_token + user_id)
- OpenAI (API key)
- Google (API key para Imagen 3)
- Supabase (DB URL para RAG)
Tip: Para debugging, usar Redis CLI para monitorear mensajes:
SUBSCRIBE magic_progress:*