forked from msr-fiddle/blox
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfifo_pmfirst.py
201 lines (179 loc) · 7.19 KB
/
fifo_pmfirst.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
import os
import time
import warnings
import sys
import copy
import argparse
import logging
warnings.simplefilter(action="ignore", category=FutureWarning)
# from job_admission_policy import accept_all
import schedulers
from placement import *
import admission_control
# from acceptance_policy import load_based_accept
from blox import ClusterState, JobState, BloxManager
import blox.utils as utils
# Define logging configurations
def setup_logging(
    log_file="/scratch1/08503/rnjain/blox-pal/logs/job-runs/fifo_pmfirst_debug.log",
):
    """
    Configure the root logger to write INFO-and-above records to a file.

    log_file : path of the log file. The default is the location this
        script has always written to, so existing callers are unaffected.

    The parent directory of ``log_file`` is created when it does not exist,
    because ``logging.basicConfig`` cannot open a file inside a missing
    directory.
    """
    log_dir = os.path.dirname(log_file)
    if log_dir:
        # exist_ok keeps this a no-op on machines where the directory exists
        os.makedirs(log_dir, exist_ok=True)
    logging.basicConfig(
        filename=log_file,
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s - %(message)s",
    )
def parse_args(parser, argv=None):
    """
    Register the scheduler's command-line options on ``parser`` and parse them.

    parser : argparse.ArgumentParser that receives the option definitions
    argv : optional list of argument strings to parse instead of the
        process command line; with the default of None argparse reads
        ``sys.argv[1:]``
    return the parsed arguments (argparse.Namespace)
    """
    # Policy selection
    parser.add_argument(
        "--scheduler", default="Fifo", type=str, help="Name of the scheduling strategy"
    )
    parser.add_argument(
        "--scheduler-name",
        default="Fifo",
        type=str,
        help="Name of the scheduling strategy",
    )
    parser.add_argument(
        "--placement-name",
        default="PM-First",
        type=str,
        help="Name of the placement policy",
    )
    parser.add_argument(
        "--acceptance-policy",
        default="accept_all",
        type=str,
        help="Name of acceptance policy",
    )

    # Experiment control
    parser.add_argument(
        "--plot", action="store_true", default=False, help="Plot metrics"
    )
    parser.add_argument(
        "--exp-prefix", type=str, help="Unique name for prefix over log files"
    )
    parser.add_argument("--load", type=int, help="Number of jobs per hour")
    parser.add_argument("--simulate", action="store_true", help="Enable Simulation")
    parser.add_argument(
        "--round-duration", type=int, default=30, help="Round duration in seconds"
    )
    parser.add_argument(
        "--start-id-track", type=int, default=3000, help="Starting ID to track"
    )
    parser.add_argument(
        "--stop-id-track", type=int, default=4000, help="Stop ID to track"
    )

    # RPC endpoints
    parser.add_argument(
        "--node-manager-port", default=50052, type=int, help="Node Manager RPC port"
    )
    parser.add_argument(
        "--central-scheduler-port",
        default=50051,
        type=int,
        help="Central Scheduler RPC Port",
    )
    parser.add_argument(
        "--simulator-rpc-port",
        default=50050,
        type=int,
        help="Simulator RPC port to fetch ",
    )
    return parser.parse_args(argv)
def _apply_simulator_config(args, blox_instance):
    """
    Fetch a configuration from the simulator and copy it onto ``args`` and
    ``blox_instance``.

    args : parsed command-line namespace; scheduler_name, load,
        start_id_track and stop_id_track are overwritten in place
    blox_instance : BloxManager whose ``rmserver`` supplies the configuration

    Terminates the server and exits the process when ``args.scheduler_name``
    is the empty string.
    """
    new_config = blox_instance.rmserver.get_new_sim_config()
    print(f"New config {new_config}")
    # NOTE(review): this tests the command-line value, not
    # new_config["scheduler"]; confirm which one signals "no config sent".
    if args.scheduler_name == "":
        # terminate the blox instance before exiting
        blox_instance.terminate_server()
        print("No Config Sent")
        sys.exit()
    blox_instance.scheduler_name = new_config["scheduler"]
    blox_instance.load = new_config["load"]
    args.scheduler_name = new_config["scheduler"]
    args.load = new_config["load"]
    args.start_id_track = new_config["start_id_track"]
    args.stop_id_track = new_config["stop_id_track"]
    print(
        f"Running Scheduler {args.scheduler_name}\nLoad {args.load} \n Placement Policy {args.placement_name} \nAcceptance Policy {args.acceptance_policy} \nTracking jobs from {args.start_id_track} to {args.stop_id_track}"
    )
    # NOTE(review): reset is assumed to belong to the simulate-only branch;
    # confirm the intended nesting.
    blox_instance.reset(args)


def _record_first_submit_time(blox_instance, job_state):
    """
    Store the submit time of the job whose ``job_id`` is 0 on
    ``blox_instance.first_submit_time``, unless a value is already recorded.
    """
    if blox_instance.first_submit_time is not None:
        return
    for jid in job_state.active_jobs:
        job = job_state.active_jobs[jid]
        if job["job_id"] == 0:
            blox_instance.first_submit_time = job["submit_time"]
            # nothing left to look for once job 0 has been found
            break


def main(args):
    """
    Run the scheduling loop with FIFO scheduling, PM-First placement and the
    accept-all admission policy.

    args : argparse.Namespace as returned by parse_args. When
        ``args.simulate`` is set, several fields are overwritten with the
        configuration fetched from the simulator before the loop starts.

    Each round admits new jobs, computes a schedule and placement, records
    metrics, executes the launch/suspend decisions and advances the clocks by
    ``args.round_duration``. The loop ends when ``blox_instance.terminate``
    is set.
    """
    # PM-First placement, FIFO scheduling, accept-all admission control
    placement_policy = PMFirstPlacement(args)
    scheduling_policy = schedulers.Fifo(args)
    admission_policy = admission_control.acceptAll(args)

    blox_instance = BloxManager(args)
    if args.simulate:
        # for simulation the run configuration comes from the simulator
        _apply_simulator_config(args, blox_instance)

    cluster_state = ClusterState(args)
    job_state = JobState(args)
    os.environ["sched_policy"] = args.scheduler_name
    os.environ["sched_load"] = str(args.load)

    while True:
        if blox_instance.terminate:
            blox_instance.terminate_server()
            print("Terminate current config {}".format(args))
            break

        # get new nodes for the cluster
        blox_instance.update_cluster(cluster_state)
        print("State of cluster {}".format(cluster_state.gpu_df))
        logging.info("State of cluster {}".format(cluster_state.gpu_df))

        # admit newly arrived jobs
        new_jobs = blox_instance.pop_wait_queue(args.simulate)
        accepted_jobs = admission_policy.accept(new_jobs, cluster_state, job_state)
        job_state.add_new_jobs(accepted_jobs)
        print("Jobs in cluster {}".format(job_state.active_jobs))
        _record_first_submit_time(blox_instance, job_state)

        # Scheduler-based pruning of jobs is currently disabled:
        # remove_jobs = utils.prune_jobs_based_on_runtime(
        #     job_state, cluster_state, blox_instance
        # )
        # print("Remove Jobs {}".format(remove_jobs))
        # blox_instance.exec_jobs([], remove_jobs, cluster_state, job_state)
        # utils.remove_post_termination(remove_jobs, job_state, cluster_state)

        # perform scheduling
        new_job_schedule = scheduling_policy.schedule(job_state, cluster_state)
        print("Job schedule {}".format(new_job_schedule))

        # get placement
        to_suspend, to_launch = placement_policy.place(
            job_state, cluster_state, new_job_schedule
        )
        logging.info(f"To Suspend Jobs: {to_suspend}")
        logging.info(f"To Launch Jobs: {to_launch}")
        print("Jobs to suspend {}".format(to_suspend))
        print("Jobs to launch {}".format(to_launch))

        # every suspended job is counted as one preemption for this round
        utils.collect_custom_metrics(
            job_state, cluster_state, {"num_preemptions": len(to_suspend)}
        )
        # collect cluster level metrics
        utils.collect_cluster_job_metrics(job_state, cluster_state)
        # Tracking of finished jobs is currently disabled:
        # utils.track_finished_jobs(job_state, cluster_state, blox_instance)

        blox_instance.last_round_time = time.time()
        # execute jobs
        blox_instance.exec_jobs(to_launch, to_suspend, cluster_state, job_state)

        # advance every clock by one round
        job_state.time += args.round_duration
        cluster_state.time += args.round_duration
        blox_instance.time += args.round_duration
        if not args.simulate:
            # outside simulation, wait out the round in wall-clock time
            print("Time sleep")
            time.sleep(args.round_duration)

        # update metrics and prune jobs - get rid of finished jobs
        logging.info("Calling update metrics")
        blox_instance.update_metrics(cluster_state, job_state)
if __name__ == "__main__":
    # Build the CLI parser, parse the options, then configure logging before
    # handing control to the scheduling loop.
    cli_parser = argparse.ArgumentParser(
        description="Arguments for Starting the scheduler"
    )
    args = parse_args(cli_parser)
    setup_logging()
    main(args)