Source code for simulation.src.simulation_groundtruth.src.groundtruth.visualization

#!/usr/bin/env python3
"""Node that publishes groundtruth information for rviz."""

from typing import List, Tuple

import rospy
from geometry_msgs.msg import Point32 as GeomPoint
from simulation_groundtruth.msg import Parking as ParkingMsg

# Received from simulation.src.simulation_groundtruth.src.groundtruth extractor
from simulation_groundtruth.srv import (
    IntersectionSrv,
    LabeledPolygonSrv,
    LaneSrv,
    ParkingSrv,
    SectionSrv,
)
from visualization_msgs.msg import Marker

import simulation.utils.road.sections.type as road_section_type
import simulation.utils.ros_base.visualization as visualization
from simulation.utils.geometry import Point, Polygon
from simulation.utils.ros_base.node_base import NodeBase


[docs]class GroundtruthVisualizationNode(NodeBase): """ROS node to visualize groundtruth objects in RVIZ. Attributes: publishers (Dict[rospy.Publisher]): Automatically generated dictionary \ containing all publishers. A publisher is created for every visualization \ topic listed in the parameter file. get_sections, get_lane, get_parking, get_obstacles, get_intersection \ (rospy.ServiceProxy): Can be called for groundtruth information. """ def __init__(self): super().__init__(name="groundtruth_visualization_node") # Run the update function with a rate of self.param.rate self.run(function=self.update, rate=self.param.rate)
[docs] def start(self): """Start the node by creating publishers and service proxies.""" rospy.wait_for_service(self.param.topics.section) # For all entries in topics/visualization a publisher is created! # and added to the publishers dictionary. rospy.loginfo( f"Creating publishers for these visualization topics: \ {self.param.topics.visualization.as_dict()}" ) self.publishers = { name: rospy.Publisher(topic, Marker, queue_size=100) for name, topic in self.param.topics.visualization.as_dict().items() } self.get_sections = rospy.ServiceProxy(self.param.topics.section, SectionSrv) self.get_lane = rospy.ServiceProxy(self.param.topics.lane, LaneSrv) self.get_parking = rospy.ServiceProxy(self.param.topics.parking, ParkingSrv) self.get_obstacles = rospy.ServiceProxy( self.param.topics.obstacle, LabeledPolygonSrv ) self.get_signs = rospy.ServiceProxy( self.param.topics.traffic_sign, LabeledPolygonSrv ) self.get_surface_markings = rospy.ServiceProxy( self.param.topics.surface_marking, LabeledPolygonSrv ) self.get_intersection = rospy.ServiceProxy( self.param.topics.intersection, IntersectionSrv )
[docs] def stop(self): """Stop the node by unregistering publishers.""" for _, publisher in self.publishers.items(): publisher.unregister()
[docs] def _publish_point_marker( self, points: List[GeomPoint], publisher_name: str, id: int, rgba: List[float], ns="simulation/groundtruth", ): """Publish an RVIZ marker on the publisher's topic. Args: points: Points to be published. publisher_name: Key of the publisher in the publisher dictionary. id: RVIZ marker id. rgba: List of the marker color. ns: RVIZ namespace of the marker. """ marker = visualization.get_marker_for_points( (Point(p) for p in points), frame_id=self.param.vehicle_simulation_link.frame.simulation, rgba=rgba, id=id, ns=ns, duration=100 / self.param.rate, # Groundtruth is too slow otherwise! ) self.publishers[publisher_name].publish(marker)
[docs] def _update_labeled_polygon_topic( self, proxy: rospy.ServiceProxy, publisher_name: str, marker_color: Tuple[float, float, float, float], ): """Publish polygons for all elements.""" els = sum( ( [ Polygon(msg.frame).to_geometry_msg().points for msg in proxy(sec.id).polygons ] for sec in self.get_sections().sections ), [], ) for id, el in enumerate(els): self._publish_point_marker(el, publisher_name, id, marker_color)
[docs] def _show_lane_markers(self): """Publish markers for all lane markings of road.""" for id in (sec.id for sec in self.get_sections().sections): lane_msg = self.get_lane(id).lane_msg self._publish_point_marker( lane_msg.left_line, "left_line", id, self.param.colors.left_line, ) self._publish_point_marker( lane_msg.middle_line, "middle_line", id, self.param.colors.middle_line, ) self._publish_point_marker( lane_msg.right_line, "right_line", id, self.param.colors.right_line, )
[docs] def _show_parking_markers(self): """Publish markers for all parking lines and spots of road.""" parking_sections = [ s for s in self.get_sections().sections if s.type == road_section_type.PARKING_AREA ] parking_counter = 0 spot_counter = 0 for sec in parking_sections: parking_srv = self.get_parking(sec.id) left_msg, right_msg = parking_srv.left_msg, parking_srv.right_msg def show_borders(borders, topic_name, ns): nonlocal parking_counter for border in borders: self._publish_point_marker( border.points, topic_name, parking_counter, self.param.colors.parking_border, ns="/simulation/groundtruth/" + ns, ) parking_counter += 1 show_borders(left_msg.borders, "parking_line", "parking_left") show_borders(right_msg.borders, "parking_line", "parking_right") spots = [] spots.extend(left_msg.spots) spots.extend(right_msg.spots) for spot in spots: # Extract all four points points = Polygon(spot.frame.points).to_geometry_msg().points # self.rviz_parking_spot_publisher.publish(poly) color = self.param.colors.parking_spot_free if spot.type == ParkingMsg.SPOT_X: points = [ points[0], points[1], points[3], points[2], points[0], points[3], points[2], points[1], ] # Create an x in the visualization color = self.param.colors.parking_spot_x elif spot.type == ParkingMsg.SPOT_OCCUPIED: color = self.param.colors.parking_spot_occupied self._publish_point_marker(points, "parking_spot", spot_counter, color) spot_counter += 1
[docs] def _show_intersection_markers(self): """Publish lanes and surface markings of all intersections.""" intersec_id = 0 def show_lines(lane_msg): nonlocal intersec_id self._publish_point_marker( lane_msg.left_line, "intersection", intersec_id, self.param.colors.left_line, ns="/simulation/groundtruth/intersection", ) intersec_id += 1 self._publish_point_marker( lane_msg.middle_line, "intersection", intersec_id, self.param.colors.middle_line, ns="/simulation/groundtruth/intersection", ) intersec_id += 1 self._publish_point_marker( lane_msg.right_line, "intersection", intersec_id, self.param.colors.right_line, ns="/simulation/groundtruth/intersection", ) intersec_id += 1 for intersection in ( self.get_intersection(sec.id) for sec in self.get_sections().sections if sec.type == road_section_type.INTERSECTION ): # Show all lines show_lines(intersection.south) show_lines(intersection.west) show_lines(intersection.east) show_lines(intersection.north)
[docs] def update(self): """Update all markers.""" rospy.logdebug("Updating groundtruth visualization.") self._show_lane_markers() self._show_parking_markers() self._show_intersection_markers() self._update_labeled_polygon_topic( self.get_obstacles, "obstacle", self.param.colors.obstacle ) self._update_labeled_polygon_topic( self.get_signs, "traffic_sign", self.param.colors.traffic_sign ) self._update_labeled_polygon_topic( self.get_surface_markings, "surface_marking", self.param.colors.surface_marking )