Vibeship-spawner-skills sensor-fusion

Sensor Fusion Skill

install
source · Clone the upstream repo
git clone https://github.com/vibeforge1111/vibeship-spawner-skills
manifest: hardware/sensor-fusion/skill.yaml
source content

Sensor Fusion Skill

State estimation and multi-sensor integration

id: sensor-fusion name: Sensor Fusion category: hardware complexity: advanced requires_skills:

  • statistical-analysis
  • embedded-systems

description: | Patterns for multi-sensor data fusion including Kalman filters (KF, EKF, UKF), complementary filters, particle filters, and factor graph optimization. Covers state estimation, covariance tuning, outlier rejection, and real-time implementation for robotics and autonomous systems.

patterns:

kalman_filter_basic: name: Linear Kalman Filter description: Optimal state estimator for linear systems with Gaussian noise pattern: | import numpy as np

  class KalmanFilter:
      """Linear Kalman filter for state estimation.

      State: x (n x 1)
      Process model: x_k = F @ x_{k-1} + B @ u + w, w ~ N(0, Q)
      Measurement model: z_k = H @ x_k + v, v ~ N(0, R)
      """

      def __init__(self, dim_state: int, dim_measurement: int, dim_control: int = 0):
          self.n = dim_state
          self.m = dim_measurement

          # State estimate and covariance
          self.x = np.zeros((dim_state, 1))
          self.P = np.eye(dim_state)

          # Model matrices
          self.F = np.eye(dim_state)           # State transition
          self.H = np.zeros((dim_measurement, dim_state))  # Measurement
          self.B = np.zeros((dim_state, max(dim_control, 1)))  # Control
          self.Q = np.eye(dim_state) * 0.01    # Process noise
          self.R = np.eye(dim_measurement) * 0.1  # Measurement noise

      def predict(self, u: np.ndarray = None) -> np.ndarray:
          """Predict step: propagate state and covariance."""
          if u is None:
              u = np.zeros((self.B.shape[1], 1))

          # State prediction
          self.x = self.F @ self.x + self.B @ u

          # Covariance prediction
          self.P = self.F @ self.P @ self.F.T + self.Q

          return self.x.copy()

      def update(self, z: np.ndarray) -> np.ndarray:
          """Update step: incorporate measurement."""
          # Innovation (measurement residual)
          y = z - self.H @ self.x

          # Innovation covariance
          S = self.H @ self.P @ self.H.T + self.R

          # Kalman gain
          K = self.P @ self.H.T @ np.linalg.inv(S)

          # State update
          self.x = self.x + K @ y

          # Covariance update (Joseph form for numerical stability)
          I_KH = np.eye(self.n) - K @ self.H
          self.P = I_KH @ self.P @ I_KH.T + K @ self.R @ K.T

          return self.x.copy()

      def get_state(self) -> tuple:
          """Return current state estimate and covariance."""
          return self.x.copy(), self.P.copy()

  # Example: 1D position-velocity tracking
  kf = KalmanFilter(dim_state=2, dim_measurement=1)
  dt = 0.1

  # State transition: constant velocity model
  kf.F = np.array([
      [1, dt],  # position = position + velocity * dt
      [0, 1]    # velocity = velocity
  ])

  # Measurement: we only observe position
  kf.H = np.array([[1, 0]])

  # Process noise: acceleration uncertainty
  q = 0.1  # acceleration noise
  kf.Q = np.array([
      [dt**4/4, dt**3/2],
      [dt**3/2, dt**2]
  ]) * q**2

  # Measurement noise
  kf.R = np.array([[1.0]])  # position measurement variance
why: "KF is optimal for linear Gaussian systems and foundation for all variants"

extended_kalman_filter: name: Extended Kalman Filter (EKF) description: State estimation for nonlinear systems via linearization critical: true pattern: | import numpy as np from typing import Callable, Tuple

  class ExtendedKalmanFilter:
      """EKF for nonlinear state estimation.

      Uses first-order Taylor expansion (Jacobians) for linearization.
      Requires: f(x, u) - process model, h(x) - measurement model
                F_jacobian(x, u), H_jacobian(x) - Jacobian functions
      """

      def __init__(self, dim_state: int, dim_measurement: int):
          self.n = dim_state
          self.m = dim_measurement

          self.x = np.zeros((dim_state, 1))
          self.P = np.eye(dim_state)
          self.Q = np.eye(dim_state) * 0.01
          self.R = np.eye(dim_measurement) * 0.1

      def predict(self,
                  f: Callable,
                  F_jacobian: Callable,
                  u: np.ndarray = None) -> np.ndarray:
          """Predict using nonlinear process model."""
          # Nonlinear state prediction
          self.x = f(self.x, u)

          # Linearize: compute Jacobian at current state
          F = F_jacobian(self.x, u)

          # Covariance prediction (uses linearized model)
          self.P = F @ self.P @ F.T + self.Q

          return self.x.copy()

      def update(self,
                 z: np.ndarray,
                 h: Callable,
                 H_jacobian: Callable) -> np.ndarray:
          """Update using nonlinear measurement model."""
          # Predicted measurement
          z_pred = h(self.x)

          # Linearize measurement model
          H = H_jacobian(self.x)

          # Innovation
          y = z - z_pred

          # Innovation covariance
          S = H @ self.P @ H.T + self.R

          # Kalman gain
          K = self.P @ H.T @ np.linalg.inv(S)

          # State update
          self.x = self.x + K @ y

          # Covariance update
          I_KH = np.eye(self.n) - K @ H
          self.P = I_KH @ self.P @ I_KH.T + K @ self.R @ K.T

          return self.x.copy()

      def normalized_innovation_squared(self, y: np.ndarray, S: np.ndarray) -> float:
          """NEES/NIS for filter consistency monitoring."""
          return float(y.T @ np.linalg.inv(S) @ y)

  # Example: 2D robot localization (x, y, theta)
  def process_model(x, u):
      """Differential drive kinematics."""
      v, omega = u[0, 0], u[1, 0]  # linear/angular velocity
      theta = x[2, 0]
      dt = 0.1

      if abs(omega) < 1e-6:
          # Straight line motion
          dx = v * dt * np.cos(theta)
          dy = v * dt * np.sin(theta)
          dtheta = 0
      else:
          # Arc motion
          r = v / omega
          dx = r * (np.sin(theta + omega * dt) - np.sin(theta))
          dy = r * (np.cos(theta) - np.cos(theta + omega * dt))
          dtheta = omega * dt

      return x + np.array([[dx], [dy], [dtheta]])

  def process_jacobian(x, u):
      """Jacobian of process model w.r.t. state."""
      v, omega = u[0, 0], u[1, 0]
      theta = x[2, 0]
      dt = 0.1

      F = np.eye(3)
      if abs(omega) < 1e-6:
          F[0, 2] = -v * dt * np.sin(theta)
          F[1, 2] = v * dt * np.cos(theta)
      else:
          r = v / omega
          F[0, 2] = r * (np.cos(theta + omega * dt) - np.cos(theta))
          F[1, 2] = r * (np.sin(theta + omega * dt) - np.sin(theta))

      return F
why: "EKF handles nonlinear systems but requires careful Jacobian computation"

unscented_kalman_filter: name: Unscented Kalman Filter (UKF) description: Derivative-free nonlinear estimation using sigma points pattern: | import numpy as np from scipy.linalg import cholesky

  class UnscentedKalmanFilter:
      """UKF using sigma point propagation.

      No Jacobians needed - handles nonlinearity better than EKF.
      Uses unscented transform to propagate mean and covariance.
      """

      def __init__(self, dim_state: int, dim_measurement: int,
                   alpha: float = 1e-3, beta: float = 2.0, kappa: float = 0.0):
          self.n = dim_state
          self.m = dim_measurement

          self.x = np.zeros((dim_state, 1))
          self.P = np.eye(dim_state)
          self.Q = np.eye(dim_state) * 0.01
          self.R = np.eye(dim_measurement) * 0.1

          # Sigma point parameters
          self.alpha = alpha
          self.beta = beta
          self.kappa = kappa
          self._compute_weights()

      def _compute_weights(self):
          """Compute sigma point weights."""
          n = self.n
          lam = self.alpha**2 * (n + self.kappa) - n

          # Weight for mean
          self.Wm = np.zeros(2 * n + 1)
          self.Wm[0] = lam / (n + lam)
          self.Wm[1:] = 1.0 / (2 * (n + lam))

          # Weight for covariance
          self.Wc = self.Wm.copy()
          self.Wc[0] += 1 - self.alpha**2 + self.beta

          self.gamma = np.sqrt(n + lam)

      def _sigma_points(self, x: np.ndarray, P: np.ndarray) -> np.ndarray:
          """Generate 2n+1 sigma points."""
          n = self.n
          sigmas = np.zeros((2 * n + 1, n))

          # Cholesky decomposition
          try:
              sqrtP = cholesky(P, lower=True)
          except np.linalg.LinAlgError:
              # Fall back if not positive definite
              sqrtP = cholesky(P + np.eye(n) * 1e-6, lower=True)

          sigmas[0] = x.flatten()
          for i in range(n):
              sigmas[i + 1] = x.flatten() + self.gamma * sqrtP[:, i]
              sigmas[n + i + 1] = x.flatten() - self.gamma * sqrtP[:, i]

          return sigmas

      def predict(self, f, u=None):
          """Predict step using sigma point propagation."""
          # Generate sigma points
          sigmas = self._sigma_points(self.x, self.P)

          # Propagate each sigma point through f
          sigmas_f = np.array([f(s.reshape(-1, 1), u).flatten()
                               for s in sigmas])

          # Weighted mean
          self.x = np.sum(self.Wm[:, np.newaxis] * sigmas_f, axis=0).reshape(-1, 1)

          # Weighted covariance
          self.P = self.Q.copy()
          for i, s in enumerate(sigmas_f):
              d = (s - self.x.flatten()).reshape(-1, 1)
              self.P += self.Wc[i] * (d @ d.T)

          return self.x.copy()

      def update(self, z, h):
          """Update step using sigma point propagation."""
          # Generate sigma points
          sigmas = self._sigma_points(self.x, self.P)

          # Transform sigma points through measurement model
          sigmas_h = np.array([h(s.reshape(-1, 1)).flatten()
                               for s in sigmas])

          # Predicted measurement
          z_pred = np.sum(self.Wm[:, np.newaxis] * sigmas_h, axis=0).reshape(-1, 1)

          # Innovation covariance Pzz and cross-covariance Pxz
          Pzz = self.R.copy()
          Pxz = np.zeros((self.n, self.m))

          for i in range(len(sigmas)):
              dz = (sigmas_h[i] - z_pred.flatten()).reshape(-1, 1)
              dx = (sigmas[i] - self.x.flatten()).reshape(-1, 1)
              Pzz += self.Wc[i] * (dz @ dz.T)
              Pxz += self.Wc[i] * (dx @ dz.T)

          # Kalman gain
          K = Pxz @ np.linalg.inv(Pzz)

          # Update
          self.x = self.x + K @ (z - z_pred)
          self.P = self.P - K @ Pzz @ K.T

          return self.x.copy()
why: "UKF avoids Jacobian computation and handles nonlinearity better than EKF"

complementary_filter: name: Complementary Filter description: Simple sensor fusion for IMU attitude estimation pattern: | import numpy as np

  class ComplementaryFilter:
      """Complementary filter for attitude estimation.

      Fuses:
      - Gyroscope: accurate short-term, drifts long-term
      - Accelerometer: accurate long-term, noisy short-term

      Much simpler than Kalman filter, often sufficient for basic IMU fusion.
      """

      def __init__(self, alpha: float = 0.98, dt: float = 0.01):
          self.alpha = alpha  # High-pass weight for gyro
          self.dt = dt
          self.roll = 0.0
          self.pitch = 0.0

      def update(self, gyro: np.ndarray, accel: np.ndarray) -> tuple:
          """
          Fuse gyroscope and accelerometer readings.

          Args:
              gyro: [gx, gy, gz] in rad/s
              accel: [ax, ay, az] in m/s^2 (normalized or raw)

          Returns:
              (roll, pitch) in radians
          """
          # Integrate gyroscope (high-frequency, drifts)
          roll_gyro = self.roll + gyro[0] * self.dt
          pitch_gyro = self.pitch + gyro[1] * self.dt

          # Calculate angles from accelerometer (low-frequency, stable)
          roll_accel = np.arctan2(accel[1], accel[2])
          pitch_accel = np.arctan2(-accel[0],
                                    np.sqrt(accel[1]**2 + accel[2]**2))

          # Complementary filter: trust gyro short-term, accel long-term
          self.roll = self.alpha * roll_gyro + (1 - self.alpha) * roll_accel
          self.pitch = self.alpha * pitch_gyro + (1 - self.alpha) * pitch_accel

          return self.roll, self.pitch

  # Usage for typical 100Hz IMU
  cf = ComplementaryFilter(alpha=0.98, dt=0.01)

  # In sensor callback
  roll, pitch = cf.update(
      gyro=np.array([gx, gy, gz]),  # rad/s
      accel=np.array([ax, ay, az])  # m/s^2
  )
why: "Simple and effective for attitude estimation without Kalman complexity"

imu_preintegration: name: IMU Preintegration description: Efficient IMU integration for visual-inertial odometry pattern: | import numpy as np from scipy.spatial.transform import Rotation

  class IMUPreintegrator:
      """IMU preintegration for efficient VIO.

      Preintegrates IMU measurements between keyframes without
      needing to reintegrate when pose estimates change.

      Used in: VINS-Mono, ORB-SLAM3, GTSAM
      """

      def __init__(self, acc_noise: float = 0.1, gyro_noise: float = 0.01,
                   acc_bias_noise: float = 0.001, gyro_bias_noise: float = 0.0001):
          # Preintegrated measurements
          self.delta_R = np.eye(3)  # Rotation
          self.delta_v = np.zeros(3)  # Velocity
          self.delta_p = np.zeros(3)  # Position

          # Jacobians for bias correction
          self.J_R_bg = np.zeros((3, 3))  # d(delta_R)/d(gyro_bias)
          self.J_v_ba = np.zeros((3, 3))  # d(delta_v)/d(acc_bias)
          self.J_v_bg = np.zeros((3, 3))
          self.J_p_ba = np.zeros((3, 3))
          self.J_p_bg = np.zeros((3, 3))

          # Covariance
          self.cov = np.zeros((9, 9))

          # Noise parameters
          self.acc_noise = acc_noise
          self.gyro_noise = gyro_noise

          self.dt_sum = 0.0

      def integrate(self, acc: np.ndarray, gyro: np.ndarray, dt: float,
                    acc_bias: np.ndarray = None, gyro_bias: np.ndarray = None):
          """Integrate single IMU measurement."""
          if acc_bias is None:
              acc_bias = np.zeros(3)
          if gyro_bias is None:
              gyro_bias = np.zeros(3)

          # Bias-corrected measurements
          acc_unbiased = acc - acc_bias
          gyro_unbiased = gyro - gyro_bias

          # Rotation increment
          dtheta = gyro_unbiased * dt
          dR = Rotation.from_rotvec(dtheta).as_matrix()

          # Update preintegrated rotation
          R_prev = self.delta_R.copy()
          self.delta_R = self.delta_R @ dR

          # Rotated acceleration
          acc_world = R_prev @ acc_unbiased

          # Update velocity and position
          self.delta_p += self.delta_v * dt + 0.5 * acc_world * dt**2
          self.delta_v += acc_world * dt

          # Update Jacobians for bias correction
          self._update_jacobians(acc_unbiased, gyro_unbiased, R_prev, dt)

          # Update covariance
          self._update_covariance(R_prev, dt)

          self.dt_sum += dt

      def _update_jacobians(self, acc, gyro, R, dt):
          """Update Jacobians for first-order bias correction."""
          # Skew-symmetric matrix
          def skew(v):
              return np.array([
                  [0, -v[2], v[1]],
                  [v[2], 0, -v[0]],
                  [-v[1], v[0], 0]
              ])

          self.J_R_bg = self.J_R_bg - self.delta_R.T @ skew(gyro) * dt
          self.J_v_ba = self.J_v_ba - R * dt
          self.J_v_bg = self.J_v_bg - R @ skew(acc) @ self.J_R_bg * dt
          self.J_p_ba = self.J_p_ba + self.J_v_ba * dt
          self.J_p_bg = self.J_p_bg + self.J_v_bg * dt

      def _update_covariance(self, R, dt):
          """Propagate preintegration covariance."""
          # Noise covariance
          Q = np.diag([
              self.gyro_noise**2, self.gyro_noise**2, self.gyro_noise**2,
              self.acc_noise**2, self.acc_noise**2, self.acc_noise**2,
              0, 0, 0  # No noise on position directly
          ])

          # State transition (simplified)
          A = np.eye(9)
          A[3:6, 0:3] = -R @ self._skew(self.delta_v) * dt
          A[6:9, 0:3] = -R @ self._skew(self.delta_p) * dt
          A[6:9, 3:6] = np.eye(3) * dt

          self.cov = A @ self.cov @ A.T + Q * dt**2

      @staticmethod
      def _skew(v):
          return np.array([
              [0, -v[2], v[1]],
              [v[2], 0, -v[0]],
              [-v[1], v[0], 0]
          ])

      def correct_bias(self, delta_ba: np.ndarray, delta_bg: np.ndarray):
          """First-order bias correction without reintegration."""
          # Correct rotation
          self.delta_R = self.delta_R @ Rotation.from_rotvec(
              self.J_R_bg @ delta_bg
          ).as_matrix()

          # Correct velocity and position
          self.delta_v += self.J_v_ba @ delta_ba + self.J_v_bg @ delta_bg
          self.delta_p += self.J_p_ba @ delta_ba + self.J_p_bg @ delta_bg
why: "Preintegration enables efficient pose graph optimization in VIO"

outlier_rejection: name: Outlier Rejection with Mahalanobis Gating description: Detect and reject spurious measurements pattern: | import numpy as np from scipy.stats import chi2

  class RobustKalmanFilter:
      """Kalman filter with Mahalanobis distance gating.

      Rejects measurements that are statistically unlikely
      given the current state estimate.
      """

      def __init__(self, kf, gate_threshold: float = None):
          self.kf = kf
          # Chi-squared threshold (95% confidence for m DOF)
          if gate_threshold is None:
              self.gate_threshold = chi2.ppf(0.95, df=kf.m)
          else:
              self.gate_threshold = gate_threshold

          self.rejected_count = 0
          self.total_count = 0

      def update(self, z: np.ndarray, **kwargs) -> tuple:
          """Update with outlier gating."""
          self.total_count += 1

          # Compute innovation
          if hasattr(self.kf, 'h'):
              z_pred = self.kf.h(self.kf.x)
              H = self.kf.H_jacobian(self.kf.x)
          else:
              z_pred = self.kf.H @ self.kf.x
              H = self.kf.H

          y = z - z_pred

          # Innovation covariance
          S = H @ self.kf.P @ H.T + self.kf.R

          # Mahalanobis distance squared
          d2 = float(y.T @ np.linalg.inv(S) @ y)

          # Gating decision
          if d2 > self.gate_threshold:
              self.rejected_count += 1
              # Optionally inflate covariance instead of rejecting
              # self.kf.P *= 1.1
              return self.kf.x.copy(), False  # Rejected

          # Normal update
          return self.kf.update(z, **kwargs), True  # Accepted

      def get_rejection_rate(self) -> float:
          """Monitor filter health via rejection rate."""
          if self.total_count == 0:
              return 0.0
          return self.rejected_count / self.total_count

  # Huber loss for soft outlier handling
  def huber_weight(residual: float, delta: float = 1.0) -> float:
      """Huber weighting function for robust estimation."""
      abs_r = abs(residual)
      if abs_r <= delta:
          return 1.0
      else:
          return delta / abs_r
why: "Outliers can corrupt filter estimates; gating prevents divergence"

anti_patterns:

hardcoded_noise: name: Hardcoded Noise Covariances problem: "Q and R matrices guessed without calibration" solution: "Use Allan variance for IMU, empirical measurement for sensors"

no_health_monitoring: name: No Filter Health Monitoring problem: "Filter diverges silently, no NEES/NIS checking" solution: "Monitor normalized innovation, detect inconsistency early"

wrong_frame_transform: name: Incorrect Frame Transformations problem: "Sensor data not transformed to common frame" solution: "Establish body frame, transform all sensors consistently"

ekf_high_nonlinearity: name: EKF on Highly Nonlinear Systems problem: "EKF linearization breaks down, filter diverges" solution: "Use UKF or iterated EKF for highly nonlinear systems"

handoffs:

  • to: embedded-systems when: "Implementing filter on microcontroller" pass: "Fixed-point Q format, memory constraints, timing requirements"

  • to: ros2-robotics when: "Integrating with ROS2 navigation" pass: "Message types, TF frames, covariance format"

  • to: control-systems when: "Using estimates for control" pass: "State estimates, covariance, update rate"

ecosystem: libraries: - "FilterPy - Python Kalman filter library" - "robot_localization - ROS2 EKF/UKF package" - "GTSAM - Factor graph optimization" - "Eigen - C++ linear algebra" - "Ceres Solver - Nonlinear least squares"

frameworks: - "ORB-SLAM3 - Visual-Inertial SLAM" - "VINS-Mono - Monocular VIO" - "OpenVINS - Research VIO platform" - "Kalibr - IMU/camera calibration"

tools: - "Allan variance tools - IMU noise characterization" - "PlotJuggler - Real-time plotting" - "rosbag2 - Data recording for offline tuning"