diff --git a/data/alibaba_loader.py b/data/alibaba_loader.py
index f57e1eb6..71a86938 100644
--- a/data/alibaba_loader.py
+++ b/data/alibaba_loader.py
@@ -6,11 +6,12 @@
 import sys
 from collections import defaultdict
 from dataclasses import dataclass
-from typing import List, Mapping, Optional
+from functools import partial
+from typing import Callable, List, Mapping, Optional, Tuple
 
 import absl
 
-from utils import EventTime, setup_csv_logging, setup_logging
+from utils import EventTime, setup_logging
 from workload import (
     ExecutionStrategies,
     ExecutionStrategy,
@@ -65,18 +66,27 @@ def __init__(
         workload_interval: EventTime,
         flags: "absl.flags",
     ):
+        # Initialize the flags and the logging.
         self._flags = flags
-        self._path = flags.workload_profile_path
         self._logger = setup_logging(
             name=self.__class__.__name__,
             log_dir=flags.log_dir,
             log_file=flags.log_file_name,
             log_level=flags.log_level,
         )
-        self._job_data_generator = self._initialize_job_data_generator()
-        self._job_graphs: Mapping[str, JobGraph] = {}
         self._rng_seed = flags.random_seed
         self._rng = random.Random(self._rng_seed)
+
+        # Initialize data for Workload generation.
+        self._workload_paths_and_release_policies = (
+            self._construct_workload_definitions()
+        )
+        self._job_graph_generators: Mapping[
+            str, Callable
+        ] = self._initialize_job_graph_generators()
+        self._release_times_and_profiles = self._construct_release_times()
+
+        self._job_graphs: Mapping[str, Mapping[str, JobGraph]] = {}
         self._release_times = self._construct_release_times()
         self._current_release_pointer = 0
         self._workload_update_interval = (
@@ -90,14 +100,157 @@ def __init__(
         self._task_cpu_divisor = self._flags.alibaba_loader_task_cpu_divisor
         self._task_max_pow2_slots = self._flags.alibaba_loader_task_max_pow2_slots
 
-    def _construct_release_times(self):
-        """Construct the release times of the jobs in the workload.
+    def _construct_workload_definitions(
+        self,
+    ) -> List[Tuple[Optional[str], Optional[JobGraph.ReleasePolicy]]]:
+        """Constructs the Workloads that will be used in the simulation.
+
+        This method uses the flags provided to the constructor to determine which
+        Workloads to construct.
 
         Returns:
-            A list of release times of the jobs in the workload.
+            A list of tuples of the form:
+                (path_to_workload_definition, release_policy)
+            If the path_to_workload_definition is None, then the release_policy
+            associated with None will be used to construct the Workload.
+            Otherwise, the Workload will be constructed from the file at the
+            path_to_workload_definition.
         """
-        # Create the ReleasePolicy.
-        release_policy = None
+        paths_and_policies = []
+        if len(self._flags.workload_profile_paths) > 0:
+            # If specific files were requested, then we use those.
+            if len(self._flags.override_release_policies) != len(
+                self._flags.workload_profile_paths
+            ):
+                raise ValueError(
+                    "Number of workload profile paths and release policies must match."
+                )
+            for index, (path, release_policy_type) in enumerate(
+                zip(
+                    self._flags.workload_profile_paths,
+                    self._flags.override_release_policies,
+                )
+            ):
+                release_policy = None
+                if release_policy_type != "poisson":
+                    raise NotImplementedError(
+                        f"Release policy {release_policy_type} not implemented."
+                    )
+                else:
+                    release_policy = self._construct_release_policy(
+                        policy_type=release_policy_type,
+                        arrival_rate=self._flags.override_poisson_arrival_rates[index],
+                        num_invocations=self._flags.override_num_invocations[index],
+                    )
+                if os.path.isfile(path):
+                    extension = pathlib.Path(path).suffix.lower()
+                    if extension != ".pkl":
+                        raise ValueError(
+                            f"Invalid extension {extension} for Alibaba trace."
+                        )
+                    paths_and_policies.append((path, release_policy))
+                else:
+                    raise FileNotFoundError(f"No such file or directory: {path}")
+        elif self._flags.workload_profile_path is not None:
+            path = self._flags.workload_profile_path
+
+            # Construct the release policy.
+            release_policy = None
+            if self._flags.override_release_policy == "periodic":
+                release_policy = self._construct_release_policy(
+                    policy_type=self._flags.override_release_policy,
+                    arrival_period=EventTime(
+                        self._flags.override_arrival_period, EventTime.Unit.US
+                    ),
+                )
+            elif self._flags.override_release_policy == "fixed":
+                release_policy = self._construct_release_policy(
+                    policy_type=self._flags.override_release_policy,
+                    arrival_period=EventTime(
+                        self._flags.override_arrival_period, EventTime.Unit.US
+                    ),
+                    num_invocations=self._flags.override_num_invocation,
+                )
+            elif self._flags.override_release_policy == "poisson":
+                release_policy = self._construct_release_policy(
+                    policy_type=self._flags.override_release_policy,
+                    arrival_rate=self._flags.override_poisson_arrival_rate,
+                    num_invocations=self._flags.override_num_invocation,
+                )
+            elif self._flags.override_release_policy == "gamma":
+                release_policy = self._construct_release_policy(
+                    policy_type=self._flags.override_release_policy,
+                    arrival_rate=self._flags.override_poisson_arrival_rate,
+                    num_invocations=self._flags.override_num_invocation,
+                    coefficient=self._flags.override_gamma_coefficient,
+                )
+            elif self._flags.override_release_policy == "fixed_gamma":
+                release_policy = self._construct_release_policy(
+                    policy_type=self._flags.override_release_policy,
+                    arrival_rate=self._flags.override_poisson_arrival_rate,
+                    base_arrival_rate=self._flags.override_base_arrival_rate,
+                    num_invocations=self._flags.override_num_invocation,
+                    coefficient=self._flags.override_gamma_coefficient,
+                )
+            else:
+                raise NotImplementedError(
+                    f"Release policy {self._flags.override_release_policy} not "
+                    f"implemented."
+                )
+
+            if os.path.isdir(path):
+                paths_and_policies.extend(
+                    [
+                        (os.path.join(path, filename), None)
+                        for filename in os.listdir(path)
+                        if filename.endswith(".pkl")
+                    ]
+                )
+                paths_and_policies.append((None, release_policy))
+            elif os.path.isfile(path):
+                extension = pathlib.Path(path).suffix.lower()
+                if extension != ".pkl":
+                    raise ValueError(
+                        f"Invalid extension {extension} for Alibaba trace."
+                    )
+                paths_and_policies.append((path, release_policy))
+            else:
+                raise FileNotFoundError(f"No such file or directory: {path}")
+        else:
+            raise ValueError("No workload profile path provided.")
+
+        if len(paths_and_policies) == 0:
+            raise ValueError("No workload profile paths were retrieved.")
+        return paths_and_policies
+
+    def _construct_release_policy(
+        self,
+        policy_type: str,
+        arrival_period: Optional[EventTime] = None,
+        num_invocations: Optional[int] = None,
+        arrival_rate: Optional[float] = None,
+        coefficient: Optional[float] = None,
+        base_arrival_rate: Optional[float] = None,
+    ) -> JobGraph.ReleasePolicy:
+        """Constructs the release policy from the given parameters.
+
+        Args:
+            policy_type (`str`): The type of release policy to construct.
+            arrival_period (`Optional[EventTime]`): The arrival period to use for the
+                release policy. Only used for periodic and fixed release policies.
+            num_invocations (`Optional[int]`): The number of invocations to use for the
+                release policy.
+            arrival_rate (`Optional[float]`): The arrival rate to use for the release
+                policy.
+            coefficient (`Optional[float]`): The coefficient to use for the release
+                policy.
+            base_arrival_rate (`Optional[float]`): The base arrival rate to use for the
+                release policy.
+
+        Returns:
+            A release policy object.
+        """
+        # Construct the start time for this policy.
         start_time = EventTime(
             time=self._rng.randint(
                 self._flags.randomize_start_time_min,
@@ -105,52 +258,91 @@ def _construct_release_times(self):
             ),
             unit=EventTime.Unit.US,
         )
-        if self._flags.override_release_policy == "periodic":
-            if self._flags.override_arrival_period == 0:
+        if policy_type == "periodic":
+            if arrival_period is None:
                 raise ValueError(
                     "Arrival period must be specified for periodic release policy."
                 )
-            release_policy = JobGraph.ReleasePolicy.periodic(
-                period=EventTime(
-                    self._flags.override_arrival_period, EventTime.Unit.US
-                ),
+            return JobGraph.ReleasePolicy.periodic(
+                period=arrival_period,
                 start=start_time,
                 rng_seed=self._rng_seed,
             )
-        elif self._flags.override_release_policy == "fixed":
-            if self._flags.override_arrival_period == 0:
+        elif policy_type == "fixed":
+            if arrival_period is None:
                 raise ValueError(
                     "Arrival period must be specified for fixed release policy."
                 )
-            release_policy = JobGraph.ReleasePolicy.fixed(
-                period=EventTime(
-                    self._flags.override_arrival_period, EventTime.Unit.US
-                ),
-                num_invocations=self._flags.override_num_invocations,
+            if num_invocations is None:
+                raise ValueError(
+                    "Number of invocations must be specified for fixed release policy."
+                )
+            return JobGraph.ReleasePolicy.fixed(
+                period=arrival_period,
+                num_invocations=num_invocations,
                 start=start_time,
                 rng_seed=self._rng_seed,
             )
-        elif self._flags.override_release_policy == "poisson":
-            release_policy = JobGraph.ReleasePolicy.poisson(
-                rate=self._flags.override_poisson_arrival_rate,
-                num_invocations=self._flags.override_num_invocations,
+        elif policy_type == "poisson":
+            if arrival_rate is None:
+                raise ValueError(
+                    "Arrival rate must be specified for poisson release policy."
+                )
+            if num_invocations is None:
+                raise ValueError(
+                    "Number of invocations must be specified for poisson release "
+                    "policy."
+                )
+            return JobGraph.ReleasePolicy.poisson(
+                rate=arrival_rate,
+                num_invocations=num_invocations,
                 start=start_time,
                 rng_seed=self._rng_seed,
             )
-        elif self._flags.override_release_policy == "gamma":
-            release_policy = JobGraph.ReleasePolicy.gamma(
-                rate=self._flags.override_poisson_arrival_rate,
-                num_invocations=self._flags.override_num_invocations,
-                coefficient=self._flags.override_gamma_coefficient,
+        elif policy_type == "gamma":
+            if arrival_rate is None:
+                raise ValueError(
+                    "Arrival rate must be specified for gamma release policy."
+                )
+            if num_invocations is None:
+                raise ValueError(
+                    "Number of invocations must be specified for gamma release policy."
+                )
+            if coefficient is None:
+                raise ValueError(
+                    "Coefficient must be specified for gamma release policy."
+                )
+            return JobGraph.ReleasePolicy.gamma(
+                rate=arrival_rate,
+                num_invocations=num_invocations,
+                coefficient=coefficient,
                 start=start_time,
                 rng_seed=self._rng_seed,
             )
-        elif self._flags.override_release_policy == "fixed_gamma":
-            release_policy = JobGraph.ReleasePolicy.fixed_gamma(
-                variable_arrival_rate=self._flags.override_poisson_arrival_rate,
-                base_arrival_rate=self._flags.override_base_arrival_rate,
-                num_invocations=self._flags.override_num_invocations,
-                coefficient=self._flags.override_gamma_coefficient,
+        elif policy_type == "fixed_gamma":
+            if arrival_rate is None:
+                raise ValueError(
+                    "Arrival rate must be specified for fixed_gamma release policy."
+                )
+            if base_arrival_rate is None:
+                raise ValueError(
+                    "Base arrival rate must be specified for fixed_gamma release "
+                    "policy."
+                )
+            if num_invocations is None:
+                raise ValueError(
+                    "Number of invocations must be specified for fixed_gamma release "
+                    "policy."
+                )
+            if coefficient is None:
+                raise ValueError(
+                    "Coefficient must be specified for fixed_gamma release policy."
+                )
+            return JobGraph.ReleasePolicy.fixed_gamma(
+                variable_arrival_rate=arrival_rate,
+                base_arrival_rate=base_arrival_rate,
+                num_invocations=num_invocations,
+                coefficient=coefficient,
                 start=start_time,
                 rng_seed=self._rng_seed,
             )
@@ -158,49 +350,64 @@ def _construct_release_times(self):
             raise NotImplementedError(
                 f"Release policy {self._flags.override_release_policy} not implemented."
             )
-        return release_policy.get_release_times(
-            completion_time=EventTime(self._flags.loop_timeout, EventTime.Unit.US)
-        )
 
-    def _initialize_job_data_generator(self):
-        """
-        Initialize the job generator from the Alibaba trace file.
+    def _construct_release_times(self) -> List[Tuple[EventTime, str]]:
+        """Construct the release times of the jobs in the workload.
+
+        Returns:
+            A list of release times of the jobs in the workload along with the profile
+            from which they are to be chosen.
         """
-        if os.path.isdir(self._path):
-            file_paths = [
-                os.path.join(self._path, filename)
-                for filename in os.listdir(self._path)
-                if filename.endswith(".pkl")
-            ]
-        elif os.path.isfile(self._path):
-            extension = pathlib.Path(self._path).suffix.lower()
-            if extension != ".pkl":
-                raise ValueError(f"Invalid extension {extension} for Alibaba trace.")
-            file_paths = [self._path]
-        else:
-            raise FileNotFoundError(f"No such file or directory: {self._path}")
-
-        def job_data_generator():
-            for file_path in file_paths:
-                with open(file_path, "rb") as pickled_file:
-                    data: Mapping[str, List[str]] = AlibabaTaskUnpickler(
-                        pickled_file
-                    ).load()
-                    for job_graph_name, job_tasks in data.items():
-                        try:
-                            job_graph = self._convert_job_data_to_job_graph(
-                                job_graph_name, job_tasks
-                            )
-                            if job_graph:
-                                self._job_graphs[job_graph_name] = job_graph
-                        except ValueError as e:
-                            self._logger.warning(
-                                f"Failed to convert job graph {job_graph_name} "
-                                f"with error {e.__class__}: {e}."
-                            )
-                yield
-
-        return job_data_generator()
+        # Create the release times.
+        release_times = []
+
+        for profile_path, release_policy in self._workload_paths_and_release_policies:
+            if release_policy is not None:
+                for release_time in release_policy.get_release_times(
+                    completion_time=EventTime(
+                        self._flags.loop_timeout, EventTime.Unit.US
+                    )
+                ):
+                    release_times.append((release_time, profile_path))
+
+        # Sort the release times.
+        release_times.sort()
+        return release_times
+
+    def _initialize_job_graph_generators(self) -> Mapping[str, Callable]:
+        """Initializes the JobGraph generators for specific Workload profile paths.
+
+        Returns:
+            A mapping from the Workload profile path to the JobGraph generator."""
+
+        # Define the JobGraph generator for a given path.
+        def job_graph_data_generator(path: str):
+            if not os.path.isfile(path):
+                raise FileNotFoundError(f"No such file: {path}")
+            with open(path, "rb") as pickled_file:
+                data: Mapping[str, List[str]] = AlibabaTaskUnpickler(
+                    pickled_file
+                ).load()
+                for job_graph_name, job_tasks in data.items():
+                    try:
+                        job_graph = self._convert_job_data_to_job_graph(
+                            job_graph_name, job_tasks
+                        )
+                        if job_graph:
+                            self._job_graphs[path][job_graph_name] = job_graph
+                    except ValueError as e:
+                        self._logger.warning(
+                            f"Failed to convert job graph {job_graph_name} "
+                            f"with error {e.__class__}: {e}."
+                        )
+
+        path_to_job_graph_generator_mapping = {}
+        for path, _ in self._workload_paths_and_release_policies:
+            if path is not None:
+                path_to_job_graph_generator_mapping[path] = partial(
+                    job_graph_data_generator, path
+                )
+        return path_to_job_graph_generator_mapping
 
     def _sample_normal_distribution_random(self, n, mean, std, min_val=0, max_val=100):
         samples = []
@@ -349,45 +556,40 @@ def _convert_job_data_to_job_graph(
         )
 
     def get_next_workload(self, current_time: EventTime) -> Optional[Workload]:
-        # Load the next batch of jobs into our mapping.
-        try:
-            next(self._job_data_generator)
-        except StopIteration:
-            pass
-
-        if len(self._job_graphs) == 0:
-            # We have no jobs to choose from, throw an error.
-            raise ValueError(
-                "No jobs to choose from. The loaded JobGraphs are empty. "
-                "Check the provided workload file at: {}".format(self._path)
-            )
-
         # Get the release times that fit within the range of the current_time and the
         # current_time + workload_interval.
-        released_taskgraph_times = []
+        released_taskgraph_times_and_profiles = []
         while (
             self._current_release_pointer < len(self._release_times)
-            and self._release_times[self._current_release_pointer]
+            and self._release_times[self._current_release_pointer][0]
            <= current_time + self._workload_update_interval
        ):
-            released_taskgraph_times.append(
+            released_taskgraph_times_and_profiles.append(
                 self._release_times[self._current_release_pointer]
             )
             self._current_release_pointer += 1
 
         if (
             self._current_release_pointer >= len(self._release_times)
-            and len(released_taskgraph_times) == 0
+            and len(released_taskgraph_times_and_profiles) == 0
         ):
             # We are at the end of the times, and we didn't release anything this time.
             return None
         else:
             # Choose a random JobGraph and convert it to a TaskGraph to be released.
             task_release_index = 0
-            while task_release_index < len(released_taskgraph_times):
-                job_graph = self._rng.choice(list(self._job_graphs.values()))
+            while task_release_index < len(released_taskgraph_times_and_profiles):
+                start_time, workload_profile = released_taskgraph_times_and_profiles[
+                    task_release_index
+                ]
+                if workload_profile not in self._job_graphs:
+                    self._job_graphs[workload_profile] = {}
+                    self._job_graph_generators[workload_profile]()
+                job_graph = self._rng.choice(
+                    list(self._job_graphs[workload_profile].values())
+                )
                 task_graph = job_graph.get_next_task_graph(
-                    start_time=released_taskgraph_times[task_release_index],
+                    start_time=start_time,
                     _flags=self._flags,
                 )
                 if task_graph is not None:
diff --git a/main.py b/main.py
index 3a6176bc..2dff2e09 100644
--- a/main.py
+++ b/main.py
@@ -32,7 +32,7 @@
     pass
 from simulator import Simulator
 from utils import EventTime, setup_csv_logging, setup_logging
-from workload import BranchPredictionPolicy, Workload
+from workload import BranchPredictionPolicy, JobGraph, Workload
 
 FLAGS = flags.FLAGS
 
@@ -422,20 +422,56 @@
 flags.DEFINE_enum(
     "override_release_policy",
     "fixed",
-    ["periodic", "poisson", "gamma", "fixed", "closed_loop", "fixed_gamma"],
+    JobGraph.RELEASE_POLICIES,
     "Override the release policy for all TaskGraphs defined in the Workload.",
 )
+flags.DEFINE_list(
+    "override_release_policies",
+    [],
+    "Override the release policy for all TaskGraphs defined in each Workload."
+    "If provided, the list must be of the same length as the list of workload "
+    "profile paths. For a single workload profile path, use `override_release_policy`.",
+)
+flags.register_validator(
+    "override_release_policies",
+    lambda override_release_policies: all(
+        policy in JobGraph.RELEASE_POLICIES for policy in override_release_policies
+    ),
+    "All release policies must be one of {}".format(JobGraph.RELEASE_POLICIES),
+)
 flags.DEFINE_integer(
-    "override_num_invocations",
+    "override_num_invocation",
     0,
     "Override the number of invocations for all TaskGraphs defined in the Workload.",
 )
+flags.DEFINE_list(
+    "override_num_invocations",
+    [],
+    "Override the number of invocations for all TaskGraphs defined in each Workload."
+    "If provided, the list must be of the same length as the list of workload "
+    "profile paths. For a single workload profile path, use `override_num_invocation`.",
+)
+flags.register_validator(
+    "override_num_invocations",
+    lambda override_num_invocations: all(
+        num_invocations.isdigit() for num_invocations in override_num_invocations
+    ),
+    "All number of invocations must be an integer.",
+)
 flags.DEFINE_float(
     "override_poisson_arrival_rate",
     0.0,
     "Override the Poisson arrival rate for all TaskGraphs defined"
     "in the JSON workload definition.",
 )
+flags.DEFINE_list(
+    "override_poisson_arrival_rates",
+    [],
+    "Override the Poisson arrival rate for all TaskGraphs defined in each Workload."
+    "If provided, the list must be of the same length as the list of workload "
+    "profile paths. For a single workload profile path, use "
+    "`override_poisson_arrival_rate`.",
+)
 flags.DEFINE_float(
     "override_base_arrival_rate",
     0.0,
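
Reviewer note (not part of the patch): the core of this change is that every release time is now tagged with the profile it came from, the per-profile streams are merged, and the result is globally sorted in `_construct_release_times`, so that `get_next_workload` can lazily load the right pickle per profile. A minimal, self-contained sketch of that merge step, using made-up profile names and plain integers in place of `EventTime` values:

```python
from typing import Dict, List, Tuple


def merge_release_times(
    per_profile_times: Dict[str, List[int]]
) -> List[Tuple[int, str]]:
    """Tag each release time with its profile and sort the merged stream."""
    merged = [
        (release_time, profile)
        for profile, times in per_profile_times.items()
        for release_time in times
    ]
    # Mirrors the release_times.sort() call in _construct_release_times:
    # tuples sort by release time first, then by profile path.
    merged.sort()
    return merged


# Hypothetical example: two profiles whose release times interleave.
print(merge_release_times({"easy.pkl": [5, 20], "hard.pkl": [10]}))
# [(5, 'easy.pkl'), (10, 'hard.pkl'), (20, 'easy.pkl')]
```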