Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RPC] Fetch TPCH profiled info for application at runtime #92

Merged
merged 9 commits into from
Mar 2, 2024
44 changes: 40 additions & 4 deletions rpc/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

from schedulers import EDFScheduler, FIFOScheduler
from utils import EventTime, setup_logging
from utils_tpch import get_all_stage_info_for_query
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of making this a top-level utils_tpch.py, can you make a folder inside rpc named tpch and name it utils.py inside?

Please also include the *.npy files that we are using for initial testing from Decima.

from workers import Worker, WorkerPool, WorkerPools
from workload import (
ExecutionStrategies,
Expand Down Expand Up @@ -366,11 +367,41 @@ async def RegisterTaskGraph(self, request, context):
f"already registered!",
num_executors=0,
)

self._logger.info(
"Attempting to register application ID %s with name %s",
request.id,
request.name,
)
            # Check if the query is from the TPC-H workload.
            # If so, retrieve profiled slots and runtime info; if not, use default values.
is_tpch_query = False
tpch_query_all_stage_info = None
if(request.name.startswith("TPCH_")):
is_tpch_query = True
# retrieve tasks-per-stage and runtime info based on query number
tpch_query_num = request.name.split("TPCH_Q",1)[1]
tpch_query_all_stage_info = get_all_stage_info_for_query(tpch_query_num)

# Construct all the Tasks for the TaskGraph.
task_ids_to_task: Mapping[int, Task] = {}
default_resource = Resources(resource_vector={
Resource(name="Slot_CPU", _id="any"): 30
}
)
default_runtime = EventTime(20, EventTime.Unit.US)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
default_runtime = EventTime(20, EventTime.Unit.US)
default_runtime = EventTime(20, EventTime.Unit.S)

Do you have any runtimes that are lower than a second?


Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you using this in the code below?

for task_dependency in request.dependencies:
framework_task = task_dependency.key
if is_tpch_query:
task_slots = tpch_query_all_stage_info[framework_task.id]["num_tasks"]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain how you're matching the correct stage with its actual resource usage and runtime here?

task_runtime = tpch_query_all_stage_info[framework_task.id]["avg_task_duration"]
self._logger.info(
"Creating Task for TPCH stage: %s with tasks: %s and avg runtime: %s",
framework_task.id,
task_slots,
task_runtime,
)
task_ids_to_task[framework_task.id] = Task(
name=framework_task.name,
task_graph=request.id,
Expand All @@ -380,17 +411,22 @@ async def RegisterTaskGraph(self, request, context):
name=f"WorkProfile_{framework_task.name}",
execution_strategies=ExecutionStrategies(
[
# TODO (Sukrit): Find the actual resource requirements
# for the particular TaskGraph, along with the expected
# runtime and set it here.
ExecutionStrategy(
resources=Resources(
resource_vector={
Resource(name="Slot_CPU", _id="any"): 30
}
) if not is_tpch_query else Resources(
resource_vector={
Resource(name="Slot_CPU", _id="any"): task_slots
}
),
batch_size=1,
runtime=EventTime(20, EventTime.Unit.US),
runtime=EventTime(
20, EventTime.Unit.US
) if not is_tpch_query else EventTime(
task_runtime, EventTime.Unit.US
),
)
]
),
Expand Down
77 changes: 77 additions & 0 deletions utils_tpch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# Code adopted from decima-sim to use their profiles of TPC-H queries

Check failure on line 1 in utils_tpch.py

View workflow job for this annotation

GitHub Actions / Python 3.9 Build

Imports are incorrectly sorted and/or formatted.

import os
from collections import Counter

import numpy as np
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The build is failing, please look at the errors and resolve them.


HOME_TPCH_DIR = "../profiles/workload/tpch_decima/"
TPCH_SUBDIR = "2g/"

class SetWithCount(object):
    """
    A set-like container that allows duplicates: each item carries a
    count of how many times it has been added, and membership holds
    while that count is positive (i.e. a multiset).
    """

    def __init__(self):
        # Backing dict mapping each item to its current count.
        self.set = {}

    def __contains__(self, item):
        # An item is "in" the multiset while its count is positive.
        return item in self.set

    def add(self, item):
        # Increment the item's count, starting from zero if unseen.
        self.set[item] = self.set.get(item, 0) + 1

    def clear(self):
        # Forget every item and its count.
        self.set.clear()

    def remove(self, item):
        # Decrement the count; drop the item entirely once it hits zero.
        remaining = self.set[item] - 1
        if remaining:
            self.set[item] = remaining
        else:
            del self.set[item]

def pre_process_task_duration(task_duration):
# remove fresh durations from first wave
clean_first_wave = {}
for e in task_duration['first_wave']:
clean_first_wave[e] = []
fresh_durations = SetWithCount()
# O(1) access
for d in task_duration['fresh_durations'][e]:
fresh_durations.add(d)
for d in task_duration['first_wave'][e]:
if d not in fresh_durations:
clean_first_wave[e].append(d)
else:
# prevent duplicated fresh duration blocking first wave
fresh_durations.remove(d)


def get_all_stage_info_for_query(query_num):
task_durations = np.load(os.path.join(HOME_TPCH_DIR, TPCH_SUBDIR,'task_duration_' + str(query_num) + '.npy'),
allow_pickle=True).item()

num_nodes = len(task_durations)

stage_info = {}

for n in range(num_nodes):
task_duration = task_durations[n]
e = next(iter(task_duration['first_wave']))
# NOTE: somehow only picks the first element {2: [n_tasks_in_ms]}

num_tasks = len(task_duration['first_wave'][e]) + \
len(task_duration['rest_wave'][e])

# remove fresh duration from first wave duration
# drag nearest neighbor first wave duration to empty spots
pre_process_task_duration(task_duration)
rough_duration = np.mean(
[i for l in task_duration['first_wave'].values() for i in l] + \
[i for l in task_duration['rest_wave'].values() for i in l] + \
[i for l in task_duration['fresh_durations'].values() for i in l])

curr_stage = {"stage_id": n, "num_tasks": num_tasks, "avg_task_duration": round(rough_duration)}
stage_info[n] = curr_stage

return stage_info
Loading