# Source repository: git clone https://github.com/vibeforge1111/vibeship-spawner-skills
# simulation/digital-twin/skill.yaml
id: digital-twin
name: Digital Twin Development
category: simulation
description: |
  Build and maintain digital twins - virtual representations of physical
  systems that synchronize with real-world counterparts for monitoring,
  prediction, and optimization.
# Quoted so YAML never retypes the version as a number.
version: "1.0.0"
# Phrases that route a user request to this skill.
triggers:
- "digital twin"
- "virtual model"
- "real-time synchronization"
- "physical-virtual coupling"
- "predictive maintenance"
- "asset modeling"
- "system replica"
- "live simulation"
# Capabilities this skill contributes.
provides:
- Digital twin architecture design
- State synchronization patterns
- Sensor integration strategies
- Physics model calibration
- Predictive analytics pipelines
- Edge-cloud hybrid deployment
# Reference implementation patterns. Each `example` is a Python snippet kept
# in a literal block scalar so newlines and indentation are preserved.
patterns:
  twin_architecture:
    description: "Core digital twin system structure"
    example: |
      from dataclasses import dataclass, field
      from typing import Dict, List, Optional, Callable
      from datetime import datetime
      import numpy as np
      from abc import ABC, abstractmethod

      @dataclass
      class SensorReading:
          """Single sensor measurement."""
          sensor_id: str
          timestamp: datetime
          value: float
          unit: str
          quality: float = 1.0  # Data quality score 0-1

      @dataclass
      class TwinState:
          """Complete state of the digital twin."""
          timestamp: datetime
          physical_state: Dict[str, float]  # Measured values
          virtual_state: Dict[str, float]   # Simulated values
          residuals: Dict[str, float]       # Differences
          confidence: Dict[str, float]      # State confidence

      class PhysicsModel(ABC):
          """Abstract physics model for simulation."""

          @abstractmethod
          def predict(self, state: Dict[str, float], dt: float) -> Dict[str, float]:
              """Predict next state given current state and time step."""
              pass

          @abstractmethod
          def jacobian(self, state: Dict[str, float]) -> np.ndarray:
              """Compute Jacobian for state estimation."""
              pass

      class DigitalTwin:
          """
          Digital twin with state synchronization.

          Combines physics-based prediction with sensor updates
          using Extended Kalman Filter for state estimation.
          """

          def __init__(
              self,
              model: PhysicsModel,
              state_dim: int,
              measurement_dim: int,
              process_noise: float = 0.01,
              measurement_noise: float = 0.1
          ):
              self.model = model
              self.state_dim = state_dim
              self.measurement_dim = measurement_dim

              # State estimate and covariance
              self.state = np.zeros(state_dim)
              self.covariance = np.eye(state_dim)

              # Noise covariances
              self.Q = np.eye(state_dim) * process_noise
              self.R = np.eye(measurement_dim) * measurement_noise

              # History for analysis
              self.state_history: List[TwinState] = []
              self.last_update = datetime.now()

              # Callbacks for state changes
              self.callbacks: List[Callable[[TwinState], None]] = []

          def predict(self, dt: float) -> np.ndarray:
              """Predict state forward using physics model."""
              state_dict = self._array_to_dict(self.state)

              # Physics prediction
              predicted_dict = self.model.predict(state_dict, dt)
              predicted = self._dict_to_array(predicted_dict)

              # Jacobian for covariance propagation
              F = self.model.jacobian(state_dict)

              # Propagate covariance: P = F P F^T + Q
              self.covariance = F @ self.covariance @ F.T + self.Q
              self.state = predicted
              return predicted

          def update(self, measurements: Dict[str, SensorReading]) -> TwinState:
              """
              Update state with sensor measurements.

              Uses Extended Kalman Filter to fuse predictions with measurements.
              Assumes every registered sensor reports on each update (z has
              measurement_dim entries) -- partial updates need a sized-down R.
              """
              now = datetime.now()
              dt = (now - self.last_update).total_seconds()

              # Predict to current time
              if dt > 0:
                  self.predict(dt)

              # Convert measurements to array
              z = np.array([m.value for m in measurements.values()])
              qualities = np.array([m.quality for m in measurements.values()])

              # Inflate measurement noise for low-quality readings.
              # FIX(review): the original divided R by np.diag(qualities + 0.01),
              # whose off-diagonal zeros produce inf/nan; scale the diagonal only.
              R_adjusted = self.R.copy()
              np.fill_diagonal(R_adjusted, np.diag(self.R) / (qualities + 0.01))

              # Measurement matrix (maps state to measurements)
              H = self._measurement_matrix(list(measurements.keys()))

              # Kalman gain
              S = H @ self.covariance @ H.T + R_adjusted
              K = self.covariance @ H.T @ np.linalg.inv(S)

              # Innovation (measurement residual)
              innovation = z - H @ self.state

              # Update state
              self.state = self.state + K @ innovation
              self.covariance = (np.eye(self.state_dim) - K @ H) @ self.covariance

              # Create twin state
              twin_state = TwinState(
                  timestamp=now,
                  physical_state={k: m.value for k, m in measurements.items()},
                  virtual_state=self._array_to_dict(self.state),
                  residuals={k: float(innovation[i])
                             for i, k in enumerate(measurements.keys())},
                  confidence=self._compute_confidence()
              )

              self.state_history.append(twin_state)
              self.last_update = now

              # Notify callbacks
              for callback in self.callbacks:
                  callback(twin_state)

              return twin_state

          def _compute_confidence(self) -> Dict[str, float]:
              """Compute confidence from covariance diagonal."""
              variances = np.diag(self.covariance)
              # Convert variance to confidence (0-1 scale)
              confidence = 1.0 / (1.0 + variances)
              return self._array_to_dict(confidence)

          def _measurement_matrix(self, sensor_ids: List[str]) -> np.ndarray:
              """Build measurement matrix H for given sensors."""
              # Override in subclass for specific sensor configurations
              return np.eye(len(sensor_ids), self.state_dim)

          def _array_to_dict(self, arr: np.ndarray) -> Dict[str, float]:
              """Convert state array to named dictionary."""
              # Override with actual state names
              return {f"state_{i}": float(v) for i, v in enumerate(arr)}

          def _dict_to_array(self, d: Dict[str, float]) -> np.ndarray:
              """Convert named dictionary to state array."""
              return np.array(list(d.values()))

  state_synchronization:
    description: "Bidirectional sync between physical and virtual"
    example: |
      import asyncio
      from typing import Protocol, Optional
      from dataclasses import dataclass
      from datetime import datetime, timedelta
      import json

      class SyncStrategy(Protocol):
          """Protocol for synchronization strategies."""

          async def sync_to_virtual(self, physical_state: dict) -> dict:
              """Update virtual from physical."""
              ...

          async def sync_to_physical(self, virtual_state: dict) -> dict:
              """Command physical from virtual (if supported)."""
              ...

      @dataclass
      class SyncConfig:
          """Configuration for state synchronization."""
          sync_interval: float = 1.0     # seconds
          stale_threshold: float = 10.0  # seconds before data considered stale
          conflict_resolution: str = "physical_wins"  # or "virtual_wins", "merge"
          enable_commands: bool = False  # Allow virtual to control physical

      class StateSynchronizer:
          """
          Manages state synchronization between physical and digital twin.

          Handles:
          - Periodic sync from sensors
          - Conflict resolution
          - Stale data detection
          - Command dispatch (optional)
          """

          def __init__(
              self,
              twin: 'DigitalTwin',
              sensor_gateway: 'SensorGateway',
              config: Optional[SyncConfig] = None
          ):
              self.twin = twin
              self.sensors = sensor_gateway
              # FIX(review): original default `config: SyncConfig = SyncConfig()`
              # is evaluated once and shared across all instances.
              self.config = config if config is not None else SyncConfig()
              self.running = False

              # Track sync state
              self.last_physical_update = datetime.min
              self.last_virtual_update = datetime.min
              self.sync_errors = []

          async def start(self):
              """Start synchronization loop."""
              self.running = True
              while self.running:
                  try:
                      await self._sync_cycle()
                  except Exception as e:
                      # Best-effort loop: record the failure and keep syncing.
                      self.sync_errors.append({
                          'time': datetime.now(),
                          'error': str(e)
                      })
                  await asyncio.sleep(self.config.sync_interval)

          async def stop(self):
              """Stop synchronization loop."""
              self.running = False

          async def _sync_cycle(self):
              """Single synchronization cycle."""
              # 1. Fetch physical state from sensors
              readings = await self.sensors.get_latest()

              # 2. Check for stale data
              now = datetime.now()
              for reading in readings.values():
                  age = (now - reading.timestamp).total_seconds()
                  if age > self.config.stale_threshold:
                      reading.quality *= 0.5  # Reduce quality for stale data

              # 3. Update twin with physical measurements
              twin_state = self.twin.update(readings)
              self.last_physical_update = now

              # 4. Check for divergence
              max_residual = max(abs(r) for r in twin_state.residuals.values())
              if max_residual > 0.1:  # Threshold for significant divergence
                  await self._handle_divergence(twin_state)

              # 5. Optionally sync commands back to physical
              if self.config.enable_commands:
                  await self._dispatch_commands(twin_state)

          async def _handle_divergence(self, state: TwinState):
              """Handle significant divergence between physical and virtual."""
              if self.config.conflict_resolution == "physical_wins":
                  # Trust sensors, recalibrate model
                  pass
              elif self.config.conflict_resolution == "virtual_wins":
                  # Trust model, flag sensor issues
                  pass
              elif self.config.conflict_resolution == "merge":
                  # Weighted combination based on confidence
                  pass

          async def _dispatch_commands(self, state: TwinState):
              """Send control commands from virtual to physical."""
              # Only if twin has determined optimal setpoints
              pass

  sensor_integration:
    description: "Multi-sensor data ingestion and fusion"
    example: |
      from typing import Dict, List, Callable, Optional
      from dataclasses import dataclass
      from datetime import datetime
      import asyncio
      from abc import ABC, abstractmethod

      # NOTE: SensorReading is the dataclass from the twin_architecture pattern.

      @dataclass
      class SensorConfig:
          """Configuration for a sensor."""
          sensor_id: str
          name: str
          unit: str
          min_value: float
          max_value: float
          sample_rate: float  # Hz
          protocol: str       # "mqtt", "modbus", "opcua", "rest"
          address: str

      class SensorAdapter(ABC):
          """Abstract adapter for different sensor protocols."""

          @abstractmethod
          async def connect(self):
              pass

          @abstractmethod
          async def read(self) -> SensorReading:
              pass

          @abstractmethod
          async def disconnect(self):
              pass

      class MQTTSensorAdapter(SensorAdapter):
          """Adapter for MQTT-based sensors."""

          def __init__(self, config: SensorConfig, broker: str):
              self.config = config
              self.broker = broker
              self.client = None
              self.last_value = None

          async def connect(self):
              import aiomqtt
              self.client = aiomqtt.Client(self.broker)
              await self.client.connect()
              await self.client.subscribe(self.config.address)

          async def read(self) -> SensorReading:
              # Returns on the first message received for the subscribed topic.
              async for message in self.client.messages:
                  value = float(message.payload.decode())
                  return SensorReading(
                      sensor_id=self.config.sensor_id,
                      timestamp=datetime.now(),
                      value=self._validate(value),
                      unit=self.config.unit
                  )

          def _validate(self, value: float) -> float:
              """Validate sensor reading against bounds."""
              # Clamp into [min_value, max_value].
              return max(self.config.min_value,
                         min(self.config.max_value, value))

          async def disconnect(self):
              if self.client:
                  await self.client.disconnect()

      class SensorGateway:
          """
          Central gateway for all sensors feeding the digital twin.

          Manages connections, buffering, and data quality.
          """

          def __init__(self):
              self.sensors: Dict[str, SensorAdapter] = {}
              self.latest: Dict[str, SensorReading] = {}
              self.buffers: Dict[str, List[SensorReading]] = {}
              self.buffer_size = 100

          async def register(self, config: SensorConfig, adapter: SensorAdapter):
              """Register a sensor with its adapter."""
              self.sensors[config.sensor_id] = adapter
              self.buffers[config.sensor_id] = []
              await adapter.connect()

          async def get_latest(self) -> Dict[str, SensorReading]:
              """Get latest reading from all sensors."""
              tasks = {
                  sid: asyncio.create_task(adapter.read())
                  for sid, adapter in self.sensors.items()
              }

              for sid, task in tasks.items():
                  try:
                      reading = await asyncio.wait_for(task, timeout=1.0)
                      self.latest[sid] = reading
                      self._buffer_reading(sid, reading)
                  except asyncio.TimeoutError:
                      # Use last known value with degraded quality
                      if sid in self.latest:
                          self.latest[sid].quality *= 0.8

              return self.latest

          def _buffer_reading(self, sensor_id: str, reading: SensorReading):
              """Buffer reading for historical analysis."""
              buf = self.buffers[sensor_id]
              buf.append(reading)
              if len(buf) > self.buffer_size:
                  buf.pop(0)

  predictive_maintenance:
    description: "Failure prediction and remaining useful life estimation"
    example: |
      import numpy as np
      from dataclasses import dataclass
      # FIX(review): Dict and Callable were used below but not imported.
      from typing import Dict, List, Tuple, Optional, Callable
      from datetime import datetime, timedelta
      from scipy import stats

      @dataclass
      class HealthIndicator:
          """Single health indicator for an asset."""
          name: str
          current_value: float
          threshold: float
          trend: float  # Rate of degradation
          last_updated: datetime

      @dataclass
      class MaintenancePrediction:
          """Prediction of maintenance need."""
          asset_id: str
          failure_mode: str
          probability: float
          rul_estimate: timedelta  # Remaining useful life
          rul_confidence: Tuple[timedelta, timedelta]  # 95% CI
          recommended_action: str
          urgency: str  # "immediate", "soon", "scheduled", "none"

      class DegradationModel:
          """
          Models asset degradation for RUL prediction.

          Uses exponential degradation model: h(t) = h0 * exp(beta * t)
          """

          def __init__(self, initial_health: float = 1.0):
              self.h0 = initial_health
              self.beta = None  # Degradation rate
              self.history: List[Tuple[float, float]] = []  # (time, health)

          def update(self, time: float, health: float):
              """Update model with new health observation."""
              self.history.append((time, health))

              if len(self.history) >= 3:
                  # Fit exponential degradation model
                  times = np.array([h[0] for h in self.history])
                  healths = np.array([h[1] for h in self.history])

                  # Log-linear regression for exponential fit
                  log_healths = np.log(np.clip(healths, 1e-10, None))
                  slope, intercept, r_value, p_value, std_err = stats.linregress(
                      times, log_healths
                  )

                  self.beta = slope
                  self.h0 = np.exp(intercept)
                  self.fit_error = std_err

          def predict_rul(self, threshold: float = 0.2) -> Tuple[float, Tuple[float, float]]:
              """
              Predict remaining useful life until health drops below threshold.

              Returns: (mean_rul, (lower_95, upper_95)) in time units
              """
              if self.beta is None or self.beta >= 0:
                  return float('inf'), (float('inf'), float('inf'))

              current_health = self.history[-1][1]

              # Solve threshold = current_health * exp(beta * dt) for dt.
              # FIX(review): the original also subtracted current_time, but the
              # formula already yields time-from-now (it starts from
              # current_health, not h0), so that double-counted elapsed time.
              rul_mean = (np.log(threshold) - np.log(current_health)) / self.beta

              # Confidence interval from fit error (heuristic propagation)
              rul_std = abs(rul_mean * self.fit_error / self.beta)
              rul_lower = max(0, rul_mean - 1.96 * rul_std)
              rul_upper = rul_mean + 1.96 * rul_std

              return max(0, rul_mean), (rul_lower, rul_upper)

      class PredictiveMaintenanceEngine:
          """
          Predictive maintenance engine for digital twin.

          Monitors health indicators, predicts failures, recommends actions.
          Assumes the twin_architecture pattern's TwinState/DigitalTwin are
          in scope.
          """

          def __init__(self, twin: 'DigitalTwin'):
              self.twin = twin
              self.degradation_models: Dict[str, DegradationModel] = {}
              self.health_indicators: Dict[str, HealthIndicator] = {}
              self.predictions: List[MaintenancePrediction] = []

          def register_indicator(
              self,
              name: str,
              extractor: Callable[['TwinState'], float],
              threshold: float
          ):
              """Register a health indicator to monitor."""
              self.degradation_models[name] = DegradationModel()
              self.health_indicators[name] = HealthIndicator(
                  name=name,
                  current_value=1.0,
                  threshold=threshold,
                  trend=0.0,
                  last_updated=datetime.now()
              )

              # Register with twin updates
              self.twin.callbacks.append(
                  lambda state: self._update_indicator(name, extractor(state))
              )

          def _update_indicator(self, name: str, value: float):
              """Update indicator with new value."""
              now = datetime.now()
              indicator = self.health_indicators[name]
              model = self.degradation_models[name]

              # Convert to operating time (hours since a fixed epoch).
              # NOTE(review): 2024-01-01 is an arbitrary reference; only time
              # differences matter to the regression.
              time = (now - datetime(2024, 1, 1)).total_seconds() / 3600
              model.update(time, value)

              indicator.current_value = value
              indicator.trend = model.beta if model.beta else 0.0
              indicator.last_updated = now

          def get_predictions(self) -> List[MaintenancePrediction]:
              """Generate maintenance predictions for all indicators."""
              predictions = []

              for name, indicator in self.health_indicators.items():
                  model = self.degradation_models[name]
                  rul_mean, (rul_lower, rul_upper) = model.predict_rul(
                      indicator.threshold
                  )

                  # Determine urgency (thresholds in hours)
                  if rul_mean < 24:       # Less than 1 day
                      urgency = "immediate"
                  elif rul_mean < 168:    # Less than 1 week
                      urgency = "soon"
                  elif rul_mean < 720:    # Less than 1 month
                      urgency = "scheduled"
                  else:
                      urgency = "none"

                  predictions.append(MaintenancePrediction(
                      asset_id=f"asset_{name}",
                      failure_mode=f"{name}_degradation",
                      probability=1.0 - indicator.current_value,
                      rul_estimate=timedelta(hours=rul_mean),
                      rul_confidence=(timedelta(hours=rul_lower),
                                      timedelta(hours=rul_upper)),
                      recommended_action=self._recommend_action(name, urgency),
                      urgency=urgency
                  ))

              return predictions

          def _recommend_action(self, indicator: str, urgency: str) -> str:
              """Generate maintenance recommendation."""
              if urgency == "immediate":
                  return f"Critical: Replace {indicator} component immediately"
              elif urgency == "soon":
                  return f"Schedule {indicator} maintenance within 1 week"
              elif urgency == "scheduled":
                  return f"Plan {indicator} maintenance for next shutdown"
              else:
                  return f"Monitor {indicator}, no action needed"

  edge_cloud_hybrid:
    description: "Distributed twin execution across edge and cloud"
    example: |
      # FIX(review): Any added; the original annotated with builtin `any`.
      from typing import Any, Dict, List, Optional
      from dataclasses import dataclass
      from enum import Enum
      import asyncio

      class ProcessingLocation(Enum):
          EDGE = "edge"
          FOG = "fog"
          CLOUD = "cloud"

      @dataclass
      class ComputeRequirements:
          """Resource requirements for a computation."""
          latency_max_ms: float
          memory_mb: int
          cpu_cores: float
          requires_gpu: bool = False

      @dataclass
      class EdgeNode:
          """Edge computing node near physical asset."""
          node_id: str
          location: str
          available_memory_mb: int
          available_cpu_cores: float
          has_gpu: bool
          connected: bool

      class HybridTwinOrchestrator:
          """
          Orchestrates digital twin across edge and cloud.

          Decisions based on:
          - Latency requirements (real-time control = edge)
          - Compute requirements (ML inference = cloud or GPU edge)
          - Data volume (reduce before sending to cloud)
          """

          def __init__(
              self,
              edge_nodes: List[EdgeNode],
              cloud_endpoint: str
          ):
              self.edge_nodes = {n.node_id: n for n in edge_nodes}
              self.cloud_endpoint = cloud_endpoint

              # What runs where
              self.task_assignments: Dict[str, ProcessingLocation] = {}

          def assign_task(
              self,
              task_id: str,
              requirements: ComputeRequirements
          ) -> ProcessingLocation:
              """Assign task to optimal processing location."""
              # Real-time requires edge
              if requirements.latency_max_ms < 50:
                  edge = self._find_capable_edge(requirements)
                  if edge:
                      self.task_assignments[task_id] = ProcessingLocation.EDGE
                      return ProcessingLocation.EDGE

              # Heavy compute goes to cloud
              if requirements.requires_gpu or requirements.memory_mb > 4096:
                  self.task_assignments[task_id] = ProcessingLocation.CLOUD
                  return ProcessingLocation.CLOUD

              # Default to fog (intermediate)
              self.task_assignments[task_id] = ProcessingLocation.FOG
              return ProcessingLocation.FOG

          def _find_capable_edge(
              self,
              requirements: ComputeRequirements
          ) -> Optional[EdgeNode]:
              """Find edge node that meets requirements."""
              for node in self.edge_nodes.values():
                  if not node.connected:
                      continue
                  if node.available_memory_mb < requirements.memory_mb:
                      continue
                  if node.available_cpu_cores < requirements.cpu_cores:
                      continue
                  if requirements.requires_gpu and not node.has_gpu:
                      continue
                  return node
              return None

          async def execute_distributed(
              self,
              twin_state: TwinState
          ) -> Dict[str, Any]:
              """
              Execute twin computations across edge and cloud.

              Returns combined results from all locations.
              """
              # Group by location
              edge_tasks = [t for t, loc in self.task_assignments.items()
                            if loc == ProcessingLocation.EDGE]
              cloud_tasks = [t for t, loc in self.task_assignments.items()
                             if loc == ProcessingLocation.CLOUD]

              # Execute in parallel
              results = await asyncio.gather(
                  self._execute_edge(edge_tasks, twin_state),
                  self._execute_cloud(cloud_tasks, twin_state)
              )

              return {**results[0], **results[1]}

          async def _execute_edge(
              self,
              task_ids: List[str],
              state: TwinState
          ) -> Dict[str, Any]:
              """Execute tasks on edge nodes."""
              # Real implementation would dispatch to actual edge nodes
              return {}

          async def _execute_cloud(
              self,
              task_ids: List[str],
              state: TwinState
          ) -> Dict[str, Any]:
              """Execute tasks in cloud."""
              # Real implementation would call cloud API
              return {}
# Known mistakes to avoid when building digital twins.
anti_patterns:
- pattern: "No physics model, pure data-driven"
  problem: "Can't predict novel scenarios, poor extrapolation"
  solution: "Use physics-informed models with data-driven calibration"
- pattern: "Synchronous sensor polling"
  problem: "Blocks on slow sensors, wastes time waiting"
  solution: "Use async I/O with timeouts and last-known-value fallback"
- pattern: "Direct sensor values without filtering"
  problem: "Noise propagates to twin state, noisy predictions"
  solution: "Apply Kalman filtering or exponential smoothing"
- pattern: "No data quality tracking"
  problem: "Stale or corrupt data treated as valid"
  solution: "Track timestamps, apply quality scores, degrade gracefully"
- pattern: "Monolithic twin running in cloud"
  problem: "Latency prevents real-time control, bandwidth costs"
  solution: "Edge-cloud hybrid with latency-aware task placement"
- pattern: "No model calibration feedback"
  problem: "Model drifts from reality over time"
  solution: "Monitor residuals, trigger recalibration when divergent"
- pattern: "Missing twin instance management"
  problem: "Can't track multiple assets, no fleet view"
  solution: "Twin registry with lifecycle management"
# Related skills to hand off to, with the condition and context for each.
handoffs:
- to: physics-simulation
  when: "Need detailed physics modeling for the virtual component"
  context: "Digital twin needs accurate physics simulation engine"
- to: sensor-fusion
  when: "Multiple sensors need fusion for state estimation"
  context: "Kalman filtering and sensor data fusion"
- to: control-systems
  when: "Twin drives control decisions"
  context: "Twin provides model for MPC or control optimization"
- to: embedded-systems
  when: "Edge processing on embedded hardware"
  context: "Deploy twin components to edge devices"
- to: monte-carlo
  when: "Uncertainty quantification needed"
  context: "Probabilistic predictions and risk analysis"
# Surrounding tooling landscape, grouped by category.
ecosystem:
  platforms:
  - "Azure Digital Twins - Enterprise platform"
  - "AWS IoT TwinMaker - AWS offering"
  - "NVIDIA Omniverse - 3D visualization"
  - "Siemens MindSphere - Industrial IoT"
  - "PTC ThingWorx - Industrial platform"
  open_source:
  - "Eclipse Ditto - Open source twin framework"
  - "Apache PLC4X - Industrial protocols"
  - "Node-RED - Flow-based IoT"
  - "Grafana - Visualization"
  protocols:
  - "MQTT - Lightweight messaging"
  - "OPC UA - Industrial automation"
  - "Modbus - Legacy industrial"
  - "AMQP - Enterprise messaging"
  simulation:
  - "Modelica - Multi-domain modeling"
  - "FMI/FMU - Model exchange standard"
  - "Gazebo - Robotics simulation"
  - "COMSOL - Multiphysics"
# Further reading, grouped by source type.
references:
  standards:
  - "ISO 23247 - Digital Twin Framework for Manufacturing"
  - "Asset Administration Shell (AAS) - Industrie 4.0"
  - "Digital Twin Consortium Reference Architecture"
  papers:
  - "Digital Twin: Origin to Future - Rasheed et al."
  - "Physics-Informed Digital Twins - Kapteyn et al."
  - "Predictive Digital Twin - Tao et al."
  books:
  - "Digital Twin Driven Smart Manufacturing - Tao et al."
  - "Industrial Digital Twins - Proceedings"