Tony strands-multi-agent

Strands Agents SDK for multi-agent orchestration patterns. Use this skill when building agent orchestration systems, implementing sequential/graph/swarm/agents-as-tools patterns, creating agent teams, managing shared state, or streaming agent events. Triggers on "strands", "multi-agent", "agent orchestration", "swarm", "graph pattern", "agent handoff", "sequential pipeline".

install
source · Clone the upstream repo
git clone https://github.com/jaydeland/Tony
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/jaydeland/Tony "$T" && mkdir -p ~/.claude/skills && cp -r "$T/.claude/skills/strands-multi-agent" ~/.claude/skills/jaydeland-tony-strands-multi-agent && rm -rf "$T"
manifest: .claude/skills/strands-multi-agent/skill.md

Strands Python SDK — Multi-Agent Orchestration

Purpose: Master the Strands Agents SDK for building orchestration systems where agents communicate and collaborate. The orchestrator never writes code—it manages inputs, plans, creates agents, and assembles teams to complete work.


Overview

The Strands Agents SDK (strands-agents) provides four distinct orchestration patterns. Choose based on workflow requirements:

| Pattern | Execution Flow | Best For |
| --- | --- | --- |
| Sequential | Linear A → B → C | Simple pipelines |
| Graph | Conditional branching, loops | Structured processes with dynamic paths |
| Swarm | Autonomous handoffs | Collaborative self-organizing teams |
| Agents-as-Tools | Hierarchical delegation | Supervisor → specialist pattern |

Installation

python -m venv .venv
source .venv/bin/activate
pip install strands-agents
pip install strands-agents-tools strands-agents-builder  # Optional tools

Requirements: Python 3.10+, AWS credentials for Bedrock (or configure alternative provider)


Core Concepts

Agent Loop

Reasoning → Tool Selection → Tool Execution → Repeat
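
The loop above can be illustrated without the SDK. The following is a minimal sketch in plain Python, not the Strands implementation: fake_model, TOOLS, and run_agent_loop are hypothetical names, and the "model" is a hard-coded function standing in for an LLM call.

```python
from typing import Callable

# Hypothetical stand-in for an LLM: requests the "add" tool once, then answers.
def fake_model(history: list[str]) -> dict:
    if not any(line.startswith("tool_result:") for line in history):
        return {"tool": "add", "args": {"a": 2, "b": 3}}
    return {"answer": f"The sum is {history[-1].split(':', 1)[1]}"}

# Hypothetical tool registry keyed by tool name.
TOOLS: dict[str, Callable[..., object]] = {"add": lambda a, b: a + b}

def run_agent_loop(prompt: str, max_turns: int = 5) -> str:
    history = [f"user:{prompt}"]
    for _ in range(max_turns):
        decision = fake_model(history)            # reasoning + tool selection
        if "answer" in decision:                  # no tool requested: finished
            return decision["answer"]
        tool = TOOLS[decision["tool"]]
        result = tool(**decision["args"])         # tool execution
        history.append(f"tool_result:{result}")   # feed the result back, repeat
    raise RuntimeError("agent loop did not converge")

print(run_agent_loop("What is 2 + 3?"))
```

The max_turns cap is a design choice for the sketch: any loop driven by model output benefits from an explicit upper bound.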

Model Configuration

from strands.models.anthropic import AnthropicModel

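# Anthropic provider; the underlying Anthropic client reads ANTHROPIC_API_KEY from the environment.
# Requires the optional extra: pip install 'strands-agents[anthropic]'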
model = AnthropicModel(model_id="claude-sonnet-4-20250514", max_tokens=4096)

# Also supported: BedrockModel, OllamaModel, OpenAIModel, GeminiModel, etc.

Pattern 1: Sequential Pipeline (Basic)

Sequential passing where each agent's output becomes the next agent's input.

from strands import Agent
from strands_tools import http_request  # provided by the optional strands-agents-tools package

# Create specialized agents
researcher_agent = Agent(
    system_prompt="You are a Researcher. Find comprehensive information...",
    tools=[http_request],
    callback_handler=None  # Suppress intermediate output
)

analyst_agent = Agent(
    system_prompt="You are an Analyst. Verify facts and extract insights...",
    callback_handler=None
)

writer_agent = Agent(
    system_prompt="You are a Writer. Create clear, well-structured reports..."
)

# Orchestration: Sequential passing
def run_workflow(user_input: str):
    research_response = researcher_agent(f"Research: '{user_input}'")
    research_findings = str(research_response)

    analyst_response = analyst_agent(f"Analyze:\n\n{research_findings}")
    analysis = str(analyst_response)

    final_report = writer_agent(f"Create report based on:\n\n{analysis}")
    return final_report

Key Pattern: callback_handler=None suppresses verbose console output from intermediate agents.
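
The hand-written chaining in run_workflow generalizes to any number of stages. Below is a minimal sketch of that idea, assuming only that each agent is a callable whose result can be converted with str(); run_pipeline and the lambda stubs are hypothetical and stand in for real Agent instances.

```python
from typing import Callable, Sequence

# A stage pairs a prompt template with an agent-like callable.
Stage = tuple[str, Callable[[str], object]]

def run_pipeline(stages: Sequence[Stage], user_input: str) -> str:
    """Feed each stage's stringified output into the next stage's prompt."""
    text = user_input
    for template, agent in stages:
        text = str(agent(template.format(input=text)))
    return text

# Stub "agents" so the sketch runs without a model provider.
stages = [
    ("Research: {input}", lambda prompt: prompt.upper()),
    ("Analyze: {input}", lambda prompt: prompt + "!"),
]
report = run_pipeline(stages, "solar")
print(report)
```

With real agents, each lambda would be replaced by an Agent instance, since agents are invoked the same way (agent(prompt)) in the workflow above.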


Pattern 2: Graph Orchestration (Conditional Branching)

For workflows requiring branching logic, loops, or parallel fan-out/fan-in patterns.

from strands import Agent
from strands.multiagent import GraphBuilder
from strands.multiagent.graph import GraphState

# Create specialized agents
classifier = Agent(name="classifier", system_prompt="Classify as 'technical' or 'business'")
tech_specialist = Agent(name="tech_specialist", system_prompt="Handle technical issues...")
business_specialist = Agent(name="business_specialist", system_prompt="Handle business issues...")

# Define conditional routing functions
def is_technical(state: GraphState) -> bool:
    result = state.results.get("classifier")
    return bool(result and "technical" in str(result.result).lower())

def is_business(state: GraphState) -> bool:
    result = state.results.get("classifier")
    return bool(result and "business" in str(result.result).lower())

# Build graph with conditional edges
builder = GraphBuilder()
builder.add_node(classifier, "classifier")
builder.add_node(tech_specialist, "tech_specialist")
builder.add_node(business_specialist, "business_specialist")

# Conditional branching
builder.add_edge("classifier", "tech_specialist", condition=is_technical)
builder.add_edge("classifier", "business_specialist", condition=is_business)
builder.set_entry_point("classifier")
builder.set_execution_timeout(600)

graph = builder.build()
result = graph("My server is returning 500 errors")
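
Routing predicates like is_technical are plain functions, so they can be checked without invoking a model. The sketch below assumes only what the predicate itself uses (state.results is a mapping whose values expose .result); fake_state is a hypothetical helper built on SimpleNamespace, not an SDK type.

```python
from types import SimpleNamespace
from typing import Optional

def is_technical(state) -> bool:
    result = state.results.get("classifier")
    return bool(result and "technical" in str(result.result).lower())

def fake_state(classifier_output: Optional[str]):
    """Hypothetical stand-in for GraphState with an optional classifier result."""
    results = {}
    if classifier_output is not None:
        results["classifier"] = SimpleNamespace(result=classifier_output)
    return SimpleNamespace(results=results)

print(is_technical(fake_state("Technical issue")))  # classifier said technical
print(is_technical(fake_state("business")))         # classifier said business
print(is_technical(fake_state(None)))               # classifier has not run yet
```

Because the check is a substring match, an output such as "non-technical" would also route to the technical branch; constraining the classifier prompt to a single-word answer is one way to reduce that risk.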

Fan-Out Parallel Pattern (Wait for All Dependencies)

from strands.multiagent.graph import GraphState
from strands.multiagent.base import Status

def all_dependencies_complete(required_nodes: list[str]):
    """Factory function creating AND condition for multiple dependencies."""
    def check(state: GraphState) -> bool:
        return all(
            node_id in state.results and state.results[node_id].status == Status.COMPLETED
            for node_id in required_nodes
        )
    return check

# Build: research → [analysis, fact_check] → report (fan-in waits for both)
builder.add_edge("research", "analysis")
builder.add_edge("research", "fact_check")
builder.add_edge("analysis", "report", condition=all_dependencies_complete(["analysis", "fact_check"]))
builder.add_edge("fact_check", "report", condition=all_dependencies_complete(["analysis", "fact_check"]))
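
To see why the same condition is attached to both incoming edges, the check can be exercised against a stub state. This is a sketch only: the Status enum here is a local stand-in for strands.multiagent.base.Status, and the SimpleNamespace objects are hypothetical substitutes for GraphState and node results.

```python
from enum import Enum
from types import SimpleNamespace

class Status(Enum):  # local stand-in, not the SDK enum
    EXECUTING = "executing"
    COMPLETED = "completed"

def all_dependencies_complete(required_nodes: list[str]):
    def check(state) -> bool:
        return all(
            node_id in state.results and state.results[node_id].status == Status.COMPLETED
            for node_id in required_nodes
        )
    return check

ready = all_dependencies_complete(["analysis", "fact_check"])

# Only "analysis" has finished: the report node must not start yet.
state = SimpleNamespace(results={"analysis": SimpleNamespace(status=Status.COMPLETED)})
first_check = ready(state)

# "fact_check" finishes: whichever edge is evaluated now passes.
state.results["fact_check"] = SimpleNamespace(status=Status.COMPLETED)
second_check = ready(state)
print(first_check, second_check)
```

Whichever upstream node finishes last triggers the edge evaluation that succeeds, so the report node waits for both regardless of completion order.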

Feedback Loop Pattern (Draft → Review → Revision)

def needs_revision(state: GraphState) -> bool:
    review = state.results.get("reviewer")  # None until the reviewer has run
    return bool(review and "revision needed" in str(review.result).lower())

def is_approved(state: GraphState) -> bool:
    review = state.results.get("reviewer")
    return bool(review and "approved" in str(review.result).lower())

builder = GraphBuilder()
builder.add_node(draft_writer, "draft_writer")
builder.add_node(reviewer, "reviewer")
builder.add_node(publisher, "publisher")

builder.add_edge("draft_writer", "reviewer")
builder.add_edge("reviewer", "draft_writer", condition=needs_revision)  # Loop back
builder.add_edge("reviewer", "publisher", condition=is_approved)        # Exit

builder.set_entry_point("draft_writer")

# Safety settings for cyclic graphs
builder.set_max_node_executions(10)   # Prevent infinite loops
builder.set_execution_timeout(300)    # 5 minute timeout

graph = builder.build()
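
The effect of an execution cap on a draft/review cycle can be sketched in plain Python. This is an illustration of the control flow, not how the SDK enforces set_max_node_executions; run_review_loop, make_writer, and review are hypothetical stubs.

```python
from typing import Callable, Optional

def run_review_loop(
    write: Callable[[Optional[str]], str],
    review: Callable[[str], str],
    max_node_executions: int = 10,
) -> tuple[str, int]:
    """Alternate writer and reviewer until approval or until the cap is hit."""
    executions = 0
    feedback: Optional[str] = None
    while executions + 2 <= max_node_executions:
        draft = write(feedback)      # draft_writer node
        verdict = review(draft)      # reviewer node
        executions += 2
        if "approved" in verdict.lower():
            return draft, executions
        feedback = verdict           # loop back with the reviewer's feedback
    raise RuntimeError("execution cap reached before approval")

def make_writer() -> Callable[[Optional[str]], str]:
    version = 0
    def write(feedback: Optional[str]) -> str:
        nonlocal version
        version += 1
        return f"draft v{version}"
    return write

def review(draft: str) -> str:
    return "approved" if draft.endswith("v3") else "revision needed: tighten intro"

print(run_review_loop(make_writer(), review))
```

With the stub reviewer approving the third draft, the loop finishes after six node executions; a cap of four would raise instead, which is the behavior the safety settings are meant to guarantee for a reviewer that never approves.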

Pattern 3: Swarm Orchestration (Autonomous Collaboration)

For self-organizing agent teams where agents autonomously decide handoffs based on shared context.

from strands import Agent
from strands.multiagent.swarm import Swarm

# Create specialized agents
researcher = Agent(name="researcher", system_prompt="You are a researcher...")
analyst = Agent(name="analyst", system_prompt="You are an analyst...")
writer = Agent(name="writer", system_prompt="You are a writer...")

# Initialize Swarm orchestrator
swarm = Swarm(
    nodes=[researcher, analyst, writer],
    entry_point=researcher,
    max_handoffs=20,
    max_iterations=20,
    execution_timeout=900.0,   # 15 minutes total
    node_timeout=300.0,        # 5 minutes per node
)

# Execute
result = swarm(task="Your complex task here")

Swarm Invocation Methods

# Synchronous
result = swarm(task="Your task")

# Asynchronous with streaming events (events are dict-like; run inside an async function)
async for event in swarm.stream_async("Your task"):
    if event.get("type") == "multi_agent_node_start":
        print(f"Starting: {event['node_id']}")
    elif event.get("type") == "multi_agent_handoff":
        print(f"Handoff: {event['from_node_ids']} -> {event['to_node_ids']}")

# State serialization for resume
state = swarm.serialize_state()
swarm.deserialize_state(payload=state)

Swarm Streaming Events

| Event | Description |
| --- | --- |
| multi_agent_node_start | Node begins execution |
| multi_agent_node_stream | Forwarded agent events with node context |
| multi_agent_handoff | Control transferred between agents |
| multi_agent_node_stop | Node stops execution |
| multi_agent_result | Final swarm result |
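
A consumer for these events can be developed against a simulated stream before wiring it to a real swarm. The sketch below assumes events are dict-like with a "type" key and a "node_id" field on start/stop events; fake_stream and collect_timeline are hypothetical helpers, and the event payloads are made up for illustration.

```python
import asyncio
from typing import AsyncIterator

async def fake_stream() -> AsyncIterator[dict]:
    """Simulated event stream; a real one would come from swarm.stream_async(...)."""
    events = [
        {"type": "multi_agent_node_start", "node_id": "researcher"},
        {"type": "multi_agent_node_stop", "node_id": "researcher"},
        {"type": "multi_agent_handoff"},
        {"type": "multi_agent_node_start", "node_id": "writer"},
        {"type": "multi_agent_node_stop", "node_id": "writer"},
    ]
    for event in events:
        yield event

async def collect_timeline(stream: AsyncIterator[dict]) -> list[str]:
    """Reduce the event stream to a compact start/stop/handoff timeline."""
    timeline: list[str] = []
    async for event in stream:
        kind = event.get("type")
        if kind == "multi_agent_node_start":
            timeline.append(f"start:{event['node_id']}")
        elif kind == "multi_agent_node_stop":
            timeline.append(f"stop:{event['node_id']}")
        elif kind == "multi_agent_handoff":
            timeline.append("handoff")
    return timeline

timeline = asyncio.run(collect_timeline(fake_stream()))
print(timeline)
```

Unknown event types are ignored rather than rejected, so the consumer keeps working if the stream carries additional event kinds.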

Pattern 4: Agents-as-Tools (Supervisor Delegation)

For hierarchical orchestration where a supervisor agent delegates to specialists wrapped as tools.

from strands import Agent, tool

# Create specialist agents
math_assistant = Agent(name="math_assistant", system_prompt="Solve math problems step by step.")
language_assistant = Agent(name="language_assistant", system_prompt="Help with writing and grammar.")

# Wrap specialists as tools
@tool
def math_helper(problem: str) -> str:
    """Solve mathematical problems."""
    return str(math_assistant(problem))

@tool
def language_helper(text: str) -> str:
    """Help with writing questions."""
    return str(language_assistant(text))

# Orchestrator with delegated tools
orchestrator = Agent(
    system_prompt=(
        "You are an orchestrator. Delegate tasks to the appropriate specialist: "
        "Use math_helper for math problems, language_helper for writing tasks."
    ),
    tools=[math_helper, language_helper]
)
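
When a specialist fails, the supervisor is usually better served by an error string than by an exception that aborts its own loop. The following sketch shows one way to wrap a specialist for that purpose; as_safe_tool and the stub specialists are hypothetical, and in real use the returned function would still need the @tool decorator shown above.

```python
from typing import Callable

def as_safe_tool(name: str, specialist: Callable[[str], object]) -> Callable[[str], str]:
    """Wrap an agent-like callable so failures come back as text, not exceptions."""
    def helper(query: str) -> str:
        try:
            return str(specialist(query))
        except Exception as exc:  # report the failure to the supervisor
            return f"{name} failed: {exc}"
    helper.__name__ = name
    return helper

# Stub specialists so the sketch runs without a model provider.
def math_stub(problem: str) -> int:
    return 4

def broken_stub(text: str) -> str:
    raise ValueError("no model configured")

safe_math = as_safe_tool("math_helper", math_stub)
safe_language = as_safe_tool("language_helper", broken_stub)
print(safe_math("2 + 2"))
print(safe_language("fix this sentence"))
```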

Shared State Pattern

Both Graph and Swarm support shared state for context propagation:

# Define shared state
shared_state = {
    "user_id": "user123",
    "session_id": "sess456",
    "debug_mode": True,
}

# Pass to invocation
result = graph("Analyze data", invocation_state=shared_state)
# or
result = swarm("Analyze data", invocation_state=shared_state)

# Access in tools via ToolContext
from strands import tool, ToolContext

@tool(context=True)
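def query_data(query: str, tool_context: ToolContext) -> str:
    """Query data on behalf of the invoking user."""
    user_id = tool_context.invocation_state.get("user_id")
    debug_mode = tool_context.invocation_state.get("debug_mode", False)
    # Use context for personalized queries...
    return f"Results for {query!r} (user={user_id}, debug={debug_mode})"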
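
Keeping the tool body in a plain function makes the state handling testable without an agent run. In this sketch, query_data_impl is a hypothetical helper and the SimpleNamespace objects stand in for ToolContext; the only assumption carried over from the text is that the context exposes an invocation_state mapping.

```python
from types import SimpleNamespace

def query_data_impl(query: str, tool_context) -> str:
    """Read shared state defensively: every key gets a default."""
    state = tool_context.invocation_state
    user_id = state.get("user_id", "anonymous")
    prefix = "[debug] " if state.get("debug_mode", False) else ""
    return f"{prefix}results for {query!r} (user={user_id})"

# Stub contexts: one with the shared state from above, one with nothing set.
full_ctx = SimpleNamespace(invocation_state={"user_id": "user123", "debug_mode": True})
empty_ctx = SimpleNamespace(invocation_state={})

print(query_data_impl("sales", full_ctx))
print(query_data_impl("sales", empty_ctx))
```

Using .get with defaults means a tool keeps working when it is invoked outside a graph or swarm that supplies the shared state.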

Hooks for Monitoring & Extension

from strands.hooks import BeforeNodeCallEvent, AfterNodeCallEvent  # import path may vary by SDK version

# Add monitoring hooks to graph or swarm through its hook registry
graph.hooks.add_callback(BeforeNodeCallEvent, lambda event: print(f"Starting: {event.node_id}"))
graph.hooks.add_callback(AfterNodeCallEvent, lambda event: print(f"Completed: {event.node_id}"))

# Enable debug logging
import logging
logging.getLogger("strands").setLevel(logging.DEBUG)
logging.basicConfig(format="%(levelname)s | %(name)s | %(message)s")
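
Beyond printing, a pair of before/after callbacks can collect per-node timings. The sketch below is independent of the SDK: NodeTimer is a hypothetical class, and it assumes only that hook events expose a node_id attribute, as the callbacks above do. The injectable clock exists so the example is deterministic.

```python
import time
from types import SimpleNamespace
from typing import Callable

class NodeTimer:
    """Hypothetical collector: record how long each node ran."""

    def __init__(self, clock: Callable[[], float] = time.perf_counter) -> None:
        self._clock = clock
        self._started: dict[str, float] = {}
        self.durations: dict[str, float] = {}

    def on_before(self, event) -> None:
        self._started[event.node_id] = self._clock()

    def on_after(self, event) -> None:
        start = self._started.pop(event.node_id, None)
        if start is not None:  # ignore "after" events with no matching "before"
            self.durations[event.node_id] = self._clock() - start

# Deterministic demo: a fake clock and stub events instead of a real graph run.
ticks = iter([10.0, 12.5])
timer = NodeTimer(clock=lambda: next(ticks))
event = SimpleNamespace(node_id="classifier")
timer.on_before(event)
timer.on_after(event)
print(timer.durations)
```

In a real setup, timer.on_before and timer.on_after would be registered as the two callbacks in place of the lambdas.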

Debugging Multi-Agent Systems

  1. Enable debug logging:

    logging.getLogger("strands").setLevel(logging.DEBUG)
    
  2. Suppress intermediate output:

    agent = Agent(..., callback_handler=None)
    
  3. Stream events for visibility:

    async for event in swarm.stream_async(task="..."):
        print(event)
    
  4. Inspect execution order (on a Graph result):

    print(f"Execution order: {[node.node_id for node in result.execution_order]}")
    
  5. Serialize state for investigation:

    state = swarm.serialize_state()  # Inspect this for debugging
    swarm.deserialize_state(payload=state)
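
For step 5, writing the serialized state to disk makes it easier to diff between runs. This is a sketch under the assumption that the serialized state is a JSON-compatible dict; save_state and load_state are hypothetical helpers, and the keys in example_state are invented for illustration rather than taken from the SDK.

```python
import json
import tempfile
from pathlib import Path

def save_state(state: dict, path: Path) -> None:
    """Write a state snapshot as stable, human-readable JSON."""
    path.write_text(json.dumps(state, indent=2, sort_keys=True, default=str))

def load_state(path: Path) -> dict:
    return json.loads(path.read_text())

# Hypothetical payload; a real one would come from swarm.serialize_state().
example_state = {"status": "executing", "node_history": ["researcher", "analyst"]}

with tempfile.TemporaryDirectory() as tmp:
    snapshot = Path(tmp) / "swarm_state.json"
    save_state(example_state, snapshot)
    restored = load_state(snapshot)

print(restored == example_state)
```

sort_keys=True keeps snapshots stable across runs, and default=str prevents a non-JSON value from aborting the dump during debugging.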
    

When to Use Each Pattern

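| Scenario | Recommended Pattern |
| --- | --- |
| Linear pipeline (A → B → C) | Sequential Pipeline |
| Conditional branching | Graph |
| Autonomous agent collaboration | Swarm |
| Supervisor delegates to specialists | Agents-as-Tools |
| Fan-out parallel → fan-in | Graph |
| Draft → review → revision loop | Graph |
| Repeatable data pipeline | Workflow |

Note: Workflow is not one of the four patterns covered above. In Strands it is provided by the workflow tool in the strands-agents-tools package.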

Key Architecture Differences

| Aspect | Graph | Swarm | Workflow |
| --- | --- | --- | --- |
| Execution Flow | Controlled but Dynamic | Sequential & Autonomous | Deterministic & Parallel |
| Path Determination | LLM decides at each node | Agents autonomously hand off | Fixed by dependency graph |
| Cycles Allowed | Yes | Yes | No |
| State Sharing | Full shared state object | Shared context with task history | Task outputs passed to dependents |
| Error Handling | Developer-defined error edges | Agent-driven with timeouts | Systemic (task failure halts dependents) |

Resources

Local API Reference

  • API-REF.md: Complete API reference for Strands Agents SDK v1.31, including:
    • Core Agent API (Agent, AgentWithMemory, AgentHarness, AgentConfig)
    • Model Providers (AnthropicModel, OpenAIModel, OllamaModel, BedrockModel, LiteLLMModel)
    • Multi-Agent Patterns (Sequential, Graph, Swarm, Agents-as-Tools)
    • Tool System (@tool decorator, ToolContext, built-in tools)
    • Types & Events (Event types, GraphState, Status enum)
    • Utilities (Debug logging, hooks, metrics)
    • Error handling and best practices

External Documentation