Source code for simulation.src.simulation_evaluation.src.drive_test.node

import rospy
from simulation_brain_link.msg import MissionMode
from simulation_evaluation.msg import Referee as RefereeMsg
from simulation_groundtruth.msg import GroundtruthStatus
from std_msgs.msg import String as StringMsg

from simulation.utils.ros_base.node_base import NodeBase


[docs]class DriveTestNode(NodeBase): """Complete an automatic test using this node. Also the gazebo simulation, KITcar_brain and the CarStateNode needs to be running. """ def __init__(self, run: bool = True): """Initialize the node and optionally start it as well. Args: run: Indicate wether to call :py:func:`run`. In a test it can be useful to do this manually. """ super().__init__(name="drive_test_node", log_level=rospy.DEBUG) self._groundtruth_status = GroundtruthStatus.READY self._previous_state_machine_msg = None self._state_machine_msg = None self.state = -1 self.parking_successes = -1 if run: self.run()
[docs] def start(self): self.mission_mode_publisher = rospy.Publisher( self.param.topics.mission_mode, MissionMode, queue_size=1 ) self.groundtruth_status_subscriber = rospy.Subscriber( self.param.topics.groundtruth.status, GroundtruthStatus, callback=self.receive_groundtruth_update, ) # 1. Ensure that evaluation pipeline is started up # Otherwise it can happen that the groundtruth is reloaded # while the speaker is starting up. # Then the speakers will operate on a false groundtruth! rospy.wait_for_message(self.param.topics.referee.info, RefereeMsg) # For some reason this is necessary for the mission mode publisher to work correctly rospy.sleep(0.1) self.referee_subscriber = rospy.Subscriber( self.param.topics.referee.info, RefereeMsg, callback=self.referee_cb, queue_size=1, ) # Subscribe to referee self.sm_info_subscriber = rospy.Subscriber( self.param.topics.state_machine.info, StringMsg, callback=self.receive_state_machine_info, queue_size=1, ) # Subscribe to sm updates rospy.wait_for_message(self.param.topics.referee.info, RefereeMsg) # Goooo self.update_mission_mode(self.param.mission_mode)
[docs] def stop(self): self.referee_subscriber.unregister() self.sm_info_subscriber.unregister() self.groundtruth_status_subscriber.unregister() self.mission_mode_publisher.unregister()
[docs] def receive_groundtruth_update(self, msg: GroundtruthStatus): """Receive GroundtruthStatus message. Args: msg: New GroundtruthStatus message """ self._groundtruth_status = msg.status
[docs] def receive_state_machine_info(self, msg: StringMsg): """Receive info message when state machines change. Args: msg: New string message """ if msg.data == self._state_machine_msg: return if self._state_machine_msg is not None: self._previous_state_machine_msg = self._state_machine_msg self._state_machine_msg = msg.data
[docs] def update_mission_mode(self, mission_mode: int): """Change the car's mission mode. Args: mission_mode: Desired mission mode. """ rospy.loginfo(f"Updating the mission mode to {mission_mode}.") msg = MissionMode() msg.header.stamp = rospy.Time.now() msg.mission_mode = mission_mode self.mission_mode_publisher.publish(msg)
[docs] def referee_cb(self, msg: RefereeMsg): """Listen for changes in the referee status.""" rospy.logdebug(f"Referee state: {msg.state}") if msg.state == RefereeMsg.FAILED or msg.state == RefereeMsg.COMPLETED: # Drive is over: save result and shutdown! self.state = msg.state self.parking_successes = msg.parking_successes self.last_state_machine_transition = ( self._previous_state_machine_msg, self._state_machine_msg, ) # Shutdown node... rospy.signal_shutdown("Drive is finished...")