Claude-skill-registry-data Messaging & Event Systems

Messaging and event-driven architecture. Activate when: (1) Working with NATS pub/sub, (2) Configuring Temporal workflows, (3) Implementing event sourcing, (4) Setting up message queues, or (5) Designing async communication patterns.

install
source · Clone the upstream repo
git clone https://github.com/majiayu000/claude-skill-registry-data
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry-data "$T" && mkdir -p ~/.claude/skills && cp -r "$T/data/messaging" ~/.claude/skills/majiayu000-claude-skill-registry-data-messaging-event-systems && rm -rf "$T"
manifest: data/messaging/SKILL.md
source content

Messaging & Event Systems

Overview

This skill covers messaging systems (NATS), workflow orchestration (Temporal), and event-driven architecture patterns.

NATS

Core Concepts

| Concept | Description |
| --- | --- |
| Subject | Message address/topic (e.g., `orders.created`) |
| Publisher | Sends messages to subjects |
| Subscriber | Receives messages from subjects |
| Queue Group | Load-balanced message distribution |
| JetStream | Persistence and streaming layer |
| KV Store | Key-value storage built on JetStream |

NATS CLI

# Connect: save a context named "local" that points at the local server, then select it
nats context add local --server nats://localhost:4222
nats context select local

# Pub/Sub: publish one JSON message, then subscribe to a subject pattern
nats pub orders.created '{"id": 123}'
nats sub 'orders.>'                    # Wildcard subscription: ">" matches every subject below "orders."

# Request/Reply: the responder and the requester run in separate terminals
nats reply 'service.ping' 'pong'       # In terminal 1: answer requests on service.ping with "pong"
nats request 'service.ping' ''         # In terminal 2: send an empty request and print the reply

# JetStream: create the ORDERS stream over "orders.>", inspect it,
# then add a pull consumer named "processor" with explicit acknowledgements
nats stream add ORDERS --subjects "orders.>" --retention limits
nats stream info ORDERS
nats consumer add ORDERS processor --pull --ack explicit

# KV Store: create the CONFIG bucket, write a key, read it back, and watch the bucket
nats kv add CONFIG
nats kv put CONFIG app.setting "value"
nats kv get CONFIG app.setting
nats kv watch CONFIG

NATS Server Configuration

# nats.conf
# Port for client connections; the client examples connect to nats://localhost:4222.
port: 4222

# JetStream settings: storage directory plus the memory and file storage limits.
jetstream {
  store_dir: /data/jetstream
  max_mem: 1G
  max_file: 10G
}

# Cluster settings: cluster name, the port used for route connections, and the route URLs.
# NOTE(review): nats-1 and nats-2 are presumably the hostnames of the other servers — confirm.
cluster {
  name: my-cluster
  port: 6222
  routes: [
    nats-route://nats-1:6222
    nats-route://nats-2:6222
  ]
}

NATS Client (Rust)

use async_nats;

#[tokio::main]
async fn main() -> Result<(), async_nats::Error> {
    // Open a connection to the local NATS server; any error is returned via `?`.
    let nc = async_nats::connect("nats://localhost:4222").await?;

    // Publish a single message on `events.user.created`.
    nc.publish("events.user.created", "user data".into()).await?;

    // Subscribe to everything below `events.` and print each message until
    // the subscription yields `None`.
    // NOTE(review): `.next()` is presumably provided by a `StreamExt` trait
    // (e.g. `futures::StreamExt`) that must be in scope — confirm.
    let mut inbox = nc.subscribe("events.>").await?;
    loop {
        match inbox.next().await {
            Some(msg) => println!("Received: {:?}", msg),
            None => break,
        }
    }

    Ok(())
}

NATS Client (Python)

import asyncio
import nats

async def main():
    """Connect to a local NATS server and demonstrate subscribe, publish and request/reply.

    The connection is drained in a ``finally`` block so that it is closed
    whether the calls succeed or raise.
    """
    nc = await nats.connect("nats://localhost:4222")

    try:
        # Subscribe
        async def message_handler(msg):
            print(f"Received: {msg.subject}: {msg.data.decode()}")

        await nc.subscribe("events.>", cb=message_handler)

        # Publish
        await nc.publish("events.user.created", b'{"user_id": 123}')

        # Request/Reply
        # NOTE: something must be answering on "service.ping" (for example
        # `nats reply 'service.ping' 'pong'`); otherwise the request raises.
        response = await nc.request("service.ping", b'', timeout=1)
        print(f"Response: {response.data.decode()}")
    finally:
        # Flush pending messages, remove subscriptions and close the connection.
        await nc.drain()

asyncio.run(main())

Temporal

Core Concepts

| Concept | Description |
| --- | --- |
| Workflow | Durable, long-running business process |
| Activity | A single unit of work (can fail/retry) |
| Worker | Executes workflows and activities |
| Task Queue | Routes work to workers |
| Signal | External event sent to running workflow |
| Query | Read workflow state without affecting it |

Workflow Definition (Python)

from datetime import timedelta

from temporalio import workflow, activity
from temporalio.common import RetryPolicy

@activity.defn
async def send_email(to: str, subject: str) -> str:
    """Return a confirmation string for an email addressed to ``to``.

    Placeholder activity: nothing is sent here and ``subject`` is not used.
    """
    # Actual email sending logic
    confirmation = f"Email sent to {to}"
    return confirmation

@activity.defn
async def process_payment(order_id: str, amount: float) -> bool:
    """Return whether the payment succeeded.

    Placeholder activity: it always reports success and reads neither
    ``order_id`` nor ``amount``.
    """
    # Payment processing logic
    succeeded = True
    return succeeded

@workflow.defn
class OrderWorkflow:
    """Workflow that charges an order and then sends a confirmation email."""

    @workflow.run
    async def run(self, order_id: str) -> str:
        """Process the payment for ``order_id`` and confirm it by email.

        Args:
            order_id: Identifier passed to the payment activity and used in
                the returned message.

        Returns:
            ``"Order <id> completed"`` when the payment activity returned a
            truthy result, otherwise ``"Order <id> payment failed"``.
        """
        # Payment activity: 30 s per attempt, at most 3 attempts.
        # RetryPolicy comes from temporalio.common.
        payment_result = await workflow.execute_activity(
            process_payment,
            args=[order_id, 99.99],
            start_to_close_timeout=timedelta(seconds=30),
            retry_policy=RetryPolicy(maximum_attempts=3),
        )

        # Do not report the order as completed when the payment did not succeed.
        if not payment_result:
            return f"Order {order_id} payment failed"

        await workflow.execute_activity(
            send_email,
            args=["customer@example.com", "Order Confirmed"],
            start_to_close_timeout=timedelta(seconds=10),
        )

        return f"Order {order_id} completed"

Worker Setup

from temporalio.client import Client
from temporalio.worker import Worker

async def main():
    """Connect to Temporal at localhost:7233 and run a worker on "order-queue"."""
    temporal_client = await Client.connect("localhost:7233")

    # The workflow and activities this worker is registered with.
    registered_workflows = [OrderWorkflow]
    registered_activities = [send_email, process_payment]

    order_worker = Worker(
        temporal_client,
        task_queue="order-queue",
        workflows=registered_workflows,
        activities=registered_activities,
    )
    await order_worker.run()

asyncio.run(main())

Start Workflow

async def start_order():
    """Execute OrderWorkflow for "order-123" on "order-queue" and print its result."""
    temporal_client = await Client.connect("localhost:7233")

    # The workflow id matches the one used by the CLI examples.
    outcome = await temporal_client.execute_workflow(
        OrderWorkflow.run,
        "order-123",
        id="order-workflow-123",
        task_queue="order-queue",
    )
    print(f"Result: {outcome}")

Temporal CLI

# Start workflow
# The explicit workflow id is what the describe/signal/query commands below
# refer to; without it the server assigns a generated id.
temporal workflow start \
  --workflow-id order-workflow-123 \
  --task-queue order-queue \
  --type OrderWorkflow \
  --input '"order-123"'

# List workflows
temporal workflow list

# Describe workflow
temporal workflow describe --workflow-id order-workflow-123

# Signal workflow: send the "cancel" signal with a JSON string payload
temporal workflow signal \
  --workflow-id order-workflow-123 \
  --name cancel \
  --input '"reason"'

# Query workflow: run the "status" query
temporal workflow query \
  --workflow-id order-workflow-123 \
  --name status

Event-Driven Patterns

Event Sourcing

from dataclasses import dataclass
from typing import List
from datetime import datetime

@dataclass
class Event:
    """A single recorded event: identifier, time, type name and payload."""

    # Identifier of the event
    id: str
    # When the event was created
    timestamp: datetime
    # Event type name, e.g. "OrderCreated"
    type: str
    # Event payload
    data: dict

class OrderAggregate:
    def __init__(self, order_id: str):
        self.id = order_id
        self.status = "pending"
        self.items = []
        self.events: List[Event] = []

    def apply(self, event: Event):
        if event.type == "OrderCreated":
            self.status = "created"
            self.items = event.data["items"]
        elif event.type == "OrderPaid":
            self.status = "paid"
        elif event.type == "OrderShipped":
            self.status = "shipped"

    def add_item(self, item: dict):
        event = Event(
            id=str(uuid4()),
            timestamp=datetime.utcnow(),
            type="ItemAdded",
            data={"item": item}
        )
        self.events.append(event)
        self.apply(event)

CQRS Pattern

# Command Handler
class CreateOrderCommand:
    """Command carrying the customer id and the items for a new order."""

    def __init__(self, customer_id: str, items: list):
        """Store ``customer_id`` and ``items`` unchanged on the command."""
        self.customer_id, self.items = customer_id, items

async def handle_create_order(cmd: CreateOrderCommand):
    """Create an order from ``cmd``, append its events to the event store, then publish it.

    The order's JSON form is published on the "orders.created" subject after
    the events have been appended.

    NOTE(review): ``Order``, ``event_store`` and ``nats`` are not defined in
    this snippet; ``nats`` is presumably a connected client object rather than
    the ``nats`` module — confirm.
    """
    new_order = Order.create(cmd.customer_id, cmd.items)
    # Append the events first, then publish the notification.
    await event_store.append(new_order.id, new_order.events)
    await nats.publish("orders.created", new_order.to_json())

# Query Handler (separate read model)
async def get_order_summary(order_id: str) -> dict:
    """Return the result of querying ``order_summaries`` for ``order_id``.

    The id is passed as a bound parameter ($1), not interpolated into the SQL.
    """
    summary_sql = "SELECT * FROM order_summaries WHERE id = $1"
    summary = await read_db.query(summary_sql, order_id)
    return summary

Saga Pattern

@workflow.defn
class OrderSaga:
    """Saga that reserves inventory, takes payment and ships an order.

    When a step fails, the steps that already completed are compensated and
    the original exception is re-raised.
    """

    @workflow.run
    async def run(self, order: dict) -> str:
        """Run the three order steps in sequence.

        Args:
            order: Mapping read for the keys ``"items"``, ``"total"`` and
                ``"address"``.

        Returns:
            ``"Order completed"`` when every step succeeds.

        Raises:
            Exception: Whatever a step raised, re-raised after compensation.
        """
        # Temporal requires a start-to-close or schedule-to-close timeout on
        # every activity call; 30 s mirrors the payment step of OrderWorkflow.
        step_timeout = timedelta(seconds=30)

        # Track completion explicitly: the except block must not read names
        # that were never bound, and compensation must not depend on whether
        # an activity happened to return a truthy value.
        inventory_reserved = False
        payment_taken = False

        try:
            # Step 1: Reserve inventory
            await workflow.execute_activity(
                reserve_inventory,
                args=[order["items"]],
                start_to_close_timeout=step_timeout,
            )
            inventory_reserved = True

            # Step 2: Process payment
            await workflow.execute_activity(
                process_payment,
                args=[order["total"]],
                start_to_close_timeout=step_timeout,
            )
            payment_taken = True

            # Step 3: Ship order
            await workflow.execute_activity(
                create_shipment,
                args=[order["address"]],
                start_to_close_timeout=step_timeout,
            )

            return "Order completed"

        except Exception:
            # Compensating transactions
            # NOTE(review): the compensation activities are called without
            # arguments, as in the original; confirm their signatures.
            if inventory_reserved:
                await workflow.execute_activity(
                    release_inventory,
                    start_to_close_timeout=step_timeout,
                )
            if payment_taken:
                await workflow.execute_activity(
                    refund_payment,
                    start_to_close_timeout=step_timeout,
                )
            raise

External Links