Babysitter sensor-fusion
Expert skill for multi-sensor fusion and state estimation using Kalman filtering. Implement EKF/UKF, configure robot_localization, fuse IMU, GPS, odometry, and visual sensors for robust localization.
git clone https://github.com/a5c-ai/babysitter
T=$(mktemp -d) && git clone --depth=1 https://github.com/a5c-ai/babysitter "$T" && mkdir -p ~/.claude/skills && cp -r "$T/library/specializations/robotics-simulation/skills/sensor-fusion" ~/.claude/skills/a5c-ai-babysitter-sensor-fusion-bef48f && rm -rf "$T"
library/specializations/robotics-simulation/skills/sensor-fusion/SKILL.mdsensor-fusion
You are sensor-fusion - a specialized skill for multi-sensor fusion and state estimation using Kalman filtering and factor graph optimization.
Overview
This skill enables AI-powered sensor fusion including:
- Implementing Extended Kalman Filter (EKF) for state estimation
- Configuring Unscented Kalman Filter (UKF) for nonlinear systems
- Setting up robot_localization package configuration
- Implementing IMU preintegration and bias estimation
- Configuring GPS/RTK integration with local coordinate frames
- Implementing wheel odometry fusion with slip compensation
- Setting up visual odometry integration
- Configuring outlier rejection (Mahalanobis, chi-squared)
- Tuning process and measurement noise covariances
- Implementing sensor delay compensation
Prerequisites
- ROS2 with robot_localization package
- Calibrated sensors (IMU, cameras, wheel encoders)
- Understanding of coordinate frames (REP-105)
- Sensor noise characteristics
Capabilities
1. robot_localization Configuration
Configure the ROS2 robot_localization package for EKF/UKF:
# ekf_localization.yaml ekf_filter_node: ros__parameters: # Coordinate frames map_frame: map odom_frame: odom base_link_frame: base_link world_frame: odom # Frequencies frequency: 50.0 sensor_timeout: 0.1 two_d_mode: false # Transform settings transform_time_offset: 0.0 transform_timeout: 0.0 print_diagnostics: true debug: false publish_tf: true publish_acceleration: false # IMU input imu0: /imu/data imu0_config: [false, false, false, # x, y, z position true, true, true, # roll, pitch, yaw false, false, false, # vx, vy, vz true, true, true, # vroll, vpitch, vyaw true, true, true] # ax, ay, az imu0_differential: false imu0_relative: false imu0_queue_size: 10 imu0_remove_gravitational_acceleration: true # Wheel odometry input odom0: /wheel_odom odom0_config: [true, true, false, # x, y, z position false, false, true, # roll, pitch, yaw true, true, false, # vx, vy, vz false, false, true, # vroll, vpitch, vyaw false, false, false] # ax, ay, az odom0_differential: false odom0_relative: false odom0_queue_size: 10 # GPS input (for map frame) # odom1: /gps/odom # odom1_config: [true, true, true, # false, false, false, # false, false, false, # false, false, false, # false, false, false] # odom1_differential: false # Process noise covariance (Q matrix diagonal) process_noise_covariance: [ 0.05, # x 0.05, # y 0.06, # z 0.03, # roll 0.03, # pitch 0.06, # yaw 0.025, # vx 0.025, # vy 0.04, # vz 0.01, # vroll 0.01, # vpitch 0.02, # vyaw 0.01, # ax 0.01, # ay 0.015 # az ] # Initial estimate covariance (P0 matrix diagonal) initial_estimate_covariance: [ 1e-9, 1e-9, 1e-9, # position 1e-9, 1e-9, 1e-9, # orientation 1e-9, 1e-9, 1e-9, # velocity 1e-9, 1e-9, 1e-9, # angular velocity 1e-9, 1e-9, 1e-9 # acceleration ]
2. Two-EKF Setup (Odom + Map Frames)
Configure two EKF instances for continuous odometry and global localization:
# dual_ekf_navsat.yaml # EKF for continuous odometry (odom frame) ekf_filter_node_odom: ros__parameters: frequency: 50.0 two_d_mode: false map_frame: map odom_frame: odom base_link_frame: base_link world_frame: odom # Fuse IMU and wheel odometry only imu0: /imu/data imu0_config: [false, false, false, true, true, true, false, false, false, true, true, true, true, true, true] imu0_remove_gravitational_acceleration: true odom0: /wheel_odom odom0_config: [true, true, false, false, false, true, true, true, false, false, false, true, false, false, false] # EKF for global localization (map frame) ekf_filter_node_map: ros__parameters: frequency: 50.0 two_d_mode: false map_frame: map odom_frame: odom base_link_frame: base_link world_frame: map # Fuse odometry output and GPS odom0: /odometry/filtered odom0_config: [true, true, true, true, true, true, true, true, true, true, true, true, true, true, true] odom1: /gps/odom odom1_config: [true, true, true, false, false, false, false, false, false, false, false, false, false, false, false] odom1_differential: false # NavSat transform node navsat_transform_node: ros__parameters: frequency: 50.0 delay: 0.0 magnetic_declination_radians: 0.0 yaw_offset: 0.0 zero_altitude: true broadcast_utm_transform: true publish_filtered_gps: true use_odometry_yaw: false wait_for_datum: false
3. Custom EKF Implementation
Implement a custom EKF for state estimation:
import numpy as np from scipy.linalg import block_diag class RobotEKF: """Extended Kalman Filter for robot localization.""" def __init__(self, dt=0.02): self.dt = dt # State: [x, y, z, roll, pitch, yaw, vx, vy, vz, wx, wy, wz] self.n_states = 12 # State vector self.x = np.zeros(self.n_states) # State covariance self.P = np.eye(self.n_states) * 0.1 # Process noise self.Q = np.diag([ 0.01, 0.01, 0.01, # position noise 0.001, 0.001, 0.001, # orientation noise 0.1, 0.1, 0.1, # velocity noise 0.01, 0.01, 0.01 # angular velocity noise ]) def predict(self, u=None): """Predict step using motion model.""" dt = self.dt x, y, z = self.x[0:3] roll, pitch, yaw = self.x[3:6] vx, vy, vz = self.x[6:9] wx, wy, wz = self.x[9:12] # State transition (constant velocity model) # Position update self.x[0] += vx * np.cos(yaw) * dt - vy * np.sin(yaw) * dt self.x[1] += vx * np.sin(yaw) * dt + vy * np.cos(yaw) * dt self.x[2] += vz * dt # Orientation update self.x[3] += wx * dt self.x[4] += wy * dt self.x[5] += wz * dt # Jacobian of state transition F = self._compute_jacobian() # Covariance prediction self.P = F @ self.P @ F.T + self.Q def _compute_jacobian(self): """Compute Jacobian of state transition.""" dt = self.dt yaw = self.x[5] vx, vy = self.x[6:8] F = np.eye(self.n_states) # Position derivatives w.r.t. yaw F[0, 5] = -vx * np.sin(yaw) * dt - vy * np.cos(yaw) * dt F[1, 5] = vx * np.cos(yaw) * dt - vy * np.sin(yaw) * dt # Position derivatives w.r.t. velocity F[0, 6] = np.cos(yaw) * dt F[0, 7] = -np.sin(yaw) * dt F[1, 6] = np.sin(yaw) * dt F[1, 7] = np.cos(yaw) * dt F[2, 8] = dt # Orientation derivatives w.r.t. angular velocity F[3, 9] = dt F[4, 10] = dt F[5, 11] = dt return F def update_imu(self, imu_data): """Update with IMU measurement (orientation, angular velocity, acceleration).""" # Measurement: [roll, pitch, yaw, wx, wy, wz, ax, ay, az] z = np.array([ imu_data['roll'], imu_data['pitch'], imu_data['yaw'], imu_data['wx'], imu_data['wy'], imu_data['wz'] ]) # Measurement matrix H = np.zeros((6, self.n_states)) H[0:3, 3:6] = np.eye(3) # Orientation H[3:6, 9:12] = np.eye(3) # Angular velocity # Measurement noise R = np.diag([0.01, 0.01, 0.02, 0.001, 0.001, 0.001]) self._ekf_update(z, H, R) def update_odom(self, odom_data): """Update with odometry measurement (velocity).""" z = np.array([odom_data['vx'], odom_data['vy'], odom_data['wz']]) # Measurement matrix H = np.zeros((3, self.n_states)) H[0, 6] = 1 # vx H[1, 7] = 1 # vy H[2, 11] = 1 # wz # Measurement noise R = np.diag([0.05, 0.05, 0.02]) self._ekf_update(z, H, R) def update_gps(self, gps_data): """Update with GPS measurement (position).""" z = np.array([gps_data['x'], gps_data['y'], gps_data['z']]) # Measurement matrix H = np.zeros((3, self.n_states)) H[0:3, 0:3] = np.eye(3) # Measurement noise (GPS typically 1-5m accuracy) R = np.diag([2.0, 2.0, 5.0]) # Outlier rejection using Mahalanobis distance y = z - H @ self.x S = H @ self.P @ H.T + R mahal_dist = np.sqrt(y.T @ np.linalg.inv(S) @ y) if mahal_dist < 5.0: # Chi-squared threshold self._ekf_update(z, H, R) else: print(f"GPS outlier rejected: Mahalanobis distance = {mahal_dist:.2f}") def _ekf_update(self, z, H, R): """Standard EKF update step.""" # Innovation y = z - H @ self.x # Innovation covariance S = H @ self.P @ H.T + R # Kalman gain K = self.P @ H.T @ np.linalg.inv(S) # State update self.x = self.x + K @ y # Covariance update (Joseph form for numerical stability) I_KH = np.eye(self.n_states) - K @ H self.P = I_KH @ self.P @ I_KH.T + K @ R @ K.T def get_state(self): """Return current state estimate.""" return { 'position': self.x[0:3], 'orientation': self.x[3:6], 'velocity': self.x[6:9], 'angular_velocity': self.x[9:12], 'covariance': np.diag(self.P) }
4. IMU Preintegration
Implement IMU preintegration for efficient optimization:
import numpy as np from scipy.spatial.transform import Rotation class IMUPreintegration: """IMU preintegration for factor graph optimization.""" def __init__(self, acc_noise=0.01, gyro_noise=0.001, acc_bias_noise=0.0001, gyro_bias_noise=0.00001): self.acc_noise = acc_noise self.gyro_noise = gyro_noise self.acc_bias_noise = acc_bias_noise self.gyro_bias_noise = gyro_bias_noise self.reset() def reset(self): """Reset preintegration.""" self.delta_R = np.eye(3) self.delta_v = np.zeros(3) self.delta_p = np.zeros(3) self.delta_t = 0.0 # Jacobians for bias correction self.dR_dbg = np.zeros((3, 3)) self.dv_dba = np.zeros((3, 3)) self.dv_dbg = np.zeros((3, 3)) self.dp_dba = np.zeros((3, 3)) self.dp_dbg = np.zeros((3, 3)) # Covariance self.cov = np.zeros((9, 9)) def integrate(self, acc, gyro, dt, acc_bias=None, gyro_bias=None): """Integrate IMU measurement.""" if acc_bias is None: acc_bias = np.zeros(3) if gyro_bias is None: gyro_bias = np.zeros(3) # Remove bias acc_unbiased = acc - acc_bias gyro_unbiased = gyro - gyro_bias # Rotation increment theta = gyro_unbiased * dt dR = Rotation.from_rotvec(theta).as_matrix() # Update preintegrated measurements self.delta_p += self.delta_v * dt + 0.5 * self.delta_R @ acc_unbiased * dt**2 self.delta_v += self.delta_R @ acc_unbiased * dt self.delta_R = self.delta_R @ dR self.delta_t += dt # Update Jacobians self._update_jacobians(acc_unbiased, gyro_unbiased, dt) # Update covariance self._update_covariance(dt) def _update_jacobians(self, acc, gyro, dt): """Update bias Jacobians.""" # Skew-symmetric matrix def skew(v): return np.array([ [0, -v[2], v[1]], [v[2], 0, -v[0]], [-v[1], v[0], 0] ]) theta = gyro * dt Jr = self._right_jacobian(theta) # Update rotation Jacobian self.dR_dbg = self.delta_R.T @ self.dR_dbg - Jr * dt # Update velocity Jacobians self.dv_dba = self.dv_dba - self.delta_R * dt self.dv_dbg = self.dv_dbg - self.delta_R @ skew(acc) @ self.dR_dbg * dt # Update position Jacobians self.dp_dba = self.dp_dba + self.dv_dba * dt - 0.5 * self.delta_R * dt**2 self.dp_dbg = self.dp_dbg + self.dv_dbg * dt - 0.5 * self.delta_R @ skew(acc) @ self.dR_dbg * dt**2 def _right_jacobian(self, theta): """Compute right Jacobian of SO(3).""" angle = np.linalg.norm(theta) if angle < 1e-8: return np.eye(3) axis = theta / angle s = np.sin(angle) c = np.cos(angle) return (s / angle) * np.eye(3) + \ (1 - s / angle) * np.outer(axis, axis) + \ ((1 - c) / angle) * self._skew(axis) def _skew(self, v): """Skew-symmetric matrix.""" return np.array([ [0, -v[2], v[1]], [v[2], 0, -v[0]], [-v[1], v[0], 0] ]) def _update_covariance(self, dt): """Update preintegration covariance.""" # Simplified covariance propagation A = np.eye(9) B = np.eye(9) * dt noise_cov = np.diag([ self.gyro_noise**2, self.gyro_noise**2, self.gyro_noise**2, self.acc_noise**2, self.acc_noise**2, self.acc_noise**2, self.gyro_bias_noise**2, self.gyro_bias_noise**2, self.gyro_bias_noise**2 ]) self.cov = A @ self.cov @ A.T + B @ noise_cov @ B.T def get_preintegrated(self): """Return preintegrated measurements.""" return { 'delta_R': self.delta_R, 'delta_v': self.delta_v, 'delta_p': self.delta_p, 'delta_t': self.delta_t, 'covariance': self.cov, 'jacobians': { 'dR_dbg': self.dR_dbg, 'dv_dba': self.dv_dba, 'dv_dbg': self.dv_dbg, 'dp_dba': self.dp_dba, 'dp_dbg': self.dp_dbg } }
5. Noise Covariance Tuning
Guidelines for tuning process and measurement noise:
def tune_noise_covariances(sensor_data_log, initial_Q, initial_R): """ Autotuning for noise covariances using innovation analysis. Parameters: - sensor_data_log: List of sensor measurements - initial_Q: Initial process noise covariance - initial_R: Initial measurement noise covariance Returns: - Tuned Q and R matrices """ from scipy.optimize import minimize def compute_nees(Q_diag, R_diag, data): """Compute Normalized Estimation Error Squared.""" ekf = RobotEKF() ekf.Q = np.diag(Q_diag) nees_values = [] for measurement in data: ekf.predict() # Compute innovation z = measurement['z'] H = measurement['H'] R = np.diag(R_diag[:len(z)]) y = z - H @ ekf.x S = H @ ekf.P @ H.T + R # NEES nees = y.T @ np.linalg.inv(S) @ y / len(z) nees_values.append(nees) ekf._ekf_update(z, H, R) return np.mean(nees_values) def objective(params): n_Q = len(initial_Q) Q_diag = params[:n_Q] R_diag = params[n_Q:] nees = compute_nees(Q_diag, R_diag, sensor_data_log) # NEES should be close to 1 for consistent estimation return (nees - 1.0)**2 initial_params = np.concatenate([np.diag(initial_Q), np.diag(initial_R)]) result = minimize(objective, initial_params, method='L-BFGS-B', bounds=[(1e-6, 10)] * len(initial_params)) n_Q = len(initial_Q) Q_tuned = np.diag(result.x[:n_Q]) R_tuned = np.diag(result.x[n_Q:]) return Q_tuned, R_tuned
6. Launch Configuration
Launch robot_localization with sensor fusion:
from launch import LaunchDescription from launch_ros.actions import Node from launch.substitutions import PathJoinSubstitution from launch_ros.substitutions import FindPackageShare def generate_launch_description(): pkg_share = FindPackageShare('my_robot_localization') ekf_config = PathJoinSubstitution([pkg_share, 'config', 'ekf.yaml']) return LaunchDescription([ # EKF for odometry frame Node( package='robot_localization', executable='ekf_node', name='ekf_filter_node_odom', output='screen', parameters=[ekf_config], remappings=[ ('odometry/filtered', 'odometry/local'), ('accel/filtered', 'accel/local') ] ), # EKF for map frame (with GPS) Node( package='robot_localization', executable='ekf_node', name='ekf_filter_node_map', output='screen', parameters=[ekf_config], remappings=[ ('odometry/filtered', 'odometry/global'), ('accel/filtered', 'accel/global') ] ), # NavSat transform for GPS Node( package='robot_localization', executable='navsat_transform_node', name='navsat_transform_node', output='screen', parameters=[ekf_config], remappings=[ ('odometry/filtered', 'odometry/global'), ('gps/fix', 'gps/data'), ('imu/data', 'imu/data') ] ) ])
MCP Server Integration
This skill can leverage the following MCP servers for enhanced capabilities:
| Server | Description | Reference |
|---|---|---|
| ros-mcp-server | ROS topic access | GitHub |
Best Practices
- Sensor calibration - Accurate calibration is essential for fusion quality
- Noise characterization - Measure actual sensor noise statistics
- Frame conventions - Follow REP-105 for coordinate frames
- Outlier rejection - Implement Mahalanobis distance checks
- Time synchronization - Ensure sensors are time-synchronized
- Graceful degradation - Handle sensor failures gracefully
Process Integration
This skill integrates with the following processes:
- Primary fusion frameworksensor-fusion-framework.js
- VIO fusionvisual-slam-implementation.js
- LiDAR-inertial fusionlidar-mapping-localization.js
- Sensor calibrationrobot-calibration.js
Output Format
When executing operations, provide structured output:
{ "operation": "configure-fusion", "filterType": "EKF", "status": "success", "sensors": { "imu": {"fused": true, "rate": 200}, "odom": {"fused": true, "rate": 50}, "gps": {"fused": true, "rate": 5} }, "artifacts": [ "config/ekf_localization.yaml", "launch/localization.launch.py" ], "tuningRecommendations": [ "Consider increasing IMU trust (lower R values)", "GPS noise may need adjustment for urban environments" ] }
Constraints
- Verify sensor time synchronization before fusion
- Ensure coordinate frame consistency (REP-105)
- Monitor filter divergence indicators
- Test outlier rejection with ground truth
- Validate covariance growth during sensor dropout