How to interpret YAML parameters in ROS2 as a Python dictionary?
I’m working on a ROS2 project and need to access the following parameter as a dictionary:
my_node:
category_sector_mapping:
Plastic: 0
Paper: 1
Glass: 2
Aluminium: 2
Organic: 2
Undifferentiated: 2
Empty: -1
How can I retrieve this parameter within a ROS2 node as a Python dictionary? I want to create an attribute in my node that looks like this:
self.category_sector_mapping = {
"Plastic": 0,
"Paper": 1,
"Glass": 2,
"Aluminium": 2,
"Organic": 2,
"Undifferentiated": 2,
"Empty": -1
}
What is the proper way to load and parse YAML parameters in ROS2 nodes to achieve this structure?
You can interpret YAML parameters in ROS2 as a Python dictionary by using the pyYAML library to directly parse the YAML file, or by accessing parameters through ROS2’s parameter server API. The most direct approach is to use yaml.safe_load() to read your parameter file and extract the dictionary structure, then access it through the node’s parameter interface.
Contents
- Direct YAML Parsing with pyYAML
- Parameter Access in ROS2 Nodes
- Loading Parameters from Launch Files
- Best Practices and Considerations
- Complete Example Implementation
Direct YAML Parsing with pyYAML
The most straightforward method to interpret YAML parameters as a Python dictionary is using the pyYAML library. This approach gives you direct access to the parsed data structure.
First, install pyYAML if you haven’t already:
pip install pyyaml
Here’s how to parse your YAML file and extract the dictionary:
import yaml
import os
from ament_index_python.packages import get_package_share_directory
def load_yaml_dict(package_name, file_name):
"""Load a YAML file and return its content as a dictionary."""
yaml_file_path = os.path.join(
get_package_share_directory(package_name),
'config',
file_name
)
with open(yaml_file_path, 'r') as file:
try:
data = yaml.safe_load(file)
return data
except yaml.YAMLError as exc:
print(f"Error parsing YAML file: {exc}")
return None
# Usage in your node
category_mapping = load_yaml_dict('your_package_name', 'your_config.yaml')
self.category_sector_mapping = category_mapping.get('my_node', {}).get('category_sector_mapping', {})
This method gives you complete control over the parsing process and allows you to manipulate the data before using it in your node.
Parameter Access in ROS2 Nodes
When parameters are loaded through ROS2’s parameter system, you can access them using the node’s parameter interface. Here’s how to access your dictionary parameter:
from rclpy.node import Node
from rclpy.parameter import Parameter
class YourNode(Node):
def __init__(self):
super().__init__('your_node')
# Declare the parameter
self.declare_parameter('category_sector_mapping', {})
# Get the parameter as a dictionary
category_mapping = self.get_parameter('category_sector_mapping').value
# Store as instance attribute
self.category_sector_mapping = category_mapping
self.get_logger().info(f"Loaded category mapping: {self.category_sector_mapping}")
The parameter system automatically handles the conversion from YAML to Python data types, so nested dictionaries are preserved as Python dictionaries.
Loading Parameters from Launch Files
You can load parameters from YAML files in your launch file and pass them to your node:
from launch import LaunchDescription
from launch_ros.actions import Node
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
import os
def generate_launch_description():
# Declare launch argument for the parameter file
param_file = DeclareLaunchArgument(
'param_file',
default_value='config/your_config.yaml',
description='Path to the parameter file'
)
# Create the node with parameters from the YAML file
node = Node(
package='your_package',
executable='your_node',
name='my_node',
parameters=[[
LaunchConfiguration('param_file'),
{'category_sector_mapping': {
'Plastic': 0,
'Paper': 1,
'Glass': 2,
'Aluminium': 2,
'Organic': 2,
'Undifferentiated': 2,
'Empty': -1
}}
]]
)
return LaunchDescription([
param_file,
node
])
Alternatively, you can load the entire YAML file and pass it as parameters:
import yaml
from launch.substitutions import PythonExpression
# In your launch file:
with open(os.path.join(
get_package_share_directory('your_package'),
'config',
'your_config.yaml'
), 'r') as f:
param_data = yaml.safe_load(f)
node = Node(
package='your_package',
executable='your_node',
parameters=[param_data]
)
Best Practices and Considerations
Parameter Validation
Always validate your parameters after loading:
def validate_category_mapping(mapping):
"""Validate the category mapping dictionary."""
if not isinstance(mapping, dict):
self.get_logger().error("Category mapping must be a dictionary")
return False
required_keys = ['Plastic', 'Paper', 'Glass', 'Aluminium', 'Organic', 'Undifferentiated', 'Empty']
for key in required_keys:
if key not in mapping:
self.get_logger().error(f"Missing required key: {key}")
return False
return True
# Usage
if validate_category_mapping(self.category_sector_mapping):
# Proceed with valid parameters
pass
Parameter Updates
Handle parameter updates dynamically:
def __init__(self):
super().__init__('your_node')
# Declare parameters
self.declare_parameter('category_sector_mapping', {})
# Add parameter callback
self.add_on_set_parameters_callback(self.parameters_callback)
# Initial load
self.update_category_mapping()
def parameters_callback(self, params):
"""Callback for parameter updates."""
for param in params:
if param.name == 'category_sector_mapping':
self.update_category_mapping(param.value)
return SetParametersResult(successful=True)
def update_category_mapping(self, mapping=None):
"""Update the category mapping from parameters."""
if mapping is None:
mapping = self.get_parameter('category_sector_mapping').value
self.category_sector_mapping = mapping
self.get_logger().info("Updated category mapping")
Error Handling
Implement robust error handling for parameter loading:
def load_parameters_safely(self):
"""Safely load parameters with error handling."""
try:
# Method 1: Through parameter server
mapping = self.get_parameter('category_sector_mapping').value
# Method 2: Direct YAML parsing (fallback)
if not mapping:
mapping = self.load_yaml_dict_fallback()
if mapping:
self.category_sector_mapping = mapping
return True
else:
self.get_logger().error("Failed to load category mapping")
return False
except Exception as e:
self.get_logger().error(f"Error loading parameters: {str(e)}")
return False
Complete Example Implementation
Here’s a complete ROS2 node implementation that demonstrates how to load and use your YAML parameters:
import rclpy
from rclpy.node import Node
from rclpy.parameter import Parameter
from rclpy.executors import SingleThreadedExecutor
from rclpy.action import ActionServer
import yaml
import os
from ament_index_python.packages import get_package_share_directory
class CategoryMappingNode(Node):
def __init__(self):
super().__init__('category_mapping_node')
# Declare the parameter
self.declare_parameter('category_sector_mapping', {})
# Load parameters
self.load_category_mapping()
# Create a timer to demonstrate parameter access
self.timer = self.create_timer(1.0, self.timer_callback)
self.get_logger().info("Category mapping node initialized")
def load_category_mapping(self):
"""Load the category mapping from parameters or YAML file."""
try:
# Try to get from parameter server first
param_value = self.get_parameter('category_sector_mapping').value
if param_value and isinstance(param_value, dict):
self.category_sector_mapping = param_value
self.get_logger().info("Loaded category mapping from parameter server")
return
# Fallback: load from YAML file directly
self.load_from_yaml_file()
except Exception as e:
self.get_logger().error(f"Error loading category mapping: {str(e)}")
def load_from_yaml_file(self):
"""Load category mapping from YAML file as fallback."""
try:
package_name = 'your_package_name' # Replace with your package name
config_file = 'your_config.yaml' # Replace with your config file
yaml_path = os.path.join(
get_package_share_directory(package_name),
'config',
config_file
)
with open(yaml_path, 'r') as file:
config_data = yaml.safe_load(file)
# Extract the specific parameter
if 'my_node' in config_data and 'category_sector_mapping' in config_data['my_node']:
self.category_sector_mapping = config_data['my_node']['category_sector_mapping']
self.get_logger().info("Loaded category mapping from YAML file")
else:
self.get_logger().error("Category mapping not found in YAML file")
self.category_sector_mapping = {}
except FileNotFoundError:
self.get_logger().error("YAML configuration file not found")
self.category_sector_mapping = {}
except yaml.YAMLError as e:
self.get_logger().error(f"Error parsing YAML file: {str(e)}")
self.category_sector_mapping = {}
except Exception as e:
self.get_logger().error(f"Unexpected error loading YAML: {str(e)}")
self.category_sector_mapping = {}
def timer_callback(self):
"""Timer callback to demonstrate parameter usage."""
self.get_logger().info(f"Current category mapping: {self.category_sector_mapping}")
# Example usage: get sector for a category
category = 'Plastic'
if category in self.category_sector_mapping:
sector = self.category_sector_mapping[category]
self.get_logger().info(f"'{category}' maps to sector {sector}")
else:
self.get_logger().warning(f"Category '{category}' not found in mapping")
def main(args=None):
rclpy.init(args=args)
node = CategoryMappingNode()
executor = SingleThreadedExecutor()
executor.add_node(node)
try:
executor.spin()
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
To use this with your specific YAML structure, make sure your launch file loads the parameters correctly:
from launch import LaunchDescription
from launch_ros.actions import Node
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
import os
def generate_launch_description():
# Declare launch argument
param_file = DeclareLaunchArgument(
'param_file',
default_value='config/your_config.yaml',
description='Path to parameter file'
)
# Create node with parameters
node = Node(
package='your_package_name',
executable='category_mapping_node',
name='my_node',
parameters=[
LaunchConfiguration('param_file'),
{
'category_sector_mapping': {
'Plastic': 0,
'Paper': 1,
'Glass': 2,
'Aluminium': 2,
'Organic': 2,
'Undifferentiated': 2,
'Empty': -1
}
}
]
)
return LaunchDescription([
param_file,
node
])
This comprehensive approach gives you multiple ways to access your YAML parameters as Python dictionaries in ROS2, with fallback mechanisms and proper error handling to ensure robust operation.
Sources
- Stack Overflow - Interpreting YAML parameters in ROS2 as dict
- ROS2 python-launchfile: Load parameters.yaml into node’s parameter
- Robotics Back-End - ROS2 YAML For Parameters
- ROS2 Documentation - Understanding Parameters
- ROS Documentation - Parameters
Conclusion
To interpret YAML parameters in ROS2 as Python dictionaries, you have several effective approaches:
- Use pyYAML for direct file parsing when you need fine-grained control over the loading process
- Leverage ROS2’s parameter system for automatic type conversion and parameter management
- Implement robust error handling with fallback mechanisms to ensure your node always has valid parameters
- Use parameter validation to ensure your dictionary structure meets your requirements
- Consider dynamic parameter updates if your configuration might change at runtime
The most reliable approach combines ROS2’s parameter system with direct YAML parsing as a fallback, ensuring your node can handle various deployment scenarios while maintaining the dictionary structure you need for efficient category mapping operations.