Claude-Skills agent-workflow-designer

Install

Source · Clone the upstream repo
git clone https://github.com/borghei/Claude-Skills

Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/borghei/Claude-Skills "$T" && mkdir -p ~/.claude/skills && cp -r "$T/engineering/agent-workflow-designer" ~/.claude/skills/borghei-claude-skills-agent-workflow-designer && rm -rf "$T"

Manifest: engineering/agent-workflow-designer/SKILL.md

Source content

Agent Workflow Designer

The agent designs multi-agent orchestration systems using five core patterns: sequential pipeline, parallel fan-out/fan-in, hierarchical delegation, event-driven reactor, and consensus validation. It implements agent routing strategies, circuit breaker reliability patterns, context window budgeting, and cost optimization across LangGraph, CrewAI, AutoGen, and Claude Code agent teams.

Core Capabilities

1. Pattern Selection and Design

  • Sequential pipelines with typed handoffs
  • Parallel fan-out/fan-in with merge strategies
  • Hierarchical delegation with dynamic subtask discovery
  • Event-driven reactors with pub/sub agent triggers
  • Consensus validation with voting and arbitration

2. Agent Routing

  • Intent-based routing with classifier agents
  • Skill-based routing using capability matching
  • Cost-aware routing (cheap models for simple tasks)
  • Load-balanced routing across agent pools
  • Fallback chains with graceful degradation

3. State and Context Management

  • Persistent workflow state across agent hops
  • Context window budgeting and summarization
  • Checkpoint/resume for long-running workflows (see the sketch after this list)
  • Conflict resolution for parallel state updates
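
A minimal checkpoint/resume sketch, assuming the graph object built in the Pattern 1 example below; LangGraph's MemorySaver persists state in memory only, so a production workflow would swap in a durable checkpointer:

from langgraph.checkpoint.memory import MemorySaver

# Compiling with a checkpointer persists state after every node, enabling resume
pipeline = graph.compile(checkpointer=MemorySaver())

# A thread_id identifies one workflow run; re-invoking with the same id
# resumes from the last checkpoint instead of re-running completed stages
config = {"configurable": {"thread_id": "run-42"}}
result = pipeline.invoke({"topic": "AI agents in enterprise software"}, config=config)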

4. Reliability Engineering

  • Circuit breakers for failing agents
  • Retry with exponential backoff and model fallback (sketched after this list)
  • Dead letter queues for unprocessable tasks
  • Timeout enforcement at every agent boundary
  • Idempotent operations for safe retries
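
A compact sketch combining retry with exponential backoff, fallback to a cheaper model when the primary keeps failing, and a dead letter queue for tasks that exhaust every attempt (call_agent and the module-level DEAD_LETTERS list are illustrative assumptions, not part of any framework):

import asyncio
import random

DEAD_LETTERS: list[dict] = []  # unprocessable tasks land here for manual inspection

async def call_with_retry(
    call_agent,  # hypothetical coroutine: call_agent(task, model=...) -> result
    task: dict,
    models: tuple[str, ...] = ("claude-sonnet-4-20250514", "claude-haiku-4-20250514"),
    max_attempts: int = 3,
    base_delay: float = 1.0,
):
    """Retry with jittered exponential backoff, falling back through the model chain."""
    for model in models:  # model fallback: try the next tier after exhausting retries
        for attempt in range(max_attempts):
            try:
                return await call_agent(task, model=model)
            except Exception:
                # backoff grows 1s, 2s, 4s per attempt, jittered to avoid thundering herds
                await asyncio.sleep(base_delay * (2 ** attempt) * (0.5 + random.random()))
    DEAD_LETTERS.append(task)  # dead letter queue: every model and attempt failed
    raise RuntimeError("Task exhausted all retries and models; sent to dead letter queue")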

When to Use

  • Building multi-step AI pipelines that exceed one agent's capability
  • Parallelizing research, analysis, or generation tasks
  • Creating specialist agent teams with defined roles and contracts
  • Designing fault-tolerant AI workflows for production deployment
  • Optimizing cost across workflows with mixed model tiers

Pattern Selection Decision Tree

What does the workflow look like?
│
├─ Linear: step A feeds step B feeds step C
│  └─ SEQUENTIAL PIPELINE
│     Best for: content pipelines, code review chains, data transformation
│
├─ Parallel: N independent tasks, then combine
│  └─ FAN-OUT / FAN-IN
│     Best for: competitive research, multi-source analysis, parallel code gen
│
├─ Tree: orchestrator breaks work into subtasks dynamically
│  └─ HIERARCHICAL DELEGATION
│     Best for: complex projects, open-ended research, code generation with planning
│
├─ Reactive: agents respond to events/triggers
│  └─ EVENT-DRIVEN REACTOR
│     Best for: monitoring, alerting, continuous integration, chat workflows
│
└─ Verification: multiple agents must agree on output
   └─ CONSENSUS VALIDATION
      Best for: high-stakes decisions, code review, fact checking, safety-critical output

Pattern 1: Sequential Pipeline

Each stage transforms input and passes structured output to the next. Type-safe handoffs prevent data loss between stages.

LangGraph Implementation

import operator
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
from langchain_anthropic import ChatAnthropic

class PipelineState(TypedDict):
    topic: str
    research: str
    draft: str
    final: str
    stage_costs: Annotated[list[dict], operator.add]  # reducer accumulates cost per stage

def research_stage(state: PipelineState) -> dict:
    model = ChatAnthropic(model="claude-sonnet-4-20250514", max_tokens=2048)
    result = model.invoke(
        f"Research the following topic thoroughly. Provide key facts, statistics, "
        f"and expert perspectives:\n\n{state['topic']}"
    )
    return {
        "research": result.content,
        "stage_costs": [{"stage": "research", "tokens": result.usage_metadata["total_tokens"]}],
    }

def writing_stage(state: PipelineState) -> dict:
    model = ChatAnthropic(model="claude-sonnet-4-20250514", max_tokens=4096)
    result = model.invoke(
        f"Using this research, write a compelling 800-word blog post with a hook, "
        f"3 main sections, and a CTA:\n\n{state['research']}"
    )
    return {
        "draft": result.content,
        "stage_costs": [{"stage": "writing", "tokens": result.usage_metadata["total_tokens"]}],
    }

def editing_stage(state: PipelineState) -> dict:
    model = ChatAnthropic(model="claude-haiku-4-20250514", max_tokens=4096)
    result = model.invoke(
        f"Edit this draft for clarity, flow, and grammar. Return only the improved "
        f"version:\n\n{state['draft']}"
    )
    return {
        "final": result.content,
        "stage_costs": [{"stage": "editing", "tokens": result.usage_metadata["total_tokens"]}],
    }

# Build the graph
graph = StateGraph(PipelineState)
graph.add_node("research", research_stage)
graph.add_node("write", writing_stage)
graph.add_node("edit", editing_stage)
graph.add_edge("research", "write")
graph.add_edge("write", "edit")
graph.add_edge("edit", END)
graph.set_entry_point("research")

pipeline = graph.compile()

# Execute
result = pipeline.invoke({"topic": "The future of AI agents in enterprise software"})
print(f"Total cost: {sum(s['tokens'] for s in result['stage_costs'])} tokens")

Pattern 2: Parallel Fan-Out / Fan-In

Independent tasks run concurrently. A merge function combines results.

import asyncio
from dataclasses import dataclass

@dataclass
class FanOutTask:
    name: str
    system_prompt: str
    user_message: str
    model: str = "claude-sonnet-4-20250514"

@dataclass
class FanOutResult:
    task_name: str
    output: str
    tokens_used: int
    success: bool
    error: str | None = None

async def fan_out_fan_in(
    tasks: list[FanOutTask],
    merge_prompt: str,
    max_concurrent: int = 5,
    timeout_seconds: float = 60.0,
) -> dict:
    """Execute tasks in parallel with concurrency limit and timeout."""
    import anthropic

    client = anthropic.AsyncAnthropic()
    semaphore = asyncio.Semaphore(max_concurrent)

    async def run_one(task: FanOutTask) -> FanOutResult:
        async with semaphore:
            try:
                response = await asyncio.wait_for(
                    client.messages.create(
                        model=task.model,
                        max_tokens=2048,
                        system=task.system_prompt,
                        messages=[{"role": "user", "content": task.user_message}],
                    ),
                    timeout=timeout_seconds,
                )
                return FanOutResult(
                    task_name=task.name,
                    output=response.content[0].text,
                    tokens_used=response.usage.input_tokens + response.usage.output_tokens,
                    success=True,
                )
            except Exception as e:
                return FanOutResult(
                    task_name=task.name, output="", tokens_used=0,
                    success=False, error=str(e),
                )

    # FAN-OUT: run all tasks concurrently
    results = await asyncio.gather(*[run_one(t) for t in tasks])
    successful = [r for r in results if r.success]
    failed = [r for r in results if not r.success]

    if not successful:
        raise RuntimeError(f"All {len(tasks)} fan-out tasks failed: {[f.error for f in failed]}")

    # FAN-IN: merge results
    combined = "\n\n---\n\n".join(
        f"## {r.task_name}\n{r.output}" for r in successful
    )

    merge_response = await client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=4096,
        system="Synthesize the following parallel analyses into a unified report.",
        messages=[{"role": "user", "content": f"{merge_prompt}\n\n{combined}"}],
    )

    return {
        "synthesis": merge_response.content[0].text,
        "individual_results": successful,
        "failures": failed,
        "total_tokens": sum(r.tokens_used for r in results) + merge_response.usage.input_tokens + merge_response.usage.output_tokens,
    }
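
Usage might look like this (task names and prompts are illustrative):

tasks = [
    FanOutTask(name="Competitor A", system_prompt="You are a market analyst.",
               user_message="Analyze Competitor A's pricing strategy."),
    FanOutTask(name="Competitor B", system_prompt="You are a market analyst.",
               user_message="Analyze Competitor B's pricing strategy."),
]
report = asyncio.run(fan_out_fan_in(tasks, merge_prompt="Compare pricing across competitors."))
print(report["synthesis"])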

Pattern 3: Hierarchical Delegation

An orchestrator agent dynamically decomposes work and delegates to specialists.

import asyncio
import json
from dataclasses import dataclass
from typing import Literal

SPECIALISTS = {
    "researcher": "Find accurate information with sources. Be thorough and cite evidence.",
    "coder": "Write clean, tested code. Include error handling and type hints.",
    "writer": "Create clear, engaging content. Match the requested tone and format.",
    "analyst": "Analyze data and produce evidence-backed conclusions with visualizations.",
    "reviewer": "Review work product for quality, accuracy, and completeness.",
}

@dataclass
class SubTask:
    id: str
    agent: Literal["researcher", "coder", "writer", "analyst", "reviewer"]
    task: str
    depends_on: list[str]
    priority: int = 0  # higher = run first when deps are equal

class HierarchicalOrchestrator:
    def __init__(self, client):
        self.client = client

    async def plan(self, request: str) -> list[SubTask]:
        """Orchestrator creates an execution plan with dependencies."""
        response = await self.client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=2048,
            system=f"""You are a task orchestrator. Break down the request into subtasks.
Available specialists: {', '.join(SPECIALISTS.keys())}
Respond with JSON: {{"subtasks": [{{"id": "1", "agent": "researcher", "task": "...", "depends_on": []}}]}}
Rules:
- Minimize the number of subtasks (prefer fewer, more substantial tasks)
- Only add dependencies when output is genuinely needed
- Independent tasks should have empty depends_on for parallel execution""",
            messages=[{"role": "user", "content": request}],
        )
        plan = json.loads(response.content[0].text)  # assumes the model returns bare JSON
        return [SubTask(**st) for st in plan["subtasks"]]

    async def execute(self, request: str) -> str:
        """Plan, execute with dependency resolution, and synthesize."""
        subtasks = await self.plan(request)
        results = {}

        # Execute in dependency order, parallelize where possible
        for batch in self._batch_by_dependencies(subtasks):
            batch_results = await asyncio.gather(*[
                self._run_specialist(st, results) for st in batch
            ])
            for st, result in zip(batch, batch_results):
                results[st.id] = result

        # Final synthesis
        all_outputs = "\n\n".join(f"### {k}\n{v}" for k, v in results.items())
        synthesis = await self.client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=4096,
            system="Synthesize specialist outputs into a coherent final response.",
            messages=[{"role": "user", "content": f"Request: {request}\n\nOutputs:\n{all_outputs}"}],
        )
        return synthesis.content[0].text

    async def _run_specialist(self, st: SubTask, results: dict) -> str:
        """Run one specialist with its system prompt plus any dependency outputs."""
        context = "\n\n".join(
            f"Output of task {dep}:\n{results[dep]}" for dep in st.depends_on
        )
        response = await self.client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=2048,
            system=SPECIALISTS[st.agent],
            messages=[{"role": "user", "content": f"{st.task}\n\n{context}".strip()}],
        )
        return response.content[0].text

    def _batch_by_dependencies(self, subtasks: list[SubTask]) -> list[list[SubTask]]:
        """Group subtasks into batches that can run in parallel."""
        completed = set()
        remaining = list(subtasks)
        batches = []
        while remaining:
            batch = [t for t in remaining if all(d in completed for d in t.depends_on)]
            if not batch:
                raise ValueError("Circular dependency detected in subtask plan")
            batches.append(sorted(batch, key=lambda t: -t.priority))
            completed.update(t.id for t in batch)
            remaining = [t for t in remaining if t.id not in completed]
        return batches
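
Validating the plan before spending tokens on specialists is cheap: run the batching step once right after planning, and a cycle raises immediately (the request string below is illustrative):

import asyncio
import anthropic

async def main():
    orchestrator = HierarchicalOrchestrator(anthropic.AsyncAnthropic())
    plan = await orchestrator.plan("Write a market report on EV battery suppliers")
    orchestrator._batch_by_dependencies(plan)  # raises ValueError on any cycle
    # proceed to execute() only once the plan is a valid DAG

asyncio.run(main())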

Pattern 4: Event-Driven Reactor

Agents react to events published on a message bus, which keeps them decoupled and easy to scale.

import asyncio
from collections import defaultdict
from typing import Callable, Any

class AgentEventBus:
    """Simple event bus for agent-to-agent communication."""

    def __init__(self):
        self._handlers: dict[str, list[Callable]] = defaultdict(list)
        self._history: list[dict] = []

    def subscribe(self, event_type: str, handler: Callable):
        self._handlers[event_type].append(handler)

    async def publish(self, event_type: str, payload: Any, source: str):
        event = {"type": event_type, "payload": payload, "source": source}
        self._history.append(event)
        handlers = self._handlers.get(event_type, [])
        results = await asyncio.gather(
            *[h(event) for h in handlers],
            return_exceptions=True,
        )
        errors = [(h, r) for h, r in zip(handlers, results) if isinstance(r, Exception)]
        if errors:
            for handler, error in errors:
                print(f"Handler {handler.__name__} failed: {error}")
        return results

# Usage: code review pipeline triggered by PR events
bus = AgentEventBus()

async def on_pr_opened(event):
    """Security agent scans PR for vulnerabilities."""
    diff = event["payload"]["diff"]
    findings = []  # ... run the security scan over the diff and collect findings
    await bus.publish("security_scan_complete", {"findings": findings}, "security-agent")

async def on_security_complete(event):
    """Review agent incorporates security findings into review."""
    # ... generate review with security context

bus.subscribe("pr_opened", on_pr_opened)
bus.subscribe("security_scan_complete", on_security_complete)

Pattern 5: Consensus Validation

Multiple agents independently evaluate the same input. A quorum determines the final output.

@dataclass
class Vote:
    agent: str
    verdict: str  # "approve" | "reject" | "revise"
    confidence: float  # 0.0 - 1.0
    reasoning: str

async def consensus_validate(
    content: str,
    validators: list[dict],  # [{"name": "...", "system": "..."}]
    quorum: float = 0.66,
    confidence_threshold: float = 0.7,
) -> dict:
    """Run content through multiple validators and determine consensus."""
    # Collect independent votes (no agent sees another's vote)
    raw_votes = await asyncio.gather(*[
        get_agent_vote(v["name"], v["system"], content) for v in validators
    ])
    votes: list[Vote] = [v for v in raw_votes if v is not None]

    # Calculate consensus
    approvals = [v for v in votes if v.verdict == "approve"]
    approval_rate = len(approvals) / len(votes) if votes else 0
    avg_confidence = sum(v.confidence for v in votes) / len(votes) if votes else 0

    if approval_rate >= quorum and avg_confidence >= confidence_threshold:
        return {"decision": "approved", "approval_rate": approval_rate, "votes": votes}
    elif any(v.verdict == "reject" for v in votes):
        rejections = [v for v in votes if v.verdict == "reject"]
        return {"decision": "rejected", "reasons": [r.reasoning for r in rejections], "votes": votes}
    else:
        return {"decision": "needs_revision", "feedback": [v.reasoning for v in votes], "votes": votes}

Agent Routing Strategies

Intent-Based Router

class IntentRouter:
    """Route requests to specialized agents based on intent classification."""

    ROUTING_TABLE = {
        "code_generation": {"agent": "coder", "model": "claude-sonnet-4-20250514"},
        "code_review": {"agent": "reviewer", "model": "claude-sonnet-4-20250514"},
        "research": {"agent": "researcher", "model": "claude-sonnet-4-20250514"},
        "simple_question": {"agent": "assistant", "model": "claude-haiku-4-20250514"},
        "creative_writing": {"agent": "writer", "model": "claude-sonnet-4-20250514"},
        "complex_analysis": {"agent": "analyst", "model": "claude-sonnet-4-20250514"},
    }

    def __init__(self, client):
        self.client = client  # an anthropic.AsyncAnthropic instance

    async def route(self, message: str) -> dict:
        # Use a fast, cheap model for classification
        classification = await self.client.messages.create(
            model="claude-haiku-4-20250514",
            max_tokens=50,
            system="Classify the user intent. Respond with ONLY one of: code_generation, code_review, research, simple_question, creative_writing, complex_analysis",
            messages=[{"role": "user", "content": message}],
        )
        intent = classification.content[0].text.strip().lower()
        return self.ROUTING_TABLE.get(intent, self.ROUTING_TABLE["simple_question"])

Context Window Budgeting

MODEL_LIMITS = {
    "claude-sonnet-4-20250514": 200_000,
    "claude-haiku-4-20250514": 200_000,
    "claude-opus-4-20250514": 200_000,
    "gpt-4o": 128_000,
}

class ContextBudget:
    def __init__(self, model: str, pipeline_stages: int, reserve_pct: float = 0.15):
        self.total = MODEL_LIMITS.get(model, 128_000)
        self.reserve = int(self.total * reserve_pct)
        self.per_stage = (self.total - self.reserve) // pipeline_stages
        self.used = 0

    def allocate(self, stage: str) -> int:
        available = self.total - self.reserve - self.used
        allocation = min(self.per_stage, int(available * 0.6))  # cap at 60% of remaining budget
        return max(allocation, 1000)  # minimum 1000 tokens per stage

    def consume(self, tokens: int):
        self.used += tokens

    def summarize_if_needed(self, text: str, budget: int) -> str:
        estimated_tokens = len(text) // 4  # rough heuristic: ~4 characters per token
        if estimated_tokens <= budget:
            return text
        # Truncate to budget with marker
        char_limit = budget * 4
        return text[:char_limit] + "\n\n[Content truncated to fit context budget]"
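
For example, a three-stage pipeline on a 200K-token model reserves 30,000 tokens and allocates 56,666 per stage (the numbers follow directly from the class arithmetic above):

budget = ContextBudget("claude-sonnet-4-20250514", pipeline_stages=3)
allocation = budget.allocate("research")    # 56,666 tokens for this stage
research_output = "..."                     # placeholder: the prior stage's text
text = budget.summarize_if_needed(research_output, allocation)
budget.consume(4_200)                       # record actual usage before the next stage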

Cost Optimization Matrix

| Strategy | Cost Reduction | Quality Impact | When to Use |
|---|---|---|---|
| Haiku for routing/classification | 85-90% | Minimal | Always for intent routing |
| Haiku for editing/formatting | 60-70% | Low | Mechanical tasks |
| Sonnet for most stages | Baseline | Baseline | Default choice |
| Opus only for final synthesis | +50% on that stage | Higher quality | High-stakes output |
| Prompt caching (system prompts) | 50-90% per call | None | Repeated system prompts |
| Truncate intermediate outputs | 20-40% | May lose detail | Long pipelines |
| Parallel + early termination | 30-50% | None if threshold met | Search/validation tasks |
| Batch similar requests | Up to 50% | Increased latency | Non-real-time workloads |
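
Prompt caching in particular is a small change with the Anthropic SDK: mark the shared system prompt with cache_control so subsequent calls reuse it (SHARED_SYSTEM_PROMPT is a placeholder for a long, stable prompt; caching only applies above a minimum prompt size):

import anthropic

SHARED_SYSTEM_PROMPT = "...long, stable instructions shared by many agent calls..."

client = anthropic.Anthropic()
response = client.messages.create(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    system=[{
        "type": "text",
        "text": SHARED_SYSTEM_PROMPT,            # identical across many agent calls
        "cache_control": {"type": "ephemeral"},  # cached; later calls pay the reduced rate
    }],
    messages=[{"role": "user", "content": "First request against the cached prompt"}],
)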

Reliability Patterns

Circuit Breaker

import time

class CircuitBreaker:
    """Prevent cascading failures when an agent/model is down."""

    def __init__(self, failure_threshold: int = 5, recovery_time: float = 60.0):
        self.failure_threshold = failure_threshold
        self.recovery_time = recovery_time
        self.failures = 0
        self.state = "closed"  # closed = healthy, open = failing, half-open = testing
        self.last_failure_time = 0.0

    def can_execute(self) -> bool:
        if self.state == "closed":
            return True
        if self.state == "open":
            if time.time() - self.last_failure_time > self.recovery_time:
                self.state = "half-open"
                return True
            return False
        return True  # half-open: allow one test request

    def record_success(self):
        self.failures = 0
        self.state = "closed"

    def record_failure(self):
        self.failures += 1
        self.last_failure_time = time.time()
        if self.failures >= self.failure_threshold:
            self.state = "open"

Common Pitfalls

  • Over-orchestration — if a single prompt can handle it, adding agents adds cost and latency, not value
  • Circular dependencies in subtask graphs causing infinite loops; always validate DAG structure before execution
  • Context bleed — passing entire previous outputs to every stage; summarize or extract only what is needed
  • No timeout enforcement — a stuck agent blocks the entire pipeline; set wall-clock timeouts at every boundary
  • Silent failures — agent returns plausible but incorrect output; add validation stages for critical paths
  • Ignoring cost — 10 parallel Opus calls is expensive; model selection is a cost decision, not just a quality one
  • Stateless retries on stateful operations — ensure idempotency before enabling automatic retries
  • Single point of failure in orchestrator — if the orchestrator agent fails, the entire workflow fails

Best Practices

  1. Start with a single prompt — only add agents when you prove one cannot handle the task
  2. Type your handoffs — use dataclasses or TypedDicts for inter-agent data, not raw strings
  3. Budget context upfront — calculate token allocations before running the pipeline
  4. Use cheap models for routing — Haiku for classification costs 10x less than Sonnet
  5. Validate DAG structure at build time, not runtime
  6. Log every agent call with input hash, output hash, tokens, latency, and cost (a wrapper sketch follows this list)
  7. Set SLAs per stage — if research takes >30s, timeout and use cached results
  8. Test with production-scale inputs — a pipeline that works on 100 words may fail on 10,000
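
A minimal sketch of practice 6, assuming a call_agent coroutine whose result exposes output text and a token count (both names are illustrative):

import hashlib
import json
import time

async def logged_call(call_agent, task: str) -> str:
    """Wrap an agent call with structured logging of hashes, tokens, and latency."""
    start = time.monotonic()
    result = await call_agent(task)  # hypothetical agent invocation
    record = {
        "input_hash": hashlib.sha256(task.encode()).hexdigest()[:12],
        "output_hash": hashlib.sha256(result.output.encode()).hexdigest()[:12],
        "tokens": result.tokens,
        "latency_s": round(time.monotonic() - start, 3),
    }
    print(json.dumps(record))  # swap for your structured logger or metrics sink
    return result.output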

Troubleshooting

| Problem | Cause | Solution |
|---|---|---|
| Pipeline hangs indefinitely | Missing timeout enforcement on one or more agent stages | Add asyncio.wait_for() with an explicit timeout_seconds at every agent boundary; use the Circuit Breaker pattern to fail fast |
| Circular dependency error at runtime | Subtask graph contains a cycle (e.g., task A depends on B which depends on A) | Validate DAG structure at build time with topological sort; the _batch_by_dependencies method catches this, but validation should happen earlier |
| Context window exceeded mid-pipeline | Intermediate outputs grow beyond the model's token limit | Use the ContextBudget class to allocate tokens per stage; summarize or truncate outputs before passing to the next stage |
| Fan-out tasks return inconsistent formats | Each parallel agent interprets the output schema differently | Define a shared TypedDict or dataclass for all fan-out results; add a validation step before the merge function |
| Orchestrator plan creates too many subtasks | The planning prompt does not constrain subtask count, leading to over-decomposition | Add explicit constraints in the planner system prompt (e.g., "maximum 5 subtasks"); review and approve plans before execution in high-stakes workflows |
| Consensus never reaches quorum | Validators disagree consistently or confidence scores are too low | Lower the quorum threshold, add a tiebreaker agent, or revise validator prompts to align on evaluation criteria |
| Cost spikes on parallel workflows | Expensive models (Opus) used for all fan-out branches instead of routing by complexity | Apply cost-aware routing: Haiku for classification and simple tasks, Sonnet for most work, Opus only for final synthesis or high-stakes decisions |

Success Criteria

  • Pipeline end-to-end latency stays within the defined SLA (e.g., under 60 seconds for a 5-stage workflow) with no stage exceeding its individual timeout
  • Agent routing accuracy exceeds 90% when measured against a labeled test set of at least 100 representative requests
  • Fan-out/fan-in workflows complete with fewer than 5% task failures across all parallel branches under normal operating conditions
  • Total token cost per workflow run decreases by at least 40% after applying model tiering (Haiku for routing, Sonnet for core work, Opus for synthesis)
  • Circuit breakers trigger correctly within 5 consecutive failures and recover automatically after the defined recovery window
  • Context window utilization stays below 85% of model limits at every pipeline stage, with no truncation-related quality degradation
  • All inter-agent handoffs pass schema validation with zero type errors across 100 consecutive workflow executions

Scope & Limitations

This skill covers:

  • Design and implementation of five core multi-agent orchestration patterns (sequential, parallel, hierarchical, event-driven, consensus)
  • Agent routing strategies including intent-based, skill-based, and cost-aware routing
  • Reliability engineering patterns: circuit breakers, retries, timeouts, and dead letter queues
  • Context window budgeting, cost optimization, and framework-specific implementations (LangGraph, CrewAI, AutoGen)

This skill does NOT cover:

  • Training or fine-tuning the underlying LLMs used by agents (see engineering/ml-pipeline-architect for ML training workflows)
  • Infrastructure provisioning, container orchestration, or deployment pipelines (see engineering/cloud-infrastructure-designer for cloud architecture)
  • Human-in-the-loop approval workflows or UI design for agent dashboards (see product-team/ux-researcher for user-facing workflow design)
  • Long-term agent memory, vector database setup, or RAG pipeline construction (see engineering/rag-pipeline-architect for retrieval-augmented generation)

Integration Points

| Skill | Integration | Data Flow |
|---|---|---|
| engineering/ml-pipeline-architect | Agent workflows that include ML inference stages use ML Pipeline Architect for model serving and batch prediction design | Workflow DAG exports stage specs to ML pipeline; ML pipeline returns inference endpoints for agent consumption |
| engineering/rag-pipeline-architect | Research and retrieval agents within workflows rely on RAG pipelines for grounded knowledge access | Agent sends queries to RAG pipeline; RAG returns ranked document chunks with citations for agent context |
| engineering/cloud-infrastructure-designer | Production deployment of agent workflows requires infrastructure design for scaling, queuing, and monitoring | Workflow resource requirements feed into infrastructure specs; infra returns endpoint URLs, queue ARNs, and scaling policies |
| engineering/api-design-architect | Inter-agent communication contracts and external API boundaries follow API design standards | Agent handoff schemas are validated against API design specs; API architect provides OpenAPI definitions for external integrations |
| engineering/system-design-architect | Overall system architecture decisions (sync vs async, monolith vs distributed) shape workflow topology choices | System design constraints (latency budgets, availability targets) inform pattern selection; workflow requirements feed back into system capacity planning |
| project-management/technical-project-planning | Complex multi-agent projects require structured planning for phased rollout, risk management, and milestone tracking | Workflow complexity estimates feed into project plans; PM skill provides sprint boundaries and dependency timelines for staged deployment |