Source code for simulation.src.simulation_brain_link.src.vehicle_simulation_link.node

"""VehicleSimulationLinkNode."""

from contextlib import suppress

import geometry_msgs.msg
import rospy
import std_msgs.msg
from gazebo_simulation.msg import SetModelPose as SetModelPoseMsg
from gazebo_simulation.msg import SetModelTwist as SetModelTwistMsg
from pyquaternion import Quaternion
from simulation_brain_link.msg import State as StateEstimationMsg
from tf2_msgs.msg import TFMessage

from simulation.utils.geometry import Transform, Vector
from simulation.utils.ros_base.node_base import NodeBase


[docs]class VehicleSimulationLinkNode(NodeBase): """ROS node to translate state_estimation messages into movement of the vehicle in Gazebo and provide a simulation to world transformation. Whenever the pose of the vehicle is updated and a new message is received the updated twist is calculated and published. Attributes: set_model_twist_publisher (rospy.publisher): Publishes the new twist on the model plugins set_twist topic. get_model_pose_subscriber (rospy.subscriber): Receives the current pose of the model in Gazebo. state_estimation_subscriber (rospy.subscriber): Receives the state_estimation messages. latest_state_estimation (StateEstimationMsg): Latest received state estimation message. sub_tf (rospy.subscriber): Receive transformation updates pub_tf (rospy.publisher): Publish updates about the world to simulation transformation vehicle_world_tf (Transform): Transformation between the vehicle and world coordinate frame. """ def __init__(self): self.vehicle_world_tf = Transform([0, 0], 0) self.speed = Vector(0, 0) self.yaw_rate = 0 self.vehicle_simulation_rotation = Transform([0, 0], 0) self._last_tf_update = 0 super().__init__( name="vehicle_simulation_link_node" ) # Name can be overwritten in launch file self.run()
[docs] def start(self): """Start node.""" self.pub_tf = rospy.Publisher( "/tf", TFMessage, queue_size=100 ) # See: https://github.com/ros/geometry2/blob/melodic-devel/ # tf2_ros/src/tf2_ros/transform_broadcaster.py self.sub_tf = rospy.Subscriber("/tf", TFMessage, self.receive_tf) self.set_model_twist_publisher = rospy.Publisher( self.param.topics.model_plugin.namespace + "/" + self.param.car_name + "/" + self.param.topics.model_plugin.set.twist, SetModelTwistMsg, queue_size=1, ) self.set_model_pose_publisher = rospy.Publisher( self.param.topics.model_plugin.namespace + "/" + self.param.car_name + "/" + self.param.topics.model_plugin.set.pose, SetModelPoseMsg, queue_size=1, ) self.state_estimation_subscriber = rospy.Subscriber( self.param.topics.state_estimation, StateEstimationMsg, self.state_estimation_cb, queue_size=1, ) rospy.wait_for_message( self.param.topics.model_plugin.namespace + "/" + self.param.car_name + "/" + self.param.topics.model_plugin.get.pose, geometry_msgs.msg.Pose, ) rospy.wait_for_message(self.param.topics.state_estimation, StateEstimationMsg) self.get_model_pose_subscriber = rospy.Subscriber( self.param.topics.model_plugin.namespace + "/" + self.param.car_name + "/" + self.param.topics.model_plugin.get.pose, geometry_msgs.msg.Pose, callback=self.receive_model_pose_cb, queue_size=1, ) # Read initial position from parameters try: initial = self.param.initial_pose if len(initial) > 3: angle = initial[3] del initial[3] else: angle = 0 pos = Vector(initial) self.initial_tf = Transform(pos, angle) except KeyError: self.initial_tf = None super().start()
[docs] def stop(self): # Attribute errors can occur if the node has not been completely started # before shutting down. with suppress(AttributeError): self.state_estimation_subscriber.unregister() self.get_model_pose_subscriber.unregister() self.sub_tf.unregister() self.set_model_twist_publisher.unregister() self.pub_tf.unregister() super().stop()
[docs] def receive_model_pose_cb(self, msg: geometry_msgs.msg.Pose): """Receive new model pose.""" rot = Quaternion( msg.orientation.w, msg.orientation.x, msg.orientation.y, msg.orientation.z ) if self.param.set_twist: self.update_twist(self.latest_state_estimation, rot) if self.param.set_pose: self.update_pose() self.update_simulation_world_tf(vehicle_simulation_tf=Transform(msg))
[docs] def state_estimation_cb(self, msg: StateEstimationMsg): """Receive new state estimation.""" self.latest_state_estimation = msg
[docs] def update_twist( self, state_estimation: StateEstimationMsg, vehicle_simulation_rotation: Quaternion ): """Update the vehicle's twist with an update on the model plugins set_twist topic. Args: state_estimation (StateEstimationMsg): latest state estimation message vehicle_simulation_rotation (Transform): latest update of the rotation between the simulation and vehicle coordinate frames """ try: # The state_estimation message contains the speed in vehicle coordinates, # the vehicle world rotation converts the speed into simulation coordinates. speed_x, speed_y, speed_z = vehicle_simulation_rotation.rotate( (state_estimation.speed_x, state_estimation.speed_y, 0) ) new_vals = [] new_vals.append((SetModelTwistMsg.LINEAR_X, speed_x)) new_vals.append((SetModelTwistMsg.LINEAR_Y, speed_y)) if self.param.set_z_twist: new_vals.append((SetModelTwistMsg.LINEAR_Z, speed_z)) new_vals.append( (SetModelTwistMsg.ANGULAR_Z, self.latest_state_estimation.yaw_rate) ) keys, values = zip(*new_vals) msg = SetModelTwistMsg(keys, values) self.set_model_twist_publisher.publish(msg) rospy.logdebug(f"Updating the vehicle's twist with {new_vals}") except Exception as e: rospy.logerr(f"Error updating the vehicles twist {e}.")
[docs] def update_pose(self): try: tf = self.vehicle_world_tf if self.initial_tf is None: tf = self.vehicle_world_tf else: tf = self.initial_tf * self.vehicle_world_tf new_vals = [] new_vals.append((SetModelPoseMsg.POSITION_X, tf.translation.x)) new_vals.append((SetModelPoseMsg.POSITION_Y, tf.translation.y)) if self.param.set_z_pose: new_vals.append((SetModelPoseMsg.POSITION_Z, tf.translation.z)) new_vals.append((SetModelPoseMsg.ORIENTATION_W, tf.rotation.w)) new_vals.append((SetModelPoseMsg.ORIENTATION_X, tf.rotation.x)) new_vals.append((SetModelPoseMsg.ORIENTATION_Y, tf.rotation.y)) new_vals.append((SetModelPoseMsg.ORIENTATION_Z, tf.rotation.z)) keys, values = zip(*new_vals) msg = SetModelPoseMsg(keys, values) self.set_model_pose_publisher.publish(msg) rospy.logdebug(f"Updating the vehicle's pose with {new_vals}") except Exception as e: rospy.logerr(f"Error updating the vehicles pose {e}.")
[docs] def receive_tf(self, tf_msg: TFMessage): """Receive msg from the tf topic and extract the world to vehicle transformation.""" # Only select world to vehicle transformations def select_tf(tf): return ( tf.header.frame_id == self.param.frame.world and tf.child_frame_id == self.param.frame.vehicle ) # Select first tf (default: None) # Beware: Transformation goes from child to header frame !! vehicle_world: geometry_msgs.msg.TransformStamped = next( filter(select_tf, tf_msg.transforms), None ) if vehicle_world: self.vehicle_world_tf = Transform(vehicle_world.transform) rospy.logdebug( f"Received vehicle to world transform {vehicle_world}" f"(Converted to: {self.vehicle_world_tf} {self.vehicle_world_tf.rotation})" )
[docs] def update_simulation_world_tf(self, vehicle_simulation_tf: Transform): """Publish up to date simulation to world transformation to /tf. Args: vehicle_simulation_tf (Transform): Transformation between vehicle and simulation frames. """ if rospy.Time.now().to_sec() - self._last_tf_update == 0: return self._last_tf_update = rospy.Time.now().to_sec() tf_stamped = geometry_msgs.msg.TransformStamped() tf_stamped.header = std_msgs.msg.Header() tf_stamped.header.stamp = rospy.Time.now() # Transform from world to simulation tf_stamped.header.frame_id = self.param.frame.simulation tf_stamped.child_frame_id = self.param.frame.world # Transformation from world to # simulation == (world to vehicle -> vehicle to simulation) tf_stamped.transform = ( vehicle_simulation_tf * self.vehicle_world_tf.inverse ).to_geometry_msg() self.pub_tf.publish(TFMessage([tf_stamped]))