Source code for simulation.src.gazebo_simulation.src.gazebo_rate_control.node

"""Adjusts Gazebos update rate to guarantee sensor updates with desired frequency."""

import contextlib

import rospy
import rostopic
from gazebo_msgs.srv import (
    GetPhysicsProperties,
    GetPhysicsPropertiesRequest,
    SetPhysicsProperties,
    SetPhysicsPropertiesRequest,
)
from std_srvs.srv import Empty as EmptySrv

from simulation.utils.ros_base.node_base import NodeBase


[docs]class GazeboRateControlNode(NodeBase): """Control gazebos update rate to meet desired sensor update rates. Attributes: relevant_targets (List[Dict[str, Any]]): Topics that should be monitored and update rate that the topic should have. _last_target_update (float): Last time the targets were updated. _last_target_frequencies (List[float]): Last known update rate of each target. """ def __init__(self): """Initialize the node.""" super().__init__(name="gazebo_rate_control_node") self._last_target_frequencies = {} self._last_target_update = 0 self.relevant_targets = [] super().run(function=self.update, rate=self.param.update_rate.control.rate)
[docs] def start(self): rospy.wait_for_service(self.param.topics.get_physics, timeout=5) self.set_physics = rospy.ServiceProxy( self.param.topics.set_physics, SetPhysicsProperties, persistent=True ) self.get_physics = rospy.ServiceProxy( self.param.topics.get_physics, GetPhysicsProperties, persistent=True ) if self.param.use_sync: self.published_subscriber = rospy.Subscriber( self.param.sync.source_topic, rospy.AnyMsg, callback=self.receive_sync_source, queue_size=1, ) self.pause_physics_proxy = rospy.ServiceProxy( self.param.topics.pause_gazebo, EmptySrv ) self.unpause_physics_proxy = rospy.ServiceProxy( self.param.topics.unpause_gazebo, EmptySrv ) self.rater = rostopic.ROSTopicHz(5) # Start in very slow mode to ensure that everything is started before speeding up self._update_properties(update_rate=self.param.update_rate.min) self.subscribers = {} for target in self.param.targets: topic = target["topic"] self.subscribers[topic] = rospy.Subscriber( topic, rospy.AnyMsg, self.rater.callback_hz, callback_args=topic, ) # Wait for atleast one message on every target topic # Continue of none is received. The topic will then be disregarded (for now). with contextlib.suppress(rospy.ROSException): rospy.wait_for_message(topic, rospy.AnyMsg, timeout=0.1) super().start()
[docs] def stop(self): for sub in self.subscribers.values(): sub.unregister() self.subscribers.clear() if self.param.use_sync: self.published_subscriber.unregister() self.pause_physics_proxy.close() self.unpause_physics_proxy.close() self.set_physics.close() self.get_physics.close() super().stop()
[docs] def _calculate_update_rate( self, update_rate: float, frequency: float, desired_frequency: float, ) -> float: """Calculate new update rate. Args: update_rate: Gazebo's current maximum update_rate frequency: Current frequency desired_frequency: Optimal frequency Return: New maximum update rate. """ if frequency == 0: return # Calculate new update rate if frequency < desired_frequency: return max( update_rate - (update_rate - self.param.update_rate.min) * self.param.update_rate.control.down * desired_frequency / frequency, self.param.update_rate.min, ) elif frequency > desired_frequency: return min( update_rate + (self.param.update_rate.max - update_rate) * self.param.update_rate.control.up * desired_frequency / frequency, self.param.update_rate.max, )
[docs] def _update_properties(self, update_rate): current_properties = self.get_physics(GetPhysicsPropertiesRequest()) new_properties = SetPhysicsPropertiesRequest() new_properties.gravity = current_properties.gravity new_properties.ode_config = current_properties.ode_config new_properties.time_step = current_properties.time_step new_properties.max_update_rate = update_rate self.set_physics(new_properties)
[docs] def receive_sync_source(self, _): """Attempt to synchronize specified source topic and topic. The basic idea is that some run time dependent components (messages) should be published closely after another. This synchronization (pausing) ensures the same behavior in the simulation. """ try: self.pause_physics_proxy() rospy.wait_for_message( self.param.sync.topic, rospy.AnyMsg, self.param.sync.timeout ) except rospy.exceptions.ROSException: pass finally: self.unpause_physics_proxy()
[docs] def update(self): """Adjust Gazebos update rate to meet desired output frequency of the target topic.""" old_update_rate = self.get_physics(GetPhysicsPropertiesRequest()).max_update_rate # Update target topics every second if rospy.Time.now().to_sec() - self._last_target_update > 1: # Find available topics all_topics = {t[0] for t in rospy.get_published_topics()} rospy.logdebug(all_topics) self.relevant_targets = [ target for target in self.param.targets if target["topic"] in all_topics ] self._last_target_update = rospy.Time.now().to_sec() # Calculate new update rate considering all targets update_rates = [] for target in self.relevant_targets: topic = target["topic"] desired_frequency = target["desired"] frequency = self.rater.get_hz(topic) frequency = ( frequency[0] if frequency else self._last_target_frequencies.get(topic, 0) ) self._last_target_frequencies[topic] = frequency rate = self._calculate_update_rate( old_update_rate, frequency=frequency, desired_frequency=desired_frequency, ) if rate is not None: rospy.logdebug( f"Frequency of topic {topic}: {frequency}" f" (supposed: {desired_frequency})" ) update_rates.append(rate) if len(update_rates) > 0: new_update_rate = min(update_rates) else: return rospy.logdebug( ("Lowering" if new_update_rate < old_update_rate else "Increasing") + f" Gazebo's update rate: {old_update_rate}" f" -> {new_update_rate}" ) self._update_properties(new_update_rate)