[WIP] First implementation for a pipeline-aware linearization algorithm.
Includes initial steps for approaching testing. Includes helper class for debugging.
sweetpellegrino authored and niklas committed Jan 15, 2024
1 parent 02d4b01 commit c134ed7
Showing 6 changed files with 320 additions and 2 deletions.
14 changes: 14 additions & 0 deletions src/main/java/org/apache/sysds/lops/Lop.java
@@ -157,6 +157,12 @@ public enum VisitStatus {
*/
protected boolean _asynchronous = false;

/**
* Refers to the pipeline to which this lop belongs.
* This is used for identifying lops that can be executed in parallel.
*/
protected int _pipelineID = -1;

/**
* Estimated size for the output produced by this Lop in bytes.
*/
@@ -418,6 +424,14 @@ public boolean isAsynchronousOp() {
return _asynchronous;
}

public void setPipelineID(int id) {
_pipelineID = id;
}

public int getPipelineID() {
return _pipelineID;
}

public void setMemoryEstimates(double outMem, double totMem, double interMem, double bcMem) {
_outputMemEstimate = outMem;
_memEstimate = totMem;
69 changes: 69 additions & 0 deletions src/main/java/org/apache/sysds/lops/compile/linearization/DEVPrintDAG.java
@@ -0,0 +1,69 @@
package org.apache.sysds.lops.compile.linearization;

import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

import org.apache.sysds.lops.Lop;

// Helper class for debugging purposes while developing: dumps a list of lops as a Graphviz DOT file,
// with nodes colored by pipeline id.
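// The generated .dot files can also be rendered with the Graphviz command-line tools (if installed),
// e.g. `dot -Tsvg <file>.dot -o <file>.svg`.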
public class DEVPrintDAG {

static void asGraphviz(String filePrefix, List<Lop> v) {

StringBuilder sb = new StringBuilder();
sb.append("digraph G {\n");

for (int i = 0; i < v.size(); i++) {
Lop l = v.get(i);

// Truncate the lop's string representation to 75 characters and wrap it every two words
String truncated = l.toString().substring(0, Math.min(l.toString().length(), 75));
String wrapped = String.join("\n", truncated.replaceAll("((.*?\\s){2})", "$1@").split("@"));
sb.append(l.getID() + " [label=\"" + l.getID() + "::" + i + "::" + l.getPipelineID() + " " + l.getType() + " " + l.getLevel() + ":\n " + wrapped + "\", style=filled, color=" + getColorName(l.getPipelineID()) + "];\n");

for (Lop in : l.getInputs()) {
sb.append(in.getID() + " -> " + l.getID() + ";\n");
//sb.append(l.getID() + " -> " + in.getID() + ";\n");
}
}

sb.append("}\n");

String currentProcessId = String.valueOf(ProcessHandle.current().pid());

String folderName = "graphs/graphs_" + currentProcessId;
File folder = new File(folderName);

if (!folder.exists()) {
folder.mkdirs();
}

String filename = folderName + "/" + filePrefix + "_" + v.size() + "_" + System.currentTimeMillis() + ".dot";
try {
Files.write(Paths.get(filename), sb.toString().getBytes());
} catch (Exception e) {
e.printStackTrace();
}

}

// Note: some of these color names only render correctly with the VS Code extension "Graphviz Interactive Preview"
private static String getColorName(int value) {
String[] colors = {
"red", "orange", "yellow", "green", "blue", "indigo", "violet",
"teal", "purple", "pink", "brown", "beige", "cyan", "turquoise",
"lime", "olive", "mediumseagreen", "azure", "aquamarine", "magenta",
"silver", "gold", "goldenrod4", "sienna2", "deeppink3", "darkolivegreen", "grey52",
"darkslateblue", "darkkhaki", "cornflowerblue", "yellowgreen", "chartreuse2", "navyblue"
};

if (value < 0) {
return "lightgrey";
}

// Use the array length so every defined color is reachable
return colors[value % colors.length];
}

}
144 changes: 143 additions & 1 deletion src/main/java/org/apache/sysds/lops/compile/linearization/ILinearize.java
@@ -66,13 +66,15 @@ public class ILinearize {
public static Log LOG = LogFactory.getLog(ILinearize.class.getName());

public enum DagLinearization {
DEPTH_FIRST, BREADTH_FIRST, MIN_INTERMEDIATE, MAX_PARALLELIZE, AUTO
DEPTH_FIRST, BREADTH_FIRST, MIN_INTERMEDIATE, MAX_PARALLELIZE, AUTO, NA_PIPELINE;
}

public static List<Lop> linearize(List<Lop> v) {
try {
DagLinearization linearization = ConfigurationManager.getLinearizationOrder();

System.out.println("Linearization: " + linearization);

switch(linearization) {
case MAX_PARALLELIZE:
return doMaxParallelizeSort(v);
@@ -82,6 +84,8 @@ public static List<Lop> linearize(List<Lop> v) {
return doMinIntermediateSort(v);
case BREADTH_FIRST:
return doBreadthFirstSort(v);
case NA_PIPELINE:
return pipelineSort_depth(v);
case DEPTH_FIRST:
default:
return depthFirst(v);
@@ -93,6 +97,144 @@ public static List<Lop> linearize(List<Lop> v) {
}
}

/**
* Sort lops depth-first while assigning each node to a pipeline.
*
* @param v list of lops to sort
* @return depth-first sorted list of lops, with _pipelineID set on each Lop
*/
private static List<Lop> pipelineSort_depth(List<Lop> v) {

System.out.println("-PIPELINE START-");
System.out.println("Nodes size: " + v.size());

// Find all root nodes (nodes with no outputs)
List<Lop> roots = v.stream()
.filter(OperatorOrderingUtils::isLopRoot)
.collect(Collectors.toList());

// Initialize necessary data objects
Integer pipelineId = 0;
// Stores a resulting depth first sorted list of lops (same as in depthFirst())
ArrayList<Lop> opList = new ArrayList<>();
// Stores the pipeline ids and the corresponding lops
// for further refinement of the pipeline assignments
Map<Integer, List<Lop>> pipelineList = new HashMap<>();

// Step 1: (Trivial) assignment of pipeline ids to the roots
// TODO: maybe write this as a stream? (see the sketch after this loop)
for (Lop r : roots) {
pipelineId = depthFirst(r, pipelineId, opList, pipelineList) + 1;
}
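// A hypothetical stream form of the Step-1 loop above (addressing the TODO): because each
// root's start id depends on the previous root's result, this is a fold rather than a map, e.g.
//   pipelineId = roots.stream().reduce(0,
//       (id, r) -> depthFirst(r, id, opList, pipelineList) + 1, (a, b) -> b);
// The three-argument reduce requires a combiner even for sequential streams, which is
// arguably less readable than the explicit loop, so the loop is kept.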

// DEV: printing dag after initial assignment of pipeline ids
DEVPrintDAG.asGraphviz("Step1", v);

// Step 2: Merge pipelines with only one node to another (connected) pipeline
// No heuristic is used to merge the pipelines (e.g. to the smallest connected pipeline)
// Just merge them to one of the connected pipelines, beginning with the outputs of the node

// TODO: is there a better way to do this? (one alternative is sketched after this block)
// The single-node pipelines are materialized into a separate map first, because
// pipelineList cannot be modified while streaming over its own entry set.
Map<Integer, List<Lop>> pipelinesWithOneNode = pipelineList.entrySet().stream()
.filter(e -> e.getValue().size() == 1)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

pipelinesWithOneNode.entrySet().stream().forEach(e -> {
Lop lop = e.getValue().get(0);

// Merge to an existing output node
if (lop.getOutputs().size() > 0) {
lop.setPipelineID(lop.getOutputs().get(0).getPipelineID());
// If no outputs are present, merge to an existing input node
} else if (lop.getInputs().size() > 0) {
lop.setPipelineID(lop.getInputs().get(0).getPipelineID());
}
// else (no inputs and no outputs): do nothing (isolated/unreachable node?)
// Move the lop into its new pipeline and drop its now empty single-node pipeline
if (lop.getOutputs().size() > 0 || lop.getInputs().size() > 0) {
System.out.println("Merging " + e.getKey() + " to " + lop.getPipelineID());
pipelineList.get(lop.getPipelineID()).add(lop);
pipelineList.remove(e.getKey());
}
});
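// A possible alternative to materializing the filtered entries (sketch only, not used here):
// iterate over a snapshot of the keys and mutate pipelineList directly, e.g.
//   for (Integer id : new ArrayList<>(pipelineList.keySet())) {
//       List<Lop> nodes = pipelineList.get(id);
//       if (nodes == null || nodes.size() != 1)
//           continue;
//       // ... same merge logic as in the lambda above ...
//   }
// Copying the key set up front likewise avoids a ConcurrentModificationException.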

// DEV: printing dag after merging single node pipelines

DEVPrintDAG.asGraphviz("Step2", v);

// TODO:
// Step 3: Merge small pipelines into bigger ones
// Over the list of pipelines sorted by size, find the best merge candidate among connected pipelines
// Maybe use the two-pointer technique to pair merge candidates (see the sketch below)
// https://www.geeksforgeeks.org/two-pointers-technique/
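// Hypothetical sketch of such a merge pass (not implemented in this commit): greedily merge
// each pipeline below a size threshold into the largest pipeline it is connected to.
// `threshold` and the helper `isConnected(a, b)` (checks for an input/output edge between two
// pipelines) do not exist yet and are only named for illustration; java.util.Comparator would
// need to be imported. The TODO's two-pointer idea could replace the inner candidate search.
//
//   int threshold = 3;
//   for (Integer id : new ArrayList<>(pipelineList.keySet())) {
//       List<Lop> small = pipelineList.get(id);
//       if (small == null || small.size() > threshold)
//           continue;
//       Integer target = pipelineList.keySet().stream()
//           .filter(other -> !other.equals(id))
//           .filter(other -> isConnected(small, pipelineList.get(other)))
//           .max(Comparator.comparingInt((Integer other) -> pipelineList.get(other).size()))
//           .orElse(null);
//       if (target != null) {
//           small.forEach(l -> l.setPipelineID(target));
//           pipelineList.get(target).addAll(pipelineList.remove(id));
//       }
//   }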


// TODO: does this reset the visit status of all nodes (also those not reachable from a root)?
roots.forEach(Lop::resetVisitStatus);

System.out.println("-PIPELINE END-");

return opList;
}

/**
* Initial (depth-first) assignment of pipeline ids to the individual lops.
*
* @param root lop at which to start the traversal
* @param pipelineId pipeline id to assign to this root
* @param opList output list collecting the lops in depth-first order
* @param pipelineList map from pipeline id to the lops assigned to that pipeline
* @return the highest pipeline id assigned within this subtree
*/
private static int depthFirst(Lop root, int pipelineId, ArrayList<Lop> opList, Map<Integer, List<Lop>> pipelineList) {

// Abort if the node was already visited
if (root.isVisited()) {
return root.getPipelineID();
}

// Assign pipeline id to the node, given by the parent
// Set the root node as visited
root.setPipelineID(pipelineId);
root.setVisited();

// Add the root node to the pipeline list
if(pipelineList.containsKey(pipelineId)) {
pipelineList.get(pipelineId).add(root);
} else {
ArrayList<Lop> lopList = new ArrayList<>();
lopList.add(root);
pipelineList.put(pipelineId, lopList);
}

// Children as inputs, as we are traversing the lops bottom up
List<Lop> children = root.getInputs();
// If root node has only one child, use the same pipeline id as root node
if (children.size() == 1) {
Lop child = children.get(0);
// We need to find the max pipeline id of the child, because the child could branch out
pipelineId = Math.max(pipelineId, depthFirst(child, pipelineId, opList, pipelineList));
} else {
// Iteration over all children
for (int i = 0; i < children.size(); i++) {
Lop child = children.get(i);

// If the child has only one output, or all of its outputs are the root node,
// it belongs to the same pipeline as the root
if(child.getOutputs().size() == 1 ||
(child.getOutputs().size() > 1 && child.getOutputs().stream().allMatch(o -> o == root))) {
// Still track the max, because the child's own inputs may branch out and allocate new ids
pipelineId = Math.max(pipelineId, depthFirst(child, root.getPipelineID(), opList, pipelineList));
} else {
// The child is shared with other parents, so it starts a new pipeline
pipelineId = Math.max(pipelineId, depthFirst(child, pipelineId + 1, opList, pipelineList));
}
}
}

opList.add(root);
return pipelineId;
}

/**
* Sort lops depth-first
*
2 changes: 1 addition & 1 deletion src/test/java/org/apache/sysds/test/functions/linearization/DagLinearizationTest.java
@@ -39,7 +39,7 @@ public class DagLinearizationTest extends AutomatedTestBase {
private final String testNames[] = {"matrixmult_dag_linearization", "csplineCG_dag_linearization",
"linear_regression_dag_linearization"};

private final String testConfigs[] = {"breadth-first", "depth-first", "min-intermediate", "max-parallelize"};
private final String testConfigs[] = {"breadth-first", "depth-first", "min-intermediate", "max-parallelize", "na-pipeline"};

private final String testDir = "functions/linearization/";

71 changes: 71 additions & 0 deletions src/test/java/org/apache/sysds/test/functions/linearization/ILinearizeTest.java
@@ -0,0 +1,71 @@
package org.apache.sysds.test.functions.linearization;

import static org.junit.Assert.fail;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.sysds.test.AutomatedTestBase;
import org.apache.sysds.test.TestConfiguration;
import org.apache.sysds.test.TestUtils;
import org.junit.Test;

//additional imports
import org.apache.sysds.lops.compile.linearization.ILinearize;
import org.apache.sysds.lops.Lop;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.conf.DMLConfig;
import org.apache.sysds.lops.BinaryScalar;
import org.apache.sysds.lops.Data;

public class ILinearizeTest extends AutomatedTestBase {

private final String testDir = "functions/linearization/";

@Override
public void setUp() {
setOutputBuffering(true);
disableConfigFile = true;
TestUtils.clearAssertionInformation();

addTestConfiguration("test", new TestConfiguration(testDir, "test"));
}

@Test
public void testLinearize_Pipeline() {

try {
// Load the NA_PIPELINE config so that linearize() assigns pipeline ids;
// the path is resolved relative to the repository root
DMLConfig dmlconf = DMLConfig.readConfigurationFile("src/test/scripts/functions/linearization/SystemDS-config-na-pipeline.xml");
ConfigurationManager.setGlobalConfig(dmlconf);
} catch (Exception e) {
e.printStackTrace();
}

System.out.println("testLinearize_Pipeline");

List<Lop> lops = new ArrayList<>();

//lops.add(new Data(org.apache.sysds.common.Types.OpOpData.PERSISTENTREAD, null, null, "test", null, org.apache.sysds.common.Types.DataType.SCALAR, org.apache.sysds.common.Types.ValueType.INT32, null));
Lop lop1 = Data.createLiteralLop(org.apache.sysds.common.Types.ValueType.INT32, "1");
Lop lop2 = Data.createLiteralLop(org.apache.sysds.common.Types.ValueType.INT32, "2");
lops.add(lop1);
lops.add(lop2);
lops.add(new BinaryScalar(lop1, lop2, org.apache.sysds.common.Types.OpOp2.PLUS, org.apache.sysds.common.Types.DataType.SCALAR, org.apache.sysds.common.Types.ValueType.INT32));

List<Lop> result = ILinearize.linearize(lops);

List<Lop> filtered = result.stream().filter(l -> l.getPipelineID() == -1).collect(Collectors.toList());
if (filtered.size() > 0) {
fail("Pipeline ID not set correctly");
}

}

}
22 changes: 22 additions & 0 deletions src/test/scripts/functions/linearization/SystemDS-config-na-pipeline.xml
@@ -0,0 +1,22 @@
<!--
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
-->

<root>
<sysds.compile.linearization>na_pipeline</sysds.compile.linearization>
</root>
