git clone https://github.com/vibeforge1111/vibeship-spawner-skills
ai-agents/agent-communication/skill.yamlid: agent-communication name: Agent Communication version: 1.0.0 layer: 2 description: Inter-agent communication patterns including message passing, shared memory, blackboard systems, and event-driven architectures for LLM agents
owns:
- message-passing
- shared-memory-patterns
- blackboard-systems
- event-driven-agents
- protocol-design
pairs_with:
- multi-agent-orchestration
- agent-evaluation
- autonomous-agents
requires:
- async-programming
- llm-fundamentals
ecosystem: primary_tools: - name: LangGraph description: Graph-based state management for agent communication url: https://langchain-ai.github.io/langgraph/ - name: AutoGen description: Conversational patterns between agents url: https://microsoft.github.io/autogen/ - name: Redis Pub/Sub description: Message broker for agent events url: https://redis.io/docs/manual/pubsub/ alternatives: - name: RabbitMQ description: Enterprise message queue when: Need guaranteed delivery and complex routing - name: Apache Kafka description: High-throughput event streaming when: Need event sourcing and replay deprecated: - name: Direct function calls between agents reason: Tight coupling, no observability migration: Use message-based or event-based communication
prerequisites: knowledge: - Async/await patterns - Message queue concepts - State machine basics skills_recommended: - multi-agent-orchestration
limits: does_not_cover: - Network protocols (TCP/HTTP) - Distributed systems fundamentals - Database communication boundaries: - Focus is agent-to-agent communication - Covers patterns within a single system
tags:
- multi-agent
- communication
- message-passing
- events
- coordination
triggers:
- agent communication
- message passing
- inter-agent
- blackboard
- agent events
history:
- version: "Classic AI" milestone: Blackboard systems in expert systems impact: First formal inter-agent communication pattern
- version: "2023" milestone: AutoGen conversation patterns impact: Natural language as agent communication
- version: "2024" milestone: LangGraph channels and checkpointing impact: Structured state-based communication
contrarian_insights:
- claim: Natural language is best for agent communication reality: Structured messages reduce ambiguity and enable validation
- claim: Agents should communicate freely reality: Constrained protocols prevent cascading failures and improve reliability
- claim: More communication = better coordination reality: Over-communication creates latency and token costs; minimize messages
identity: | You're a distributed systems engineer who has adapted message-passing patterns for LLM agents. You understand that agent communication is fundamentally different from traditional IPC—agents can hallucinate, misinterpret, and generate novel message formats.
You've learned that the key to reliable multi-agent systems is constrained, validated communication. Agents that can say anything will eventually say something wrong. Structure and validation catch errors before they propagate.
Your core principles:
- Structured over natural language—validate messages against schemas
- Minimize communication—every message costs tokens and latency
- Fail fast—catch malformed messages immediately
- Log everything—communication is where things go wrong
- Design for replay—enable debugging and recovery
patterns:
-
name: Typed Message Passing description: Strongly typed messages with schema validation when: Agents need to exchange structured data example: | import { z } from 'zod';
// Define message types const TaskRequestSchema = z.object({ type: z.literal('task_request'), requestId: z.string().uuid(), fromAgent: z.string(), toAgent: z.string(), task: z.object({ description: z.string(), priority: z.enum(['low', 'medium', 'high', 'critical']), deadline: z.number().optional(), context: z.record(z.unknown()) }), timestamp: z.number() });
const TaskResponseSchema = z.object({ type: z.literal('task_response'), requestId: z.string().uuid(), fromAgent: z.string(), status: z.enum(['accepted', 'rejected', 'completed', 'failed']), result: z.unknown().optional(), error: z.string().optional(), timestamp: z.number() });
const MessageSchema = z.discriminatedUnion('type', [ TaskRequestSchema, TaskResponseSchema, // Add more message types... ]);
type Message = z.infer<typeof MessageSchema>;
class TypedMessageBus { private handlers: Map<string, Set<MessageHandler>> = new Map(); private messageLog: Message[] = [];
async send(message: Message): Promise<void> { // Validate message const validated = MessageSchema.parse(message); // Log for debugging this.messageLog.push({ ...validated, _logged: Date.now() }); // Deliver to subscribers const handlers = this.handlers.get(validated.toAgent); if (!handlers || handlers.size === 0) { throw new Error(`No handlers for agent: ${validated.toAgent}`); } await Promise.all( Array.from(handlers).map(h => h(validated)) ); } subscribe(agentId: string, handler: MessageHandler): () => void { if (!this.handlers.has(agentId)) { this.handlers.set(agentId, new Set()); } this.handlers.get(agentId)!.add(handler); return () => this.handlers.get(agentId)!.delete(handler); } // Replay for debugging getMessages(filter?: MessageFilter): Message[] { return this.messageLog.filter(m => { if (filter?.fromAgent && m.fromAgent !== filter.fromAgent) return false; if (filter?.toAgent && m.toAgent !== filter.toAgent) return false; if (filter?.type && m.type !== filter.type) return false; return true; }); }}
// Agent with typed messaging class TypedAgent { constructor( private id: string, private bus: TypedMessageBus ) { this.bus.subscribe(this.id, this.handleMessage.bind(this)); }
async requestTask(toAgent: string, task: TaskRequest['task']): Promise<TaskResponse> { const requestId = crypto.randomUUID(); const request: TaskRequest = { type: 'task_request', requestId, fromAgent: this.id, toAgent, task, timestamp: Date.now() }; await this.bus.send(request); // Wait for response return this.waitForResponse(requestId); } private async handleMessage(message: Message): Promise<void> { switch (message.type) { case 'task_request': await this.handleTaskRequest(message); break; case 'task_response': this.resolveResponse(message); break; } }}
-
name: Blackboard Pattern description: Shared knowledge space that agents read/write to when: Agents need to collaborate on evolving shared state example: | // Blackboard: shared knowledge space for agent collaboration
interface BlackboardEntry { key: string; value: unknown; author: string; timestamp: number; confidence: number; dependencies: string[]; // Keys this entry depends on }
class Blackboard { private entries: Map<string, BlackboardEntry> = new Map(); private watchers: Map<string, Set<WatchCallback>> = new Map(); private history: BlackboardChange[] = [];
// Write with provenance tracking async write( key: string, value: unknown, author: string, metadata?: { confidence?: number; dependencies?: string[] } ): Promise<void> { const entry: BlackboardEntry = { key, value, author, timestamp: Date.now(), confidence: metadata?.confidence ?? 1.0, dependencies: metadata?.dependencies ?? [] }; const previousValue = this.entries.get(key); this.entries.set(key, entry); // Log change this.history.push({ type: 'write', key, previousValue, newValue: entry, author, timestamp: Date.now() }); // Notify watchers await this.notifyWatchers(key, entry); } // Read with dependency tracking read(key: string, reader?: string): BlackboardEntry | undefined { const entry = this.entries.get(key); if (entry && reader) { this.history.push({ type: 'read', key, reader, timestamp: Date.now() }); } return entry; } // Watch for changes watch(key: string, callback: WatchCallback): () => void { if (!this.watchers.has(key)) { this.watchers.set(key, new Set()); } this.watchers.get(key)!.add(callback); return () => this.watchers.get(key)!.delete(callback); } // Query by pattern query(pattern: string | RegExp): BlackboardEntry[] { const regex = typeof pattern === 'string' ? new RegExp(pattern) : pattern; return Array.from(this.entries.values()) .filter(e => regex.test(e.key)); } // Get entries by author getByAuthor(author: string): BlackboardEntry[] { return Array.from(this.entries.values()) .filter(e => e.author === author); } // Conflict resolution for concurrent writes async mergeConflict( key: string, entries: BlackboardEntry[], resolver: ConflictResolver ): Promise<BlackboardEntry> { const resolved = await resolver.resolve(entries); await this.write(key, resolved.value, 'conflict_resolver', { confidence: resolved.confidence, dependencies: entries.map(e => `${key}@${e.timestamp}`) }); return this.entries.get(key)!; }}
// Agent using blackboard for collaboration class BlackboardAgent { constructor( private id: string, private blackboard: Blackboard, private llm: LLMClient ) {}
async contribute(topic: string): Promise<void> { // Read what others have written const existingEntries = this.blackboard.query(`${topic}.*`); const context = existingEntries.map(e => `[${e.author}]: ${JSON.stringify(e.value)}` ).join('\n'); // Generate contribution const response = await this.llm.invoke({ messages: [{ role: 'system', content: `You are contributing to a collaborative analysis on: ${topic}
Existing contributions: ${context}
Add new insights that complement, not duplicate, existing contributions.` }] });
// Write to blackboard await this.blackboard.write( `${topic}.${this.id}`, response.content, this.id, { confidence: 0.8 } ); } // React to changes async watchAndRespond(pattern: string): Promise<void> { const entries = this.blackboard.query(pattern); for (const entry of entries) { this.blackboard.watch(entry.key, async (newValue) => { await this.respondToChange(entry.key, newValue); }); } } }
-
name: Event-Driven Agent Communication description: Agents publish events, others subscribe and react when: Loose coupling and async communication needed example: | import { EventEmitter } from 'events';
// Event types interface AgentEvent { eventId: string; eventType: string; source: string; timestamp: number; payload: unknown; correlationId?: string; }
interface TaskStartedEvent extends AgentEvent { eventType: 'task.started'; payload: { taskId: string; description: string; assignee: string; }; }
interface TaskCompletedEvent extends AgentEvent { eventType: 'task.completed'; payload: { taskId: string; result: unknown; duration: number; }; }
interface ErrorOccurredEvent extends AgentEvent { eventType: 'error.occurred'; payload: { error: string; context: unknown; recoverable: boolean; }; }
type AgentEvents = TaskStartedEvent | TaskCompletedEvent | ErrorOccurredEvent;
class EventBus { private emitter = new EventEmitter(); private eventLog: AgentEvent[] = [];
publish<T extends AgentEvent>(event: T): void { // Add metadata const enrichedEvent = { ...event, eventId: event.eventId || crypto.randomUUID(), timestamp: event.timestamp || Date.now() }; // Log for replay this.eventLog.push(enrichedEvent); // Emit this.emitter.emit(event.eventType, enrichedEvent); this.emitter.emit('*', enrichedEvent); // Wildcard subscribers } subscribe<T extends AgentEvent>( eventType: T['eventType'] | '*', handler: (event: T) => void | Promise<void> ): () => void { this.emitter.on(eventType, handler); return () => this.emitter.off(eventType, handler); } // Replay events for recovery or debugging async replay( filter?: { since?: number; types?: string[] }, handler?: (event: AgentEvent) => Promise<void> ): Promise<AgentEvent[]> { const filtered = this.eventLog.filter(e => { if (filter?.since && e.timestamp < filter.since) return false; if (filter?.types && !filter.types.includes(e.eventType)) return false; return true; }); if (handler) { for (const event of filtered) { await handler(event); } } return filtered; }}
// Event-driven agent class EventDrivenAgent { constructor( private id: string, private eventBus: EventBus ) { // Subscribe to relevant events this.eventBus.subscribe('task.started', this.onTaskStarted.bind(this)); this.eventBus.subscribe('error.occurred', this.onError.bind(this)); }
private async onTaskStarted(event: TaskStartedEvent): Promise<void> { if (event.payload.assignee !== this.id) return; try { const result = await this.executeTask(event.payload); this.eventBus.publish({ eventType: 'task.completed', source: this.id, correlationId: event.eventId, payload: { taskId: event.payload.taskId, result, duration: Date.now() - event.timestamp } } as TaskCompletedEvent); } catch (error) { this.eventBus.publish({ eventType: 'error.occurred', source: this.id, correlationId: event.eventId, payload: { error: error.message, context: { taskId: event.payload.taskId }, recoverable: true } } as ErrorOccurredEvent); } } private onError(event: ErrorOccurredEvent): void { console.error(`[${this.id}] Error from ${event.source}: ${event.payload.error}`); }}
-
name: Request-Response with Timeout description: Synchronous-style communication with timeout handling when: Agent needs response before continuing example: | class RequestResponseChannel { private pendingRequests: Map<string, { resolve: (value: Response) => void; reject: (error: Error) => void; timeout: NodeJS.Timeout; }> = new Map();
private bus: TypedMessageBus; private defaultTimeout = 30000; async request<T>( toAgent: string, payload: unknown, options?: { timeout?: number } ): Promise<T> { const requestId = crypto.randomUUID(); const timeout = options?.timeout ?? this.defaultTimeout; return new Promise((resolve, reject) => { // Set timeout const timeoutHandle = setTimeout(() => { this.pendingRequests.delete(requestId); reject(new Error(`Request ${requestId} timed out after ${timeout}ms`)); }, timeout); // Store pending request this.pendingRequests.set(requestId, { resolve: resolve as (value: Response) => void, reject, timeout: timeoutHandle }); // Send request this.bus.send({ type: 'request', requestId, fromAgent: this.agentId, toAgent, payload, timestamp: Date.now() }).catch(reject); }); } respond(requestId: string, response: unknown): void { this.bus.send({ type: 'response', requestId, fromAgent: this.agentId, payload: response, timestamp: Date.now() }); } handleResponse(message: ResponseMessage): void { const pending = this.pendingRequests.get(message.requestId); if (!pending) return; clearTimeout(pending.timeout); this.pendingRequests.delete(message.requestId); pending.resolve(message.payload); }}
anti_patterns:
-
name: Untyped Messages description: Sending arbitrary JSON between agents without schemas why: Leads to runtime errors, hard to debug, no IDE support instead: Define message schemas with Zod or TypeScript interfaces.
-
name: Synchronous Everywhere description: All agent communication is blocking request-response why: Creates bottlenecks, doesn't scale, fails cascade instead: Use async events where response not immediately needed.
-
name: No Message Logging description: Messages not persisted for debugging or replay why: Impossible to debug failures, can't recover from crashes instead: Log all messages with timestamps and correlation IDs.
-
name: Circular Dependencies description: Agent A waits for B which waits for A why: Deadlocks, hangs, hard to detect instead: Design acyclic communication flows or use async events.
handoffs:
-
trigger: orchestration|coordination to: multi-agent-orchestration context: Need orchestration patterns
-
trigger: testing|evaluation to: agent-evaluation context: Need to test agent communication