Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Service rewrite #99

Draft
wants to merge 117 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
117 commits
Select commit Hold shift + click to select a range
956afcd
Implement TPC-H data loader
1ntEgr8 Aug 29, 2024
a25fbe8
Bug fix: convert job graph to task graph
1ntEgr8 Sep 5, 2024
4d06a95
Make loop_timeout configurable
1ntEgr8 Sep 5, 2024
c756d17
Make profile path configurable
1ntEgr8 Sep 5, 2024
9f6aa8b
release time handling on workload
1ntEgr8 Sep 19, 2024
be66704
Wrap up tpch loader implementation
1ntEgr8 Sep 23, 2024
a4d0ded
scale runtime based on max number of tasks
1ntEgr8 Oct 1, 2024
8111ba6
fix bug in runtime calc
1ntEgr8 Oct 2, 2024
e172b56
rename optimization_passes flag to opt_passes
1ntEgr8 Nov 4, 2024
90e696c
add cloudlab support, fix runtime rounding bug, make rng gen match se…
1ntEgr8 Nov 4, 2024
06cf4f7
restore tpch_utils to main version
1ntEgr8 Nov 4, 2024
fcb0180
split tpch_replay config files
1ntEgr8 Nov 4, 2024
9014090
remove opt_passes flag
1ntEgr8 Nov 4, 2024
fbce571
Merge branch 'opt-passes-flag-fix' of github.com:1ntEgr8/erdos-schedu…
1ntEgr8 Nov 4, 2024
86420f3
update tpch_utils.py
1ntEgr8 Nov 4, 2024
2f09d5e
setup new service.py
1ntEgr8 Nov 4, 2024
eba9a45
update rpc proto dir hierarchy to resolve module import issue with sy…
1ntEgr8 Nov 4, 2024
e6a364a
checkout dhruv's version of service.py
1ntEgr8 Nov 4, 2024
69876ef
make workload_loader optional, make step and get_time_until_next_even…
1ntEgr8 Nov 4, 2024
d68772d
implement register/deregister framework
1ntEgr8 Nov 4, 2024
320cb34
refactor sim time calculation
1ntEgr8 Nov 4, 2024
43b8331
implement RegisterWorker
1ntEgr8 Nov 4, 2024
eccddc5
factor out __get_worker_pool
1ntEgr8 Nov 4, 2024
e94b1f1
refactor tpch loader
1ntEgr8 Nov 7, 2024
ed510ab
implement register task graph
1ntEgr8 Nov 7, 2024
81c4307
add testing for service
1ntEgr8 Nov 8, 2024
e1faa7f
implement register environment ready
1ntEgr8 Nov 8, 2024
0025f3c
init impl for get placements, readme with spark-erdos setup
Nov 7, 2024
a7f18e3
WIP: service changes to handle first tpch taskgraph
Nov 12, 2024
942ead7
Fix the tick() function to dequeue all events upto n, pass runtime_un…
Nov 13, 2024
2b0c895
refactor runtime unit setting
1ntEgr8 Nov 19, 2024
7a41bfe
reomve microsecond granularity for now, will add support later
1ntEgr8 Nov 19, 2024
6170819
rename sim_time to stime to make it less confusing
1ntEgr8 Nov 19, 2024
d76453a
refactor tick
1ntEgr8 Nov 19, 2024
35df499
update rpc service to invoke new tick, update test
1ntEgr8 Nov 19, 2024
acf6a4e
add comment explaining scheduler runtime zero setting
1ntEgr8 Nov 19, 2024
08aa855
oops forget to add comment on update workload
1ntEgr8 Nov 19, 2024
106945f
refactor naming in tpch loader
1ntEgr8 Nov 19, 2024
52828b7
fix documentation error in workload/tasks.py
1ntEgr8 Nov 19, 2024
2dac435
add support for returning current task graph placements from simulator
1ntEgr8 Nov 19, 2024
023805c
fix logging
1ntEgr8 Nov 19, 2024
0dfd34e
construct and return placements response
1ntEgr8 Nov 19, 2024
05b8f47
do not return placements if task graph is complete
1ntEgr8 Nov 20, 2024
14ec521
fix placement time bug in simulator
1ntEgr8 Nov 20, 2024
062aa5c
implement orchestrated mode
1ntEgr8 Nov 20, 2024
dfabf71
remove redundant checks
1ntEgr8 Nov 20, 2024
76117c6
enqueue scheduler start event in register task graph
1ntEgr8 Nov 21, 2024
fcf2142
bug fixes
1ntEgr8 Nov 21, 2024
a6d72e4
add a lock, haven't checked all places
1ntEgr8 Nov 21, 2024
2ec812e
Add tests for notify task completion
Nov 21, 2024
42eee94
add cancelled field to proto
1ntEgr8 Nov 21, 2024
972a020
populate cancelled field in rpc response
1ntEgr8 Nov 21, 2024
1a7e7dd
Updates to spark erdos service documentation
Nov 21, 2024
b244b7e
implement register driver
1ntEgr8 Nov 21, 2024
0afc85a
quick fixes in regsister and de-register driver
Nov 21, 2024
fd389ea
Updates to spark erdos documentation
Nov 21, 2024
8e313a9
register driver bug fix
1ntEgr8 Nov 21, 2024
ab56afb
Add override_worker_cpu_count flag
Nov 21, 2024
b6fbab2
More documentation: tpch-spark and spark-mirror related
Nov 22, 2024
977f36a
add delay in test_service for register env ready
Nov 22, 2024
d793b25
doc update for tpch spark
Nov 22, 2024
6264772
Add test to invoke getPlacements before task registration
Nov 22, 2024
bf658fe
create task graph after environment is ready
1ntEgr8 Nov 22, 2024
7873e73
update test
1ntEgr8 Nov 22, 2024
e76c3fc
Separate out internal state mgmt for driver and application
Nov 23, 2024
5891dc3
re-add impl for override_worker_cpu_count
Nov 23, 2024
25f3acd
fix: correct enqueue of task_finished and sched_start in notifyTaskCo…
Nov 23, 2024
7ad75ec
enable task cancellations to be sent back to backend
Nov 23, 2024
83d63d8
update test script to include cancel_task scenario (needs to be sped up)
Nov 23, 2024
66ec393
allow service to use different schedulers based on args
Nov 25, 2024
400ec44
format service
1ntEgr8 Nov 25, 2024
f69c0a9
add enforce deadlines flag
1ntEgr8 Nov 25, 2024
9e8ab1e
step workers during a tick
1ntEgr8 Nov 26, 2024
84d89d2
update placement time only if it is in the past
1ntEgr8 Nov 26, 2024
bf301aa
refactor file
1ntEgr8 Nov 26, 2024
0499541
document refactored simulate_f
1ntEgr8 Nov 26, 2024
9539069
handle case where task graph cannot be accommodated in worker pool
1ntEgr8 Nov 26, 2024
89ee06d
Potpourri of bug fixes and improvements
1ntEgr8 Nov 27, 2024
98a5247
Add log stats method to simulator
1ntEgr8 Nov 27, 2024
618348a
check for worker registration
1ntEgr8 Nov 27, 2024
15bf380
update documentation in service
1ntEgr8 Nov 27, 2024
0b3a661
updating the service to operate in US instead of S time unit
Nov 27, 2024
cca24d8
misc improvements
1ntEgr8 Nov 27, 2024
7387b06
override flags in service
1ntEgr8 Nov 27, 2024
1461a3c
fix workload release bug
1ntEgr8 Nov 27, 2024
209a4a3
update docs about sched time discretization
Nov 28, 2024
9104f7e
added comments about setting deadlines in generate_task_graph
Nov 28, 2024
8fd7f2a
updates to test script (verified across edf, fifo, dsched)
Nov 28, 2024
9cf701a
update documentation for the service
Nov 28, 2024
25f4a3d
correctly handle task cancellations
1ntEgr8 Nov 29, 2024
4d979c1
simulator bug fixes and improvements
1ntEgr8 Nov 29, 2024
48a9711
add launch script
1ntEgr8 Dec 2, 2024
b1942ea
add shutdown rpc method
1ntEgr8 Dec 3, 2024
2e70864
add service experiment runner
1ntEgr8 Dec 3, 2024
aeb4d6d
sleep for a bit before launching queries
1ntEgr8 Dec 3, 2024
4094e01
remove extraneous sleep
1ntEgr8 Dec 3, 2024
167ba49
set start_time of task to its placement time
Dec 3, 2024
53cc735
add testcase to verify new taskgraph termination approach
Dec 3, 2024
29e306f
nit documentation in erdos-spark setup
Dec 3, 2024
14190ad
more explainable msgs from GetPlacements
Dec 3, 2024
e2636f6
reorder event queue priority to process scheduler events before task …
Dec 3, 2024
eeb4854
improvements to experiment runner
1ntEgr8 Dec 3, 2024
cb96c3e
fix profile path in tpch loader
1ntEgr8 Dec 3, 2024
7036fcf
add spark-master-ip flag
1ntEgr8 Dec 3, 2024
bd16310
reinstate previous eventQueue priority order (task_placement before s…
Dec 4, 2024
b4aceeb
[simulator] Unschedule subtree rooted at task if task is unable to ru…
Dec 4, 2024
ce582e0
[service] log line to track tasks that get delayed in execution
Dec 4, 2024
3831f44
sleep for some time before signalling shutdown
1ntEgr8 Dec 4, 2024
2a74838
hack analyze pipeline to work with tpch output
1ntEgr8 Dec 4, 2024
f4bbe6a
[simulator] check task state before invoking unschedule on it
Dec 4, 2024
b958a8a
add support for tpch query partitioning
1ntEgr8 Dec 4, 2024
038cc7f
run_service_experiments: Log service stdout/stderr
rohanbafna Dec 14, 2024
6f84681
run_service_experiments.py: Fix --dry_run
rohanbafna Dec 14, 2024
968b158
run_service_experiments: Timestamp results folder
rohanbafna Dec 14, 2024
51d6f46
run_service_experiments: Fix hang on exception
rohanbafna Dec 14, 2024
3491d59
Spark service: Correctly log stats on shutdown
rohanbafna Dec 14, 2024
7275702
Set correct completion time for finishing Tasks
rohanbafna Dec 15, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 6 additions & 8 deletions analyze.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ def analyze_resource_utilization(
# Plotting defaults.
# hatches = ['//', '--', '**']
# alphas = np.arange(0.2, 1.2, 0.2)
resource_color = {"GPU": "red", "CPU": "green"}
resource_color = {"Slot": "green"}

# Worker Pool statistics
worker_pool_stats = csv_reader.get_worker_pool_utilizations(scheduler_csv_file)
Expand Down Expand Up @@ -1246,16 +1246,16 @@ def log_aggregate_stats(
/ sum(stat.resource_utilizations[resource])
for stat in worker_pool_stats
]
for resource in ("GPU", "CPU")
for resource in ("Slot",)
}

scheduler_invocations = csv_reader.get_scheduler_invocations(csv_file)
placed_tasks = [
scheduler_invocation.placed_tasks
scheduler_invocation.num_placed_tasks
for scheduler_invocation in scheduler_invocations
]
unplaced_tasks = [
scheduler_invocation.unplaced_tasks
scheduler_invocation.num_unplaced_tasks
for scheduler_invocation in scheduler_invocations
]

Expand All @@ -1268,8 +1268,7 @@ def log_aggregate_stats(
placement_delay,
deadline_delay,
stat_function(e2e_response_time),
stat_function(resource_uses["GPU"]),
stat_function(resource_uses["CPU"]),
stat_function(resource_uses["Slot"]),
stat_function(placed_tasks),
stat_function(unplaced_tasks),
log_name,
Expand All @@ -1288,8 +1287,7 @@ def log_aggregate_stats(
"Placement",
"Deadline",
"JCT",
"GPU",
"CPU",
"Slot",
"Placed",
"Unplaced",
"Log",
Expand Down
38 changes: 38 additions & 0 deletions configs/tpch_replay_dsched.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Output configs.
--log=./tpch_replay_dsched.log
--log_level=debug
--csv=./tpch_replay_dsched.csv

# Task configs.
--runtime_variance=0

# Scheduler configs.

# DSched
--scheduler=TetriSched
--scheduler_runtime=0
--enforce_deadlines
--retract_schedules
--release_taskgraphs
--drop_skipped_tasks
--scheduler_time_discretization=1

# Deadline variance
--min_deadline_variance=10
--max_deadline_variance=25

# Execution mode configs.
--execution_mode=replay
--replay_trace=tpch

# Release time config.
--override_release_policy=gamma
--override_gamma_coefficient=1
--override_poisson_arrival_rate=1
--override_num_invocation=10

# TPCH flags
--random_seed=1234
--tpch_query_dag_spec=profiles/workload/tpch/queries.yaml
--tpch_dataset_size=50
--worker_profile_path=profiles/workers/tpch_cluster.yaml
47 changes: 47 additions & 0 deletions configs/tpch_replay_edf.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Output configs.
# --log=./tpch_replay_dsched.log
# --log_level=debug
# --csv=./tpch_replay_dsched.csv

--log=./tpch_replay_edf.log
--log_level=debug
--csv=./tpch_replay_edf.csv

# Task configs.
--runtime_variance=0

# Scheduler configs.

# EDF
--scheduler=EDF
--scheduler_runtime=0
--enforce_deadlines

# DSched
# --scheduler=TetriSched
# --scheduler_runtime=0
# --enforce_deadlines
# --retract_schedules
# --release_taskgraphs
# --drop_skipped_tasks
# --scheduler_time_discretization=1

# Deadline variance
--min_deadline_variance=10
--max_deadline_variance=25

# Execution mode configs.
--execution_mode=replay
--replay_trace=tpch

# Release time config.
--override_release_policy=gamma
--override_gamma_coefficient=1
--override_poisson_arrival_rate=1
--override_num_invocation=10

# TPCH flags
--random_seed=1234
--tpch_query_dag_spec=profiles/workload/tpch/queries.yaml
--tpch_dataset_size=50
--worker_profile_path=profiles/workers/tpch_cluster.yaml
1 change: 1 addition & 0 deletions data/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from .task_loader_benchmark import TaskLoaderBenchmark
from .task_loader_pylot import TaskLoaderPylot
from .task_loader_synthetic import TaskLoaderSynthetic
from .tpch_loader import TpchWorkloadLoader
from .worker_loader import WorkerLoader
from .worker_loader_benchmark import WorkerLoaderBenchmark
from .workload_loader import WorkloadLoader
Expand Down
5 changes: 5 additions & 0 deletions data/csv_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ def parse_events(self, readings: Mapping[str, Sequence[str]]):
)
elif reading[1] == "UPDATE_WORKLOAD":
simulator.total_tasks += int(reading[2])
elif reading[1] == "LOG_STATS":
assert (
simulator is not None
), "No SIMULATOR_START found for a corresponding SIMULATOR_END."
simulator.update_stats(reading)
elif reading[1] == "SIMULATOR_END":
assert (
simulator is not None
Expand Down
26 changes: 19 additions & 7 deletions data/csv_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,18 @@ def __init__(self, csv_path: str, start_time: int, total_tasks: int = 0):
self.scheduler_invocations: list[Scheduler] = []
self.task_graphs: dict[str, TaskGraph] = {}

def update_stats(self, csv_reading: str):
assert (
csv_reading[1] == "LOG_STATS"
), f"The event {csv_reading[1]} was not of type LOG_STATS."
self.finished_tasks = int(csv_reading[2])
self.dropped_tasks = int(csv_reading[3])
self.missed_deadlines = int(csv_reading[4])
self.finished_task_graphs = int(csv_reading[5])
self.dropped_taskgraphs = int(csv_reading[6])
self.missed_taskgraphs = int(csv_reading[7])
self.goodput_taskgraphs = self.finished_task_graphs - self.missed_taskgraphs

def update_finish(self, csv_reading: str):
"""Updates the values of the Simulator based on the SIMULATOR_END event from
CSV.
Expand All @@ -396,10 +408,10 @@ def update_finish(self, csv_reading: str):
csv_reading[1] == "SIMULATOR_END"
), f"The event {csv_reading[1]} was not of type SIMULATOR_END."
self.end_time = int(csv_reading[0])
self.finished_tasks = int(csv_reading[2])
self.dropped_tasks = int(csv_reading[3])
self.missed_deadlines = int(csv_reading[4])
self.finished_task_graphs = int(csv_reading[5])
self.dropped_taskgraphs = int(csv_reading[6])
self.missed_taskgraphs = int(csv_reading[7])
self.goodput_taskgraphs = self.finished_task_graphs - self.missed_taskgraphs
# self.finished_tasks = int(csv_reading[2])
# self.dropped_tasks = int(csv_reading[3])
# self.missed_deadlines = int(csv_reading[4])
# self.finished_task_graphs = int(csv_reading[5])
# self.dropped_taskgraphs = int(csv_reading[6])
# self.missed_taskgraphs = int(csv_reading[7])
# self.goodput_taskgraphs = self.finished_task_graphs - self.missed_taskgraphs
Loading