Claude-skill-registry agent-communication-protocol

Open protocol for AI agent interoperability enabling standardized communication between agents, applications, and humans across different frameworks

install
source · Clone the upstream repo
git clone https://github.com/majiayu000/claude-skill-registry
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/agent-communication-protocol" ~/.claude/skills/majiayu000-claude-skill-registry-agent-communication-protocol && rm -rf "$T"
manifest: skills/data/agent-communication-protocol/SKILL.md
safety · automated scan (medium risk)
This is a pattern-based risk scan, not a security review. Our crawler flagged:
  • pip install
  • makes HTTP requests (curl)
Always read a skill's source content before installing. Patterns alone don't mean the skill is malicious — but they warrant attention.
source content

Agent Communication Protocol (ACP)

ACP is an open protocol for agent interoperability that solves the growing challenge of connecting AI agents, applications, and humans. It enables standardized communication across different frameworks and infrastructures.

Key Features

  • REST-Based Design - Straightforward HTTP endpoints (not JSON-RPC)
  • Multi-Modal - Supports text, images, audio, video, and any MIME type
  • Async-First - Built primarily for asynchronous communication with sync support
  • Streaming - Real-time Server-Sent Events (SSE) for incremental updates
  • Stateful/Stateless - Sessions for maintaining state across interactions
  • Offline Discovery - Agents remain discoverable through embedded metadata
  • Framework Agnostic - Works with BeeAI, LangGraph, CrewAI, LlamaIndex, and more

Installation

# Create new project with uv
uv init --python '>=3.11' my_acp_project
cd my_acp_project

# Add ACP SDK
uv add acp-sdk

# Or with pip
pip install acp-sdk

Quick Start: Creating an Agent

Basic Echo Agent

import asyncio
from collections.abc import AsyncGenerator

from acp_sdk.models import Message
from acp_sdk.server import Context, RunYield, RunYieldResume, Server

server = Server()

@server.agent()
async def echo(
    input: list[Message], context: Context
) -> AsyncGenerator[RunYield, RunYieldResume]:
    """Echoes everything"""
    for message in input:
        await asyncio.sleep(0.5)
        yield {"thought": "I should echo everything"}
        await asyncio.sleep(0.5)
        yield message

server.run()

Run with:

uv run agent.py

Server starts at

http://localhost:8000
.

Agent with Metadata

@server.agent(
    name="data-analyzer",
    description="Analyzes datasets and generates insights"
)
async def DataAnalyzerAgent(
    input: list[Message], context: Context
) -> AsyncGenerator[RunYield, RunYieldResume]:
    # Implementation
    yield str(response.object)

Agent Manifest

The Agent Manifest describes agent properties for discovery and interoperability.

Required Fields

FieldDescriptionExample
name
Unique RFC 1123 DNS label (1-63 chars)
"chat"
description
Human-readable summary
"Conversational agent with memory"
input_content_types
Supported input MIME types
["text/plain", "application/json"]
output_content_types
Supported output MIME types
["text/plain", "*/*"]

Optional Metadata

@server.agent(
    name="my-agent",
    description="My custom agent",
    metadata={
        "license": "Apache-2.0",
        "programming_language": "python",
        "framework": "beeai",
        "domains": ["finance", "analytics"],
        "capabilities": [
            {"name": "analysis", "description": "Data analysis"}
        ],
        "recommended_models": ["ollama:llama3.1", "openai:gpt-4o"],
        "author": {"name": "Developer", "email": "dev@example.com"}
    }
)

Message Structure

Messages are the fundamental communication units consisting of ordered parts.

Message Roles

  • user
    - User-originated messages
  • agent
    - Generic agent communications
  • agent/{name}
    - Specific agent (e.g.,
    agent/image-analyzer
    )

Message Parts

AttributeRequiredDetails
content_type
YesMIME type (e.g.,
text/plain
,
image/png
)
content
OR
content_url
YesInline or URL-referenced data
content_encoding
No
"plain"
(default) or
"base64"
name
NoMakes part an Artifact
metadata
NoCitations, trajectories

Common MIME Types

  • text/plain
    - Plain text
  • application/json
    - Structured data
  • image/png
    ,
    image/jpeg
    - Images
  • application/pdf
    - Documents
  • */*
    - Any content type

REST API Endpoints

Discovery

# List all agents
curl http://localhost:8000/agents

# Get specific agent manifest
curl http://localhost:8000/agents/{name}

Run Management

MethodEndpointPurpose
POST
/runs
Create and start new run
GET
/runs/{run_id}
Get run status
POST
/runs/{run_id}
Resume awaiting run
POST
/runs/{run_id}/cancel
Cancel run
GET
/runs/{run_id}/events
List run events

Execution Modes

Synchronous - Blocks until completion:

curl -X POST http://localhost:8000/runs \
  -H "Content-Type: application/json" \
  -d '{
    "agent_name": "echo",
    "input": [{"role": "user", "parts": [{"content_type": "text/plain", "content": "Hello"}]}],
    "mode": "sync"
  }'

Asynchronous - Returns run_id for polling:

curl -X POST http://localhost:8000/runs \
  -H "Content-Type: application/json" \
  -d '{"agent_name": "echo", "input": [...], "mode": "async"}'

# Check status
curl http://localhost:8000/runs/{run_id}

Streaming - Server-Sent Events:

curl -N -H "Accept: text/event-stream" -X POST http://localhost:8000/runs \
  -H "Content-Type: application/json" \
  -d '{"agent_name": "echo", "input": [...], "mode": "stream"}'

Python SDK Client

Basic Client Usage

import asyncio
from acp_sdk.client import Client
from acp_sdk.models import Message, MessagePart

async def example() -> None:
    async with Client(base_url="http://localhost:8000") as client:
        # Synchronous execution
        run = await client.run_sync(
            agent="echo",
            input=[
                Message(
                    parts=[MessagePart(
                        content="Hello from client!",
                        content_type="text/plain"
                    )]
                )
            ],
        )
        print(run.output)

if __name__ == "__main__":
    asyncio.run(example())

Agent Discovery

async with Client(base_url="http://localhost:8000") as client:
    async for agent in client.agents():
        print(f"Agent: {agent.name}")
        print(f"Description: {agent.description}")

Asynchronous Execution

async with Client(base_url="http://localhost:8000") as client:
    run = await client.run_async(agent="echo", input=[...])

    # Poll for completion
    result = await client.run_status(run_id=run.run_id)

Streaming Execution

async with Client(base_url="http://localhost:8000") as client:
    async for event in client.run_stream(agent="echo", input=[...]):
        print(event)

Agent Run Lifecycle

Run States

StateDescription
created
Run initiated, processing not started
in-progress
Agent actively processing
awaiting
Paused, waiting for client input
completed
Successfully finished
cancelling
Cancellation in progress
cancelled
Run terminated
failed
Error occurred

State Machine

created → in-progress → completed
                     → failed
                     → cancelling → cancelled
                     → awaiting → in-progress (resume)
                                → failed (timeout)
                                → cancelling → cancelled

Await Mechanism

Agents can pause execution to request additional input:

@server.agent()
async def interactive_agent(input: list[Message], context: Context):
    # Process initial input
    yield {"thought": "Need clarification"}

    # Pause and wait for client
    response = yield AwaitInput(prompt="Please provide more details")

    # Continue with additional input
    yield process_response(response)

Stateful Agents with Sessions

Client-Side Sessions

async with Client(base_url="http://localhost:8000") as client:
    async with client.session() as session:
        # First interaction
        run1 = await session.run_sync(
            agent="chat",
            input=[Message(parts=[MessagePart(content="Hello")])]
        )

        # Second interaction - same session, agent remembers context
        run2 = await session.run_sync(
            agent="chat",
            input=[Message(parts=[MessagePart(content="What did I just say?")])]
        )

Server-Side Session Access

@server.agent()
async def stateful_agent(input: list[Message], context: Context):
    # Load conversation history
    history = [message async for message in context.session.load_history()]

    # Load persisted state
    state = await context.session.load_state()

    # Process with context
    response = process_with_history(input, history, state)

    # Update state
    context.session.state = await context.session.store_state(new_state)

    yield response

Agent Discovery Methods

1. Basic Discovery (Online)

Query running ACP servers directly:

curl http://localhost:8000/agents
async for agent in client.agents():
    print(agent)

2. Open Discovery (Well-Known URL)

Agents publish metadata at standardized URLs:

https://your-domain.com/.well-known/agent.yml

3. Registry-Based Discovery

Centralized registry for discovering agents across multiple servers.

4. Embedded Discovery (Offline)

Metadata embedded in distribution packages (container image labels) for disconnected environments.

Generating Artifacts

Artifacts are named message parts representing files, images, or structured outputs.

Image Artifact

import base64
from io import BytesIO
from PIL import Image
from acp_sdk.models import Artifact

@server.agent()
async def image_generator(input: list[Message], context: Context):
    # Create image
    img = Image.new('RGB', (100, 100), color='red')
    buffer = BytesIO()
    img.save(buffer, format='PNG')
    buffer.seek(0)

    yield Artifact(
        name="generated.png",
        content=base64.b64encode(buffer.read()).decode("utf-8"),
        content_encoding="base64",
        content_type="image/png"
    )

JSON Artifact

import json
from acp_sdk.models import Artifact

@server.agent()
async def data_agent(input: list[Message], context: Context):
    data = {"results": [1, 2, 3], "status": "complete"}

    yield Artifact(
        name="results.json",
        content=json.dumps(data, indent=2),
        content_type="application/json"
    )

Message Metadata

Citation Metadata

For source attribution in RAG agents:

MessagePart(
    content="AI adoption increased by 40%",
    content_type="text/plain",
    metadata={
        "kind": "citation",
        "url": "https://example.com/study",
        "title": "AI Adoption Study 2024",
        "start_index": 0,
        "end_index": 32
    }
)

Trajectory Metadata

For transparency and debugging:

MessagePart(
    content="The weather in SF is 72°F",
    content_type="text/plain",
    metadata={
        "kind": "trajectory",
        "tool_name": "weather_api",
        "tool_input": {"location": "San Francisco, CA"},
        "tool_output": {"temperature": 72, "condition": "sunny"}
    }
)

Wrapping Existing Agents

Integrate any existing agent into ACP regardless of framework:

from acp_sdk.server import Server, Context
from acp_sdk.models import Message

server = Server()

@server.agent()
async def wrapped_agent(input: list[Message], context: Context):
    """Wraps an existing LangChain/CrewAI/custom agent"""

    # Extract text from ACP message
    user_input = input[0].parts[0].content

    # Call your existing agent
    result = await your_existing_agent.run(user_input)

    # Yield as ACP message
    yield Message(parts=[MessagePart(
        content=result,
        content_type="text/plain"
    )])

server.run()

Architecture Patterns

Single-Agent

Client → HTTP → ACP Server → Agent

Multi-Agent Single Server

Client → HTTP → ACP Server → Agent A
                          → Agent B
                          → Agent C

Distributed Multi-Server

Client → Agent Registry → Server 1 → Agent A
                       → Server 2 → Agent B
                       → Server 3 → Agent C

Router Agent Pattern

Client → Router Agent → Decompose task
                     → Route to specialists
                     → Aggregate responses

Example Agents

The ACP repository includes 13 reference implementations:

ExampleFrameworkDescription
Basic Server/ClientACP SDKStandalone implementations
Chat AgentBeeAIReAct with tools, memory, structured output
Slack AgentBeeAI + MCPSlack integration via ACP
RAG AgentLlamaIndexDocument retrieval and synthesis
Prompt ChainingBeeAIMarketing copy → Translation pipeline
Dynamic RoutingBeeAILanguage-specific translation routing
Handoff PatternBeeAIDelegation to specialized agents
Canvas AgentBeeAIArtifact generation for files
Song WriterCrewAIMulti-agent collaboration
GPT ResearcherCustomReal-time research progress
Greeting AgentLangGraphContext-aware greetings
Agent GeneratorACPDynamically creates other agents
Story WriterOpenAIShort story generation

Best Practices

Agent Design

  1. Clear descriptions - Agent docstrings become manifest descriptions
  2. Explicit content types - Declare input/output MIME types
  3. Async generators - Use
    yield
    for streaming intermediate results
  4. Error handling - Return meaningful error messages

Message Handling

  1. Validate content types - Check incoming MIME types
  2. Use artifacts for files - Named parts for downloadable content
  3. Include metadata - Citations and trajectories for transparency

Session Management

  1. Use sessions for context - Maintain conversation history
  2. Store minimal state - Keep session state lightweight
  3. Handle timeouts - Implement graceful timeout handling

Discovery

  1. Publish manifests - Use well-known URLs for open discovery
  2. Rich metadata - Include capabilities, domains, tags
  3. Version agents - Track changes with semantic versioning

Resources

Governance

ACP is an open standard under the Linux Foundation, developed alongside BeeAI as its reference implementation. Community-driven development ensures vendor-neutral evolution.

Comparison with MCP

FeatureACPMCP
FocusAgent-to-agent interopTool/resource integration
TransportHTTP RESTJSON-RPC, stdio, SSE
DiscoveryBuilt-in (manifest, registry)External
SessionsNative supportNot built-in
Multi-modalNative (MIME types)Limited
GovernanceLinux FoundationAnthropic

ACP and MCP are complementary - ACP handles agent communication while MCP provides tool integration. ACP includes MCP extension support for exposing agent tools.

AI Agent Fundamentals

Understanding AI agents helps design effective ACP-based systems.

What is an AI Agent?

An AI agent is a system that autonomously performs tasks by designing workflows with available tools. Unlike simple chatbots, agents handle decision-making, problem-solving, and environmental interaction through three stages:

  1. Goal Initialization & Planning - Decompose complex goals into subtasks
  2. Reasoning with Tools - Leverage databases, APIs, other agents to bridge knowledge gaps
  3. Learning & Reflection - Refine responses through feedback and store solutions

Agent Types (Complexity Spectrum)

TypeCharacteristicsACP Use Case
Simple ReflexRule-based responses; no memoryBasic request handlers
Model-Based ReflexMaintains internal world modelContext-aware agents
Goal-BasedSearches for action sequencesTask orchestrators
Utility-BasedMaximizes reward across pathsOptimization agents
LearningAutonomous knowledge expansionAdaptive systems

Agentic vs Non-Agentic Systems

AspectNon-Agentic (Chatbot)Agentic (ACP Agent)
ToolsNoneFull tool access
MemorySession onlyPersistent state
ReasoningSingle responseMulti-step planning
AutonomyRequires user inputSelf-directed
AdaptationStaticLearns from feedback

Reasoning Paradigms

ReAct Pattern (Think-Act-Observe)

Step-by-step problem resolution with continuous context updates:

@server.agent()
async def react_agent(input: list[Message], context: Context):
    """ReAct agent with think-act-observe loop"""

    # Think: Analyze the request
    yield {"thought": "Analyzing user request for data analysis"}

    # Act: Call tool
    data = await fetch_data_tool(input[0].parts[0].content)
    yield {"thought": f"Retrieved {len(data)} records"}

    # Observe: Process results
    analysis = analyze(data)
    yield {"thought": "Analysis complete, formatting response"}

    # Final output
    yield Message(parts=[MessagePart(
        content=json.dumps(analysis),
        content_type="application/json"
    )])

ReWOO Pattern (Upfront Planning)

Plan all steps before execution, reducing token usage:

@server.agent()
async def rewoo_agent(input: list[Message], context: Context):
    """ReWOO agent with upfront planning"""

    # Plan phase: Generate complete execution plan
    plan = [
        {"step": 1, "action": "fetch_data", "params": {...}},
        {"step": 2, "action": "transform", "params": {...}},
        {"step": 3, "action": "summarize", "params": {...}}
    ]
    yield {"plan": plan}

    # Execute phase: Run all steps
    results = []
    for step in plan:
        result = await execute_step(step)
        results.append(result)

    yield Message(parts=[MessagePart(
        content=json.dumps(results[-1]),
        content_type="application/json"
    )])

Multi-Agent Orchestration

Hierarchical Agent Architecture

                    ┌─────────────────┐
                    │  Orchestrator   │
                    │  (Goal-Based)   │
                    └────────┬────────┘
                             │
         ┌───────────────────┼───────────────────┐
         │                   │                   │
         ▼                   ▼                   ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│  Research Agent │ │  Analysis Agent │ │  Writer Agent   │
│  (Learning)     │ │  (Utility)      │ │  (Model-Based)  │
└─────────────────┘ └─────────────────┘ └─────────────────┘

Orchestrator Agent Example

@server.agent(
    name="orchestrator",
    description="Coordinates specialized agents for complex tasks"
)
async def orchestrator(input: list[Message], context: Context):
    """Multi-agent orchestrator using ACP"""

    # Decompose task
    task = input[0].parts[0].content
    subtasks = await decompose_task(task)

    yield {"thought": f"Decomposed into {len(subtasks)} subtasks"}

    # Route to specialist agents
    async with Client(base_url="http://localhost:8000") as client:
        results = []
        for subtask in subtasks:
            agent = select_specialist(subtask)
            run = await client.run_sync(
                agent=agent,
                input=[Message(parts=[MessagePart(
                    content=subtask["description"],
                    content_type="text/plain"
                )])]
            )
            results.append(run.output)
            yield {"thought": f"Completed: {subtask['name']}"}

    # Aggregate responses
    final = synthesize_results(results)
    yield Message(parts=[MessagePart(
        content=final,
        content_type="text/plain"
    )])

Agent-to-Agent Communication

@server.agent()
async def delegating_agent(input: list[Message], context: Context):
    """Agent that delegates to other ACP agents"""

    async with Client(base_url="http://localhost:8000") as client:
        # Discover available agents
        specialists = []
        async for agent in client.agents():
            if "analysis" in agent.description.lower():
                specialists.append(agent.name)

        yield {"thought": f"Found {len(specialists)} analysis agents"}

        # Delegate to best match
        if specialists:
            run = await client.run_sync(
                agent=specialists[0],
                input=input
            )
            yield run.output[0]
        else:
            yield Message(parts=[MessagePart(
                content="No suitable agent found",
                content_type="text/plain"
            )])

Production Safety Patterns

Activity Logging

import logging
from datetime import datetime

logger = logging.getLogger("acp_audit")

@server.agent()
async def audited_agent(input: list[Message], context: Context):
    """Agent with comprehensive activity logging"""

    run_id = context.run_id
    start_time = datetime.utcnow()

    # Log start
    logger.info(f"[{run_id}] Agent started", extra={
        "run_id": run_id,
        "input_size": len(input),
        "timestamp": start_time.isoformat()
    })

    try:
        # Process
        result = await process(input)

        # Log tool usage
        logger.info(f"[{run_id}] Tool called", extra={
            "run_id": run_id,
            "tool": "process",
            "duration_ms": (datetime.utcnow() - start_time).total_seconds() * 1000
        })

        yield result

    except Exception as e:
        logger.error(f"[{run_id}] Agent failed: {e}", extra={
            "run_id": run_id,
            "error": str(e)
        })
        raise

    finally:
        logger.info(f"[{run_id}] Agent completed", extra={
            "run_id": run_id,
            "duration_ms": (datetime.utcnow() - start_time).total_seconds() * 1000
        })

Interruptibility (Graceful Cancellation)

import asyncio

@server.agent()
async def interruptible_agent(input: list[Message], context: Context):
    """Agent that supports graceful interruption"""

    for i, step in enumerate(processing_steps):
        # Check for cancellation between steps
        if context.is_cancelled:
            yield {"thought": "Cancellation requested, stopping gracefully"}
            # Cleanup resources
            await cleanup()
            return

        yield {"thought": f"Processing step {i+1}/{len(processing_steps)}"}

        # Use asyncio.shield for critical operations
        result = await asyncio.shield(step.execute())

        yield {"progress": (i + 1) / len(processing_steps)}

    yield final_result

Human Approval Gates

@server.agent()
async def approval_agent(input: list[Message], context: Context):
    """Agent requiring human approval for high-impact actions"""

    action = parse_action(input)

    # Check if action requires approval
    if action.requires_approval:
        yield {"thought": "This action requires human approval"}

        # Pause and wait for approval
        approval = yield AwaitInput(
            prompt=f"Approve action: {action.description}? (yes/no)",
            metadata={"action_type": action.type, "impact": "high"}
        )

        if approval.lower() != "yes":
            yield Message(parts=[MessagePart(
                content="Action cancelled by user",
                content_type="text/plain"
            )])
            return

    # Execute approved action
    result = await action.execute()
    yield result

Preventing Infinite Loops

MAX_ITERATIONS = 10
MAX_TOOL_CALLS = 50

@server.agent()
async def bounded_agent(input: list[Message], context: Context):
    """Agent with loop prevention"""

    iterations = 0
    tool_calls = 0
    seen_states = set()

    while not is_complete():
        iterations += 1

        # Iteration limit
        if iterations > MAX_ITERATIONS:
            yield {"warning": "Max iterations reached"}
            break

        # State cycle detection
        current_state = hash(frozenset(context.state.items()))
        if current_state in seen_states:
            yield {"warning": "Cycle detected, breaking loop"}
            break
        seen_states.add(current_state)

        # Tool call limit
        if tool_calls >= MAX_TOOL_CALLS:
            yield {"warning": "Max tool calls reached"}
            break

        result = await process_step()
        tool_calls += result.tool_call_count

        yield {"thought": f"Iteration {iterations}, tools: {tool_calls}"}

    yield final_result()

Enterprise Use Cases

Cross-Platform Integration

Connect agents across different business systems:

@server.agent(
    name="crm-integration",
    description="Integrates CRM, analytics, and support systems",
    metadata={
        "domains": ["sales", "support", "analytics"],
        "capabilities": [
            {"name": "customer-lookup", "description": "Find customer data"},
            {"name": "ticket-creation", "description": "Create support tickets"},
            {"name": "report-generation", "description": "Generate analytics reports"}
        ]
    }
)
async def crm_agent(input: list[Message], context: Context):
    """Enterprise CRM integration agent"""
    # Implementation
    pass

Agent Replacement Pattern

Swap agents seamlessly in production:

# Old agent (being replaced)
@server.agent(name="search-v1", description="Legacy search")
async def search_v1(input, context):
    return legacy_search(input)

# New agent (replacement) - same interface
@server.agent(name="search-v2", description="AI-powered search")
async def search_v2(input, context):
    return ai_search(input)

# Client code unchanged - just update agent name
run = await client.run_sync(agent="search-v2", input=input)

Inter-Organizational Collaboration

Secure agent partnerships between companies:

@server.agent(
    name="partner-gateway",
    description="Secure gateway for partner agent access",
    metadata={
        "security": {
            "auth": "oauth2",
            "allowed_origins": ["partner-a.com", "partner-b.com"],
            "rate_limit": "100/hour"
        }
    }
)
async def partner_agent(input: list[Message], context: Context):
    """Agent for inter-organizational communication"""

    # Verify partner identity
    partner_id = context.metadata.get("partner_id")
    if not validate_partner(partner_id):
        yield Message(parts=[MessagePart(
            content="Unauthorized",
            content_type="text/plain"
        )])
        return

    # Process partner request with audit
    yield {"audit": f"Partner {partner_id} request received"}
    result = await process_partner_request(input)
    yield result

Protocol Ecosystem

ACP works alongside other agent communication standards:

ProtocolPurposeRelationship to ACP
ACPAgent-to-agent interopCore protocol
MCPAgent-to-tool integrationComplementary (ACP supports MCP extension)
A2ADirect agent messagingAlternative for peer-to-peer
Agent Client ProtocolEditor-to-agentDifferent domain (IDE integration)

Using ACP with MCP Tools

from acp_sdk.extensions import MCPExtension

@server.agent(extensions=[MCPExtension()])
async def mcp_enabled_agent(input: list[Message], context: Context):
    """Agent that exposes MCP tools via ACP"""

    # Access MCP tools through ACP context
    tools = context.mcp.available_tools()

    # Call MCP tool
    result = await context.mcp.call_tool("web_search", query="...")

    yield result