Source code for simulation.src.gazebo_simulation.src.car_state.visualization

import rospy
from gazebo_simulation.msg import CarState as CarStateMsg
from visualization_msgs.msg import Marker

import simulation.utils.ros_base.visualization as visualization
from simulation.utils.geometry.point import Point
from simulation.utils.ros_base.node_base import NodeBase


[docs]class CarStateVisualizationNode(NodeBase): """ROS node which allows to visualize the car state in rviz. Attributes: frame_publisher (rospy.publisher): Publishes the cars frame as a rviz marker. view_cone_publisher (rospy.publisher): Publishes the cars view cone as rviz marker. state_subscriber (rospy.subscriber): Subscribes to car_state """ def __init__(self): """Initialize the node.""" super().__init__(name="car_state_visualization_node") self.run()
[docs] def start(self): """Start visualization.""" self.frame_publisher = rospy.Publisher( self.param.topics.rviz.frame, Marker, queue_size=1 ) self.view_cone_publisher = rospy.Publisher( self.param.topics.rviz.cone, Marker, queue_size=1 ) self.state_subscriber = rospy.Subscriber( self.param.topics.car_state, CarStateMsg, callback=self.state_cb ) super().start()
[docs] def stop(self): """Stop visualization.""" self.state_subscriber.unregister() self.frame_publisher.unregister() self.view_cone_publisher.unregister() super().stop()
[docs] def state_cb(self, msg: CarStateMsg): """Call when car state is published. Arguments: msg (CarStateMsg): Msg published by car state node """ frame_marker = visualization.get_marker_for_points( (Point(p) for p in msg.frame.points), frame_id=self.param.vehicle_simulation_link.frame.simulation, rgba=[0, 0, 1, 0.7], ) self.frame_publisher.publish(frame_marker) if len(msg.view_cone.points): cone_marker = visualization.get_marker_for_points( (Point(p) for p in msg.view_cone.points), frame_id=self.param.vehicle_simulation_link.frame.simulation, id=1, ) self.view_cone_publisher.publish(cone_marker)