Claude-skills orchestrating-agents
Orchestrates parallel API instances, delegated sub-tasks, and multi-agent workflows with streaming and tool-enabled delegation patterns. Use for parallel analysis, multi-perspective reviews, or complex task decomposition.
git clone https://github.com/oaustegard/claude-skills
T=$(mktemp -d) && git clone --depth=1 https://github.com/oaustegard/claude-skills "$T" && mkdir -p ~/.claude/skills && cp -r "$T/orchestrating-agents" ~/.claude/skills/oaustegard-claude-skills-orchestrating-agents && rm -rf "$T"
orchestrating-agents/SKILL.md
Orchestrating Agents
This skill enables programmatic API invocations for advanced workflows including parallel processing, task delegation, and multi-agent analysis using the Anthropic API.
When to Use This Skill
Primary use cases:
- Parallel sub-tasks: Break complex analysis into simultaneous independent streams
- Multi-perspective analysis: Get 3-5 different expert viewpoints concurrently
- Delegation: Offload specific subtasks to specialized API instances
- Recursive workflows: Orchestrator coordinating multiple API instances
- High-volume processing: Batch process multiple items concurrently
Trigger patterns:
- "Parallel analysis", "multi-perspective review", "concurrent processing"
- "Delegate subtasks", "coordinate multiple agents"
- "Run analyses from different perspectives"
- "Get expert opinions from multiple angles"
Quick Start
Single Invocation
import sys
sys.path.append('/home/user/claude-skills/orchestrating-agents/scripts')

from claude_client import invoke_claude

response = invoke_claude(
    prompt="Analyze this code for security vulnerabilities: ...",
    model="claude-sonnet-4-6"
)
print(response)
Parallel Multi-Perspective Analysis
from claude_client import invoke_parallel

prompts = [
    {
        "prompt": "Analyze from security perspective: ...",
        "system": "You are a security expert"
    },
    {
        "prompt": "Analyze from performance perspective: ...",
        "system": "You are a performance optimization expert"
    },
    {
        "prompt": "Analyze from maintainability perspective: ...",
        "system": "You are a software architecture expert"
    }
]

results = invoke_parallel(prompts, model="claude-sonnet-4-6")

for i, result in enumerate(results):
    print(f"\n=== Perspective {i+1} ===")
    print(result)
Parallel with Shared Cached Context (Recommended)
For parallel operations with shared base context, use caching to reduce costs by up to 90%:
from claude_client import invoke_parallel

# Large context shared across all sub-agents (e.g., codebase, documentation)
base_context = """
<codebase>
...large codebase or documentation (1000+ tokens)...
</codebase>
"""

prompts = [
    {"prompt": "Find security vulnerabilities in the authentication module"},
    {"prompt": "Identify performance bottlenecks in the API layer"},
    {"prompt": "Suggest refactoring opportunities in the database layer"}
]

# First sub-agent creates cache, subsequent ones reuse it
results = invoke_parallel(
    prompts,
    shared_system=base_context,
    cache_shared_system=True  # 90% cost reduction for cached content
)
Multi-Turn Conversation with Auto-Caching
For sub-agents that need multiple rounds of conversation:
from claude_client import ConversationThread

# Create a conversation thread (auto-caches history)
agent = ConversationThread(
    system="You are a code refactoring expert with access to the codebase",
    cache_system=True
)

# Turn 1: Initial analysis
response1 = agent.send("Analyze the UserAuth class for issues")
print(response1)

# Turn 2: Follow-up (reuses cached system + turn 1)
response2 = agent.send("How would you refactor the login method?")
print(response2)

# Turn 3: Implementation (reuses all previous context)
response3 = agent.send("Show me the refactored code")
print(response3)
Streaming Responses
For real-time feedback from sub-agents:
from claude_client import invoke_claude_streaming

def show_progress(chunk):
    print(chunk, end='', flush=True)

response = invoke_claude_streaming(
    "Write a comprehensive security analysis...",
    callback=show_progress
)
Parallel Streaming
Monitor multiple sub-agents simultaneously:
from claude_client import invoke_parallel_streaming

def agent1_callback(chunk):
    print(f"[Security] {chunk}", end='', flush=True)

def agent2_callback(chunk):
    print(f"[Performance] {chunk}", end='', flush=True)

results = invoke_parallel_streaming(
    [
        {"prompt": "Security review: ..."},
        {"prompt": "Performance review: ..."}
    ],
    callbacks=[agent1_callback, agent2_callback]
)
Interruptible Operations
Cancel long-running parallel operations:
from claude_client import invoke_parallel_interruptible, InterruptToken
import threading
import time

token = InterruptToken()
results = []

# Run in background; collect into an outer list, since a thread
# target's return value is discarded
def run_analysis():
    results.extend(invoke_parallel_interruptible(
        prompts=[...],
        interrupt_token=token
    ))

thread = threading.Thread(target=run_analysis)
thread.start()

# Interrupt after 5 seconds
time.sleep(5)
token.interrupt()
thread.join()
Core Functions
invoke_claude()
Single synchronous invocation with full control:
invoke_claude(
    prompt: str | list[dict],
    model: str = "claude-sonnet-4-6",
    system: str | list[dict] | None = None,
    max_tokens: int = 4096,
    temperature: float = 1.0,
    streaming: bool = False,
    cache_system: bool = False,
    cache_prompt: bool = False,
    messages: list[dict] | None = None,
    **kwargs
) -> str
Parameters:
- prompt: The user message (string or list of content blocks)
- model: Claude model to use (default: claude-sonnet-4-6)
- system: Optional system prompt (string or list of content blocks)
- max_tokens: Maximum tokens in response (default: 4096)
- temperature: Randomness 0-1 (default: 1.0)
- streaming: Enable streaming response (default: False)
- cache_system: Add cache_control to system prompt (requires 1024+ tokens, default: False)
- cache_prompt: Add cache_control to user prompt (requires 1024+ tokens, default: False)
- messages: Pre-built messages list for multi-turn (overrides prompt)
- **kwargs: Additional API parameters (top_p, top_k, etc.)
Returns: Response text as string
Note: Caching requires minimum 1,024 tokens per cache breakpoint. Cache lifetime is 5 minutes (refreshed on use).
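For example, a minimal sketch of reusing a cached system prompt across two invoke_claude() calls (the style_guide value here is a stand-in; real cached content must meet the 1,024-token minimum):

from claude_client import invoke_claude

# Hypothetical long reference text; caching only activates at 1024+ tokens
style_guide = "<style_guide>...large shared coding standard...</style_guide>"

first = invoke_claude(
    prompt="Review utils.py against the style guide: ...",
    system=style_guide,
    cache_system=True,  # first call writes the cache breakpoint
)
second = invoke_claude(
    prompt="Review models.py against the same guide: ...",
    system=style_guide,
    cache_system=True,  # reused within the 5-minute cache lifetime
)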
invoke_parallel()
Concurrent invocations using a lightweight workflow pattern:
invoke_parallel(
    prompts: list[dict],
    model: str = "claude-sonnet-4-6",
    max_tokens: int = 4096,
    max_workers: int = 5,
    shared_system: str | list[dict] | None = None,
    cache_shared_system: bool = False
) -> list[str]
Parameters:
- prompts: List of dicts with 'prompt' (required) and optional 'system', 'temperature', 'cache_system', 'cache_prompt', etc.
- model: Claude model for all invocations
- max_tokens: Max tokens per response
- max_workers: Max concurrent API calls (default: 5, max: 10)
- shared_system: System context shared across ALL invocations (for cache efficiency)
- cache_shared_system: Add cache_control to shared_system (default: False)
Returns: List of response strings in same order as prompts
Note: For optimal cost savings, put large common context (1024+ tokens) in
shared_system with cache_shared_system=True. First invocation creates cache, subsequent ones reuse it (90% cost reduction).
invoke_claude_streaming()
Stream responses in real time with an optional callback:
invoke_claude_streaming(
    prompt: str | list[dict],
    callback: callable = None,
    model: str = "claude-sonnet-4-6",
    system: str | list[dict] | None = None,
    max_tokens: int = 4096,
    temperature: float = 1.0,
    cache_system: bool = False,
    cache_prompt: bool = False,
    **kwargs
) -> str
Parameters:
- callback: Optional function called with each text chunk (str) as it arrives
- (other parameters same as invoke_claude)
Returns: Complete accumulated response text
invoke_parallel_streaming()
Parallel invocations with per-agent streaming callbacks:
invoke_parallel_streaming(
    prompts: list[dict],
    callbacks: list[callable] = None,
    model: str = "claude-sonnet-4-6",
    max_tokens: int = 4096,
    max_workers: int = 5,
    shared_system: str | list[dict] | None = None,
    cache_shared_system: bool = False
) -> list[str]
Parameters:
- callbacks: Optional list of callback functions, one per prompt
- (other parameters same as invoke_parallel)
invoke_parallel_interruptible()
Parallel invocations with cancellation support:
invoke_parallel_interruptible(
    prompts: list[dict],
    interrupt_token: InterruptToken = None,
    # ... same other parameters as invoke_parallel
) -> list[str]
Parameters:
- interrupt_token: Optional InterruptToken to signal cancellation
- (other parameters same as invoke_parallel)
Returns: List of response strings (None for interrupted tasks)
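Because interrupted tasks come back as None, callers should filter results before using them; a minimal sketch:

from claude_client import invoke_parallel_interruptible, InterruptToken

token = InterruptToken()
prompts = [{"prompt": "Security review: ..."}, {"prompt": "Performance review: ..."}]

results = invoke_parallel_interruptible(prompts=prompts, interrupt_token=token)

# Tasks cancelled via token.interrupt() are returned as None
completed = [r for r in results if r is not None]
print(f"{len(completed)}/{len(results)} tasks completed")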
ConversationThread
Manages multi-turn conversations with automatic caching:
thread = ConversationThread(
    system: str | list[dict] | None = None,
    model: str = "claude-sonnet-4-6",
    max_tokens: int = 4096,
    temperature: float = 1.0,
    cache_system: bool = True
)

response = thread.send(
    user_message: str | list[dict],
    cache_history: bool = True
) -> str
Methods:
- send(message, cache_history=True): Send message and get response
- get_messages(): Get conversation history
- clear(): Clear conversation history
- __len__(): Get number of turns
New in 0.3.0 (illustrated in the sketch below):
- turn_count property: Number of completed turn pairs
- send_continuation(guidance, cache_history): Lightweight continuation turn (requires a prior send())
- max_turns constructor parameter: Optional turn limit
- continuation_prompt constructor parameter: Default continuation guidance
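A minimal sketch of these 0.3.0 additions, assuming the signatures listed above:

from claude_client import ConversationThread

# max_turns and continuation_prompt are the new constructor parameters
agent = ConversationThread(
    system="You are a code refactoring expert",
    max_turns=4,
    continuation_prompt="Continue with the next step.",
)

agent.send("Outline a refactoring plan for the UserAuth class")

# Lightweight continuation turn; requires a prior send()
agent.send_continuation("Focus on the login method next")

print(agent.turn_count)  # number of completed turn pairs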
StallDetector
Monitors activity timestamps and detects unresponsive operations:
from claude_client import StallDetector

def handle_stall(task_id, idle_seconds):
    print(f"Task {task_id} stalled for {idle_seconds:.1f}s")

detector = StallDetector(timeout=60.0, on_stall=handle_stall)
detector.register("task-1")
detector.start_monitoring(poll_interval=5.0)

# Call heartbeat() during streaming/progress
detector.heartbeat("task-1")

# When done
detector.unregister("task-1")
detector.stop_monitoring()
TaskTracker
(task_state module)
Formal task lifecycle state machine with enforced transitions:
from task_state import TaskTracker, TaskState

tracker = TaskTracker(max_retries=3)
tracker.add("task-1", category="security")
tracker.claim("task-1")     # UNCLAIMED → CLAIMED
tracker.start("task-1")     # CLAIMED → RUNNING (increments attempt)
tracker.complete("task-1")  # RUNNING → COMPLETED

# On failure with retry:
tracker.fail("task-2", error="timeout")
tracker.retry("task-2")     # FAILED → RETRY_QUEUED (if under max_retries)
tracker.claim("task-2")     # RETRY_QUEUED → CLAIMED

# Query state
tracker.active_count(category="security")
tracker.get_by_state(TaskState.RUNNING)
tracker.summary()  # {"completed": 1, "running": 1, ...}
invoke_with_retry()
(orchestration module)
Single invocation with exponential backoff:
from orchestration import invoke_with_retry

response = invoke_with_retry(
    "Analyze this code...",
    max_retries=3,
    base_delay_ms=1000,   # 1s, 2s, 4s backoff
    max_delay_ms=10000,   # capped at 10s
)
invoke_parallel_managed()
(orchestration module)
Full-featured parallel invocations with all Symphony patterns:
from orchestration import invoke_parallel_managed, ConcurrencyLimiter

limiter = ConcurrencyLimiter(
    global_limit=10,
    category_limits={"security": 3, "perf": 3}
)

def reconcile(prompts, tracker):
    # Filter out invalid/duplicate work before dispatch
    return [p for p in prompts if should_run(p)]

results = invoke_parallel_managed(
    prompts=[
        {"prompt": "Security review...", "task_id": "sec-1", "category": "security"},
        {"prompt": "Perf review...", "task_id": "perf-1", "category": "perf"},
    ],
    reconcile=reconcile,
    concurrency_limiter=limiter,
    max_retries=3,
    stall_timeout=60.0,
    on_stall=lambda tid, idle: print(f"{tid} stalled"),
)
Example Workflows
See references/workflows.md for detailed examples including:
- Multi-expert code review
- Parallel document analysis
- Recursive task delegation
- Advanced Agent SDK delegation patterns
- Prompt caching workflows
Execute Mode (Default Sub-Agent Prompt)
For autonomous sub-agents that should execute without asking questions:
from claude_client import invoke_claude, EXECUTE_MODE

response = invoke_claude(
    prompt="Review auth.py for SQL injection vulnerabilities",
    system=f"You are a security expert.\n\n{EXECUTE_MODE}"
)
EXECUTE_MODE encodes these principles (adapted from OpenAI Codex):
- Make assumptions instead of asking questions; state them briefly
- Think ahead: what else might be needed?
- Report failures with what you tried and what you'll do next
- Summarize deliverables and how to validate them
Agent Pool (Named Agents with Messaging)
For workflows where multiple agents need to communicate:
from agent_pool import AgentPool

pool = AgentPool(
    shared_system="You are reviewing the auth module of a web app.",
    max_depth=3,   # prevent recursive spawn explosion
    max_agents=10,
)

# Spawn named agents with roles
pool.spawn("security", system=f"Focus on vulnerabilities.\n\n{pool.EXECUTE_MODE}")
pool.spawn("perf", system=f"Focus on performance.\n\n{pool.EXECUTE_MODE}")

# Run turns (pending inter-agent messages auto-injected)
sec_result = pool.run("security", "Review the login flow")

# Agent-to-agent messaging
pool.send("security", to="perf",
          content="Auth does N+1 queries in the session check loop",
          trigger_turn=True)  # auto-runs perf with this context

# Broadcast to all agents
pool.broadcast("security", "Auth uses bcrypt cost=12, 200ms per hash")

# Query pool state
pool.agents()            # ["security", "perf"]
pool.agent_info("perf")  # {name, depth, children, pending_messages, turns}
Spawn Reservation (Atomic Agent Creation)
For complex workflows where agent creation might fail:
from agent_pool import AgentPool

pool = AgentPool(shared_system="Code review team")

# Reservation pattern: name is reserved, rolled back on exception
with pool.reserve("analyst", parent="lead") as res:
    res.configure(system="You analyze code complexity.", model="claude-opus-4-6")
    # If configure or any other work raises, the name is released

# Agent "analyst" is now live

# Depth limits prevent unbounded recursion
pool.spawn("sub-analyst", parent="analyst")  # depth=2, OK
pool.spawn("sub-sub", parent="sub-analyst")  # depth=3, raises ValueError
When to Use AgentPool vs invoke_parallel
| Pattern | Use When |
|---|---|
| invoke_parallel | Independent tasks, no inter-agent communication needed |
| AgentPool | Agents need to share findings, build on each other's work, or have parent/child relationships |
| invoke_parallel_managed | Independent tasks with retry, stall detection, concurrency limits |
Setup
Prerequisites:
1. Install the anthropic library:
   uv pip install anthropic
2. Configure the API key via a project knowledge file:
   Option 1 (recommended): Individual file
   - Create document: ANTHROPIC_API_KEY.txt
   - Content: Your API key (e.g., sk-ant-api03-...)
   Option 2: Combined file
   - Create document: API_CREDENTIALS.json
   - Content: { "anthropic_api_key": "sk-ant-api03-..." }
   Get your API key: https://console.anthropic.com/settings/keys
Installation check:
python3 -c "import anthropic; print(f'✓ anthropic {anthropic.__version__}')"
Error Handling
The module provides comprehensive error handling:
from claude_client import invoke_claude, ClaudeInvocationError

try:
    response = invoke_claude("Your prompt here")
except ClaudeInvocationError as e:
    print(f"API Error: {e}")
    print(f"Status: {e.status_code}")
    print(f"Details: {e.details}")
except ValueError as e:
    print(f"Configuration Error: {e}")
Common errors:
- API key missing: Add ANTHROPIC_API_KEY.txt to project knowledge (see Setup above)
- Rate limits: Reduce max_workers or add delays
- Token limits: Reduce prompt size or max_tokens
- Network errors: Automatic retry with exponential backoff
Prompt Caching
For detailed caching workflows and best practices, see references/workflows.md.
Performance Considerations
Token efficiency:
- Parallel calls use more tokens but save wall-clock time
- Use prompt caching for shared context (90% cost reduction)
- Use concise system prompts to reduce overhead
- Consider token budgets when setting max_tokens
Rate limits:
- Anthropic API has per-minute rate limits
- Default max_workers=5 is safe for most tiers
- Adjust based on your API tier and rate limits
Cost management (see the sketch after this list):
- Each invocation consumes API credits
- Monitor usage in Anthropic Console
- Use smaller models (haiku) for simple tasks
- Use prompt caching for repeated context (90% savings)
- Cache lifetime: 5 minutes, refreshed on each use
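Putting the cost guidance together, a minimal sketch that sends simple per-item work to a smaller model and caches the shared context (the Haiku model id is illustrative; check the current model list):

from claude_client import invoke_parallel

shared_docs = "...large shared reference material (1024+ tokens)..."

results = invoke_parallel(
    [{"prompt": f"Classify support ticket #{i}: ..."} for i in range(20)],
    model="claude-haiku-4-5",   # illustrative smaller-model id
    max_tokens=512,             # tight budget for short per-item outputs
    shared_system=shared_docs,
    cache_shared_system=True,   # shared context billed at the cached rate after the first call
)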
Best Practices
1. Use parallel invocations for independent tasks only
   - Don't parallelize sequential dependencies
   - Each parallel task should be self-contained
2. Set appropriate system prompts
   - Define clear roles/expertise for each instance
   - Keeps responses focused and relevant
3. Handle errors gracefully (see the sketch after this list)
   - Always wrap invocations in try-except
   - Provide fallback behavior for failures
4. Test with small batches first
   - Verify prompts work before scaling
   - Check token usage and costs
5. Consider alternatives
   - Not all tasks benefit from multiple instances
   - Sometimes sequential with context is better
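As a sketch of practice 3, wrapping a parallel run and falling back to sequential calls on failure (the fallback strategy here is just one option):

from claude_client import invoke_parallel, invoke_claude, ClaudeInvocationError

prompts = [{"prompt": "Security review: ..."}, {"prompt": "Performance review: ..."}]

try:
    results = invoke_parallel(prompts, max_workers=2)
except ClaudeInvocationError as e:
    print(f"Parallel run failed ({e}); falling back to sequential calls")
    results = []
    for p in prompts:
        try:
            results.append(invoke_claude(p["prompt"], system=p.get("system")))
        except ClaudeInvocationError:
            results.append(None)  # record the gap instead of aborting the whole batch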
Token Efficiency
This skill uses ~800 tokens when loaded but enables powerful multi-agent patterns that can dramatically improve complex analysis quality and speed.
See Also
- references/api-reference.md - Detailed API documentation
- Anthropic API Docs - Official documentation