Claude-skill-registry kleppmann-data-intensive
Design distributed systems in the style of Martin Kleppmann, author of "Designing Data-Intensive Applications". Emphasizes understanding data systems deeply, making informed trade-offs, and building reliable data infrastructure. Use when designing databases, streaming systems, or data pipelines.
git clone https://github.com/majiayu000/claude-skill-registry
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/kleppmann" ~/.claude/skills/majiayu000-claude-skill-registry-kleppmann-data-intensive && rm -rf "$T"
skills/data/kleppmann/SKILL.md
Martin Kleppmann Style Guide
Overview
Martin Kleppmann is the author of "Designing Data-Intensive Applications" (DDIA), one of the most influential books on distributed systems and databases. He excels at explaining complex concepts clearly and helping engineers make informed architectural decisions.
Core Philosophy
"Reliability means making systems work correctly, even when faults occur."
"The goal of consistency models is to provide a abstraction for application developers."
"There's no such thing as a 'best' database—only trade-offs."
Kleppmann believes in understanding systems deeply, not just using them. Every architectural choice is a trade-off; understand what you're trading.
Design Principles
- Understand the Trade-offs: CAP, PACELC, latency vs consistency (a quorum sketch follows this list).
- Design for Failure: Partial failure is the norm in distributed systems.
- Data Outlives Code: Schema design and data models matter enormously.
- Exactly-Once Is Hard: Understand idempotency and at-least-once semantics.
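To make the latency-vs-consistency trade-off concrete, here is a minimal quorum read/write sketch. All names are illustrative, not from any particular client library: with N replicas, choosing write quorum W and read quorum R such that W + R > N guarantees the two quorums overlap, at the cost of waiting on more replicas per operation.
```python
# Minimal quorum sketch (illustrative; replicas are plain dicts, not network nodes).
class QuorumStore:
    def __init__(self, replicas, w, r):
        assert w + r > len(replicas), "read and write quorums must overlap"
        self.replicas = replicas
        self.w, self.r = w, r
        self.version = 0  # monotonically increasing write version

    def write(self, key, value):
        self.version += 1
        acks = 0
        for replica in self.replicas:
            replica[key] = (self.version, value)  # in reality: a remote call
            acks += 1
            if acks >= self.w:
                # Acknowledge once W replicas confirm; in a real system the
                # remaining replicas catch up asynchronously.
                return True
        return False

    def read(self, key):
        # Query R replicas and keep the highest-versioned response.
        responses = [rep.get(key) for rep in self.replicas[:self.r]]
        responses = [v for v in responses if v is not None]
        return max(responses)[1] if responses else None


# Trade-off: larger W and R give stronger guarantees but higher latency.
store = QuorumStore([{}, {}, {}], w=2, r=2)  # N=3, W=2, R=2
store.write('x', 'v1')
assert store.read('x') == 'v1'
```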
When Writing Code
Always
- Understand the consistency guarantees your system provides
- Design for idempotency where possible
- Think about data evolution and schema changes
- Consider exactly-once vs at-least-once semantics
- Know your data access patterns before choosing storage
- Plan for failure recovery
Never
- Assume "eventual consistency" without understanding what it means
- Ignore the differences between isolation levels (a lost-update sketch follows this list)
- Couple tightly without considering failure modes
- Treat distributed transactions as a silver bullet
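To see why isolation levels matter, here is a deterministic sketch of the classic lost-update anomaly: two transactions interleave a read-modify-write on the same row, and one update silently vanishes. The dict stands in for a database row; under serializable isolation or an atomic update, the second write would conflict instead.
```python
# Hypothetical lost-update sketch: a dict stands in for a database row.
row = {'balance': 100}

# Two "transactions" both read before either writes:
t1_read = row['balance']        # T1 reads 100
t2_read = row['balance']        # T2 reads 100
row['balance'] = t1_read + 10   # T1 writes 110
row['balance'] = t2_read + 20   # T2 writes 120, overwriting T1's update

assert row['balance'] == 120    # a serial execution would have produced 130
```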
Prefer
- Idempotent operations
- Append-only data structures
- Event sourcing for audit trails
- Change data capture over dual writes
- Log-based message brokers over traditional ones (a minimal log sketch follows)
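The last two preferences are easiest to see side by side. Below is a minimal, illustrative sketch (not any real broker's API, such as Kafka's) of an append-only log where each consumer tracks its own offset, so messages survive consumption and can be replayed.
```python
# Minimal log-based broker sketch (illustrative; not a real broker API).
class Log:
    """Append-only record log; entries are never mutated or deleted."""
    def __init__(self):
        self.records = []

    def append(self, message) -> int:
        self.records.append(message)
        return len(self.records) - 1  # offset of the appended record

    def read_from(self, offset: int):
        return self.records[offset:]


class Consumer:
    """Tracks its own offset, so it can resume or replay from any point."""
    def __init__(self, log: Log):
        self.log = log
        self.offset = 0

    def poll(self):
        batch = self.log.read_from(self.offset)
        self.offset += len(batch)
        return batch


log = Log()
log.append({'type': 'order_created', 'id': 1})
consumer = Consumer(log)
print(consumer.poll())  # [{'type': 'order_created', 'id': 1}]
consumer.offset = 0     # unlike a traditional queue, the log can be replayed
print(consumer.poll())  # same batch again
```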
Code Patterns
Consistency Models Illustrated
```python
# Understanding consistency models through code
import threading
from collections import defaultdict


class LinearizableStore:
    """
    Linearizability: operations appear atomic and instantaneous.
    Strongest consistency - the single-copy illusion.
    """
    def __init__(self):
        self._lock = threading.Lock()
        self._data = {}

    def write(self, key, value):
        with self._lock:
            self._data[key] = value

    def read(self, key):
        with self._lock:
            return self._data.get(key)

    def compare_and_set(self, key, expected, new_value):
        with self._lock:
            if self._data.get(key) == expected:
                self._data[key] = new_value
                return True
            return False


class CausallyConsistentStore:
    """
    Causal consistency: respects the happens-before relationship.
    Weaker than linearizability, but allows more concurrency.
    """
    def __init__(self, node_id):
        self.node_id = node_id
        self.data = {}
        self.vector_clock = defaultdict(int)

    def write(self, key, value, dependencies=None):
        # Advance our own entry in the vector clock
        self.vector_clock[self.node_id] += 1
        # Merge in the clocks of any causal dependencies
        if dependencies:
            for node, ts in dependencies.items():
                self.vector_clock[node] = max(self.vector_clock[node], ts)
        self.data[key] = {
            'value': value,
            'clock': dict(self.vector_clock)
        }
        return dict(self.vector_clock)

    def read(self, key):
        if key in self.data:
            return self.data[key]['value'], self.data[key]['clock']
        return None, dict(self.vector_clock)
```
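A brief usage sketch for the two stores defined above:
```python
# Usage sketch for the stores above.
store = LinearizableStore()
store.write('balance', 100)
# CAS succeeds only if the value is still what we last read.
assert store.compare_and_set('balance', expected=100, new_value=90)

node_a = CausallyConsistentStore('a')
clock = node_a.write('post', 'hello')

node_b = CausallyConsistentStore('b')
# Passing the returned clock as a dependency records that the reply
# happens-after the post, preserving causal order across nodes.
node_b.write('reply', 'hi', dependencies=clock)
```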
Event Sourcing
```python
# Event sourcing: store events, derive state
from dataclasses import dataclass
from typing import List
from datetime import datetime


@dataclass
class Event:
    event_type: str
    data: dict
    timestamp: datetime
    version: int


class EventStore:
    def __init__(self):
        self.events: List[Event] = []
        self.version = 0

    def append(self, event_type: str, data: dict):
        self.version += 1
        event = Event(
            event_type=event_type,
            data=data,
            timestamp=datetime.now(),
            version=self.version
        )
        self.events.append(event)
        return event

    def get_events(self, from_version=0):
        return [e for e in self.events if e.version > from_version]


class BankAccount:
    """Aggregate rebuilt from events"""
    def __init__(self, account_id: str, event_store: EventStore):
        self.account_id = account_id
        self.event_store = event_store
        self.balance = 0
        self._rebuild_state()

    def _rebuild_state(self):
        """Derive current state from the event history"""
        for event in self.event_store.events:
            self._apply(event)

    def _apply(self, event: Event):
        if event.event_type == 'deposited':
            self.balance += event.data['amount']
        elif event.event_type == 'withdrawn':
            self.balance -= event.data['amount']

    def deposit(self, amount: float):
        event = self.event_store.append('deposited', {
            'account_id': self.account_id,
            'amount': amount
        })
        self._apply(event)

    def withdraw(self, amount: float):
        if amount > self.balance:
            raise ValueError("Insufficient funds")
        event = self.event_store.append('withdrawn', {
            'account_id': self.account_id,
            'amount': amount
        })
        self._apply(event)

# Benefits:
# 1. Complete audit trail
# 2. Time travel (state at any point)
# 3. Event replay for debugging
# 4. Easy to add new projections
```
Idempotency Keys
```python
# Idempotency: safe retries without duplicate effects
import uuid
import hashlib


class IdempotentProcessor:
    def __init__(self):
        self.processed_keys = {}  # key -> cached result
        self.expiry_seconds = 3600  # TTL for cached results (enforcement omitted here)

    def process(self, idempotency_key: str, operation):
        """
        Execute operation exactly once for a given key.
        Retries with the same key return the cached result.
        """
        # Check whether this key was already processed
        if idempotency_key in self.processed_keys:
            return self.processed_keys[idempotency_key]
        # Execute the operation
        try:
            result = operation()
            self.processed_keys[idempotency_key] = {
                'status': 'success',
                'result': result
            }
            return self.processed_keys[idempotency_key]
        except Exception:
            # Don't cache failures (allow retry)
            raise

    @staticmethod
    def generate_key(*args):
        """Generate a deterministic idempotency key"""
        content = '|'.join(str(arg) for arg in args)
        return hashlib.sha256(content.encode()).hexdigest()


# Usage:
processor = IdempotentProcessor()

def create_payment(user_id, amount, idempotency_key):
    def do_payment():
        # Actually create the payment
        return {'payment_id': str(uuid.uuid4()), 'amount': amount}
    return processor.process(idempotency_key, do_payment)

# Client can safely retry:
user_id, request_id = 'user-42', 'req-1'
key = IdempotentProcessor.generate_key(user_id, 100, request_id)
result1 = create_payment(user_id, 100, key)
result2 = create_payment(user_id, 100, key)
# Same result, no duplicate payment
```
Change Data Capture
```python
# CDC: capture database changes as a stream
import time
from typing import Callable, List, Optional
from enum import Enum
from dataclasses import dataclass


class OperationType(Enum):
    INSERT = 'insert'
    UPDATE = 'update'
    DELETE = 'delete'


@dataclass
class ChangeEvent:
    table: str
    operation: OperationType
    key: dict
    before: Optional[dict]  # for update/delete
    after: Optional[dict]   # for insert/update
    timestamp: float
    sequence: int


class CDCProducer:
    """Publish changes from the database write-ahead log"""
    def __init__(self, publisher):
        self.publisher = publisher
        self.sequence = 0

    def capture_insert(self, table: str, key: dict, data: dict):
        self.sequence += 1
        event = ChangeEvent(
            table=table,
            operation=OperationType.INSERT,
            key=key,
            before=None,
            after=data,
            timestamp=time.time(),
            sequence=self.sequence
        )
        self.publisher.publish(event)

    def capture_update(self, table: str, key: dict, before: dict, after: dict):
        self.sequence += 1
        event = ChangeEvent(
            table=table,
            operation=OperationType.UPDATE,
            key=key,
            before=before,
            after=after,
            timestamp=time.time(),
            sequence=self.sequence
        )
        self.publisher.publish(event)


class CDCConsumer:
    """Consume changes and maintain a derived view"""
    def __init__(self):
        self.handlers: dict[str, List[Callable]] = {}
        self.last_sequence = 0

    def register(self, table: str, handler: Callable):
        if table not in self.handlers:
            self.handlers[table] = []
        self.handlers[table].append(handler)

    def process(self, event: ChangeEvent):
        # Ensure ordering
        if event.sequence <= self.last_sequence:
            return  # already processed
        if event.table in self.handlers:
            for handler in self.handlers[event.table]:
                handler(event)
        self.last_sequence = event.sequence


# Usage: maintain a search index from database changes
# (search_index is a placeholder for your indexing client)
def update_search_index(event: ChangeEvent):
    if event.operation == OperationType.DELETE:
        search_index.delete(event.key)
    else:
        search_index.index(event.key, event.after)

consumer = CDCConsumer()
consumer.register('products', update_search_index)
```
Stream Processing
```python
# Stream processing patterns
import time
from typing import Iterator, TypeVar, Callable

T = TypeVar('T')


class StreamProcessor:
    """Stateful stream processing"""
    def __init__(self):
        self.state = {}

    def process(self, stream: Iterator[T], handler: Callable[[T, dict], None]):
        """Process a stream with access to mutable state"""
        for record in stream:
            handler(record, self.state)
            yield self.state.copy()


def windowed_count(window_size_seconds: int):
    """Tumbling-window aggregation (emit is a placeholder for your sink)"""
    def handler(event, state):
        window_start = (event['timestamp'] // window_size_seconds) * window_size_seconds
        key = (event['key'], window_start)
        if key not in state:
            state[key] = {'count': 0, 'window_start': window_start}
        state[key]['count'] += 1
        # Emit windows that can no longer receive events
        current_window = (time.time() // window_size_seconds) * window_size_seconds
        completed = [k for k in state if k[1] < current_window - window_size_seconds]
        for k in completed:
            emit(state.pop(k))
    return handler


def exactly_once_processing(processor, checkpointer):
    """
    Exactly-once semantics via:
    1. Idempotent writes
    2. Transactional checkpointing
    (transaction is a placeholder for your store's transaction scope)
    """
    def process_with_guarantees(records):
        for record in records:
            # Skip anything at or before the last checkpoint
            last_offset = checkpointer.get_offset()
            if record.offset <= last_offset:
                continue  # already processed
            # Process and checkpoint atomically
            with transaction():
                result = processor.process(record)
                checkpointer.save_offset(record.offset)
            yield result  # yield only after the transaction commits
    return process_with_guarantees
```
Mental Model
Kleppmann approaches data systems by asking:
- What are the consistency requirements? Linearizable, serializable, eventual?
- What are the access patterns? Read-heavy, write-heavy, mixed?
- How do we handle failures? Retries, idempotency, compensation?
- How does data evolve? Schema changes, backward compatibility? (a schema-evolution sketch follows this list)
- What happens at the edges? Network partitions, slow nodes?
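The data-evolution question deserves a concrete sketch. Here is a hypothetical example of backward-compatible schema evolution, the idea that formats like Avro and Protocol Buffers formalize: new code reads records written under the old schema by supplying a default for the added field.
```python
# Hypothetical schema-evolution sketch: v2 code reads v1 records by
# defaulting the field added in v2 (backward compatibility).
from dataclasses import dataclass

@dataclass
class UserV2:
    user_id: str
    name: str
    email: str = ''  # added in v2; the default keeps v1 records readable

def decode_user(record: dict) -> UserV2:
    return UserV2(
        user_id=record['user_id'],
        name=record['name'],
        email=record.get('email', ''),  # absent in v1 records
    )

v1_record = {'user_id': 'u1', 'name': 'Ada'}  # written by old code
v2_record = {'user_id': 'u2', 'name': 'Lin', 'email': 'lin@example.com'}
assert decode_user(v1_record).email == ''
assert decode_user(v2_record).email == 'lin@example.com'
```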
Signature Kleppmann Moves
- Event sourcing for auditability
- Idempotency keys for safe retries
- CDC over dual writes
- Understanding isolation levels deeply
- Log-based messaging for durability
- Making trade-offs explicit