Skip to content

Commit

Permalink
[SYSTEMDS-3662] Parfor Merge Sparse
Browse files Browse the repository at this point in the history
This commit optimize the parfor merge.
In the case of Kmeans with 10 runs it optimize the merge phase from
19 to 1 sec because it exploits the sparsity of the merging blocks.

Closes #1971
  • Loading branch information
Baunsgaard committed Jan 5, 2024
1 parent b3aac0d commit fdd60f6
Showing 1 changed file with 143 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,37 +22,51 @@
import java.util.List;

import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.compress.utils.Util;
import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
import org.apache.sysds.runtime.data.DenseBlock;
import org.apache.sysds.runtime.data.SparseBlock;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;

/**
* <p>
* Due to independence of all iterations, any result has the following properties:
* (1) non local var, (2) matrix object, and (3) completely independent.
* These properties allow us to realize result merging in parallel without any synchronization.
* </p>
*
* <p>
* (1) non local var,
* </p>
* <p>
* (2) matrix object, and
* </p>
* <p>
* (3) completely independent.
* </p>
*
* <p>
* These properties allow us to realize result merging in parallel without any synchronization.
* </p>
*/
public abstract class ResultMergeMatrix extends ResultMerge<MatrixObject>
{
public abstract class ResultMergeMatrix extends ResultMerge<MatrixObject> {
private static final long serialVersionUID = 5319002218804570071L;

public ResultMergeMatrix() {
super();
}

public ResultMergeMatrix(MatrixObject out, MatrixObject[] in, String outputFilename, boolean accum) {
super(out, in, outputFilename, accum);
}
protected void mergeWithoutComp( MatrixBlock out, MatrixBlock in, boolean appendOnly ) {

protected void mergeWithoutComp(MatrixBlock out, MatrixBlock in, boolean appendOnly) {
mergeWithoutComp(out, in, appendOnly, false);
}
protected void mergeWithoutComp( MatrixBlock out, MatrixBlock in, boolean appendOnly, boolean par ) {
//pass through to matrix block operations
if( _isAccum )

protected void mergeWithoutComp(MatrixBlock out, MatrixBlock in, boolean appendOnly, boolean par) {
// pass through to matrix block operations
if(_isAccum)
out.binaryOperationsInPlace(PLUS, in);
else{
else {
MatrixBlock out2 = out.merge(in, appendOnly, par);

if(out2 != out)
Expand All @@ -61,52 +75,132 @@ protected void mergeWithoutComp( MatrixBlock out, MatrixBlock in, boolean append
}

/**
* NOTE: append only not applicable for wiht compare because output must be populated with
* initial state of matrix - with append, this would result in duplicates.
* NOTE: append only not applicable for with compare because output must be populated with initial state of matrix -
* with append, this would result in duplicates.
*
* @param out output matrix block
* @param in input matrix block
* @param compare ?
* @param out output matrix block
* @param in input matrix block
* @param compare Comparison matrix of old values.
*/
protected void mergeWithComp( MatrixBlock out, MatrixBlock in, DenseBlock compare )
{
//Notes for result correctness:
// * Always iterate over entire block in order to compare all values
// (using sparse iterator would miss values set to 0)
protected void mergeWithComp(MatrixBlock out, MatrixBlock in, DenseBlock compare) {
// Notes for result correctness:
// * Always iterate over entire block in order to compare all values
// (using sparse iterator would miss values set to 0)
// * Explicit NaN awareness because for cases were original matrix contains
// NaNs, since NaN != NaN, otherwise we would potentially overwrite results
// NaNs, since NaN != NaN, otherwise we would potentially overwrite results
// * For the case of accumulation, we add out += (new-old) to ensure correct results
// because all inputs have the old values replicated

if( in.isEmptyBlock(false) ) {
if( _isAccum ) return; //nothing to do
for( int i=0; i<in.getNumRows(); i++ )
for( int j=0; j<in.getNumColumns(); j++ )
if( compare.get(i, j) != 0 )
out.quickSetValue(i, j, 0);
// because all inputs have the old values replicated

final int rows = in.getNumRows();
final int cols = in.getNumColumns();
if(in.isEmptyBlock(false)) {
if(_isAccum)
return; // nothing to do
mergeWithCompEmpty(out, rows, cols, compare);
}
else if(in.isInSparseFormat() && _isAccum)
mergeSparseAccumulative(out, in, rows, cols, compare);
else if(in.isInSparseFormat())
mergeSparse(out, in, rows, cols, compare);
else // SPARSE/DENSE
mergeGeneric(out, in, rows, cols, compare);
}

private void mergeWithCompEmpty(MatrixBlock out, int m, int n, DenseBlock compare) {
for(int i = 0; i < m; i++)
mergeWithCompEmptyRow(out, m, n, compare, i);
}

private void mergeWithCompEmptyRow(MatrixBlock out, int m, int n, DenseBlock compare, int i) {

for(int j = 0; j < n; j++) {
final double valOld = compare.get(i, j);
if(!Util.eq(0.0, valOld)) // NaN awareness
out.quickSetValue(i, j, 0);
}
}

private void mergeSparseAccumulative(MatrixBlock out, MatrixBlock in, int m, int n, DenseBlock compare) {
final SparseBlock a = in.getSparseBlock();
for(int i = 0; i < m; i++) {
if(a.isEmpty(i))
continue;
final int apos = a.pos(i);
final int alen = a.size(i) + apos;
final int[] aix = a.indexes(i);
final double[] aval = a.values(i);
mergeSparseRowAccumulative(out, apos, alen, aix, aval, compare, n, i);
}
}

private void mergeSparseRowAccumulative(MatrixBlock out, int apos, int alen, int[] aix, double[] aval,
DenseBlock compare, int n, int i) {
for(; apos < alen; apos++) { // inside
final double valOld = compare.get(i, aix[apos]);
final double valNew = aval[apos];
if(!Util.eq(valNew, valOld)) { // NaN awareness
double value = out.quickGetValue(i, aix[apos]) + (valNew - valOld);
out.quickSetValue(i, aix[apos], value);
}
}
}

private void mergeSparse(MatrixBlock out, MatrixBlock in, int m, int n, DenseBlock compare) {
final SparseBlock a = in.getSparseBlock();
for(int i = 0; i < m; i++) {
if(a.isEmpty(i))
mergeWithCompEmptyRow(out, m, n, compare, i);
else {
final int apos = a.pos(i);
final int alen = a.size(i) + apos;
final int[] aix = a.indexes(i);
final double[] aval = a.values(i);
mergeSparseRow(out, apos, alen, aix, aval, compare, n, i);
}
}
else { //SPARSE/DENSE
int rows = in.getNumRows();
int cols = in.getNumColumns();
for( int i=0; i<rows; i++ )
for( int j=0; j<cols; j++ ) {
double valOld = compare.get(i,j);
double valNew = in.quickGetValue(i,j); //input value
if( (valNew != valOld && !Double.isNaN(valNew) ) //for changed values
|| Double.isNaN(valNew) != Double.isNaN(valOld) ) //NaN awareness
{
double value = !_isAccum ? valNew :
(out.quickGetValue(i, j) + (valNew - valOld));
out.quickSetValue(i, j, value);
}
}

private void mergeSparseRow(MatrixBlock out, int apos, int alen, int[] aix, double[] aval, DenseBlock compare, int n,
int i) {
int j = 0;
for(; j < n && apos < alen; j++) { // inside
final boolean aposValid = aix[apos] == j;
final double valOld = compare.get(i, j);
final double valNew = aix[apos] == j ? aval[apos] : 0.0;
if(!Util.eq(valNew, valOld)) { // NaN awareness
double value = !_isAccum ? valNew : (out.quickGetValue(i, j) + (valNew - valOld));
out.quickSetValue(i, j, value);
}
if(aposValid)
apos++;
}
for(; j < n; j++) {
final double valOld = compare.get(i, j);
if(valOld != 0) {
double value = (out.quickGetValue(i, j) - valOld);
out.quickSetValue(i, j, value);
}
}

}

private void mergeGeneric(MatrixBlock out, MatrixBlock in, int m, int n, DenseBlock compare) {
for(int i = 0; i < m; i++) {
for(int j = 0; j < n; j++) {
final double valOld = compare.get(i, j);
final double valNew = in.quickGetValue(i, j); // input value
if(!Util.eq(valNew, valOld)) { // NaN awareness
double value = !_isAccum ? valNew : (out.quickGetValue(i, j) + (valNew - valOld));
out.quickSetValue(i, j, value);
}
}
}
}

protected long computeNonZeros( MatrixObject out, List<MatrixObject> in ) {
//sum of nnz of input (worker result) - output var existing nnz
protected long computeNonZeros(MatrixObject out, List<MatrixObject> in) {
// sum of nnz of input (worker result) - output var existing nnz
long outNNZ = out.getDataCharacteristics().getNonZeros();
return outNNZ - in.size() * outNNZ + in.stream()
return outNNZ - in.size() * outNNZ + in.stream()//
.mapToLong(m -> m.getDataCharacteristics().getNonZeros()).sum();
}
}

0 comments on commit fdd60f6

Please sign in to comment.