import rospy
from gazebo_simulation.msg import CarState as CarStateMsg
from visualization_msgs.msg import Marker
import simulation.utils.ros_base.visualization as visualization
from simulation.utils.geometry.point import Point
from simulation.utils.ros_base.node_base import NodeBase
[docs]class CarStateVisualizationNode(NodeBase):
"""ROS node which allows to visualize the car state in rviz.
Attributes:
frame_publisher (rospy.publisher): Publishes the cars frame as a rviz marker.
view_cone_publisher (rospy.publisher): Publishes the cars view cone as rviz marker.
state_subscriber (rospy.subscriber): Subscribes to car_state
"""
def __init__(self):
"""Initialize the node."""
super().__init__(name="car_state_visualization_node")
self.run()
[docs] def start(self):
"""Start visualization."""
self.frame_publisher = rospy.Publisher(
self.param.topics.rviz.frame, Marker, queue_size=1
)
self.view_cone_publisher = rospy.Publisher(
self.param.topics.rviz.cone, Marker, queue_size=1
)
self.state_subscriber = rospy.Subscriber(
self.param.topics.car_state, CarStateMsg, callback=self.state_cb
)
super().start()
[docs] def stop(self):
"""Stop visualization."""
self.state_subscriber.unregister()
self.frame_publisher.unregister()
self.view_cone_publisher.unregister()
super().stop()
[docs] def state_cb(self, msg: CarStateMsg):
"""Call when car state is published.
Arguments:
msg (CarStateMsg): Msg published by car state node
"""
frame_marker = visualization.get_marker_for_points(
(Point(p) for p in msg.frame.points),
frame_id=self.param.vehicle_simulation_link.frame.simulation,
rgba=[0, 0, 1, 0.7],
)
self.frame_publisher.publish(frame_marker)
if len(msg.view_cone.points):
cone_marker = visualization.get_marker_for_points(
(Point(p) for p in msg.view_cone.points),
frame_id=self.param.vehicle_simulation_link.frame.simulation,
id=1,
)
self.view_cone_publisher.publish(cone_marker)