Source code for simulation.src.simulation_groundtruth.src.groundtruth.node

from dataclasses import dataclass
from typing import Callable, List

import rospy
from gazebo_msgs.msg import ModelStates
from gazebo_simulation.msg import CarState as CarStateMsg
from simulation_groundtruth.msg import GroundtruthStatus
from simulation_groundtruth.msg import LabeledPolygon as LabeledPolygonMsg
from simulation_groundtruth.srv import (
    IntersectionSrv,
    IntersectionSrvRequest,
    IntersectionSrvResponse,
    LabeledPolygonSrv,
    LabeledPolygonSrvResponse,
    LaneSrv,
    LaneSrvRequest,
    LaneSrvResponse,
    ParkingSrv,
    ParkingSrvRequest,
    ParkingSrvResponse,
    SectionSrv,
    SectionSrvRequest,
    SectionSrvResponse,
)
from std_msgs.msg import Empty as EmptyMsg
from std_msgs.msg import String as StringMsg
from std_srvs.srv import Empty as EmptySrv

import simulation.utils.road.road as road_module
from simulation.src.simulation_groundtruth.src.groundtruth.groundtruth import Groundtruth
from simulation.src.simulation_groundtruth.src.groundtruth.object_controller import (
    ObjectController,
)
from simulation.src.simulation_groundtruth.src.groundtruth.renderer import Renderer
from simulation.utils.geometry import Vector
from simulation.utils.ros_base.node_base import NodeBase


[docs]@dataclass class LabeledPolygonService: """Small wrapper class to easily create ROS services that offer LabeledPolygonMsgs.""" topic: str callback: Callable[[id], List[LabeledPolygonMsg]] def __post_init__(self): self.service: rospy.Service = rospy.Service( self.topic, LabeledPolygonSrv, self.request )
[docs] def request(self, request: LabeledPolygonSrv) -> LabeledPolygonSrvResponse: """Answer service request.""" response = LabeledPolygonSrvResponse() response.polygons = self.callback(request.id) rospy.logdebug(f"Answering labeled polygon request {response.polygons}") return response
[docs] def shutdown(self): self.service.shutdown()
[docs]class GroundtruthNode(NodeBase): """ROS node providing services to access the road's groundtruth. Attributes: groundtruth (Groundtruth): Stores road sections and allows to access groundtruth information. renderer (Renderer): Render and update the road in Gazebo. object_controller (ObjectController): Create and track objects (e.g. obstacles, signs) in Gazebo. section_srv (rospy.Service): ROS service to access section ids and types. lane_srv (rospy.Service): ROS service to access the lanes of a road section. obstacle_srv (rospy.Service): ROS service to access obstacles in a road section. parking_srv (rospy.Service): ROS service to access parking spots \ and other parking information. surface_marking_srv (rospy.Service): ROS service to access surface markings in a road section. intersection_srv (rospy.Service): ROS service to access information of an intersection. """ def __init__(self, name="groundtruth_node", log_level=rospy.INFO): super().__init__(name=name, log_level=log_level) # Initialize without a road # The road is loaded in self.update() road = road_module.Road() self.groundtruth = Groundtruth(road=road) self.renderer = Renderer( road, remove_model=self._remove_model, spawn_model=self._spawn_model, pause_gazebo=self._pause_gazebo, unpause_gazebo=self._unpause_gazebo, info_callback=self.update_groundtruth_status, tile_size=Vector(self.param.tile_size), tile_resolution=Vector(self.param.tile_resolution), force_reload=self.param.force_reload_road, ) self.object_controller = ObjectController( road, remove_model=self._remove_model, spawn_model=self._spawn_model, ) self.model_names = [] self.car_state = CarStateMsg() self._groundtruth_status = GroundtruthStatus() self.run()
[docs] def _load_road(self) -> road_module.Road: """Load road sections from current road.""" seed = str( self.param.seed if self.param.seed != "__no_value__" else self.param.default_seed ) return road_module.load(self.param.road, seed=seed)
[docs] def _ensure_gazebo_startup(self): """Make sure that gazebo is correctly launched. Waiting for Gazebo is not as easy as it should be. **This function, however, is a workaround.** It attempts to spawn a model until it is there. Once the model has been spawned, it is directly removed again. At this point, Gazebo has been completely launched. Beware: The model has no collision box and is not visible. It is therefore not problematic if it is not correctly removed! """ model_base_name = "__start_up_box__" rospy.wait_for_message(self.param.topics.gazebo_models, ModelStates) i = 0 while True: # Get all models currently listed in Gazebo's models # that contain the model_base_name in their name. boxes = list(b for b in self.model_names if model_base_name in b) if len(boxes) != 0: for b in boxes: self._remove_model(b) return i += 1 self._spawn_model(f"""<model name="{model_base_name}{i}"></model>""") rospy.sleep(0.1) rospy.wait_for_message(self.param.topics.gazebo_models, ModelStates)
[docs] def start(self): # Gazebo self.spawn_publisher = rospy.Publisher( self.param.topics.spawn_model, StringMsg, queue_size=1000 ) self.remove_publisher = rospy.Publisher( self.param.topics.remove_model, StringMsg, queue_size=1000 ) self.groundtruth_status_publisher = rospy.Publisher( self.param.topics.status, GroundtruthStatus, queue_size=100 ) self.pause_physics_proxy = rospy.ServiceProxy( self.param.topics.pause_gazebo, EmptySrv ) self.unpause_physics_proxy = rospy.ServiceProxy( self.param.topics.unpause_gazebo, EmptySrv ) # Updates self.gazebo_subscriber = rospy.Subscriber( self.param.topics.gazebo_models, ModelStates, callback=self.receive_model_states, ) self.car_state_subscriber = rospy.Subscriber( self.param.topics.car_state.car_state, CarStateMsg, callback=self.receive_car_state, ) # Requests self.interrupt_subscriber = rospy.Subscriber( self.param.topics.renderer.interrupt, EmptyMsg, callback=self.receive_interrupt_request, ) self.reload_subscriber = rospy.Subscriber( self.param.topics.renderer.reload, EmptyMsg, callback=self.receive_reload_request, ) self.update() # Groundtruth services self.section_srv = rospy.Service( self.param.topics.section, SectionSrv, self.get_sections ) self.lane_srv = rospy.Service(self.param.topics.lane, LaneSrv, self.get_lanes) self.parking_srv = rospy.Service( self.param.topics.parking, ParkingSrv, self.get_parking ) self.labeled_polygon_services = [ LabeledPolygonService( self.param.topics.obstacle, callback=self.groundtruth.get_obstacle_msgs ), LabeledPolygonService( self.param.topics.surface_marking, callback=self.groundtruth.get_surface_marking_msgs, ), LabeledPolygonService( self.param.topics.traffic_sign, callback=self.groundtruth.get_traffic_sign_msgs, ), ] self.intersection_srv = rospy.Service( self.param.topics.intersection, IntersectionSrv, self.get_intersection )
[docs] def stop(self): """Turn off.""" self.section_srv.shutdown() self.lane_srv.shutdown() self.parking_srv.shutdown() self.intersection_srv.shutdown() for srv in self.labeled_polygon_services: srv.shutdown() self.pause_physics_proxy.close() self.unpause_physics_proxy.close() self.interrupt_subscriber.unregister() self.reload_subscriber.unregister() self.gazebo_subscriber.unregister() self.car_state_subscriber.unregister() self.spawn_publisher.unregister() self.remove_publisher.unregister() self.groundtruth_status_publisher.unregister()
[docs] def update(self): """Update the groundtruth including renderer and object controller.""" self.update_groundtruth_status(status=GroundtruthStatus.LOAD_NEW_ROAD) road = self._load_road() if self.param.start_groundtruth: self.groundtruth.road = road if self.param.start_renderer or self.param.start_object_controller: self._ensure_gazebo_startup() if self.param.start_renderer: self.renderer.road = road self.renderer.load(self.model_names) if self.param.start_object_controller: self.object_controller.road = road self.object_controller.load(self.model_names) self.update_groundtruth_status(status=GroundtruthStatus.READY)
"""Groundtruth Services."""
[docs] def get_sections(self, request: SectionSrvRequest) -> SectionSrvResponse: """Answer section service request.""" response = SectionSrvResponse() response.sections = self.groundtruth.get_section_msgs() rospy.logdebug(f"Answering section request {response.sections}") return response
[docs] def get_lanes(self, request: LaneSrvRequest) -> LaneSrvResponse: """Answer lane service request.""" response = LaneSrvResponse() response.lane_msg = self.groundtruth.get_lane_msg(request.id) rospy.logdebug(f"Answering lane request {response.lane_msg}") return response
[docs] def get_parking(self, request: ParkingSrvRequest) -> ParkingSrvResponse: """Answer parking service request.""" response = ParkingSrvResponse() response.left_msg, response.right_msg = self.groundtruth.get_parking_msg(request.id) rospy.logdebug(f"Answering parking request {response.left_msg, response.right_msg}") return response
[docs] def get_intersection(self, request: IntersectionSrvRequest) -> IntersectionSrvResponse: """Answer intersection service request.""" response = IntersectionSrvResponse() ( response.turn, response.rule, response.south, response.west, response.east, response.north, ) = self.groundtruth.get_intersection_msg(request.id) rospy.logdebug(f"Answering intersection request {response}") return response
# Receive updates
[docs] def receive_model_states(self, model_states: ModelStates): self.model_names = model_states.name
[docs] def receive_car_state(self, car_state: CarStateMsg): self.car_state = car_state
# Gazebo interactions
[docs] def _spawn_model(self, model_xml: str): """Spawn a model in Gazebo. Args: model_xml: XML String that defines the model. (Without sdf declaration!) """ self.spawn_publisher.publish(f'<sdf version="1.7">{model_xml}</sdf>')
[docs] def _remove_model(self, name): """Remove a model from Gazebo. Args: name: Name of the model in Gazebo. """ self.remove_publisher.publish(name)
[docs] def _pause_gazebo(self): """Pause Gazebo.""" self.pause_physics_proxy()
[docs] def _unpause_gazebo(self): """Request to start Gazebo.""" self.unpause_physics_proxy()
"""Handle requests."""
[docs] def receive_reload_request(self, _: EmptyMsg): """Receive requests to reload the world.""" self.update()
[docs] def receive_interrupt_request(self, _: EmptyMsg): """Receive requests to stop rendering processes.""" self.renderer.interrupt()
"""Publish groundtruth update."""
[docs] def update_groundtruth_status( self, status=None, processed_tiles=None, number_of_tiles=None ): self._groundtruth_status.road = self.param.road self._groundtruth_status.seed = str(self.param.seed) if status is not None: self._groundtruth_status.status = status if processed_tiles is not None: self._groundtruth_status.processed_tiles = processed_tiles if number_of_tiles is not None: self._groundtruth_status.number_of_tiles = number_of_tiles self.groundtruth_status_publisher.publish(self._groundtruth_status)