-
Notifications
You must be signed in to change notification settings - Fork 4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[RPC] Fetch TPCH profiled info for application at runtime #92
Changes from 3 commits
a2f65ed
10728a9
8ddd26c
df94875
df446c0
31c3f15
063f01f
c99b12d
c34b55c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -19,6 +19,7 @@ | |||||
|
||||||
from schedulers import EDFScheduler, FIFOScheduler | ||||||
from utils import EventTime, setup_logging | ||||||
from utils_tpch import get_all_stage_info_for_query | ||||||
from workers import Worker, WorkerPool, WorkerPools | ||||||
from workload import ( | ||||||
ExecutionStrategies, | ||||||
|
@@ -366,11 +367,41 @@ async def RegisterTaskGraph(self, request, context): | |||||
f"already registered!", | ||||||
num_executors=0, | ||||||
) | ||||||
|
||||||
self._logger.info( | ||||||
"Attempting to register application ID %s with name %s", | ||||||
request.id, | ||||||
request.name, | ||||||
) | ||||||
# Check if query is from TPC-H workload. | ||||||
# If yes, retrieve profiled slots and runtime info. If no, use default values | ||||||
is_tpch_query = False | ||||||
tpch_query_all_stage_info = None | ||||||
if(request.name.startswith("TPCH_")): | ||||||
is_tpch_query = True | ||||||
# retrieve tasks-per-stage and runtime info based on query number | ||||||
tpch_query_num = request.name.split("TPCH_Q",1)[1] | ||||||
tpch_query_all_stage_info = get_all_stage_info_for_query(tpch_query_num) | ||||||
|
||||||
# Construct all the Tasks for the TaskGraph. | ||||||
task_ids_to_task: Mapping[int, Task] = {} | ||||||
default_resource = Resources(resource_vector={ | ||||||
Resource(name="Slot_CPU", _id="any"): 30 | ||||||
} | ||||||
) | ||||||
default_runtime = EventTime(20, EventTime.Unit.US) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Do you have any runtimes that are lower than a second? |
||||||
|
||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are you using this in the code below? |
||||||
for task_dependency in request.dependencies: | ||||||
framework_task = task_dependency.key | ||||||
if is_tpch_query: | ||||||
task_slots = tpch_query_all_stage_info[framework_task.id]["num_tasks"] | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you explain how you're matching the correct stage with its actual resource usage and runtime here? |
||||||
task_runtime = tpch_query_all_stage_info[framework_task.id]["avg_task_duration"] | ||||||
self._logger.info( | ||||||
"Creating Task for TPCH stage: %s with tasks: %s and avg runtime: %s", | ||||||
framework_task.id, | ||||||
task_slots, | ||||||
task_runtime, | ||||||
) | ||||||
task_ids_to_task[framework_task.id] = Task( | ||||||
name=framework_task.name, | ||||||
task_graph=request.id, | ||||||
|
@@ -380,17 +411,22 @@ async def RegisterTaskGraph(self, request, context): | |||||
name=f"WorkProfile_{framework_task.name}", | ||||||
execution_strategies=ExecutionStrategies( | ||||||
[ | ||||||
# TODO (Sukrit): Find the actual resource requirements | ||||||
# for the particular TaskGraph, along with the expected | ||||||
# runtime and set it here. | ||||||
ExecutionStrategy( | ||||||
resources=Resources( | ||||||
resource_vector={ | ||||||
Resource(name="Slot_CPU", _id="any"): 30 | ||||||
} | ||||||
) if not is_tpch_query else Resources( | ||||||
resource_vector={ | ||||||
Resource(name="Slot_CPU", _id="any"): task_slots | ||||||
} | ||||||
), | ||||||
batch_size=1, | ||||||
runtime=EventTime(20, EventTime.Unit.US), | ||||||
runtime=EventTime( | ||||||
20, EventTime.Unit.US | ||||||
) if not is_tpch_query else EventTime( | ||||||
task_runtime, EventTime.Unit.US | ||||||
), | ||||||
) | ||||||
] | ||||||
), | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
# Code adopted from decima-sim to use their profiles of TPC-H queries | ||
|
||
import numpy as np | ||
import os | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The build is failing, please look at the errors and resolve them. |
||
|
||
# Default location of the Decima TPC-H task-duration profiles.
# NOTE(review): relative path — resolution depends on the process CWD;
# confirm against how the RPC service is launched.
HOME_TPCH_DIR = "../profiles/workload/tpch_decima/"
TPCH_SUBDIR = "2g/"


class SetWithCount(object):
    """A multiset: set-like membership that tracks duplicate additions.

    Unlike a plain ``set``, an item stays a member until it has been
    removed as many times as it was added.
    """

    def __init__(self):
        # Maps item -> number of times it is currently held.
        # (Attribute name kept as ``set`` for backward compatibility.)
        self.set = {}

    def __contains__(self, item):
        return item in self.set

    def add(self, item):
        """Add one occurrence of ``item``."""
        # dict.get with a default replaces the original if/else branch.
        self.set[item] = self.set.get(item, 0) + 1

    def clear(self):
        """Drop all items and counts."""
        self.set.clear()

    def remove(self, item):
        """Remove one occurrence of ``item``; forget it when the count hits 0.

        Raises KeyError if ``item`` is not currently a member (same as the
        original behavior).
        """
        self.set[item] -= 1
        if self.set[item] == 0:
            del self.set[item]


def pre_process_task_duration(task_duration):
    """Remove "fresh" (warm-up) durations from the first-wave samples.

    The Decima profiles record each stage's task durations in three buckets
    keyed by executor count: ``fresh_durations``, ``first_wave`` and
    ``rest_wave``. Durations that also appear in ``fresh_durations`` are
    warm-up artifacts and must not be counted again in ``first_wave``.

    Mutates ``task_duration`` in place: ``task_duration['first_wave']`` is
    replaced by the cleaned copy.

    Bug fix: previously the cleaned dict was computed and then discarded,
    so calling this function had no effect at all.
    """
    clean_first_wave = {}
    for e in task_duration['first_wave']:
        clean_first_wave[e] = []
        # Multiset gives O(1) membership tests while counting duplicates.
        fresh_durations = SetWithCount()
        for d in task_duration['fresh_durations'][e]:
            fresh_durations.add(d)
        for d in task_duration['first_wave'][e]:
            if d not in fresh_durations:
                clean_first_wave[e].append(d)
            else:
                # Consume the matching fresh duration once, so one warm-up
                # sample cannot mask several equal first-wave samples.
                fresh_durations.remove(d)
    # Apply the cleaning (this assignment was missing in the original).
    task_duration['first_wave'] = clean_first_wave


def get_all_stage_info_for_query(query_num, profile_dir=None):
    """Load per-stage profiled info for a TPC-H query from Decima profiles.

    Args:
        query_num: TPC-H query number; selects ``task_duration_<num>.npy``.
        profile_dir: Directory containing the profile files. Defaults to
            ``HOME_TPCH_DIR/TPCH_SUBDIR`` (backward compatible).

    Returns:
        Dict mapping stage index ``n`` to
        ``{"stage_id": n, "num_tasks": ..., "avg_task_duration": ...}``
        with the average duration rounded to the nearest integer.
    """
    if profile_dir is None:
        profile_dir = os.path.join(HOME_TPCH_DIR, TPCH_SUBDIR)
    task_durations = np.load(
        os.path.join(profile_dir, 'task_duration_' + str(query_num) + '.npy'),
        allow_pickle=True,
    ).item()

    stage_info = {}
    for n in range(len(task_durations)):
        task_duration = task_durations[n]
        # Profiles key durations by executor count; use the first key.
        # NOTE(review): assumes one representative executor count per stage
        # — confirm against the Decima profile format.
        e = next(iter(task_duration['first_wave']))

        # Task count is taken BEFORE cleaning, so warm-up samples still
        # contribute to the task count (matches the original ordering).
        num_tasks = (len(task_duration['first_wave'][e])
                     + len(task_duration['rest_wave'][e]))

        # Remove warm-up ("fresh") durations from the first wave so they
        # are not double counted in the average below.
        pre_process_task_duration(task_duration)
        all_durations = (
            [d for ds in task_duration['first_wave'].values() for d in ds]
            + [d for ds in task_duration['rest_wave'].values() for d in ds]
            + [d for ds in task_duration['fresh_durations'].values() for d in ds]
        )
        rough_duration = np.mean(all_durations)

        stage_info[n] = {
            "stage_id": n,
            "num_tasks": num_tasks,
            "avg_task_duration": round(rough_duration),
        }

    return stage_info
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of making this a top-level
utils_tpch.py
, can you make a folder insiderpc
namedtpch
and name itutils.py
inside?Please also include the
*.npy
files that we are using for initial testing from Decima.