git clone https://github.com/vibeforge1111/vibeship-spawner-skills
hardware/ros2-robotics/skill.yamlROS2 Robotics Skill
Robot Operating System 2 development patterns
id: ros2-robotics name: ROS2 Robotics category: hardware complexity: advanced requires_skills:
- devops
- test-architect
description: | Patterns for developing robotics applications with ROS2 (Robot Operating System 2). Covers nodes, topics, services, actions, launch files, lifecycle management, real-time considerations, and common pitfalls.
patterns:
node_structure: name: ROS2 Node Architecture description: Proper node structure with lifecycle management when: "Creating a new ROS2 node" pattern: | #!/usr/bin/env python3 import rclpy from rclpy.node import Node from rclpy.lifecycle import LifecycleNode, State, TransitionCallbackReturn from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy from std_msgs.msg import String from sensor_msgs.msg import LaserScan from geometry_msgs.msg import Twist from typing import Optional import threading
class RobotController(LifecycleNode): """ Lifecycle-managed robot controller node. Lifecycle states: unconfigured -> inactive -> active -> finalized Transitions: configure, activate, deactivate, cleanup, shutdown Use lifecycle nodes for: - Controlled startup/shutdown sequences - Resource management - Coordinated system bringup """ def __init__(self): super().__init__('robot_controller') # Declare parameters with defaults self.declare_parameter('max_speed', 1.0) self.declare_parameter('sensor_topic', '/scan') self.declare_parameter('cmd_topic', '/cmd_vel') # These will be created in on_configure self._laser_sub = None self._cmd_pub = None self._timer = None self.get_logger().info('Node created (unconfigured)') def on_configure(self, state: State) -> TransitionCallbackReturn: """Configure node resources.""" try: # Get parameters self._max_speed = self.get_parameter('max_speed').value sensor_topic = self.get_parameter('sensor_topic').value cmd_topic = self.get_parameter('cmd_topic').value # Create QoS profile for sensors sensor_qos = QoSProfile( reliability=ReliabilityPolicy.BEST_EFFORT, history=HistoryPolicy.KEEP_LAST, depth=10 ) # Create subscribers and publishers self._laser_sub = self.create_subscription( LaserScan, sensor_topic, self._laser_callback, sensor_qos ) self._cmd_pub = self.create_publisher( Twist, cmd_topic, 10 ) self.get_logger().info('Configured successfully') return TransitionCallbackReturn.SUCCESS except Exception as e: self.get_logger().error(f'Configuration failed: {e}') return TransitionCallbackReturn.FAILURE def on_activate(self, state: State) -> TransitionCallbackReturn: """Activate node - start processing.""" self._timer = self.create_timer(0.1, self._control_loop) self.get_logger().info('Activated') return TransitionCallbackReturn.SUCCESS def on_deactivate(self, state: State) -> TransitionCallbackReturn: """Deactivate node - stop processing.""" if self._timer: self._timer.cancel() self._timer = None # Stop robot self._publish_stop() self.get_logger().info('Deactivated') return TransitionCallbackReturn.SUCCESS def on_cleanup(self, state: State) -> TransitionCallbackReturn: """Clean up resources.""" self._laser_sub = None self._cmd_pub = None self.get_logger().info('Cleaned up') return TransitionCallbackReturn.SUCCESS def on_shutdown(self, state: State) -> TransitionCallbackReturn: """Final shutdown.""" self._publish_stop() self.get_logger().info('Shutting down') return TransitionCallbackReturn.SUCCESS def _laser_callback(self, msg: LaserScan): """Process laser scan data.""" self._latest_scan = msg def _control_loop(self): """Main control loop - called by timer.""" # Implement control logic pass def _publish_stop(self): """Publish zero velocity to stop robot.""" if self._cmd_pub: stop_cmd = Twist() self._cmd_pub.publish(stop_cmd) def main(args=None): rclpy.init(args=args) node = RobotController() # Use multi-threaded executor for callbacks executor = rclpy.executors.MultiThreadedExecutor() executor.add_node(node) try: executor.spin() except KeyboardInterrupt: pass finally: node.destroy_node() rclpy.shutdown() if __name__ == '__main__': main() why: "Lifecycle nodes provide controlled startup and resource management"
topic_service_action: name: Communication Patterns description: Topics, services, and actions for different use cases pattern: | import rclpy from rclpy.node import Node from rclpy.action import ActionServer, ActionClient, GoalResponse, CancelResponse from rclpy.action.server import ServerGoalHandle from rclpy.callback_groups import ReentrantCallbackGroup, MutuallyExclusiveCallbackGroup
from std_srvs.srv import SetBool from nav2_msgs.action import NavigateToPose """ WHEN TO USE EACH: TOPICS (Publisher/Subscriber): - Continuous data streams - Sensor data, state, transforms - No response needed - Examples: /cmd_vel, /odom, /scan SERVICES (Request/Response): - Quick, one-shot operations - Configuration changes - Queries - Examples: /set_mode, /get_state ACTIONS (Long-running with feedback): - Tasks that take time - Preemptable operations - Need progress feedback - Examples: /navigate_to_pose, /follow_path """ class CommunicationPatterns(Node): def __init__(self): super().__init__('comm_patterns') # Callback groups for parallel execution self._sensor_group = MutuallyExclusiveCallbackGroup() self._service_group = ReentrantCallbackGroup() self._action_group = ReentrantCallbackGroup() # Service server self._enable_service = self.create_service( SetBool, 'enable_robot', self._enable_callback, callback_group=self._service_group ) # Action server self._nav_action = ActionServer( self, NavigateToPose, 'navigate_to_pose', execute_callback=self._navigate_execute, goal_callback=self._navigate_goal_callback, cancel_callback=self._navigate_cancel_callback, callback_group=self._action_group ) # Action client self._nav_client = ActionClient( self, NavigateToPose, 'navigate_to_pose' ) def _enable_callback(self, request, response): """Service callback - quick operation.""" if request.data: self._enabled = True response.success = True response.message = 'Robot enabled' else: self._enabled = False response.success = True response.message = 'Robot disabled' return response async def _navigate_execute(self, goal_handle: ServerGoalHandle): """Action execute callback - long-running task.""" self.get_logger().info('Executing navigation goal') feedback = NavigateToPose.Feedback() while not self._goal_reached(): # Check if canceled if goal_handle.is_cancel_requested: goal_handle.canceled() return NavigateToPose.Result() # Update feedback feedback.current_pose = self._get_current_pose() feedback.distance_remaining = self._distance_to_goal() goal_handle.publish_feedback(feedback) await asyncio.sleep(0.1) goal_handle.succeed() result = NavigateToPose.Result() return result def _navigate_goal_callback(self, goal_request): """Accept or reject goal.""" if self._is_valid_goal(goal_request): return GoalResponse.ACCEPT return GoalResponse.REJECT def _navigate_cancel_callback(self, goal_handle): """Accept or reject cancel request.""" return CancelResponse.ACCEPT why: "Choosing the right communication pattern prevents design problems"
launch_system: name: Launch File Best Practices description: Python launch files for system bringup pattern: | # launch/robot_bringup.launch.py from launch import LaunchDescription from launch.actions import ( DeclareLaunchArgument, IncludeLaunchDescription, GroupAction, TimerAction, OpaqueFunction, LogInfo ) from launch.launch_description_sources import PythonLaunchDescriptionSource from launch.substitutions import ( LaunchConfiguration, PathJoinSubstitution, PythonExpression ) from launch.conditions import IfCondition, UnlessCondition from launch_ros.actions import Node, LifecycleNode, PushRosNamespace from launch_ros.substitutions import FindPackageShare from ament_index_python.packages import get_package_share_directory import os
def generate_launch_description(): # Declare arguments use_sim = DeclareLaunchArgument( 'use_sim', default_value='false', description='Use simulation instead of hardware' ) robot_name = DeclareLaunchArgument( 'robot_name', default_value='robot1', description='Unique robot name for namespacing' ) config_file = DeclareLaunchArgument( 'config_file', default_value=PathJoinSubstitution([ FindPackageShare('my_robot'), 'config', 'params.yaml' ]), description='Path to configuration file' ) # Get package paths pkg_share = get_package_share_directory('my_robot') # Define nodes robot_state_publisher = Node( package='robot_state_publisher', executable='robot_state_publisher', name='robot_state_publisher', namespace=LaunchConfiguration('robot_name'), parameters=[{ 'robot_description': open( os.path.join(pkg_share, 'urdf', 'robot.urdf') ).read() }], output='screen' ) # Lifecycle node with automatic configuration controller_node = LifecycleNode( package='my_robot', executable='controller_node', name='controller', namespace=LaunchConfiguration('robot_name'), parameters=[LaunchConfiguration('config_file')], output='screen' ) # Conditional node (only in simulation) sim_clock = Node( package='gazebo_ros', executable='gazebo', condition=IfCondition(LaunchConfiguration('use_sim')), output='screen' ) # Hardware driver (only on real robot) hardware_driver = Node( package='my_robot_driver', executable='driver_node', condition=UnlessCondition(LaunchConfiguration('use_sim')), parameters=[LaunchConfiguration('config_file')], output='screen' ) # Delayed start (wait for other nodes) delayed_planner = TimerAction( period=5.0, # Wait 5 seconds actions=[ Node( package='nav2_planner', executable='planner_server', name='planner_server', namespace=LaunchConfiguration('robot_name'), parameters=[LaunchConfiguration('config_file')], output='screen' ) ] ) # Include another launch file sensors_launch = IncludeLaunchDescription( PythonLaunchDescriptionSource([ PathJoinSubstitution([ FindPackageShare('my_robot'), 'launch', 'sensors.launch.py' ]) ]), launch_arguments={ 'robot_name': LaunchConfiguration('robot_name') }.items() ) return LaunchDescription([ use_sim, robot_name, config_file, robot_state_publisher, controller_node, sim_clock, hardware_driver, delayed_planner, sensors_launch ]) why: "Proper launch files enable modular, configurable robot bringup"
qos_configuration: name: QoS Configuration description: Quality of Service settings for reliability critical: true pattern: | from rclpy.qos import ( QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy, QoSDurabilityPolicy, QoSLivelinessPolicy ) from rclpy.duration import Duration
""" QoS PROFILES FOR DIFFERENT USE CASES: SENSOR DATA: - Best effort reliability (drop if slow) - Keep last N messages - Volatile durability COMMANDS: - Reliable delivery (retry) - Keep last 1 - Volatile durability PARAMETERS/CONFIG: - Reliable delivery - Transient local (late subscribers get last) - Keep last 1 """ # Sensor data (high frequency, tolerates drops) SENSOR_QOS = QoSProfile( reliability=QoSReliabilityPolicy.BEST_EFFORT, history=QoSHistoryPolicy.KEEP_LAST, depth=5, durability=QoSDurabilityPolicy.VOLATILE ) # Commands (must arrive) COMMAND_QOS = QoSProfile( reliability=QoSReliabilityPolicy.RELIABLE, history=QoSHistoryPolicy.KEEP_LAST, depth=1, durability=QoSDurabilityPolicy.VOLATILE ) # Parameters/state (late subscribers need last value) PARAMETER_QOS = QoSProfile( reliability=QoSReliabilityPolicy.RELIABLE, history=QoSHistoryPolicy.KEEP_LAST, depth=1, durability=QoSDurabilityPolicy.TRANSIENT_LOCAL ) # Map data (large, infrequent) MAP_QOS = QoSProfile( reliability=QoSReliabilityPolicy.RELIABLE, history=QoSHistoryPolicy.KEEP_LAST, depth=1, durability=QoSDurabilityPolicy.TRANSIENT_LOCAL ) # CRITICAL: QoS must match between publisher and subscriber # Mismatched QoS = silent connection failure! def check_qos_compatibility(pub_qos, sub_qos): """Check if publisher and subscriber QoS are compatible.""" # Reliability: publisher must be >= subscriber if (sub_qos.reliability == QoSReliabilityPolicy.RELIABLE and pub_qos.reliability == QoSReliabilityPolicy.BEST_EFFORT): return False, "Reliability mismatch" # Durability: publisher must be >= subscriber if (sub_qos.durability == QoSDurabilityPolicy.TRANSIENT_LOCAL and pub_qos.durability == QoSDurabilityPolicy.VOLATILE): return False, "Durability mismatch" return True, "Compatible" why: "QoS mismatches cause silent communication failures"
transforms: name: TF2 Transform Management description: Coordinate frame transforms pattern: | import rclpy from rclpy.node import Node from tf2_ros import TransformBroadcaster, StaticTransformBroadcaster from tf2_ros import TransformListener, Buffer from tf2_ros import TransformException from geometry_msgs.msg import TransformStamped import tf_transformations import numpy as np
class TransformManager(Node): """ TF2 Transform Best Practices: 1. Use standard frame names (base_link, odom, map) 2. Static transforms: Use static broadcaster 3. Dynamic transforms: Publish at sensor rate 4. Always check transform availability before use 5. Use lookup_transform with timeout """ def __init__(self): super().__init__('transform_manager') # Static broadcaster for fixed transforms self._static_broadcaster = StaticTransformBroadcaster(self) # Dynamic broadcaster for moving frames self._broadcaster = TransformBroadcaster(self) # Transform listener with buffer self._tf_buffer = Buffer() self._tf_listener = TransformListener(self._tf_buffer, self) # Publish static transforms once self._publish_static_transforms() def _publish_static_transforms(self): """Publish transforms that never change.""" # Sensor mount offset t = TransformStamped() t.header.stamp = self.get_clock().now().to_msg() t.header.frame_id = 'base_link' t.child_frame_id = 'laser_link' t.transform.translation.x = 0.1 t.transform.translation.y = 0.0 t.transform.translation.z = 0.2 t.transform.rotation.w = 1.0 self._static_broadcaster.sendTransform(t) def publish_odom_transform(self, x, y, theta): """Publish odometry transform.""" t = TransformStamped() t.header.stamp = self.get_clock().now().to_msg() t.header.frame_id = 'odom' t.child_frame_id = 'base_link' t.transform.translation.x = x t.transform.translation.y = y t.transform.translation.z = 0.0 q = tf_transformations.quaternion_from_euler(0, 0, theta) t.transform.rotation.x = q[0] t.transform.rotation.y = q[1] t.transform.rotation.z = q[2] t.transform.rotation.w = q[3] self._broadcaster.sendTransform(t) def lookup_transform_safe( self, target_frame: str, source_frame: str, timeout_sec: float = 1.0 ): """Safely look up transform with error handling.""" try: transform = self._tf_buffer.lookup_transform( target_frame, source_frame, rclpy.time.Time(), timeout=rclpy.duration.Duration(seconds=timeout_sec) ) return transform except TransformException as e: self.get_logger().warning( f'Could not get transform {source_frame} -> {target_frame}: {e}' ) return None def transform_point(self, point, source_frame, target_frame): """Transform a point between frames.""" from geometry_msgs.msg import PointStamped point_stamped = PointStamped() point_stamped.header.frame_id = source_frame point_stamped.header.stamp = self.get_clock().now().to_msg() point_stamped.point = point try: transformed = self._tf_buffer.transform( point_stamped, target_frame ) return transformed.point except TransformException as e: self.get_logger().error(f'Transform failed: {e}') return None why: "Proper TF management prevents coordinate frame errors"
anti_patterns:
topic_name_typo: name: Typo in Topic Names problem: "Mistyped topic name causes silent failure - no error" solution: "Use constants, remapping, and check with ros2 topic list"
blocking_callbacks: name: Blocking in Callbacks problem: "Long operations in callback block executor" solution: "Use async callbacks or separate threads"
qos_mismatch: name: QoS Mismatch Between Publisher and Subscriber problem: "Incompatible QoS causes no connection, no error" solution: "Check compatibility, use ros2 topic info --verbose"
handoffs:
-
to: sensor-fusion when: "Need to combine multiple sensor inputs" pass: "Sensor topics, frame IDs"
-
to: control-systems when: "Need control algorithm implementation" pass: "State feedback, actuation interface"
ecosystem: ros2_tools: - "ros2 cli - Command line interface" - "rviz2 - Visualization" - "rqt - GUI tools" - "rosbag2 - Data recording"
navigation: - "Nav2 - Navigation stack" - "SLAM Toolbox - Mapping" - "robot_localization - Sensor fusion"
simulation: - "Gazebo - Physics simulation" - "Isaac Sim - NVIDIA simulation" - "Webots - Cross-platform sim"
frameworks: - "MoveIt2 - Motion planning" - "micro-ROS - Microcontroller ROS" - "ROS2 Control - Hardware abstraction"