[RPC] Fetch TPCH profiled info for application at runtime #92
Merged
9 commits
a2f65ed [WIP] Fetch tpch profiled info for application at runtime (dhruvsgarg)
10728a9 Merge branch 'main' of https://github.com/erdos-project/erdos-schedul… (dhruvsgarg)
8ddd26c Clean up utils_tpch.py a bit (dhruvsgarg)
df94875 added nx based graph structure check and map to profiled tpch stage_id (dhruvsgarg)
df446c0 Adding npy files with tpch-decima profiled data (dhruvsgarg)
31c3f15 Attempt to fix import order to pass build (dhruvsgarg)
063f01f formatting files using black formatter (dhruvsgarg)
c99b12d fix formatting for imports using isort (dhruvsgarg)
c34b55c fixes using flake8 linting (dhruvsgarg)
Binary file not shown.
@@ -0,0 +1,27 @@
{
    "workload_type": "tpch",
    "query_number": {
        "1": "[(0, 1), (1, 2)]",
        "2": "[(0, 2), (0, 9), (1, 2), (2, 4), (3, 4), (4, 7), (5, 7), (5, 13), (6, 15), (7, 15), (8, 9), (9, 11), (10, 11), (11, 13), (12, 14), (13, 14), (14, 16), (15, 16)]",
        "3": "[(0, 2), (1, 2), (2, 4), (3, 4)]",
        "4": "[(0, 2), (1, 2), (2, 3), (3, 4)]",
        "5": "[(0, 2), (1, 2), (2, 4), (3, 4), (4, 6), (5, 6), (6, 8), (7, 8), (8, 10), (9, 10), (10, 11), (11, 12)]",
        "6": "[(0, 1)]",
        "7": "[(0, 2), (0, 5), (1, 2), (2, 8), (3, 8), (4, 5), (5, 7), (6, 7), (7, 9), (8, 9), (9, 10), (10, 11)]",
        "8": "[(0, 5), (1, 5), (2, 4), (3, 4), (4, 13), (5, 13), (6, 8), (7, 8), (8, 10), (9, 10), (10, 12), (11, 12), (12, 14), (13, 14), (14, 15), (15, 16)]",
        "9": "[(0, 5), (1, 5), (2, 4), (3, 4), (4, 6), (5, 6), (6, 8), (7, 8), (8, 10), (9, 10), (10, 11), (11, 12)]",
        "10": "[(0, 2), (1, 2), (2, 4), (3, 4), (4, 6), (5, 6), (6, 7)]",
        "11": "",
        "12": "[(0, 2), (1, 2), (2, 3), (3, 4)]",
        "13": "[(0, 2), (1, 2), (2, 3), (3, 4), (4, 5)]",
        "14": "[(0, 2), (1, 2)]",
        "15": "",
        "16": "[(0, 2), (1, 2), (2, 4), (3, 4), (4, 5), (5, 6), (6, 7)]",
        "17": "",
        "18": "",
        "19": "[(0, 2), (1, 2)]",
        "20": "",
        "21": "",
        "22": ""
    }
}
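The JSON above stores each query's stage dependencies as a stringified list of (parent, child) tuples, and several queries (11, 15, 17, 18, 20, 21, 22) carry an empty string. A minimal sketch of parsing one entry follows, assuming a small inline excerpt rather than the real file; `parse_query_edges` and `QUERY_DAG_JSON` are hypothetical names, not part of this PR. The guard matters because `ast.literal_eval` raises on an empty string.

```python
import ast
import json

# Hypothetical excerpt of the JSON above; only three queries are reproduced.
QUERY_DAG_JSON = """
{
  "workload_type": "tpch",
  "query_number": {
    "1": "[(0, 1), (1, 2)]",
    "6": "[(0, 1)]",
    "11": ""
  }
}
"""


def parse_query_edges(doc, query_num):
    """Return the (parent, child) stage edges, or None when no DAG is recorded."""
    raw = doc["query_number"].get(str(query_num), "")
    if not raw:
        # Empty entries cannot be passed to ast.literal_eval.
        return None
    return [tuple(edge) for edge in ast.literal_eval(raw)]


doc = json.loads(QUERY_DAG_JSON)
print(parse_query_edges(doc, 1))
print(parse_query_edges(doc, 11))
```

Returning None for an empty entry is only one possible policy; a caller could equally raise a descriptive error for queries without a recorded DAG.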
@@ -0,0 +1,170 @@
# Code adopted from decima-sim to use their profiles of TPC-H queries

import ast
import json
import os
from typing import Mapping, Sequence

import networkx as nx
import numpy as np

HOME_TPCH_DIR = "../profiles/workload/tpch_decima/"
TPCH_SUBDIR = "2g/"

class SetWithCount(object):
    """
    allow duplication in set
    """

    def __init__(self):
        self.set = {}

    def __contains__(self, item):
        return item in self.set

    def add(self, item):
        if item in self.set:
            self.set[item] += 1
        else:
            self.set[item] = 1

    def clear(self):
        self.set.clear()

    def remove(self, item):
        self.set[item] -= 1
        if self.set[item] == 0:
            del self.set[item]

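SetWithCount behaves like a multiset: an item stays a member until it has been removed as many times as it was added. The sketch below shows the same semantics with `collections.Counter` from the standard library; the helpers `bag_add` and `bag_remove` are hypothetical names used only for illustration and are not part of this PR.

```python
from collections import Counter


def bag_add(bag, item):
    """Record one more occurrence of item."""
    bag[item] += 1


def bag_remove(bag, item):
    """Drop one occurrence; delete the key once the count reaches zero."""
    if item not in bag:
        # Mirror SetWithCount.remove, which raises KeyError for an absent item.
        raise KeyError(item)
    bag[item] -= 1
    if bag[item] == 0:
        del bag[item]


bag = Counter()
bag_add(bag, 120)
bag_add(bag, 120)
bag_remove(bag, 120)
still_present = 120 in bag   # one occurrence is left
bag_remove(bag, 120)
gone = 120 not in bag        # the key was deleted at count zero
print(still_present, gone)
```

The explicit `del` is needed because a Counter keeps keys whose count is zero, which would make the membership test report a stale item.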
def pre_process_task_duration(task_duration):
    # remove fresh durations from first wave
    clean_first_wave = {}
    for e in task_duration["first_wave"]:
        clean_first_wave[e] = []
        fresh_durations = SetWithCount()
        # O(1) access
        for d in task_duration["fresh_durations"][e]:
            fresh_durations.add(d)
        for d in task_duration["first_wave"][e]:
            if d not in fresh_durations:
                clean_first_wave[e].append(d)
            else:
                # prevent duplicated fresh duration blocking first wave
                fresh_durations.remove(d)

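As it appears in this diff, pre_process_task_duration builds clean_first_wave but shows no return statement or write-back. The following is a sketch of a variant that returns the cleaned mapping so that a caller could decide what to do with it. The name `clean_first_wave_durations` and the sample profile are assumptions for illustration, and this is not presented as the author's intended behavior.

```python
from collections import Counter


def clean_first_wave_durations(task_duration):
    """Return first-wave durations with one copy removed per fresh duration."""
    cleaned = {}
    for e, first_wave in task_duration["first_wave"].items():
        # Count fresh durations so duplicates are consumed one at a time.
        fresh = Counter(task_duration["fresh_durations"][e])
        kept = []
        for d in first_wave:
            if fresh[d] > 0:
                fresh[d] -= 1      # this duration was a fresh one; skip it
            else:
                kept.append(d)     # a genuine first-wave duration
        cleaned[e] = kept
    return cleaned


# Hypothetical profile: executor count 2, one fresh duration of 90 ms.
profile = {
    "first_wave": {2: [50, 90, 90, 40]},
    "fresh_durations": {2: [90]},
}
print(clean_first_wave_durations(profile))
```

Only one of the two 90 ms entries is dropped, because the profile lists a single fresh duration with that value.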
def get_all_stage_info_for_query(query_num):
    task_durations = np.load(
        os.path.join(
            HOME_TPCH_DIR, TPCH_SUBDIR, "task_duration_" + str(query_num) + ".npy"
        ),
        allow_pickle=True,
    ).item()

    num_nodes = len(task_durations)

    stage_info = {}

    for n in range(num_nodes):
        task_duration = task_durations[n]
        e = next(iter(task_duration["first_wave"]))
        # NOTE: somehow only picks the first element {2: [n_tasks_in_ms]}

        num_tasks = len(task_duration["first_wave"][e]) + len(
            task_duration["rest_wave"][e]
        )

        # remove fresh duration from first wave duration
        # drag nearest neighbor first wave duration to empty spots
        pre_process_task_duration(task_duration)
        rough_duration = np.mean(
            [i for t in task_duration["first_wave"].values() for i in t]
            + [i for t in task_duration["rest_wave"].values() for i in t]
            + [i for t in task_duration["fresh_durations"].values() for i in t]
        )

        curr_stage = {
            "stage_id": n,
            "num_tasks": num_tasks,
            "avg_task_duration": round(rough_duration),
        }
        stage_info[n] = curr_stage

    return stage_info

Review comment on the np.load path argument: Should we get the [...] We might want to run a mix of TPC-H queries from different scaling sizes.

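To make the per-stage summary concrete, here is a small worked sketch on a synthetic profile, using only the standard library instead of NumPy. The function name `summarize_stage` and the sample numbers are assumptions. As in the code above, the task count comes from the first key only, while the average pools the durations of every key.

```python
from statistics import mean


def summarize_stage(stage_id, task_duration):
    """Summarize one stage: task count from the first key, mean over all keys."""
    first_key = next(iter(task_duration["first_wave"]))
    num_tasks = len(task_duration["first_wave"][first_key]) + len(
        task_duration["rest_wave"][first_key]
    )
    # Pool every recorded duration across the three wave categories.
    all_durations = [
        d
        for wave in ("first_wave", "rest_wave", "fresh_durations")
        for durations in task_duration[wave].values()
        for d in durations
    ]
    return {
        "stage_id": stage_id,
        "num_tasks": num_tasks,
        "avg_task_duration": round(mean(all_durations)),
    }


# Synthetic profile for one stage, keyed by a hypothetical executor count of 2.
profile = {
    "first_wave": {2: [100, 200]},
    "rest_wave": {2: [300]},
    "fresh_durations": {2: [400]},
}
print(summarize_stage(0, profile))
```

With these numbers the stage has three tasks and the pooled mean is 1000 / 4 = 250.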
def get_base_tpch_graph_structure(query_num):
    # use query_num to read string from file
    with open(os.path.join(HOME_TPCH_DIR, "query_dag.json")) as f:
        tpch_query_json = json.load(f)

    # get query dependency from file
    query_dependency = ast.literal_eval(tpch_query_json["query_number"][str(query_num)])

    # convert job structure into a nx graph
    base_tpch_graph = nx.DiGraph(query_dependency)

    # return the job nx graph for query
    return base_tpch_graph

def get_graph_from_deps(dependencies):
    # parse dependencies to get it in list of tuples
    # construct the TaskGraph from the dependencies.
    task_graph_structure: Mapping[int, Sequence[int]] = {}
    for task_dependency in dependencies:
        task_graph_structure[task_dependency.key.id] = [
            task_id for task_id in task_dependency.children_ids
        ]

    # convert our TaskGraph into a nx friendly notation
    all_edges_in_app = []
    for parent in task_graph_structure.keys():
        for child in task_graph_structure[parent]:
            all_edges_in_app.append((parent, child))

    # construct nx graph
    given_app_tpch_graph = nx.DiGraph(all_edges_in_app)

    # return the graph
    return given_app_tpch_graph

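The conversion from dependency records to an edge list can be sketched without the RPC types. The dataclasses `TaskKey` and `TaskDependency` below are hypothetical stand-ins that only mimic the two attributes the code above reads (`key.id` and `children_ids`); the real message types are not shown in this diff.

```python
from dataclasses import dataclass, field
from typing import List, Tuple


@dataclass
class TaskKey:
    """Hypothetical stand-in for the key carried by each dependency record."""
    id: int


@dataclass
class TaskDependency:
    """Hypothetical stand-in exposing only key.id and children_ids."""
    key: TaskKey
    children_ids: List[int] = field(default_factory=list)


def edges_from_deps(dependencies) -> List[Tuple[int, int]]:
    """Flatten dependency records into (parent, child) edges."""
    return [
        (dep.key.id, child)
        for dep in dependencies
        for child in dep.children_ids
    ]


deps = [
    TaskDependency(TaskKey(0), [2]),
    TaskDependency(TaskKey(1), [2]),
    TaskDependency(TaskKey(2), []),
]
print(edges_from_deps(deps))
```

One consequence of building the graph from edges alone is that a task with no parents and no children contributes no edge, so it would not appear as a node in the resulting graph.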
def are_structurally_same(graph1, graph2):
    # Step 1: Check if both graphs have the same number of vertices
    if len(graph1.nodes) != len(graph2.nodes):
        print(
            f"DAG structure mismatch! Graph1 has "
            f"{graph1.nodes} while Graph2 has {graph2.nodes}"
        )
        return False, None

    # Step 2: Check if there exists a bijection between the vertices
    # of the two graphs such that their adjacency relationships match
    for mapping in nx.isomorphism.GraphMatcher(graph1, graph2).isomorphisms_iter():
        # Check if the adjacency relationships match
        if all(v in mapping for u, v in graph1.edges):
            # graph structures match
            # mapping is a dict {key=original-stage-id, val=app-stage-id}
            # we return the reversed mapping, from app-stage-id to orig-stage-id
            reversed_mapping = {v: k for k, v in mapping.items()}

            return True, reversed_mapping

    print("DAG structure mismatch! No mapping could be found")
    return False, None

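To illustrate what the returned mapping means, the sketch below finds a direction-aware mapping between two small DAGs by brute force over node permutations. This is a swapped-in technique for illustration only, not the networkx matcher used in the PR, and it is practical only for very small graphs; `find_stage_mapping` is a hypothetical name. For directed graphs, networkx documents `nx.isomorphism.DiGraphMatcher` as the matcher to use, which may be worth considering here.

```python
from itertools import permutations


def find_stage_mapping(base_edges, app_edges):
    """Return {app_stage_id: base_stage_id} if the DAGs match, else None."""
    base_edge_set, app_edge_set = set(base_edges), set(app_edges)
    base_nodes = sorted({n for edge in base_edge_set for n in edge})
    app_nodes = sorted({n for edge in app_edge_set for n in edge})
    if len(base_nodes) != len(app_nodes) or len(base_edge_set) != len(app_edge_set):
        return None
    # Try every bijection from base nodes to app nodes.
    for perm in permutations(app_nodes):
        mapping = dict(zip(base_nodes, perm))
        # Every directed base edge must land on a directed app edge.
        if all((mapping[u], mapping[v]) in app_edge_set for u, v in base_edge_set):
            return {app: base for base, app in mapping.items()}
    return None


# Query 3 from the JSON in this PR, and a hypothetical app with shifted ids.
base = [(0, 2), (1, 2), (2, 4), (3, 4)]
app = [(10, 12), (11, 12), (12, 14), (13, 14)]
reversed_app = [(child, parent) for parent, child in app]

print(find_stage_mapping(base, app))
print(find_stage_mapping(base, reversed_app))
```

The second call returns None because reversing every edge changes the in-degrees of the stages, so no direction-preserving mapping exists.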
def verify_and_relable_tpch_app_graph(query_num, dependencies):
    # get nx graphs from base tpch file and application dependencies
    base_tpch_graph = get_base_tpch_graph_structure(query_num)
    app_graph = get_graph_from_deps(dependencies)

    # verify that the graphs are isomorphic
    # returns true if same_structure, along with stage_id_mapping wrt base tpch file
    same_structure, stage_id_mapping = are_structurally_same(base_tpch_graph, app_graph)

    # return with stage_id_mapping back to assign correct runtime and num_executors
    return same_structure, stage_id_mapping

Review comment on the verify_and_relable_tpch_app_graph definition line (with a suggested change): Typo
Do you have any runtimes that are lower than a second?