NeuroAgent

ROS2 YAML Parameters: Convert to Python Dictionary

Learn how to interpret YAML parameters in ROS2 as Python dictionaries. Complete guide with code examples for parameter loading and validation.

Question

How to interpret YAML parameters in ROS2 as a Python dictionary?

I’m working on a ROS2 project and need to access the following parameter as a dictionary:

yaml
my_node:
  category_sector_mapping: 
    Plastic: 0
    Paper: 1
    Glass: 2
    Aluminium: 2
    Organic: 2
    Undifferentiated: 2
    Empty: -1

How can I retrieve this parameter within a ROS2 node as a Python dictionary? I want to create an attribute in my node that looks like this:

python
self.category_sector_mapping = {
    "Plastic": 0,
    "Paper": 1,
    "Glass": 2,
    "Aluminium": 2,
    "Organic": 2,
    "Undifferentiated": 2,
    "Empty": -1
}

What is the proper way to load and parse YAML parameters in ROS2 nodes to achieve this structure?

NeuroAgent

ROS2 parameters cannot hold a dictionary directly: the supported types are booleans, integers, floats, strings, byte arrays, and arrays of these. A nested YAML mapping is flattened into separate parameters with dotted names, such as category_sector_mapping.Plastic. To get a Python dictionary, either collect the flattened parameters with get_parameters_by_prefix() and rebuild the dictionary, or parse the YAML file yourself with yaml.safe_load() from the pyYAML library.

Direct YAML Parsing with pyYAML

The most straightforward method to interpret YAML parameters as a Python dictionary is using the pyYAML library. This approach gives you direct access to the parsed data structure.

First, install pyYAML if you haven’t already:

bash
pip install pyyaml

Here’s how to parse your YAML file and extract the dictionary:

python
import yaml
import os
from ament_index_python.packages import get_package_share_directory

def load_yaml_dict(package_name, file_name):
    """Load a YAML file and return its content as a dictionary."""
    yaml_file_path = os.path.join(
        get_package_share_directory(package_name),
        'config',
        file_name
    )
    
    with open(yaml_file_path, 'r') as file:
        try:
            data = yaml.safe_load(file)
            return data
        except yaml.YAMLError as exc:
            print(f"Error parsing YAML file: {exc}")
            return None

# Usage inside your node's __init__ (self is the node instance)
config = load_yaml_dict('your_package_name', 'your_config.yaml') or {}  # None on parse error
self.category_sector_mapping = config.get('my_node', {}).get('category_sector_mapping', {})

This method gives you complete control over the parsing process and allows you to manipulate the data before using it in your node.
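
A ROS2 parameter file normally nests values under a ros__parameters key below the node name, while the YAML in the question omits it. As a minimal sketch, a helper that tolerates both layouts of the already parsed data might look like this. The name extract_mapping is hypothetical and not part of any ROS2 or pyYAML API.

```python
def extract_mapping(config, node_name, param_name):
    """Return the mapping stored for node_name, with or without 'ros__parameters'."""
    node_section = (config or {}).get(node_name) or {}
    # ROS 2 parameter files wrap values in 'ros__parameters'; plain YAML may not
    params = node_section.get('ros__parameters') or node_section
    mapping = params.get(param_name) or {}
    if not isinstance(mapping, dict):
        raise TypeError(f"'{param_name}' is not a mapping")
    return dict(mapping)


# Both layouts yield the same dictionary
plain = {'my_node': {'category_sector_mapping': {'Plastic': 0, 'Empty': -1}}}
wrapped = {'my_node': {'ros__parameters': plain['my_node']}}
from_plain = extract_mapping(plain, 'my_node', 'category_sector_mapping')
from_wrapped = extract_mapping(wrapped, 'my_node', 'category_sector_mapping')
```

The input is whatever yaml.safe_load() returned, so the helper also accepts None (an empty file) and returns an empty dictionary in that case.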

Parameter Access in ROS2 Nodes

When parameters are loaded through ROS2’s parameter system, a nested YAML mapping does not arrive as one dictionary parameter. ROS2 flattens it into separate parameters with dotted names, such as category_sector_mapping.Plastic, so you collect them by prefix and rebuild the dictionary yourself:

python
from rclpy.node import Node

class YourNode(Node):
    def __init__(self):
        # Declare every parameter passed in at startup (e.g. from a YAML file),
        # so the mapping keys do not have to be hard-coded here
        super().__init__(
            'my_node',
            automatically_declare_parameters_from_overrides=True
        )

        # Returns {'Plastic': Parameter, 'Paper': Parameter, ...}
        params = self.get_parameters_by_prefix('category_sector_mapping')

        # Store as instance attribute, unwrapping each Parameter to its value
        self.category_sector_mapping = {
            name: param.value for name, param in params.items()
        }

        self.get_logger().info(f"Loaded category mapping: {self.category_sector_mapping}")

Calling self.declare_parameter('category_sector_mapping', {}) does not work, because a dictionary is not a supported parameter type. For ROS2 to pick up the file, the YAML must also place the values under a ros__parameters key below the node name.
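
ROS2 parameters are flat: a nested YAML mapping reaches the node as one parameter per leaf, named with dots. The following plain-Python sketch illustrates that flattening and the reverse step of collecting entries by prefix. The helpers flatten and collect_by_prefix are illustrative names, not ROS2 functions, and the code only mimics the naming scheme.

```python
def flatten(nested, prefix=''):
    """Turn nested dicts into a flat {dotted_name: value} dict."""
    flat = {}
    for key, value in nested.items():
        name = f"{prefix}.{key}" if prefix else str(key)
        if isinstance(value, dict):
            flat.update(flatten(value, name))
        else:
            flat[name] = value
    return flat


def collect_by_prefix(flat, prefix):
    """Rebuild one dict from the names that start with 'prefix.'."""
    dotted = prefix + '.'
    return {name[len(dotted):]: value
            for name, value in flat.items() if name.startswith(dotted)}


yaml_params = {'category_sector_mapping': {'Plastic': 0, 'Paper': 1, 'Empty': -1}}
flat_params = flatten(yaml_params)
mapping = collect_by_prefix(flat_params, 'category_sector_mapping')
```

Here flat_params holds keys such as category_sector_mapping.Plastic, and mapping is the original inner dictionary again.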

Loading Parameters from Launch Files

You can load parameters from YAML files in your launch file and pass them to your node:

python
from launch import LaunchDescription
from launch_ros.actions import Node
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
import os

def generate_launch_description():
    # Declare launch argument for the parameter file
    param_file = DeclareLaunchArgument(
        'param_file',
        default_value='config/your_config.yaml',
        description='Path to the parameter file'
    )
    
    # Create the node with parameters from the YAML file
    node = Node(
        package='your_package',
        executable='your_node',
        name='my_node',
        # A single list: the YAML file first, then inline overrides
        parameters=[
            LaunchConfiguration('param_file'),
            {'category_sector_mapping': {
                'Plastic': 0,
                'Paper': 1,
                'Glass': 2,
                'Aluminium': 2,
                'Organic': 2,
                'Undifferentiated': 2,
                'Empty': -1
            }}
        ]
    )
    
    return LaunchDescription([
        param_file,
        node
    ])

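Alternatively, you can parse the YAML file in the launch file and pass the node’s section as a dictionary. launch_ros flattens nested dictionaries into dotted parameter names:

python
import os
import yaml
from ament_index_python.packages import get_package_share_directory
from launch_ros.actions import Node

# In your launch file:
with open(os.path.join(
    get_package_share_directory('your_package'),
    'config',
    'your_config.yaml'
), 'r') as f:
    param_data = yaml.safe_load(f)

node = Node(
    package='your_package',
    executable='your_node',
    name='my_node',
    # Pass only this node's section; the top-level YAML key is the node name
    parameters=[param_data['my_node']]
)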

Best Practices and Considerations

Parameter Validation

Always validate your parameters after loading:

python
def validate_category_mapping(self, mapping):
    """Validate the category mapping dictionary (a method of your node class)."""
    if not isinstance(mapping, dict):
        self.get_logger().error("Category mapping must be a dictionary")
        return False

    required_keys = ['Plastic', 'Paper', 'Glass', 'Aluminium', 'Organic', 'Undifferentiated', 'Empty']
    for key in required_keys:
        if key not in mapping:
            self.get_logger().error(f"Missing required key: {key}")
            return False

    return True

# Usage (inside the node, after loading the mapping)
if self.validate_category_mapping(self.category_sector_mapping):
    # Proceed with valid parameters
    pass
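
The same checks can also be kept in a plain function that does not depend on a node, which makes them easy to unit test. This is a sketch under the assumption that every sector value should be an integer, as in the question’s YAML. The name find_mapping_problems is hypothetical.

```python
def find_mapping_problems(mapping, required_keys):
    """Return a list of human-readable problems; an empty list means valid."""
    if not isinstance(mapping, dict):
        return ["mapping must be a dictionary"]
    problems = []
    for key in required_keys:
        if key not in mapping:
            problems.append(f"missing required key: {key}")
    for key, value in mapping.items():
        # bool is a subclass of int in Python, so reject it explicitly
        if isinstance(value, bool) or not isinstance(value, int):
            problems.append(f"value for {key} is not an integer: {value!r}")
    return problems


required = ['Plastic', 'Paper', 'Empty']
ok = find_mapping_problems({'Plastic': 0, 'Paper': 1, 'Empty': -1}, required)
bad = find_mapping_problems({'Plastic': 'zero', 'Paper': 1}, required)
```

Returning the full list of problems, rather than stopping at the first one, lets the node log everything that is wrong with a configuration in one pass.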

Parameter Updates

Handle parameter updates dynamically:

python
from rcl_interfaces.msg import SetParametersResult

PREFIX = 'category_sector_mapping.'

def __init__(self):
    super().__init__('my_node', automatically_declare_parameters_from_overrides=True)

    # Initial load
    self.update_category_mapping()

    # Add parameter callback
    self.add_on_set_parameters_callback(self.parameters_callback)

def parameters_callback(self, params):
    """Callback for parameter updates; runs before the new values are stored."""
    for param in params:
        # Each mapping entry is its own parameter, e.g. category_sector_mapping.Plastic
        if param.name.startswith(PREFIX):
            self.category_sector_mapping[param.name[len(PREFIX):]] = param.value
    return SetParametersResult(successful=True)

def update_category_mapping(self):
    """Rebuild the category mapping from the declared parameters."""
    params = self.get_parameters_by_prefix('category_sector_mapping')
    self.category_sector_mapping = {name: p.value for name, p in params.items()}
    self.get_logger().info("Updated category mapping")
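
Because each mapping entry is a separate parameter with a dotted name, an update touches individual keys rather than the whole dictionary. The merge step can be sketched in plain Python as follows. The function apply_updates and the (name, value) tuples are illustrative stand-ins for the Parameter objects a real callback receives.

```python
def apply_updates(mapping, changes, prefix='category_sector_mapping'):
    """Return a new mapping with the (name, value) changes under prefix applied."""
    dotted = prefix + '.'
    updated = dict(mapping)  # copy, so the caller's dict is left untouched
    for name, value in changes:
        if name.startswith(dotted):
            updated[name[len(dotted):]] = value
    return updated


current = {'Plastic': 0, 'Paper': 1}
changes = [
    ('category_sector_mapping.Paper', 3),   # existing key, new value
    ('category_sector_mapping.Glass', 2),   # new key
    ('some_other_param', 42),               # unrelated, ignored
]
result = apply_updates(current, changes)
```

Working on a copy means a rejected or partially invalid update never leaves the node with a half-modified mapping.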

Error Handling

Implement robust error handling for parameter loading:

python
def load_parameters_safely(self):
    """Safely load parameters with error handling."""
    try:
        # Method 1: Through the parameter system (flattened, dotted names)
        params = self.get_parameters_by_prefix('category_sector_mapping')
        mapping = {name: p.value for name, p in params.items()}

        # Method 2: Direct YAML parsing (fallback), using load_yaml_dict from above
        if not mapping:
            config = load_yaml_dict('your_package_name', 'your_config.yaml') or {}
            mapping = config.get('my_node', {}).get('category_sector_mapping', {})

        if mapping:
            self.category_sector_mapping = mapping
            return True
        else:
            self.get_logger().error("Failed to load category mapping")
            return False

    except Exception as e:
        self.get_logger().error(f"Error loading parameters: {str(e)}")
        return False

Complete Example Implementation

Here’s a complete ROS2 node implementation that demonstrates how to load and use your YAML parameters:

python
import rclpy
from rclpy.node import Node
from rclpy.executors import SingleThreadedExecutor
import yaml
import os
from ament_index_python.packages import get_package_share_directory

class CategoryMappingNode(Node):
    def __init__(self):
        # Declare whatever parameters were passed at startup, because a
        # dictionary-valued parameter cannot be declared directly
        super().__init__(
            'category_mapping_node',
            automatically_declare_parameters_from_overrides=True
        )

        # Load parameters
        self.load_category_mapping()

        # Create a timer to demonstrate parameter access
        self.timer = self.create_timer(1.0, self.timer_callback)

        self.get_logger().info("Category mapping node initialized")

    def load_category_mapping(self):
        """Load the category mapping from parameters or YAML file."""
        try:
            # Try the parameter system first: nested YAML keys arrive as
            # category_sector_mapping.<key>, so collect them by prefix
            params = self.get_parameters_by_prefix('category_sector_mapping')
            param_value = {name: p.value for name, p in params.items()}

            if param_value:
                self.category_sector_mapping = param_value
                self.get_logger().info("Loaded category mapping from parameter server")
                return

            # Fallback: load from YAML file directly
            self.load_from_yaml_file()

        except Exception as e:
            self.get_logger().error(f"Error loading category mapping: {str(e)}")
            self.category_sector_mapping = {}

    def load_from_yaml_file(self):
        """Load category mapping from YAML file as fallback."""
        try:
            package_name = 'your_package_name'  # Replace with your package name
            config_file = 'your_config.yaml'    # Replace with your config file
            
            yaml_path = os.path.join(
                get_package_share_directory(package_name),
                'config',
                config_file
            )
            
            with open(yaml_path, 'r') as file:
                config_data = yaml.safe_load(file)
                
            # Extract the specific parameter
            if 'my_node' in config_data and 'category_sector_mapping' in config_data['my_node']:
                self.category_sector_mapping = config_data['my_node']['category_sector_mapping']
                self.get_logger().info("Loaded category mapping from YAML file")
            else:
                self.get_logger().error("Category mapping not found in YAML file")
                self.category_sector_mapping = {}
                
        except FileNotFoundError:
            self.get_logger().error("YAML configuration file not found")
            self.category_sector_mapping = {}
        except yaml.YAMLError as e:
            self.get_logger().error(f"Error parsing YAML file: {str(e)}")
            self.category_sector_mapping = {}
        except Exception as e:
            self.get_logger().error(f"Unexpected error loading YAML: {str(e)}")
            self.category_sector_mapping = {}

    def timer_callback(self):
        """Timer callback to demonstrate parameter usage."""
        self.get_logger().info(f"Current category mapping: {self.category_sector_mapping}")
        
        # Example usage: get sector for a category
        category = 'Plastic'
        if category in self.category_sector_mapping:
            sector = self.category_sector_mapping[category]
            self.get_logger().info(f"'{category}' maps to sector {sector}")
        else:
            self.get_logger().warning(f"Category '{category}' not found in mapping")

def main(args=None):
    rclpy.init(args=args)
    
    node = CategoryMappingNode()
    
    executor = SingleThreadedExecutor()
    executor.add_node(node)
    
    try:
        executor.spin()
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()

if __name__ == '__main__':
    main()

To use this with your specific YAML structure, make sure your launch file loads the parameters correctly:

python
from launch import LaunchDescription
from launch_ros.actions import Node
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
import os

def generate_launch_description():
    # Declare launch argument
    param_file = DeclareLaunchArgument(
        'param_file',
        default_value='config/your_config.yaml',
        description='Path to parameter file'
    )
    
    # Create node with parameters
    node = Node(
        package='your_package_name',
        executable='category_mapping_node',
        name='my_node',
        parameters=[
            LaunchConfiguration('param_file'),
            {
                'category_sector_mapping': {
                    'Plastic': 0,
                    'Paper': 1,
                    'Glass': 2,
                    'Aluminium': 2,
                    'Organic': 2,
                    'Undifferentiated': 2,
                    'Empty': -1
                }
            }
        ]
    )
    
    return LaunchDescription([
        param_file,
        node
    ])

This comprehensive approach gives you multiple ways to access your YAML parameters as Python dictionaries in ROS2, with fallback mechanisms and proper error handling to ensure robust operation.

Conclusion

To interpret YAML parameters in ROS2 as Python dictionaries, you have several effective approaches:

  • Use pyYAML for direct file parsing when you need fine-grained control over the loading process
  • Use ROS2’s parameter system with get_parameters_by_prefix() to rebuild the dictionary from the flattened, dotted parameter names
  • Implement robust error handling with fallback mechanisms to ensure your node always has valid parameters
  • Use parameter validation to ensure your dictionary structure meets your requirements
  • Consider dynamic parameter updates if your configuration might change at runtime

The most reliable approach combines ROS2’s parameter system with direct YAML parsing as a fallback, ensuring your node can handle various deployment scenarios while maintaining the dictionary structure you need for efficient category mapping operations.