Claude-skill-registry agent-orchestration-patterns
Automatically applies when designing multi-agent systems. Ensures proper tool schema design with Pydantic, agent state management, error handling for tool execution, and orchestration patterns.
install
source · Clone the upstream repo
git clone https://github.com/majiayu000/claude-skill-registry
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/agent-orchestration-patterns" ~/.claude/skills/majiayu000-claude-skill-registry-agent-orchestration-patterns && rm -rf "$T"
manifest: skills/data/agent-orchestration-patterns/SKILL.md
Agent Orchestration Patterns
When building multi-agent systems and tool-calling workflows, follow these patterns for reliable, maintainable orchestration.
Trigger Keywords: agent, multi-agent, tool calling, orchestration, subagent, tool schema, function calling, agent state, agent routing, agent graph, LangChain, LlamaIndex, Anthropic tools
Agent Integration: Used by ml-system-architect, agent-orchestrator-engineer, llm-app-engineer, security-and-privacy-engineer-ml
✅ Correct Pattern: Tool Schema with Pydantic
from enum import Enum
from typing import List, Literal, Optional

from pydantic import BaseModel, Field


class SearchQuery(BaseModel):
    """Tool input for search."""

    query: str = Field(..., description="Search query string")
    max_results: int = Field(
        10, ge=1, le=100, description="Maximum number of results to return"
    )
    filter_domain: Optional[str] = Field(
        None, description="Optional domain to filter results (e.g., 'python.org')"
    )


class SearchResult(BaseModel):
    """Individual search result."""

    title: str
    url: str
    snippet: str
    relevance_score: float = Field(ge=0.0, le=1.0)


class SearchResponse(BaseModel):
    """Tool output for search."""

    results: List[SearchResult]
    total_found: int
    query_time_ms: float


async def search_tool(input: SearchQuery) -> SearchResponse:
    """
    Search the web and return relevant results.

    Args:
        input: Validated search parameters

    Returns:
        Search results with metadata

    Example:
        >>> result = await search_tool(SearchQuery(
        ...     query="Python async patterns",
        ...     max_results=5
        ... ))
        >>> print(result.results[0].title)
    """
    # Implementation (perform_search stands in for the real search backend)
    results = await perform_search(
        query=input.query,
        limit=input.max_results,
        domain_filter=input.filter_domain,
    )
    return SearchResponse(
        results=results,
        total_found=len(results),
        query_time_ms=123.45,  # Placeholder value; measure the real elapsed time
    )


# Convert to Claude tool schema
def tool_to_anthropic_schema(func, input_model: type[BaseModel]) -> dict:
    """Convert Pydantic model to Anthropic tool schema."""
    return {
        "name": func.__name__.replace("_tool", ""),
        "description": func.__doc__.strip().split("\n")[0],
        "input_schema": input_model.model_json_schema(),
    }


# Register tool
SEARCH_TOOL = tool_to_anthropic_schema(search_tool, SearchQuery)
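The name and description step in `tool_to_anthropic_schema` assumes that every tool has a docstring and that `_tool` only ever appears as a suffix. A minimal, standard-library sketch of a more defensive version of that step follows; `describe_tool`, `example_search_tool`, and `undocumented_tool` are hypothetical names that do not appear in the original skill.

```python
import inspect
from typing import Callable


def describe_tool(func: Callable) -> dict:
    """Derive a tool name and one-line description from a function (hypothetical helper)."""
    # removesuffix strips only a trailing "_tool"; str.replace would also
    # turn "multi_tool_search_tool" into "multi_search".
    name = func.__name__.removesuffix("_tool")
    # inspect.getdoc returns None for undocumented functions and strips
    # leading blank lines and indentation for documented ones.
    doc = inspect.getdoc(func) or ""
    description = doc.splitlines()[0] if doc else f"Run the {name} tool."
    return {"name": name, "description": description}


async def example_search_tool(query: str) -> list:
    """
    Search the web and return relevant results.

    Longer explanation that should not end up in the schema.
    """
    return []


async def undocumented_tool(query: str) -> list:
    return []


documented = describe_tool(example_search_tool)
fallback = describe_tool(undocumented_tool)
```

The fallback description is an arbitrary choice for this sketch; rejecting undocumented tools at registration time would be an equally reasonable policy.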
Agent State Management
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, List, Literal, Optional

from pydantic import BaseModel, Field


def utc_now() -> datetime:
    """Timezone-aware timestamp; datetime.utcnow() is deprecated since Python 3.12."""
    return datetime.now(timezone.utc)


class Message(BaseModel):
    """A single message in conversation."""

    role: Literal["user", "assistant", "system"]
    content: str
    timestamp: datetime = Field(default_factory=utc_now)
    metadata: Dict[str, Any] = Field(default_factory=dict)


class ToolCall(BaseModel):
    """Record of a tool execution."""

    tool_name: str
    input: Dict[str, Any]
    output: Any
    duration_ms: float
    success: bool
    error: Optional[str] = None
    timestamp: datetime = Field(default_factory=utc_now)


class AgentState(BaseModel):
    """State for an agent conversation."""

    session_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    messages: List[Message] = Field(default_factory=list)
    tool_calls: List[ToolCall] = Field(default_factory=list)
    metadata: Dict[str, Any] = Field(default_factory=dict)
    created_at: datetime = Field(default_factory=utc_now)
    updated_at: datetime = Field(default_factory=utc_now)

    def add_message(self, role: str, content: str, **metadata):
        """Add message to conversation history."""
        self.messages.append(
            Message(role=role, content=content, metadata=metadata)
        )
        self.updated_at = utc_now()

    def add_tool_call(self, tool_call: ToolCall):
        """Record tool execution."""
        self.tool_calls.append(tool_call)
        self.updated_at = utc_now()

    def get_conversation_history(self) -> List[Dict[str, str]]:
        """Get messages in format for LLM API (system messages are excluded)."""
        return [
            {"role": msg.role, "content": msg.content}
            for msg in self.messages
            if msg.role != "system"
        ]


class AgentStateManager:
    """Manage agent states with persistence."""

    def __init__(self):
        self._states: Dict[str, AgentState] = {}

    async def get_or_create(self, session_id: Optional[str] = None) -> AgentState:
        """Get existing state or create new one."""
        if session_id and session_id in self._states:
            return self._states[session_id]
        state = AgentState(session_id=session_id or str(uuid.uuid4()))
        self._states[state.session_id] = state
        return state

    async def save(self, state: AgentState):
        """Persist agent state."""
        self._states[state.session_id] = state
        # Could also save to database/redis here

    async def load(self, session_id: str) -> Optional[AgentState]:
        """Load agent state from storage; returns None for unknown sessions."""
        return self._states.get(session_id)
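The manager above keeps state in a process-local dict, so sessions are lost on restart. As a rough sketch of the "save to database/redis" idea, the store below persists one JSON document per session on disk. `JsonFileStore` is a hypothetical name, and the payload is treated as an opaque JSON string; with Pydantic v2 such a string could come from `state.model_dump_json()` and be restored with `AgentState.model_validate_json(...)`.

```python
import json
import tempfile
from pathlib import Path
from typing import Optional


class JsonFileStore:
    """Persist one JSON document per session in a directory (hypothetical helper)."""

    def __init__(self, directory: Path):
        self.directory = directory
        self.directory.mkdir(parents=True, exist_ok=True)

    def _path(self, session_id: str) -> Path:
        # Assumes session IDs are filesystem-safe, e.g. UUID strings.
        return self.directory / f"{session_id}.json"

    def save(self, session_id: str, payload: str) -> None:
        json.loads(payload)  # Raises ValueError if the payload is not valid JSON.
        self._path(session_id).write_text(payload, encoding="utf-8")

    def load(self, session_id: str) -> Optional[str]:
        path = self._path(session_id)
        if not path.exists():
            return None
        return path.read_text(encoding="utf-8")


# Usage, with a plain dict standing in for a serialized AgentState.
store = JsonFileStore(Path(tempfile.mkdtemp()))
store.save("session-1", json.dumps({"session_id": "session-1", "messages": []}))
restored = json.loads(store.load("session-1"))
missing = store.load("no-such-session")
```

The file I/O here is synchronous; inside an async manager it would block the event loop briefly, which may or may not matter at your scale.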
Tool Execution with Error Handling
import asyncio
import logging
import time
from typing import Any, Callable, Dict, Type

from pydantic import BaseModel, ValidationError

# ToolCall is the execution record defined under Agent State Management.

logger = logging.getLogger(__name__)


class ToolError(Exception):
    """Base tool execution error."""


class ToolTimeoutError(ToolError):
    """Tool execution timeout."""


class ToolValidationError(ToolError):
    """Tool input validation error."""


class ToolExecutor:
    """Execute tools with validation and error handling."""

    def __init__(self, timeout: float = 30.0):
        self.timeout = timeout
        self.tools: Dict[str, tuple[Callable, Type[BaseModel]]] = {}

    def register_tool(
        self, name: str, func: Callable, input_model: Type[BaseModel]
    ):
        """Register a tool with its input schema."""
        self.tools[name] = (func, input_model)

    async def execute(
        self, tool_name: str, tool_input: Dict[str, Any]
    ) -> ToolCall:
        """
        Execute tool with validation and error handling.

        Args:
            tool_name: Name of tool to execute
            tool_input: Raw input dict from LLM

        Returns:
            ToolCall record with result or error. Unknown tools, invalid
            input, and timeouts are reported with success=False.

        Note:
            Exceptions raised by the tool itself that are not ToolError
            subclasses are not caught here and propagate to the caller.
        """
        if tool_name not in self.tools:
            error_msg = f"Unknown tool: {tool_name}"
            logger.error(error_msg)
            return ToolCall(
                tool_name=tool_name,
                input=tool_input,
                output=None,
                duration_ms=0.0,
                success=False,
                error=error_msg,
            )

        func, input_model = self.tools[tool_name]
        # perf_counter is monotonic, so it is safer than wall-clock time for durations
        start_time = time.perf_counter()

        try:
            # Validate input
            try:
                validated_input = input_model(**tool_input)
            except (ValidationError, TypeError) as e:
                raise ToolValidationError(
                    f"Invalid input for {tool_name}: {e}"
                ) from e

            # Execute with timeout
            try:
                output = await asyncio.wait_for(
                    func(validated_input), timeout=self.timeout
                )
            except asyncio.TimeoutError as e:
                raise ToolTimeoutError(
                    f"Tool {tool_name} exceeded timeout of {self.timeout}s"
                ) from e

            duration_ms = (time.perf_counter() - start_time) * 1000
            logger.info(
                "Tool executed successfully",
                extra={"tool_name": tool_name, "duration_ms": duration_ms},
            )
            return ToolCall(
                tool_name=tool_name,
                input=tool_input,
                output=output,
                duration_ms=duration_ms,
                success=True,
            )

        except ToolError as e:
            duration_ms = (time.perf_counter() - start_time) * 1000
            logger.error(
                "Tool execution failed",
                extra={
                    "tool_name": tool_name,
                    "error": str(e),
                    "duration_ms": duration_ms,
                },
            )
            return ToolCall(
                tool_name=tool_name,
                input=tool_input,
                output=None,
                duration_ms=duration_ms,
                success=False,
                error=str(e),
            )
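Once a tool has run, its outcome normally goes back to the model. The sketch below maps an execution outcome onto a `tool_result` content block, assuming the Anthropic Messages API shape (`type`, `tool_use_id`, `content`, `is_error`). `to_tool_result_block` is a hypothetical helper, and it takes plain values rather than a `ToolCall` so that it stays dependency-free.

```python
import json
from typing import Any, Optional


def to_tool_result_block(
    tool_use_id: str,
    success: bool,
    output: Any = None,
    error: Optional[str] = None,
) -> dict:
    """Build a tool_result content block from an execution outcome (hypothetical helper)."""
    if success:
        # Strings pass through; other values are JSON-encoded. default=str
        # keeps datetimes and similar values serializable.
        content = output if isinstance(output, str) else json.dumps(output, default=str)
        return {"type": "tool_result", "tool_use_id": tool_use_id, "content": content}
    return {
        "type": "tool_result",
        "tool_use_id": tool_use_id,
        "content": error or "Tool execution failed",
        "is_error": True,
    }


ok_block = to_tool_result_block("toolu_1", True, output={"total_found": 2})
error_block = to_tool_result_block("toolu_2", False, error="Unknown tool: foo")
```

If the tool returns a Pydantic model, calling `model_dump()` on it before passing it as `output` would give a JSON-friendly dict. Returning failures to the model with `is_error` set, rather than raising, gives the model a chance to correct its input and try again.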
Agent Orchestration Patterns
Pattern 1: Sequential Agent Chain
import asyncio
import logging
from typing import Awaitable, Callable, List

logger = logging.getLogger(__name__)


class SequentialOrchestrator:
    """Execute agents in sequence, passing output to next."""

    def __init__(self, agents: List[Callable[[str], Awaitable[str]]]):
        self.agents = agents

    async def run(self, initial_input: str) -> str:
        """
        Run agents sequentially.

        Args:
            initial_input: Input for first agent

        Returns:
            Output from final agent
        """
        current_input = initial_input
        for i, agent in enumerate(self.agents):
            logger.info(f"Running agent {i + 1}/{len(self.agents)}")
            current_input = await agent(current_input)
        return current_input


# Example usage
async def research_agent(query: str) -> str:
    """Research a topic."""
    # Search and gather information
    return "research results..."


async def synthesis_agent(research: str) -> str:
    """Synthesize research into summary."""
    # Analyze and synthesize
    return "synthesized summary..."


async def writer_agent(summary: str) -> str:
    """Write final article."""
    # Generate polished content
    return "final article..."


# Chain agents
orchestrator = SequentialOrchestrator([
    research_agent,
    synthesis_agent,
    writer_agent,
])


async def main() -> None:
    # await is only valid inside a coroutine, so the call is wrapped in main()
    result = await orchestrator.run("Tell me about Python async patterns")
    print(result)


asyncio.run(main())
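A sequential chain is only as fast as its slowest step, and the orchestrator above sets no time limit on any of them. The sketch below adds a per-step timeout with `asyncio.wait_for`. `run_chain_with_timeout`, `fast_agent`, and `slow_agent` are hypothetical names, and the timeout values are arbitrary.

```python
import asyncio
from typing import Awaitable, Callable, List, Optional, Tuple

Agent = Callable[[str], Awaitable[str]]


async def run_chain_with_timeout(
    agents: List[Agent], initial_input: str, step_timeout: float
) -> str:
    """Run agents in order, failing fast if any single step is too slow."""
    current = initial_input
    for index, agent in enumerate(agents):
        try:
            current = await asyncio.wait_for(agent(current), timeout=step_timeout)
        except asyncio.TimeoutError as exc:
            # Re-raise with the name of the step that stalled.
            name = getattr(agent, "__name__", f"agent {index}")
            raise TimeoutError(f"{name} exceeded {step_timeout}s") from exc
    return current


async def fast_agent(text: str) -> str:
    return text.upper()


async def slow_agent(text: str) -> str:
    await asyncio.sleep(1.0)
    return text


async def demo() -> Tuple[str, Optional[str]]:
    ok = await run_chain_with_timeout([fast_agent, fast_agent], "hi", 0.5)
    try:
        await run_chain_with_timeout([fast_agent, slow_agent], "hi", 0.05)
    except TimeoutError as exc:
        return ok, str(exc)
    return ok, None


result, timeout_message = asyncio.run(demo())
```

Failing the whole chain on the first timeout is one possible policy; skipping the slow step or falling back to a cheaper agent are alternatives that depend on the application.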
Pattern 2: Parallel Agent Execution
import asyncio
import logging
from typing import Any, Callable, List

logger = logging.getLogger(__name__)


class ParallelOrchestrator:
    """Execute multiple agents concurrently."""

    def __init__(self, agents: List[Callable]):
        self.agents = agents

    async def run(self, input: str) -> List[Any]:
        """
        Run all agents in parallel with same input.

        Args:
            input: Input for all agents

        Returns:
            List of outputs from each agent, in registration order.
            A failed agent contributes its exception object instead of a result.
        """
        tasks = [agent(input) for agent in self.agents]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Log failures; the exception objects stay in the returned list
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                logger.error(f"Agent {i} failed: {result}")

        return results


# Example: Multiple specialized agents
async def technical_reviewer(code: str) -> str:
    """Review code for technical issues."""
    return "technical review..."


async def security_reviewer(code: str) -> str:
    """Review code for security issues."""
    return "security review..."


async def performance_reviewer(code: str) -> str:
    """Review code for performance issues."""
    return "performance review..."


# Run reviewers in parallel
orchestrator = ParallelOrchestrator([
    technical_reviewer,
    security_reviewer,
    performance_reviewer,
])


async def main() -> None:
    code_to_review = "def add(a, b): return a + b"  # Placeholder input
    reviews = await orchestrator.run(code_to_review)
    print(reviews)


asyncio.run(main())
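Because `asyncio.gather(..., return_exceptions=True)` returns results and exception objects in one list, callers have to check each element's type before using it. A small sketch of separating the two follows; `run_parallel_partitioned` and the two reviewer functions are hypothetical names used only for illustration.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Tuple

Agent = Callable[[str], Awaitable[Any]]


async def run_parallel_partitioned(
    agents: List[Agent], input: str
) -> Tuple[Dict[str, Any], Dict[str, Exception]]:
    """Run agents concurrently and split outcomes into successes and failures."""
    results = await asyncio.gather(
        *(agent(input) for agent in agents), return_exceptions=True
    )
    successes: Dict[str, Any] = {}
    failures: Dict[str, Exception] = {}
    # gather preserves argument order, so zip pairs each agent with its outcome.
    for agent, result in zip(agents, results):
        if isinstance(result, Exception):
            failures[agent.__name__] = result
        else:
            successes[agent.__name__] = result
    return successes, failures


async def style_reviewer(code: str) -> str:
    return "style review..."


async def failing_reviewer(code: str) -> str:
    raise RuntimeError("model unavailable")


successes, failures = asyncio.run(
    run_parallel_partitioned([style_reviewer, failing_reviewer], "print('hi')")
)
```

Keying by function name assumes agent names are unique; a list of `(agent, outcome)` pairs would avoid that assumption.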
Pattern 3: Router-Based Orchestration
import logging
from enum import Enum
from typing import Callable, Dict

logger = logging.getLogger(__name__)


class AgentType(str, Enum):
    """Available agent types."""

    TECHNICAL = "technical"
    CREATIVE = "creative"
    ANALYTICAL = "analytical"


class RouterOrchestrator:
    """Route requests to appropriate specialized agent."""

    def __init__(self):
        self.agents: Dict[AgentType, Callable] = {}

    def register(self, agent_type: AgentType, agent: Callable):
        """Register an agent."""
        self.agents[agent_type] = agent

    async def classify_request(self, request: str) -> AgentType:
        """
        Classify request to determine which agent to use.

        Args:
            request: User request

        Returns:
            Agent type to handle request

        Raises:
            ValueError: If the classifier returns an unknown category
        """
        # Use LLM to classify
        prompt = f"""Classify this request into one of:
- technical: Code, debugging, technical implementation
- creative: Writing, brainstorming, creative content
- analytical: Data analysis, research, evaluation

Request: {request}

Return only the category name."""
        # llm_classify is assumed to be defined elsewhere and to return a string
        category = await llm_classify(prompt)
        return AgentType(category.lower().strip())

    async def route(self, request: str) -> str:
        """
        Route request to appropriate agent.

        Args:
            request: User request

        Returns:
            Response from selected agent
        """
        agent_type = await self.classify_request(request)
        agent = self.agents.get(agent_type)
        if agent is None:
            raise ValueError(f"No agent registered for type: {agent_type}")
        logger.info(f"Routing to {agent_type.value} agent")
        return await agent(request)
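`AgentType(category.lower().strip())` raises `ValueError` whenever the classifier replies with anything other than a bare category name, for example with trailing punctuation or a short sentence. The sketch below shows one tolerant way to parse the reply. `parse_agent_type` is a hypothetical helper, and defaulting to `ANALYTICAL` is an arbitrary assumption made for this example.

```python
from enum import Enum


class AgentType(str, Enum):
    """Available agent types (mirrors the router example)."""

    TECHNICAL = "technical"
    CREATIVE = "creative"
    ANALYTICAL = "analytical"


def parse_agent_type(
    raw: str, default: AgentType = AgentType.ANALYTICAL
) -> AgentType:
    """Map a free-form classifier reply onto an AgentType, with a fallback."""
    # Trim whitespace, then common wrapping punctuation and quotes.
    cleaned = raw.strip().strip(".\"'`").lower()
    try:
        return AgentType(cleaned)
    except ValueError:
        pass
    # Accept replies that mention exactly one category, e.g. "Category: creative".
    mentioned = [t for t in AgentType if t.value in cleaned]
    if len(mentioned) == 1:
        return mentioned[0]
    # Ambiguous or unrecognized replies fall back to the default.
    return default


exact = parse_agent_type(" Technical.\n")
embedded = parse_agent_type("Category: creative")
ambiguous = parse_agent_type("technical or creative")
unknown = parse_agent_type("I am not sure")
```

Whether to fall back silently, log the miss, or re-prompt the classifier is a design decision; a silent default keeps the router available but can hide a degraded classifier.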
Pattern 4: Hierarchical Agent System
from typing import Callable, Dict, List

# AgentState and AgentStateManager come from the Agent State Management section.
# TaskPlan and SubTask are assumed models defined elsewhere: TaskPlan exposes
# `subtasks`, and each SubTask exposes `description`.


class SupervisorAgent:
    """Supervisor that delegates to specialized sub-agents."""

    def __init__(self):
        self.sub_agents: Dict[str, Callable] = {}
        self.state_manager = AgentStateManager()

    async def delegate(self, task: str, state: AgentState) -> str:
        """
        Decompose task and delegate to sub-agents.

        Args:
            task: High-level task description
            state: Current conversation state

        Returns:
            Final result after delegation
        """
        # Plan decomposition using LLM
        plan = await self.plan_task(task, state)

        results = []
        for subtask in plan.subtasks:
            # Find appropriate sub-agent
            agent = self.find_agent_for_task(subtask)

            # Execute subtask
            result = await agent(subtask.description, state)
            results.append(result)

            # Update state
            state.add_message("assistant", f"Subtask result: {result}")

        # Synthesize final result
        return await self.synthesize_results(task, results, state)

    async def plan_task(self, task: str, state: AgentState) -> "TaskPlan":
        """Decompose task into subtasks."""
        # Use LLM to plan
        ...

    def find_agent_for_task(self, subtask: "SubTask") -> Callable:
        """Select appropriate sub-agent for subtask."""
        # Match subtask to agent capabilities
        ...

    async def synthesize_results(
        self, task: str, results: List[str], state: AgentState
    ) -> str:
        """Combine sub-agent results into a final answer."""
        # Use LLM to synthesize
        ...
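The supervisor relies on `TaskPlan`, `SubTask`, and `find_agent_for_task`, none of which the original defines. One possible shape is sketched below, using dataclasses to stay dependency-free (Pydantic models would work the same way). The `capability` field and the lookup-by-key strategy are assumptions made for this example, and the sub-agent here takes only a description, whereas the supervisor above also passes `state`.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Awaitable, Callable, Dict, List

SubAgent = Callable[..., Awaitable[str]]


@dataclass
class SubTask:
    """One unit of delegated work (hypothetical shape)."""

    description: str
    capability: str  # Key used to look up a registered sub-agent


@dataclass
class TaskPlan:
    """Ordered list of subtasks produced by the planner (hypothetical shape)."""

    subtasks: List[SubTask] = field(default_factory=list)


def find_agent_for_task(
    sub_agents: Dict[str, SubAgent], subtask: SubTask
) -> SubAgent:
    """Select the sub-agent registered for the subtask's capability."""
    try:
        return sub_agents[subtask.capability]
    except KeyError:
        raise LookupError(
            f"No sub-agent registered for capability: {subtask.capability}"
        ) from None


async def research_agent(description: str) -> str:
    return f"researched: {description}"


agents = {"research": research_agent}
plan = TaskPlan([SubTask("Find sources", "research")])
selected = find_agent_for_task(agents, plan.subtasks[0])
output = asyncio.run(selected(plan.subtasks[0].description))
```

Exact-key lookup is the simplest matching strategy; a real planner might instead score agents against the subtask description.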
❌ Anti-Patterns
# ❌ No input validation
async def tool(input: dict) -> dict:  # Raw dict!
    return await do_something(input["query"])

# ✅ Better: Use Pydantic for validation
async def tool(input: SearchQuery) -> SearchResponse:
    return await do_something(input.query)


# ❌ No error handling in tool execution
async def execute_tool(name: str, input: dict):
    func = tools[name]
    return await func(input)  # Can fail!

# ✅ Better: Comprehensive error handling
# (tools maps each name to a (func, InputModel) pair, as in ToolExecutor)
async def execute_tool(name: str, input: dict) -> ToolCall:
    func, InputModel = tools[name]
    start = time.perf_counter()
    try:
        validated = InputModel(**input)
        output, success, error = await func(validated), True, None
    except ValidationError as e:
        output, success, error = None, False, str(e)
    return ToolCall(
        tool_name=name,
        input=input,
        output=output,
        duration_ms=(time.perf_counter() - start) * 1000,
        success=success,
        error=error,
    )


# ❌ No timeout on tool execution
result = await long_running_tool(input)  # Could hang forever!

# ✅ Better: Add timeout
result = await asyncio.wait_for(
    long_running_tool(input),
    timeout=30.0,
)


# ❌ Stateless conversations
async def handle_request(prompt: str) -> str:
    return await agent.run(prompt)  # No history!

# ✅ Better: Maintain conversation state
async def handle_request(prompt: str, session_id: str) -> str:
    # get_or_create avoids the None that load() returns for a new session
    state = await state_manager.get_or_create(session_id)
    state.add_message("user", prompt)
    response = await agent.run(state.get_conversation_history())
    state.add_message("assistant", response)
    await state_manager.save(state)
    return response


# ❌ No logging of tool calls
result = await tool(input)

# ✅ Better: Log all tool executions
logger.info("Executing tool", extra={
    "tool_name": tool.__name__,
    "input": input.model_dump(),
})
start = time.perf_counter()
result = await tool(input)
logger.info("Tool completed", extra={
    "duration_ms": (time.perf_counter() - start) * 1000,
    "success": True,
})
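Logging `input.model_dump()` verbatim can leak credentials or personal data into log storage. A minimal sketch of key-based redaction applied before logging is shown below. `redact` and `SENSITIVE_KEYS` are hypothetical names, and the key list is only an illustrative starting point, not a complete policy.

```python
from typing import Any

# Illustrative key names only; extend this set to match your own schemas.
SENSITIVE_KEYS = {"password", "api_key", "token", "authorization", "secret"}


def redact(value: Any) -> Any:
    """Return a copy of value with sensitive dict entries masked."""
    if isinstance(value, dict):
        return {
            key: "[REDACTED]" if str(key).lower() in SENSITIVE_KEYS else redact(item)
            for key, item in value.items()
        }
    if isinstance(value, (list, tuple)):
        # Recurse into sequences so nested dicts are also covered.
        return [redact(item) for item in value]
    return value


raw_input = {
    "query": "quarterly report",
    "api_key": "sk-123",
    "nested": {"Token": "abc", "items": [{"password": "hunter2"}]},
}
safe_input = redact(raw_input)
```

In the logging example, this would mean passing `redact(input.model_dump())` as the `input` field. Key-based masking does not catch secrets embedded in free-text values, so treat it as a baseline.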
Best Practices Checklist
- ✅ Define tool schemas with Pydantic models
- ✅ Validate all tool inputs before execution
- ✅ Set timeouts on tool execution
- ✅ Handle tool errors gracefully (don't crash)
- ✅ Maintain conversation state across turns
- ✅ Log all tool executions with inputs and outputs
- ✅ Use typed responses from tools
- ✅ Implement retry logic for transient failures
- ✅ Redact sensitive data in tool logs
- ✅ Use async/await throughout agent code
- ✅ Structure agent output as Pydantic models
- ✅ Track agent performance metrics
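The checklist calls for retry logic on transient failures, which none of the examples above show. A minimal sketch with exponential backoff and jitter follows. `retry_async` and `flaky` are hypothetical names, and which exception types count as transient (here `ConnectionError` and `TimeoutError`) is an assumption that depends on the tool being called.

```python
import asyncio
import random
from typing import Awaitable, Callable, Tuple, Type, TypeVar

T = TypeVar("T")


async def retry_async(
    operation: Callable[[], Awaitable[T]],
    *,
    attempts: int = 3,
    base_delay: float = 0.5,
    retry_on: Tuple[Type[Exception], ...] = (ConnectionError, TimeoutError),
) -> T:
    """Call operation until it succeeds or attempts are exhausted."""
    for attempt in range(1, attempts + 1):
        try:
            return await operation()
        except retry_on:
            if attempt == attempts:
                raise  # Out of attempts: surface the last error.
            # Exponential backoff (base, 2x, 4x, ...) plus random jitter.
            delay = base_delay * 2 ** (attempt - 1)
            await asyncio.sleep(delay + random.uniform(0, delay / 2))
    raise ValueError("attempts must be at least 1")


calls = {"n": 0}


async def flaky() -> str:
    """Fail twice with a transient error, then succeed."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient failure")
    return "ok"


result = asyncio.run(retry_async(flaky, attempts=3, base_delay=0.001))
```

Exceptions outside `retry_on`, such as validation errors, are raised immediately, since retrying an invalid input cannot succeed.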
Auto-Apply
When building multi-agent systems:
- Define tool schemas with Pydantic
- Implement ToolExecutor for safe execution
- Maintain AgentState for conversations
- Add comprehensive error handling
- Log all agent and tool interactions
- Use appropriate orchestration pattern (sequential, parallel, router, hierarchical)
- Set timeouts on all agent operations
Related Skills
- For tool schema definition: pydantic-models
- For async agent patterns: async-await-checker
- For LLM integration: llm-app-architecture
- For error handling: structured-errors
- For agent logging: observability-logging
- For type-safe tool definitions: type-safety