Source code for simulation.utils.machine_learning.data.rosbag_to_labels

import argparse
import os

import rosbag
from simulation_groundtruth.msg import ImageLabels as ImageLabelsMsg
from simulation_groundtruth.msg import LabeledBoundingBox as LabeledBoundingBoxMsg

from .labeled_dataset import LabeledDataset


def rosbag_to_labels(bag_path: str, output_file: str, label_topic: str) -> None:
    """Extract bounding box labels from one or more rosbags into a YAML file.

    If ``output_file`` already exists, the dataset is loaded from it and
    extended. The label entry of every image found in a bag is reset before
    the bounding boxes of that image are appended.

    A bag that cannot be opened or read is reported on stdout and skipped;
    labels extracted before the failure are kept.

    Args:
        bag_path: Path to a single rosbag, or to a directory that is searched
            recursively for files ending in ``.bag``.
        output_file: Path of the YAML file the labeled dataset is saved to.
        label_topic: Topic from which the image label messages are read.
    """
    # Find all rosbags recursively
    if os.path.isdir(bag_path):
        bag_files = []
        for root, _, files in os.walk(bag_path):
            bag_files.extend(
                os.path.join(root, file_name)
                for file_name in files
                if file_name.endswith(".bag")
            )
    else:
        bag_files = [bag_path]

    labeled_dataset = (
        LabeledDataset.from_yaml(output_file)
        if os.path.exists(output_file)
        else LabeledDataset()
    )

    # Every bounding box field except the class description becomes a label
    # attribute; the description is stored per class id in ``classes`` below.
    bb_msg_fields = list(LabeledBoundingBoxMsg.__slots__)
    bb_msg_fields.remove("class_description")
    labeled_dataset.attributes = tuple(["img_name"] + bb_msg_fields)

    for bag_file in bag_files:
        print(f"Extract labels from {bag_file}.")
        try:
            # Open the bag inside the try block so that an unreadable bag is
            # skipped like any other read error; the context manager closes
            # the bag even when reading fails midway.
            with rosbag.Bag(bag_file, "r") as bag:
                # Image names are derived from the bag's file name
                # (everything before the first dot).
                bag_name = os.path.basename(bag.filename).split(".")[0]

                msg: ImageLabelsMsg
                for _, msg, _ in bag.read_messages(topics=[label_topic]):
                    img_name = f"{bag_name}" f"_frame{msg.img_header.seq:06}.png"

                    # Add empty entry so images without bounding boxes are listed too
                    labeled_dataset.labels[img_name] = []
                    for bb in msg.bounding_boxes:
                        labeled_dataset.append_label(
                            key=img_name,
                            label=[img_name]
                            + [getattr(bb, field) for field in bb_msg_fields],
                        )
                        labeled_dataset.classes[bb.class_id] = bb.class_description
        except Exception as e:
            # Deliberate best effort: one broken bag must not abort the whole run.
            print(f"Exception occured when reading bag file: {e}. Skipping.")

    labeled_dataset.save_as_yaml(output_file)
if __name__ == "__main__":
    # Command line entry point: extract a folder of labels from a rosbag.
    parser = argparse.ArgumentParser(description="Extract labels from a ROS bag.")
    # All three options are required: without them, None would be passed on to
    # rosbag_to_labels and fail there with an unhelpful traceback.
    parser.add_argument("--bag", required=True, help="Directory with rosbag(s).")
    parser.add_argument("--output_file", required=True, help="Output file.")
    parser.add_argument("--label_topic", required=True, help="Label topic.")
    args = parser.parse_args()

    rosbag_to_labels(args.bag, args.output_file, args.label_topic)