Vibeship-spawner-skills agent-communication

id: agent-communication

install
source · Clone the upstream repo
git clone https://github.com/vibeforge1111/vibeship-spawner-skills
manifest: ai-agents/agent-communication/skill.yaml
source content

id: agent-communication name: Agent Communication version: 1.0.0 layer: 2 description: Inter-agent communication patterns including message passing, shared memory, blackboard systems, and event-driven architectures for LLM agents

owns:

  • message-passing
  • shared-memory-patterns
  • blackboard-systems
  • event-driven-agents
  • protocol-design

pairs_with:

  • multi-agent-orchestration
  • agent-evaluation
  • autonomous-agents

requires:

  • async-programming
  • llm-fundamentals

ecosystem: primary_tools: - name: LangGraph description: Graph-based state management for agent communication url: https://langchain-ai.github.io/langgraph/ - name: AutoGen description: Conversational patterns between agents url: https://microsoft.github.io/autogen/ - name: Redis Pub/Sub description: Message broker for agent events url: https://redis.io/docs/manual/pubsub/ alternatives: - name: RabbitMQ description: Enterprise message queue when: Need guaranteed delivery and complex routing - name: Apache Kafka description: High-throughput event streaming when: Need event sourcing and replay deprecated: - name: Direct function calls between agents reason: Tight coupling, no observability migration: Use message-based or event-based communication

prerequisites: knowledge: - Async/await patterns - Message queue concepts - State machine basics skills_recommended: - multi-agent-orchestration

limits: does_not_cover: - Network protocols (TCP/HTTP) - Distributed systems fundamentals - Database communication boundaries: - Focus is agent-to-agent communication - Covers patterns within a single system

tags:

  • multi-agent
  • communication
  • message-passing
  • events
  • coordination

triggers:

  • agent communication
  • message passing
  • inter-agent
  • blackboard
  • agent events

history:

  • version: "Classic AI" milestone: Blackboard systems in expert systems impact: First formal inter-agent communication pattern
  • version: "2023" milestone: AutoGen conversation patterns impact: Natural language as agent communication
  • version: "2024" milestone: LangGraph channels and checkpointing impact: Structured state-based communication

contrarian_insights:

  • claim: Natural language is best for agent communication reality: Structured messages reduce ambiguity and enable validation
  • claim: Agents should communicate freely reality: Constrained protocols prevent cascading failures and improve reliability
  • claim: More communication = better coordination reality: Over-communication creates latency and token costs; minimize messages

identity: | You're a distributed systems engineer who has adapted message-passing patterns for LLM agents. You understand that agent communication is fundamentally different from traditional IPC—agents can hallucinate, misinterpret, and generate novel message formats.

You've learned that the key to reliable multi-agent systems is constrained, validated communication. Agents that can say anything will eventually say something wrong. Structure and validation catch errors before they propagate.

Your core principles:

  1. Structured over natural language—validate messages against schemas
  2. Minimize communication—every message costs tokens and latency
  3. Fail fast—catch malformed messages immediately
  4. Log everything—communication is where things go wrong
  5. Design for replay—enable debugging and recovery

patterns:

  • name: Typed Message Passing description: Strongly typed messages with schema validation when: Agents need to exchange structured data example: | import { z } from 'zod';

    // Define message types const TaskRequestSchema = z.object({ type: z.literal('task_request'), requestId: z.string().uuid(), fromAgent: z.string(), toAgent: z.string(), task: z.object({ description: z.string(), priority: z.enum(['low', 'medium', 'high', 'critical']), deadline: z.number().optional(), context: z.record(z.unknown()) }), timestamp: z.number() });

    const TaskResponseSchema = z.object({ type: z.literal('task_response'), requestId: z.string().uuid(), fromAgent: z.string(), status: z.enum(['accepted', 'rejected', 'completed', 'failed']), result: z.unknown().optional(), error: z.string().optional(), timestamp: z.number() });

    const MessageSchema = z.discriminatedUnion('type', [ TaskRequestSchema, TaskResponseSchema, // Add more message types... ]);

    type Message = z.infer<typeof MessageSchema>;

    class TypedMessageBus { private handlers: Map<string, Set<MessageHandler>> = new Map(); private messageLog: Message[] = [];

      async send(message: Message): Promise<void> {
          // Validate message
          const validated = MessageSchema.parse(message);
    
          // Log for debugging
          this.messageLog.push({
              ...validated,
              _logged: Date.now()
          });
    
          // Deliver to subscribers
          const handlers = this.handlers.get(validated.toAgent);
          if (!handlers || handlers.size === 0) {
              throw new Error(`No handlers for agent: ${validated.toAgent}`);
          }
    
          await Promise.all(
              Array.from(handlers).map(h => h(validated))
          );
      }
    
      subscribe(agentId: string, handler: MessageHandler): () => void {
          if (!this.handlers.has(agentId)) {
              this.handlers.set(agentId, new Set());
          }
          this.handlers.get(agentId)!.add(handler);
    
          return () => this.handlers.get(agentId)!.delete(handler);
      }
    
      // Replay for debugging
      getMessages(filter?: MessageFilter): Message[] {
          return this.messageLog.filter(m => {
              if (filter?.fromAgent && m.fromAgent !== filter.fromAgent) return false;
              if (filter?.toAgent && m.toAgent !== filter.toAgent) return false;
              if (filter?.type && m.type !== filter.type) return false;
              return true;
          });
      }
    

    }

    // Agent with typed messaging class TypedAgent { constructor( private id: string, private bus: TypedMessageBus ) { this.bus.subscribe(this.id, this.handleMessage.bind(this)); }

      async requestTask(toAgent: string, task: TaskRequest['task']): Promise<TaskResponse> {
          const requestId = crypto.randomUUID();
    
          const request: TaskRequest = {
              type: 'task_request',
              requestId,
              fromAgent: this.id,
              toAgent,
              task,
              timestamp: Date.now()
          };
    
          await this.bus.send(request);
    
          // Wait for response
          return this.waitForResponse(requestId);
      }
    
      private async handleMessage(message: Message): Promise<void> {
          switch (message.type) {
              case 'task_request':
                  await this.handleTaskRequest(message);
                  break;
              case 'task_response':
                  this.resolveResponse(message);
                  break;
          }
      }
    

    }

  • name: Blackboard Pattern description: Shared knowledge space that agents read/write to when: Agents need to collaborate on evolving shared state example: | // Blackboard: shared knowledge space for agent collaboration

    interface BlackboardEntry { key: string; value: unknown; author: string; timestamp: number; confidence: number; dependencies: string[]; // Keys this entry depends on }

    class Blackboard { private entries: Map<string, BlackboardEntry> = new Map(); private watchers: Map<string, Set<WatchCallback>> = new Map(); private history: BlackboardChange[] = [];

      // Write with provenance tracking
      async write(
          key: string,
          value: unknown,
          author: string,
          metadata?: { confidence?: number; dependencies?: string[] }
      ): Promise<void> {
          const entry: BlackboardEntry = {
              key,
              value,
              author,
              timestamp: Date.now(),
              confidence: metadata?.confidence ?? 1.0,
              dependencies: metadata?.dependencies ?? []
          };
    
          const previousValue = this.entries.get(key);
          this.entries.set(key, entry);
    
          // Log change
          this.history.push({
              type: 'write',
              key,
              previousValue,
              newValue: entry,
              author,
              timestamp: Date.now()
          });
    
          // Notify watchers
          await this.notifyWatchers(key, entry);
      }
    
      // Read with dependency tracking
      read(key: string, reader?: string): BlackboardEntry | undefined {
          const entry = this.entries.get(key);
    
          if (entry && reader) {
              this.history.push({
                  type: 'read',
                  key,
                  reader,
                  timestamp: Date.now()
              });
          }
    
          return entry;
      }
    
      // Watch for changes
      watch(key: string, callback: WatchCallback): () => void {
          if (!this.watchers.has(key)) {
              this.watchers.set(key, new Set());
          }
          this.watchers.get(key)!.add(callback);
    
          return () => this.watchers.get(key)!.delete(callback);
      }
    
      // Query by pattern
      query(pattern: string | RegExp): BlackboardEntry[] {
          const regex = typeof pattern === 'string'
              ? new RegExp(pattern)
              : pattern;
    
          return Array.from(this.entries.values())
              .filter(e => regex.test(e.key));
      }
    
      // Get entries by author
      getByAuthor(author: string): BlackboardEntry[] {
          return Array.from(this.entries.values())
              .filter(e => e.author === author);
      }
    
      // Conflict resolution for concurrent writes
      async mergeConflict(
          key: string,
          entries: BlackboardEntry[],
          resolver: ConflictResolver
      ): Promise<BlackboardEntry> {
          const resolved = await resolver.resolve(entries);
          await this.write(key, resolved.value, 'conflict_resolver', {
              confidence: resolved.confidence,
              dependencies: entries.map(e => `${key}@${e.timestamp}`)
          });
          return this.entries.get(key)!;
      }
    

    }

    // Agent using blackboard for collaboration class BlackboardAgent { constructor( private id: string, private blackboard: Blackboard, private llm: LLMClient ) {}

      async contribute(topic: string): Promise<void> {
          // Read what others have written
          const existingEntries = this.blackboard.query(`${topic}.*`);
          const context = existingEntries.map(e =>
              `[${e.author}]: ${JSON.stringify(e.value)}`
          ).join('\n');
    
          // Generate contribution
          const response = await this.llm.invoke({
              messages: [{
                  role: 'system',
                  content: `You are contributing to a collaborative analysis on: ${topic}
    

Existing contributions: ${context}

Add new insights that complement, not duplicate, existing contributions.` }] });

          // Write to blackboard
          await this.blackboard.write(
              `${topic}.${this.id}`,
              response.content,
              this.id,
              { confidence: 0.8 }
          );
      }

      // React to changes
      async watchAndRespond(pattern: string): Promise<void> {
          const entries = this.blackboard.query(pattern);

          for (const entry of entries) {
              this.blackboard.watch(entry.key, async (newValue) => {
                  await this.respondToChange(entry.key, newValue);
              });
          }
      }
  }
  • name: Event-Driven Agent Communication description: Agents publish events, others subscribe and react when: Loose coupling and async communication needed example: | import { EventEmitter } from 'events';

    // Event types interface AgentEvent { eventId: string; eventType: string; source: string; timestamp: number; payload: unknown; correlationId?: string; }

    interface TaskStartedEvent extends AgentEvent { eventType: 'task.started'; payload: { taskId: string; description: string; assignee: string; }; }

    interface TaskCompletedEvent extends AgentEvent { eventType: 'task.completed'; payload: { taskId: string; result: unknown; duration: number; }; }

    interface ErrorOccurredEvent extends AgentEvent { eventType: 'error.occurred'; payload: { error: string; context: unknown; recoverable: boolean; }; }

    type AgentEvents = TaskStartedEvent | TaskCompletedEvent | ErrorOccurredEvent;

    class EventBus { private emitter = new EventEmitter(); private eventLog: AgentEvent[] = [];

      publish<T extends AgentEvent>(event: T): void {
          // Add metadata
          const enrichedEvent = {
              ...event,
              eventId: event.eventId || crypto.randomUUID(),
              timestamp: event.timestamp || Date.now()
          };
    
          // Log for replay
          this.eventLog.push(enrichedEvent);
    
          // Emit
          this.emitter.emit(event.eventType, enrichedEvent);
          this.emitter.emit('*', enrichedEvent);  // Wildcard subscribers
      }
    
      subscribe<T extends AgentEvent>(
          eventType: T['eventType'] | '*',
          handler: (event: T) => void | Promise<void>
      ): () => void {
          this.emitter.on(eventType, handler);
          return () => this.emitter.off(eventType, handler);
      }
    
      // Replay events for recovery or debugging
      async replay(
          filter?: { since?: number; types?: string[] },
          handler?: (event: AgentEvent) => Promise<void>
      ): Promise<AgentEvent[]> {
          const filtered = this.eventLog.filter(e => {
              if (filter?.since && e.timestamp < filter.since) return false;
              if (filter?.types && !filter.types.includes(e.eventType)) return false;
              return true;
          });
    
          if (handler) {
              for (const event of filtered) {
                  await handler(event);
              }
          }
    
          return filtered;
      }
    

    }

    // Event-driven agent class EventDrivenAgent { constructor( private id: string, private eventBus: EventBus ) { // Subscribe to relevant events this.eventBus.subscribe('task.started', this.onTaskStarted.bind(this)); this.eventBus.subscribe('error.occurred', this.onError.bind(this)); }

      private async onTaskStarted(event: TaskStartedEvent): Promise<void> {
          if (event.payload.assignee !== this.id) return;
    
          try {
              const result = await this.executeTask(event.payload);
    
              this.eventBus.publish({
                  eventType: 'task.completed',
                  source: this.id,
                  correlationId: event.eventId,
                  payload: {
                      taskId: event.payload.taskId,
                      result,
                      duration: Date.now() - event.timestamp
                  }
              } as TaskCompletedEvent);
          } catch (error) {
              this.eventBus.publish({
                  eventType: 'error.occurred',
                  source: this.id,
                  correlationId: event.eventId,
                  payload: {
                      error: error.message,
                      context: { taskId: event.payload.taskId },
                      recoverable: true
                  }
              } as ErrorOccurredEvent);
          }
      }
    
      private onError(event: ErrorOccurredEvent): void {
          console.error(`[${this.id}] Error from ${event.source}: ${event.payload.error}`);
      }
    

    }

  • name: Request-Response with Timeout description: Synchronous-style communication with timeout handling when: Agent needs response before continuing example: | class RequestResponseChannel { private pendingRequests: Map<string, { resolve: (value: Response) => void; reject: (error: Error) => void; timeout: NodeJS.Timeout; }> = new Map();

      private bus: TypedMessageBus;
      private defaultTimeout = 30000;
    
      async request<T>(
          toAgent: string,
          payload: unknown,
          options?: { timeout?: number }
      ): Promise<T> {
          const requestId = crypto.randomUUID();
          const timeout = options?.timeout ?? this.defaultTimeout;
    
          return new Promise((resolve, reject) => {
              // Set timeout
              const timeoutHandle = setTimeout(() => {
                  this.pendingRequests.delete(requestId);
                  reject(new Error(`Request ${requestId} timed out after ${timeout}ms`));
              }, timeout);
    
              // Store pending request
              this.pendingRequests.set(requestId, {
                  resolve: resolve as (value: Response) => void,
                  reject,
                  timeout: timeoutHandle
              });
    
              // Send request
              this.bus.send({
                  type: 'request',
                  requestId,
                  fromAgent: this.agentId,
                  toAgent,
                  payload,
                  timestamp: Date.now()
              }).catch(reject);
          });
      }
    
      respond(requestId: string, response: unknown): void {
          this.bus.send({
              type: 'response',
              requestId,
              fromAgent: this.agentId,
              payload: response,
              timestamp: Date.now()
          });
      }
    
      handleResponse(message: ResponseMessage): void {
          const pending = this.pendingRequests.get(message.requestId);
          if (!pending) return;
    
          clearTimeout(pending.timeout);
          this.pendingRequests.delete(message.requestId);
          pending.resolve(message.payload);
      }
    

    }

anti_patterns:

  • name: Untyped Messages description: Sending arbitrary JSON between agents without schemas why: Leads to runtime errors, hard to debug, no IDE support instead: Define message schemas with Zod or TypeScript interfaces.

  • name: Synchronous Everywhere description: All agent communication is blocking request-response why: Creates bottlenecks, doesn't scale, fails cascade instead: Use async events where response not immediately needed.

  • name: No Message Logging description: Messages not persisted for debugging or replay why: Impossible to debug failures, can't recover from crashes instead: Log all messages with timestamps and correlation IDs.

  • name: Circular Dependencies description: Agent A waits for B which waits for A why: Deadlocks, hangs, hard to detect instead: Design acyclic communication flows or use async events.

handoffs:

  • trigger: orchestration|coordination to: multi-agent-orchestration context: Need orchestration patterns

  • trigger: testing|evaluation to: agent-evaluation context: Need to test agent communication