Claude-skill-registry-data Messaging & Event Systems
Messaging and event-driven architecture. Activate when: (1) Working with NATS pub/sub, (2) Configuring Temporal workflows, (3) Implementing event sourcing, (4) Setting up message queues, or (5) Designing async communication patterns.
install
source · Clone the upstream repo
git clone https://github.com/majiayu000/claude-skill-registry-data
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry-data "$T" && mkdir -p ~/.claude/skills && cp -r "$T/data/messaging" ~/.claude/skills/majiayu000-claude-skill-registry-data-messaging-event-systems && rm -rf "$T"
manifest:
data/messaging/SKILL.mdsource content
Messaging & Event Systems
Overview
This skill covers messaging systems (NATS), workflow orchestration (Temporal), and event-driven architecture patterns.
NATS
Core Concepts
| Concept | Description |
|---|---|
| Subject | Message address/topic (e.g., ) |
| Publisher | Sends messages to subjects |
| Subscriber | Receives messages from subjects |
| Queue Group | Load-balanced message distribution |
| JetStream | Persistence and streaming layer |
| KV Store | Key-value storage built on JetStream |
NATS CLI
# Connect nats context add local --server nats://localhost:4222 nats context select local # Pub/Sub nats pub orders.created '{"id": 123}' nats sub 'orders.>' # Wildcard subscription # Request/Reply nats reply 'service.ping' 'pong' # In terminal 1 nats request 'service.ping' '' # In terminal 2 # JetStream nats stream add ORDERS --subjects "orders.>" --retention limits nats stream info ORDERS nats consumer add ORDERS processor --pull --ack explicit # KV Store nats kv add CONFIG nats kv put CONFIG app.setting "value" nats kv get CONFIG app.setting nats kv watch CONFIG
NATS Server Configuration
# nats.conf port: 4222 jetstream { store_dir: /data/jetstream max_mem: 1G max_file: 10G } cluster { name: my-cluster port: 6222 routes: [ nats-route://nats-1:6222 nats-route://nats-2:6222 ] }
NATS Client (Rust)
use async_nats; #[tokio::main] async fn main() -> Result<(), async_nats::Error> { let client = async_nats::connect("nats://localhost:4222").await?; // Publish client.publish("events.user.created", "user data".into()).await?; // Subscribe let mut subscriber = client.subscribe("events.>").await?; while let Some(message) = subscriber.next().await { println!("Received: {:?}", message); } Ok(()) }
NATS Client (Python)
import asyncio import nats async def main(): nc = await nats.connect("nats://localhost:4222") # Subscribe async def message_handler(msg): print(f"Received: {msg.subject}: {msg.data.decode()}") await nc.subscribe("events.>", cb=message_handler) # Publish await nc.publish("events.user.created", b'{"user_id": 123}') # Request/Reply response = await nc.request("service.ping", b'', timeout=1) print(f"Response: {response.data.decode()}") asyncio.run(main())
Temporal
Core Concepts
| Concept | Description |
|---|---|
| Workflow | Durable, long-running business process |
| Activity | A single unit of work (can fail/retry) |
| Worker | Executes workflows and activities |
| Task Queue | Routes work to workers |
| Signal | External event sent to running workflow |
| Query | Read workflow state without affecting it |
Workflow Definition (Python)
from temporalio import workflow, activity from datetime import timedelta @activity.defn async def send_email(to: str, subject: str) -> str: # Actual email sending logic return f"Email sent to {to}" @activity.defn async def process_payment(order_id: str, amount: float) -> bool: # Payment processing logic return True @workflow.defn class OrderWorkflow: @workflow.run async def run(self, order_id: str) -> str: # Activities with retry policy payment_result = await workflow.execute_activity( process_payment, args=[order_id, 99.99], start_to_close_timeout=timedelta(seconds=30), retry_policy=RetryPolicy(maximum_attempts=3) ) if payment_result: await workflow.execute_activity( send_email, args=["customer@example.com", "Order Confirmed"], start_to_close_timeout=timedelta(seconds=10) ) return f"Order {order_id} completed"
Worker Setup
from temporalio.client import Client from temporalio.worker import Worker async def main(): client = await Client.connect("localhost:7233") worker = Worker( client, task_queue="order-queue", workflows=[OrderWorkflow], activities=[send_email, process_payment] ) await worker.run() asyncio.run(main())
Start Workflow
async def start_order(): client = await Client.connect("localhost:7233") result = await client.execute_workflow( OrderWorkflow.run, "order-123", id="order-workflow-123", task_queue="order-queue" ) print(f"Result: {result}")
Temporal CLI
# Start workflow temporal workflow start \ --task-queue order-queue \ --type OrderWorkflow \ --input '"order-123"' # List workflows temporal workflow list # Describe workflow temporal workflow describe --workflow-id order-workflow-123 # Signal workflow temporal workflow signal \ --workflow-id order-workflow-123 \ --name cancel \ --input '"reason"' # Query workflow temporal workflow query \ --workflow-id order-workflow-123 \ --name status
Event-Driven Patterns
Event Sourcing
from dataclasses import dataclass from typing import List from datetime import datetime @dataclass class Event: id: str timestamp: datetime type: str data: dict class OrderAggregate: def __init__(self, order_id: str): self.id = order_id self.status = "pending" self.items = [] self.events: List[Event] = [] def apply(self, event: Event): if event.type == "OrderCreated": self.status = "created" self.items = event.data["items"] elif event.type == "OrderPaid": self.status = "paid" elif event.type == "OrderShipped": self.status = "shipped" def add_item(self, item: dict): event = Event( id=str(uuid4()), timestamp=datetime.utcnow(), type="ItemAdded", data={"item": item} ) self.events.append(event) self.apply(event)
CQRS Pattern
# Command Handler class CreateOrderCommand: def __init__(self, customer_id: str, items: list): self.customer_id = customer_id self.items = items async def handle_create_order(cmd: CreateOrderCommand): order = Order.create(cmd.customer_id, cmd.items) await event_store.append(order.id, order.events) await nats.publish("orders.created", order.to_json()) # Query Handler (separate read model) async def get_order_summary(order_id: str) -> dict: return await read_db.query( "SELECT * FROM order_summaries WHERE id = $1", order_id )
Saga Pattern
@workflow.defn class OrderSaga: @workflow.run async def run(self, order: dict) -> str: try: # Step 1: Reserve inventory reservation = await workflow.execute_activity( reserve_inventory, args=[order["items"]] ) # Step 2: Process payment payment = await workflow.execute_activity( process_payment, args=[order["total"]] ) # Step 3: Ship order shipping = await workflow.execute_activity( create_shipment, args=[order["address"]] ) return "Order completed" except Exception as e: # Compensating transactions if reservation: await workflow.execute_activity(release_inventory) if payment: await workflow.execute_activity(refund_payment) raise