git clone https://github.com/vibeforge1111/vibeship-spawner-skills
hardware/sensor-fusion/skill.yamlSensor Fusion Skill
State estimation and multi-sensor integration
id: sensor-fusion name: Sensor Fusion category: hardware complexity: advanced requires_skills:
- statistical-analysis
- embedded-systems
description: | Patterns for multi-sensor data fusion including Kalman filters (KF, EKF, UKF), complementary filters, particle filters, and factor graph optimization. Covers state estimation, covariance tuning, outlier rejection, and real-time implementation for robotics and autonomous systems.
patterns:
kalman_filter_basic: name: Linear Kalman Filter description: Optimal state estimator for linear systems with Gaussian noise pattern: | import numpy as np
class KalmanFilter: """Linear Kalman filter for state estimation. State: x (n x 1) Process model: x_k = F @ x_{k-1} + B @ u + w, w ~ N(0, Q) Measurement model: z_k = H @ x_k + v, v ~ N(0, R) """ def __init__(self, dim_state: int, dim_measurement: int, dim_control: int = 0): self.n = dim_state self.m = dim_measurement # State estimate and covariance self.x = np.zeros((dim_state, 1)) self.P = np.eye(dim_state) # Model matrices self.F = np.eye(dim_state) # State transition self.H = np.zeros((dim_measurement, dim_state)) # Measurement self.B = np.zeros((dim_state, max(dim_control, 1))) # Control self.Q = np.eye(dim_state) * 0.01 # Process noise self.R = np.eye(dim_measurement) * 0.1 # Measurement noise def predict(self, u: np.ndarray = None) -> np.ndarray: """Predict step: propagate state and covariance.""" if u is None: u = np.zeros((self.B.shape[1], 1)) # State prediction self.x = self.F @ self.x + self.B @ u # Covariance prediction self.P = self.F @ self.P @ self.F.T + self.Q return self.x.copy() def update(self, z: np.ndarray) -> np.ndarray: """Update step: incorporate measurement.""" # Innovation (measurement residual) y = z - self.H @ self.x # Innovation covariance S = self.H @ self.P @ self.H.T + self.R # Kalman gain K = self.P @ self.H.T @ np.linalg.inv(S) # State update self.x = self.x + K @ y # Covariance update (Joseph form for numerical stability) I_KH = np.eye(self.n) - K @ self.H self.P = I_KH @ self.P @ I_KH.T + K @ self.R @ K.T return self.x.copy() def get_state(self) -> tuple: """Return current state estimate and covariance.""" return self.x.copy(), self.P.copy() # Example: 1D position-velocity tracking kf = KalmanFilter(dim_state=2, dim_measurement=1) dt = 0.1 # State transition: constant velocity model kf.F = np.array([ [1, dt], # position = position + velocity * dt [0, 1] # velocity = velocity ]) # Measurement: we only observe position kf.H = np.array([[1, 0]]) # Process noise: acceleration uncertainty q = 0.1 # acceleration noise kf.Q = np.array([ [dt**4/4, dt**3/2], [dt**3/2, dt**2] ]) * q**2 # Measurement noise kf.R = np.array([[1.0]]) # position measurement variance why: "KF is optimal for linear Gaussian systems and foundation for all variants"
extended_kalman_filter: name: Extended Kalman Filter (EKF) description: State estimation for nonlinear systems via linearization critical: true pattern: | import numpy as np from typing import Callable, Tuple
class ExtendedKalmanFilter: """EKF for nonlinear state estimation. Uses first-order Taylor expansion (Jacobians) for linearization. Requires: f(x, u) - process model, h(x) - measurement model F_jacobian(x, u), H_jacobian(x) - Jacobian functions """ def __init__(self, dim_state: int, dim_measurement: int): self.n = dim_state self.m = dim_measurement self.x = np.zeros((dim_state, 1)) self.P = np.eye(dim_state) self.Q = np.eye(dim_state) * 0.01 self.R = np.eye(dim_measurement) * 0.1 def predict(self, f: Callable, F_jacobian: Callable, u: np.ndarray = None) -> np.ndarray: """Predict using nonlinear process model.""" # Nonlinear state prediction self.x = f(self.x, u) # Linearize: compute Jacobian at current state F = F_jacobian(self.x, u) # Covariance prediction (uses linearized model) self.P = F @ self.P @ F.T + self.Q return self.x.copy() def update(self, z: np.ndarray, h: Callable, H_jacobian: Callable) -> np.ndarray: """Update using nonlinear measurement model.""" # Predicted measurement z_pred = h(self.x) # Linearize measurement model H = H_jacobian(self.x) # Innovation y = z - z_pred # Innovation covariance S = H @ self.P @ H.T + self.R # Kalman gain K = self.P @ H.T @ np.linalg.inv(S) # State update self.x = self.x + K @ y # Covariance update I_KH = np.eye(self.n) - K @ H self.P = I_KH @ self.P @ I_KH.T + K @ self.R @ K.T return self.x.copy() def normalized_innovation_squared(self, y: np.ndarray, S: np.ndarray) -> float: """NEES/NIS for filter consistency monitoring.""" return float(y.T @ np.linalg.inv(S) @ y) # Example: 2D robot localization (x, y, theta) def process_model(x, u): """Differential drive kinematics.""" v, omega = u[0, 0], u[1, 0] # linear/angular velocity theta = x[2, 0] dt = 0.1 if abs(omega) < 1e-6: # Straight line motion dx = v * dt * np.cos(theta) dy = v * dt * np.sin(theta) dtheta = 0 else: # Arc motion r = v / omega dx = r * (np.sin(theta + omega * dt) - np.sin(theta)) dy = r * (np.cos(theta) - np.cos(theta + omega * dt)) dtheta = omega * dt return x + np.array([[dx], [dy], [dtheta]]) def process_jacobian(x, u): """Jacobian of process model w.r.t. state.""" v, omega = u[0, 0], u[1, 0] theta = x[2, 0] dt = 0.1 F = np.eye(3) if abs(omega) < 1e-6: F[0, 2] = -v * dt * np.sin(theta) F[1, 2] = v * dt * np.cos(theta) else: r = v / omega F[0, 2] = r * (np.cos(theta + omega * dt) - np.cos(theta)) F[1, 2] = r * (np.sin(theta + omega * dt) - np.sin(theta)) return F why: "EKF handles nonlinear systems but requires careful Jacobian computation"
unscented_kalman_filter: name: Unscented Kalman Filter (UKF) description: Derivative-free nonlinear estimation using sigma points pattern: | import numpy as np from scipy.linalg import cholesky
class UnscentedKalmanFilter: """UKF using sigma point propagation. No Jacobians needed - handles nonlinearity better than EKF. Uses unscented transform to propagate mean and covariance. """ def __init__(self, dim_state: int, dim_measurement: int, alpha: float = 1e-3, beta: float = 2.0, kappa: float = 0.0): self.n = dim_state self.m = dim_measurement self.x = np.zeros((dim_state, 1)) self.P = np.eye(dim_state) self.Q = np.eye(dim_state) * 0.01 self.R = np.eye(dim_measurement) * 0.1 # Sigma point parameters self.alpha = alpha self.beta = beta self.kappa = kappa self._compute_weights() def _compute_weights(self): """Compute sigma point weights.""" n = self.n lam = self.alpha**2 * (n + self.kappa) - n # Weight for mean self.Wm = np.zeros(2 * n + 1) self.Wm[0] = lam / (n + lam) self.Wm[1:] = 1.0 / (2 * (n + lam)) # Weight for covariance self.Wc = self.Wm.copy() self.Wc[0] += 1 - self.alpha**2 + self.beta self.gamma = np.sqrt(n + lam) def _sigma_points(self, x: np.ndarray, P: np.ndarray) -> np.ndarray: """Generate 2n+1 sigma points.""" n = self.n sigmas = np.zeros((2 * n + 1, n)) # Cholesky decomposition try: sqrtP = cholesky(P, lower=True) except np.linalg.LinAlgError: # Fall back if not positive definite sqrtP = cholesky(P + np.eye(n) * 1e-6, lower=True) sigmas[0] = x.flatten() for i in range(n): sigmas[i + 1] = x.flatten() + self.gamma * sqrtP[:, i] sigmas[n + i + 1] = x.flatten() - self.gamma * sqrtP[:, i] return sigmas def predict(self, f, u=None): """Predict step using sigma point propagation.""" # Generate sigma points sigmas = self._sigma_points(self.x, self.P) # Propagate each sigma point through f sigmas_f = np.array([f(s.reshape(-1, 1), u).flatten() for s in sigmas]) # Weighted mean self.x = np.sum(self.Wm[:, np.newaxis] * sigmas_f, axis=0).reshape(-1, 1) # Weighted covariance self.P = self.Q.copy() for i, s in enumerate(sigmas_f): d = (s - self.x.flatten()).reshape(-1, 1) self.P += self.Wc[i] * (d @ d.T) return self.x.copy() def update(self, z, h): """Update step using sigma point propagation.""" # Generate sigma points sigmas = self._sigma_points(self.x, self.P) # Transform sigma points through measurement model sigmas_h = np.array([h(s.reshape(-1, 1)).flatten() for s in sigmas]) # Predicted measurement z_pred = np.sum(self.Wm[:, np.newaxis] * sigmas_h, axis=0).reshape(-1, 1) # Innovation covariance Pzz and cross-covariance Pxz Pzz = self.R.copy() Pxz = np.zeros((self.n, self.m)) for i in range(len(sigmas)): dz = (sigmas_h[i] - z_pred.flatten()).reshape(-1, 1) dx = (sigmas[i] - self.x.flatten()).reshape(-1, 1) Pzz += self.Wc[i] * (dz @ dz.T) Pxz += self.Wc[i] * (dx @ dz.T) # Kalman gain K = Pxz @ np.linalg.inv(Pzz) # Update self.x = self.x + K @ (z - z_pred) self.P = self.P - K @ Pzz @ K.T return self.x.copy() why: "UKF avoids Jacobian computation and handles nonlinearity better than EKF"
complementary_filter: name: Complementary Filter description: Simple sensor fusion for IMU attitude estimation pattern: | import numpy as np
class ComplementaryFilter: """Complementary filter for attitude estimation. Fuses: - Gyroscope: accurate short-term, drifts long-term - Accelerometer: accurate long-term, noisy short-term Much simpler than Kalman filter, often sufficient for basic IMU fusion. """ def __init__(self, alpha: float = 0.98, dt: float = 0.01): self.alpha = alpha # High-pass weight for gyro self.dt = dt self.roll = 0.0 self.pitch = 0.0 def update(self, gyro: np.ndarray, accel: np.ndarray) -> tuple: """ Fuse gyroscope and accelerometer readings. Args: gyro: [gx, gy, gz] in rad/s accel: [ax, ay, az] in m/s^2 (normalized or raw) Returns: (roll, pitch) in radians """ # Integrate gyroscope (high-frequency, drifts) roll_gyro = self.roll + gyro[0] * self.dt pitch_gyro = self.pitch + gyro[1] * self.dt # Calculate angles from accelerometer (low-frequency, stable) roll_accel = np.arctan2(accel[1], accel[2]) pitch_accel = np.arctan2(-accel[0], np.sqrt(accel[1]**2 + accel[2]**2)) # Complementary filter: trust gyro short-term, accel long-term self.roll = self.alpha * roll_gyro + (1 - self.alpha) * roll_accel self.pitch = self.alpha * pitch_gyro + (1 - self.alpha) * pitch_accel return self.roll, self.pitch # Usage for typical 100Hz IMU cf = ComplementaryFilter(alpha=0.98, dt=0.01) # In sensor callback roll, pitch = cf.update( gyro=np.array([gx, gy, gz]), # rad/s accel=np.array([ax, ay, az]) # m/s^2 ) why: "Simple and effective for attitude estimation without Kalman complexity"
imu_preintegration: name: IMU Preintegration description: Efficient IMU integration for visual-inertial odometry pattern: | import numpy as np from scipy.spatial.transform import Rotation
class IMUPreintegrator: """IMU preintegration for efficient VIO. Preintegrates IMU measurements between keyframes without needing to reintegrate when pose estimates change. Used in: VINS-Mono, ORB-SLAM3, GTSAM """ def __init__(self, acc_noise: float = 0.1, gyro_noise: float = 0.01, acc_bias_noise: float = 0.001, gyro_bias_noise: float = 0.0001): # Preintegrated measurements self.delta_R = np.eye(3) # Rotation self.delta_v = np.zeros(3) # Velocity self.delta_p = np.zeros(3) # Position # Jacobians for bias correction self.J_R_bg = np.zeros((3, 3)) # d(delta_R)/d(gyro_bias) self.J_v_ba = np.zeros((3, 3)) # d(delta_v)/d(acc_bias) self.J_v_bg = np.zeros((3, 3)) self.J_p_ba = np.zeros((3, 3)) self.J_p_bg = np.zeros((3, 3)) # Covariance self.cov = np.zeros((9, 9)) # Noise parameters self.acc_noise = acc_noise self.gyro_noise = gyro_noise self.dt_sum = 0.0 def integrate(self, acc: np.ndarray, gyro: np.ndarray, dt: float, acc_bias: np.ndarray = None, gyro_bias: np.ndarray = None): """Integrate single IMU measurement.""" if acc_bias is None: acc_bias = np.zeros(3) if gyro_bias is None: gyro_bias = np.zeros(3) # Bias-corrected measurements acc_unbiased = acc - acc_bias gyro_unbiased = gyro - gyro_bias # Rotation increment dtheta = gyro_unbiased * dt dR = Rotation.from_rotvec(dtheta).as_matrix() # Update preintegrated rotation R_prev = self.delta_R.copy() self.delta_R = self.delta_R @ dR # Rotated acceleration acc_world = R_prev @ acc_unbiased # Update velocity and position self.delta_p += self.delta_v * dt + 0.5 * acc_world * dt**2 self.delta_v += acc_world * dt # Update Jacobians for bias correction self._update_jacobians(acc_unbiased, gyro_unbiased, R_prev, dt) # Update covariance self._update_covariance(R_prev, dt) self.dt_sum += dt def _update_jacobians(self, acc, gyro, R, dt): """Update Jacobians for first-order bias correction.""" # Skew-symmetric matrix def skew(v): return np.array([ [0, -v[2], v[1]], [v[2], 0, -v[0]], [-v[1], v[0], 0] ]) self.J_R_bg = self.J_R_bg - self.delta_R.T @ skew(gyro) * dt self.J_v_ba = self.J_v_ba - R * dt self.J_v_bg = self.J_v_bg - R @ skew(acc) @ self.J_R_bg * dt self.J_p_ba = self.J_p_ba + self.J_v_ba * dt self.J_p_bg = self.J_p_bg + self.J_v_bg * dt def _update_covariance(self, R, dt): """Propagate preintegration covariance.""" # Noise covariance Q = np.diag([ self.gyro_noise**2, self.gyro_noise**2, self.gyro_noise**2, self.acc_noise**2, self.acc_noise**2, self.acc_noise**2, 0, 0, 0 # No noise on position directly ]) # State transition (simplified) A = np.eye(9) A[3:6, 0:3] = -R @ self._skew(self.delta_v) * dt A[6:9, 0:3] = -R @ self._skew(self.delta_p) * dt A[6:9, 3:6] = np.eye(3) * dt self.cov = A @ self.cov @ A.T + Q * dt**2 @staticmethod def _skew(v): return np.array([ [0, -v[2], v[1]], [v[2], 0, -v[0]], [-v[1], v[0], 0] ]) def correct_bias(self, delta_ba: np.ndarray, delta_bg: np.ndarray): """First-order bias correction without reintegration.""" # Correct rotation self.delta_R = self.delta_R @ Rotation.from_rotvec( self.J_R_bg @ delta_bg ).as_matrix() # Correct velocity and position self.delta_v += self.J_v_ba @ delta_ba + self.J_v_bg @ delta_bg self.delta_p += self.J_p_ba @ delta_ba + self.J_p_bg @ delta_bg why: "Preintegration enables efficient pose graph optimization in VIO"
outlier_rejection: name: Outlier Rejection with Mahalanobis Gating description: Detect and reject spurious measurements pattern: | import numpy as np from scipy.stats import chi2
class RobustKalmanFilter: """Kalman filter with Mahalanobis distance gating. Rejects measurements that are statistically unlikely given the current state estimate. """ def __init__(self, kf, gate_threshold: float = None): self.kf = kf # Chi-squared threshold (95% confidence for m DOF) if gate_threshold is None: self.gate_threshold = chi2.ppf(0.95, df=kf.m) else: self.gate_threshold = gate_threshold self.rejected_count = 0 self.total_count = 0 def update(self, z: np.ndarray, **kwargs) -> tuple: """Update with outlier gating.""" self.total_count += 1 # Compute innovation if hasattr(self.kf, 'h'): z_pred = self.kf.h(self.kf.x) H = self.kf.H_jacobian(self.kf.x) else: z_pred = self.kf.H @ self.kf.x H = self.kf.H y = z - z_pred # Innovation covariance S = H @ self.kf.P @ H.T + self.kf.R # Mahalanobis distance squared d2 = float(y.T @ np.linalg.inv(S) @ y) # Gating decision if d2 > self.gate_threshold: self.rejected_count += 1 # Optionally inflate covariance instead of rejecting # self.kf.P *= 1.1 return self.kf.x.copy(), False # Rejected # Normal update return self.kf.update(z, **kwargs), True # Accepted def get_rejection_rate(self) -> float: """Monitor filter health via rejection rate.""" if self.total_count == 0: return 0.0 return self.rejected_count / self.total_count # Huber loss for soft outlier handling def huber_weight(residual: float, delta: float = 1.0) -> float: """Huber weighting function for robust estimation.""" abs_r = abs(residual) if abs_r <= delta: return 1.0 else: return delta / abs_r why: "Outliers can corrupt filter estimates; gating prevents divergence"
anti_patterns:
hardcoded_noise: name: Hardcoded Noise Covariances problem: "Q and R matrices guessed without calibration" solution: "Use Allan variance for IMU, empirical measurement for sensors"
no_health_monitoring: name: No Filter Health Monitoring problem: "Filter diverges silently, no NEES/NIS checking" solution: "Monitor normalized innovation, detect inconsistency early"
wrong_frame_transform: name: Incorrect Frame Transformations problem: "Sensor data not transformed to common frame" solution: "Establish body frame, transform all sensors consistently"
ekf_high_nonlinearity: name: EKF on Highly Nonlinear Systems problem: "EKF linearization breaks down, filter diverges" solution: "Use UKF or iterated EKF for highly nonlinear systems"
handoffs:
-
to: embedded-systems when: "Implementing filter on microcontroller" pass: "Fixed-point Q format, memory constraints, timing requirements"
-
to: ros2-robotics when: "Integrating with ROS2 navigation" pass: "Message types, TF frames, covariance format"
-
to: control-systems when: "Using estimates for control" pass: "State estimates, covariance, update rate"
ecosystem: libraries: - "FilterPy - Python Kalman filter library" - "robot_localization - ROS2 EKF/UKF package" - "GTSAM - Factor graph optimization" - "Eigen - C++ linear algebra" - "Ceres Solver - Nonlinear least squares"
frameworks: - "ORB-SLAM3 - Visual-Inertial SLAM" - "VINS-Mono - Monocular VIO" - "OpenVINS - Research VIO platform" - "Kalibr - IMU/camera calibration"
tools: - "Allan variance tools - IMU noise characterization" - "PlotJuggler - Real-time plotting" - "rosbag2 - Data recording for offline tuning"