Skip to content

Commit

Permalink
simplify cluster to make it schedule faster
Browse files Browse the repository at this point in the history
  • Loading branch information
stephenzhang committed Apr 10, 2020
1 parent f916511 commit 59d20b8
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 16 deletions.
35 changes: 21 additions & 14 deletions HPCSimPickJobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ def __init__(self): # do nothing and return. A workaround for passing parameter
self.enable_preworkloads = False
self.pre_workloads = []

#@profile
def my_init(self, workload_file = '', sched_file = ''):
print ("loading workloads from dataset:", workload_file)
self.loads = Workloads(workload_file)
Expand Down Expand Up @@ -195,7 +196,7 @@ def refill_preworkloads(self):
self.running_jobs.append(_job)
_job.allocated_machines = self.cluster.allocate(_job.job_id, _job.request_number_of_processors)


#@profile
def reset(self):
self.cluster.reset()
self.loads.reset()
Expand Down Expand Up @@ -279,7 +280,7 @@ def reset_for_test(self, num,start):
self.current_timestamp = self.loads[self.start].submit_time
self.job_queue.append(self.loads[self.start])
self.next_arriving_job_idx = self.start + 1

#@profile
def moveforward_for_resources_backfill_greedy(self, job, scheduled_logs):
#note that this function is only called when current job can not be scheduled.
assert not self.cluster.can_allocated(job)
Expand All @@ -300,15 +301,16 @@ def moveforward_for_resources_backfill_greedy(self, job, scheduled_logs):
self.job_queue.sort(key=lambda _j: self.fcfs_score(_j))
job_queue_iter_copy = list(self.job_queue)
for _j in job_queue_iter_copy:
if self.cluster.can_allocated(_j) and (self.current_timestamp + _j.request_time) < earliest_start_time:
# we should be OK to schedule the job now
assert _j.scheduled_time == -1 # this job should never be scheduled before.
_j.scheduled_time = self.current_timestamp
_j.allocated_machines = self.cluster.allocate(_j.job_id, _j.request_number_of_processors)
self.running_jobs.append(_j)
score = (self.job_score(_j) / self.num_job_in_batch) # calculated reward
scheduled_logs[_j.job_id] = score
self.job_queue.remove(_j) # remove the job from job queue
if (self.current_timestamp + _j.request_time) < earliest_start_time:
if self.cluster.can_allocated(_j):
# we should be OK to schedule the job now
assert _j.scheduled_time == -1 # this job should never be scheduled before.
_j.scheduled_time = self.current_timestamp
_j.allocated_machines = self.cluster.allocate(_j.job_id, _j.request_number_of_processors)
self.running_jobs.append(_j)
score = (self.job_score(_j) / self.num_job_in_batch) # calculated reward
scheduled_logs[_j.job_id] = score
self.job_queue.remove(_j) # remove the job from job queue

# move to the next timestamp
assert self.running_jobs
Expand All @@ -326,6 +328,7 @@ def moveforward_for_resources_backfill_greedy(self, job, scheduled_logs):
self.cluster.release(next_resource_release_machines)
self.running_jobs.pop(0) # remove the first running job

#@profile
def schedule_curr_sequence_reset(self, score_fn):
# schedule the sequence of jobs using heuristic algorithm.
scheduled_logs = {}
Expand Down Expand Up @@ -539,6 +542,7 @@ def build_observation(self):

return vector

#@profile
def moveforward_for_resources_backfill(self, job):
#note that this function is only called when current job can not be scheduled.
assert not self.cluster.can_allocated(job)
Expand Down Expand Up @@ -601,6 +605,7 @@ def skip_for_resources(self):
self.running_jobs.pop(0) # remove the first running job.
return False

#@profile
def moveforward_for_job(self):
if self.job_queue:
return True
Expand Down Expand Up @@ -701,7 +706,8 @@ def schedule(self, job_for_scheduling):
def valid(self, a):
action = a[0]
return self.pairs[action][0]


#@profile
def step(self, a):
job_for_scheduling = self.pairs[a][0]

Expand Down Expand Up @@ -760,11 +766,12 @@ def step_for_test(self, a):
env = HPCEnv()
env.my_init(workload_file=workload_file, sched_file=workload_file)
env.seed(0)
env.reset_for_test(2048,0)

for _ in range(100):
for _ in range(1):
_, r = env.reset(), 0
while True:
_, r, d, _ = env.step(0)
_, r, d, _, _, _ = env.step(0)
if d:
print ("Final Reward:", r)
break
65 changes: 65 additions & 0 deletions cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,68 @@ def reset(self):
self.free_node = self.total_node
for m in self.all_nodes:
m.reset()
class FakeList:
    """Length-only stand-in for a list of allocated nodes.

    The simplified cluster never tracks individual node objects; schedulers
    only ever ask for ``len(allocated_machines)``, so this object records
    just a count.  Callers may assign the public ``len`` attribute directly.
    """

    def __init__(self, length):
        # Public attribute; SimpleCluster.allocate writes it in place.
        self.len = length

    def __len__(self):
        return self.len

class SimpleCluster:
def __init__(self, cluster_name, node_num, num_procs_per_node):
self.name = cluster_name
self.total_node = node_num
self.free_node = node_num
self.used_node = 0
self.num_procs_per_node = num_procs_per_node
self.all_nodes = []

def feature(self):
return [self.free_node]

def can_allocated(self, job):
if job.request_number_of_nodes != -1:
if job.request_number_of_nodes > self.free_node:
return False
else:
return True

request_node = int(math.ceil(float(job.request_number_of_processors)/float(self.num_procs_per_node)))
job.request_number_of_nodes = request_node
if request_node > self.free_node:
return False
else:
return True

def allocate(self, job_id, request_num_procs):
allocated_nodes = FakeList(0)
request_node = int(math.ceil(float(request_num_procs) / float(self.num_procs_per_node)))

if request_node > self.free_node:
return []

allocated = request_node

self.used_node += allocated
self.free_node -= allocated
allocated_nodes.len = allocated
if allocated == request_node:
return allocated_nodes

print ("Error in allocation, there are enough free resources but can not allocated!")
return []

def release(self, releases):
self.used_node -= len(releases)
self.free_node += len(releases)


def is_idle(self):
if self.used_node == 0:
return True
return False

def reset(self):
self.used_node = 0
self.free_node = self.total_node

# Alias so the simplified implementation replaces the original class,
# presumably letting existing `from cluster import Cluster` call sites pick
# it up unchanged — verify no caller relies on per-node state (all_nodes).
Cluster = SimpleCluster
10 changes: 8 additions & 2 deletions compare-pick-jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ def action_from_obs(o):
min_time = min([i[0] for i in lst])
result = [i[1] for i in lst if i[0]==min_time]
return result[0]

#@profile
def run_policy(env, get_probs, get_out, nums, iters):
rl_r = []
f1_r = []
Expand Down Expand Up @@ -184,12 +186,14 @@ def run_policy(env, get_probs, get_out, nums, iters):

if __name__ == '__main__':
import argparse
import time

parser = argparse.ArgumentParser()
parser.add_argument('--rlmodel', type=str, default="./data/logs/256attn/256attn_s0")
parser.add_argument('--workload', type=str, default='./data/lublin_256.swf')
parser.add_argument('--workload', type=str, default='./data/ANL-Intrepid-2009-1.swf')
parser.add_argument('--len', '-l', type=int, default=2048)
parser.add_argument('--seed', '-s', type=int, default=1)
parser.add_argument('--iter', '-i', type=int, default=10)
parser.add_argument('--iter', '-i', type=int, default=1)
args = parser.parse_args()

current_dir = os.getcwd()
Expand All @@ -203,4 +207,6 @@ def run_policy(env, get_probs, get_out, nums, iters):
env.my_init(workload_file=workload_file)
env.seed(args.seed)

start = time.time()
run_policy(env, get_probs, get_value, args.len, args.iter)
print("elapse: {}".format(time.time()-start))

0 comments on commit 59d20b8

Please sign in to comment.