Claude-skill-registry ipc-communication
当用户要求"IPC 通信"、"进程通信"、"Agent 协议"、"stdin/stdout"、"JSON 消息"、"Orchestrator 通信"、"消息序列化",或者提到"进程间通信"、"Agent 集成"、"消息协议"时使用此技能。用于实现 Rust Orchestrator 和 Platform Agents 之间的 IPC 通信,包括消息格式定义、序列化、错误处理和性能优化。
install
source · Clone the upstream repo
git clone https://github.com/majiayu000/claude-skill-registry
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/ipc-communication" ~/.claude/skills/majiayu000-claude-skill-registry-ipc-communication && rm -rf "$T"
manifest:
skills/data/ipc-communication/SKILL.mdsource content
IPC Communication Skill
Expert guidance for implementing Inter-Process Communication between Rust Orchestrator and Platform Agents using JSON protocol via stdin/stdout.
Overview
WeReply uses JSON-based IPC protocol for communication:
- Protocol: JSON messages via stdin/stdout
- Direction: Bidirectional (Orchestrator ↔ Agent)
- Message Format: Line-delimited JSON (one message per line)
- Character Encoding: UTF-8
Message Protocol Design
Message Types
use serde::{Deserialize, Serialize}; use specta::Type; // Agent → Orchestrator 消息 #[derive(Serialize, Deserialize, Type, Debug)] #[serde(tag = "type")] pub enum AgentMessage { MessageNew { content: String, sender: String, timestamp: String, }, CommandResponse { success: bool, #[serde(skip_serializing_if = "Option::is_none")] error: Option<String>, }, HealthStatus { status: String, // "ok", "degraded", "error" agent_type: String, // "windows_wxauto", "macos_accessibility" }, Heartbeat { timestamp: f64, }, Error { message: String, #[serde(skip_serializing_if = "Option::is_none")] code: Option<String>, }, } // Orchestrator → Agent 消息 #[derive(Serialize, Deserialize, Type, Debug)] #[serde(tag = "type")] pub enum OrchestratorCommand { WriteInput { content: String, }, ClearInput, HealthCheck, Stop, }
Message Format Examples
// Agent → Orchestrator: 新消息 { "type": "MessageNew", "content": "你好,最近怎么样?", "sender": "张三", "timestamp": "2024-01-23T10:30:00" } // Orchestrator → Agent: 写入输入框 { "type": "WriteInput", "content": "很好,谢谢!" } // Agent → Orchestrator: 命令响应 { "type": "CommandResponse", "success": true } // Agent → Orchestrator: 错误 { "type": "Error", "message": "无法访问微信窗口", "code": "ACCESS_DENIED" }
Rust Orchestrator Implementation
Agent Process Manager
use tokio::process::{Child, Command}; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use std::process::Stdio; pub struct AgentProcess { child: Child, stdin: tokio::process::ChildStdin, stdout_reader: BufReader<tokio::process::ChildStdout>, } impl AgentProcess { pub async fn spawn(agent_path: &str) -> anyhow::Result<Self> { let mut child = Command::new(agent_path) .stdin(Stdio::piped()) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .spawn()?; let stdin = child.stdin.take() .ok_or_else(|| anyhow!("无法获取 Agent stdin"))?; let stdout = child.stdout.take() .ok_or_else(|| anyhow!("无法获取 Agent stdout"))?; let stdout_reader = BufReader::new(stdout); Ok(Self { child, stdin, stdout_reader, }) } pub async fn send_command(&mut self, command: OrchestratorCommand) -> anyhow::Result<()> { let json = serde_json::to_string(&command)?; self.stdin.write_all(json.as_bytes()).await?; self.stdin.write_all(b"\n").await?; self.stdin.flush().await?; Ok(()) } pub async fn read_message(&mut self) -> anyhow::Result<AgentMessage> { let mut line = String::new(); self.stdout_reader.read_line(&mut line).await?; if line.is_empty() { return Err(anyhow!("Agent 已关闭输出")); } let message = serde_json::from_str(&line)?; Ok(message) } pub async fn kill(&mut self) -> anyhow::Result<()> { self.child.kill().await?; Ok(()) } }
Message Handler Pattern
use tokio::sync::mpsc; pub struct AgentHandler { agent: AgentProcess, message_tx: mpsc::UnboundedSender<AgentMessage>, } impl AgentHandler { pub async fn start(agent_path: &str) -> anyhow::Result<(Self, mpsc::UnboundedReceiver<AgentMessage>)> { let agent = AgentProcess::spawn(agent_path).await?; let (message_tx, message_rx) = mpsc::unbounded_channel(); let handler = Self { agent, message_tx, }; Ok((handler, message_rx)) } pub async fn run(mut self) { loop { match self.agent.read_message().await { Ok(message) => { if self.message_tx.send(message).is_err() { tracing::error!("消息接收器已关闭"); break; } } Err(e) => { tracing::error!(error = %e, "读取 Agent 消息失败"); break; } } } } pub async fn send_command(&mut self, command: OrchestratorCommand) -> anyhow::Result<()> { self.agent.send_command(command).await } }
Error Handling with Timeout
use tokio::time::{timeout, Duration}; pub async fn send_command_with_timeout( agent: &mut AgentProcess, command: OrchestratorCommand, timeout_secs: u64, ) -> anyhow::Result<()> { timeout( Duration::from_secs(timeout_secs), agent.send_command(command) ) .await .map_err(|_| anyhow!("发送命令超时"))? } pub async fn read_message_with_timeout( agent: &mut AgentProcess, timeout_secs: u64, ) -> anyhow::Result<AgentMessage> { timeout( Duration::from_secs(timeout_secs), agent.read_message() ) .await .map_err(|_| anyhow!("读取消息超时"))? }
Python Agent Implementation
Message Sender Pattern
import json import sys from typing import Dict, Any class MessageSender: """发送消息到 Rust Orchestrator (stdout)""" @staticmethod def send_message(message: Dict[str, Any]): """ 发送 JSON 消息到 stdout Args: message: 消息字典,必须包含 'type' 字段 """ json_str = json.dumps(message, ensure_ascii=False) print(json_str, flush=True) @staticmethod def send_message_new(content: str, sender: str, timestamp: str): """发送新消息通知""" message = { "type": "MessageNew", "content": content, "sender": sender, "timestamp": timestamp } MessageSender.send_message(message) @staticmethod def send_command_response(success: bool, error: str = None): """发送命令响应""" message = { "type": "CommandResponse", "success": success } if error: message["error"] = error MessageSender.send_message(message) @staticmethod def send_error(error_message: str, code: str = None): """发送错误""" message = { "type": "Error", "message": error_message } if code: message["code"] = code MessageSender.send_message(message) @staticmethod def send_heartbeat(timestamp: float): """发送心跳""" message = { "type": "Heartbeat", "timestamp": timestamp } MessageSender.send_message(message)
Command Receiver Pattern
import threading from typing import Callable, Dict, Any class CommandReceiver: """从 Rust Orchestrator 接收命令 (stdin)""" def __init__(self): self.handlers: Dict[str, Callable] = {} self.running = True def register_handler(self, command_type: str, handler: Callable): """注册命令处理器""" self.handlers[command_type] = handler def start_listening(self): """开始监听命令(阻塞)""" try: for line in sys.stdin: if not self.running: break try: command = json.loads(line.strip()) self.handle_command(command) except json.JSONDecodeError as e: MessageSender.send_error(f"JSON 解析失败: {str(e)}", "JSON_PARSE_ERROR") except Exception as e: MessageSender.send_error(f"处理命令失败: {str(e)}", "COMMAND_HANDLER_ERROR") except KeyboardInterrupt: pass except Exception as e: MessageSender.send_error(f"命令监听错误: {str(e)}", "LISTENER_ERROR") def start_listening_async(self): """在后台线程中监听命令""" thread = threading.Thread(target=self.start_listening, daemon=True) thread.start() return thread def handle_command(self, command: Dict[str, Any]): """处理收到的命令""" command_type = command.get('type') if command_type not in self.handlers: MessageSender.send_error(f"未知命令类型: {command_type}", "UNKNOWN_COMMAND") return try: handler = self.handlers[command_type] result = handler(command) # 如果处理器返回 True,发送成功响应 if result is True or result is None: MessageSender.send_command_response(success=True) elif result is False: MessageSender.send_command_response(success=False, error="处理失败") except Exception as e: MessageSender.send_command_response(success=False, error=str(e)) def stop(self): """停止监听""" self.running = False
Complete Agent Example
from wechat_monitor import WeChatMonitor from input_writer import WeChatInputWriter class Agent: def __init__(self): self.monitor = WeChatMonitor() self.input_writer = WeChatInputWriter() self.command_receiver = CommandReceiver() # 注册命令处理器 self.command_receiver.register_handler("WriteInput", self.handle_write_input) self.command_receiver.register_handler("ClearInput", self.handle_clear_input) self.command_receiver.register_handler("HealthCheck", self.handle_health_check) self.command_receiver.register_handler("Stop", self.handle_stop) def handle_write_input(self, command: Dict[str, Any]) -> bool: """处理写入输入框命令""" content = command.get('content', '') return self.input_writer.write_to_input(content) def handle_clear_input(self, command: Dict[str, Any]) -> bool: """处理清空输入框命令""" return self.input_writer.clear_input() def handle_health_check(self, command: Dict[str, Any]) -> bool: """处理健康检查""" message = { "type": "HealthStatus", "status": "ok", "agent_type": "windows_wxauto" } MessageSender.send_message(message) return None # 不发送 CommandResponse def handle_stop(self, command: Dict[str, Any]) -> bool: """处理停止命令""" self.command_receiver.stop() return True def run(self): """启动 Agent""" # 启动命令监听(后台线程) self.command_receiver.start_listening_async() # 启动微信监听(主线程,阻塞) self.monitor.start_monitoring() if __name__ == '__main__': agent = Agent() agent.run()
Swift Agent Implementation
Message Sender Pattern
import Foundation class MessageSender { static func sendMessage(_ message: [String: Any]) { guard let jsonData = try? JSONSerialization.data(withJSONObject: message), let jsonString = String(data: jsonData, encoding: .utf8) else { return } print(jsonString) fflush(stdout) } static func sendMessageNew(content: String, sender: String, timestamp: String) { let message: [String: Any] = [ "type": "MessageNew", "content": content, "sender": sender, "timestamp": timestamp ] sendMessage(message) } static func sendCommandResponse(success: Bool, error: String? = nil) { var message: [String: Any] = [ "type": "CommandResponse", "success": success ] if let error = error { message["error"] = error } sendMessage(message) } static func sendError(message errorMessage: String, code: String? = nil) { var message: [String: Any] = [ "type": "Error", "message": errorMessage ] if let code = code { message["code"] = code } sendMessage(message) } }
Command Receiver Pattern
class CommandReceiver { private var handlers: [String: (([String: Any]) -> Bool)] = [:] private var running = true func registerHandler(commandType: String, handler: @escaping ([String: Any]) -> Bool) { handlers[commandType] = handler } func startListening() { while running { guard let line = readLine() else { break } guard let data = line.data(using: .utf8), let command = try? JSONSerialization.jsonObject(with: data) as? [String: Any] else { MessageSender.sendError(message: "JSON 解析失败", code: "JSON_PARSE_ERROR") continue } handleCommand(command) } } func startListeningAsync() { DispatchQueue.global(qos: .userInitiated).async { self.startListening() } } private func handleCommand(_ command: [String: Any]) { guard let commandType = command["type"] as? String else { MessageSender.sendError(message: "命令缺少 type 字段", code: "INVALID_COMMAND") return } guard let handler = handlers[commandType] else { MessageSender.sendError(message: "未知命令类型: \(commandType)", code: "UNKNOWN_COMMAND") return } let success = handler(command) MessageSender.sendCommandResponse(success: success) } func stop() { running = false } }
Message Validation
Rust Validation
pub fn validate_agent_message(message: &AgentMessage) -> Result<(), String> { match message { AgentMessage::MessageNew { content, sender, .. } => { if content.is_empty() { return Err("消息内容为空".to_string()); } if sender.is_empty() { return Err("发送者为空".to_string()); } if content.len() > 100000 { return Err("消息内容过长".to_string()); } } AgentMessage::Error { message, .. } => { if message.is_empty() { return Err("错误消息为空".to_string()); } } _ => {} } Ok(()) }
Python Validation
def validate_command(command: Dict[str, Any]) -> bool: """验证命令格式""" # 检查必需字段 if 'type' not in command: return False cmd_type = command['type'] # 验证特定命令的字段 if cmd_type == 'WriteInput': if 'content' not in command: return False content = command['content'] if len(content) > 10000: # 最大 10KB return False return True
Performance Optimization
Batch Message Processing
pub struct MessageBatcher { buffer: Vec<AgentMessage>, max_batch_size: usize, flush_interval: Duration, last_flush: Instant, } impl MessageBatcher { pub fn new(max_batch_size: usize, flush_interval: Duration) -> Self { Self { buffer: Vec::new(), max_batch_size, flush_interval, last_flush: Instant::now(), } } pub fn add_message(&mut self, message: AgentMessage) -> Option<Vec<AgentMessage>> { self.buffer.push(message); // 达到批量大小或超时,返回批次 if self.buffer.len() >= self.max_batch_size || self.last_flush.elapsed() >= self.flush_interval { return Some(self.flush()); } None } pub fn flush(&mut self) -> Vec<AgentMessage> { let messages = std::mem::take(&mut self.buffer); self.last_flush = Instant::now(); messages } }
Async Message Queue
use tokio::sync::mpsc; pub struct AsyncMessageQueue { tx: mpsc::UnboundedSender<AgentMessage>, rx: mpsc::UnboundedReceiver<AgentMessage>, } impl AsyncMessageQueue { pub fn new() -> Self { let (tx, rx) = mpsc::unbounded_channel(); Self { tx, rx } } pub fn sender(&self) -> mpsc::UnboundedSender<AgentMessage> { self.tx.clone() } pub async fn receive(&mut self) -> Option<AgentMessage> { self.rx.recv().await } }
Testing
Mock Agent Process
#[cfg(test)] mod tests { use super::*; use tokio::io::{duplex, AsyncWriteExt}; #[tokio::test] async fn test_send_command() { let (mut client, mut server) = duplex(1024); // 模拟发送命令 let command = OrchestratorCommand::WriteInput { content: "测试内容".to_string(), }; let json = serde_json::to_string(&command).unwrap(); client.write_all(json.as_bytes()).await.unwrap(); client.write_all(b"\n").await.unwrap(); // 模拟接收 let mut buf = String::new(); use tokio::io::AsyncBufReadExt; let mut reader = BufReader::new(server); reader.read_line(&mut buf).await.unwrap(); let received: OrchestratorCommand = serde_json::from_str(&buf).unwrap(); match received { OrchestratorCommand::WriteInput { content } => { assert_eq!(content, "测试内容"); } _ => panic!("错误的命令类型"), } } }
When to Use This Skill
Activate this skill when:
- Implementing IPC between Orchestrator and Agents
- Designing message protocols
- Working with stdin/stdout communication
- Handling JSON serialization/deserialization
- Implementing message validation
- Optimizing IPC performance
- Testing Agent communication
- Debugging IPC issues