Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Flow Scheduler #61

Closed
wants to merge 26 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
ca29389
Attempt to implement Flow Scheduler
ruizehung Oct 13, 2023
8a75cc4
Add pybind11 submodule inside Flowlessly
ruizehung Oct 13, 2023
a0bc887
Update Readme FlowScheduler section
ruizehung Oct 13, 2023
f985000
Make ~AdjacencyMapGraph check id_arc.second before deleting the arc
ruizehung Oct 13, 2023
a9c06ab
Add Arc binding
ruizehung Oct 13, 2023
3ace796
Create devcontainer.json
ruizehung Oct 22, 2023
987afd6
Update readme FlowScheduler setup instruction
ruizehung Oct 22, 2023
8909b78
temp
ruizehung Oct 23, 2023
71d6c8c
Add Flow Scheduler class with incomplete implementation
ruizehung Oct 23, 2023
11af2fc
Update Readme to copy flowlessly_py.* to the root
ruizehung Oct 23, 2023
ee243e4
Refactor incomplete flow scheduler implementation. Have the pseudocod…
ruizehung Oct 23, 2023
37266a8
Add test_flowlessly.py
ruizehung Oct 23, 2023
cf41c20
Copy print flow graph to _extract_placement_from_flow_graph
ruizehung Oct 23, 2023
9d48a40
Add comment on that Flow scheduler template is copied from EDF scheduler
ruizehung Oct 23, 2023
21a7646
Use Dockerfile in dev container config
ruizehung Oct 24, 2023
7c4da26
Finish the first draft of Flow scheduler implementation
ruizehung Oct 24, 2023
61dcad1
raise ValueError if worker_pool_id is None but worker_id is not None …
ruizehung Oct 24, 2023
666b6bb
Add a space to self._logger.debug in flow scheduler
ruizehung Oct 24, 2023
30e1f71
Add flow scheduler unit test
ruizehung Oct 24, 2023
5b5cdc5
Update Readme set up FlowScheduler dependencies section
ruizehung Oct 24, 2023
657df0e
Remove unused import in flow scheduler
ruizehung Oct 24, 2023
52e02ec
Remove debug print statement from edg scheduler
ruizehung Oct 24, 2023
2df909a
Remove flowlessly_py.cpython-310-x86_64-linux-gnu.so binary from git
ruizehung Oct 24, 2023
e7f20ca
Fix main.py import order
ruizehung Oct 24, 2023
6289ad7
Remove test_flowlessly.py
ruizehung Oct 24, 2023
f70df02
Merge branch 'port-flowlessly' into flow-scheduler
ruizehung Oct 24, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .devcontainer/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Use the specified base image
FROM mcr.microsoft.com/devcontainers/base:jammy

# Install necessary packages and tools
RUN sudo apt update && \
sudo apt install -y make cmake pkg-config g++ libgflags-dev libgoogle-glog-dev libboost-system-dev libboost-thread-dev libboost-timer-dev python3-pip libgtest-dev vim && \
sudo apt clean && \
sudo rm -rf /var/lib/apt/lists/*

# Specify default command (this can be overridden by "postCreateCommand" in devcontainer.json)
CMD [ "/bin/bash" ]
24 changes: 24 additions & 0 deletions .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// For format details, see https://aka.ms/devcontainer.json. For config options, see the
// README at: https://github.com/devcontainers/templates/tree/main/src/ubuntu
{
"name": "Ubuntu",
// Or use a Dockerfile or Docker Compose file. More info: https://containers.dev/guide/dockerfile
// "image": "mcr.microsoft.com/devcontainers/base:jammy",
"build": {
"dockerfile": "Dockerfile"
},
// Features to add to the dev container. More info: https://containers.dev/features.
"features": {
"ghcr.io/devcontainers/features/sshd:1": {
"version": "latest"
}
},
// Use 'forwardPorts' to make a list of ports inside the container available locally.
// "forwardPorts": [],
// Use 'postCreateCommand' to run commands after the container is created.
"postCreateCommand": "pip3 install -r requirements.txt",
// Configure tool-specific properties.
// "customizations": {},
// Uncomment to connect as root instead. More info: https://aka.ms/dev-containers-non-root.
// "remoteUser": "root"
}
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,6 @@
path = schedulers/tetrisched/extern/pybind11
url = https://github.com/pybind/pybind11
branch = stable
[submodule "schedulers/Flowlessly/extern/pybind11"]
path = schedulers/Flowlessly/extern/pybind11
url = https://github.com/pybind/pybind11
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,21 @@ and run
python3 setup.py develop
```

### To set up FlowScheduler dependencies

#### Initialize submodules
`git submodule init && git submodule update`

#### Build Flowlessly and create python binding
```
cd schedulers/Flowlessly
mkdir build
cd build
cmake ..
make
cp flowlessly_py.* ../../../.
```

## Executing Experiments

Run the following command in order to run experiments:
Expand Down
15 changes: 15 additions & 0 deletions configs/simple_workload_flow.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Output configs.
--log=./simple_workload_flow.log
--log_level=debug
--csv=./simple_workload_flow.csv

# Scheduler configs.
--scheduler=Flow
--scheduler_runtime=0
--enforce_deadlines
--retract_schedules

# Execution mode configs.
--execution_mode=json
--workload_profile_path=./profiles/workload/simple_workload.json
--worker_profile_path=./profiles/workers/simple_workload.json
8 changes: 8 additions & 0 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
ClockworkScheduler,
EDFScheduler,
FIFOScheduler,
FlowScheduler,
ILPScheduler,
LSFScheduler,
TetriSchedCPLEXScheduler,
Expand Down Expand Up @@ -188,6 +189,7 @@
"TetriSched_Gurobi",
"Clockwork",
"TetriSched",
"Flow",
],
"The scheduler to use for this execution.",
)
Expand Down Expand Up @@ -558,6 +560,12 @@ def main(args):
log_to_file=FLAGS.scheduler_log_to_file,
_flags=FLAGS,
)
elif FLAGS.scheduler == "Flow":
scheduler = FlowScheduler(
preemptive=FLAGS.preemption,
runtime=EventTime(FLAGS.scheduler_runtime, EventTime.Unit.US),
_flags=FLAGS,
)
else:
raise ValueError(
"Unsupported scheduler implementation: {}".format(FLAGS.scheduler)
Expand Down
2 changes: 1 addition & 1 deletion schedulers/Flowlessly/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,4 @@ ext/*
*.fdb_latexmk
*.fls
*.log
*.out
*.out
17 changes: 17 additions & 0 deletions schedulers/Flowlessly/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,20 @@ include(DoxyTargets)
enable_testing()

subdirs(src)

# Compilation for the Python bindings.
# Define a Python frontend tetrisched_py and add the relevant sources.
project(flowlessly_py LANGUAGES CXX)
add_subdirectory(extern/pybind11)
set(PYBIND_SRC
src/python/Flowlessly.cpp
)

pybind11_add_module(flowlessly_py ${PYBIND_SRC}
$<TARGET_OBJECTS:misc>
$<TARGET_OBJECTS:graphs>
$<TARGET_OBJECTS:solvers>
)
target_link_libraries(flowlessly_py PRIVATE gflags glog boost_chrono
boost_system boost_thread boost_timer)

1 change: 1 addition & 0 deletions schedulers/Flowlessly/cmake/ExternalDependencies.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ find_package(GLog REQUIRED)
ExternalProject_Add(
gtest
GIT_REPOSITORY https://github.com/google/googletest.git
GIT_TAG main
TIMEOUT 10
PREFIX ${CMAKE_CURRENT_BINARY_DIR}/third_party/gtest
# no install required, we link the library from the build tree
Expand Down
1 change: 1 addition & 0 deletions schedulers/Flowlessly/extern/pybind11
Submodule pybind11 added at 0e2c3e
2 changes: 1 addition & 1 deletion schedulers/Flowlessly/src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ add_dependencies(solvers gtest)

set(FLOW_SCHEDULER_SRC
flow_scheduler.cc
)
)

add_executable(flow_scheduler ${FLOW_SCHEDULER_SRC}
$<TARGET_OBJECTS:misc>
Expand Down
2 changes: 1 addition & 1 deletion schedulers/Flowlessly/src/flow_scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ DEFINE_bool(debug_output, false, "Print debug graph after each iteration");
DEFINE_bool(graph_has_node_types, false, "Graph input contains node types");
DEFINE_bool(print_assignments, false, "Only print task to PUs assignments");
DEFINE_string(incremental_graphs, "", "File containing paths to the graphs");
DEFINE_bool(daemon, true, "True if the solver should run as a daemon");
DEFINE_bool(daemon, false, "True if the solver should run as a daemon");
DEFINE_int64(alpha_scaling_factor, 2,
"Value by which Eps is divided in the cost scaling algorithm");

Expand Down
5 changes: 4 additions & 1 deletion schedulers/Flowlessly/src/graphs/adjacency_map_graph.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,10 @@ AdjacencyMapGraph::~AdjacencyMapGraph() {
// Delete every arc.
for (uint32_t node_id = 1; node_id <= max_node_id_; node_id++) {
for (auto& id_arc : arcs_[node_id]) {
delete id_arc.second;
if (id_arc.second != nullptr) {
delete id_arc.second;
id_arc.second = nullptr; // set the pointer to nullptr after deletion
}
}
}
// AdjacencyMapGraph does not own the stats_ object.
Expand Down
9 changes: 9 additions & 0 deletions schedulers/Flowlessly/src/graphs/graph.cc
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,15 @@ void Graph::ReadGraph(FILE* graph_file, bool first_scheduling_iteration,
// }
}

void Graph::ReadGraph(const std::string& file_path, bool first_scheduling_iteration, bool& end_of_scheduling) {
FILE* graph_file = fopen(file_path.c_str(), "r");
if (!graph_file) {
throw std::runtime_error("Failed to open file: " + file_path);
}
ReadGraph(graph_file, first_scheduling_iteration, &end_of_scheduling);
fclose(graph_file);
}

void Graph::WriteAssignments(const string& out_file_name) {
FILE* out_file = fopen(out_file_name.c_str(), "w");
CHECK(out_file != NULL) << "Could not open assignments file for writing: "
Expand Down
4 changes: 4 additions & 0 deletions schedulers/Flowlessly/src/graphs/graph.h
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,10 @@ class Graph {

void ReadGraph(FILE* graph_file, bool first_scheduling_iteration,
bool* end_of_scheduling);

// Add this function so that it's easier to use from Python
void ReadGraph(const std::string& file_path, bool first_scheduling_iteration, bool& end_of_scheduling);


/**
* Removes the given arc from the graph. It updates the supply at the source
Expand Down
94 changes: 94 additions & 0 deletions schedulers/Flowlessly/src/python/Flowlessly.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
#include <pybind11/pybind11.h>
#include <pybind11/stl.h> // Required for converting STL types to Python types

#include <boost/thread/latch.hpp>

#include "graphs/graph.h"
#include "graphs/adjacency_map_graph.h"
#include "solvers/successive_shortest.h"

DEFINE_bool(graph_has_node_types, false, "Graph input contains node types");

namespace py = pybind11;

PYBIND11_MODULE(flowlessly_py, flowlessly_m) {
flowlessly_m.doc() = "Python API for Flowlessly."; // optional module docstring

py::class_<flowlessly::Statistics>(flowlessly_m, "Statistics")
.def(py::init<>());

// Define the NodeType enum.
py::enum_<flowlessly::NodeType>(flowlessly_m, "NodeType")
.value("OTHER", flowlessly::NodeType::OTHER)
.value("TASK", flowlessly::NodeType::TASK)
.value("PU", flowlessly::NodeType::PU)
.value("SINK", flowlessly::NodeType::SINK)
.value("MACHINE", flowlessly::NodeType::MACHINE)
.value("INTERMEDIATE_RES", flowlessly::NodeType::INTERMEDIATE_RES)
.export_values();

py::enum_<flowlessly::NodeStatus>(flowlessly_m, "NodeStatus")
.value("NOT_VISITED", flowlessly::NodeStatus::NOT_VISITED)
.value("VISITING", flowlessly::NodeStatus::VISITING)
.value("VISITED", flowlessly::NodeStatus::VISITED)
.export_values();

py::class_<flowlessly::Node>(flowlessly_m, "Node")
.def(py::init<>()) // Default constructor
.def(py::init<const flowlessly::Node &>()) // Copy constructor
.def_readwrite("supply", &flowlessly::Node::supply)
.def_readwrite("potential", &flowlessly::Node::potential)
.def_readwrite("distance", &flowlessly::Node::distance)
.def_readwrite("type", &flowlessly::Node::type)
.def_readwrite("status", &flowlessly::Node::status);

py::class_<flowlessly::Arc>(flowlessly_m, "Arc")
.def(py::init<>()) // Default constructor
.def(py::init<uint32_t, uint32_t, bool, bool, int32_t, int32_t, int64_t, flowlessly::Arc*>()) // Parameterized constructor
.def(py::init<const flowlessly::Arc&>()) // Copy constructor
.def(py::init<flowlessly::Arc*>()) // Constructor from a pointer
.def("CopyArc", &flowlessly::Arc::CopyArc)
.def_readwrite("src_node_id", &flowlessly::Arc::src_node_id)
.def_readwrite("dst_node_id", &flowlessly::Arc::dst_node_id)
.def_readwrite("is_alive", &flowlessly::Arc::is_alive)
.def_readwrite("is_fwd", &flowlessly::Arc::is_fwd)
.def_readwrite("is_running", &flowlessly::Arc::is_running)
.def_readwrite("residual_cap", &flowlessly::Arc::residual_cap)
.def_readwrite("min_flow", &flowlessly::Arc::min_flow)
.def_readwrite("cost", &flowlessly::Arc::cost)
.def_readwrite("reverse_arc", &flowlessly::Arc::reverse_arc);

py::class_<flowlessly::AdjacencyMapGraph>(flowlessly_m, "AdjacencyMapGraph")
.def(py::init<flowlessly::Statistics*>())
.def("ReadGraph", static_cast<void (flowlessly::AdjacencyMapGraph::*)(const std::string&, bool, bool&)> (&flowlessly::AdjacencyMapGraph::ReadGraph))
.def("AddArc", &flowlessly::AdjacencyMapGraph::AddArc, pybind11::return_value_policy::reference_internal)
.def("AddNode", static_cast<void (flowlessly::AdjacencyMapGraph::*)(uint32_t, int32_t, int64_t, flowlessly::NodeType, bool)> (&flowlessly::AdjacencyMapGraph::AddNode)) // Overloaded function
.def("AddNode", static_cast<uint32_t (flowlessly::AdjacencyMapGraph::*)(int32_t, int64_t, flowlessly::NodeType, bool)> (&flowlessly::AdjacencyMapGraph::AddNode)) // Overloaded function
.def("ChangeArc", static_cast<void (flowlessly::AdjacencyMapGraph::*)(flowlessly::Arc*, uint32_t, int32_t, int64_t, int32_t)> (&flowlessly::AdjacencyMapGraph::ChangeArc)) // Overloaded function
.def("ChangeArc", static_cast<void (flowlessly::AdjacencyMapGraph::*)(uint32_t, uint32_t, uint32_t, int32_t, int64_t, int32_t, bool, int64_t)> (&flowlessly::AdjacencyMapGraph::ChangeArc)) // Overloaded function
.def("GetMachinePUs", &flowlessly::AdjacencyMapGraph::GetMachinePUs)
.def("GetRandomArc", &flowlessly::AdjacencyMapGraph::GetRandomArc)
.def("GetTaskAssignments", &flowlessly::AdjacencyMapGraph::GetTaskAssignments)
.def("GetTotalCost", &flowlessly::AdjacencyMapGraph::GetTotalCost)
.def("InitializeGraph", &flowlessly::AdjacencyMapGraph::InitializeGraph)
.def("IsEpsOptimal", &flowlessly::AdjacencyMapGraph::IsEpsOptimal)
.def("IsFeasible", &flowlessly::AdjacencyMapGraph::IsFeasible)
.def("IsInTopologicalOrder", &flowlessly::AdjacencyMapGraph::IsInTopologicalOrder)
.def("RemoveArc", &flowlessly::AdjacencyMapGraph::RemoveArc)
.def("RemoveNode", &flowlessly::AdjacencyMapGraph::RemoveNode)
.def("WriteAssignments", &flowlessly::AdjacencyMapGraph::WriteAssignments)
.def("WriteFlowGraph", &flowlessly::AdjacencyMapGraph::WriteFlowGraph)
.def("WriteGraph", &flowlessly::AdjacencyMapGraph::WriteGraph)
.def("get_arcs", &flowlessly::AdjacencyMapGraph::get_arcs, pybind11::return_value_policy::reference_internal)
.def("get_nodes", &flowlessly::AdjacencyMapGraph::get_nodes, pybind11::return_value_policy::reference_internal)
.def("get_max_node_id", &flowlessly::AdjacencyMapGraph::get_max_node_id)
.def("get_sink_node", &flowlessly::AdjacencyMapGraph::get_sink_node)
.def("get_source_nodes", &flowlessly::AdjacencyMapGraph::get_source_nodes, py::return_value_policy::reference_internal)
.def("get_potentials", &flowlessly::AdjacencyMapGraph::get_potentials, py::return_value_policy::reference_internal);

py::class_<flowlessly::SuccessiveShortest>(flowlessly_m, "SuccessiveShortest")
.def(py::init<flowlessly::AdjacencyMapGraph*, flowlessly::Statistics*>())
.def("LogStatistics", &flowlessly::SuccessiveShortest::LogStatistics)
.def("ResetStatistics", &flowlessly::SuccessiveShortest::ResetStatistics)
.def("Run", &flowlessly::SuccessiveShortest::Run);
}
1 change: 1 addition & 0 deletions schedulers/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Expose the BaseScheduler as part of the module.

Check failure on line 1 in schedulers/__init__.py

View workflow job for this annotation

GitHub Actions / Python 3.9 Build

Imports are incorrectly sorted and/or formatted.
from .base_scheduler import BaseScheduler

# Scheduler implementations
Expand All @@ -10,6 +10,7 @@
from .lsf_scheduler import LSFScheduler
from .tetrisched_cplex_scheduler import TetriSchedCPLEXScheduler
from .tetrisched_gurobi_scheduler import TetriSchedGurobiScheduler
from .flow_scheduler import FlowScheduler

try:
from .tetrisched_scheduler import TetriSchedScheduler
Expand Down
Loading
Loading