Claude-skill-registry distributed-systems
Distributed systems tools including NATS messaging, libp2p networking, and Temporal workflows
install
source · Clone the upstream repo
git clone https://github.com/majiayu000/claude-skill-registry
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/distributed-systems" ~/.claude/skills/majiayu000-claude-skill-registry-distributed-systems && rm -rf "$T"
manifest:
skills/data/distributed-systems/SKILL.mdsource content
Distributed Systems Skills
Overview
This skill provides expertise in distributed systems infrastructure for multi-robot coordination, microservices communication, and workflow orchestration.
NATS - Cloud Native Messaging
NATS is a high-performance messaging system ideal for ROS2 multi-robot coordination.
Installation (Nix)
# In flake.nix devshells or packages { pkgs, ... }: { packages = with pkgs; [ nats-server # NATS server natscli # CLI tools ]; }
Installation (Pixi)
pixi add nats-py # Python client pixi add nats-server # Server (if available)
Quick Start
# Start NATS server nats-server # With JetStream (persistence) nats-server --jetstream # With config file nats-server -c /path/to/nats.conf
Configuration
# nats.conf port: 4222 http_port: 8222 jetstream { store_dir: "/var/lib/nats/jetstream" max_memory_store: 1G max_file_store: 10G } # Cluster configuration cluster { name: "ros2-cluster" port: 6222 routes: [ "nats://robot1:6222" "nats://robot2:6222" ] }
Python Client
import asyncio import nats async def main(): # Connect to NATS nc = await nats.connect("nats://localhost:4222") # Publish message await nc.publish("robot.status", b"online") # Subscribe to topic async def message_handler(msg): print(f"Received: {msg.data.decode()}") sub = await nc.subscribe("robot.*", cb=message_handler) # Request-reply pattern response = await nc.request("robot.ping", b"hello", timeout=1.0) print(f"Response: {response.data.decode()}") await nc.close() asyncio.run(main())
ROS2 Integration Pattern
import rclpy from rclpy.node import Node import nats import asyncio import json class NATSBridgeNode(Node): """Bridge ROS2 topics to NATS subjects.""" def __init__(self): super().__init__('nats_bridge') self.nc = None async def connect_nats(self): self.nc = await nats.connect("nats://localhost:4222") async def publish_to_nats(self, subject: str, data: dict): if self.nc: await self.nc.publish(subject, json.dumps(data).encode()) async def subscribe_from_nats(self, subject: str, callback): if self.nc: await self.nc.subscribe(subject, cb=callback)
JetStream (Persistence)
import nats from nats.js import JetStreamContext async def setup_jetstream(): nc = await nats.connect() js = nc.jetstream() # Create stream await js.add_stream(name="ROBOTS", subjects=["robot.*"]) # Publish with acknowledgment ack = await js.publish("robot.telemetry", b"data") print(f"Published: seq={ack.seq}") # Consumer sub = await js.pull_subscribe("robot.*", "telemetry-consumer") msgs = await sub.fetch(10) for msg in msgs: await msg.ack()
CLI Commands
# Server info nats server info # Publish message nats pub robot.command "move forward" # Subscribe nats sub "robot.>" # Request-reply nats request robot.ping "hello" # Stream management nats stream add ROBOTS --subjects "robot.*" nats stream info ROBOTS nats consumer add ROBOTS telemetry
Temporal - Workflow Orchestration
Temporal provides durable workflow execution for complex robotics tasks.
Installation
{ pkgs, ... }: { packages = with pkgs; [ temporal-cli # CLI tools # temporalite # Local development server ]; }
Quick Start
# Start local server temporalite start --namespace default # Or with Docker docker run -d --name temporal \ -p 7233:7233 -p 8233:8233 \ temporalio/auto-setup:latest
Python Workflow
from temporalio import activity, workflow from temporalio.client import Client from temporalio.worker import Worker from datetime import timedelta @activity.defn async def navigate_to_waypoint(waypoint: dict) -> bool: """Activity: Navigate robot to waypoint.""" # ROS2 navigation logic here return True @activity.defn async def pick_object(object_id: str) -> bool: """Activity: Pick up an object.""" return True @workflow.defn class DeliveryWorkflow: """Durable workflow for robot delivery task.""" @workflow.run async def run(self, delivery_request: dict) -> str: # Navigate to pickup await workflow.execute_activity( navigate_to_waypoint, delivery_request["pickup_location"], start_to_close_timeout=timedelta(minutes=5) ) # Pick up item await workflow.execute_activity( pick_object, delivery_request["object_id"], start_to_close_timeout=timedelta(minutes=2) ) # Navigate to delivery await workflow.execute_activity( navigate_to_waypoint, delivery_request["delivery_location"], start_to_close_timeout=timedelta(minutes=5) ) return "delivered" async def main(): client = await Client.connect("localhost:7233") # Start workflow handle = await client.start_workflow( DeliveryWorkflow.run, {"pickup_location": {...}, "delivery_location": {...}}, id="delivery-001", task_queue="robot-tasks" ) result = await handle.result() print(f"Delivery result: {result}")
Architecture Patterns
Multi-Robot Coordination
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ Robot 1 │ │ Robot 2 │ │ Robot 3 │ │ (ROS2) │ │ (ROS2) │ │ (ROS2) │ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │ │ │ └───────────────────┼───────────────────┘ │ ┌──────┴──────┐ │ NATS Cluster│ │ (JetStream)│ └──────┬──────┘ │ ┌────────────┼────────────┐ │ │ │ ┌──────┴──────┐ ┌──┴───┐ ┌────┴─────┐ │ Coordinator │ │ Fleet │ │ Telemetry│ │ Service │ │ Mgmt │ │ Collector│ └─────────────┘ └──────┘ └──────────┘
Event-Driven Microservices
┌─────────────────────────────────────────────────────┐ │ NATS Subjects │ ├───────────────┬─────────────────┬───────────────────┤ │ robot.status │ robot.telemetry │ robot.command │ │ task.created │ task.completed │ task.failed │ │ alert.warning │ alert.critical │ alert.resolved │ └───────────────┴─────────────────┴───────────────────┘
Best Practices
- Use JetStream for messages that need persistence or replay
- Subject naming: Use dot-separated hierarchical names (
)robot.{id}.{type} - Cluster for HA: Run 3+ NATS servers in production
- Workflows for durability: Use Temporal for long-running, recoverable tasks
- Monitor with Prometheus: Both NATS and Temporal export metrics
Related Skills
- ROS2 Development - ROS2 integration
- DevOps - Deployment and CI/CD
- Observability - Monitoring and metrics