Source code for simulation.utils.drive_test.drive_test_cmd

import os
import shutil
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict

from simulation.utils.basics.ros_cmd import ROSCmd

[docs]@dataclass class DriveTestCmd(ROSCmd): """Simple Wrapper of ROSCmd to run drive tests.""" desc: str = None """Description of the drive.""" must_succeed: bool = False """If the command has to succeed.""" success: bool = False """If the command has ran successfully.""" rosbag_path: str = None """The directory where the rosbags are stored.""" keep_rosbag: bool = False """Keep rosbag in every case""" def __init__( self, *, desc: str, must_succeed: bool = False, environment: Dict[str, Any] = None, **ros_args, ): """Initialize with description, env variables and ros arguments.""" # Check if there are all required paremeters. assert "road" in ros_args, "Argument road is missing." assert "time_limit" in ros_args, "Argument time_limit is missing." assert "mission_mode" in ros_args, "Argument mission_mode is missing." if "rosbag_path" in ros_args: self.rosbag_path = ros_args["rosbag_path"] ros_args["rosbag_path"] = os.path.join(self.rosbag_path, "rosbag") self.keep_rosbag = ros_args.get("keep_rosbag", False) # Ensure rosbags path exists Path(self.rosbag_path).mkdir(parents=True, exist_ok=True) self.desc = desc self.must_succeed = must_succeed super().__init__( "rostest simulation_evaluation drive.test", env_vars=environment, **ros_args )
[docs] def run(self) -> str: if self.rosbag_path is None: status, output = super().run() self.success = status == 0 return output dirs_before = set(os.listdir(self.rosbag_path)) status, output = super().run() dirs_after = set(os.listdir(self.rosbag_path)) self.success = status == 0 # if successful and we are allowed to delete, then delete rosbag if self.success and not self.keep_rosbag: new_dirs = dirs_after - dirs_before assert len(new_dirs) == 1, "Rosbag was not correctly recorded." shutil.rmtree(os.path.join(self.rosbag_path, new_dirs.pop())) return output