Claude-skill-registry event-flow
Add new events to Bob The Skull's event-driven architecture. Use when creating new events, event publishers, event handlers, or extending the event system with new event types.
git clone https://github.com/majiayu000/claude-skill-registry
T=$(mktemp -d) && git clone --depth=1 https://github.com/majiayu000/claude-skill-registry "$T" && mkdir -p ~/.claude/skills && cp -r "$T/skills/data/event-flow" ~/.claude/skills/majiayu000-claude-skill-registry-event-flow && rm -rf "$T"
skills/data/event-flow/SKILL.mdEvent Flow Management
Adds new events to Bob The Skull's event-driven architecture, ensuring all publishers and subscribers are properly connected.
When to Use
- Creating new event types
- Adding event publishers (components that emit events)
- Adding event subscribers (components that handle events)
- Documenting event flows
- Troubleshooting missing event handlers
Event-Driven Architecture Overview
Bob The Skull uses a central EventBus for all inter-component communication:
Component A → publish(Event) → EventBus → deliver → Component B.on_event()
No direct method calls between components! Everything goes through events.
Event Definition Pattern (events.py)
from dataclasses import dataclass, field from datetime import datetime from typing import Dict, Any, Optional @dataclass class YourEvent: """Published when [describe condition that triggers event]""" param1: type # Description param2: type # Description optional_data: Optional[Dict[str, Any]] = None timestamp: datetime = field(default_factory=datetime.now)
Common Event Types
State Change Events:
@dataclass class ComponentStateChangedEvent: """Component changed state""" component: str old_state: str new_state: str timestamp: datetime = field(default_factory=datetime.now)
Detection Events:
@dataclass class ThingDetectedEvent: """Thing was detected""" confidence: float location: tuple detector_id: str data: Dict[str, Any] timestamp: datetime = field(default_factory=datetime.now)
Request/Response Events:
@dataclass class ProcessRequestEvent: """Request to process something""" request_id: str input_data: Any timestamp: datetime = field(default_factory=datetime.now) @dataclass class ProcessResponseEvent: """Response to process request""" request_id: str result: Any success: bool timestamp: datetime = field(default_factory=datetime.now)
Publishing Events
In component code:
from events import YourEvent class YourComponent: def __init__(self, event_bus): self.event_bus = event_bus def do_something(self): # When condition occurs, publish event event = YourEvent( param1=value1, param2=value2 ) self.event_bus.publish(event) logger.info(f"Published YourEvent: {event}")
Subscribing to Events
In component code:
from events import YourEvent class YourComponent: def __init__(self, event_bus): self.event_bus = event_bus # Subscribe to event during initialization self.event_bus.subscribe(YourEvent, self.on_your_event) def on_your_event(self, event: YourEvent): """Handle YourEvent""" logger.info(f"Received YourEvent: {event}") # Process event self.do_something_with(event.param1, event.param2)
State Machine Integration
State machine is a special subscriber that coordinates system behavior:
# In state_machine/state_machine.py def _setup_event_handlers(self): """Subscribe to events""" self.event_bus.subscribe(YourEvent, self.on_your_event) def on_your_event(self, event: YourEvent): """Handle YourEvent in state machine""" logger.info(f"State machine received: {event}") # Check current state if self.current_state == State.WAITING: # Transition to new state self._transition_to(State.PROCESSING) # Trigger actions self._handle_processing(event)
Event Flow Checklist
When adding a new event:
- Define event dataclass in
events.py - Find all components that should publish this event
- Find all components that should subscribe to this event
- Add publishers:
event_bus.publish(YourEvent(...)) - Add subscribers:
event_bus.subscribe(YourEvent, self.on_event) - Add handlers:
def on_your_event(self, event: YourEvent): - Update state machine if needed
- Test event flow end-to-end
- Document event flow in comments/docstrings
Finding Event Usage
Find where event is published:
grep -r "YourEvent(" .
Find where event is subscribed:
grep -r "subscribe.*YourEvent" .
Find event handlers:
grep -r "def on_your_event" .
Common Event Patterns
Vision Detection Events
- Published by: Detector plugins
- Subscribed by: State machine, vision processor
- Pattern:
,*DetectedEvent*ClearedEvent
Speech Events
- Published by: STT, wake word detector
- Subscribed by: State machine
- Pattern:
,*RecognizedEvent*DetectedEvent
LLM Events
- Published by: LLM processor
- Subscribed by: State machine, TTS
- Pattern:
,*ResponseEvent*ToolCallEvent
Hardware Events
- Published by: Hardware controllers
- Subscribed by: State machine
- Pattern:
,*StatusEvent*ErrorEvent
Event Bus Implementation
Bob uses two event bus implementations:
LocalEventBus - In-process pub/sub
from local_event_bus import LocalEventBus event_bus = LocalEventBus()
MQTTEventBus - Distributed pub/sub
from mqtt_event_bus import MQTTEventBus event_bus = MQTTEventBus(broker="192.168.1.44")
Both implement the same interface:
- Send eventpublish(event)
- Register handlersubscribe(event_type, handler)
- Remove handlerunsubscribe(event_type, handler)
Existing Events Reference
Core events in
:events.py
- Wake word heardWakeWordDetectedEvent
- Speech transcribedSpeechRecognizedEvent
- LLM response availableLLMResponseReadyEvent
- LLM wants to call toolLLMToolCallEvent
/TTSStartedEvent
- TTS lifecycleTTSCompletedEvent
/FaceDetectedEvent
- Vision eventsFaceIdentifiedEvent
/MotionDetectedEvent
- Motion eventsMotionClearedEvent
- State machine transitionsStateChangedEvent
Event Naming Conventions
Event Names:
- Use descriptive nouns:
, notThingDetectedDetectThing - End with
:EventFaceDetectedEvent - Use past tense:
, notDetectedDetect
Handler Names:
- Prefix with
:on_on_face_detected - Use snake_case:
on_speech_recognized - Match event name:
→FaceDetectedEventon_face_detected
Debugging Event Flows
Enable event logging:
# In LocalEventBus or MQTTEventBus logger.setLevel(logging.DEBUG)
Trace event flow:
# In event_bus.py def publish(self, event): logger.debug(f"Publishing: {type(event).__name__} to {len(self.subscribers.get(type(event), []))} subscribers") # ... publish logic
Check subscriptions:
# In component logger.info(f"Subscribed to: {[t.__name__ for t in self.event_bus.subscribers.keys()]}")
Pro Tips
- Events are immutable - Use dataclasses with frozen=False for mutable data
- Include context - Always add detector_id, component_id, etc.
- Timestamp everything - Use
field(default_factory=datetime.now) - Document triggers - Explain in docstring what causes the event
- Test isolation - Mock event_bus for unit tests
- Avoid event storms - Debounce high-frequency events
- Use type hints - Makes IDE autocomplete work perfectly