#!/usr/bin/env python3
"""ROS node base class with a pythonic parameter interface."""
from typing import Any, Callable, Dict
import rospy
import yaml
from cachetools import TTLCache, cached
[docs]class NodeBase:
"""Abstract ROS Node class with additional functionality.
Args:
name (str): Name of the node
parameter_cache_time (int) = 1: Duration for which parameters will be cached
for performance
log_level (int) = rospy.INFO: Loglevel with which the node works.
A basic node with a subscriber and publisher can be created in the following way:
>>> from simulation.utils.ros_base.node_base import NodeBase
>>> class NodeSubClass(NodeBase):
... def __init__(self):
... super(NodeSubClass,self).__init__("node_name") # Important!
... # ...
... self.run() # Calls .start() if self.param.active is True (default: True)
... def start(self):
... # self.subscriber = ...
... # self.publisher = ...
... super().start() # Make sure to call this!
... # Called when deactivating the node by setting self.param.active to false
... # E.g. through command line with: rosparam set .../node/active false
... # or when ROS is shutting down
... def stop(self):
... # self.subscriber.unregister()
... # self.publisher.unregister()
... super().stop() # Make sure to call this!
Attributes:
param (ParameterObject): Attribute of type :class:`ParameterObject`,
which provides an abstraction layer to access ROS parameters.
The following line shows how to access a ROS parameter in any subclass
of called *param_1*:
>>> self.param.param_1 # doctest: +SKIP
\'value_1\'
This is equivalent to:
>>> rospy.get_param(\'~param_1\') # doctest: +SKIP
'value_1'
Setting a parameter is equally easy:
>>> self.param.param_2 = \'value_2\' # doctest: +SKIP
This is equivalent to:
>>> rospy.set_param(\'~param_2\', \'value_2\') # doctest: +SKIP
The magic begins when parameters are defined in a hierarchical structure.
After starting a node with the following YAML parameter file:
.. highlight:: yaml
.. code-block:: yaml
car:
name: 'dr_drift'
size:
length: 0.6
width: 0.4
...
the cars dimensions can be retrieved just like any other python attribute:
>>> self.param.car.size.length # doctest: +SKIP
0.6
and changes are also synchronized with ROS:
>>> rospy.get_param(\"~car/name\") # doctest: +SKIP
\'dr_drift\'
>>> self.param.car.name = \'captain_rapid\' # doctest: +SKIP
>>> rospy.get_param(\"~car/name\") # doctest: +SKIP
\'captain_rapid\'
"""
def __init__(
self, *, name: str, parameter_cache_time: float = 1, log_level: int = rospy.INFO
):
rospy.init_node(name, log_level=log_level)
# Parameters
self._parameter_cache = TTLCache(maxsize=128, ttl=parameter_cache_time)
self.param = ParameterObject(
ns="~", set_param_func=self._set_param, get_param_func=self._get_param
)
# Node is not yet active
self.__active = False
# Always call stop on shutdown!
rospy.on_shutdown(self.__shutdown)
# Node is by default active
try:
self.param.active
except KeyError:
self.param.active = True
def __shutdown(self):
"""Called when ROS is shutting down.
If the node was active before, self.stop is called.
"""
if self.__active:
self.__active = False
self.stop()
[docs] def _get_param(self, key: str) -> Any:
"""Get (possibly) cached ROS parameter.
Arguments:
key (str): Name of the ROS parameter
Returns:
If the parameter is in the parameter cache, the cached value is returned.
Otherwise rospy.get_param(key) is returned.
"""
# Cached version of rospy.get_param:
get_cached_param = cached(cache=self._parameter_cache)(rospy.get_param)
# request param
return get_cached_param(key)
[docs] def _set_param(self, key: str, value: Any):
"""Set ROS parameter. Also the parameter cache is cleared, to prevent incoherence.
Arguments:
key (str): Name of the ROS parameter
value (Any): New value
"""
# To ensure that there are no cache conflicts
self._parameter_cache.clear()
# Set the parameter
rospy.set_param(key, value)
[docs] def run(self, *, function: Callable = None, rate: float = 1):
"""Helper function, starting the node and shutting it down once ROS signals to. Can
only be called if the subclass implements start and stop functions.
Args:
rate (float): Rate with which to update active/ not active status of the node
function: Called with a frequency of ``rate`` when node is active
"""
rate = rospy.Rate(rate)
while not rospy.is_shutdown():
# Node should be active, but is not.
if self.param.active and not self.__active:
self.__active = True
self.start()
rospy.loginfo(f"Activating {rospy.get_name()}")
elif not self.param.active and self.__active:
self.__active = False
self.stop()
rospy.loginfo(f"Deactivating {rospy.get_name()}")
if self.__active and function:
function()
rate.sleep()
[docs] def start(self):
"""Called when activating the node."""
pass
[docs] def stop(self):
"""Called when deactivating or shutting down the node."""
pass
[docs]class ParameterObject:
"""ROS parameter wrapper to recursively get and set parameters.
This class enables to access nested parameters within nodes.
For example in any subclass of NodeBase one can call nested parameters
(if they are defined!) in the following way:
>>> self.param.dict_of_parameters.key # doctest: +SKIP
Which is the same as calling:
>>> rospy.get_param(\"~dict_of_parameters/key\") # doctest: +SKIP
This is achieved by overriding the __getattr__ and __setattr__ functions
and passing calls through to the
Arguments:
ns: Namespace this parameter dictionary operates in
set_param_func: Callable object which gets called when a parameter is set
get_param_func: Callable object which gets called when a parameter is accessed
Attributes:
_ns (str): Namespace of this object
_set_param (Callable[[str, Any], None]): Called when a new parameter should be set
_get_param (Callable[[str], Any]): Called when a parameter is requested
"""
def __init__(
self,
*,
ns: str,
set_param_func: Callable[[str, Any], None],
get_param_func: Callable[[str], Any],
):
self._ns = ns
self._set_param = set_param_func
self._get_param = get_param_func
def __getattr__(self, key: str) -> Any:
"""Retrieving a parameter.
Returns:
Value of the parameter in this namespace with key ``key`` or a ParameterObject
in the subnamespace of ``key``.
Raises:
KeyError if parameter is not found.
"""
# Load the parameter
item = self._get_param(f"{self._ns}{key}")
# If the parameter is a dictionary, a new ParameterObject is returned
# with the current key appended to the namespace!
if type(item) == dict:
return ParameterObject(
ns=f"{self._ns}{key}/",
set_param_func=self._set_param,
get_param_func=self._get_param,
)
return item
def __setattr__(self, key: str, value: Any):
"""Setting the value of an attribute."""
# No parameter
if key.startswith("_"):
object.__setattr__(self, key, value)
return
# Parameter
self._set_param(f"{self._ns}{key}", value)
[docs] def as_dict(self) -> Dict[str, Any]:
"""Return all parameters in current namespace as dicts."""
return self._get_param(self._ns)
def __repr__(self) -> str:
return yaml.dump(self._get_param(self._ns), default_flow_style=False)