"""ROS node that connects multiple speakers to ROS topics."""
import rospy
from gazebo_simulation.msg import CarState as CarStateMsg
from simulation_evaluation.msg import Broadcast as BroadcastMsg
from simulation_evaluation.msg import Speaker as SpeakerMsg
from simulation_groundtruth.msg import GroundtruthStatus
from simulation_groundtruth.srv import (
IntersectionSrv,
LabeledPolygonSrv,
LaneSrv,
ParkingSrv,
SectionSrv,
)
from simulation.src.simulation_evaluation.src.speaker.speakers import (
AreaSpeaker,
BroadcastSpeaker,
EventSpeaker,
SpeedSpeaker,
ZoneSpeaker,
)
from simulation.utils.ros_base.node_base import NodeBase
[docs]class SpeakerNode(NodeBase):
"""ROS node that integrates all speakers into the ROS framework.
Attributes:
speakers (List[Tuple[Speaker, rospy.Message, rospy.Publisher]]): All speakers \
with their message type and a publisher that publishes messages created \
by the speaker.
subscriber (rospy.Subscriber): Receive CarState messages with position and speed \
of the car.
section_proxy, lane_proxy, parking_proxy, obstacle_proxy, intersection_proxy \
(rospy.ServiceProxy): Proxies that the speakers need to request \
groundtruth information.
"""
def __init__(self):
"""Initialize the node."""
super().__init__(name="speaker_node")
# Msg names
self.msg_names = {
val: name
for name, val in SpeakerMsg.__dict__.items()
if isinstance(val, int) and not name[0] == "_"
}
self.run(function=self.publish_speakers, rate=30)
[docs] def start(self):
rospy.logdebug("STARTING SPEAKER")
groundtruth_topics = self.param.topics.groundtruth
# Create groundtruth service proxies
self.section_proxy = rospy.ServiceProxy(groundtruth_topics.section, SectionSrv)
self.lane_proxy = rospy.ServiceProxy(groundtruth_topics.lane, LaneSrv)
self.parking_proxy = rospy.ServiceProxy(groundtruth_topics.parking, ParkingSrv)
self.obstacle_proxy = rospy.ServiceProxy(
groundtruth_topics.obstacle, LabeledPolygonSrv
)
self.surface_marking_proxy = rospy.ServiceProxy(
groundtruth_topics.surface_marking, LabeledPolygonSrv
)
self.intersection_proxy = rospy.ServiceProxy(
groundtruth_topics.intersection, IntersectionSrv
)
rospy.wait_for_service(groundtruth_topics.section)
self.groundtruth_status_subscriber = rospy.Subscriber(
self.param.topics.groundtruth.status,
GroundtruthStatus,
callback=self.receive_groundtruth_update,
)
# Add all speakers
self.speakers = []
def append_speaker(speaker, topic, msg_type=SpeakerMsg):
self.speakers.append(
(
speaker,
rospy.Publisher(topic, msg_type, queue_size=self.param.queue_size),
)
)
append_speaker(
AreaSpeaker(
section_proxy=self.section_proxy,
lane_proxy=self.lane_proxy,
parking_proxy=self.parking_proxy,
min_wheel_count=self.param.area.min_wheels,
area_buffer=self.param.area.buffer,
),
self.param.topics.area,
)
append_speaker(
ZoneSpeaker(
section_proxy=self.section_proxy,
lane_proxy=self.lane_proxy,
parking_proxy=self.parking_proxy,
obstacle_proxy=self.obstacle_proxy,
surface_marking_proxy=self.surface_marking_proxy,
intersection_proxy=self.intersection_proxy,
overtaking_buffer=self.param.zone.overtaking_buffer,
start_zone_buffer=self.param.zone.start_buffer,
end_zone_buffer=self.param.zone.end_buffer,
yield_distance=(
self.param.zone.yield_distance[0],
self.param.zone.yield_distance[1],
),
),
self.param.topics.zone,
)
append_speaker(
EventSpeaker(
section_proxy=self.section_proxy,
lane_proxy=self.lane_proxy,
obstacle_proxy=self.obstacle_proxy,
surface_marking_proxy=self.surface_marking_proxy,
parking_proxy=self.parking_proxy,
parking_spot_buffer=self.param.event.parking_spot_buffer,
min_parking_wheels=self.param.event.min_parking_wheels,
),
self.param.topics.event,
)
append_speaker(
SpeedSpeaker(
stop_threshold=self.param.speed.stop_threshold,
halt_time=self.param.speed.halt_time,
stop_time=self.param.speed.stop_time,
),
self.param.topics.speed,
)
append_speaker(
BroadcastSpeaker(
section_proxy=self.section_proxy,
lane_proxy=self.lane_proxy,
),
self.param.topics.broadcast,
msg_type=BroadcastMsg,
)
# Subscribe to carstate
self.subscriber = rospy.Subscriber(
self.param.topics.car_state.car_state, CarStateMsg, self.carstate_cb
)
rospy.logdebug("STARTED SPEAKER")
rospy.wait_for_message(self.param.topics.car_state.car_state, CarStateMsg)
[docs] def stop(self):
self.subscriber.unregister()
self.section_proxy.close()
self.lane_proxy.close()
self.parking_proxy.close()
self.obstacle_proxy.close()
self.surface_marking_proxy.close()
self.intersection_proxy.close()
# Stop all speaker publisher
for _, publisher in self.speakers:
publisher.unregister()
[docs] def carstate_cb(self, msg: CarStateMsg):
"""Receive CarState message and pass it to all speakers.
Args:
msg: New CarState message
"""
# Pipe car state msg to every speaker handler
for speaker, _ in self.speakers:
speaker.listen(msg)
rospy.logdebug("RECEIVED CARSTATE")
[docs] def receive_groundtruth_update(self, msg: GroundtruthStatus):
"""Receive GroundtruthStatus message.
Args:
msg: New GroundtruthStatus message
"""
self.param.active = msg.status == GroundtruthStatus.READY
[docs] def publish_speakers(self):
"""Publish the output of all speakers."""
# Publish in all speaker handlers
for speaker, publisher in self.speakers:
try:
msgs = speaker.speak()
for msg in msgs:
rospy.logdebug(
f"PUBLISHING {msg} on speaker {speaker.__class__.__name__}"
)
if type(msg) == SpeakerMsg:
msg.name = self.msg_names[msg.type]
publisher.publish(msg)
except Exception as e:
rospy.logerr(f"Exception occured in {type(speaker)}: {e}")