Claude-skill-registry implement-event-sourcing
Step-by-step guide for implementing event sourcing with event store, aggregate replay, snapshots, projections, and schema evolution.
install
source · Clone the upstream repo
git clone https://github.com/majiayu000/claude-skill-registry
Claude Code · Install into ~/.claude/skills/
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/implement-event-sourcing" ~/.claude/skills/majiayu000-claude-skill-registry-implement-event-sourcing && rm -rf "$T"
manifest:
skills/data/implement-event-sourcing/SKILL.mdsource content
Skill: Implement Event Sourcing
This skill teaches event sourcing following patterns. Event sourcing stores all state changes as an append-only log of immutable events - current state is computed by replaying events. History is never lost, and projections can be rebuilt anytime.
Prerequisites
- Understanding of DDD aggregates and Clean Architecture
- Knowledge of the Aggregate Root Pattern and consistency boundaries
- Familiarity with event-driven architecture principles
- Understanding of CQRS pattern and read/write model separation
- Database configured for event storage
Overview
- Design event stream structure
- Create event store (stream ID as PK, version as SK)
- Implement aggregate with event replay
- Add snapshots for performance
- Build projections from events
- Handle event versioning (schema evolution)
- Test replay and projections
Step 1: Design the Event Stream Structure
// Pattern: Domain Event Interface // All domain events must implement this interface INTERFACE Event METHOD EventID() RETURNS String METHOD EventType() RETURNS String METHOD AggregateID() RETURNS String METHOD AggregateType() RETURNS String METHOD Sequence() RETURNS Integer METHOD OccurredAt() RETURNS Timestamp METHOD SchemaVersion() RETURNS String END INTERFACE // Pattern: Base Event with common fields // Embed in concrete events to provide common functionality TYPE BaseEvent id: String type: String aggregateId: String aggregateType: String sequence: Integer occurred: Timestamp version: String correlationId: String METHOD BaseEvent.EventID() RETURNS String RETURN this.id END METHOD METHOD BaseEvent.EventType() RETURNS String RETURN this.type END METHOD METHOD BaseEvent.AggregateID() RETURNS String RETURN this.aggregateId END METHOD METHOD BaseEvent.AggregateType() RETURNS String RETURN this.aggregateType END METHOD METHOD BaseEvent.Sequence() RETURNS Integer RETURN this.sequence END METHOD METHOD BaseEvent.OccurredAt() RETURNS Timestamp RETURN this.occurred END METHOD METHOD BaseEvent.SchemaVersion() RETURNS String RETURN this.version END METHOD CONSTRUCTOR NewBaseEvent(eventType: String, aggregateId: String, aggregateType: String, seq: Integer) RETURNS BaseEvent RETURN BaseEvent{ id: GenerateUUID(), type: eventType, aggregateId: aggregateId, aggregateType: aggregateType, sequence: seq, occurred: CurrentTimestamp(), version: "1.0.0" } END CONSTRUCTOR
Stored Event Envelope
// Pattern: Event Envelope separates routing from domain data // StoredEvent is the persistence format TYPE StoredEvent eventId: String eventType: String aggregateId: String aggregateType: String sequence: Integer occurredAt: Timestamp schemaVersion: String payload: RawJSON metadata: EventMetadata TYPE EventMetadata correlationId: String causationId: String userId: String
Step 2: Create the Event Store
// Pattern: Event Store Port (Repository Pattern) // Interface for event persistence - adapters implement this INTERFACE EventStore METHOD Append(ctx: Context, events: List<StoredEvent>, expectedVersion: Integer) RETURNS Result<Void, Error> METHOD Load(ctx: Context, aggregateId: String, aggregateType: String, fromSequence: Integer) RETURNS Result<List<StoredEvent>, Error> METHOD LoadSnapshot(ctx: Context, aggregateId: String, aggregateType: String) RETURNS Result<Snapshot, Error> METHOD SaveSnapshot(ctx: Context, snapshot: Snapshot) RETURNS Result<Void, Error> END INTERFACE // Pattern: Optimistic Concurrency Control CONSTANT ErrOptimisticConcurrency = Error("optimistic concurrency violation") // Pattern: Single-Table Design for Event Store // Data model: PK=STREAM#{type}#{id}, SK=SEQ#{sequence} TYPE EventStoreImpl client: DataStoreClient tableName: String TYPE eventItem pk: String // STREAM#{type}#{id} sk: String // SEQ#{sequence} eventId: String eventType: String aggregateId: String aggregateType: String sequence: Integer occurredAt: String schemaVersion: String payload: String METHOD EventStoreImpl.Append(ctx: Context, events: List<StoredEvent>, expectedVersion: Integer) RETURNS Result<Void, Error> IF events.Length() == 0 THEN RETURN Ok() END IF transactItems = [] FOR EACH event IN events DO item = eventItem{ pk: "STREAM#" + event.aggregateType + "#" + event.aggregateId, sk: FormatPadded("SEQ#%020d", event.sequence), eventId: event.eventId, eventType: event.eventType, aggregateId: event.aggregateId, aggregateType: event.aggregateType, sequence: event.sequence, occurredAt: FormatTimestamp(event.occurredAt), schemaVersion: event.schemaVersion, payload: ToString(event.payload) } // Pattern: Conditional Write for Optimistic Concurrency transactItems.append(TransactWriteItem{ put: PutItem{ tableName: this.tableName, item: item, conditionExpression: "attribute_not_exists(PK)" } }) END FOR result = this.client.TransactWriteItems(ctx, transactItems) IF result.IsError() THEN IF result.Error() IS TransactionCanceledException THEN RETURN ErrOptimisticConcurrency END IF RETURN Error("transact write: " + result.Error()) END IF RETURN Ok() END METHOD METHOD EventStoreImpl.Load(ctx: Context, aggregateId: String, aggregateType: String, fromSequence: Integer) RETURNS Result<List<StoredEvent>, Error> pk = "STREAM#" + aggregateType + "#" + aggregateId result = this.client.Query(ctx, QueryInput{ tableName: this.tableName, keyConditionExpression: "PK = :pk AND SK >= :sk", expressionAttributeValues: { ":pk": pk, ":sk": FormatPadded("SEQ#%020d", fromSequence) }, scanIndexForward: true }) IF result.IsError() THEN RETURN Error(result.Error()) END IF events = [] FOR EACH item IN result.Value().Items DO ei = UnmarshalEventItem(item) occurredAt = ParseTimestamp(ei.occurredAt) events.append(StoredEvent{ eventId: ei.eventId, eventType: ei.eventType, aggregateId: ei.aggregateId, aggregateType: ei.aggregateType, sequence: ei.sequence, occurredAt: occurredAt, schemaVersion: ei.schemaVersion, payload: RawJSON(ei.payload) }) END FOR RETURN Ok(events) END METHOD
Step 3: Implement Aggregate with Event Replay
// Pattern: Aggregate Root Base Type // Base type for event-sourced aggregates TYPE AggregateRoot id: String aggregateType: String version: Integer uncommittedEvents: List<Event> METHOD AggregateRoot.ID() RETURNS String RETURN this.id END METHOD METHOD AggregateRoot.Version() RETURNS Integer RETURN this.version END METHOD METHOD AggregateRoot.SetVersion(v: Integer) this.version = v END METHOD METHOD AggregateRoot.UncommittedEvents() RETURNS List<Event> RETURN this.uncommittedEvents END METHOD METHOD AggregateRoot.ClearUncommittedEvents() this.uncommittedEvents = [] END METHOD METHOD AggregateRoot.Raise(event: Event) this.uncommittedEvents.append(event) this.version = event.Sequence() END METHOD
Concrete Aggregate
// Pattern: Enumeration as Value Object TYPE OrderStatus CONSTANT Pending = "pending" CONSTANT Confirmed = "confirmed" // Pattern: Event-Sourced Aggregate TYPE Order AggregateRoot // Embedded base type customerId: String items: List<OrderItem> status: OrderStatus // Pattern: Value Object TYPE OrderItem productId: String quantity: Integer unitPrice: Money TYPE Money amount: Integer currency: String // Pattern: Domain Events as Past-Tense Facts TYPE OrderCreated EXTENDS BaseEvent customerId: String items: List<OrderItem> totalAmount: Money TYPE OrderConfirmed EXTENDS BaseEvent // No additional fields // Pattern: Factory Constructor with Validation CONSTRUCTOR NewOrder(id: String, customerId: String, items: List<OrderItem>) RETURNS Result<Order, Error> IF customerId == "" OR items.Length() == 0 THEN RETURN Error("invalid order") END IF order = Order{} event = NewOrderCreated(id, customerId, items) order.Apply(event) order.Raise(event) RETURN Ok(order) END CONSTRUCTOR // Pattern: Apply Method - Pure State Reconstruction // Apply rebuilds state from event - pure, no side effects METHOD Order.Apply(event: Event) MATCH event TYPE CASE OrderCreated: e = event AS OrderCreated this.id = e.AggregateID() this.customerId = e.customerId this.items = e.items this.status = OrderStatus.Pending this.aggregateType = "Order" CASE OrderConfirmed: this.status = OrderStatus.Confirmed END MATCH this.SetVersion(event.Sequence()) END METHOD // Pattern: Command Method - Validate Then Raise METHOD Order.Confirm() RETURNS Result<Void, Error> IF this.status != OrderStatus.Pending THEN RETURN Error("invalid status") END IF event = NewOrderConfirmed(this.ID(), this.Version() + 1) this.Apply(event) this.Raise(event) RETURN Ok() END METHOD // Pattern: Aggregate Rehydration from Event History FUNCTION LoadFromHistory(events: List<StoredEvent>) RETURNS Result<Order, Error> IF events.Length() == 0 THEN RETURN Error("no events") END IF order = Order{} FOR EACH stored IN events DO eventResult = deserializeEvent(stored) IF eventResult.IsError() THEN RETURN Error(eventResult.Error()) END IF order.Apply(eventResult.Value()) END FOR RETURN Ok(order) END FUNCTION // Pattern: Event Deserialization Registry FUNCTION deserializeEvent(stored: StoredEvent) RETURNS Result<Event, Error> MATCH stored.eventType CASE "order.created": e = DeserializeJSON<OrderCreated>(stored.payload) e.BaseEvent = NewBaseEvent(stored.eventType, stored.aggregateId, stored.aggregateType, stored.sequence) RETURN Ok(e) CASE "order.confirmed": e = DeserializeJSON<OrderConfirmed>(stored.payload) e.BaseEvent = NewBaseEvent(stored.eventType, stored.aggregateId, stored.aggregateType, stored.sequence) RETURN Ok(e) DEFAULT: RETURN Error("unknown type: " + stored.eventType) END MATCH END FUNCTION
Step 4: Create Snapshots for Performance
// Pattern: Snapshot for Performance Optimization // Snapshot stores aggregate state at a point in time TYPE Snapshot aggregateId: String aggregateType: String version: Integer state: RawJSON createdAt: Timestamp // Pattern: Snapshotter Interface // Interface for aggregates that support snapshots INTERFACE Snapshotter METHOD ToSnapshot() RETURNS Result<RawJSON, Error> METHOD FromSnapshot(state: RawJSON, version: Integer) RETURNS Result<Void, Error> END INTERFACE // Pattern: Internal Snapshot State TYPE orderSnapshot customerId: String items: List<OrderItem> status: String METHOD Order.ToSnapshot() RETURNS Result<RawJSON, Error> snap = orderSnapshot{ customerId: this.customerId, items: this.items, status: this.status } RETURN SerializeJSON(snap) END METHOD METHOD Order.FromSnapshot(state: RawJSON, version: Integer) RETURNS Result<Void, Error> snap = DeserializeJSON<orderSnapshot>(state) IF snap.IsError() THEN RETURN Error(snap.Error()) END IF this.customerId = snap.customerId this.items = snap.items this.status = OrderStatus(snap.status) this.SetVersion(version) RETURN Ok() END METHOD
Snapshot-Aware Repository
// Pattern: Snapshot-Aware Repository // Loads from snapshot then replays remaining events TYPE OrderRepository eventStore: EventStore METHOD OrderRepository.Load(ctx: Context, orderId: String) RETURNS Result<Order, Error> // Step 1: Try to load snapshot snapshotResult = this.eventStore.LoadSnapshot(ctx, orderId, "Order") order = NULL fromSequence = 0 IF snapshotResult.IsOk() AND snapshotResult.Value() != NULL THEN snapshot = snapshotResult.Value() order = Order{} order.FromSnapshot(snapshot.state, snapshot.version) fromSequence = snapshot.version + 1 END IF // Step 2: Load events after snapshot eventsResult = this.eventStore.Load(ctx, orderId, "Order", fromSequence) IF eventsResult.IsError() THEN RETURN Error(eventsResult.Error()) END IF events = eventsResult.Value() // Step 3: Handle not found case IF order == NULL AND events.Length() == 0 THEN RETURN Error("order not found") END IF // Step 4: Rebuild from history if no snapshot IF order == NULL THEN RETURN LoadFromHistory(events) END IF // Step 5: Replay events after snapshot FOR EACH e IN events DO eventResult = deserializeEvent(e) IF eventResult.IsError() THEN RETURN Error(eventResult.Error()) END IF order.Apply(eventResult.Value()) END FOR RETURN Ok(order) END METHOD
Step 5: Build Projections from Events
// Pattern: Read Model / Projection // Optimized view for queries TYPE OrderSummaryView orderId: String customerId: String status: String totalAmount: Integer itemCount: Integer gsi1pk: String // CUSTOMER#{id} gsi1sk: String // ORDER#{time} // Pattern: Projection Handler // Processes events to update read model TYPE OrderSummaryProjection client: DataStoreClient tableName: String METHOD OrderSummaryProjection.Handle(ctx: Context, stored: StoredEvent) RETURNS Result<Void, Error> MATCH stored.eventType CASE "order.created": payload = DeserializeJSON<OrderCreatedPayload>(stored.payload) view = OrderSummaryView{ orderId: stored.aggregateId, customerId: payload.customerId, status: "pending", totalAmount: payload.totalAmount, itemCount: payload.items.Length(), gsi1pk: "CUSTOMER#" + payload.customerId, gsi1sk: "ORDER#" + FormatTimestamp(stored.occurredAt) } this.client.PutItem(ctx, PutItemInput{ tableName: this.tableName, item: view }) CASE "order.confirmed": // Update status field this.client.UpdateItem(ctx, UpdateItemInput{ tableName: this.tableName, key: {orderId: stored.aggregateId}, updateExpression: "SET #status = :status", expressionAttributeNames: {"#status": "status"}, expressionAttributeValues: {":status": "confirmed"} }) END MATCH RETURN Ok() END METHOD // Pattern: Projection Rebuild // Rebuild entire projection from event log METHOD OrderSummaryProjection.Rebuild(ctx: Context, eventStore: EventStoreWithLoadAll) RETURNS Result<Void, Error> eventsResult = eventStore.LoadAll(ctx, "Order") IF eventsResult.IsError() THEN RETURN Error(eventsResult.Error()) END IF FOR EACH event IN eventsResult.Value() DO this.Handle(ctx, event) END FOR RETURN Ok() END METHOD
Step 6: Handle Event Versioning
// Pattern: Event Upcaster // Transforms old event versions to current version INTERFACE EventUpcaster METHOD CanUpcast(eventType: String, version: String) RETURNS Boolean METHOD Upcast(payload: RawJSON, fromVersion: String) RETURNS Result<Tuple<RawJSON, String>, Error> END INTERFACE // Pattern: Concrete Upcaster for Schema Evolution // Upcaster for OrderCreated v1 to v2 TYPE OrderCreatedUpcaster METHOD OrderCreatedUpcaster.CanUpcast(eventType: String, version: String) RETURNS Boolean RETURN eventType == "order.created" AND version == "1.0.0" END METHOD // v1: "price" field -> v2: "unit_price" with currency METHOD OrderCreatedUpcaster.Upcast(payload: RawJSON, fromVersion: String) RETURNS Result<Tuple<RawJSON, String>, Error> v1 = DeserializeJSON<OrderCreatedV1>(payload) v2 = OrderCreatedV2{ customerId: v1.customerId, items: [], totalAmount: Money{amount: v1.totalCents, currency: "USD"} } FOR EACH item IN v1.items DO v2.items.append(OrderItem{ productId: item.productId, quantity: item.quantity, unitPrice: Money{amount: item.price, currency: "USD"} }) END FOR result = SerializeJSON(v2) RETURN Ok(Tuple(result, "2.0.0")) END METHOD // Pattern: Upcaster Chain // Applies upcasters in sequence until current version TYPE UpcasterChain upcasters: List<EventUpcaster> METHOD UpcasterChain.Upcast(eventType: String, payload: RawJSON, version: String) RETURNS Result<Tuple<RawJSON, String>, Error> currentPayload = payload currentVersion = version FOR EACH u IN this.upcasters DO IF u.CanUpcast(eventType, currentVersion) THEN result = u.Upcast(currentPayload, currentVersion) IF result.IsError() THEN RETURN Error(result.Error()) END IF currentPayload, currentVersion = result.Value() END IF END FOR RETURN Ok(Tuple(currentPayload, currentVersion)) END METHOD
Step 7: Test Replay and Projections
// Test: Load order from history TEST Order_LoadFromHistory events = [ StoredEvent{ eventId: "evt-1", eventType: "order.created", aggregateId: "order-123", sequence: 1, payload: RawJSON('{"customer_id":"cust-1","items":[{"product_id":"prod-1","quantity":2}]}') }, StoredEvent{ eventId: "evt-2", eventType: "order.confirmed", aggregateId: "order-123", sequence: 2, payload: RawJSON('{}') } ] orderResult = LoadFromHistory(events) ASSERT orderResult.IsOk() order = orderResult.Value() ASSERT order.ID() == "order-123" ASSERT order.Status() == OrderStatus.Confirmed ASSERT order.Version() == 2 END TEST // Test: Snapshot round trip TEST Order_SnapshotRoundTrip items = [OrderItem{ productId: "prod-1", quantity: 2, unitPrice: Money{amount: 1000, currency: "USD"} }] originalResult = NewOrder("order-1", "cust-1", items) original = originalResult.Value() original.Confirm() stateResult = original.ToSnapshot() state = stateResult.Value() restored = Order{} restored.FromSnapshot(state, original.Version()) ASSERT original.Status() == restored.Status() END TEST // Test: Projection rebuild TEST Projection_Rebuild store = InMemoryEventStore.New() store.Append(ctx, [ createOrderEvent("order-1", "cust-1"), confirmOrderEvent("order-1") ], 0) projection = NewOrderSummaryProjection(testDB, "projections") projection.Rebuild(ctx, store) summary = projection.Get(ctx, "order-1") ASSERT summary.status == "confirmed" END TEST
Verification Checklist
After implementing event sourcing, verify:
- Events are immutable facts (past tense, complete data)
- Event store enforces append-only with optimistic concurrency
- Aggregates rebuild from event replay
- Apply method is pure (no side effects, no validation)
- Command methods validate first, then raise events
- Snapshots are optimization, not required for correctness
- Projections rebuild from event log
- Event versioning uses upcasters - never modify stored events
- Sequence numbers ensure ordering
- Events include schema version
- Tests verify replay and projection consistency