Claude-skill-registry agent-communication-protocol
Open protocol for AI agent interoperability enabling standardized communication between agents, applications, and humans across different frameworks
git clone https://github.com/majiayu000/claude-skill-registry
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/agent-communication-protocol" ~/.claude/skills/majiayu000-claude-skill-registry-agent-communication-protocol && rm -rf "$T"
skills/data/agent-communication-protocol/SKILL.md- pip install
- makes HTTP requests (curl)
Agent Communication Protocol (ACP)
ACP is an open protocol for agent interoperability that solves the growing challenge of connecting AI agents, applications, and humans. It enables standardized communication across different frameworks and infrastructures.
Key Features
- REST-Based Design - Straightforward HTTP endpoints (not JSON-RPC)
- Multi-Modal - Supports text, images, audio, video, and any MIME type
- Async-First - Built primarily for asynchronous communication with sync support
- Streaming - Real-time Server-Sent Events (SSE) for incremental updates
- Stateful/Stateless - Sessions for maintaining state across interactions
- Offline Discovery - Agents remain discoverable through embedded metadata
- Framework Agnostic - Works with BeeAI, LangGraph, CrewAI, LlamaIndex, and more
Installation
# Create new project with uv uv init --python '>=3.11' my_acp_project cd my_acp_project # Add ACP SDK uv add acp-sdk # Or with pip pip install acp-sdk
Quick Start: Creating an Agent
Basic Echo Agent
import asyncio from collections.abc import AsyncGenerator from acp_sdk.models import Message from acp_sdk.server import Context, RunYield, RunYieldResume, Server server = Server() @server.agent() async def echo( input: list[Message], context: Context ) -> AsyncGenerator[RunYield, RunYieldResume]: """Echoes everything""" for message in input: await asyncio.sleep(0.5) yield {"thought": "I should echo everything"} await asyncio.sleep(0.5) yield message server.run()
Run with:
uv run agent.py
Server starts at
http://localhost:8000.
Agent with Metadata
@server.agent( name="data-analyzer", description="Analyzes datasets and generates insights" ) async def DataAnalyzerAgent( input: list[Message], context: Context ) -> AsyncGenerator[RunYield, RunYieldResume]: # Implementation yield str(response.object)
Agent Manifest
The Agent Manifest describes agent properties for discovery and interoperability.
Required Fields
| Field | Description | Example |
|---|---|---|
| Unique RFC 1123 DNS label (1-63 chars) | |
| Human-readable summary | |
| Supported input MIME types | |
| Supported output MIME types | |
Optional Metadata
@server.agent( name="my-agent", description="My custom agent", metadata={ "license": "Apache-2.0", "programming_language": "python", "framework": "beeai", "domains": ["finance", "analytics"], "capabilities": [ {"name": "analysis", "description": "Data analysis"} ], "recommended_models": ["ollama:llama3.1", "openai:gpt-4o"], "author": {"name": "Developer", "email": "dev@example.com"} } )
Message Structure
Messages are the fundamental communication units consisting of ordered parts.
Message Roles
- User-originated messagesuser
- Generic agent communicationsagent
- Specific agent (e.g.,agent/{name}
)agent/image-analyzer
Message Parts
| Attribute | Required | Details |
|---|---|---|
| Yes | MIME type (e.g., , ) |
OR | Yes | Inline or URL-referenced data |
| No | (default) or |
| No | Makes part an Artifact |
| No | Citations, trajectories |
Common MIME Types
- Plain texttext/plain
- Structured dataapplication/json
,image/png
- Imagesimage/jpeg
- Documentsapplication/pdf
- Any content type*/*
REST API Endpoints
Discovery
# List all agents curl http://localhost:8000/agents # Get specific agent manifest curl http://localhost:8000/agents/{name}
Run Management
| Method | Endpoint | Purpose |
|---|---|---|
| | Create and start new run |
| | Get run status |
| | Resume awaiting run |
| | Cancel run |
| | List run events |
Execution Modes
Synchronous - Blocks until completion:
curl -X POST http://localhost:8000/runs \ -H "Content-Type: application/json" \ -d '{ "agent_name": "echo", "input": [{"role": "user", "parts": [{"content_type": "text/plain", "content": "Hello"}]}], "mode": "sync" }'
Asynchronous - Returns run_id for polling:
curl -X POST http://localhost:8000/runs \ -H "Content-Type: application/json" \ -d '{"agent_name": "echo", "input": [...], "mode": "async"}' # Check status curl http://localhost:8000/runs/{run_id}
Streaming - Server-Sent Events:
curl -N -H "Accept: text/event-stream" -X POST http://localhost:8000/runs \ -H "Content-Type: application/json" \ -d '{"agent_name": "echo", "input": [...], "mode": "stream"}'
Python SDK Client
Basic Client Usage
import asyncio from acp_sdk.client import Client from acp_sdk.models import Message, MessagePart async def example() -> None: async with Client(base_url="http://localhost:8000") as client: # Synchronous execution run = await client.run_sync( agent="echo", input=[ Message( parts=[MessagePart( content="Hello from client!", content_type="text/plain" )] ) ], ) print(run.output) if __name__ == "__main__": asyncio.run(example())
Agent Discovery
async with Client(base_url="http://localhost:8000") as client: async for agent in client.agents(): print(f"Agent: {agent.name}") print(f"Description: {agent.description}")
Asynchronous Execution
async with Client(base_url="http://localhost:8000") as client: run = await client.run_async(agent="echo", input=[...]) # Poll for completion result = await client.run_status(run_id=run.run_id)
Streaming Execution
async with Client(base_url="http://localhost:8000") as client: async for event in client.run_stream(agent="echo", input=[...]): print(event)
Agent Run Lifecycle
Run States
| State | Description |
|---|---|
| Run initiated, processing not started |
| Agent actively processing |
| Paused, waiting for client input |
| Successfully finished |
| Cancellation in progress |
| Run terminated |
| Error occurred |
State Machine
created → in-progress → completed → failed → cancelling → cancelled → awaiting → in-progress (resume) → failed (timeout) → cancelling → cancelled
Await Mechanism
Agents can pause execution to request additional input:
@server.agent() async def interactive_agent(input: list[Message], context: Context): # Process initial input yield {"thought": "Need clarification"} # Pause and wait for client response = yield AwaitInput(prompt="Please provide more details") # Continue with additional input yield process_response(response)
Stateful Agents with Sessions
Client-Side Sessions
async with Client(base_url="http://localhost:8000") as client: async with client.session() as session: # First interaction run1 = await session.run_sync( agent="chat", input=[Message(parts=[MessagePart(content="Hello")])] ) # Second interaction - same session, agent remembers context run2 = await session.run_sync( agent="chat", input=[Message(parts=[MessagePart(content="What did I just say?")])] )
Server-Side Session Access
@server.agent() async def stateful_agent(input: list[Message], context: Context): # Load conversation history history = [message async for message in context.session.load_history()] # Load persisted state state = await context.session.load_state() # Process with context response = process_with_history(input, history, state) # Update state context.session.state = await context.session.store_state(new_state) yield response
Agent Discovery Methods
1. Basic Discovery (Online)
Query running ACP servers directly:
curl http://localhost:8000/agents
async for agent in client.agents(): print(agent)
2. Open Discovery (Well-Known URL)
Agents publish metadata at standardized URLs:
https://your-domain.com/.well-known/agent.yml
3. Registry-Based Discovery
Centralized registry for discovering agents across multiple servers.
4. Embedded Discovery (Offline)
Metadata embedded in distribution packages (container image labels) for disconnected environments.
Generating Artifacts
Artifacts are named message parts representing files, images, or structured outputs.
Image Artifact
import base64 from io import BytesIO from PIL import Image from acp_sdk.models import Artifact @server.agent() async def image_generator(input: list[Message], context: Context): # Create image img = Image.new('RGB', (100, 100), color='red') buffer = BytesIO() img.save(buffer, format='PNG') buffer.seek(0) yield Artifact( name="generated.png", content=base64.b64encode(buffer.read()).decode("utf-8"), content_encoding="base64", content_type="image/png" )
JSON Artifact
import json from acp_sdk.models import Artifact @server.agent() async def data_agent(input: list[Message], context: Context): data = {"results": [1, 2, 3], "status": "complete"} yield Artifact( name="results.json", content=json.dumps(data, indent=2), content_type="application/json" )
Message Metadata
Citation Metadata
For source attribution in RAG agents:
MessagePart( content="AI adoption increased by 40%", content_type="text/plain", metadata={ "kind": "citation", "url": "https://example.com/study", "title": "AI Adoption Study 2024", "start_index": 0, "end_index": 32 } )
Trajectory Metadata
For transparency and debugging:
MessagePart( content="The weather in SF is 72°F", content_type="text/plain", metadata={ "kind": "trajectory", "tool_name": "weather_api", "tool_input": {"location": "San Francisco, CA"}, "tool_output": {"temperature": 72, "condition": "sunny"} } )
Wrapping Existing Agents
Integrate any existing agent into ACP regardless of framework:
from acp_sdk.server import Server, Context from acp_sdk.models import Message server = Server() @server.agent() async def wrapped_agent(input: list[Message], context: Context): """Wraps an existing LangChain/CrewAI/custom agent""" # Extract text from ACP message user_input = input[0].parts[0].content # Call your existing agent result = await your_existing_agent.run(user_input) # Yield as ACP message yield Message(parts=[MessagePart( content=result, content_type="text/plain" )]) server.run()
Architecture Patterns
Single-Agent
Client → HTTP → ACP Server → Agent
Multi-Agent Single Server
Client → HTTP → ACP Server → Agent A → Agent B → Agent C
Distributed Multi-Server
Client → Agent Registry → Server 1 → Agent A → Server 2 → Agent B → Server 3 → Agent C
Router Agent Pattern
Client → Router Agent → Decompose task → Route to specialists → Aggregate responses
Example Agents
The ACP repository includes 13 reference implementations:
| Example | Framework | Description |
|---|---|---|
| Basic Server/Client | ACP SDK | Standalone implementations |
| Chat Agent | BeeAI | ReAct with tools, memory, structured output |
| Slack Agent | BeeAI + MCP | Slack integration via ACP |
| RAG Agent | LlamaIndex | Document retrieval and synthesis |
| Prompt Chaining | BeeAI | Marketing copy → Translation pipeline |
| Dynamic Routing | BeeAI | Language-specific translation routing |
| Handoff Pattern | BeeAI | Delegation to specialized agents |
| Canvas Agent | BeeAI | Artifact generation for files |
| Song Writer | CrewAI | Multi-agent collaboration |
| GPT Researcher | Custom | Real-time research progress |
| Greeting Agent | LangGraph | Context-aware greetings |
| Agent Generator | ACP | Dynamically creates other agents |
| Story Writer | OpenAI | Short story generation |
Best Practices
Agent Design
- Clear descriptions - Agent docstrings become manifest descriptions
- Explicit content types - Declare input/output MIME types
- Async generators - Use
for streaming intermediate resultsyield - Error handling - Return meaningful error messages
Message Handling
- Validate content types - Check incoming MIME types
- Use artifacts for files - Named parts for downloadable content
- Include metadata - Citations and trajectories for transparency
Session Management
- Use sessions for context - Maintain conversation history
- Store minimal state - Keep session state lightweight
- Handle timeouts - Implement graceful timeout handling
Discovery
- Publish manifests - Use well-known URLs for open discovery
- Rich metadata - Include capabilities, domains, tags
- Version agents - Track changes with semantic versioning
Resources
- Specification: https://agentcommunicationprotocol.dev
- GitHub: https://github.com/i-am-bee/acp
- Python SDK: https://github.com/i-am-bee/acp/tree/main/python
- OpenAPI Spec: https://github.com/i-am-bee/acp/blob/main/docs/spec/openapi.yaml
- BeeAI Platform: Reference implementation
Governance
ACP is an open standard under the Linux Foundation, developed alongside BeeAI as its reference implementation. Community-driven development ensures vendor-neutral evolution.
Comparison with MCP
| Feature | ACP | MCP |
|---|---|---|
| Focus | Agent-to-agent interop | Tool/resource integration |
| Transport | HTTP REST | JSON-RPC, stdio, SSE |
| Discovery | Built-in (manifest, registry) | External |
| Sessions | Native support | Not built-in |
| Multi-modal | Native (MIME types) | Limited |
| Governance | Linux Foundation | Anthropic |
ACP and MCP are complementary - ACP handles agent communication while MCP provides tool integration. ACP includes MCP extension support for exposing agent tools.
AI Agent Fundamentals
Understanding AI agents helps design effective ACP-based systems.
What is an AI Agent?
An AI agent is a system that autonomously performs tasks by designing workflows with available tools. Unlike simple chatbots, agents handle decision-making, problem-solving, and environmental interaction through three stages:
- Goal Initialization & Planning - Decompose complex goals into subtasks
- Reasoning with Tools - Leverage databases, APIs, other agents to bridge knowledge gaps
- Learning & Reflection - Refine responses through feedback and store solutions
Agent Types (Complexity Spectrum)
| Type | Characteristics | ACP Use Case |
|---|---|---|
| Simple Reflex | Rule-based responses; no memory | Basic request handlers |
| Model-Based Reflex | Maintains internal world model | Context-aware agents |
| Goal-Based | Searches for action sequences | Task orchestrators |
| Utility-Based | Maximizes reward across paths | Optimization agents |
| Learning | Autonomous knowledge expansion | Adaptive systems |
Agentic vs Non-Agentic Systems
| Aspect | Non-Agentic (Chatbot) | Agentic (ACP Agent) |
|---|---|---|
| Tools | None | Full tool access |
| Memory | Session only | Persistent state |
| Reasoning | Single response | Multi-step planning |
| Autonomy | Requires user input | Self-directed |
| Adaptation | Static | Learns from feedback |
Reasoning Paradigms
ReAct Pattern (Think-Act-Observe)
Step-by-step problem resolution with continuous context updates:
@server.agent() async def react_agent(input: list[Message], context: Context): """ReAct agent with think-act-observe loop""" # Think: Analyze the request yield {"thought": "Analyzing user request for data analysis"} # Act: Call tool data = await fetch_data_tool(input[0].parts[0].content) yield {"thought": f"Retrieved {len(data)} records"} # Observe: Process results analysis = analyze(data) yield {"thought": "Analysis complete, formatting response"} # Final output yield Message(parts=[MessagePart( content=json.dumps(analysis), content_type="application/json" )])
ReWOO Pattern (Upfront Planning)
Plan all steps before execution, reducing token usage:
@server.agent() async def rewoo_agent(input: list[Message], context: Context): """ReWOO agent with upfront planning""" # Plan phase: Generate complete execution plan plan = [ {"step": 1, "action": "fetch_data", "params": {...}}, {"step": 2, "action": "transform", "params": {...}}, {"step": 3, "action": "summarize", "params": {...}} ] yield {"plan": plan} # Execute phase: Run all steps results = [] for step in plan: result = await execute_step(step) results.append(result) yield Message(parts=[MessagePart( content=json.dumps(results[-1]), content_type="application/json" )])
Multi-Agent Orchestration
Hierarchical Agent Architecture
┌─────────────────┐ │ Orchestrator │ │ (Goal-Based) │ └────────┬────────┘ │ ┌───────────────────┼───────────────────┐ │ │ │ ▼ ▼ ▼ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │ Research Agent │ │ Analysis Agent │ │ Writer Agent │ │ (Learning) │ │ (Utility) │ │ (Model-Based) │ └─────────────────┘ └─────────────────┘ └─────────────────┘
Orchestrator Agent Example
@server.agent( name="orchestrator", description="Coordinates specialized agents for complex tasks" ) async def orchestrator(input: list[Message], context: Context): """Multi-agent orchestrator using ACP""" # Decompose task task = input[0].parts[0].content subtasks = await decompose_task(task) yield {"thought": f"Decomposed into {len(subtasks)} subtasks"} # Route to specialist agents async with Client(base_url="http://localhost:8000") as client: results = [] for subtask in subtasks: agent = select_specialist(subtask) run = await client.run_sync( agent=agent, input=[Message(parts=[MessagePart( content=subtask["description"], content_type="text/plain" )])] ) results.append(run.output) yield {"thought": f"Completed: {subtask['name']}"} # Aggregate responses final = synthesize_results(results) yield Message(parts=[MessagePart( content=final, content_type="text/plain" )])
Agent-to-Agent Communication
@server.agent() async def delegating_agent(input: list[Message], context: Context): """Agent that delegates to other ACP agents""" async with Client(base_url="http://localhost:8000") as client: # Discover available agents specialists = [] async for agent in client.agents(): if "analysis" in agent.description.lower(): specialists.append(agent.name) yield {"thought": f"Found {len(specialists)} analysis agents"} # Delegate to best match if specialists: run = await client.run_sync( agent=specialists[0], input=input ) yield run.output[0] else: yield Message(parts=[MessagePart( content="No suitable agent found", content_type="text/plain" )])
Production Safety Patterns
Activity Logging
import logging from datetime import datetime logger = logging.getLogger("acp_audit") @server.agent() async def audited_agent(input: list[Message], context: Context): """Agent with comprehensive activity logging""" run_id = context.run_id start_time = datetime.utcnow() # Log start logger.info(f"[{run_id}] Agent started", extra={ "run_id": run_id, "input_size": len(input), "timestamp": start_time.isoformat() }) try: # Process result = await process(input) # Log tool usage logger.info(f"[{run_id}] Tool called", extra={ "run_id": run_id, "tool": "process", "duration_ms": (datetime.utcnow() - start_time).total_seconds() * 1000 }) yield result except Exception as e: logger.error(f"[{run_id}] Agent failed: {e}", extra={ "run_id": run_id, "error": str(e) }) raise finally: logger.info(f"[{run_id}] Agent completed", extra={ "run_id": run_id, "duration_ms": (datetime.utcnow() - start_time).total_seconds() * 1000 })
Interruptibility (Graceful Cancellation)
import asyncio @server.agent() async def interruptible_agent(input: list[Message], context: Context): """Agent that supports graceful interruption""" for i, step in enumerate(processing_steps): # Check for cancellation between steps if context.is_cancelled: yield {"thought": "Cancellation requested, stopping gracefully"} # Cleanup resources await cleanup() return yield {"thought": f"Processing step {i+1}/{len(processing_steps)}"} # Use asyncio.shield for critical operations result = await asyncio.shield(step.execute()) yield {"progress": (i + 1) / len(processing_steps)} yield final_result
Human Approval Gates
@server.agent() async def approval_agent(input: list[Message], context: Context): """Agent requiring human approval for high-impact actions""" action = parse_action(input) # Check if action requires approval if action.requires_approval: yield {"thought": "This action requires human approval"} # Pause and wait for approval approval = yield AwaitInput( prompt=f"Approve action: {action.description}? (yes/no)", metadata={"action_type": action.type, "impact": "high"} ) if approval.lower() != "yes": yield Message(parts=[MessagePart( content="Action cancelled by user", content_type="text/plain" )]) return # Execute approved action result = await action.execute() yield result
Preventing Infinite Loops
MAX_ITERATIONS = 10 MAX_TOOL_CALLS = 50 @server.agent() async def bounded_agent(input: list[Message], context: Context): """Agent with loop prevention""" iterations = 0 tool_calls = 0 seen_states = set() while not is_complete(): iterations += 1 # Iteration limit if iterations > MAX_ITERATIONS: yield {"warning": "Max iterations reached"} break # State cycle detection current_state = hash(frozenset(context.state.items())) if current_state in seen_states: yield {"warning": "Cycle detected, breaking loop"} break seen_states.add(current_state) # Tool call limit if tool_calls >= MAX_TOOL_CALLS: yield {"warning": "Max tool calls reached"} break result = await process_step() tool_calls += result.tool_call_count yield {"thought": f"Iteration {iterations}, tools: {tool_calls}"} yield final_result()
Enterprise Use Cases
Cross-Platform Integration
Connect agents across different business systems:
@server.agent( name="crm-integration", description="Integrates CRM, analytics, and support systems", metadata={ "domains": ["sales", "support", "analytics"], "capabilities": [ {"name": "customer-lookup", "description": "Find customer data"}, {"name": "ticket-creation", "description": "Create support tickets"}, {"name": "report-generation", "description": "Generate analytics reports"} ] } ) async def crm_agent(input: list[Message], context: Context): """Enterprise CRM integration agent""" # Implementation pass
Agent Replacement Pattern
Swap agents seamlessly in production:
# Old agent (being replaced) @server.agent(name="search-v1", description="Legacy search") async def search_v1(input, context): return legacy_search(input) # New agent (replacement) - same interface @server.agent(name="search-v2", description="AI-powered search") async def search_v2(input, context): return ai_search(input) # Client code unchanged - just update agent name run = await client.run_sync(agent="search-v2", input=input)
Inter-Organizational Collaboration
Secure agent partnerships between companies:
@server.agent( name="partner-gateway", description="Secure gateway for partner agent access", metadata={ "security": { "auth": "oauth2", "allowed_origins": ["partner-a.com", "partner-b.com"], "rate_limit": "100/hour" } } ) async def partner_agent(input: list[Message], context: Context): """Agent for inter-organizational communication""" # Verify partner identity partner_id = context.metadata.get("partner_id") if not validate_partner(partner_id): yield Message(parts=[MessagePart( content="Unauthorized", content_type="text/plain" )]) return # Process partner request with audit yield {"audit": f"Partner {partner_id} request received"} result = await process_partner_request(input) yield result
Protocol Ecosystem
ACP works alongside other agent communication standards:
| Protocol | Purpose | Relationship to ACP |
|---|---|---|
| ACP | Agent-to-agent interop | Core protocol |
| MCP | Agent-to-tool integration | Complementary (ACP supports MCP extension) |
| A2A | Direct agent messaging | Alternative for peer-to-peer |
| Agent Client Protocol | Editor-to-agent | Different domain (IDE integration) |
Using ACP with MCP Tools
from acp_sdk.extensions import MCPExtension @server.agent(extensions=[MCPExtension()]) async def mcp_enabled_agent(input: list[Message], context: Context): """Agent that exposes MCP tools via ACP""" # Access MCP tools through ACP context tools = context.mcp.available_tools() # Call MCP tool result = await context.mcp.call_tool("web_search", query="...") yield result