Source code for simulation.src.simulation_evaluation.src.speaker.speakers.test.test_zone_speaker

import functools
import random
import unittest
from typing import List

import numpy
import simulation_groundtruth.msg as groundtruth_msgs
import simulation_groundtruth.srv as groundtruth_srvs
from simulation_evaluation.msg import Speaker as SpeakerMsg

import simulation.utils.road.sections.type as road_section_type
from simulation.src.simulation_evaluation.src.speaker.speakers import ZoneSpeaker
from simulation.utils.geometry import Line, Polygon, Pose, Transform

from . import fake_msgs


[docs]class ModuleTest(unittest.TestCase):
[docs] def setUp(self): n_section = 5 section_types = [ road_section_type.LEFT_CIRCULAR_ARC, road_section_type.CUBIC_BEZIER, road_section_type.INTERSECTION, road_section_type.INTERSECTION, road_section_type.PARKING_AREA, ] n_points = 100 lane_width = 0.4 overtaking_buffer = 2 self.yield_distance = (-1, -0.5) # Create points lines = fake_msgs.create_points( section_count=n_section, point_count=n_points, offset_right=lane_width, direction=0, deviation=0, ) # Fake section and lane msg proxies / usually in the groundtruth package section_msg_proxy = functools.partial( fake_msgs.section_msgs, section_count=n_section, section_types=section_types ) lane_msg_proxy = functools.partial(fake_msgs.lane_msgs, lines) parking_msg_proxy = functools.partial(fake_msgs.parking_msgs, [], [], [], []) surface_marking_msg_proxy = fake_msgs.empty_labeled_polygon_response # Create obstacles self.overtaking_intervals = [] self.inv_overtaking_intervals = [] obstacles = [] o = Polygon([[0, 0], [0, 1], [1, 1], [1, 0]]) self.inv_overtaking_intervals.append((0, 3)) width = 1 prev_x = 0 def next_obstacles(*xs): nonlocal prev_x for x in xs: obstacles.append(Transform([x, 0], 0) * o) self.inv_overtaking_intervals.append((prev_x, xs[0] - overtaking_buffer)) self.overtaking_intervals.append( (xs[0] - overtaking_buffer, xs[-1] + width + overtaking_buffer) ) prev_x = xs[-1] + width + overtaking_buffer # Create obstacles with overlapping and non overlapping buffers next_obstacles(3) next_obstacles(10) next_obstacles(16, 18) next_obstacles(23, 24, 26) self.inv_overtaking_intervals.append((prev_x, 100)) def obstacle_proxy(id): if id == 0: return fake_msgs.obstacle_msgs(obstacles[0:4], id) if id == 1: return fake_msgs.obstacle_msgs(obstacles[4:], id) return fake_msgs.obstacle_msgs([], id) # Create intersection proxy self._rules = { 2: groundtruth_srvs.IntersectionSrvResponse.YIELD, 3: groundtruth_srvs.IntersectionSrvResponse.STOP, } def intersection_proxy(id): lane = groundtruth_msgs.Lane() if id == 2: lane.middle_line = Line([[19, 0], [20, 0]]).to_geometry_msgs() if id == 3: lane.middle_line = Line([[29, 0], [30, 0]]).to_geometry_msgs() return fake_msgs.intersection_msg(rule=self._rules[id], south=lane) self.speaker = ZoneSpeaker( section_proxy=section_msg_proxy, lane_proxy=lane_msg_proxy, parking_proxy=parking_msg_proxy, obstacle_proxy=obstacle_proxy, surface_marking_proxy=surface_marking_msg_proxy, intersection_proxy=intersection_proxy, overtaking_buffer=overtaking_buffer, start_zone_buffer=0.5, end_zone_buffer=0.5, yield_distance=self.yield_distance, )
[docs] def test_zones(self): self.assertListEqual(self.speaker.overtaking_zones, self.overtaking_intervals)
[docs] def test_stop_zones(self): self._rules[2] = groundtruth_srvs.IntersectionSrvResponse.STOP self._rules[3] = groundtruth_srvs.IntersectionSrvResponse.STOP self.assertSetEqual(set(self.speaker.stop_zones), {(19, 19.5), (29, 29.5)}) self.assertSetEqual(set(self.speaker.halt_zones), set())
[docs] def test_halt_zones(self): self._rules[2] = groundtruth_srvs.IntersectionSrvResponse.YIELD self._rules[3] = groundtruth_srvs.IntersectionSrvResponse.YIELD self.assertSetEqual(set(self.speaker.stop_zones), set()) self.assertSetEqual(set(self.speaker.halt_zones), {(19, 19.5), (29, 29.5)})
[docs] def test_speak_function(self): """Test the speakers msgs by putting the car somewhere and checking the output.""" msg_options = dict() msg_options[SpeakerMsg.START_ZONE] = {(0, 0.5)} msg_options[SpeakerMsg.DRIVING_ZONE] = {(0.5, 99.5)} msg_options[SpeakerMsg.END_ZONE] = {(99.5, 100)} msg_options[SpeakerMsg.OVERTAKING_ZONE] = self.overtaking_intervals msg_options[SpeakerMsg.NO_OVERTAKING_ZONE] = self.inv_overtaking_intervals msg_options[SpeakerMsg.PARKING_ZONE] = {(80, 100)} msg_options[SpeakerMsg.NO_PARKING_ZONE] = {(0, 80)} msg_options[SpeakerMsg.HALT_ZONE] = { (20 + self.yield_distance[0], 20 + self.yield_distance[1]) } msg_options[SpeakerMsg.STOP_ZONE] = { (30 + self.yield_distance[0], 30 + self.yield_distance[1]) } msg_options[SpeakerMsg.NO_STOP_ZONE] = { (0, 20 + self.yield_distance[0]), (20 + self.yield_distance[1], 30 + self.yield_distance[0]), (30 + self.yield_distance[1], 100), } msg_options[SpeakerMsg.SPEED_UNLIMITED_ZONE] = {(0, 100)} msg_expects = { (SpeakerMsg.START_ZONE, SpeakerMsg.DRIVING_ZONE, SpeakerMsg.END_ZONE), (SpeakerMsg.OVERTAKING_ZONE, SpeakerMsg.NO_OVERTAKING_ZONE), (SpeakerMsg.PARKING_ZONE, SpeakerMsg.NO_PARKING_ZONE), (SpeakerMsg.HALT_ZONE, SpeakerMsg.STOP_ZONE, SpeakerMsg.NO_STOP_ZONE), } for x in numpy.arange(0, 100, 0.1): # create car msg self.speaker.car_pose = Pose([x, 0], 0) result: List[SpeakerMsg] = self.speaker.speak() msg_types = {msg.type for msg in result} # Check that each of the expected msgs is in what the speaker says for exp in msg_expects: self.assertTrue(len(set(exp) & msg_types) > 0) for msg in msg_types: opts = msg_options[msg] # Check if thats correct self.assertTrue( any(interval[0] <= x and interval[1] >= x for interval in opts) )
if __name__ == "__main__": random.seed("KITCAR") unittest.main()