gsd-skill-creator · agent-orchestration

Provides best practices for AI agent orchestration including MCP servers, A2A protocol, multi-agent coordination, and swarm architectures. Use when designing agent systems, configuring MCP servers, setting up agent teams, or when user mentions 'MCP', 'A2A', 'agent orchestration', 'multi-agent', 'swarm', 'agent team', 'LangGraph', 'CrewAI', 'AutoGen'.

install
source · Clone the upstream repo
git clone https://github.com/Tibsfox/gsd-skill-creator
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/Tibsfox/gsd-skill-creator "$T" && mkdir -p ~/.claude/skills && cp -r "$T/examples/skills/orchestration/agent-orchestration" ~/.claude/skills/tibsfox-gsd-skill-creator-agent-orchestration && rm -rf "$T"
manifest: examples/skills/orchestration/agent-orchestration/SKILL.md
source content

Agent Orchestration

Best practices for designing, deploying, and coordinating AI agent systems using MCP servers, A2A protocol, and multi-agent patterns.

Agent Orchestration Patterns

Orchestration determines how agents are coordinated, who makes decisions, and how work flows between them.

| Pattern | Description | Best For | Drawback |
|---|---|---|---|
| Centralized | Single orchestrator dispatches tasks to worker agents | Predictable workflows, clear task boundaries | Orchestrator is a bottleneck and single point of failure |
| Hierarchical | Manager agents delegate to specialist sub-agents | Complex multi-domain tasks | Deep hierarchies add latency and lose context |
| Peer-to-peer | Agents communicate directly, no central coordinator | Collaborative reasoning, brainstorming | Hard to debug, potential infinite loops |
| Pipeline | Agents process sequentially, output feeds next agent | Data transformation, multi-stage analysis | Slow for parallelizable work, rigid ordering |
| Blackboard | Shared state space that agents read from and write to | Problems requiring incremental refinement | Contention on shared state, ordering issues |
| Auction/Market | Agents bid on tasks based on capability and capacity | Dynamic workload distribution | Overhead of bidding, suboptimal for simple tasks |
| Swarm | Many lightweight agents with simple rules, emergent behavior | Exploration, search, large-scale parallel tasks | Unpredictable outcomes, hard to steer |

Choosing the Right Pattern

Is the workflow predictable and linear?
  YES --> Pipeline or Centralized
  NO  --> Does it require specialized domain expertise?
            YES --> Hierarchical (domain managers + specialists)
            NO  --> Do agents need to collaborate on shared output?
                      YES --> Blackboard or Peer-to-peer
                      NO  --> Is the workload dynamic and variable?
                                YES --> Auction/Market
                                NO  --> Centralized (default safe choice)
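The decision tree above can be encoded directly as a pure function; a minimal sketch (the predicate and return names are illustrative, not part of any framework):

```python
def choose_pattern(
    predictable_linear: bool,
    needs_domain_expertise: bool,
    shared_output: bool,
    dynamic_workload: bool,
) -> str:
    """Walk the pattern-selection decision tree top to bottom."""
    if predictable_linear:
        return "pipeline-or-centralized"
    if needs_domain_expertise:
        return "hierarchical"
    if shared_output:
        return "blackboard-or-peer-to-peer"
    if dynamic_workload:
        return "auction-market"
    return "centralized"  # default safe choice

# e.g. an unpredictable task that needs domain specialists
print(choose_pattern(False, True, False, False))  # hierarchical
```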

MCP (Model Context Protocol) for DevOps

MCP provides a standardized way for AI agents to interact with external tools, services, and data sources. Each MCP server exposes capabilities that agents can discover and invoke.

MCP Architecture

Agent (Claude, GPT, etc.)
  |
  +--> MCP Client (built into agent runtime)
         |
         +--> MCP Server: GitHub     (repos, PRs, issues)
         +--> MCP Server: Kubernetes (pods, deployments, services)
         +--> MCP Server: Database   (queries, schema inspection)
         +--> MCP Server: Monitoring (metrics, alerts, dashboards)
         +--> MCP Server: Cloud      (AWS/GCP/Azure resources)

MCP Server Configuration for DevOps Tools

{
  "mcpServers": {
    "github": {
      "command": "npx",
      "args": ["-y", "@modelcontextprotocol/server-github"],
      "env": {
        "GITHUB_PERSONAL_ACCESS_TOKEN": "${GITHUB_TOKEN}"
      }
    },
    "kubernetes": {
      "command": "npx",
      "args": ["-y", "@modelcontextprotocol/server-kubernetes"],
      "env": {
        "KUBECONFIG": "${HOME}/.kube/config"
      }
    },
    "postgres": {
      "command": "npx",
      "args": [
        "-y",
        "@modelcontextprotocol/server-postgres",
        "postgresql://readonly:${DB_PASSWORD}@db.internal:5432/production"
      ]
    },
    "filesystem": {
      "command": "npx",
      "args": [
        "-y",
        "@modelcontextprotocol/server-filesystem",
        "/opt/configs",
        "/var/log/apps"
      ]
    },
    "prometheus": {
      "command": "python",
      "args": ["-m", "mcp_prometheus"],
      "env": {
        "PROMETHEUS_URL": "http://prometheus.internal:9090"
      }
    }
  }
}

MCP Server Security Rules

| Rule | Rationale |
|---|---|
| Use read-only credentials where possible | Agents should observe before acting; limit blast radius |
| Scope tokens to minimum required permissions | A GitHub token for reading PRs should not have admin access |
| Run MCP servers in isolated environments | Prevent lateral movement if an MCP server is compromised |
| Log all MCP tool invocations | Audit trail for agent actions, required for compliance |
| Set rate limits on MCP server endpoints | Prevent runaway agents from overwhelming external services |
| Validate agent inputs before execution | MCP servers must sanitize and validate all parameters |
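Two of these rules (audit logging, input validation) can be enforced in one thin wrapper around every tool invocation; a sketch, where `invoke` stands in for whatever transport your MCP client actually uses:

```python
import json
import logging
from datetime import datetime, timezone
from typing import Any, Callable

logging.basicConfig(level=logging.INFO)
audit_log = logging.getLogger("mcp.audit")

def audited_tool_call(
    invoke: Callable[[str, dict], Any],
    tool: str,
    params: dict,
    allowed_tools: set[str],
) -> Any:
    """Validate against an allowlist, write an audit record, then invoke."""
    if tool not in allowed_tools:
        raise PermissionError(f"Tool not in allowlist: {tool}")
    audit_log.info(json.dumps({
        "ts": datetime.now(timezone.utc).isoformat(),
        "tool": tool,
        "params": params,
    }))
    return invoke(tool, params)
```

Routing every call through one choke point is what makes the audit trail complete: an agent cannot reach a tool except via the logged path.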

Custom MCP Server Example

// mcp-server-deploy.ts -- Custom MCP server for deployment operations
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { CallToolRequestSchema, ListToolsRequestSchema } from "@modelcontextprotocol/sdk/types.js";

const server = new Server(
  { name: "deploy-server", version: "1.0.0" },
  { capabilities: { tools: {} } }
);

server.setRequestHandler(ListToolsRequestSchema, async () => ({
  tools: [{
    name: "get_deployment_status",
    description: "Get current deployment status for a service",
    inputSchema: {
      type: "object",
      properties: {
        service: { type: "string" },
        environment: { type: "string", enum: ["staging", "production"] },
      },
      required: ["service", "environment"],
    },
  }],
}));

server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { name, arguments: args } = request.params;
  if (name === "get_deployment_status") {
    // queryDeploymentSystem is the integration point with your deployment backend
    const status = await queryDeploymentSystem(args.service, args.environment);
    return { content: [{ type: "text", text: JSON.stringify(status, null, 2) }] };
  }
  throw new Error(`Unknown tool: ${name}`);
});

await server.connect(new StdioServerTransport());

A2A (Agent-to-Agent) Protocol

A2A is Google's open protocol for agent interoperability. It enables agents built on different frameworks to discover each other, negotiate capabilities, and exchange tasks.

A2A Core Concepts

| Concept | Description |
|---|---|
| Agent Card | JSON metadata describing an agent's capabilities, endpoint, and auth |
| Task | A unit of work sent from one agent to another |
| Message | Communication within a task (text, files, structured data) |
| Artifact | Output produced by an agent (files, data, results) |
| Push Notification | Server-sent updates for long-running tasks |

A2A Agent Card

{
  "name": "DevOps Deployment Agent",
  "description": "Handles deployments, rollbacks, and release management",
  "url": "https://agents.internal/deploy",
  "version": "1.0.0",
  "capabilities": {
    "streaming": true,
    "pushNotifications": true,
    "stateTransitionHistory": true
  },
  "authentication": {
    "schemes": ["bearer"],
    "credentials": "oauth2_token"
  },
  "defaultInputModes": ["text/plain", "application/json"],
  "defaultOutputModes": ["text/plain", "application/json"],
  "skills": [
    {
      "id": "deploy-service",
      "name": "Deploy Service",
      "description": "Deploy a service to staging or production",
      "tags": ["deployment", "release"],
      "examples": [
        "Deploy payment-api v2.3.1 to staging",
        "Roll back auth-service in production to previous version"
      ]
    },
    {
      "id": "deployment-status",
      "name": "Check Deployment Status",
      "description": "Get current deployment status and history",
      "tags": ["monitoring", "status"]
    }
  ]
}
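A consuming agent typically fetches a card like this and reads off its skills before routing work. A minimal parser sketch (field names follow the example card above; this is illustrative, not a complete A2A schema):

```python
from dataclasses import dataclass

@dataclass
class AgentSkill:
    id: str
    name: str
    description: str = ""

@dataclass
class AgentCard:
    name: str
    url: str
    skills: list[AgentSkill]

    @classmethod
    def from_dict(cls, card: dict) -> "AgentCard":
        """Build a card from the JSON shape shown above."""
        return cls(
            name=card["name"],
            url=card["url"],
            skills=[
                AgentSkill(s["id"], s["name"], s.get("description", ""))
                for s in card.get("skills", [])
            ],
        )

    def has_skill(self, skill_id: str) -> bool:
        return any(s.id == skill_id for s in self.skills)
```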

A2A Task Message Exchange

{
  "jsonrpc": "2.0",
  "method": "tasks/send",
  "id": "req-001",
  "params": {
    "id": "task-deploy-2025-001",
    "message": {
      "role": "user",
      "parts": [
        {
          "type": "text",
          "text": "Deploy payment-api v2.3.1 to staging environment"
        },
        {
          "type": "data",
          "mimeType": "application/json",
          "data": {
            "service": "payment-api",
            "version": "v2.3.1",
            "environment": "staging",
            "strategy": "canary",
            "canary_percentage": 10,
            "rollback_on_error": true
          }
        }
      ]
    }
  }
}

A2A Task Response

{
  "jsonrpc": "2.0",
  "id": "req-001",
  "result": {
    "id": "task-deploy-2025-001",
    "status": {
      "state": "completed",
      "message": {
        "role": "agent",
        "parts": [{ "type": "text", "text": "Deployed payment-api v2.3.1 to staging, canary at 10%." }]
      }
    },
    "artifacts": [{
      "name": "deployment-report",
      "parts": [{
        "type": "data",
        "mimeType": "application/json",
        "data": {
          "deployment_id": "deploy-abc123",
          "status": "healthy",
          "canary_metrics": { "error_rate": 0.001, "p99_latency_ms": 245 }
        }
      }]
    }]
  }
}

Agent Team Configuration

Agent teams assign distinct roles to specialized agents that collaborate on complex tasks.

Claude Code Agent Team Configuration

# agent-team.yaml -- DevOps agent team using Claude Code
team:
  name: devops-ops-team
  coordination: centralized

agents:
  - role: orchestrator
    model: claude-sonnet-4-20250514
    system_prompt: "Receive requests, delegate to specialists, synthesize results. Never act directly."
    tools: [dispatch_to_agent, check_agent_status, aggregate_results]

  - role: code-reviewer
    model: claude-sonnet-4-20250514
    system_prompt: "Review code for security, reliability, team standards. Actionable feedback with line refs."
    tools: [github_pr_read, github_pr_comment, run_static_analysis]

  - role: deployment-agent
    model: claude-sonnet-4-20250514
    system_prompt: "Handle deployments. Verify pre-conditions, canary for prod, confirm health checks."
    tools: [kubernetes_apply, deployment_status, rollback_deployment, run_smoke_tests]

  - role: incident-responder
    model: claude-sonnet-4-20250514
    system_prompt: "Gather metrics, correlate with changes, propose mitigations. No prod changes without approval."
    tools: [query_prometheus, query_logs, get_recent_deployments, create_incident_report]

workflows:
  deploy_request:
    - { agent: code-reviewer, action: review_changes, gate: approval_required }
    - { agent: deployment-agent, action: deploy_to_staging }
    - { agent: deployment-agent, action: run_smoke_tests, gate: tests_must_pass }
    - { agent: deployment-agent, action: deploy_to_production }
    - { agent: orchestrator, action: notify_team }
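The gated workflow above reduces to a dispatch loop that halts when a gated step fails; a sketch of that logic (the agent callables and the `passed` result field are stand-ins for your actual integration):

```python
from typing import Callable

def run_workflow(
    steps: list[dict],
    agents: dict[str, Callable[[str], dict]],
) -> list[dict]:
    """Run steps in order; a step carrying a gate halts the run on failure."""
    results = []
    for step in steps:
        result = agents[step["agent"]](step["action"])
        results.append({"step": step, "result": result})
        if step.get("gate") and not result.get("passed", False):
            break  # gate failed: never reach later steps (e.g. production deploy)
    return results
```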

Swarm Architecture Comparison

Swarm architectures use multiple lightweight agents that coordinate through simple rules or shared state.

| Framework | Architecture | Coordination | State Management | Best For |
|---|---|---|---|---|
| LangGraph | Graph-based DAG | Explicit edges between nodes | Shared state object passed through graph | Complex workflows with conditional branching |
| CrewAI | Role-based crew | Sequential or parallel task execution | Shared memory + per-agent memory | Task-oriented teams with clear role separation |
| AutoGen | Conversational | Agent-to-agent messaging | Conversation history as shared context | Multi-turn collaborative reasoning |
| OpenAI Agents SDK | Handoff-based | Agent-to-agent handoffs with context transfer | Thread-level state with tool results | Production agent systems with tool use |
| Claude Code | Orchestrator + sub-agents | Parent spawns child agents via Task tool | File system + context passing | Developer tooling and code generation |

LangGraph: Conditional Workflow

# langgraph_deploy_workflow.py
from langgraph.graph import StateGraph, END
from typing import TypedDict, Literal

class DeployState(TypedDict):
    service: str
    version: str
    review_result: str       # "approved" | "rejected"
    staging_healthy: bool

def review_code(state: DeployState) -> DeployState:
    result = code_review_agent.invoke(f"Review {state['service']} {state['version']}")
    state["review_result"] = result.approval_status
    return state

def deploy_staging(state: DeployState) -> DeployState:
    result = deploy_agent.invoke(f"Deploy {state['service']} {state['version']} to staging")
    state["staging_healthy"] = result.healthy
    return state

def should_deploy(state: DeployState) -> Literal["deploy_staging", "end"]:
    return "deploy_staging" if state["review_result"] == "approved" else "end"

# Build: review --> (approved?) --> deploy_staging (production step omitted for brevity)
workflow = StateGraph(DeployState)
workflow.add_node("review", review_code)
workflow.add_node("deploy_staging", deploy_staging)
workflow.set_entry_point("review")
workflow.add_conditional_edges(
    "review",
    should_deploy,
    {"deploy_staging": "deploy_staging", "end": END},  # map branch names to nodes
)
workflow.add_edge("deploy_staging", END)
graph = workflow.compile()

OpenAI Agents SDK: Handoff Pattern

# openai_agents_deploy.py
from agents import Agent, Runner

# Define downstream agents first so they can be referenced in handoffs.
monitor = Agent(
    name="Monitor",
    instructions="""Monitor the deployment for 15 minutes. Check error
    rates, latency, and resource usage. Report any anomalies.""",
    tools=[query_metrics, check_error_rate, check_latency],
)

deployer = Agent(
    name="Deployer",
    instructions="""Deploy the approved changes. Use canary strategy
    for production. Hand off to Monitor after deployment.""",
    handoffs=[monitor],  # handoffs take Agent objects, not strings
    tools=[deploy_to_staging, deploy_to_production, run_smoke_tests],
)

code_reviewer = Agent(
    name="Code Reviewer",
    instructions="""Review code changes for security and reliability.
    If approved, hand off to Deployer. If rejected, explain why.""",
    handoffs=[deployer],
)

# Run the pipeline (Runner.run is async; run_sync blocks until completion)
result = Runner.run_sync(
    code_reviewer,
    input="Deploy payment-api v2.3.1 -- changes include rate limiting middleware",
)

Agent Communication Patterns

Message Types

| Message Type | Purpose | Example |
|---|---|---|
| Task Request | Ask an agent to perform work | "Deploy service X to staging" |
| Status Update | Report progress on ongoing work | "Deployment at 50%, canary healthy" |
| Result | Deliver completed work output | "Deployment complete, all health checks pass" |
| Query | Ask for information without action | "What is the current error rate for service X?" |
| Escalation | Report a problem requiring higher authority | "Canary error rate exceeds 5%, requesting rollback approval" |
| Handoff | Transfer responsibility to another agent | "Code review complete, handing off to deployment agent" |
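A shared envelope type keeps these message kinds machine-routable rather than buried in free text; a minimal sketch (field names are illustrative):

```python
from dataclasses import dataclass, field
from enum import Enum

class MessageType(Enum):
    TASK_REQUEST = "task_request"
    STATUS_UPDATE = "status_update"
    RESULT = "result"
    QUERY = "query"
    ESCALATION = "escalation"
    HANDOFF = "handoff"

@dataclass
class AgentMessage:
    type: MessageType
    sender: str
    recipient: str
    body: dict = field(default_factory=dict)

def requires_human(msg: AgentMessage) -> bool:
    """Escalations are the one message kind that must always reach a human."""
    return msg.type is MessageType.ESCALATION
```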

Communication Topology

Centralized (Star):          Peer-to-peer (Mesh):
                              A --- B
    B   C                     |\ /|
     \ /                      | X |
      A  (orchestrator)       |/ \|
     / \                      C --- D
    D   E

Pipeline (Chain):            Hierarchical (Tree):
A --> B --> C --> D                A
                                /   \
                               B     C
                              / \     \
                             D   E     F

Shared State Protocol

# agent_state.py -- Thread-safe shared state (Blackboard pattern)
import threading
from datetime import datetime, timezone
from typing import Any

class SharedAgentState:
    """Shared state space for multi-agent coordination."""

    def __init__(self):
        self._state: dict[str, Any] = {}
        self._lock = threading.RLock()

    def write(self, key: str, value: Any, agent_id: str) -> None:
        with self._lock:
            self._state[key] = {
                "value": value, "updated_by": agent_id,
                "updated_at": datetime.now(timezone.utc).isoformat(),
            }

    def read(self, key: str) -> Any | None:
        with self._lock:
            entry = self._state.get(key)
            return entry["value"] if entry else None

State Management Across Agents

State Strategies by Pattern

| Strategy | Mechanism | Consistency | Scalability |
|---|---|---|---|
| Pass-through | State object passed as function argument | Strong (single owner) | Low (deep copying overhead) |
| Shared memory | In-process shared dict with locking | Strong (with locks) | Low (single process) |
| Message queue | Redis Streams, Kafka, RabbitMQ | Eventual | High |
| Database | PostgreSQL, DynamoDB | Strong or eventual (configurable) | High |
| File system | JSON/YAML files in shared volume | Weak (race conditions) | Low |
| Event sourcing | Append-only log of state changes | Strong (replayable) | High |
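The event-sourcing strategy amounts to an append-only log whose current state is a pure fold over the events; a minimal in-memory sketch (a production version would persist the log):

```python
from typing import Any

class EventLog:
    """Append-only log: current state is always reconstructed by replay."""

    def __init__(self):
        self._events: list[tuple[str, str, Any]] = []  # (agent_id, key, value)

    def append(self, agent_id: str, key: str, value: Any) -> None:
        self._events.append((agent_id, key, value))

    def replay(self) -> dict[str, Any]:
        """Fold the log into the current state (last write per key wins)."""
        state: dict[str, Any] = {}
        for _, key, value in self._events:
            state[key] = value
        return state
```

Because the log is never mutated in place, any historical state can be rebuilt by replaying a prefix, which is what makes recovery from corruption possible.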

State Persistence for Long-Running Agents

# agent-state-config.yaml
state_management:
  backend: redis
  connection: "redis://state.internal:6379/0"
  key_prefix: "agent-state:"
  persistence:
    snapshot_interval: 60s
    snapshot_backend: s3
  isolation:
    strategy: namespace           # {team}:{workflow}:{run_id}
  recovery:
    on_agent_crash: restore_from_snapshot
    on_state_corruption: replay_from_event_log
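The namespace isolation strategy above comes down to a deterministic key scheme; a sketch matching the `{team}:{workflow}:{run_id}` convention in the config (the function name and validation rule are illustrative):

```python
def state_key(team: str, workflow: str, run_id: str, field: str,
              prefix: str = "agent-state:") -> str:
    """Build a namespaced key: prefix + {team}:{workflow}:{run_id}:{field}."""
    for part in (team, workflow, run_id, field):
        if ":" in part:
            raise ValueError(f"':' not allowed in key component: {part!r}")
    return f"{prefix}{team}:{workflow}:{run_id}:{field}"
```

Rejecting the delimiter inside components keeps keys unambiguous, so one workflow run can never read or clobber another run's state.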

Multi-Agent Coordination Example

End-to-end example: an incident response pipeline with four coordinating agents using parallel data gathering and sequential analysis.

# incident_response_team.py
import asyncio
from dataclasses import dataclass

@dataclass
class IncidentContext:
    alert_id: str
    service: str
    severity: str
    metrics: dict | None = None
    recent_deploys: list | None = None
    root_cause: str | None = None
    mitigation: str | None = None

async def run_incident_response(alert_id: str, service: str, severity: str):
    ctx = IncidentContext(alert_id=alert_id, service=service, severity=severity)

    # Phase 1: Parallel data gathering (metrics + deploy history agents)
    ctx.metrics, ctx.recent_deploys = await asyncio.gather(
        gather_metrics_agent(ctx),
        gather_deploys_agent(ctx),
    )

    # Phase 2: Sequential analysis (needs data from phase 1)
    ctx.root_cause = await analyze_root_cause_agent(ctx)

    # Phase 3: Mitigation (needs root cause from phase 2)
    ctx.mitigation = await execute_mitigation_agent(ctx)

    # Phase 4: Documentation agent generates postmortem from full context
    return ctx

Anti-Patterns

| Anti-Pattern | Problem | Fix |
|---|---|---|
| Giving agents unrestricted production access | Single hallucinated command can cause outage | Use read-only access by default; require approval gates for writes |
| No audit trail for agent actions | Cannot determine what an agent did or why | Log all tool invocations, decisions, and state changes |
| Agents calling agents in unbounded loops | Infinite recursion, cost explosion, no convergence | Set max iteration limits, timeout budgets, and cycle detection |
| Single mega-agent instead of specialized team | Context window overflow, poor at every task | Split into focused agents with clear responsibilities |
| Shared state without concurrency control | Race conditions, lost updates, inconsistent state | Use locking, versioned writes, or event sourcing |
| No fallback when an agent fails | Entire pipeline stops on one agent error | Implement retries, circuit breakers, and graceful degradation |
| Hardcoding agent dependencies | Cannot swap implementations or scale independently | Use discovery (A2A Agent Cards) or dependency injection |
| Trusting agent output without validation | Hallucinated data propagates through the pipeline | Validate outputs against schemas; add human checkpoints for critical actions |
| Running all agents on the most expensive model | Unnecessary cost for simple tasks | Match model capability to task complexity (small model for routing, large for analysis) |
| No resource budgets per agent | One runaway agent consumes all API quota or compute | Set per-agent token limits, rate limits, and cost ceilings |
| Synchronous-only communication | Pipeline blocked waiting for slow agents | Use async messaging with status callbacks for long-running tasks |
| Ignoring agent context window limits | Agents receive truncated context and make poor decisions | Summarize and filter context before passing between agents |
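Several of these fixes (iteration limits, token budgets) reduce to a guard around the agent loop; a sketch with stand-in names, where `step` represents one agent turn and reports whether it finished and how many tokens it spent:

```python
class BudgetExceeded(RuntimeError):
    pass

def run_with_budget(step, max_iterations: int = 10, max_tokens: int = 50_000) -> dict:
    """Run an agent step function until it reports done, within hard budgets.

    `step(i)` returns (done, tokens_used) for iteration i; names are illustrative.
    """
    tokens = 0
    for i in range(max_iterations):
        done, used = step(i)
        tokens += used
        if tokens > max_tokens:
            raise BudgetExceeded(f"token budget exceeded: {tokens}")
        if done:
            return {"iterations": i + 1, "tokens": tokens}
    raise BudgetExceeded(f"iteration limit reached: {max_iterations}")
```

Raising rather than silently stopping matters here: a blown budget is a signal for escalation, not a normal completion.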

Agent Orchestration Readiness Checklist

Infrastructure

  • MCP servers deployed for required external tools (GitHub, K8s, monitoring)
  • MCP server credentials scoped to minimum required permissions
  • Agent communication channel established (A2A, message queue, or direct)
  • State management backend selected and configured (Redis, DB, or file)
  • Logging and audit trail capturing all agent actions
  • Rate limiting configured per agent and per MCP server

Agent Design

  • Each agent has a single, well-defined responsibility
  • Agent system prompts include boundaries (what NOT to do)
  • Model selection matches task complexity (not all tasks need the largest model)
  • Input/output schemas defined for agent communication
  • Error handling and retry logic implemented per agent
  • Maximum iteration and token budgets set per agent

Coordination

  • Orchestration pattern selected and documented (centralized, hierarchical, etc.)
  • Task routing logic tested with representative workloads
  • Handoff protocols defined between agent pairs
  • Shared state access patterns documented with concurrency controls
  • Timeout and circuit breaker thresholds configured
  • Escalation paths defined (agent to agent, agent to human)

Safety and Governance

  • Human-in-the-loop gates for destructive actions (deploy, delete, rollback)
  • Agent outputs validated against schemas before downstream consumption
  • Cost monitoring and alerting configured per agent team
  • Kill switch available to halt all agent activity immediately
  • Regular review of agent decision logs for quality and drift
  • Incident response plan covers agent-caused failures