Claude-skill-registry AI Workflow Orchestrator
Expert guidance for building AI-powered workflows with n8n, Zapier, and custom orchestration systems. Use when automating workflows, integrating AI agents, or building no-code/low-code automation.
install
source · Clone the upstream repo
git clone https://github.com/majiayu000/claude-skill-registry
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/ai-workflow-orchestrator" ~/.claude/skills/majiayu000-claude-skill-registry-ai-workflow-orchestrator && rm -rf "$T"
manifest:
skills/data/ai-workflow-orchestrator/SKILL.mdsafety · automated scan (medium risk)
This is a pattern-based risk scan, not a security review. Our crawler flagged:
- eval/exec/Function constructor
- references .env files
Always read a skill's source content before installing. Patterns alone don't mean the skill is malicious — but they warrant attention.
source content
AI Workflow Orchestrator
Enterprise AI workflow automation with n8n, Zapier, and custom orchestration.
n8n Workflow Patterns
AI Agent Workflow
{ "nodes": [ { "name": "Webhook", "type": "n8n-nodes-base.webhook", "typeVersion": 1, "position": [250, 300], "webhookId": "user-query" }, { "name": "OpenAI Chat", "type": "@n8n/n8n-nodes-langchain.lmChatOpenAi", "typeVersion": 1, "position": [450, 300], "parameters": { "model": "gpt-4-turbo", "temperature": 0.7, "systemMessage": "You are a helpful assistant." } }, { "name": "Vector Store Query", "type": "@n8n/n8n-nodes-langchain.vectorStorePinecone", "typeVersion": 1, "position": [450, 150], "parameters": { "operation": "retrieve", "topK": 5 } }, { "name": "Response Formatter", "type": "n8n-nodes-base.function", "typeVersion": 1, "position": [650, 300], "parameters": { "functionCode": "return items.map(item => ({\n json: {\n response: item.json.response,\n sources: item.json.sources,\n confidence: item.json.confidence\n }\n}));" } } ], "connections": { "Webhook": { "main": [[{ "node": "Vector Store Query", "type": "main", "index": 0 }]] }, "Vector Store Query": { "main": [[{ "node": "OpenAI Chat", "type": "main", "index": 0 }]] }, "OpenAI Chat": { "main": [[{ "node": "Response Formatter", "type": "main", "index": 0 }]] } } }
Multi-Agent Orchestration
// n8n Custom Node for Agent Orchestration import { IExecuteFunctions } from 'n8n-core'; import { INodeExecutionData, INodeType, INodeTypeDescription } from 'n8n-workflow'; export class MultiAgentOrchestrator implements INodeType { description: INodeTypeDescription = { displayName: 'Multi-Agent Orchestrator', name: 'multiAgentOrchestrator', group: ['transform'], version: 1, description: 'Orchestrate multiple AI agents', defaults: { name: 'Multi-Agent Orchestrator', }, inputs: ['main'], outputs: ['main'], properties: [ { displayName: 'Agents', name: 'agents', type: 'fixedCollection', typeOptions: { multipleValues: true, }, default: {}, options: [ { name: 'agentValues', displayName: 'Agent', values: [ { displayName: 'Agent Name', name: 'name', type: 'string', default: '', }, { displayName: 'Agent Type', name: 'type', type: 'options', options: [ { name: 'Researcher', value: 'researcher' }, { name: 'Writer', value: 'writer' }, { name: 'Reviewer', value: 'reviewer' }, ], default: 'researcher', }, ], }, ], }, ], }; async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> { const items = this.getInputData(); const returnData: INodeExecutionData[] = []; for (let itemIndex = 0; itemIndex < items.length; itemIndex++) { const input = this.getNodeParameter('input', itemIndex, '') as string; const agents = this.getNodeParameter('agents', itemIndex, {}) as any; const results: any[] = []; for (const agent of agents.agentValues || []) { const result = await this.executeAgent(agent.type, input); results.push({ agent: agent.name, result }); } returnData.push({ json: { input, results, summary: this.summarizeResults(results), }, }); } return [returnData]; } private async executeAgent(type: string, input: string): Promise<string> { // Agent execution logic return `Result from ${type} agent`; } private summarizeResults(results: any[]): string { return results.map(r => r.result).join(' '); } }
Zapier Integration Patterns
AI-Powered Email Automation
// Zapier Custom Code Action const inputData = inputData || {}; const { email_content, sender } = inputData; // Call OpenAI API const response = await fetch('https://api.openai.com/v1/chat/completions', { method: 'POST', headers: { 'Authorization': `Bearer ${process.env.OPENAI_API_KEY}`, 'Content-Type': 'application/json', }, body: JSON.stringify({ model: 'gpt-4-turbo', messages: [ { role: 'system', content: 'You are an email assistant. Categorize and draft responses.', }, { role: 'user', content: `Categorize and draft a response to this email:\n\n${email_content}`, }, ], }), }); const data = await response.json(); const ai_response = data.choices[0].message.content; // Parse AI response const category = ai_response.match(/Category: (.*)/)?.[1] || 'General'; const draft_response = ai_response.match(/Draft:([\s\S]*)/)?.[1]?.trim() || ''; output = { category, draft_response, priority: category === 'Urgent' ? 'high' : 'normal', };
Custom Orchestration System
Workflow Engine
from typing import Dict, List, Callable, Any from pydantic import BaseModel import asyncio class WorkflowStep(BaseModel): name: str function: str inputs: Dict[str, str] = {} condition: str | None = None class Workflow(BaseModel): name: str steps: List[WorkflowStep] class WorkflowEngine: def __init__(self): self.functions: Dict[str, Callable] = {} self.context: Dict[str, Any] = {} def register_function(self, name: str, func: Callable): """Register a function that can be used in workflows.""" self.functions[name] = func async def execute_workflow(self, workflow: Workflow, initial_context: Dict[str, Any]): """Execute a workflow with the given context.""" self.context = initial_context.copy() for step in workflow.steps: # Check condition if step.condition and not self._evaluate_condition(step.condition): continue # Prepare inputs inputs = { key: self._resolve_value(value) for key, value in step.inputs.items() } # Execute function func = self.functions.get(step.function) if not func: raise ValueError(f"Function not found: {step.function}") result = await func(**inputs) if asyncio.iscoroutinefunction(func) else func(**inputs) # Store result self.context[step.name] = result return self.context def _resolve_value(self, value: str) -> Any: """Resolve context variables in string values.""" if value.startswith('$'): return self.context.get(value[1:]) return value def _evaluate_condition(self, condition: str) -> bool: """Evaluate a condition expression.""" return eval(condition, {}, self.context) # Usage engine = WorkflowEngine() # Register AI functions async def analyze_sentiment(text: str) -> str: # Call AI API return "positive" async def generate_response(sentiment: str, text: str) -> str: # Call AI API return f"Response based on {sentiment} sentiment" engine.register_function('analyze_sentiment', analyze_sentiment) engine.register_function('generate_response', generate_response) # Define workflow workflow = Workflow( name="Email Response", steps=[ WorkflowStep( name="sentiment", function="analyze_sentiment", inputs={"text": "$email_content"} ), WorkflowStep( name="response", function="generate_response", inputs={"sentiment": "$sentiment", "text": "$email_content"}, condition="sentiment in ['positive', 'neutral']" ), ] ) # Execute result = await engine.execute_workflow( workflow, {"email_content": "Thank you for your help!"} )
Agent Chain Pattern
interface Agent { name: string; execute: (input: any) => Promise<any>; } class AgentChain { private agents: Agent[] = []; add(agent: Agent): this { this.agents.push(agent); return this; } async execute(initialInput: any): Promise<any> { let result = initialInput; for (const agent of this.agents) { console.log(`Executing ${agent.name}...`); result = await agent.execute(result); } return result; } } // Usage const chain = new AgentChain() .add({ name: 'Researcher', execute: async (query: string) => { // Research logic return { query, sources: ['source1', 'source2'] }; }, }) .add({ name: 'Summarizer', execute: async (data: any) => { // Summarization logic return { ...data, summary: 'Summary of research' }; }, }) .add({ name: 'Formatter', execute: async (data: any) => { // Formatting logic return { title: data.query, content: data.summary, references: data.sources, }; }, }); const result = await chain.execute('What is quantum computing?');
Integration Patterns
Webhook Handlers
from fastapi import FastAPI, Request from pydantic import BaseModel app = FastAPI() class ZapierWebhook(BaseModel): event: str data: dict @app.post("/zapier/webhook") async def handle_zapier_webhook(webhook: ZapierWebhook): """Handle incoming Zapier webhooks.""" if webhook.event == "new_lead": await process_lead(webhook.data) elif webhook.event == "support_ticket": await process_ticket(webhook.data) return {"status": "processed"} @app.post("/n8n/webhook/{workflow_id}") async def handle_n8n_webhook(workflow_id: str, request: Request): """Handle incoming n8n webhooks.""" data = await request.json() result = await execute_workflow(workflow_id, data) return result
Task Queues
from celery import Celery from typing import Dict celery = Celery('tasks', broker='redis://localhost:6379') @celery.task def process_with_ai(data: Dict) -> Dict: """Process data with AI agent in background.""" result = ai_agent.process(data) notify_completion(result) return result # Trigger from workflow process_with_ai.delay({"text": "Process this"})
Best Practices
✅ Use idempotent operations ✅ Implement error handling and retries ✅ Add logging and monitoring ✅ Use task queues for long-running operations ✅ Implement rate limiting ✅ Version your workflows ✅ Test workflows thoroughly ✅ Use environment variables for secrets ✅ Implement rollback mechanisms ✅ Monitor workflow performance
When to Use: AI workflow automation, n8n/Zapier integrations, multi-agent orchestration, business process automation.