diff --git a/HPCSimPickJobs.py b/HPCSimPickJobs.py
index 4b9ef39..3d36ab2 100644
--- a/HPCSimPickJobs.py
+++ b/HPCSimPickJobs.py
@@ -64,7 +64,7 @@ def discount_cumsum(x, discount):
 
 
 class HPCEnv(gym.Env):
-    def __init__(self,shuffle=False, backfil=False, skip=False, job_score_type=0, batch_job_slice=0):  # do nothing and return. A workaround for passing parameters to the environment
+    def __init__(self,shuffle=False, backfil=False, skip=False, job_score_type=0, batch_job_slice=0, build_sjf=False):  # do nothing and return. A workaround for passing parameters to the environment
         super(HPCEnv, self).__init__()
         print("Initialize Simple HPC Env")
 
@@ -104,6 +104,7 @@ def __init__(self,shuffle=False, backfil=False, skip=False, job_score_type=0, ba
 
         self.job_score_type = job_score_type
         self.batch_job_slice = batch_job_slice
+        self.build_sjf = build_sjf
        self.sjf_scores = []
 
     #@profile
@@ -113,54 +114,55 @@ def my_init(self, workload_file = '', sched_file = ''):
         self.cluster = Cluster("Cluster", self.loads.max_nodes, self.loads.max_procs/self.loads.max_nodes)
         self.penalty_job_score = JOB_SEQUENCE_SIZE * self.loads.max_exec_time / 10
 
-        #calculate SJF scores for all sample sequence and save them here
-        index = 0
-        if self.batch_job_slice == 0:
-            max_index = self.loads.size() - JOB_SEQUENCE_SIZE - 1
-        else:
-            max_index = min(self.batch_job_slice, self.loads.size()) - JOB_SEQUENCE_SIZE - 1
-        print("max index... initializing SJF Score Array", max_index)
-
-        while index <= max_index:
-            index += 1
-            if index % 100 == 0:
-                print("index", index)
-
-            self.cluster.reset()
-            self.loads.reset()
-
-            self.job_queue = []
-            self.running_jobs = []
-            self.visible_jobs = []
-            self.pairs = []
-
-            self.current_timestamp = 0
-            self.start = 0
-            self.next_arriving_job_idx = 0
-            self.last_job_in_batch = 0
-            self.num_job_in_batch = 0
-            self.scheduled_rl = {}
-            self.penalty = 0
-            self.pivot_job = False
-            self.scheduled_scores = []
-
-            job_sequence_size = JOB_SEQUENCE_SIZE
-            self.pre_workloads = []
-
-            self.start = index;
-            self.start_idx_last_reset = self.start
-            self.num_job_in_batch = job_sequence_size
-            self.last_job_in_batch = self.start + self.num_job_in_batch
-            self.current_timestamp = self.loads[self.start].submit_time
-            self.job_queue.append(self.loads[self.start])
-            self.next_arriving_job_idx = self.start + 1
-
-            if self.enable_preworkloads:
-                self.gen_preworkloads(job_sequence_size + self.np_random.randint(job_sequence_size))
-
-            self.sjf_scores.append(sum(self.schedule_curr_sequence_reset(self.sjf_score).values()))
-
-        #print(self.sjf_scores)
+        if self.build_sjf:
+            #calculate SJF scores for all sample sequence and save them here
+            index = 0
+            if self.batch_job_slice == 0:
+                max_index = self.loads.size() - JOB_SEQUENCE_SIZE - 1
+            else:
+                max_index = min(self.batch_job_slice, self.loads.size()) - JOB_SEQUENCE_SIZE - 1
+            print("max index... initializing SJF Score Array", max_index)
+
+            while index <= max_index:
+                index += 1
+                if index % 100 == 0:
+                    print("index", index)
+
+                self.cluster.reset()
+                self.loads.reset()
+
+                self.job_queue = []
+                self.running_jobs = []
+                self.visible_jobs = []
+                self.pairs = []
+
+                self.current_timestamp = 0
+                self.start = 0
+                self.next_arriving_job_idx = 0
+                self.last_job_in_batch = 0
+                self.num_job_in_batch = 0
+                self.scheduled_rl = {}
+                self.penalty = 0
+                self.pivot_job = False
+                self.scheduled_scores = []
+
+                job_sequence_size = JOB_SEQUENCE_SIZE
+                self.pre_workloads = []
+
+                self.start = index;
+                self.start_idx_last_reset = self.start
+                self.num_job_in_batch = job_sequence_size
+                self.last_job_in_batch = self.start + self.num_job_in_batch
+                self.current_timestamp = self.loads[self.start].submit_time
+                self.job_queue.append(self.loads[self.start])
+                self.next_arriving_job_idx = self.start + 1
+
+                if self.enable_preworkloads:
+                    self.gen_preworkloads(job_sequence_size + self.np_random.randint(job_sequence_size))
+
+                self.sjf_scores.append(sum(self.schedule_curr_sequence_reset(self.sjf_score).values()))
+
+            #print(self.sjf_scores)
 
     def seed(self, seed=None):
         self.np_random, seed = seeding.np_random(seed)
@@ -279,18 +281,25 @@ def reset(self):
 
         self.pre_workloads = []
 
-        done = False
-        while not done:
-            # randomly sample a sequence of jobs from workload (self.start_idx_last_reset + 1) % (self.loads.size() - 2 * job_sequence_size)
-            assert self.batch_job_slice == 0 or self.batch_job_slice>=job_sequence_size
+        assert self.batch_job_slice == 0 or self.batch_job_slice>=job_sequence_size
+
+        if self.build_sjf:
+            done = False
+            while not done:
+                # randomly sample a sequence of jobs from workload (self.start_idx_last_reset + 1) % (self.loads.size() - 2 * job_sequence_size
+                if self.batch_job_slice == 0:
+                    self.start = self.np_random.randint(job_sequence_size, (self.loads.size() - job_sequence_size - 1))
+                else:
+                    self.start = self.np_random.randint(job_sequence_size, (self.batch_job_slice - job_sequence_size - 1))
+
+                if self.sjf_scores[self.start] > 10 and self.sjf_scores[self.start] < 150:
+                    done = True
+        else:
             if self.batch_job_slice == 0:
                 self.start = self.np_random.randint(job_sequence_size, (self.loads.size() - job_sequence_size - 1))
             else:
                 self.start = self.np_random.randint(job_sequence_size, (self.batch_job_slice - job_sequence_size - 1))
 
-            if self.sjf_scores[self.start] > 10 and self.sjf_scores[self.start] < 150:
-                done = True
-
         self.start_idx_last_reset = self.start
         self.num_job_in_batch = job_sequence_size
         self.last_job_in_batch = self.start + self.num_job_in_batch
@@ -903,6 +912,6 @@ def step_for_test(self, a):
 
     current_dir = os.getcwd()
     workload_file = os.path.join(current_dir, args.workload)
-    env = HPCEnv(batch_job_slice=100)
+    env = HPCEnv(batch_job_slice=100, build_sjf=True)
     env.seed(0)
     env.my_init(workload_file=workload_file, sched_file=workload_file)
diff --git a/compare-pick-jobs.py b/compare-pick-jobs.py
index 4ac7773..72fae3b 100644
--- a/compare-pick-jobs.py
+++ b/compare-pick-jobs.py
@@ -222,7 +222,7 @@ def run_policy(env, get_probs, get_out, nums, iters, score_type):
 
     # initialize the environment from scratch
     env = HPCEnv(shuffle=args.shuffle, backfil=args.backfil, skip=args.skip, job_score_type=args.score_type,
-                 batch_job_slice=args.batch_job_slice)
+                 batch_job_slice=args.batch_job_slice, build_sjf=False)
     env.my_init(workload_file=workload_file)
     env.seed(args.seed)
 
diff --git a/ppo-pick-jobs.py b/ppo-pick-jobs.py
index 3baf5e1..703a46e 100644
--- a/ppo-pick-jobs.py
+++ b/ppo-pick-jobs.py
@@ -247,7 +247,7 @@ def ppo(workload_file, model_path, ac_kwargs=dict(), seed=0,
     tf.set_random_seed(seed)
     np.random.seed(seed)
 
-    env = HPCEnv(shuffle=shuffle, backfil=backfil, skip=skip, job_score_type=score_type, batch_job_slice=batch_job_slice)
+    env = HPCEnv(shuffle=shuffle, backfil=backfil, skip=skip, job_score_type=score_type, batch_job_slice=batch_job_slice, build_sjf=True)
     env.seed(seed)
     env.my_init(workload_file=workload_file, sched_file=model_path)
 
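A minimal usage sketch of the new flag, assuming this repo's HPCSimPickJobs module and an SWF workload trace on disk (the file paths and batch_job_slice values below are illustrative, not taken from the patch):

    # Sketch: build_sjf gates the one-time SJF-score precomputation in my_init().
    from HPCSimPickJobs import HPCEnv

    # Training (as in ppo-pick-jobs.py): build_sjf=True precomputes one SJF score
    # per candidate start index, so reset() can resample until it draws a job
    # sequence whose SJF score falls in the (10, 150) band.
    train_env = HPCEnv(shuffle=False, backfil=False, skip=False,
                       job_score_type=0, batch_job_slice=0, build_sjf=True)
    train_env.seed(0)
    train_env.my_init(workload_file='./data/lublin_256.swf',   # illustrative path
                      sched_file='./data/lublin_256.swf')

    # Evaluation (as in compare-pick-jobs.py): build_sjf=False skips the costly
    # precomputation sweep, and reset() samples start indices without filtering.
    eval_env = HPCEnv(shuffle=False, backfil=False, skip=False,
                      job_score_type=0, batch_job_slice=10000, build_sjf=False)
    eval_env.seed(0)
    eval_env.my_init(workload_file='./data/lublin_256.swf')

The design trade-off: the precomputation sweep schedules every candidate sequence once per my_init() call, which only pays off when training repeatedly filters resets by SJF score; evaluation runs never consult self.sjf_scores, so build_sjf=False avoids that startup cost entirely.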