Source code for simulation.src.simulation_evaluation.src.referee.node

from typing import Tuple

import rospy
from simulation_evaluation.msg import Broadcast as BroadcastMsg
from simulation_evaluation.msg import Referee as RefereeMsg
from simulation_evaluation.msg import State as StateMsg
from std_msgs.msg import Bool as BoolMsg

from simulation.src.simulation_evaluation.src.referee.referee import (
    Referee,
    StateMachineConnector,
)
from simulation.utils.ros_base.node_base import NodeBase


[docs]class RefereeNode(NodeBase): """Evaluate the drive. Keep track of the state machines and the broadcast speaker to calculate a score. """ def __init__(self): super().__init__(name="referee_node") self.run()
[docs] def start(self): self.info_publisher = rospy.Publisher( self.param.topics.info, RefereeMsg, queue_size=1 ) # State machine subscribers self.lane_handler = self._connect_state_machine( self.param.topics.state_machine.lane ) self.progress_handler = self._connect_state_machine( self.param.topics.state_machine.progress ) self.overtaking_handler = self._connect_state_machine( self.param.topics.state_machine.overtaking ) self.parking_handler = self._connect_state_machine( self.param.topics.state_machine.parking ) self.priority_handler = self._connect_state_machine( self.param.topics.state_machine.priority ) self.speed_handler = self._connect_state_machine( self.param.topics.state_machine.speed ) self.referee = Referee( lane=self.lane_handler[2], progress=self.progress_handler[2], overtaking=self.overtaking_handler[2], parking=self.parking_handler[2], priority=self.priority_handler[2], speed=self.speed_handler[2], ) self.broadcast_subscriber = rospy.Subscriber( self.param.topics.speaker.broadcast, BroadcastMsg, self.update ) self.reset_subscriber = rospy.Subscriber( self.param.topics.reset, BoolMsg, self.reset_cb ) super().start()
[docs] def stop(self): self.broadcast_subscriber.unregister() self.reset_subscriber.unregister() self.info_publisher.unregister() def stop_handler(h): h[0].unregister() h[1].unregister() stop_handler(self.progress_handler) stop_handler(self.overtaking_handler) stop_handler(self.parking_handler) stop_handler(self.priority_handler) stop_handler(self.speed_handler) super().stop()
[docs] def _connect_state_machine( self, sm_topics ) -> Tuple[rospy.Subscriber, rospy.Publisher, StateMachineConnector]: set_topic = sm_topics.set state_topic = sm_topics.get publisher = rospy.Publisher(set_topic, StateMsg, queue_size=1) def set(state: int): """Change state of state machine.""" msg = StateMsg() msg.state = state publisher.publish(msg) rospy.loginfo(f"Manually setting {state} in {sm_topics}") connector = StateMachineConnector(state=0, set_state=set) def update_cb(msg: StateMsg): """Listen for changes in state machine.""" connector.state = msg.state rospy.loginfo(f"State changed to {connector.state} in {sm_topics}") subscriber = rospy.Subscriber(state_topic, StateMsg, update_cb) return subscriber, publisher, connector
[docs] def update(self, broadcast: BroadcastMsg): """Update referee output with new broadcast msg.""" # Update referee with current states output = self.referee.update(rospy.Time.now().to_sec(), broadcast.distance) if output: rospy.logdebug(output) # Create msg msg = RefereeMsg() msg.state = self.referee.state observation = self.referee.observation msg.distance = self.referee.observation.distance msg.duration = observation.duration msg.mistakes = observation.mistakes msg.multiplicator = observation.multiplicator msg.score = observation.score msg.parking_successes = observation.parking_successes self.info_publisher.publish(msg) rospy.logdebug( f"Publishing referee info with state {msg.state} and time {msg.duration}" )
[docs] def reset_cb(self, msg: BoolMsg): """Reset the referee. Args: msg: Reset observation's or only the state. """ self.referee.reset( initial_observation=self.referee.observation if msg.data else None )