Source code for simulation.src.gazebo_simulation.src.automatic_drive.node

import functools
import math
import random
from dataclasses import dataclass
from typing import Dict, List, Tuple

import geometry_msgs.msg
import rospy
import std_msgs
from simulation_brain_link.msg import State as StateEstimationMsg
from simulation_groundtruth.srv import LaneSrv, SectionSrv
from tf2_msgs.msg import TFMessage

from simulation.utils.geometry import Line, Pose, Transform, Vector
from simulation.utils.ros_base.node_base import NodeBase


[docs]@dataclass class DrivingState: distance_driven: float time: float
[docs]class AutomaticDriveNode(NodeBase): """ROS node to drive the car along the right side of the road. Instead of directly modifying the car's position and speed. The vehicle_simulation's output is emulated. I.e. the transform from the vehicle to it's world coordinate system is published and a state_estimation message published. This enables to use the vehicle_simulation_link_node to move the car and only replace KITcar_brain + vehicle_simulation! Attributes: pub_tf (rospy.publisher): Publishes the new vehicle/world transform. state_estimation_publisher (rospy.Publisher): Publishes state estimation messages. section_proxy (rospy.ServiceProxy): Connection to groundtruth section service. lane_proxy (rospy.ServiceProxy): Connection to groundtruth lane service. _driving_state (DrivingState): Keep track of time and distance along the road. """ def __init__(self): super().__init__( name="automatic_drive_node", log_level=rospy.INFO ) # Name can be overwritten in launch file self.run(function=self.update, rate=float(self.param.rate))
[docs] def start(self): """Start node.""" self.pub_tf = rospy.Publisher( "/tf", TFMessage, queue_size=100 ) # See: https://github.com/ros/geometry2/blob/melodic-devel # /tf2_ros/src/tf2_ros/transform_broadcaster.py self.state_estimation_publisher = rospy.Publisher( self.param.topics.vehicle_simulation_link.state_estimation, StateEstimationMsg, queue_size=1, ) groundtruth_topics = self.param.topics.groundtruth rospy.wait_for_service(groundtruth_topics.section, timeout=30) # Create groundtruth service proxies self.section_proxy = rospy.ServiceProxy(groundtruth_topics.section, SectionSrv) self.lane_proxy = rospy.ServiceProxy(groundtruth_topics.lane, LaneSrv) # Calculate the driving line once, so that it is cached!! self.driving_line # Read initial position from vehicle simulation link parameters try: initial = self.param.vehicle_simulation_link.initial_pose if len(initial) > 3: angle = initial[3] del initial[3] else: angle = 0 pos = Vector(initial) self.initial_tf = Transform(pos, angle) except KeyError: self.initial_tf = None self._driving_state = DrivingState(0, rospy.Time.now().to_sec()) super().start()
[docs] def stop(self): self.state_estimation_publisher.unregister() self.pub_tf.unregister() super().stop()
@functools.cached_property def middle_line(self) -> Line: """Line: Line in the middle of the road.""" # Get all sections sections: List[int] = self.section_proxy().sections assert len(sections) > 0, ( "There must be atleast one road section. " "(The groundtruth node might not be working correctly.)" ) # Concatenate the middle line of all sections return sum( (Line(self.lane_proxy(sec.id).lane_msg.middle_line) for sec in sections), Line() ) @functools.cached_property def driving_line(self) -> Tuple[Line, List[List[float]]]: """Tuple[Line, List[List[float]]]: Line where car drives. And points to stop at. """ path = Line() stops = [] def append(offset, segment, stop): nonlocal path if offset > 0: segment = segment.parallel_offset(offset, "left") elif offset < 0: segment = segment.parallel_offset(-offset, "right") path += segment if stop > 0: stops.append([path.length, stop]) if self.param.randomize_path: # Stitch a line together from varied offsets along the middle line length = self.middle_line.length x = 0 offset = 0 max_step = 4 road_width = 0.4 while x < length: offset = max( min((0.5 - random.random()) * 2 * road_width, road_width), -road_width, ) p = self.middle_line.interpolate_pose(x) orth = Vector(1, 0, 0).rotated(p.get_angle() + math.pi / 2) path += Line([p.position + offset * orth, p.position + offset * orth]) x += max_step * random.random() else: param_path: List[Dict[str, float]] = self.param.path param_path = [obj for obj in param_path if "offset" in obj] current_start = param_path[0]["start"] current_offset = param_path[0]["offset"] current_stop = 0 param_path.remove(param_path[0]) # Read the path from the parameters for obj in param_path: end_arc_length = obj["start"] before_end_line = Line.cut(self.middle_line, end_arc_length)[0] current_segment = Line.cut(before_end_line, current_start)[1] append(current_offset, current_segment, current_stop) current_offset = obj["offset"] current_start = obj["start"] current_stop = obj.get("stop", 0) current_segment = Line.cut(self.middle_line, current_start)[1] append(current_offset, current_segment, 0) return path.simplify(0.05).smooth(0.1), stops @functools.cached_property def speeds(self): out = [] speed_params = [obj for obj in self.param.path if "speed" in obj] for param in speed_params: path_length = self.middle_line.project( self.driving_line[0].interpolate(param["start"]) ) out.append([path_length, int(param["speed"])]) return out
[docs] def update(self): """Calculate and publish new car state information.""" # Update the driving state current_time = rospy.Time.now().to_sec() current_speed = self.param.speed d_time = current_time - self._driving_state.time if ( len(self.speeds) > 1 and self.speeds[1][0] <= self._driving_state.distance_driven ): # If we reach a new speed zone we delete the old one del self.speeds[0] # Set the current speed current_speed = self.speeds[0][1] if len(self.speeds) > 0 else self.param.speed current_speed /= 36 # km/h to m/s and model car scale of 1/10 # Check if the car needs to stop remaining_stops = self.driving_line[1] if len(remaining_stops) > 0: if remaining_stops[0][0] < self._driving_state.distance_driven: remaining_stops[0][1] -= d_time if remaining_stops[0][1] > 0: current_speed = 0 else: del remaining_stops[0] self._driving_state.distance_driven += d_time * current_speed if ( not self.param.loop and self._driving_state.distance_driven > self.driving_line[0].length ): rospy.signal_shutdown("Finished driving along the road.") return self._driving_state.distance_driven %= self.driving_line[0].length self._driving_state.time = current_time rospy.logdebug(f"Current driving state: {self._driving_state}") # Calculate position, speed, and yaw position = self.driving_line[0].interpolate(self._driving_state.distance_driven) # Depending on the align_with_middle_line parameter, the car is always parallel # to the middle line or to the driving line. alignment_line = ( self.middle_line if self.param.align_with_middle_line else self.driving_line[0] ) # Always let the car face into the direction of the middle line. pose = Pose( position, alignment_line.interpolate_direction(alignment_line.project(position)), ) speed = Vector(current_speed, 0) # Ignore y component of speed # Yaw rate = curvature * speed yaw_rate = ( alignment_line.interpolate_curvature( min(self._driving_state.distance_driven, alignment_line.length) ) * current_speed ) # Publish up to date messages! self.update_world_vehicle_tf( self.initial_tf.inverse * Transform(pose, pose.get_angle()) ) self.update_state_estimation(speed, yaw_rate)
[docs] def update_state_estimation(self, speed: Vector, yaw_rate: float): """Publish new state estimation message. Args: speed: Current speed in vehicle coordinates. yaw_rate: Yaw rate of the car. """ msg = StateEstimationMsg() msg.speed_x = speed.x msg.speed_y = speed.y msg.yaw_rate = yaw_rate self.state_estimation_publisher.publish(msg)
[docs] def update_world_vehicle_tf(self, vehicle_world_tf: Transform): """Publish up to date world to vehicle transformation to /tf. Args: vehicle_world_tf(Transform): Transformation between vehicle and world frames. """ tf_stamped = geometry_msgs.msg.TransformStamped() tf_stamped.header = std_msgs.msg.Header() tf_stamped.header.stamp = rospy.Time.now() # Transform from world to vehicle tf_stamped.header.frame_id = self.param.vehicle_simulation_link.frame.world tf_stamped.child_frame_id = self.param.vehicle_simulation_link.frame.vehicle # Transformation from world to vehicle tf_stamped.transform = (vehicle_world_tf).to_geometry_msg() self.pub_tf.publish(TFMessage([tf_stamped]))