Claude-skill-registry kratos-events
Implements event-driven architecture with Watermill pub/sub (RabbitMQ/AMQP/message queue) for go-kratos microservices. Creates publisher/subscriber wrappers, event handlers, worker services with context propagation (request_id, correlation_id), and RabbitMQ/AMQP integration. Use when implementing pub/sub patterns, creating message queues, adding event handlers, building worker services for async processing, or setting up background jobs.
git clone https://github.com/majiayu000/claude-skill-registry
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/kratos-events" ~/.claude/skills/majiayu000-claude-skill-registry-kratos-events && rm -rf "$T"
skills/data/kratos-events/SKILL.md<essential_principles>
How Event-Driven Architecture Works in Kratos
Three-Layer Architecture
1. Platform Layer (
platform/events/)
- Broker-agnostic interfaces (Publisher, Subscriber)
- Allows swapping brokers (AMQP, Redis, Kafka) without changing code
2. Data Layer (
services/{service}/internal/data/mq/)
- Publisher/Subscriber wrappers
- Context propagation and metadata enrichment
- Structured logging with request_id and correlation_id
3. Application Layer (
services/{service}/internal/handlers/)
- Event handlers with business logic
- Delegate to use cases (biz layer)
- Extract context and metadata from messages
Worker Services
Each service can have a companion worker binary:
services/{service}/ ├── cmd/ │ ├── {service}/ # Main HTTP/gRPC service │ └── {service}-worker/ # Event processing worker ├── internal/ │ ├── handlers/ # Event handlers │ ├── worker/ # Watermill router setup │ └── ...
Context Propagation Flow
HTTP Request → Middleware (request_id) → Use Case → Publisher ↓ (message with context + metadata) Subscriber → Handler → Use Case (same request_id in logs)
Best Practice: Always use
logger.WithContext(ctx) for distributed tracing.
</essential_principles>
<intake>
What would you like to do?
- Create publisher/subscriber wrappers for a new service
- Add event handler for existing service
- Set up worker service for async processing
- Publish events from business layer
- View examples and patterns
Wait for response before proceeding. </intake>
<routing> | Response | Workflow | |----------|----------| | 1, "create wrappers", "new service" | `workflows/create-pubsub-wrappers.md` | | 2, "add handler", "handler" | `workflows/add-event-handler.md` | | 3, "worker", "async", "background" | `workflows/setup-worker.md` | | 4, "publish", "emit event" | `workflows/publish-events.md` | | 5, "examples", "patterns", "help" | `workflows/view-examples.md` |After reading the workflow, follow it exactly. </routing>
<quick_start>
1. Create Publisher/Subscriber Wrappers
File:
services/{service}/internal/data/mq/publisher.go
package mq import ( "context" "fmt" "platform/events" middleware2 "platform/middleware" "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/message" "github.com/ThreeDotsLabs/watermill/message/router/middleware" "github.com/go-kratos/kratos/v2/log" ) const RoutingKey = "routing_key" func NewEventPublisher(pub message.Publisher, logger log.Logger) events.Publisher { return &eventPublisher{ pub: pub, logger: log.NewHelper(logger), } } type eventPublisher struct { pub message.Publisher logger *log.Helper } func (ep *eventPublisher) Unwrap() message.Publisher { return ep.pub } func (ep *eventPublisher) Publish(ctx context.Context, topic string, payload []byte) error { msg := message.NewMessage(watermill.NewUUID(), payload) // Propagate context to subscriber msg.SetContext(ctx) // Set correlation ID if not already set correlationId := middleware.MessageCorrelationID(msg) if correlationId == "" { correlationId = watermill.NewUUID() middleware.SetCorrelationID(correlationId, msg) } // Set routing key SetMessageRoutingKey(topic, msg) // Extract request ID from HTTP context if requestID := extractRequestID(ctx); requestID != "" { msg.Metadata.Set(middleware2.RequestIdKey, requestID) } ep.logger.WithContext(ctx).Infof("Publishing message %s to topic %s, correlation_id: %s", msg.UUID, topic, correlationId) if err := ep.pub.Publish(topic, msg); err != nil { ep.logger.WithContext(ctx).Errorf("Failed to publish message %s to topic %s: %v", msg.UUID, topic, err) return fmt.Errorf("failed to publish message to topic %s: %w", topic, err) } return nil } func SetMessageRoutingKey(key string, msg *message.Message) { if MessageRoutingKey(msg) != "" { return } msg.Metadata.Set(RoutingKey, key) } func MessageRoutingKey(message *message.Message) string { return message.Metadata.Get(RoutingKey) } func extractRequestID(ctx context.Context) string { val := middleware2.RequestID()(ctx) if requestID, ok := val.(string); ok { return requestID } return "" }
File:
services/{service}/internal/data/mq/subscriber.go
package mq import ( "context" "platform/events" "github.com/ThreeDotsLabs/watermill/message" "github.com/go-kratos/kratos/v2/log" ) func NewEventSubscriber(sub message.Subscriber, logger log.Logger) events.Subscriber { return &eventSubscriber{ sub: sub, logger: log.NewHelper(logger), } } type eventSubscriber struct { sub message.Subscriber logger *log.Helper } func (es *eventSubscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error) { es.logger.WithContext(ctx).Infof("Subscribing to topic: %s", topic) messages, err := es.sub.Subscribe(ctx, topic) if err != nil { es.logger.WithContext(ctx).Errorf("Failed to subscribe to topic %s: %v", topic, err) return nil, err } es.logger.WithContext(ctx).Infof("Successfully subscribed to topic: %s", topic) return messages, nil } func (es *eventSubscriber) Close() error { es.logger.Info("Closing event subscriber") if err := es.sub.Close(); err != nil { es.logger.Errorf("Failed to close subscriber: %v", err) return err } es.logger.Info("Event subscriber closed successfully") return nil } func (es *eventSubscriber) Unwrap() message.Subscriber { return es.sub }
2. Create Event Handler
File:
services/{service}/internal/handlers/{event}_handler.go
package handlers import ( "{service}/internal/biz" "github.com/ThreeDotsLabs/watermill/message" "github.com/go-kratos/kratos/v2/log" ) func NewLifecycleEventHandler(symbolUC biz.SymbolUseCase, logger log.Logger) *LifecycleEventHandler { return &LifecycleEventHandler{ logger: log.NewHelper(logger), symbolUC: symbolUC, } } type LifecycleEventHandler struct { logger *log.Helper symbolUC biz.SymbolUseCase } func (h *LifecycleEventHandler) Handle(msg *message.Message) error { // CRITICAL: Extract context from message ctx := msg.Context() // Extract correlation ID for tracing correlationID := msg.Metadata.Get("correlation_id") requestID := msg.Metadata.Get("request_id") h.logger.WithContext(ctx).Infof( "Processing lifecycle event - msgID: %s, correlationID: %s, requestID: %s", msg.UUID, correlationID, requestID, ) // Unmarshal payload var payload EventPayload if err := json.Unmarshal(msg.Payload, &payload); err != nil { h.logger.WithContext(ctx).Errorf("Failed to unmarshal payload: %v", err) return err } // Delegate to business layer if err := h.symbolUC.ProcessLifecycleEvent(ctx, &payload); err != nil { h.logger.WithContext(ctx).Errorf("Failed to process event: %v", err) return err } return nil }
File:
services/{service}/internal/handlers/provider.go
package handlers import "github.com/google/wire" // ProviderSet is handlers providers. var ProviderSet = wire.NewSet( NewLifecycleEventHandler, )
3. Create Worker Service
File:
services/{service}/internal/worker/worker.go
package worker import ( "context" "fmt" "platform/events" platform_logger "platform/logger" conf "symbols/internal/conf/gen" "symbols/internal/handlers" "sync" "time" "github.com/ThreeDotsLabs/watermill/message" "github.com/ThreeDotsLabs/watermill/message/router/middleware" "github.com/ThreeDotsLabs/watermill/message/router/plugin" "github.com/go-kratos/kratos/v2/log" ) type HookFunc func(ctx context.Context) error type Worker interface { Start() HookFunc Stop() HookFunc } func NewWorker(router *message.Router, logger log.Logger) Worker { return &worker{ logger: log.NewHelper(logger), name: "watermill-router", router: router, closeTimeout: 15 * time.Second, done: make(chan struct{}), } } type worker struct { logger *log.Helper name string router *message.Router closeTimeout time.Duration startOnce sync.Once stopOnce sync.Once done chan struct{} err error } func (w *worker) Start() HookFunc { return func(ctx context.Context) error { w.startOnce.Do(func() { go func() { defer close(w.done) w.logger.WithContext(ctx).Infof("Starting router %s", w.name) if err := w.router.Run(ctx); err != nil { w.err = fmt.Errorf("%s: router run: %w", w.name, err) w.logger.WithContext(ctx).Errorf("%v", w.err) } }() }) return nil } } func (w *worker) Stop() HookFunc { return func(ctx context.Context) error { var closeErr error w.stopOnce.Do(func() { stopCtx, cancel := context.WithTimeout(ctx, w.closeTimeout) defer cancel() w.logger.WithContext(ctx).Infof("Closing router %s", w.name) errCh := make(chan error, 1) go func() { errCh <- w.router.Close() }() select { case <-stopCtx.Done(): w.logger.WithContext(ctx).Errorf("Shutting down %s (timeout: %v)", w.name, w.closeTimeout) closeErr = fmt.Errorf("%s: router close timeout: %w", w.name, stopCtx.Err()) case err := <-errCh: if err != nil { w.logger.WithContext(ctx).Errorf("%s: router failed to gracefully close: %v", w.name, err) closeErr = fmt.Errorf("%s: router close: %w", w.name, err) } } }) <-w.done if closeErr != nil && w.err == nil { w.err = closeErr } return w.err } } func NewRouter(cfg *conf.Data, lifecycleHandler *handlers.LifecycleEventHandler, eventPub events.Publisher, eventSub events.Subscriber, logger *platform_logger.WatermillLogger) *message.Router { router, err := message.NewRouter(message.RouterConfig{}, logger) if err != nil { panic(err) } // SignalsHandler will gracefully shut down Router when SIGTERM is received router.AddPlugin(plugin.SignalsHandler) // Router level middleware is executed for every message sent to the router router.AddMiddleware( // CorrelationID will copy the correlation id from the incoming message's metadata to the produced messages middleware.CorrelationID, // The handler function is retried if it returns an error middleware.Retry{ MaxRetries: 3, InitialInterval: time.Millisecond * 100, Logger: logger, }.Middleware, // Recoverer handles panics from handlers middleware.Recoverer, ) // Add handlers for specific topics router.AddConsumerHandler("events", cfg.Mq.Exchange.Name, eventSub.Unwrap(), lifecycleHandler.Handle) return router }
File:
services/{service}/cmd/{service}-worker/main.go
package main import ( "flag" "os" p "platform/logger" "{service}/internal/conf/gen" "{service}/internal/worker" "github.com/go-kratos/kratos/v2" "github.com/go-kratos/kratos/v2/config" "github.com/go-kratos/kratos/v2/config/env" "github.com/go-kratos/kratos/v2/config/file" "github.com/go-kratos/kratos/v2/log" _ "go.uber.org/automaxprocs" ) var ( Name string = "{service}-worker" Version string = "1.0" configFile string id, _ = os.Hostname() ) func init() { flag.StringVar(&configFile, "conf", "configs/config.yaml", "config path, eg: --conf config.yaml") } func newApp(worker worker.Worker, logger log.Logger) *kratos.App { return kratos.New( kratos.ID(id), kratos.Name(Name), kratos.Version(Version), kratos.Metadata(map[string]string{}), kratos.Logger(logger), kratos.BeforeStart(worker.Start()), kratos.AfterStop(worker.Stop()), ) } func main() { flag.Parse() c := config.New( config.WithSource( env.NewSource(), file.NewSource(configFile), ), ) defer c.Close() if err := c.Load(); err != nil { panic(err) } var bc conf.Bootstrap if err := c.Scan(&bc); err != nil { panic(err) } if err := bc.Validate(); err != nil { panic(err) } logger := p.NewLogger(bc.Log.Level, id, Name, Version) app, cleanup, err := wireApp(bc.Server, bc.Data, bc.Log, logger) if err != nil { panic(err) } defer cleanup() if err := app.Run(); err != nil { panic(err) } }
4. Publish Events from Business Layer
File:
services/{service}/internal/biz/{entity}.go
func (uc *symbolUseCase) CreateSymbol(ctx context.Context, symbol *Symbol) (*Symbol, error) { // Validate if err := uc.validator.Struct(symbol); err != nil { return nil, err } // Create in database result, err := uc.repo.Create(ctx, symbol) if err != nil { return nil, err } // Publish event (if publisher configured) if uc.pub != nil { payload, _ := json.Marshal(map[string]interface{}{ "symbol_id": result.Id, "action": "created", }) // Context is automatically propagated with request_id if err := uc.pub.Publish(ctx, "symbols.created", payload); err != nil { uc.log.WithContext(ctx).Errorf("Failed to publish event: %v", err) // Don't fail the operation if event publishing fails } } return result, nil }
</quick_start>
<critical_patterns>
Context Propagation
ALWAYS extract context from message:
func (h *Handler) Handle(msg *message.Message) error { ctx := msg.Context() // CRITICAL - preserves request_id chain h.logger.WithContext(ctx).Infof("Processing...") }
ALWAYS use WithContext for logging:
h.logger.WithContext(ctx).Infof("...") // ✅ Includes request_id h.logger.Infof("...") // ❌ No request_id
Metadata Extraction
correlationID := msg.Metadata.Get("correlation_id") requestID := msg.Metadata.Get("request_id")
Error Handling in Handlers
func (h *Handler) Handle(msg *message.Message) error { // Return error to trigger retry (configured in middleware) if err := h.process(msg); err != nil { h.logger.WithContext(ctx).Errorf("Processing failed: %v", err) return err // Watermill will retry based on middleware config } // Return nil to ACK message return nil }
Unwrap Pattern
Use
Unwrap() to access underlying Watermill publisher/subscriber for router:
router.AddConsumerHandler("name", topic, eventSub.Unwrap(), handler.Handle)
</critical_patterns>
<file_structure>
services/{service}/ ├── cmd/ │ ├── {service}/ # Main service │ └── {service}-worker/ # Worker service │ ├── main.go │ ├── wire.go │ └── wire_gen.go ├── internal/ │ ├── data/ │ │ └── mq/ │ │ ├── publisher.go │ │ └── subscriber.go │ ├── handlers/ # Event handlers │ │ ├── provider.go │ │ └── lifecycle.go │ └── worker/ # Worker setup │ └── worker.go
</file_structure>
<wire_integration>
Adding to Wire
File:
services/{service}/internal/data/data.go
var ProviderSet = wire.NewSet( NewData, repo.NewSymbolRepo, mq.NewEventPublisher, // Add publisher mq.NewEventSubscriber, // Add subscriber )
File:
services/{service}/internal/handlers/provider.go
var ProviderSet = wire.NewSet( NewLifecycleEventHandler, )
File:
services/{service}/internal/worker/provider.go
var ProviderSet = wire.NewSet( NewWorker, NewRouter, )
File:
services/{service}/cmd/{service}-worker/wire.go
//go:build wireinject func wireApp( *conf.Server, *conf.Data, *conf.Log, log.Logger, ) (*kratos.App, func(), error) { wire.Build( worker.ProviderSet, handlers.ProviderSet, biz.ProviderSet, data.ProviderSet, newApp, ) return &kratos.App{}, nil, nil }
After adding providers, run:
cd services/{service} make generate
</wire_integration>
<validation_checklist>
- Publisher wrapper enriches messages with context, correlation_id, request_id
- Subscriber wrapper provides lifecycle management
- Event handlers extract
ctx := msg.Context() - Event handlers use
logger.WithContext(ctx) - Event handlers delegate to biz layer (not data layer)
- Worker service uses graceful shutdown with timeout
- Router configured with middleware (CorrelationID, Retry, Recoverer)
- Unwrap() used for router integration
- Wire ProviderSets updated and regenerated
- Worker binary created in cmd/{service}-worker/ </validation_checklist>
<anti_patterns> ❌ DON'T:
- Log without context:
h.logger.Infof("...") - Ignore message context: process payload without extracting ctx
- Put business logic in handlers (delegate to use cases)
- Fail operations if event publishing fails
- Skip correlation ID propagation
- Use blocking operations in publisher without timeout
✅ DO:
- Always extract context:
ctx := msg.Context() - Always log with context:
h.logger.WithContext(ctx) - Delegate to use cases from handlers
- Make event publishing non-blocking (don't fail if publish fails)
- Use middleware.CorrelationID in router
- Set reasonable timeouts for worker shutdown </anti_patterns>
<success_criteria> Event-driven architecture is correctly implemented when:
- Publisher/subscriber wrappers created in
internal/data/mq/ - Event handlers created in
internal/handlers/ - Worker service created in
cmd/{service}-worker/ - Context propagation works end-to-end (request_id visible in all logs)
- Correlation IDs tracked across service boundaries
- Handlers delegate to business layer (biz)
- Router configured with retry and recovery middleware
- Wire dependencies configured and regenerated
- Worker gracefully shuts down on SIGTERM
- All logging uses
for distributed tracingWithContext(ctx)
Verification:
- Publish event from HTTP request, verify request_id appears in worker logs
- Check correlation_id is consistent across publisher and subscriber
- Test graceful shutdown with SIGTERM
- Verify retry middleware works on handler errors </success_criteria>
<reference_docs> For detailed architecture documentation, see:
- Complete pub/sub implementation guidedocs/pubsub-architecture.md
- Platform interfacesplatform/events/events.go- CLAUDE.md - Event-driven architecture overview </reference_docs>