Skip to content

Commit

Permalink
Merge branch 'port-flowlessly' into flow-scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
ruizehung committed Oct 24, 2023
2 parents 6289ad7 + 321c3ca commit f70df02
Show file tree
Hide file tree
Showing 34 changed files with 4,011 additions and 2,184 deletions.
2 changes: 1 addition & 1 deletion analyze.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from utils import STATS_FUNCTIONS, log_statistics, setup_logging

# The formats that the Chrome trace can be output to.
TRACE_FORMATS = ("task", "taskgraph", "application", "resource")
TRACE_FORMATS = ("task", "taskgraph", "application", "resource", "scheduler")

# The formats that the task stats can be output to.
TASK_STATS_FORMATS = ("basic", "detailed")
Expand Down
27 changes: 27 additions & 0 deletions configs/new_av_workload.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Output configs.
--log=./hetero_task_dag_aware_period_30_d_320_invoc_10.log
--log_level=debug
--csv=./hetero_task_dag_aware_period_30_d_320_invoc_10.csv

# Task configs.
--runtime_variance=0

# Scheduler configs.
#--scheduler=TetriSched_Gurobi
--scheduler=TetriSched_Gurobi

--scheduler_runtime=0
--enforce_deadlines
--retract_schedules
--release_taskgraphs
--noscheduler_enable_batching
# --scheduler_log_times=0
# --scheduler_log_times=53
--drop_skipped_tasks
# --scheduler_plan_ahead=7000
--scheduler_time_discretization=1

# Execution mode configs.
--execution_mode=yaml
--workload_profile_path=./profiles/workload/new_av_workload.yaml
--worker_profile_path=./profiles/workers/new_av_worker_profile.json
26 changes: 26 additions & 0 deletions configs/new_av_workload_dag_unaware.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Output configs.
--log=./hetero_task_dag_unaware_period_30_d_320_invoc_10.log
--log_level=debug
--csv=./hetero_task_dag_unaware_period_30_d_320_invoc_10.csv

# Task configs.
--runtime_variance=0

# Scheduler configs.
#--scheduler=TetriSched_Gurobi
--scheduler=TetriSched_Gurobi
--scheduler_runtime=0
--enforce_deadlines
--retract_schedules
# --release_taskgraphs
--noscheduler_enable_batching
# --scheduler_log_times=0
# --scheduler_log_times=0
--drop_skipped_tasks
# --scheduler_plan_ahead=7000
--scheduler_time_discretization=1

# Execution mode configs.
--execution_mode=yaml
--workload_profile_path=./profiles/workload/new_av_workload.yaml
--worker_profile_path=./profiles/workers/new_av_worker_profile.json
296 changes: 163 additions & 133 deletions data/csv_reader.py

Large diffs are not rendered by default.

55 changes: 48 additions & 7 deletions data/csv_types.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from collections import namedtuple
from functools import total_ordering
from typing import Mapping, Optional, Sequence
from typing import List, Mapping, Optional, Sequence

Resource = namedtuple("Resource", ["name", "id", "quantity"])
WorkerPoolStats = namedtuple(
Expand All @@ -23,14 +23,25 @@ def __init__(self, name: str, id: str, resources: Sequence[Resource]):
class Placement(object):
    """A single placement of a task, as parsed from the CSV event log.

    Records which task was placed, when and where it was placed, its
    deadline, and (optionally) its completion time.
    """

    def __init__(
        self,
        name: str,
        timestamp: int,
        task_id: str,
        task_graph: str,
        placement_time: int,
        deadline: int,
        worker_pool: "WorkerPool",
        # NOTE: an (immutable) empty tuple replaces the original `= []`
        # default, which was a shared mutable default argument.
        resources_used: Sequence["Resource"] = (),
        completion_time: Optional[int] = None,
    ):
        self.task_name = name
        self.timestamp = timestamp
        self.id = task_id
        self.task_graph = task_graph
        # Simulation time at which the task was placed.
        self.placement_time = placement_time
        self.deadline = deadline
        self.worker_pool = worker_pool
        self.resources_used = resources_used
        # None until the completion time is known (the original assigned
        # None and then immediately overwrote it with `completion_time`;
        # the dead first assignment is dropped here).
        self.completion_time = completion_time


@total_ordering
Expand Down Expand Up @@ -122,6 +133,11 @@ def update_placement(

placement_time = int(csv_reading[0])
placement = Placement(
name=self.name,
timestamp=self.timestamp,
task_id=self.id,
task_graph=self.task_graph,
deadline=self.deadline,
placement_time=placement_time,
worker_pool=worker_pools[csv_reading[6]],
resources_used=[
Expand Down Expand Up @@ -173,7 +189,12 @@ def update_migration(
csv_reading[1] == "TASK_MIGRATED"
), f"The event {csv_reading[1]} was not of type TASK_MIGRATED."
placement = Placement(
name=self.name,
timestamp=self.timestamp,
task_id=self.id,
task_graph=self.task_graph,
placement_time=int(csv_reading[0]),
deadline=self.deadline,
worker_pool=worker_pools[csv_reading[5]],
resources_used=[
Resource(*csv_reading[i : i + 3]) for i in range(7, len(csv_reading), 3)
Expand Down Expand Up @@ -260,8 +281,11 @@ def __init__(
self.end_time = None
self.runtime = None
self.true_runtime = None
self.placed_tasks = None
self.unplaced_tasks = None
self.num_placed_tasks = None
self.num_unplaced_tasks = None

# Values updated from TASK_SCHEDULED events.
self.task_placements = []

def update_finish(self, csv_reading: str):
"""Updates the values of the Scheduler based on the SCHEDULER_FINISHED event
Expand All @@ -278,10 +302,27 @@ def update_finish(self, csv_reading: str):
), f"The Scheduler at {self.start_time} was already finished."
self.end_time = int(csv_reading[0])
self.runtime = int(csv_reading[2])
self.placed_tasks = int(csv_reading[3])
self.unplaced_tasks = int(csv_reading[4])
self.num_placed_tasks = int(csv_reading[3])
self.num_unplaced_tasks = int(csv_reading[4])
self.true_runtime = int(csv_reading[5])

def update_task_schedule(self, csv_reading: List[str]):
    """Records a task placement parsed from a TASK_SCHEDULED event.

    Appends a `Placement` to `self.task_placements` built from the CSV row,
    whose fields are (in order): time, event type, task name, task graph,
    timestamp, task id, deadline, placement time, worker pool, runtime.

    Args:
        csv_reading: The split CSV row for a TASK_SCHEDULED event.

    Raises:
        AssertionError: If the row is not a TASK_SCHEDULED event.
    """
    assert (
        csv_reading[1] == "TASK_SCHEDULED"
    ), f"Event {csv_reading[1]} is not of type TASK_SCHEDULED."
    self.task_placements.append(
        Placement(
            name=csv_reading[2],
            timestamp=int(csv_reading[4]),
            task_id=csv_reading[5],
            task_graph=csv_reading[3],
            placement_time=int(csv_reading[7]),
            # Cast to int: the raw CSV field is a string, but Placement
            # declares `deadline: int` and the other call sites pass ints.
            deadline=int(csv_reading[6]),
            worker_pool=csv_reading[8],
            # Completion = placement time + runtime of the task.
            completion_time=int(csv_reading[7]) + int(csv_reading[9]),
        )
    )


class Simulator(object):
def __init__(self, csv_path: str, start_time: int, total_tasks: int):
Expand Down
16 changes: 16 additions & 0 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import os
import random
import sys

Expand Down Expand Up @@ -55,6 +56,13 @@
flags.DEFINE_string(
"log_file_name", None, "Name of the file to log the results to.", short_name="log"
)
flags.DEFINE_enum(
"log_file_mode",
"write",
["append", "write"],
"Sets the mode in which the log file is opened. If 'append', the log file is "
"opened in append mode, and if 'write', the log file is opened in write mode. ",
)
flags.DEFINE_string(
"csv_file_name",
None,
Expand Down Expand Up @@ -344,6 +352,13 @@ def main(args):
"""Main loop that loads the data from the given profile paths, and
runs the Simulator on the data with the given scheduler.
"""
if FLAGS.log_file_mode == "write":
# Delete the prior log file if it exists.
if os.path.exists(FLAGS.log_file_name):
os.remove(FLAGS.log_file_name)
if os.path.exists(FLAGS.csv_file_name):
os.remove(FLAGS.csv_file_name)

random.seed(FLAGS.random_seed)
logger = setup_logging(
name=__name__,
Expand Down Expand Up @@ -542,6 +557,7 @@ def main(args):
time_discretization=EventTime(
FLAGS.scheduler_time_discretization, EventTime.Unit.US
),
log_to_file=FLAGS.scheduler_log_to_file,
_flags=FLAGS,
)
elif FLAGS.scheduler == "Flow":
Expand Down
16 changes: 16 additions & 0 deletions profiles/workers/new_av_worker_profile.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
[
{
"name": "WorkerPool_1",
"workers": [
{
"name": "Worker_1_1",
"resources": [
{
"name": "Slot",
"quantity": 2
}
]
}
]
}
]
56 changes: 56 additions & 0 deletions profiles/workload/new_av_workload.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Implementation of a simple AV workload for testing the ILP solutions.
# This workload involves a straight-line invocation of the major
# components of an AV and is being used to understand how the ILP
# manages to solve the happens-before dependency problem.
graphs:
- name: AutonomousVehicle
graph:
- name: Sensor
work_profile: SensorProfile
children: ["Perception"]
- name: Perception
work_profile: PerceptionProfile
children: ["Prediction"]
- name: Prediction
work_profile: PredictionProfile
children: ["Planning"]
- name: Planning
work_profile: PlanningProfile
children: ["Control"]
- name: Control
work_profile: ControlProfile
release_policy: fixed
period: 30 # In us for now.
invocations: 10
deadline_variance: [320, 320]
profiles:
- name: SensorProfile
execution_strategies:
- batch_size: 1
runtime: 10
resource_requirements:
Slot:any: 1
- name: PerceptionProfile
execution_strategies:
- batch_size: 1
runtime: 10
resource_requirements:
Slot:any: 1
- name: PredictionProfile
execution_strategies:
- batch_size: 1
runtime: 40
resource_requirements:
Slot:any: 2
- name: PlanningProfile
execution_strategies:
- batch_size: 1
runtime: 10
resource_requirements:
Slot:any: 1
- name: ControlProfile
execution_strategies:
- batch_size: 1
runtime: 10
resource_requirements:
Slot:any: 1
Loading

0 comments on commit f70df02

Please sign in to comment.