Commit ebc7481
add parameters to control SJF calculation in the training and testing phases. The testing phase performs no SJF calculation and does not check whether a trajectory has an acceptable score.
daidong committed Apr 17, 2020
1 parent 34d7e76 commit ebc7481
Showing 3 changed files with 68 additions and 59 deletions.
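In short, the new build_sjf flag is wired as follows (a condensed view of the diffs below, showing only the arguments this commit touches):

    # Training (ppo-pick-jobs.py): build the SJF score array so that
    # reset() can filter sampled job sequences by their SJF score.
    env = HPCEnv(shuffle=shuffle, backfil=backfil, skip=skip,
                 job_score_type=score_type, batch_job_slice=batch_job_slice,
                 build_sjf=True)

    # Testing (compare-pick-jobs.py): skip the SJF precomputation entirely.
    env = HPCEnv(shuffle=args.shuffle, backfil=args.backfil, skip=args.skip,
                 job_score_type=args.score_type,
                 batch_job_slice=args.batch_job_slice, build_sjf=False)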
123 changes: 66 additions & 57 deletions HPCSimPickJobs.py
@@ -64,7 +64,7 @@ def discount_cumsum(x, discount):


class HPCEnv(gym.Env):
-    def __init__(self,shuffle=False, backfil=False, skip=False, job_score_type=0, batch_job_slice=0): # do nothing and return. A workaround for passing parameters to the environment
+    def __init__(self,shuffle=False, backfil=False, skip=False, job_score_type=0, batch_job_slice=0, build_sjf=False): # do nothing and return. A workaround for passing parameters to the environment
        super(HPCEnv, self).__init__()
        print("Initialize Simple HPC Env")

@@ -104,6 +104,7 @@ def __init__(self,shuffle=False, backfil=False, skip=False, job_score_type=0, ba
        self.job_score_type = job_score_type
        self.batch_job_slice = batch_job_slice

+        self.build_sjf = build_sjf
        self.sjf_scores = []

    #@profile
@@ -113,54 +114,55 @@ def my_init(self, workload_file = '', sched_file = ''):
        self.cluster = Cluster("Cluster", self.loads.max_nodes, self.loads.max_procs/self.loads.max_nodes)
        self.penalty_job_score = JOB_SEQUENCE_SIZE * self.loads.max_exec_time / 10

-        #calculate SJF scores for all sample sequence and save them here
-        index = 0
-        if self.batch_job_slice == 0:
-            max_index = self.loads.size() - JOB_SEQUENCE_SIZE - 1
-        else:
-            max_index = min(self.batch_job_slice, self.loads.size()) - JOB_SEQUENCE_SIZE - 1
-        print("max index... initializing SJF Score Array", max_index)
-
-        while index <= max_index:
-            index += 1
-            if index % 100 == 0:
-                print("index", index)
-
-            self.cluster.reset()
-            self.loads.reset()
-
-            self.job_queue = []
-            self.running_jobs = []
-            self.visible_jobs = []
-            self.pairs = []
-
-            self.current_timestamp = 0
-            self.start = 0
-            self.next_arriving_job_idx = 0
-            self.last_job_in_batch = 0
-            self.num_job_in_batch = 0
-            self.scheduled_rl = {}
-            self.penalty = 0
-            self.pivot_job = False
-            self.scheduled_scores = []
-
-            job_sequence_size = JOB_SEQUENCE_SIZE
-            self.pre_workloads = []
-
-            self.start = index;
-            self.start_idx_last_reset = self.start
-            self.num_job_in_batch = job_sequence_size
-            self.last_job_in_batch = self.start + self.num_job_in_batch
-            self.current_timestamp = self.loads[self.start].submit_time
-            self.job_queue.append(self.loads[self.start])
-            self.next_arriving_job_idx = self.start + 1
-
-            if self.enable_preworkloads:
-                self.gen_preworkloads(job_sequence_size + self.np_random.randint(job_sequence_size))
-
-            self.sjf_scores.append(sum(self.schedule_curr_sequence_reset(self.sjf_score).values()))
-
-        #print(self.sjf_scores)
+        if self.build_sjf:
+            #calculate SJF scores for all sample sequence and save them here
+            index = 0
+            if self.batch_job_slice == 0:
+                max_index = self.loads.size() - JOB_SEQUENCE_SIZE - 1
+            else:
+                max_index = min(self.batch_job_slice, self.loads.size()) - JOB_SEQUENCE_SIZE - 1
+            print("max index... initializing SJF Score Array", max_index)
+
+            while index <= max_index:
+                index += 1
+                if index % 100 == 0:
+                    print("index", index)
+
+                self.cluster.reset()
+                self.loads.reset()
+
+                self.job_queue = []
+                self.running_jobs = []
+                self.visible_jobs = []
+                self.pairs = []
+
+                self.current_timestamp = 0
+                self.start = 0
+                self.next_arriving_job_idx = 0
+                self.last_job_in_batch = 0
+                self.num_job_in_batch = 0
+                self.scheduled_rl = {}
+                self.penalty = 0
+                self.pivot_job = False
+                self.scheduled_scores = []
+
+                job_sequence_size = JOB_SEQUENCE_SIZE
+                self.pre_workloads = []
+
+                self.start = index;
+                self.start_idx_last_reset = self.start
+                self.num_job_in_batch = job_sequence_size
+                self.last_job_in_batch = self.start + self.num_job_in_batch
+                self.current_timestamp = self.loads[self.start].submit_time
+                self.job_queue.append(self.loads[self.start])
+                self.next_arriving_job_idx = self.start + 1
+
+                if self.enable_preworkloads:
+                    self.gen_preworkloads(job_sequence_size + self.np_random.randint(job_sequence_size))
+
+                self.sjf_scores.append(sum(self.schedule_curr_sequence_reset(self.sjf_score).values()))
+
+            #print(self.sjf_scores)

    def seed(self, seed=None):
        self.np_random, seed = seeding.np_random(seed)
@@ -279,18 +281,25 @@ def reset(self):

        self.pre_workloads = []

-        done = False
-        while not done:
-            # randomly sample a sequence of jobs from workload (self.start_idx_last_reset + 1) % (self.loads.size() - 2 * job_sequence_size)
-            assert self.batch_job_slice == 0 or self.batch_job_slice>=job_sequence_size
+        assert self.batch_job_slice == 0 or self.batch_job_slice>=job_sequence_size
+
+        if self.build_sjf:
+            done = False
+            while not done:
+                # randomly sample a sequence of jobs from workload (self.start_idx_last_reset + 1) % (self.loads.size() - 2 * job_sequence_size
+                if self.batch_job_slice == 0:
+                    self.start = self.np_random.randint(job_sequence_size, (self.loads.size() - job_sequence_size - 1))
+                else:
+                    self.start = self.np_random.randint(job_sequence_size, (self.batch_job_slice - job_sequence_size - 1))
+
+                if self.sjf_scores[self.start] > 10 and self.sjf_scores[self.start] < 150:
+                    done = True
+        else:
            if self.batch_job_slice == 0:
                self.start = self.np_random.randint(job_sequence_size, (self.loads.size() - job_sequence_size - 1))
            else:
                self.start = self.np_random.randint(job_sequence_size, (self.batch_job_slice - job_sequence_size - 1))
-
-            if self.sjf_scores[self.start] > 10 and self.sjf_scores[self.start] < 150:
-                done = True

        self.start_idx_last_reset = self.start
        self.num_job_in_batch = job_sequence_size
        self.last_job_in_batch = self.start + self.num_job_in_batch
Expand Down Expand Up @@ -903,6 +912,6 @@ def step_for_test(self, a):
    current_dir = os.getcwd()
    workload_file = os.path.join(current_dir, args.workload)

-    env = HPCEnv(batch_job_slice=100)
+    env = HPCEnv(batch_job_slice=100, build_sjf=True)
    env.seed(0)
    env.my_init(workload_file=workload_file, sched_file=workload_file)
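For reference, the start-index sampling that reset() performs after this commit can be restated as the standalone sketch below. The function name and parameter list are illustrative, not part of the commit; sjf_scores is the array my_init() builds when build_sjf is True.

    def sample_start(np_random, loads_size, batch_job_slice, job_sequence_size,
                     build_sjf, sjf_scores):
        # Both paths draw uniformly from the same index range.
        limit = loads_size if batch_job_slice == 0 else batch_job_slice
        if not build_sjf:
            # Testing path: a single draw, with no SJF-score check.
            return np_random.randint(job_sequence_size, limit - job_sequence_size - 1)
        # Training path: resample until the precomputed SJF score of the
        # sequence starting at this index falls inside the accepted band.
        while True:
            start = np_random.randint(job_sequence_size, limit - job_sequence_size - 1)
            if 10 < sjf_scores[start] < 150:
                return start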
2 changes: 1 addition & 1 deletion compare-pick-jobs.py
@@ -222,7 +222,7 @@ def run_policy(env, get_probs, get_out, nums, iters, score_type):

    # initialize the environment from scratch
    env = HPCEnv(shuffle=args.shuffle, backfil=args.backfil, skip=args.skip, job_score_type=args.score_type,
-                 batch_job_slice=args.batch_job_slice)
+                 batch_job_slice=args.batch_job_slice, build_sjf=False)
    env.my_init(workload_file=workload_file)
    env.seed(args.seed)

2 changes: 1 addition & 1 deletion ppo-pick-jobs.py
@@ -247,7 +247,7 @@ def ppo(workload_file, model_path, ac_kwargs=dict(), seed=0,
    tf.set_random_seed(seed)
    np.random.seed(seed)

-    env = HPCEnv(shuffle=shuffle, backfil=backfil, skip=skip, job_score_type=score_type, batch_job_slice=batch_job_slice)
+    env = HPCEnv(shuffle=shuffle, backfil=backfil, skip=skip, job_score_type=score_type, batch_job_slice=batch_job_slice, build_sjf=True)
    env.seed(seed)
    env.my_init(workload_file=workload_file, sched_file=model_path)
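Taken together, the commit splits HPCEnv's behavior between the two phases as sketched below (method bodies are condensed; the underscore-prefixed helpers are hypothetical names for the code blocks shown in the HPCSimPickJobs.py diff, not methods that exist in the repo):

    class HPCEnvSketch:
        def my_init(self, workload_file='', sched_file=''):
            # ... load the workload and build the cluster as before ...
            if self.build_sjf:
                # Training only: replay the SJF heuristic once over every
                # candidate start index, caching one total score per sequence.
                self.sjf_scores = self._replay_all_sequences_with_sjf()

        def reset(self):
            if self.build_sjf:
                # Training: rejection-sample a start index whose cached SJF
                # score lies in the band (10, 150).
                self.start = self._sample_start_in_score_band()
            else:
                # Testing: accept any uniformly sampled start index.
                self.start = self._sample_start_uniform()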

