ROS2 Development Skill
When to Use This Skill
- Building ROS2 packages, nodes, or component containers
- Setting up colcon workspaces, ament_cmake, or ament_python packages
- Writing CMakeLists.txt, package.xml, or setup.py for ROS2
- Defining custom messages, services, or actions
- Writing Python launch files with conditional logic
- Configuring DDS middleware and QoS profiles
- Implementing lifecycle (managed) nodes
- Working with Nav2, MoveIt2, or other ROS2 frameworks
- Debugging DDS discovery, QoS mismatches, or build failures
- Deploying ROS2 to production or embedded systems (micro-ROS)
- Setting up CI/CD for ROS2 packages
Core Architecture
1. Node Design Patterns
Basic Node (rclpy):
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
from std_msgs.msg import String
class PerceptionNode(Node):
def __init__(self):
super().__init__('perception_node')
# 1. Declare parameters with types and descriptions
self.declare_parameter('rate_hz', 30.0,
descriptor=ParameterDescriptor(
description='Processing rate in Hz',
floating_point_range=[FloatingPointRange(
from_value=1.0, to_value=120.0, step=0.0
)]
))
self.declare_parameter('confidence_threshold', 0.7)
self.declare_parameter('frame_id', 'camera_link')
# 2. Read parameters
rate_hz = self.get_parameter('rate_hz').value
self.threshold = self.get_parameter('confidence_threshold').value
self.frame_id = self.get_parameter('frame_id').value
# 3. Set up QoS profiles
sensor_qos = QoSProfile(
reliability=ReliabilityPolicy.BEST_EFFORT,
history=HistoryPolicy.KEEP_LAST,
depth=1
)
reliable_qos = QoSProfile(
reliability=ReliabilityPolicy.RELIABLE,
history=HistoryPolicy.KEEP_LAST,
depth=10
)
# 4. Publishers first, then subscribers
self.det_pub = self.create_publisher(
DetectionArray, 'detections', reliable_qos)
self.image_sub = self.create_subscription(
Image, 'camera/image_raw', self.image_callback, sensor_qos)
# 5. Timers for periodic work
self.timer = self.create_timer(1.0 / rate_hz, self.timer_callback)
# 6. Parameter change callback
self.add_on_set_parameters_callback(self.param_callback)
self.get_logger().info(
f'Perception node started at {rate_hz}Hz, '
f'threshold={self.threshold}')
def param_callback(self, params):
"""Handle runtime parameter changes (replaces dynamic_reconfigure)"""
for param in params:
if param.name == 'confidence_threshold':
self.threshold = param.value
self.get_logger().info(f'Threshold updated to {param.value}')
return SetParametersResult(successful=True)
def image_callback(self, msg):
# Process incoming images
pass
def timer_callback(self):
# Periodic work
pass
def main(args=None):
rclpy.init(args=args)
node = PerceptionNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Basic Node (rclcpp):
#include <rclcpp/rclcpp.hpp>
#include <sensor_msgs/msg/image.hpp>
#include <vision_msgs/msg/detection2_d.hpp>
#include <memory>
class PerceptionNode : public rclcpp::Node {
public:
PerceptionNode() : Node("perception_node") {
// Declare and get parameters
this->declare_parameter("rate_hz", 30.0);
this->declare_parameter("confidence_threshold", 0.7);
double rate_hz = this->get_parameter("rate_hz").as_double();
// QoS
auto sensor_qos = rclcpp::SensorDataQoS();
auto reliable_qos = rclcpp::QoS(10).reliable();
// Publishers and subscribers
det_pub_ = this->create_publisher<vision_msgs::msg::Detection2D>("detections", reliable_qos);
image_sub_ = this->create_subscription<sensor_msgs::msg::Image>(
"camera/image_raw", sensor_qos, [this](const std::shared_ptr<const sensor_msgs::msg::Image>& msg){
this->image_callback(msg);
});
timer_ = this->create_wall_timer(
std::chrono::milliseconds(static_cast<int>(1000.0 / rate_hz)),
[this](){ this->timer_callback(); });
RCLCPP_INFO(this->get_logger(), "Perception node started at %.1fHz", rate_hz);
}
private:
void image_callback(const std::shared_ptr<const sensor_msgs::msg::Image>& msg) {
// Use shared_ptr for zero-copy potential
}
void timer_callback() {}
rclcpp::Publisher<vision_msgs::msg::Detection2D>::SharedPtr det_pub_;
rclcpp::Subscription<sensor_msgs::msg::Image>::SharedPtr image_sub_;
rclcpp::TimerBase::SharedPtr timer_;
};
int main(int argc, char** argv) {
rclcpp::init(argc, argv);
rclcpp::spin(std::make_shared<PerceptionNode>());
rclcpp::shutdown();
return 0;
}
2. Lifecycle (Managed) Nodes
Use lifecycle nodes for production systems where you need deterministic startup, shutdown, and error recovery. This is one of ROS2's most important features over ROS1.
State Machine: Unconfigured → Inactive → Active → Finalized
from rclpy.lifecycle import Node as LifecycleNode, TransitionCallbackReturn
class ManagedPerception(LifecycleNode):
def __init__(self):
super().__init__('managed_perception')
self.get_logger().info('Node created (unconfigured)')
def on_configure(self, state) -> TransitionCallbackReturn:
"""Load params, allocate memory, set up pubs/subs (but don't activate)"""
self.declare_parameter('model_path', '')
model_path = self.get_parameter('model_path').value
try:
self.model = load_model(model_path)
self.det_pub = self.create_lifecycle_publisher(
DetectionArray, 'detections', 10)
self.get_logger().info(f'Configured with model: {model_path}')
return TransitionCallbackReturn.SUCCESS
except Exception as e:
self.get_logger().error(f'Configuration failed: {e}')
return TransitionCallbackReturn.FAILURE
def on_activate(self, state) -> TransitionCallbackReturn:
"""Start processing — subscriptions go live here"""
self.image_sub = self.create_subscription(
Image, 'camera/image_raw', self.image_callback, 1)
self.get_logger().info('Activated — processing images')
return TransitionCallbackReturn.SUCCESS
def on_deactivate(self, state) -> TransitionCallbackReturn:
"""Pause processing — safe to reconfigure after this"""
self.destroy_subscription(self.image_sub)
self.get_logger().info('Deactivated — stopped processing')
return TransitionCallbackReturn.SUCCESS
def on_cleanup(self, state) -> TransitionCallbackReturn:
"""Release resources, return to unconfigured"""
del self.model
self.get_logger().info('Cleaned up')
return TransitionCallbackReturn.SUCCESS
def on_shutdown(self, state) -> TransitionCallbackReturn:
"""Final cleanup before destruction"""
self.get_logger().info('Shutting down')
return TransitionCallbackReturn.SUCCESS
def on_error(self, state) -> TransitionCallbackReturn:
"""Handle errors — try to recover or fail gracefully"""
self.get_logger().error(f'Error in state {state.label}')
return TransitionCallbackReturn.SUCCESS # Transition to unconfigured
Orchestrating Lifecycle Nodes with a launch file:
from launch import LaunchDescription
from launch_ros.actions import LifecycleNode
from launch_ros.eve