Source code for simulation.src.simulation_evaluation.src.referee.referee

"""Definition of the Referee class.

The referee is used to provide a score and other metrics to a simulated drive.

from dataclasses import dataclass
from typing import Callable

from simulation_evaluation.msg import Referee as RefereeMsg
from simulation_evaluation.msg import State as StateMsg

[docs]@dataclass class StateMachineConnector: """Helper class to store states and manipulate corresponding state machine. Args: state: State of the state machine. set_state: Call to change the state of the state_machine. """ previous_state: int """State the state machine was in before the current state.""" _state: int set_state: Callable[[int], None] """Change the state of the state_machine.""" def __init__(self, state: int, set_state: Callable[[int], None]): self.previous_state = state self._state = state self.set_state = set_state @property def state(self): """State of the state machine.""" return self._state @state.setter def state(self, val: int): self.previous_state = self._state self._state = val
[docs]@dataclass class Observation: """Track what happens while driving and calculate a score.""" start_time: float = 0 """Time [s] when the car left the start zone.""" current_time: float = 0 """Current time [s].""" start_distance: float = 0 """Distance [m] already driven in start zone.""" current_distance: float = 0 """Distance [m] already driven.""" multiplicator: float = 1 """Multiplicator used to calculate the score.""" mistakes: float = 0 """Mistakes [point] made by the car.""" parking_successes: int = 0 """Number of successful parking attempts.""" @property def score(self) -> float: """Score calculated from other attributes.""" return self.distance * self.multiplicator - self.mistakes @property def distance(self) -> float: """Difference between start and current distance.""" return self.current_distance - self.start_distance @property def duration(self) -> float: """Difference between start and current time.""" return self.current_time - self.start_time
[docs]class Referee: """Class to evaluate a drive by keeping track of the state_machines. Args: lane: Connector to lane state machine. progress: Connector to progress state machine. overtaking: Connector to overtaking state machine. parking: Connector to parking state machine. priority: Connector to priority state machine. initial_observation: Referee's observation initialization value. reset_callback: Function to reset the referee. """ def __init__( self, lane: StateMachineConnector, progress: StateMachineConnector, overtaking: StateMachineConnector, parking: StateMachineConnector, priority: StateMachineConnector, speed: StateMachineConnector, initial_observation: Observation = None, ): self.lane = lane self.progress = progress self.overtaking = overtaking self.parking = parking self.priority = priority self.speed = speed if initial_observation is None: initial_observation = Observation() self.observation = initial_observation self.state = RefereeMsg.READY self._parked = False
[docs] def update(self, time: float, distance: float): """Update the referee's observation. Args: time: Current time in seconds. distance: Distance driven in meters. """ if ( self.progress.state == StateMsg.PROGRESS_RUNNING and self.state == RefereeMsg.READY ): # This means that the car has not started driving before! self.observation.start_time = time self.observation.start_distance = distance self.state = RefereeMsg.DRIVING elif self.progress.state == StateMsg.PROGRESS_FINISHED: self.state = RefereeMsg.COMPLETED # Remainder of function only needs to be called if the test is still running. if not self.state == RefereeMsg.DRIVING: return # Get successful parking if self.parking.state == StateMsg.PARKING_SUCCESS: self._parked = True if self._parked and self.parking.state != StateMsg.PARKING_SUCCESS: self._parked = False self.observation.multiplicator += 1.0 self.observation.parking_successes += 1 print("PARKING COMPLETED SUCCESSFULLY (RECEIVED IN REFEREE)") self.observation.current_distance = distance # Get current distance self.observation.current_time = time # Get time in seconds if ( # Check for failures in one of the state machines self.lane.state < 0 or self.parking.state < 0 or self.overtaking.state < 0 or self.priority.state < 0 or self.speed.state < 0 # If the car is not in the right lane, it must be overtaking or parking! or ( self.lane.state != StateMsg.RIGHT_LANE and self.overtaking.state == StateMsg.OVERTAKING_BEFORE_START and self.parking.state == StateMsg.PARKING_BEFORE_START ) ): self.state = RefereeMsg.FAILED
[docs] def reset(self, initial_observation: Observation = None): """Reset referee observations, state and state machines.""" if initial_observation is None: initial_observation = Observation() self.observation = initial_observation def reset_connector(con, state): con.set_state(state) con.state = state reset_connector(self.lane, StateMsg.RIGHT_LANE) reset_connector(self.overtaking, StateMsg.OVERTAKING_BEFORE_START) reset_connector(self.parking, StateMsg.PARKING_BEFORE_START) reset_connector(self.priority, StateMsg.PRIORITY_BEFORE_START) reset_connector(self.progress, StateMsg.PROGRESS_BEFORE_START) reset_connector(self.speed, StateMsg.SPEED_UNLIMITED_ZONE) self.state = RefereeMsg.READY self._parked = False