Source code for simulation.src.simulation_brain_link.src.sensor_camera.node

"""Prepare Gazebo’s camera image for kitcar-ros."""

import cv2
import rospy
from cv_bridge.core import CvBridge
from sensor_msgs.msg import Image as ImageMsg

from simulation.utils.ros_base.node_base import NodeBase

[docs]class SensorCameraNode(NodeBase): """ROS node which receives the camera image, crops it and can perform preprocessing. Attributes: subscriber (rospy.subscriber): Subscribes to camera image from gazebo publisher (rospy.publisher): Publishes camera image. """ def __init__(self): super().__init__( name="sensor_camera_node" ) # Name can be overwritten in launch file if self.param.apply_gan: # Import the ros connector here because it requires the "torch" package. # The torch package is quite heavy and should only be necessary if the # neural network is used. from simulation.utils.machine_learning.cycle_gan.image_translator import ( ImageTranslator, ) self.gan_translator = ImageTranslator(self.param.use_wasserstein_gan)
[docs] def start(self): """Start node.""" self.publisher = rospy.Publisher( self.param.topics.pub_topic, ImageMsg, queue_size=1 ) self.subscriber = rospy.Subscriber( self.param.topics.sub_topic, ImageMsg, callback=self.receive_image, queue_size=1, ) super().start()
[docs] def stop(self): """Turn off node.""" self.subscriber.unregister() self.publisher.unregister() super().stop()
[docs] def receive_image(self, img_msg: ImageMsg): """Process and republish new camera image. The image is cropped to simulate the internal preprocessing of our real camera. Args: img_msg: Published image message. """ br = CvBridge() cv_img = br.imgmsg_to_cv2(img_msg) new_img = cv_img[ self.param.output_start_y : self.param.output_end_y, self.param.output_start_x : self.param.output_end_x, ] if self.param.apply_gan: new_img = self.gan_translator( new_img, f_keep_pixels=self.param.factor_keep_pixels, f_keep_colored_pixels=self.param.factor_keep_colored_pixels, ) else: new_img = cv2.cvtColor(new_img, cv2.COLOR_BGR2GRAY) out_msg = br.cv2_to_imgmsg(new_img, encoding="mono8") out_msg.header = img_msg.header self.publisher.publish(out_msg)