Vibeship-spawner-skills digital-twin

id: digital-twin

install:
  source: Clone the upstream repo
  command: git clone https://github.com/vibeforge1111/vibeship-spawner-skills
  manifest: simulation/digital-twin/skill.yaml

source content:

id: digital-twin
name: Digital Twin Development
category: simulation
description: |
  Build and maintain digital twins - virtual representations of physical
  systems that synchronize with real-world counterparts for monitoring,
  prediction, and optimization.
version: "1.0.0"

triggers:

  - "digital twin"
  - "virtual model"
  - "real-time synchronization"
  - "physical-virtual coupling"
  - "predictive maintenance"
  - "asset modeling"
  - "system replica"
  - "live simulation"

provides:

  - Digital twin architecture design
  - State synchronization patterns
  - Sensor integration strategies
  - Physics model calibration
  - Predictive analytics pipelines
  - Edge-cloud hybrid deployment

patterns:
  twin_architecture:
    description: "Core digital twin system structure"
    example: |

  from dataclasses import dataclass, field
  from typing import Dict, List, Optional, Callable
  from datetime import datetime
  import numpy as np
  from abc import ABC, abstractmethod

  @dataclass
  class SensorReading:
      """Single sensor measurement from a physical asset.

      The `quality` score is consumed downstream: DigitalTwin.update()
      scales the measurement-noise covariance by it, and the sync/gateway
      layers degrade it for stale or timed-out readings.
      """
      # Identifier of the originating sensor.
      sensor_id: str
      # Wall-clock time the reading was captured.
      timestamp: datetime
      # Measured value, expressed in `unit`.
      value: float
      # Engineering unit string for `value`.
      unit: str
      quality: float = 1.0  # Data quality score 0-1; 1.0 = fully trusted

  @dataclass
  class TwinState:
      """Complete state of the digital twin at one update instant.

      Produced by DigitalTwin.update() after each sensor-fusion step,
      appended to the twin's state history, and passed to callbacks.
      """
      # Time of the update that produced this snapshot.
      timestamp: datetime
      physical_state: Dict[str, float]  # Measured values, keyed by sensor id
      virtual_state: Dict[str, float]   # Simulated values, keyed by state name
      residuals: Dict[str, float]       # Differences (EKF innovation) per sensor
      confidence: Dict[str, float]      # State confidence in (0, 1], from covariance

  class PhysicsModel(ABC):
      """Abstract physics model driving the twin's prediction step."""

      @abstractmethod
      def predict(self, state: Dict[str, float], dt: float) -> Dict[str, float]:
          """Predict next state given current state and time step.

          Args:
              state: Current named state values.
              dt: Time step to integrate forward (seconds; the twin passes
                  wall-clock elapsed seconds here).

          Returns:
              Predicted named state values after `dt`.
          """
          pass

      @abstractmethod
      def jacobian(self, state: Dict[str, float]) -> np.ndarray:
          """Compute Jacobian for state estimation.

          The returned matrix is used as the state-transition matrix F when
          propagating covariance in the EKF prediction step (F P F^T + Q),
          so it must be (state_dim x state_dim).
          """
          pass

  class DigitalTwin:
      """
      Digital twin with state synchronization.

      Combines physics-based prediction with sensor updates using an
      Extended Kalman Filter (EKF): `predict()` integrates the physics
      model forward; `update()` fuses sensor measurements into the
      state estimate.

      Note: the default `_measurement_matrix` assumes the number of
      measurements passed to `update()` matches `measurement_dim`;
      override it for other sensor-to-state configurations.
      """

      def __init__(
          self,
          model: PhysicsModel,
          state_dim: int,
          measurement_dim: int,
          process_noise: float = 0.01,
          measurement_noise: float = 0.1
      ):
          """
          Args:
              model: Physics model supplying predict() and jacobian().
              state_dim: Dimension of the state vector.
              measurement_dim: Dimension of the measurement vector.
              process_noise: Diagonal magnitude of process-noise covariance Q.
              measurement_noise: Diagonal magnitude of measurement-noise
                  covariance R.
          """
          self.model = model
          self.state_dim = state_dim
          self.measurement_dim = measurement_dim

          # State estimate and covariance (EKF prior).
          self.state = np.zeros(state_dim)
          self.covariance = np.eye(state_dim)

          # Noise covariances (diagonal).
          self.Q = np.eye(state_dim) * process_noise
          self.R = np.eye(measurement_dim) * measurement_noise

          # History for analysis.
          self.state_history: List[TwinState] = []
          self.last_update = datetime.now()

          # Callbacks invoked with every new TwinState.
          self.callbacks: List[Callable[[TwinState], None]] = []

      def predict(self, dt: float) -> np.ndarray:
          """Predict state forward by `dt` using the physics model.

          Propagates both the state (via the model) and the covariance
          (via the model Jacobian): P <- F P F^T + Q.
          """
          state_dict = self._array_to_dict(self.state)

          # Physics prediction.
          predicted_dict = self.model.predict(state_dict, dt)
          predicted = self._dict_to_array(predicted_dict)

          # Jacobian for covariance propagation.
          F = self.model.jacobian(state_dict)

          # Propagate covariance.
          self.covariance = F @ self.covariance @ F.T + self.Q

          self.state = predicted
          return predicted

      def update(self, measurements: Dict[str, SensorReading]) -> TwinState:
          """
          Update state with sensor measurements.

          Uses the Extended Kalman Filter to fuse predictions with
          measurements.

          Args:
              measurements: Latest reading per sensor id; dict order
                  defines the measurement-vector component order.

          Returns:
              TwinState snapshot after the fusion step (also appended to
              `state_history` and delivered to all registered callbacks).
          """
          now = datetime.now()
          dt = (now - self.last_update).total_seconds()

          # Predict to current time.
          if dt > 0:
              self.predict(dt)

          # Convert measurements to arrays.
          z = np.array([m.value for m in measurements.values()])
          qualities = np.array([m.quality for m in measurements.values()])

          # Inflate per-sensor measurement noise for low-quality data:
          # R_ii / (q_i + 0.01). Broadcasting divides column i by q_i and
          # leaves the zero off-diagonal entries intact. (The previous
          # `self.R / np.diag(qualities + 0.01)` divided the zero
          # off-diagonal of R by the zero off-diagonal of the diag matrix,
          # producing 0/0 = NaN everywhere off the diagonal.)
          R_adjusted = self.R / (qualities + 0.01)

          # Measurement matrix (maps state to measurements).
          H = self._measurement_matrix(list(measurements.keys()))

          # Innovation covariance and Kalman gain K = P H^T S^-1.
          # Computed via a linear solve instead of forming inv(S):
          # K^T = solve(S^T, (P H^T)^T).
          S = H @ self.covariance @ H.T + R_adjusted
          K = np.linalg.solve(S.T, (self.covariance @ H.T).T).T

          # Innovation (measurement residual).
          innovation = z - H @ self.state

          # Update state and covariance.
          self.state = self.state + K @ innovation
          self.covariance = (np.eye(self.state_dim) - K @ H) @ self.covariance

          # Snapshot for history and callbacks.
          twin_state = TwinState(
              timestamp=now,
              physical_state={k: m.value for k, m in measurements.items()},
              virtual_state=self._array_to_dict(self.state),
              residuals={k: float(innovation[i]) for i, k in enumerate(measurements.keys())},
              confidence=self._compute_confidence()
          )

          self.state_history.append(twin_state)
          self.last_update = now

          # Notify callbacks.
          for callback in self.callbacks:
              callback(twin_state)

          return twin_state

      def _compute_confidence(self) -> Dict[str, float]:
          """Compute confidence from covariance diagonal: 1 / (1 + var)."""
          variances = np.diag(self.covariance)
          # Convert variance to confidence (0-1 scale).
          confidence = 1.0 / (1.0 + variances)
          return self._array_to_dict(confidence)

      def _measurement_matrix(self, sensor_ids: List[str]) -> np.ndarray:
          """Build measurement matrix H for the given sensors.

          Default maps sensor i directly to state component i; override in
          a subclass for specific sensor configurations.
          """
          return np.eye(len(sensor_ids), self.state_dim)

      def _array_to_dict(self, arr: np.ndarray) -> Dict[str, float]:
          """Convert a state array to a named dictionary.

          Override with actual state names in concrete twins.
          """
          return {f"state_{i}": float(v) for i, v in enumerate(arr)}

      def _dict_to_array(self, d: Dict[str, float]) -> np.ndarray:
          """Convert a named dictionary back to a state array (dict order)."""
          return np.array(list(d.values()))

  state_synchronization:
    description: "Bidirectional sync between physical and virtual"
    example: |

  import asyncio
  from typing import Protocol, Optional
  from dataclasses import dataclass
  from datetime import datetime, timedelta
  import json

  class SyncStrategy(Protocol):
      """Protocol for synchronization strategies.

      Implementations define how state flows between the physical asset
      and its virtual twin in each direction.
      """

      async def sync_to_virtual(self, physical_state: dict) -> dict:
          """Update virtual from physical."""
          ...

      async def sync_to_physical(self, virtual_state: dict) -> dict:
          """Command physical from virtual (if supported)."""
          ...

  @dataclass
  class SyncConfig:
      """Configuration for state synchronization."""
      # Seconds between sync cycles in StateSynchronizer.start().
      sync_interval: float = 1.0  # seconds
      # Readings older than this get their quality halved each cycle.
      stale_threshold: float = 10.0  # seconds before data considered stale
      # Divergence policy: "physical_wins" trusts sensors, "virtual_wins"
      # trusts the model, "merge" combines weighted by confidence.
      conflict_resolution: str = "physical_wins"  # or "virtual_wins", "merge"
      # When true, the synchronizer may push setpoints back to the asset.
      enable_commands: bool = False  # Allow virtual to control physical

  class StateSynchronizer:
      """
      Manages state synchronization between physical and digital twin.

      Handles:
      - Periodic sync from sensors
      - Conflict resolution
      - Stale data detection
      - Command dispatch (optional)
      """

      def __init__(
          self,
          twin: 'DigitalTwin',
          sensor_gateway: 'SensorGateway',
          config: Optional[SyncConfig] = None
      ):
          """
          Args:
              twin: Digital twin to feed with sensor state.
              sensor_gateway: Source of latest sensor readings.
              config: Sync settings; a fresh SyncConfig() is created when
                  omitted. (Using `SyncConfig()` as the default argument
                  would create a single mutable instance shared by every
                  synchronizer in the process.)
          """
          self.twin = twin
          self.sensors = sensor_gateway
          self.config = config if config is not None else SyncConfig()
          self.running = False

          # Track sync state.
          self.last_physical_update = datetime.min
          self.last_virtual_update = datetime.min
          self.sync_errors = []  # {'time', 'error'} dicts from failed cycles

      async def start(self):
          """Run the synchronization loop until stop() is called.

          Cycle errors are recorded in `sync_errors` rather than raised,
          so one bad cycle does not kill the loop.
          """
          self.running = True
          while self.running:
              try:
                  await self._sync_cycle()
              except Exception as e:
                  self.sync_errors.append({
                      'time': datetime.now(),
                      'error': str(e)
                  })
              await asyncio.sleep(self.config.sync_interval)

      async def stop(self):
          """Stop synchronization loop."""
          self.running = False

      async def _sync_cycle(self):
          """Single synchronization cycle."""
          # 1. Fetch physical state from sensors
          readings = await self.sensors.get_latest()

          # 2. Check for stale data
          now = datetime.now()
          for reading in readings.values():
              age = (now - reading.timestamp).total_seconds()
              if age > self.config.stale_threshold:
                  reading.quality *= 0.5  # Reduce quality for stale data

          # 3. Update twin with physical measurements
          twin_state = self.twin.update(readings)
          self.last_physical_update = now

          # 4. Check for divergence. default=0.0 keeps an empty residual
          #    set from raising ValueError on max().
          max_residual = max(
              (abs(r) for r in twin_state.residuals.values()), default=0.0
          )
          if max_residual > 0.1:  # Threshold for significant divergence
              await self._handle_divergence(twin_state)

          # 5. Optionally sync commands back to physical
          if self.config.enable_commands:
              await self._dispatch_commands(twin_state)

      async def _handle_divergence(self, state: TwinState):
          """Handle significant divergence between physical and virtual."""
          if self.config.conflict_resolution == "physical_wins":
              # Trust sensors, recalibrate model
              pass
          elif self.config.conflict_resolution == "virtual_wins":
              # Trust model, flag sensor issues
              pass
          elif self.config.conflict_resolution == "merge":
              # Weighted combination based on confidence
              pass

      async def _dispatch_commands(self, state: TwinState):
          """Send control commands from virtual to physical."""
          # Only if twin has determined optimal setpoints
          pass

  sensor_integration:
    description: "Multi-sensor data ingestion and fusion"
    example: |

  from typing import Dict, List, Callable, Optional
  from dataclasses import dataclass
  from datetime import datetime
  import asyncio
  from abc import ABC, abstractmethod

  @dataclass
  class SensorConfig:
      """Configuration for a sensor."""
      # Unique id; used as the key in SensorGateway registries.
      sensor_id: str
      # Human-readable sensor name.
      name: str
      # Engineering unit of the readings.
      unit: str
      # Valid range; MQTTSensorAdapter clamps readings into [min, max].
      min_value: float
      max_value: float
      sample_rate: float  # Hz
      protocol: str  # "mqtt", "modbus", "opcua", "rest"
      address: str  # protocol-specific address (e.g. the MQTT topic)

  class SensorAdapter(ABC):
      """Abstract adapter for different sensor protocols.

      Concrete adapters implement the connect/read/disconnect lifecycle
      driven by SensorGateway.
      """

      @abstractmethod
      async def connect(self):
          # Establish the protocol connection/subscription.
          pass

      @abstractmethod
      async def read(self) -> SensorReading:
          # Return the next reading from the sensor.
          pass

      @abstractmethod
      async def disconnect(self):
          # Tear down the connection.
          pass

  class MQTTSensorAdapter(SensorAdapter):
      """Adapter for MQTT-based sensors.

      Subscribes to the topic in `config.address` on an aiomqtt broker
      connection; each read returns one SensorReading built from the next
      received message payload.
      """

      def __init__(self, config: SensorConfig, broker: str):
          self.config = config
          self.broker = broker
          self.client = None       # aiomqtt.Client once connect() has run
          self.last_value = None   # NOTE(review): never written in this class — dead field?

      async def connect(self):
          # Local import so aiomqtt is only required when MQTT is used.
          # NOTE(review): the connect()/messages API shape varies across
          # aiomqtt versions — confirm against the installed version.
          import aiomqtt
          self.client = aiomqtt.Client(self.broker)
          await self.client.connect()
          await self.client.subscribe(self.config.address)

      async def read(self) -> SensorReading:
          # Returns on the first message from the subscription stream.
          # NOTE(review): if the stream ends without yielding a message,
          # this falls through and implicitly returns None despite the
          # SensorReading annotation — callers should handle that.
          async for message in self.client.messages:
              value = float(message.payload.decode())
              return SensorReading(
                  sensor_id=self.config.sensor_id,
                  timestamp=datetime.now(),
                  value=self._validate(value),
                  unit=self.config.unit
              )

      def _validate(self, value: float) -> float:
          """Validate sensor reading against bounds.

          Silently clamps into [min_value, max_value] rather than
          rejecting out-of-range readings.
          """
          return max(self.config.min_value,
                    min(self.config.max_value, value))

      async def disconnect(self):
          # No-op if connect() was never called.
          if self.client:
              await self.client.disconnect()

  class SensorGateway:
      """
      Central gateway for all sensors feeding the digital twin.

      Manages connections, buffering, and data quality: each sensor keeps
      its latest reading plus a bounded history buffer, and reads that
      time out or fail fall back to the last known value with degraded
      quality instead of raising.
      """

      def __init__(self):
          self.sensors: Dict[str, SensorAdapter] = {}
          self.latest: Dict[str, SensorReading] = {}
          self.buffers: Dict[str, List[SensorReading]] = {}
          self.buffer_size = 100  # max readings retained per sensor

      async def register(self, config: SensorConfig, adapter: SensorAdapter):
          """Register a sensor with its adapter and open its connection."""
          self.sensors[config.sensor_id] = adapter
          self.buffers[config.sensor_id] = []
          await adapter.connect()

      async def get_latest(self) -> Dict[str, SensorReading]:
          """Get latest reading from all sensors.

          Reads are issued concurrently with a 1 s timeout each. On
          timeout or adapter error the last known value is kept with
          degraded quality, so a single bad sensor cannot abort the poll.
          """
          tasks = {
              sid: asyncio.create_task(adapter.read())
              for sid, adapter in self.sensors.items()
          }

          for sid, task in tasks.items():
              try:
                  reading = await asyncio.wait_for(task, timeout=1.0)
                  self.latest[sid] = reading
                  self._buffer_reading(sid, reading)
              except asyncio.TimeoutError:
                  # Use last known value with degraded quality.
                  if sid in self.latest:
                      self.latest[sid].quality *= 0.8
              except Exception:
                  # Adapter read failed outright; previously this
                  # propagated and aborted the whole poll, leaving the
                  # remaining tasks unawaited. Degrade more aggressively
                  # than a plain timeout and keep going.
                  if sid in self.latest:
                      self.latest[sid].quality *= 0.5

          return self.latest

      def _buffer_reading(self, sensor_id: str, reading: SensorReading):
          """Buffer reading for historical analysis (bounded FIFO)."""
          buf = self.buffers[sensor_id]
          buf.append(reading)
          if len(buf) > self.buffer_size:
              buf.pop(0)

  predictive_maintenance:
    description: "Failure prediction and remaining useful life estimation"
    example: |

  import numpy as np
  from dataclasses import dataclass
  from typing import Dict, List, Tuple, Optional, Callable
  from datetime import datetime, timedelta
  from scipy import stats

  @dataclass
  class HealthIndicator:
      """Single health indicator for an asset."""
      name: str
      # Latest observed health value (initialized to 1.0 at registration).
      current_value: float
      # Health level below which the asset is considered failed (RUL target).
      threshold: float
      trend: float  # Rate of degradation (fitted beta; 0.0 until a fit exists)
      last_updated: datetime

  @dataclass
  class MaintenancePrediction:
      """Prediction of maintenance need."""
      asset_id: str
      failure_mode: str
      # Failure-probability proxy: 1 - current health value.
      probability: float
      rul_estimate: timedelta  # Remaining useful life
      rul_confidence: Tuple[timedelta, timedelta]  # 95% CI (lower, upper)
      recommended_action: str
      urgency: str  # "immediate", "soon", "scheduled", "none"

  class DegradationModel:
      """
      Models asset degradation for RUL prediction.

      Uses an exponential degradation model:
      h(t) = h0 * exp(beta * t)
      where beta < 0 means the asset is degrading. The model is fit by
      log-linear regression once at least 3 observations exist.
      """

      def __init__(self, initial_health: float = 1.0):
          self.h0 = initial_health
          self.beta = None       # Degradation rate; None until first fit
          self.fit_error = 0.0   # Std error of the fitted slope (0 until fit)
          self.history: List[Tuple[float, float]] = []  # (time, health)

      def update(self, time: float, health: float):
          """Update model with a new (time, health) observation.

          Refits the exponential model whenever >= 3 points are available.
          """
          self.history.append((time, health))

          if len(self.history) >= 3:
              times = np.array([h[0] for h in self.history])
              healths = np.array([h[1] for h in self.history])

              # Log-linear regression for the exponential fit; clip avoids
              # log(0) for fully-degraded observations.
              log_healths = np.log(np.clip(healths, 1e-10, None))
              slope, intercept, r_value, p_value, std_err = stats.linregress(
                  times, log_healths
              )

              self.beta = slope
              self.h0 = np.exp(intercept)
              self.fit_error = std_err

      def predict_rul(self, threshold: float = 0.2) -> Tuple[float, Tuple[float, float]]:
          """
          Predict remaining useful life until health drops below threshold.

          Returns:
              (mean_rul, (lower_95, upper_95)) in the same time units as
              the observations fed to update(). Infinities are returned
              when the model is unfit or the asset is not degrading.
          """
          if self.beta is None or self.beta >= 0:
              return float('inf'), (float('inf'), float('inf'))

          current_health = max(self.history[-1][1], 1e-10)

          # Solve current_health * exp(beta * rul) = threshold for rul.
          # Because this starts from the *current* health, the result is
          # already relative to the latest observation — subtracting the
          # current time again (as the previous version did) double-counts
          # elapsed time and systematically underestimates RUL.
          rul_mean = (np.log(threshold) - np.log(current_health)) / self.beta

          # Approximate 95% CI propagated from the slope's standard error.
          rul_std = abs(rul_mean * self.fit_error / self.beta)
          rul_lower = max(0, rul_mean - 1.96 * rul_std)
          rul_upper = rul_mean + 1.96 * rul_std

          return max(0, rul_mean), (rul_lower, rul_upper)

  class PredictiveMaintenanceEngine:
      """
      Predictive maintenance engine for digital twin.

      Monitors health indicators, predicts failures, recommends actions.
      Each registered indicator owns a DegradationModel that is fed from
      twin state updates via a registered callback.
      """

      # Urgency buckets: RUL (hours) strictly below the bound gets the label.
      _URGENCY_BOUNDS = ((24, "immediate"), (168, "soon"), (720, "scheduled"))

      def __init__(self, twin: 'DigitalTwin'):
          self.twin = twin
          self.degradation_models: Dict[str, DegradationModel] = {}
          self.health_indicators: Dict[str, HealthIndicator] = {}
          self.predictions: List[MaintenancePrediction] = []

      def register_indicator(
          self,
          name: str,
          extractor: Callable[[TwinState], float],
          threshold: float
      ):
          """Register a health indicator to monitor.

          `extractor` pulls the indicator value out of each TwinState; it
          is wired into the twin's callback list so the indicator tracks
          every twin update automatically.
          """
          self.degradation_models[name] = DegradationModel()
          self.health_indicators[name] = HealthIndicator(
              name=name,
              current_value=1.0,
              threshold=threshold,
              trend=0.0,
              last_updated=datetime.now(),
          )
          # `name` and `extractor` are per-call locals, so each lambda
          # closes over its own bindings.
          self.twin.callbacks.append(
              lambda state: self._update_indicator(name, extractor(state))
          )

      def _update_indicator(self, name: str, value: float):
          """Feed a new observation into the indicator's degradation model."""
          now = datetime.now()
          # Operating time in hours since the fixed epoch used as time base.
          elapsed_hours = (now - datetime(2024, 1, 1)).total_seconds() / 3600

          model = self.degradation_models[name]
          model.update(elapsed_hours, value)

          tracked = self.health_indicators[name]
          tracked.current_value = value
          tracked.trend = model.beta or 0.0
          tracked.last_updated = now

      def get_predictions(self) -> List[MaintenancePrediction]:
          """Generate maintenance predictions for all indicators."""
          results: List[MaintenancePrediction] = []

          for name, tracked in self.health_indicators.items():
              rul_mean, (rul_lower, rul_upper) = (
                  self.degradation_models[name].predict_rul(tracked.threshold)
              )

              # Map RUL to an urgency label via the bucket table.
              urgency = "none"
              for bound_hours, label in self._URGENCY_BOUNDS:
                  if rul_mean < bound_hours:
                      urgency = label
                      break

              results.append(MaintenancePrediction(
                  asset_id=f"asset_{name}",
                  failure_mode=f"{name}_degradation",
                  probability=1.0 - tracked.current_value,
                  rul_estimate=timedelta(hours=rul_mean),
                  rul_confidence=(timedelta(hours=rul_lower), timedelta(hours=rul_upper)),
                  recommended_action=self._recommend_action(name, urgency),
                  urgency=urgency,
              ))

          return results

      def _recommend_action(self, indicator: str, urgency: str) -> str:
          """Generate maintenance recommendation for one indicator."""
          messages = {
              "immediate": f"Critical: Replace {indicator} component immediately",
              "soon": f"Schedule {indicator} maintenance within 1 week",
              "scheduled": f"Plan {indicator} maintenance for next shutdown",
          }
          return messages.get(urgency, f"Monitor {indicator}, no action needed")

  edge_cloud_hybrid:
    description: "Distributed twin execution across edge and cloud"
    example: |

  from typing import Dict, List, Optional
  from dataclasses import dataclass
  from enum import Enum
  import asyncio

  class ProcessingLocation(Enum):
      """Tier where a twin computation runs: edge, intermediate fog, or cloud."""
      EDGE = "edge"
      FOG = "fog"
      CLOUD = "cloud"

  @dataclass
  class ComputeRequirements:
      """Resource requirements for a computation."""
      # Max tolerable latency; < 50 ms drives edge placement.
      latency_max_ms: float
      # Memory footprint; > 4096 MB drives cloud placement.
      memory_mb: int
      cpu_cores: float
      requires_gpu: bool = False

  @dataclass
  class EdgeNode:
      """Edge computing node near physical asset."""
      node_id: str
      # Physical/site location label.
      location: str
      # Currently free resources, checked during task placement.
      available_memory_mb: int
      available_cpu_cores: float
      has_gpu: bool
      # Disconnected nodes are skipped when searching for a capable edge.
      connected: bool

  class HybridTwinOrchestrator:
      """
      Orchestrates digital twin across edge and cloud.

      Decisions based on:
      - Latency requirements (real-time control = edge)
      - Compute requirements (ML inference = cloud or GPU edge)
      - Data volume (reduce before sending to cloud)
      """

      def __init__(
          self,
          edge_nodes: List[EdgeNode],
          cloud_endpoint: str
      ):
          """
          Args:
              edge_nodes: Available edge nodes, indexed here by node_id.
              cloud_endpoint: Address of the cloud execution service.
          """
          self.edge_nodes = {n.node_id: n for n in edge_nodes}
          self.cloud_endpoint = cloud_endpoint

          # What runs where: task_id -> assigned location.
          self.task_assignments: Dict[str, ProcessingLocation] = {}

      def assign_task(
          self,
          task_id: str,
          requirements: ComputeRequirements
      ) -> ProcessingLocation:
          """Assign task to optimal processing location.

          Placement policy, in order:
          1. Hard real-time (< 50 ms) goes to a capable edge node if one
             exists; with no capable node it falls through to the rules
             below, so the latency requirement may then be violated —
             callers should inspect the returned location.
          2. GPU or > 4096 MB memory requirements go to the cloud.
          3. Everything else lands on fog (intermediate tier).
          """
          # Real-time requires edge
          if requirements.latency_max_ms < 50:
              edge = self._find_capable_edge(requirements)
              if edge:
                  self.task_assignments[task_id] = ProcessingLocation.EDGE
                  return ProcessingLocation.EDGE

          # Heavy compute goes to cloud
          if requirements.requires_gpu or requirements.memory_mb > 4096:
              self.task_assignments[task_id] = ProcessingLocation.CLOUD
              return ProcessingLocation.CLOUD

          # Default to fog (intermediate)
          self.task_assignments[task_id] = ProcessingLocation.FOG
          return ProcessingLocation.FOG

      def _find_capable_edge(
          self,
          requirements: ComputeRequirements
      ) -> Optional[EdgeNode]:
          """Find the first connected edge node meeting all requirements."""
          for node in self.edge_nodes.values():
              if not node.connected:
                  continue
              if node.available_memory_mb < requirements.memory_mb:
                  continue
              if node.available_cpu_cores < requirements.cpu_cores:
                  continue
              if requirements.requires_gpu and not node.has_gpu:
                  continue
              return node
          return None

      async def execute_distributed(
          self,
          twin_state: TwinState
      ) -> Dict[str, object]:
          """
          Execute twin computations across edge and cloud.

          Edge and cloud task groups run concurrently; their result dicts
          are merged (cloud results win on key collisions).

          NOTE(review): FOG-assigned tasks are not executed here —
          confirm whether that is intended.

          Returns combined results from all locations.
          """
          # Group assigned tasks by location.
          edge_tasks = [t for t, loc in self.task_assignments.items()
                      if loc == ProcessingLocation.EDGE]
          cloud_tasks = [t for t, loc in self.task_assignments.items()
                       if loc == ProcessingLocation.CLOUD]

          # Execute in parallel
          results = await asyncio.gather(
              self._execute_edge(edge_tasks, twin_state),
              self._execute_cloud(cloud_tasks, twin_state)
          )

          return {**results[0], **results[1]}

      async def _execute_edge(
          self,
          task_ids: List[str],
          state: TwinState
      ) -> Dict[str, object]:
          """Execute tasks on edge nodes (stub)."""
          # Real implementation would dispatch to actual edge nodes
          return {}

      async def _execute_cloud(
          self,
          task_ids: List[str],
          state: TwinState
      ) -> Dict[str, object]:
          """Execute tasks in cloud (stub)."""
          # Real implementation would call cloud API
          return {}

anti_patterns:

  - pattern: "No physics model, pure data-driven"
    problem: "Can't predict novel scenarios, poor extrapolation"
    solution: "Use physics-informed models with data-driven calibration"

  - pattern: "Synchronous sensor polling"
    problem: "Blocks on slow sensors, wastes time waiting"
    solution: "Use async I/O with timeouts and last-known-value fallback"

  - pattern: "Direct sensor values without filtering"
    problem: "Noise propagates to twin state, noisy predictions"
    solution: "Apply Kalman filtering or exponential smoothing"

  - pattern: "No data quality tracking"
    problem: "Stale or corrupt data treated as valid"
    solution: "Track timestamps, apply quality scores, degrade gracefully"

  - pattern: "Monolithic twin running in cloud"
    problem: "Latency prevents real-time control, bandwidth costs"
    solution: "Edge-cloud hybrid with latency-aware task placement"

  - pattern: "No model calibration feedback"
    problem: "Model drifts from reality over time"
    solution: "Monitor residuals, trigger recalibration when divergent"

  - pattern: "Missing twin instance management"
    problem: "Can't track multiple assets, no fleet view"
    solution: "Twin registry with lifecycle management"

handoffs:

  - to: physics-simulation
    when: "Need detailed physics modeling for the virtual component"
    context: "Digital twin needs accurate physics simulation engine"

  - to: sensor-fusion
    when: "Multiple sensors need fusion for state estimation"
    context: "Kalman filtering and sensor data fusion"

  - to: control-systems
    when: "Twin drives control decisions"
    context: "Twin provides model for MPC or control optimization"

  - to: embedded-systems
    when: "Edge processing on embedded hardware"
    context: "Deploy twin components to edge devices"

  - to: monte-carlo
    when: "Uncertainty quantification needed"
    context: "Probabilistic predictions and risk analysis"

ecosystem:
  platforms:
    - "Azure Digital Twins - Enterprise platform"
    - "AWS IoT TwinMaker - AWS offering"
    - "NVIDIA Omniverse - 3D visualization"
    - "Siemens MindSphere - Industrial IoT"
    - "PTC ThingWorx - Industrial platform"

  open_source:
    - "Eclipse Ditto - Open source twin framework"
    - "Apache PLC4X - Industrial protocols"
    - "Node-RED - Flow-based IoT"
    - "Grafana - Visualization"

  protocols:
    - "MQTT - Lightweight messaging"
    - "OPC UA - Industrial automation"
    - "Modbus - Legacy industrial"
    - "AMQP - Enterprise messaging"

  simulation:
    - "Modelica - Multi-domain modeling"
    - "FMI/FMU - Model exchange standard"
    - "Gazebo - Robotics simulation"
    - "COMSOL - Multiphysics"

references:
  standards:
    - "ISO 23247 - Digital Twin Framework for Manufacturing"
    - "Asset Administration Shell (AAS) - Industrie 4.0"
    - "Digital Twin Consortium Reference Architecture"

  papers:
    - "Digital Twin: Origin to Future - Rasheed et al."
    - "Physics-Informed Digital Twins - Kapteyn et al."
    - "Predictive Digital Twin - Tao et al."

  books:
    - "Digital Twin Driven Smart Manufacturing - Tao et al."
    - "Industrial Digital Twins - Proceedings"