Commit

[RPC] Fetch TPCH profiled info for application at runtime (#92)
* [WIP] Fetch tpch profiled info for application at runtime

* Clean up utils_tpch.py a bit

* Added nx-based graph structure check and map to profiled tpch stage_id

* Adding npy files with tpch-decima profiled data

* Attempt to fix import order to pass build

* Formatted files using the black formatter

* Fixed import formatting using isort

* Fixes using flake8 linting
dhruvsgarg authored Mar 2, 2024
1 parent d12947b commit 388f39f
Showing 157 changed files with 265 additions and 8 deletions.
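
At a high level, the new registration path works as sketched below. This is a simplified, hypothetical walk-through built from the helpers this commit adds in rpc/tpch_utils.py; the wrapper function profiled_info_for is illustrative only, and the real logic lives inline in rpc/service.py, shown further down.

# Hypothetical sketch of the flow added by this commit; not the actual service code.
from tpch_utils import get_all_stage_info_for_query, verify_and_relable_tpch_app_graph


def profiled_info_for(request):
    # Only TPC-H applications (named like "TPCH_Q4") carry profiled data.
    if not request.name.startswith("TPCH_"):
        return None
    query_num = request.name.split("TPCH_Q", 1)[1]
    # Per-stage task counts and average task durations from the Decima profiles.
    stage_info = get_all_stage_info_for_query(query_num)
    # Isomorphism check against the canonical TPC-H DAG, plus a stage-id mapping
    # from the application's stage ids to the profiled stage ids.
    same_structure, stage_id_mapping = verify_and_relable_tpch_app_graph(
        query_num=query_num, dependencies=request.dependencies
    )
    if not same_structure:
        return None  # the service rejects the registration in this case
    return stage_info, stage_id_mapping
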
Binary files not shown (profiled TPC-H .npy data added under profiles/workload/tpch_decima/, e.g. profiles/workload/tpch_decima/2g/task_duration_6.npy).
27 changes: 27 additions & 0 deletions profiles/workload/tpch_decima/query_dag.json
@@ -0,0 +1,27 @@
{
"workload_type": "tpch",
"query_number": {
"1": "[(0, 1), (1, 2)]",
"2": "[(0, 2), (0, 9), (1, 2), (2, 4), (3, 4), (4, 7), (5, 7), (5, 13), (6, 15), (7, 15), (8, 9), (9, 11), (10, 11), (11, 13), (12, 14), (13, 14), (14, 16), (15, 16)]",
"3": "[(0, 2), (1, 2), (2, 4), (3, 4)]",
"4": "[(0, 2), (1, 2), (2, 3), (3, 4)]",
"5": "[(0, 2), (1, 2), (2, 4), (3, 4), (4, 6), (5, 6), (6, 8), (7, 8), (8, 10), (9, 10), (10, 11), (11, 12)]",
"6": "[(0, 1)]",
"7": "[(0, 2), (0, 5), (1, 2), (2, 8), (3, 8), (4, 5), (5, 7), (6, 7), (7, 9), (8, 9), (9, 10), (10, 11)]",
"8": "[(0, 5), (1, 5), (2, 4), (3, 4), (4, 13), (5, 13), (6, 8), (7, 8), (8, 10), (9, 10), (10, 12), (11, 12), (12, 14), (13, 14), (14, 15), (15, 16)]",
"9": "[(0, 5), (1, 5), (2, 4), (3, 4), (4, 6), (5, 6), (6, 8), (7, 8), (8, 10), (9, 10), (10, 11), (11, 12)]",
"10": "[(0, 2), (1, 2), (2, 4), (3, 4), (4, 6), (5, 6), (6, 7)]",
"11": "",
"12": "[(0, 2), (1, 2), (2, 3), (3, 4)]",
"13": "[(0, 2), (1, 2), (2, 3), (3, 4), (4, 5)]",
"14": "[(0, 2), (1, 2)]",
"15": "",
"16": "[(0, 2), (1, 2), (2, 4), (3, 4), (4, 5), (5, 6), (6, 7)]",
"17": "",
"18": "",
"19": "[(0, 2), (1, 2)]",
"20": "",
"21": "",
"22": ""
}
}
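
Each entry above maps a TPC-H query number to a string of (parent, child) stage-id pairs describing that query's stage DAG; queries with an empty string have no DAG recorded in this file. The helpers in rpc/tpch_utils.py below parse these strings with ast.literal_eval and turn them into networkx graphs. A minimal, self-contained illustration (assuming networkx is installed):

import ast

import networkx as nx

# Dependency string for TPC-H query 1, copied from query_dag.json above.
q1_edges = ast.literal_eval("[(0, 1), (1, 2)]")
q1_graph = nx.DiGraph(q1_edges)

print(sorted(q1_graph.nodes))  # [0, 1, 2]
print(list(q1_graph.edges))  # [(0, 1), (1, 2)]
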
76 changes: 68 additions & 8 deletions rpc/service.py
@@ -16,6 +16,7 @@
 import erdos_scheduler_pb2_grpc
 import grpc
 from absl import app, flags
+from tpch_utils import get_all_stage_info_for_query, verify_and_relable_tpch_app_graph

 from schedulers import EDFScheduler, FIFOScheduler
 from utils import EventTime, setup_logging
@@ -367,10 +368,62 @@ async def RegisterTaskGraph(self, request, context):
                 num_executors=0,
             )

+        self._logger.info(
+            "Attempting to register application ID %s with name %s",
+            request.id,
+            request.name,
+        )
+        # Check if the query is from the TPC-H workload.
+        # If yes, retrieve profiled slots and runtime info; if not, use default values.
+        is_tpch_query = False
+        tpch_query_all_stage_info = None
+        if request.name.startswith("TPCH_"):
+            is_tpch_query = True
+            # retrieve tasks-per-stage and runtime info based on the query number
+            tpch_query_num = request.name.split("TPCH_Q", 1)[1]
+            tpch_query_all_stage_info = get_all_stage_info_for_query(tpch_query_num)
+            same_structure, stage_id_mapping = verify_and_relable_tpch_app_graph(
+                query_num=tpch_query_num, dependencies=request.dependencies
+            )
+
+            # return a failure message if the TPC-H app's DAG structure doesn't match
+            if not same_structure:
+                self._logger.warning(
+                    "TPCH application with ID %s and name %s couldn't be registered. "
+                    "DAG structure mismatch!",
+                    request.id,
+                    request.name,
+                )
+                return erdos_scheduler_pb2.RegisterTaskGraphResponse(
+                    success=False,
+                    message=f"TPCH application ID {request.id} with name {request.name}"
+                    f" couldn't be registered. DAG structure mismatch!",
+                    num_executors=0,
+                )
+
         # Construct all the Tasks for the TaskGraph.
         task_ids_to_task: Mapping[int, Task] = {}
+        default_resource = Resources(
+            resource_vector={Resource(name="Slot_CPU", _id="any"): 30}
+        )
+        default_runtime = EventTime(20, EventTime.Unit.US)
+
         for task_dependency in request.dependencies:
             framework_task = task_dependency.key
+            if is_tpch_query:
+                mapped_stage_id = stage_id_mapping[framework_task.id]
+                task_slots = tpch_query_all_stage_info[mapped_stage_id]["num_tasks"]
+                task_runtime = tpch_query_all_stage_info[mapped_stage_id][
+                    "avg_task_duration"
+                ]
+                self._logger.info(
+                    "Creating Task for given app TPCH stage: %s, mapped to "
+                    "original stage id %s, with tasks: %s and avg runtime: %s",
+                    framework_task.id,
+                    mapped_stage_id,
+                    task_slots,
+                    task_runtime,
+                )
             task_ids_to_task[framework_task.id] = Task(
                 name=framework_task.name,
                 task_graph=request.id,
@@ -380,17 +433,24 @@ async def RegisterTaskGraph(self, request, context):
                     name=f"WorkProfile_{framework_task.name}",
                     execution_strategies=ExecutionStrategies(
                         [
-                            # TODO (Sukrit): Find the actual resource requirements
-                            # for the particular TaskGraph, along with the expected
-                            # runtime and set it here.
                             ExecutionStrategy(
-                                resources=Resources(
-                                    resource_vector={
-                                        Resource(name="Slot_CPU", _id="any"): 30
-                                    }
+                                resources=(
+                                    default_resource
+                                    if not is_tpch_query
+                                    else Resources(
+                                        resource_vector={
+                                            Resource(
+                                                name="Slot_CPU", _id="any"
+                                            ): task_slots
+                                        }
+                                    )
                                 ),
                                 batch_size=1,
-                                runtime=EventTime(20, EventTime.Unit.US),
+                                runtime=(
+                                    default_runtime
+                                    if not is_tpch_query
+                                    else EventTime(task_runtime, EventTime.Unit.US)
+                                ),
                             )
                         ]
                     ),
170 changes: 170 additions & 0 deletions rpc/tpch_utils.py
@@ -0,0 +1,170 @@
# Code adapted from decima-sim to use their profiles of TPC-H queries

import ast
import json
import os
from typing import Mapping, Sequence

import networkx as nx
import numpy as np

HOME_TPCH_DIR = "../profiles/workload/tpch_decima/"
TPCH_SUBDIR = "2g/"


class SetWithCount(object):
    """
    allow duplication in set
    """

    def __init__(self):
        self.set = {}

    def __contains__(self, item):
        return item in self.set

    def add(self, item):
        if item in self.set:
            self.set[item] += 1
        else:
            self.set[item] = 1

    def clear(self):
        self.set.clear()

    def remove(self, item):
        self.set[item] -= 1
        if self.set[item] == 0:
            del self.set[item]


def pre_process_task_duration(task_duration):
    # remove fresh durations from first wave
    clean_first_wave = {}
    for e in task_duration["first_wave"]:
        clean_first_wave[e] = []
        fresh_durations = SetWithCount()
        # O(1) access
        for d in task_duration["fresh_durations"][e]:
            fresh_durations.add(d)
        for d in task_duration["first_wave"][e]:
            if d not in fresh_durations:
                clean_first_wave[e].append(d)
            else:
                # prevent duplicated fresh duration blocking first wave
                fresh_durations.remove(d)


def get_all_stage_info_for_query(query_num):
    task_durations = np.load(
        os.path.join(
            HOME_TPCH_DIR, TPCH_SUBDIR, "task_duration_" + str(query_num) + ".npy"
        ),
        allow_pickle=True,
    ).item()

    num_nodes = len(task_durations)

    stage_info = {}

    for n in range(num_nodes):
        task_duration = task_durations[n]
        e = next(iter(task_duration["first_wave"]))
        # NOTE: somehow only picks the first element {2: [n_tasks_in_ms]}

        num_tasks = len(task_duration["first_wave"][e]) + len(
            task_duration["rest_wave"][e]
        )

        # remove fresh duration from first wave duration
        # drag nearest neighbor first wave duration to empty spots
        pre_process_task_duration(task_duration)
        rough_duration = np.mean(
            [i for t in task_duration["first_wave"].values() for i in t]
            + [i for t in task_duration["rest_wave"].values() for i in t]
            + [i for t in task_duration["fresh_durations"].values() for i in t]
        )

        curr_stage = {
            "stage_id": n,
            "num_tasks": num_tasks,
            "avg_task_duration": round(rough_duration),
        }
        stage_info[n] = curr_stage

    return stage_info


def get_base_tpch_graph_structure(query_num):
    # use query_num to read string from file
    with open(os.path.join(HOME_TPCH_DIR, "query_dag.json")) as f:
        tpch_query_json = json.load(f)

    # get query dependency from file
    query_dependency = ast.literal_eval(tpch_query_json["query_number"][str(query_num)])

    # convert job structure into a nx graph
    base_tpch_graph = nx.DiGraph(query_dependency)

    # return the job nx graph for query
    return base_tpch_graph


def get_graph_from_deps(dependencies):
    # parse dependencies to get it in list of tuples
    # construct the TaskGraph from the dependencies.
    task_graph_structure: Mapping[int, Sequence[int]] = {}
    for task_dependency in dependencies:
        task_graph_structure[task_dependency.key.id] = [
            task_id for task_id in task_dependency.children_ids
        ]

    # convert our TaskGraph into a nx friendly notation
    all_edges_in_app = []
    for parent in task_graph_structure.keys():
        for child in task_graph_structure[parent]:
            all_edges_in_app.append((parent, child))

    # construct nx graph
    given_app_tpch_graph = nx.DiGraph(all_edges_in_app)

    # return the graph
    return given_app_tpch_graph


def are_structurally_same(graph1, graph2):
    # Step 1: Check if both graphs have the same number of vertices
    if len(graph1.nodes) != len(graph2.nodes):
        print(
            f"DAG structure mismatch! Graph1 has "
            f"{graph1.nodes} while Graph2 has {graph2.nodes}"
        )
        return False, None

    # Step 2: Check if there exists a bijection between the vertices
    # of the two graphs such that their adjacency relationships match
    for mapping in nx.isomorphism.GraphMatcher(graph1, graph2).isomorphisms_iter():
        # Check if the adjacency relationships match
        if all(v in mapping for u, v in graph1.edges):
            # graph structures match
            # mapping is a dict {key=original-stage-id, val=app-stage-id}
            # we reverse it so it maps app-stage-id back to orig-stage-id
            reversed_mapping = {v: k for k, v in mapping.items()}

            return True, reversed_mapping

    print("DAG structure mismatch! No mapping could be found")
    return False, None


def verify_and_relable_tpch_app_graph(query_num, dependencies):
    # get nx graphs from base tpch file and application dependencies
    base_tpch_graph = get_base_tpch_graph_structure(query_num)
    app_graph = get_graph_from_deps(dependencies)

    # verify that the graphs are isomorphic
    # returns true if same_structure, along with stage_id_mapping wrt base tpch file
    same_structure, stage_id_mapping = are_structurally_same(base_tpch_graph, app_graph)

    # return with stage_id_mapping back to assign correct runtime and num_executors
    return same_structure, stage_id_mapping
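
For completeness, a small usage sketch of the two helpers that rpc/service.py imports from this file. It is illustrative only: it assumes it is run from the rpc/ directory (so the relative HOME_TPCH_DIR path resolves) and uses namedtuples as stand-ins for the gRPC dependency messages, which expose key.id and children_ids.

# Illustrative usage only; the fake Dep/Key objects mimic the gRPC task-dependency messages.
from collections import namedtuple

from tpch_utils import get_all_stage_info_for_query, verify_and_relable_tpch_app_graph

Key = namedtuple("Key", ["id"])
Dep = namedtuple("Dep", ["key", "children_ids"])

# An application DAG shaped like TPC-H query 1: stage 0 -> stage 1 -> stage 2.
deps = [Dep(Key(0), [1]), Dep(Key(1), [2]), Dep(Key(2), [])]

same_structure, stage_id_mapping = verify_and_relable_tpch_app_graph(
    query_num=1, dependencies=deps
)
if same_structure:
    stage_info = get_all_stage_info_for_query(1)
    for app_stage_id, profiled_stage_id in stage_id_mapping.items():
        info = stage_info[profiled_stage_id]
        print(app_stage_id, info["num_tasks"], info["avg_task_duration"])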
