Source code for simulation.src.simulation_evaluation.src.speaker.speakers.test.fake_msgs

import itertools
import math
import random
from typing import List

import simulation_groundtruth.msg as groundtruth_msgs
import simulation_groundtruth.srv as groundtruth_srvs

from simulation.utils.geometry import Line, Point, Polygon, Vector
from simulation.utils.road.sections.line_tuple import LineTuple


[docs]def create_points( start_point=Point(0, 0), offset_right=1, point_dist=1, section_count=10, point_count=100, direction=0, deviation=math.pi / 8, ) -> List[LineTuple]: """Create a line of points and split it up into multiple sections.""" # Points that are biased in one direction points = [start_point] + list( Vector(r=point_dist, phi=direction + (random.random() - 0.5) * 2 * deviation) for _ in range(point_count) ) # Prefix sum over the points to get the middle line points middle_points = list(itertools.accumulate(points)) # Divide the middle points into multiple sections def divide_chunks(): for i in range(0, point_count, int(point_count / section_count)): yield middle_points[ i : i + int(point_count / section_count) + 1 # noqa: 203 ] # Also include first point of next section middle_lines = (Line(ps) for ps in divide_chunks()) return list( LineTuple( middle_line.parallel_offset(offset_right, side="left"), middle_line, middle_line.parallel_offset(offset_right, side="right"), ) for middle_line in middle_lines )
[docs]def section_msgs(section_count: int = 4, section_types: List[int] = None): section_msgs = groundtruth_srvs.SectionSrvResponse() for i in range(0, section_count): sec = groundtruth_msgs.Section() sec.id = i if section_types: sec.type = section_types[i] else: sec.type = 5 # Parking section_msgs.sections.append(sec) return section_msgs
[docs]def section_srv(section_count: int = 4, section_types: List[int] = None): def _cal(req: groundtruth_srvs.SectionSrvRequest): return section_msgs(section_count, section_types) return _cal
[docs]def lane_msgs( lines: List[LineTuple] = [], id: int = 0 ) -> groundtruth_srvs.LaneSrvResponse: # Don't care about the default value list """Imitate the response of the groundtruth lane service by returning what's in lines at idx id.""" lane_msg = groundtruth_msgs.Lane() lane_msg.left_line = lines[id].left.to_geometry_msgs() lane_msg.middle_line = lines[id].middle.to_geometry_msgs() lane_msg.right_line = lines[id].right.to_geometry_msgs() lane_srv_response = groundtruth_srvs.LaneSrvResponse() lane_srv_response.lane_msg = lane_msg return lane_srv_response
[docs]def lane_srv(lines=None): if not lines: lines = [] def _cal(req: groundtruth_srvs.LaneSrvRequest): return lane_msgs(lines, req.id) return _cal
[docs]def obstacle_msgs( obstacles: List[Polygon], id: int ) -> groundtruth_srvs.LabeledPolygonSrvResponse: response = groundtruth_srvs.LabeledPolygonSrvResponse() response.polygons = list( groundtruth_msgs.LabeledPolygon( obtacle.to_geometry_msg(), random.random(), False, 0, "fake_obstacle" ) # Dummy type 0 for obtacle in obstacles ) return response
[docs]def obstacle_srv(obstacles=None): if not obstacles: obstacles = [] def _cal(req: groundtruth_srvs.LabeledPolygonSrvResponse): return obstacle_msgs(obstacles, req.id) return _cal
[docs]def empty_labeled_polygon_response(id: int) -> groundtruth_srvs.LabeledPolygonSrvResponse: response = groundtruth_srvs.LabeledPolygonSrvResponse() response.polygons = [] return response
[docs]def surface_marking_srv(): def _cal(req: groundtruth_srvs.LabeledPolygonSrvResponse): return empty_labeled_polygon_response(req.id) return _cal
[docs]def intersection_msg( south: groundtruth_msgs.Lane = None, west: groundtruth_msgs.Lane = None, east: groundtruth_msgs.Lane = None, north: groundtruth_msgs.Lane = None, rule: int = 0, turn: int = 0, id: int = 0, ) -> groundtruth_srvs.IntersectionSrvResponse: res = groundtruth_srvs.IntersectionSrvResponse() res.rule = rule if south: res.south = south if west: res.west = west if east: res.east = east if north: res.north = north res.rule = rule res.turn = turn return res
[docs]def intersection_srv( south: groundtruth_msgs.Lane = None, west: groundtruth_msgs.Lane = None, east: groundtruth_msgs.Lane = None, north: groundtruth_msgs.Lane = None, rule: int = 0, turn: int = 0, ): def _cal(req: groundtruth_srvs.IntersectionSrvRequest): return intersection_msg(south, west, east, north, rule, turn, req.id) return _cal
[docs]def parking_msgs( right_spots: List[Polygon], left_spots: List[Polygon], right_border: Line, left_border: Line, id: int, ) -> groundtruth_srvs.ParkingSrvResponse: response = groundtruth_srvs.ParkingSrvResponse() right_msg = groundtruth_msgs.Parking() right_msg.borders = [groundtruth_msgs.Line(right_border.to_geometry_msgs())] right_msg.spots = list( groundtruth_msgs.LabeledPolygon( spot.to_geometry_msg(), 0, False, groundtruth_msgs.Parking.SPOT_FREE, "fake_parking_spot", ) for spot in right_spots ) response.right_msg = right_msg left_msg = groundtruth_msgs.Parking() left_msg.borders = [groundtruth_msgs.Line(left_border.to_geometry_msgs())] left_msg.spots = list( groundtruth_msgs.LabeledPolygon( spot.to_geometry_msg(), 0, False, groundtruth_msgs.Parking.SPOT_FREE, "fake_parking_spot", ) for spot in left_spots ) response.left_msg = left_msg return response
[docs]def parking_srv( right_spots: List[Polygon], left_spots: List[Polygon], right_border: Line, left_border: Line, ): def _cal(req: groundtruth_srvs.ParkingSrvRequest): return parking_msgs(right_spots, left_spots, right_border, left_border, req.id) return _cal