Claude-skill-registry composable-rust-sagas
Expert knowledge for implementing distributed sagas in Composable Rust. Use when coordinating multiple aggregates in distributed transactions, implementing compensation logic or rollback flows, working with EventBus trait or Redpanda integration, designing saga state machines, or questions about eventual consistency and distributed transaction patterns.
git clone https://github.com/majiayu000/claude-skill-registry
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/composable-rust-sagas" ~/.claude/skills/majiayu000-claude-skill-registry-composable-rust-sagas && rm -rf "$T"
skills/data/composable-rust-sagas/SKILL.mdComposable Rust Sagas Expert
Expert knowledge for implementing distributed sagas in Composable Rust - multi-aggregate coordination, compensation logic, state machines, event bus patterns, and orchestration vs choreography.
When to Use This Skill
Automatically apply when:
- Coordinating multiple aggregates in a distributed transaction
- Implementing compensation logic or rollback flows
- Working with
trait or Redpanda integrationEventBus - Designing saga state machines
- Questions about eventual consistency or distributed transactions
- Debugging saga failures or compensation flows
Saga Pattern Fundamentals
What is a Saga?
A saga is a sequence of local transactions across multiple aggregates, where each transaction publishes events. If a step fails, execute compensating transactions to undo completed work.
Success flow: Order → Payment → Inventory → Shipping → ✅ Complete Failure flow (payment fails): Order → Payment ❌ → Compensate Order → ✅ Rolled back
Why Sagas?
Problem: You can't use distributed transactions (2PC) because:
- High latency and contention
- Poor availability (all participants must be up)
- Doesn't scale across services/databases
Solution: Saga pattern with eventual consistency:
- Each aggregate commits independently
- If failure occurs, compensate completed steps
- Eventually consistent (all aggregates converge to correct state)
Saga vs 2PC
| Aspect | 2PC (Distributed Transaction) | Saga |
|---|---|---|
| Consistency | Strong (ACID) | Eventual |
| Availability | Low (blocks on coordinator) | High (no blocking) |
| Latency | High (2 phases, locks) | Low (async events) |
| Failure handling | Rollback (automatic) | Compensate (manual) |
| Scalability | Poor (locks, contention) | Good (no locks) |
Saga as Reducer (Core Pattern)
Key insight: A saga is just a reducer with a state machine.
Saga State Pattern
#[derive(Debug, Clone, Serialize, Deserialize)] pub struct CheckoutSagaState { pub checkout_id: String, pub current_step: SagaStep, pub completed_steps: Vec<SagaStep>, // IDs for compensation pub order_id: Option<String>, pub payment_id: Option<String>, pub reservation_id: Option<String>, // Data pub customer_id: String, pub items: Vec<Item>, pub amount: Decimal, // Tracking pub started_at: DateTime<Utc>, pub retry_count: u32, } #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub enum SagaStep { NotStarted, CreatingOrder, ProcessingPayment, ReservingInventory, Completed, Compensating { failed_at: Box<SagaStep> }, Failed, }
Pattern: Track current step, completed steps, and IDs needed for compensation.
Saga Action Pattern
#[derive(Debug, Clone, Serialize, Deserialize)] pub enum CheckoutSagaAction { // Initiating command StartCheckout { customer_id: String, items: Vec<Item>, amount: Decimal, }, // Success events from aggregates OrderCreated { order_id: String }, PaymentProcessed { payment_id: String }, InventoryReserved { reservation_id: String }, // Failure events from aggregates OrderCreationFailed { reason: String }, PaymentFailed { reason: String }, InventoryReservationFailed { reason: String }, // Compensation events OrderCompensated, PaymentRefunded, // Saga completion SagaCompleted, SagaFailed { reason: String }, }
Pattern: Commands to start, events for each step (success/failure), compensation events, terminal states.
Saga Reducer Implementation
pub struct CheckoutSagaReducer; impl Reducer for CheckoutSagaReducer { type State = CheckoutSagaState; type Action = CheckoutSagaAction; type Environment = SagaEnvironment; fn reduce( &self, state: &mut Self::State, action: Self::Action, env: &Self::Environment, ) -> Vec<Effect<Self::Action>> { match (&state.current_step, action) { // Start saga (SagaStep::NotStarted, CheckoutSagaAction::StartCheckout { customer_id, items, amount }) => { state.current_step = SagaStep::CreatingOrder; state.customer_id = customer_id.clone(); state.items = items.clone(); state.amount = amount; vec![ Effect::PublishEvent(OrderCommand::CreateOrder { customer_id, items, }), ] } // Order created → proceed to payment (SagaStep::CreatingOrder, CheckoutSagaAction::OrderCreated { order_id }) => { state.order_id = Some(order_id.clone()); state.completed_steps.push(SagaStep::CreatingOrder); state.current_step = SagaStep::ProcessingPayment; vec![ Effect::PublishEvent(PaymentCommand::ProcessPayment { order_id, amount: state.amount, }), ] } // Payment processed → proceed to inventory (SagaStep::ProcessingPayment, CheckoutSagaAction::PaymentProcessed { payment_id }) => { state.payment_id = Some(payment_id.clone()); state.completed_steps.push(SagaStep::ProcessingPayment); state.current_step = SagaStep::ReservingInventory; vec![ Effect::PublishEvent(InventoryCommand::ReserveItems { order_id: state.order_id.clone().unwrap(), items: state.items.clone(), }), ] } // Inventory reserved → complete (SagaStep::ReservingInventory, CheckoutSagaAction::InventoryReserved { reservation_id }) => { state.reservation_id = Some(reservation_id); state.completed_steps.push(SagaStep::ReservingInventory); state.current_step = SagaStep::Completed; vec![ Effect::PublishEvent(CheckoutSagaAction::SagaCompleted), ] } // Payment failed → compensate order (SagaStep::ProcessingPayment, CheckoutSagaAction::PaymentFailed { reason }) => { state.current_step = SagaStep::Compensating { failed_at: Box::new(SagaStep::ProcessingPayment), }; // Compensate completed steps in reverse order self.compensate(state, env) } // Inventory failed → compensate payment and order (SagaStep::ReservingInventory, CheckoutSagaAction::InventoryReservationFailed { reason }) => { state.current_step = SagaStep::Compensating { failed_at: Box::new(SagaStep::ReservingInventory), }; self.compensate(state, env) } _ => vec![Effect::None], } } } impl CheckoutSagaReducer { /// Compensate completed steps in reverse order fn compensate( &self, state: &CheckoutSagaState, env: &SagaEnvironment, ) -> Vec<Effect<CheckoutSagaAction>> { let mut effects = vec![]; // Compensate in reverse order for step in state.completed_steps.iter().rev() { match step { SagaStep::CreatingOrder => { if let Some(order_id) = &state.order_id { effects.push(Effect::PublishEvent(OrderCommand::CancelOrder { order_id: order_id.clone(), reason: "Saga compensation".to_string(), })); } } SagaStep::ProcessingPayment => { if let Some(payment_id) = &state.payment_id { effects.push(Effect::PublishEvent(PaymentCommand::RefundPayment { payment_id: payment_id.clone(), })); } } _ => {} } } effects } }
Pattern:
- Match on
tuple(current_step, action) - Update state, track completed steps
- Publish next command via
Effect::PublishEvent - On failure, compensate completed steps in reverse order
EventBus Pattern (Multi-Aggregate Communication)
EventBus Trait
pub trait EventBus: Send + Sync { type Event: Send + Sync; /// Publish event to topic async fn publish(&self, topic: &str, event: Self::Event) -> Result<(), Error>; /// Subscribe to topic with consumer group async fn subscribe( &self, topic: &str, group_id: &str, handler: impl Fn(Self::Event) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send>> + Send + Sync + 'static, ) -> Result<(), Error>; }
Publish Pattern
// In reducer, return Effect::PublishEvent vec![ Effect::PublishEvent { topic: "orders".to_string(), event: OrderEvent::OrderCreated { order_id: "123".to_string() }, }, ] // Store executes effect via event bus async fn execute_effect(&self, effect: Effect<Action>) { match effect { Effect::PublishEvent { topic, event } => { self.event_bus.publish(&topic, event).await?; } // ... } }
Subscribe Pattern
// Payment aggregate subscribes to orders topic event_bus .subscribe("orders", "payment-service", |event| { Box::pin(async move { match event { OrderEvent::OrderCreated { order_id } => { // Send payment processing action to payment store payment_store .send(PaymentAction::ProcessPayment { order_id }) .await?; } _ => {} } Ok(()) }) }) .await?;
Pattern: Each aggregate subscribes to events it cares about. Sends actions to its own store. This creates cross-aggregate coordination.
At-Least-Once Delivery
Events may be delivered multiple times. Design for idempotency:
// Idempotent action handling fn reduce(&self, state: &mut State, action: Action, env: &Env) -> Vec<Effect> { match action { PaymentAction::ProcessPayment { order_id } => { // Check if already processed if state.processed_order_ids.contains(&order_id) { return vec![Effect::None]; // ✅ Idempotent } // Process payment state.processed_order_ids.insert(order_id.clone()); // ... payment logic vec![Effect::PublishEvent(PaymentEvent::PaymentProcessed { order_id })] } _ => vec![Effect::None], } }
Pattern: Track processed IDs or use unique keys to prevent duplicate processing.
Orchestration vs Choreography
Choreography (Event-Driven)
Each aggregate listens to events and reacts independently:
Order creates order → publishes OrderCreated ↓ Payment listens → processes payment → publishes PaymentProcessed ↓ Inventory listens → reserves items → publishes InventoryReserved
Benefits:
- Decoupled (no central coordinator)
- Aggregates are independent
- Easy to add new participants
Drawbacks:
- Hard to understand full flow
- Difficult to handle failures (who compensates?)
- No single source of saga state
Orchestration (Saga Coordinator)
Central saga reducer coordinates the flow:
Saga: Create order → command ↓ Order: Order created → event ↓ Saga: Process payment → command ↓ Payment: Payment processed → event ↓ Saga: Reserve inventory → command
Benefits:
- Clear flow (visible in saga reducer)
- Centralized compensation logic
- Easy to track saga state
- Easier to debug
Drawbacks:
- Central coordinator (potential bottleneck)
- Saga knows about all participants
Recommendation: Use orchestration (saga as reducer) for complex flows with compensation. Use choreography for simple event cascades.
Redpanda/Kafka Integration
RedpandaEventBus Pattern
pub struct RedpandaEventBus { producer: FutureProducer, consumer: StreamConsumer, } impl RedpandaEventBus { pub fn builder() -> RedpandaEventBusBuilder { RedpandaEventBusBuilder::new() } } // Usage let event_bus = RedpandaEventBus::builder() .broker("localhost:9092") .build()?;
Publish Implementation
async fn publish(&self, topic: &str, event: Event) -> Result<(), Error> { let payload = bincode::serialize(&event)?; let record = FutureRecord::to(topic) .payload(&payload) .key(&event.aggregate_id()); // Partition by aggregate ID self.producer .send(record, Duration::from_secs(5)) .await .map_err(|(err, _)| Error::PublishFailed(err))?; Ok(()) }
Pattern: Serialize with bincode. Use aggregate ID as key (ensures ordering per aggregate).
Subscribe Implementation
async fn subscribe<F>( &self, topic: &str, group_id: &str, handler: F, ) -> Result<(), Error> where F: Fn(Event) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send>> + Send + Sync + 'static, { self.consumer.subscribe(&[topic])?; loop { match self.consumer.recv().await { Ok(message) => { let payload = message.payload().ok_or(Error::EmptyMessage)?; let event: Event = bincode::deserialize(payload)?; // Process event handler(event).await?; // Commit offset (at-least-once delivery) self.consumer.commit_message(&message, CommitMode::Async)?; } Err(e) => { eprintln!("Error receiving message: {}", e); } } } }
Pattern:
- Receive message
- Deserialize event
- Call handler
- Commit offset (manual commit for at-least-once)
Consumer Groups Pattern
Multiple instances of the same aggregate can share work:
Topic: orders (3 partitions) Consumer Group: payment-service ├─ Instance 1 → Partition 0 ├─ Instance 2 → Partition 1 └─ Instance 3 → Partition 2
Pattern: Use same
group_id for all instances. Kafka assigns partitions automatically. Each partition processed by exactly one instance.
Compensation Patterns
Pattern 1: Reverse Order Compensation
Compensate in reverse order of execution:
fn compensate(state: &SagaState) -> Vec<Effect> { let mut effects = vec![]; for step in state.completed_steps.iter().rev() { effects.push(compensation_for_step(step)); } effects }
Pattern 2: Compensating Actions
Each action has a compensating action:
| Action | Compensating Action |
|---|---|
| Create order | Cancel order |
| Charge payment | Refund payment |
| Reserve inventory | Release inventory |
| Send email | Send cancellation email |
Pattern 3: Idempotent Compensation
Compensation must be idempotent (may be retried):
fn compensate_order(state: &mut State, order_id: &str) -> Vec<Effect> { // Check if already compensated if state.compensated_order_ids.contains(order_id) { return vec![Effect::None]; } state.compensated_order_ids.insert(order_id.to_string()); vec![Effect::PublishEvent(OrderCommand::CancelOrder { order_id: order_id.to_string(), })] }
Pattern 4: Partial Compensation
Some actions can't be fully compensated. Use best-effort:
match failed_step { SagaStep::EmailSent => { // Can't unsend email, send apology email instead vec![Effect::PublishEvent(EmailCommand::SendApology { customer_id: state.customer_id.clone(), })] } SagaStep::ExternalApiCalled => { // External API may not support compensation // Log for manual intervention vec![Effect::Log { level: LogLevel::Error, message: "Manual compensation required for external API".to_string(), }] } }
Error Handling and Retries
Transient vs Permanent Failures
use composable_rust_core::delay; match error { Error::NetworkTimeout | Error::ServiceUnavailable => { // Transient error → retry if state.retry_count < MAX_RETRIES { state.retry_count += 1; vec![delay! { duration: exponential_backoff(state.retry_count), action: original_action }] } else { // Max retries → compensate self.compensate(state, env) } } Error::InvalidData | Error::InsufficientFunds => { // Permanent error → compensate immediately self.compensate(state, env) } }
Dead Letter Queue Pattern
if state.retry_count >= MAX_RETRIES { vec![ Effect::PublishEvent { topic: "dead-letter-queue".to_string(), event: SagaFailedEvent { saga_id: state.saga_id.clone(), failed_at: state.current_step.clone(), reason: error.to_string(), }, }, // Then compensate ...self.compensate(state, env), ] }
Pattern: Send to DLQ for manual review, then compensate.
Testing Patterns
Unit Test: Saga State Machine
#[test] fn test_checkout_saga_success_flow() { let env = test_environment(); let mut state = CheckoutSagaState::new("saga-1".to_string()); // Start checkout let effects = reducer.reduce( &mut state, CheckoutSagaAction::StartCheckout { ... }, &env, ); assert_eq!(state.current_step, SagaStep::CreatingOrder); // Order created let effects = reducer.reduce( &mut state, CheckoutSagaAction::OrderCreated { order_id: "order-1".to_string() }, &env, ); assert_eq!(state.current_step, SagaStep::ProcessingPayment); // Payment processed let effects = reducer.reduce( &mut state, CheckoutSagaAction::PaymentProcessed { payment_id: "pay-1".to_string() }, &env, ); assert_eq!(state.current_step, SagaStep::ReservingInventory); // Inventory reserved let effects = reducer.reduce( &mut state, CheckoutSagaAction::InventoryReserved { reservation_id: "res-1".to_string() }, &env, ); assert_eq!(state.current_step, SagaStep::Completed); }
Unit Test: Compensation
#[test] fn test_checkout_saga_compensation() { let env = test_environment(); let mut state = CheckoutSagaState::new("saga-1".to_string()); // Simulate completed steps state.current_step = SagaStep::ProcessingPayment; state.completed_steps.push(SagaStep::CreatingOrder); state.order_id = Some("order-1".to_string()); // Payment fails let effects = reducer.reduce( &mut state, CheckoutSagaAction::PaymentFailed { reason: "Insufficient funds".to_string() }, &env, ); // Should compensate order assert!(matches!(state.current_step, SagaStep::Compensating { .. })); assert!(matches!(effects[0], Effect::PublishEvent(OrderCommand::CancelOrder { .. }))); }
Integration Test: With InMemoryEventBus
#[tokio::test] async fn test_saga_with_event_bus() { let event_bus = InMemoryEventBus::new(); // Create stores for each aggregate let order_store = Store::new(OrderState::default(), OrderReducer, order_env); let payment_store = Store::new(PaymentState::default(), PaymentReducer, payment_env); let saga_store = Store::new(CheckoutSagaState::new("saga-1"), SagaReducer, saga_env); // Subscribe aggregates to events event_bus.subscribe("orders", "payment-service", |event| { Box::pin(async move { payment_store.send(PaymentAction::from_order_event(event)).await }) }).await?; // Start saga saga_store.send(CheckoutSagaAction::StartCheckout { ... }).await; // Wait for completion tokio::time::sleep(Duration::from_millis(100)).await; let saga_state = saga_store.state().await; assert_eq!(saga_state.current_step, SagaStep::Completed); }
Common Anti-Patterns to Avoid
❌ Anti-Pattern 1: Synchronous Cross-Aggregate Calls
// ❌ Don't call other aggregates directly fn reduce(...) -> Vec<Effect> { let payment_result = payment_service.process_payment().await; // ❌ Tight coupling }
Solution: Use event bus for async communication.
❌ Anti-Pattern 2: Not Tracking Compensation Data
// ❌ Can't compensate without IDs struct SagaState { current_step: Step, // ❌ Missing: order_id, payment_id, etc. }
Solution: Store all IDs needed for compensation.
❌ Anti-Pattern 3: Ignoring Idempotency
// ❌ Processing same event twice fn reduce(...) -> Vec<Effect> { state.balance -= amount; // ❌ Double-deduct if event replays }
Solution: Check if event already processed.
❌ Anti-Pattern 4: Compensation Order Errors
// ❌ Compensating in wrong order fn compensate(state: &SagaState) -> Vec<Effect> { for step in state.completed_steps.iter() { // ❌ Forward order // Compensation } }
Solution: Compensate in reverse order.
❌ Anti-Pattern 5: No Timeout for Saga Steps
// ❌ Saga waits forever for event fn reduce(...) -> Vec<Effect> { // Waits indefinitely for PaymentProcessed event }
Solution: Use timeouts and retry logic:
use composable_rust_core::delay; vec![ Effect::PublishEvent(command), delay! { duration: Duration::from_secs(30), action: SagaAction::StepTimeout { step: current_step } }, ]
Advanced Patterns
Pattern: Nested Sagas (Hierarchical Workflows)
Use Case: Complex workflows where one saga orchestrates multiple child sagas.
Example: Order Fulfillment saga coordinates Payment saga + Shipping saga + Notification saga.
Parent Saga State
#[derive(Clone, Debug)] pub struct OrderFulfillmentState { pub order_id: String, pub status: FulfillmentStatus, // Child saga states (owned by parent) pub payment_saga: PaymentSagaState, pub shipping_saga: Option<ShippingSagaState>, pub notification_saga: Option<NotificationSagaState>, // Track which child sagas are active pub active_children: Vec<ChildSaga>, } #[derive(Clone, Debug)] pub enum FulfillmentStatus { Idle, ProcessingPayment, ProcessingShipping, SendingNotifications, Completed, Compensating { failed_child: ChildSaga }, Failed, } #[derive(Clone, Debug, PartialEq)] pub enum ChildSaga { Payment, Shipping, Notification, }
Parent Saga Reducer
impl Reducer for OrderFulfillmentSaga { type State = OrderFulfillmentState; type Action = FulfillmentAction; type Environment = FulfillmentEnvironment; fn reduce( &self, state: &mut Self::State, action: Self::Action, env: &Self::Environment, ) -> SmallVec<[Effect<Self::Action>; 4]> { use FulfillmentStatus::*; match (&state.status, action) { // Parent starts: delegate to Payment child saga (Idle, FulfillmentAction::StartFulfillment { order_id, amount }) => { state.status = ProcessingPayment; state.order_id = order_id.clone(); state.active_children.push(ChildSaga::Payment); // Delegate to child Payment saga smallvec![Effect::PublishEvent(PaymentAction::InitiatePayment { order_id, amount, })] } // Child Payment saga completed successfully (ProcessingPayment, FulfillmentAction::PaymentCompleted { transaction_id }) => { state.payment_saga.transaction_id = Some(transaction_id.clone()); state.status = ProcessingShipping; state.active_children.push(ChildSaga::Shipping); // Start next child saga smallvec![Effect::PublishEvent(ShippingAction::InitiateShipping { order_id: state.order_id.clone(), transaction_id, })] } // Child Payment saga failed → Compensate (ProcessingPayment, FulfillmentAction::PaymentFailed { error }) => { state.status = Compensating { failed_child: ChildSaga::Payment, }; // No compensation needed (nothing to undo yet) smallvec![Effect::PublishEvent(FulfillmentAction::FulfillmentFailed { reason: format!("Payment failed: {error}"), })] } // Child Shipping saga failed → Compensate Payment (ProcessingShipping, FulfillmentAction::ShippingFailed { error }) => { state.status = Compensating { failed_child: ChildSaga::Shipping, }; // Compensate: Refund payment (undo child Payment saga) if let Some(transaction_id) = &state.payment_saga.transaction_id { smallvec![Effect::PublishEvent(PaymentAction::RefundPayment { transaction_id: transaction_id.clone(), })] } else { smallvec![Effect::None] } } // All child sagas succeeded (ProcessingShipping, FulfillmentAction::ShippingCompleted { tracking }) => { state.status = Completed; state.active_children.clear(); smallvec![Effect::PublishEvent(FulfillmentAction::FulfillmentCompleted { order_id: state.order_id.clone(), tracking, })] } _ => smallvec![Effect::None], } } }
Key Patterns for Nested Sagas
1. Parent Owns Child State
pub struct ParentState { pub child1: Child1State, // ✅ Owned by parent pub child2: Child2State, // ✅ Owned by parent }
Why: Parent needs visibility into child state for compensation and coordination.
2. Explicit Child Tracking
pub active_children: Vec<ChildSaga>, // Track which children are running
Why: Know which children to compensate if parent saga fails.
3. Error Propagation Up, Compensation Down
// Error propagates UP from child to parent (ProcessingChild, Action::ChildFailed { error }) => { state.status = Compensating { failed_child }; // Parent decides what to do } // Compensation flows DOWN from parent to children (Compensating, _) => { // Parent triggers compensation in children smallvec![Effect::PublishEvent(ChildAction::Compensate)] }
4. Sequential vs Parallel Children
// Sequential: Start child2 after child1 completes (ChildCompleted, Child1Success) => { smallvec![Effect::PublishEvent(StartChild2)] } // Parallel: Start all children at once (Idle, Start) => { smallvec![ Effect::PublishEvent(StartChild1), Effect::PublishEvent(StartChild2), Effect::PublishEvent(StartChild3), ] }
Benefits of Nested Sagas
- ✅ Modularity: Child sagas are reusable in different parent contexts
- ✅ Clear ownership: Parent owns coordination, children own domain logic
- ✅ Explicit compensation: Parent decides compensation strategy
- ✅ Testing: Test child sagas independently, parent tests coordination
When to Use Nested Sagas
Use nested sagas when:
- Workflow has clear hierarchical structure
- Child workflows are reusable across different parents
- Need different compensation strategies per parent context
Avoid nested sagas when:
- Workflow is simple and flat (use single saga)
- Children are tightly coupled (merge into one saga)
- Communication overhead exceeds benefit (inline the logic)
Pattern: Saga Timeouts
use composable_rust_core::delay; vec![ Effect::PublishEvent(command), delay! { duration: Duration::from_secs(30), action: SagaAction::Timeout { step: state.current_step.clone(), } }, ] // In reducer (step, SagaAction::Timeout { step: timeout_step }) if step == timeout_step => { // Step timed out, compensate self.compensate(state, env) }
Pattern: Saga Persistence
Persist saga state to recover from crashes:
fn reduce(...) -> Vec<Effect> { vec![ Effect::Database(SaveSagaState(state.clone())), Effect::PublishEvent(next_command), ] }
Quick Reference Checklist
When implementing sagas:
- State machine: Track current step and completed steps
- Compensation data: Store IDs needed for rollback
- Idempotent actions: Check if already processed
- Reverse compensation: Undo in reverse order
- Timeout handling: Don't wait forever for events
- Retry logic: Distinguish transient vs permanent errors
- Dead letter queue: Handle max retries
- Event bus: Use for cross-aggregate communication
- Consumer groups: For parallel processing
- Manual offset commit: For at-least-once delivery
Performance Considerations
- Orchestration overhead: Saga coordinator is extra hop, but usually negligible (<10ms)
- Event bus latency: Kafka/Redpanda add ~5-20ms per event
- Compensation cost: Rare in happy path (typically <1% of sagas compensate)
- Consumer lag: Monitor with Kafka metrics, scale consumers if needed
See Also
- Architecture:
- Core reducer/effect patternscomposable-rust-architecture.skill - Event Sourcing:
- Event store and persistencecomposable-rust-event-sourcing.skill - Documentation:
- Comprehensive saga guidedocs/sagas.md - Patterns:
- Additional saga patternsdocs/saga-patterns.md - Consistency:
- Consistency guaranteesdocs/consistency-patterns.md
Remember: A saga is just a reducer with a state machine. Compensate in reverse order. Design for idempotency. Use event bus for coordination.