import argparse
import os
import cv2
import rosbag
from cv_bridge import CvBridge
def _find_bag_files(bag_path: str) -> list:
    """Return the rosbag paths designated by ``bag_path``.

    A directory is walked recursively and every file ending in ``.bag`` is
    collected (sorted so the processing order is reproducible); any other path
    is assumed to be a single rosbag and returned as-is.
    """
    if not os.path.isdir(bag_path):
        return [bag_path]
    return sorted(
        os.path.join(root, file_name)
        for root, _dirs, files in os.walk(bag_path)
        for file_name in files
        if file_name.endswith(".bag")
    )


def rosbag_to_images(
    bag_path: str, output_dir: str, image_topic: str, name_after_header: bool = False
) -> None:
    """Save every image published on a topic in one or more rosbags as PNG files.

    Images are named ``<bag name>_frame<6-digit number>.png`` and all bags write
    into the same ``output_dir``, so bags sharing a file name (in different
    sub-directories) also share image names.

    Args:
        bag_path: Path to a single rosbag, or a directory that is searched
            recursively for files ending in ``.bag``.
        output_dir: Directory the images are written to; created if missing.
        image_topic: Topic whose messages are converted and saved.
        name_after_header: If True, number each frame with ``msg.header.seq``;
            otherwise use a per-bag counter starting at 0.
    """
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(output_dir, exist_ok=True)

    bag_files = _find_bag_files(bag_path)
    if not bag_files:
        print(f"No rosbags found in {bag_path}.")
        return

    # One bridge is enough for all bags.
    bridge = CvBridge()
    for bag_file in bag_files:
        print(f"Extract images from {bag_file}.")
        # Strip only the final extension so "run.1.bag" and "run.2.bag" keep
        # distinct names ("run.1", "run.2") instead of both collapsing to "run".
        bag_name = os.path.splitext(os.path.basename(bag_file))[0]
        # The context manager closes the bag even if a conversion or write fails.
        with rosbag.Bag(bag_file, "r") as bag:
            messages = bag.read_messages(topics=[image_topic])
            for count, (_topic, msg, _stamp) in enumerate(messages):
                # NOTE(review): "passthrough" requests no encoding conversion,
                # while cv2.imwrite treats 3-channel data as BGR - confirm the
                # topic's encoding if colours look swapped.
                cv_img = bridge.imgmsg_to_cv2(msg, desired_encoding="passthrough")
                frame_id = msg.header.seq if name_after_header else count
                img_path = os.path.join(
                    output_dir, f"{bag_name}_frame{frame_id:06}.png"
                )
                # imwrite signals failure through its return value, not an
                # exception; report it instead of silently dropping the frame.
                if not cv2.imwrite(img_path, cv_img):
                    print(f"Failed to write {img_path}.")
if __name__ == "__main__":
    # Command-line entry point: extract a folder of images from rosbag(s).
    parser = argparse.ArgumentParser(description="Extract images from a ROS bag.")
    # The three path/topic options are mandatory: without them the extraction
    # would receive None and fail with an unhelpful error deep inside os/rosbag.
    parser.add_argument(
        "--bag",
        required=True,
        help="Rosbag file, or directory searched recursively for rosbag(s).",
    )
    parser.add_argument("--output_dir", required=True, help="Output directory.")
    parser.add_argument("--image_topic", required=True, help="Image topic.")
    parser.add_argument(
        "--name_after_header",
        help="Whether to use the message's header as the image's name.",
        action="store_true",
    )
    args = parser.parse_args()
    rosbag_to_images(args.bag, args.output_dir, args.image_topic, args.name_after_header)