Source code for

import argparse
from typing import Callable, Optional

import numpy as np
from envyaml import EnvYAML
from tabulate import tabulate

from .drive_test_cmd import DriveTestCmd

[docs]def make_multiline(string: str, max_width: int): if max_width == 0: return string return "".join( [char + ("\n" if (i + 1) % max_width == 0 else "") for i, char in enumerate(string)] )
[docs]class Color: """Adds color to string for stdout."""
[docs] @staticmethod def ansi(color_code: int) -> Callable[[str], str]: """Returns a callable function to which a string can be parsed. The string will be colored in the color of the ansi color code. Args: color_code: The ansi color code in which the string should be colored. Take a look at Returns: A callable function colorize to which a string can be parsed """ def colorize(s: str) -> str: """Colorizes data with the given color. Args: s: The string which should be colored Returns: A string in the given color """ return f"\033[{color_code}m{s}\033[0m" return colorize
success = ansi.__func__(92) """Adds green to the text""" failure = ansi.__func__(91) """Adds red to the text"""
[docs]class AutomatedDriveTest: """Automated Drive Testing.""" def __init__( self, config: str, runner_index: Optional[int], total_runners: Optional[int] ): """Load config. Args: config: Path to config. """ # Use envyaml so that users can set environment variables within the yaml file. data = EnvYAML(config) # Read parameters specified inside table_header self.table_data = [data["table_header"]] self.max_string_width = data.get("table_column_max_width", 0) # Get global tests parameters default_args = data.get("default_args", {}) jobs = ( data["tests"] if runner_index is None or total_runners is None else np.array_split(data["tests"], int(total_runners))[int(runner_index) - 1] ) # Loop over all jobs # and parse parameters to DriveTestCmd # Join args and default args. self.pipeline = [DriveTestCmd(**{**default_args, **args}) for args in jobs]
[docs] def execute(self): """Execute tests inside self.pipeline.""" # Execute tests for job in self.pipeline: print(f"Starting {job.get_cmd()}") print("Waiting for the test to finish...") output = # Print log to console after job finishes # TODO: Print log while job is running print(output)
[docs] def show_results(self): """Print the table of results to stdout.""" # Colorize stdout SUCCESS = Color.success("Success") FAILED = Color.failure("Failure") # Fill table data for job in self.pipeline: row = [] # Loop over first row in table (which should be the header) # and add desc or ros_args elements from DriveTestCmd object. # Also result gets added. (Which gets created during job execution) for header_elem in self.table_data[0]: if header_elem == "desc": item = make_multiline(str(job.desc), self.max_string_width) elif header_elem == "conclusion": must_succeed = SUCCESS if job.must_succeed else None result = SUCCESS if job.success else FAILED item = f"{result} (Expected: {must_succeed})" else: item = make_multiline( str(job.ros_args.get(header_elem)), self.max_string_width ) row.append(item) self.table_data.append(row) # Print table of results print("[RESULT]".ljust(80, "-"), end=2 * "\n") print(tabulate(self.table_data, headers="firstrow", tablefmt="fancy_grid"))
[docs] def check(self): """Check if tests were successful (atleast the ones that should succeed).""" # Check if there is any failed job marked with must_succeed result_successful = not any( job.must_succeed and not job.success for job in self.pipeline ) result_message = ( Color.success("All roads marked with must_succeed have been run successfully.") if result_successful else Color.failure("Job has failed.") ) print("\n", result_message, sep="") if not result_successful: exit(1)
if __name__ == "__main__": parser = argparse.ArgumentParser( description=( "Run a number of drive test to automatically validate the car's" "behavior on predefined roads." ) ) parser.add_argument( "-c", "--config", required=True, help="Path to the config file.", ) parser.add_argument( "--runner_index", required=False, default=None, help="Select jobs only for specific runner.", ) parser.add_argument( "--total_runners", required=False, default=None, help="How many runners to use." ) args = parser.parse_args() test = AutomatedDriveTest(args.config, args.runner_index, args.total_runners) test = AutomatedDriveTest(args.config, args.runner_index, args.total_runners) test.execute() test.show_results() test.check()