diff --git a/bin/systemds b/bin/systemds index 2e8e629495b..f0cb0b729b0 100755 --- a/bin/systemds +++ b/bin/systemds @@ -413,6 +413,7 @@ if [ $WORKER == 1 ]; then print_out "# starting Federated worker on port $PORT" CMD=" \ java $SYSTEMDS_STANDALONE_OPTS \ + --add-modules=jdk.incubator.vector \ $LOG4JPROPFULL \ -jar $SYSTEMDS_JAR_FILE \ -w $PORT \ @@ -422,6 +423,7 @@ elif [ "$FEDMONITORING" == 1 ]; then print_out "# starting Federated backend monitoring on port $PORT" CMD=" \ java $SYSTEMDS_STANDALONE_OPTS \ + --add-modules=jdk.incubator.vector \ $LOG4JPROPFULL \ -jar $SYSTEMDS_JAR_FILE \ -fedMonitoring $PORT \ @@ -433,6 +435,7 @@ elif [ $SYSDS_DISTRIBUTED == 0 ]; then CMD=" \ java $SYSTEMDS_STANDALONE_OPTS \ $LOG4JPROPFULL \ + --add-modules=jdk.incubator.vector \ -jar $SYSTEMDS_JAR_FILE \ -f $SCRIPT_FILE \ -exec $SYSDS_EXEC_MODE \ @@ -442,6 +445,7 @@ else print_out "# Running script $SCRIPT_FILE distributed with opts: $*" CMD=" \ spark-submit $SYSTEMDS_DISTRIBUTED_OPTS \ + --add-modules=jdk.incubator.vector \ $SYSTEMDS_JAR_FILE \ -f $SCRIPT_FILE \ -exec $SYSDS_EXEC_MODE \ diff --git a/pom.xml b/pom.xml index 64616b94de9..f47b76c662e 100644 --- a/pom.xml +++ b/pom.xml @@ -67,7 +67,7 @@ - 11 + 17 {java.level} Testing settings false @@ -77,6 +77,7 @@ 1C 2 false + false ** false -Xms3000m -Xmx3000m -Xmn300m @@ -345,6 +346,9 @@ ${java.level} ${java.level} ${java.level} + + --add-modules=jdk.incubator.vector + @@ -367,6 +371,7 @@ file:src/test/resources/log4j.properties + --add-modules=jdk.incubator.vector @@ -875,9 +880,10 @@ *.protobuf true - true + false true - false + --add-modules=jdk.incubator.vector + ${doc.skip} public ${java.level} diff --git a/src/main/java/org/apache/sysds/hops/AggBinaryOp.java b/src/main/java/org/apache/sysds/hops/AggBinaryOp.java index 2cf651f1894..85ce9882ecc 100644 --- a/src/main/java/org/apache/sysds/hops/AggBinaryOp.java +++ b/src/main/java/org/apache/sysds/hops/AggBinaryOp.java @@ -439,8 +439,7 @@ private boolean 
isApplicableForTransitiveSparkExecType(boolean left) || (left && !isLeftTransposeRewriteApplicable(true))) && getInput(index).getParent().size()==1 //bagg is only parent && !getInput(index).areDimsBelowThreshold() - && (getInput(index).optFindExecType() == ExecType.SPARK - || (getInput(index) instanceof DataOp && ((DataOp)getInput(index)).hasOnlyRDD())) + && getInput(index).hasSparkOutput() && getInput(index).getOutputMemEstimate()>getOutputMemEstimate(); } diff --git a/src/main/java/org/apache/sysds/hops/BinaryOp.java b/src/main/java/org/apache/sysds/hops/BinaryOp.java index 839ce641af6..14a90b64483 100644 --- a/src/main/java/org/apache/sysds/hops/BinaryOp.java +++ b/src/main/java/org/apache/sysds/hops/BinaryOp.java @@ -747,8 +747,8 @@ protected ExecType optFindExecType(boolean transitive) { checkAndSetForcedPlatform(); - DataType dt1 = getInput().get(0).getDataType(); - DataType dt2 = getInput().get(1).getDataType(); + final DataType dt1 = getInput(0).getDataType(); + final DataType dt2 = getInput(1).getDataType(); if( _etypeForced != null ) { setExecType(_etypeForced); @@ -796,18 +796,28 @@ else if ( dt1 == DataType.SCALAR && dt2 == DataType.MATRIX ) { checkAndSetInvalidCPDimsAndSize(); } - //spark-specific decision refinement (execute unary scalar w/ spark input and + // spark-specific decision refinement (execute unary scalar w/ spark input and // single parent also in spark because it's likely cheap and reduces intermediates) - if(transitive && _etype == ExecType.CP && _etypeForced != ExecType.CP && _etypeForced != ExecType.FED && - getDataType().isMatrix() // output should be a matrix - && (dt1.isScalar() || dt2.isScalar()) // one side should be scalar - && supportsMatrixScalarOperations() // scalar operations - && !(getInput().get(dt1.isScalar() ? 1 : 0) instanceof DataOp) // input is not checkpoint - && getInput().get(dt1.isScalar() ? 
1 : 0).getParent().size() == 1 // unary scalar is only parent - && !HopRewriteUtils.isSingleBlock(getInput().get(dt1.isScalar() ? 1 : 0)) // single block triggered exec - && getInput().get(dt1.isScalar() ? 1 : 0).optFindExecType() == ExecType.SPARK) { - // pull unary scalar operation into spark - _etype = ExecType.SPARK; + if(transitive // we allow transitive Spark operations. continue sequences of spark operations + && _etype == ExecType.CP // The instruction is currently in CP + && _etypeForced != ExecType.CP // not forced CP + && _etypeForced != ExecType.FED // not federated + && (getDataType().isMatrix() || getDataType().isFrame()) // output should be a matrix or frame + ) { + final boolean v1 = getInput(0).isScalarOrVectorBellowBlockSize(); + final boolean v2 = getInput(1).isScalarOrVectorBellowBlockSize(); + final boolean left = v1 == true; // left side is the vector or scalar + final Hop sparkIn = getInput(left ? 1 : 0); + if((v1 ^ v2) // XOR only one side is allowed to be a vector or a scalar. + && (supportsMatrixScalarOperations() || op == OpOp2.APPLY_SCHEMA) // supported operation + && sparkIn.getParent().size() == 1 // only one parent + && !HopRewriteUtils.isSingleBlock(sparkIn) // single block triggered exec + && sparkIn.optFindExecType() == ExecType.SPARK // input was spark op. 
+ && !(sparkIn instanceof DataOp) // input is not checkpoint + ) { + // pull operation into spark + _etype = ExecType.SPARK; + } } if( OptimizerUtils.ALLOW_BINARY_UPDATE_IN_PLACE && @@ -837,7 +847,7 @@ else if( (op == OpOp2.CBIND && getDataType().isList()) || (op == OpOp2.RBIND && getDataType().isList())) { _etype = ExecType.CP; } - + //mark for recompile (forever) setRequiresRecompileIfNecessary(); @@ -1154,17 +1164,35 @@ && getInput().get(0) == that2.getInput().get(0) } public boolean supportsMatrixScalarOperations() { - return ( op==OpOp2.PLUS ||op==OpOp2.MINUS - ||op==OpOp2.MULT ||op==OpOp2.DIV - ||op==OpOp2.MODULUS ||op==OpOp2.INTDIV - ||op==OpOp2.LESS ||op==OpOp2.LESSEQUAL - ||op==OpOp2.GREATER ||op==OpOp2.GREATEREQUAL - ||op==OpOp2.EQUAL ||op==OpOp2.NOTEQUAL - ||op==OpOp2.MIN ||op==OpOp2.MAX - ||op==OpOp2.LOG ||op==OpOp2.POW - ||op==OpOp2.AND ||op==OpOp2.OR ||op==OpOp2.XOR - ||op==OpOp2.BITWAND ||op==OpOp2.BITWOR ||op==OpOp2.BITWXOR - ||op==OpOp2.BITWSHIFTL ||op==OpOp2.BITWSHIFTR); + switch(op) { + case PLUS: + case MINUS: + case MULT: + case DIV: + case MODULUS: + case INTDIV: + case LESS: + case LESSEQUAL: + case GREATER: + case GREATEREQUAL: + case EQUAL: + case NOTEQUAL: + case MIN: + case MAX: + case LOG: + case POW: + case AND: + case OR: + case XOR: + case BITWAND: + case BITWOR: + case BITWXOR: + case BITWSHIFTL: + case BITWSHIFTR: + return true; + default: + return false; + } } public boolean isPPredOperation() { diff --git a/src/main/java/org/apache/sysds/hops/Hop.java b/src/main/java/org/apache/sysds/hops/Hop.java index b32a1a74aab..4a842c69b0f 100644 --- a/src/main/java/org/apache/sysds/hops/Hop.java +++ b/src/main/java/org/apache/sysds/hops/Hop.java @@ -1040,6 +1040,12 @@ public final String toString() { // ======================================================================================== + protected boolean isScalarOrVectorBellowBlockSize(){ + return getDataType().isScalar() || (dimsKnown() && + (( _dc.getRows() == 1 && _dc.getCols() < 
ConfigurationManager.getBlocksize()) + || _dc.getCols() == 1 && _dc.getRows() < ConfigurationManager.getBlocksize())); + } + protected boolean isVector() { return (dimsKnown() && (_dc.getRows() == 1 || _dc.getCols() == 1) ); } @@ -1624,6 +1630,11 @@ protected void setMemoryAndComputeEstimates(Lop lop) { lop.setComputeEstimate(ComputeCost.getHOPComputeCost(this)); } + protected boolean hasSparkOutput(){ + return (this.optFindExecType() == ExecType.SPARK + || (this instanceof DataOp && ((DataOp)this).hasOnlyRDD())); + } + /** * Set parse information. * diff --git a/src/main/java/org/apache/sysds/hops/TernaryOp.java b/src/main/java/org/apache/sysds/hops/TernaryOp.java index 0334dbbb2f7..7dffabdf71e 100644 --- a/src/main/java/org/apache/sysds/hops/TernaryOp.java +++ b/src/main/java/org/apache/sysds/hops/TernaryOp.java @@ -21,6 +21,7 @@ import org.apache.sysds.api.DMLScript; import org.apache.sysds.common.Types.DataType; +import org.apache.sysds.common.Types.ExecType; import org.apache.sysds.common.Types.OpOp2; import org.apache.sysds.common.Types.OpOp3; import org.apache.sysds.common.Types.OpOpDG; @@ -33,8 +34,8 @@ import org.apache.sysds.lops.CentralMoment; import org.apache.sysds.lops.CoVariance; import org.apache.sysds.lops.Ctable; +import org.apache.sysds.lops.Data; import org.apache.sysds.lops.Lop; -import org.apache.sysds.common.Types.ExecType; import org.apache.sysds.lops.LopsException; import org.apache.sysds.lops.PickByCount; import org.apache.sysds.lops.SortKeys; @@ -273,6 +274,8 @@ private void constructLopsCtable() { // F=ctable(A,B,W) DataType dt1 = getInput().get(0).getDataType(); + + DataType dt2 = getInput().get(1).getDataType(); DataType dt3 = getInput().get(2).getDataType(); Ctable.OperationTypes ternaryOpOrig = Ctable.findCtableOperationByInputDataTypes(dt1, dt2, dt3); @@ -280,7 +283,10 @@ private void constructLopsCtable() { // Compute lops for all inputs Lop[] inputLops = new Lop[getInput().size()]; for(int i=0; i < getInput().size(); i++) { - 
inputLops[i] = getInput().get(i).constructLops(); + if(i == 0 && HopRewriteUtils.isSequenceSizeOfA(getInput(0), getInput(1))) + inputLops[i] = Data.createLiteralLop(ValueType.INT64, "" +getInput(1).getDim(0)); + else + inputLops[i] = getInput().get(i).constructLops(); } ExecType et = optFindExecType(); diff --git a/src/main/java/org/apache/sysds/hops/UnaryOp.java b/src/main/java/org/apache/sysds/hops/UnaryOp.java index 1bda77530bb..db8c773a075 100644 --- a/src/main/java/org/apache/sysds/hops/UnaryOp.java +++ b/src/main/java/org/apache/sysds/hops/UnaryOp.java @@ -366,7 +366,11 @@ protected double computeOutputMemEstimate( long dim1, long dim2, long nnz ) } else { sparsity = OptimizerUtils.getSparsity(dim1, dim2, nnz); } - return OptimizerUtils.estimateSizeExactSparsity(dim1, dim2, sparsity); + + if(getDataType() == DataType.FRAME) + return OptimizerUtils.estimateSizeExactFrame(dim1, dim2); + else + return OptimizerUtils.estimateSizeExactSparsity(dim1, dim2, sparsity); } @Override @@ -463,6 +467,13 @@ public boolean isMetadataOperation() { || _op == OpOp1.CAST_AS_LIST; } + private boolean isDisallowedSparkOps(){ + return isCumulativeUnaryOperation() + || isCastUnaryOperation() + || _op==OpOp1.MEDIAN + || _op==OpOp1.IQM; + } + @Override protected ExecType optFindExecType(boolean transitive) { @@ -493,19 +504,22 @@ else if ( getInput().get(0).areDimsBelowThreshold() || getInput().get(0).isVecto checkAndSetInvalidCPDimsAndSize(); } + //spark-specific decision refinement (execute unary w/ spark input and //single parent also in spark because it's likely cheap and reduces intermediates) - if( _etype == ExecType.CP && _etypeForced != ExecType.CP - && getInput().get(0).optFindExecType() == ExecType.SPARK - && getDataType().isMatrix() - && !isCumulativeUnaryOperation() && !isCastUnaryOperation() - && _op!=OpOp1.MEDIAN && _op!=OpOp1.IQM - && !(getInput().get(0) instanceof DataOp) //input is not checkpoint - && getInput().get(0).getParent().size()==1 ) //unary is only parent - 
{ + if(_etype == ExecType.CP // currently CP instruction + && _etype != ExecType.SPARK /// currently not SP. + && _etypeForced != ExecType.CP // not forced as CP instruction + && getInput(0).hasSparkOutput() // input is a spark instruction + && (getDataType().isMatrix() || getDataType().isFrame()) // output is a matrix or frame + && !isDisallowedSparkOps() // is invalid spark instruction + // && !(getInput().get(0) instanceof DataOp) // input is not checkpoint + // && getInput(0).getParent().size() <= 1// unary is only parent + ) { //pull unary operation into spark _etype = ExecType.SPARK; } + //mark for recompile (forever) setRequiresRecompileIfNecessary(); @@ -519,7 +533,7 @@ && getInput().get(0).getParent().size()==1 ) //unary is only parent } else { setRequiresRecompileIfNecessary(); } - + return _etype; } diff --git a/src/main/java/org/apache/sysds/hops/rewrite/HopRewriteUtils.java b/src/main/java/org/apache/sysds/hops/rewrite/HopRewriteUtils.java index aae2787cd35..0c8859f65f2 100644 --- a/src/main/java/org/apache/sysds/hops/rewrite/HopRewriteUtils.java +++ b/src/main/java/org/apache/sysds/hops/rewrite/HopRewriteUtils.java @@ -1392,6 +1392,25 @@ public static boolean isBasicN1Sequence(Hop hop) return ret; } + public static boolean isSequenceSizeOfA(Hop hop, Hop A) + { + boolean ret = false; + + if( hop instanceof DataGenOp ) + { + DataGenOp dgop = (DataGenOp) hop; + if( dgop.getOp() == OpOpDG.SEQ ){ + Hop from = dgop.getInput().get(dgop.getParamIndex(Statement.SEQ_FROM)); + Hop to = dgop.getInput().get(dgop.getParamIndex(Statement.SEQ_TO)); + Hop incr = dgop.getInput().get(dgop.getParamIndex(Statement.SEQ_INCR)); + ret = (from instanceof LiteralOp && getIntValueSafe((LiteralOp) from) == 1) && + (to instanceof LiteralOp && getIntValueSafe((LiteralOp) to) == A.getDim(0)) && + (incr instanceof LiteralOp && getIntValueSafe((LiteralOp)incr)==1); + } + } + + return ret; + } public static Hop getBasic1NSequenceMax(Hop hop) { if( isDataGenOp(hop, OpOpDG.SEQ) ) { diff 
--git a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java index 001e11dcd4b..f6f8ee2e313 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java +++ b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java @@ -26,8 +26,10 @@ import java.io.ObjectOutput; import java.lang.ref.SoftReference; import java.util.ArrayList; +import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -42,17 +44,21 @@ import org.apache.sysds.runtime.DMLRuntimeException; import org.apache.sysds.runtime.compress.colgroup.AColGroup; import org.apache.sysds.runtime.compress.colgroup.AColGroup.CompressionType; +import org.apache.sysds.runtime.compress.colgroup.ADictBasedColGroup; import org.apache.sysds.runtime.compress.colgroup.ColGroupEmpty; import org.apache.sysds.runtime.compress.colgroup.ColGroupIO; import org.apache.sysds.runtime.compress.colgroup.ColGroupUncompressed; -import org.apache.sysds.runtime.compress.lib.CLALibAppend; +import org.apache.sysds.runtime.compress.colgroup.dictionary.IDictionary; import org.apache.sysds.runtime.compress.lib.CLALibBinaryCellOp; +import org.apache.sysds.runtime.compress.lib.CLALibCBind; import org.apache.sysds.runtime.compress.lib.CLALibCMOps; import org.apache.sysds.runtime.compress.lib.CLALibCompAgg; import org.apache.sysds.runtime.compress.lib.CLALibDecompress; import org.apache.sysds.runtime.compress.lib.CLALibMMChain; import org.apache.sysds.runtime.compress.lib.CLALibMatrixMult; import org.apache.sysds.runtime.compress.lib.CLALibMerge; +import org.apache.sysds.runtime.compress.lib.CLALibReorg; +import org.apache.sysds.runtime.compress.lib.CLALibReplace; import org.apache.sysds.runtime.compress.lib.CLALibReshape; import org.apache.sysds.runtime.compress.lib.CLALibRexpand; 
import org.apache.sysds.runtime.compress.lib.CLALibScalar; @@ -66,7 +72,6 @@ import org.apache.sysds.runtime.data.DenseBlock; import org.apache.sysds.runtime.data.SparseBlock; import org.apache.sysds.runtime.data.SparseRow; -import org.apache.sysds.runtime.functionobjects.SwapIndex; import org.apache.sysds.runtime.instructions.InstructionUtils; import org.apache.sysds.runtime.instructions.cp.CM_COV_Object; import org.apache.sysds.runtime.instructions.cp.ScalarObject; @@ -99,11 +104,12 @@ public class CompressedMatrixBlock extends MatrixBlock { private static final Log LOG = LogFactory.getLog(CompressedMatrixBlock.class.getName()); private static final long serialVersionUID = 73193720143154058L; - /** - * Debugging flag for Compressed Matrices - */ + /** Debugging flag for Compressed Matrices */ public static boolean debug = false; + /** Disallow caching of uncompressed Block */ + public static boolean allowCachingUncompressed = true; + /** * Column groups */ @@ -119,6 +125,9 @@ public class CompressedMatrixBlock extends MatrixBlock { */ protected transient SoftReference decompressedVersion; + /** Cached Memory size */ + protected transient long cachedMemorySize = -1; + public CompressedMatrixBlock() { super(true); sparse = false; @@ -169,7 +178,9 @@ protected CompressedMatrixBlock(MatrixBlock uncompressedMatrixBlock) { clen = uncompressedMatrixBlock.getNumColumns(); sparse = false; nonZeros = uncompressedMatrixBlock.getNonZeros(); - decompressedVersion = new SoftReference<>(uncompressedMatrixBlock); + if(!(uncompressedMatrixBlock instanceof CompressedMatrixBlock)) { + decompressedVersion = new SoftReference<>(uncompressedMatrixBlock); + } } /** @@ -189,6 +200,8 @@ public CompressedMatrixBlock(int rl, int cl, long nnz, boolean overlapping, List this.nonZeros = nnz; this.overlappingColGroups = overlapping; this._colGroups = groups; + + getInMemorySize(); // cache memory size } @Override @@ -204,6 +217,7 @@ public void reset(int rl, int cl, boolean sp, long estnnz, 
double val) { * @param cg The column group to use after. */ public void allocateColGroup(AColGroup cg) { + cachedMemorySize = -1; _colGroups = new ArrayList<>(1); _colGroups.add(cg); } @@ -214,6 +228,7 @@ public void allocateColGroup(AColGroup cg) { * @param colGroups new ColGroups in the MatrixBlock */ public void allocateColGroupList(List colGroups) { + cachedMemorySize = -1; _colGroups = colGroups; } @@ -270,6 +285,11 @@ public synchronized MatrixBlock decompress(int k) { ret = CLALibDecompress.decompress(this, k); + if(ret.getNonZeros() <= 0) { + ret.recomputeNonZeros(k); + } + ret.examSparsity(k); + // Set soft reference to the decompressed version decompressedVersion = new SoftReference<>(ret); @@ -290,7 +310,7 @@ public void putInto(MatrixBlock target, int rowOffset, int colOffset, boolean sp * @return The cached decompressed matrix, if it does not exist return null */ public MatrixBlock getCachedDecompressed() { - if(decompressedVersion != null) { + if( allowCachingUncompressed && decompressedVersion != null) { final MatrixBlock mb = decompressedVersion.get(); if(mb != null) { DMLCompressionStatistics.addDecompressCacheCount(); @@ -302,6 +322,7 @@ public MatrixBlock getCachedDecompressed() { } public CompressedMatrixBlock squash(int k) { + cachedMemorySize = -1; return CLALibSquash.squash(this, k); } @@ -332,7 +353,6 @@ public long recomputeNonZeros(int k) { List> tasks = new ArrayList<>(); for(AColGroup g : _colGroups) tasks.add(pool.submit(() -> g.getNumberNonZeros(rlen))); - long nnz = 0; for(Future t : tasks) nnz += t.get(); @@ -377,12 +397,27 @@ public long estimateSizeInMemory() { * @return an upper bound on the memory used to store this compressed block considering class overhead. 
*/ public long estimateCompressedSizeInMemory() { - long total = baseSizeInMemory(); - for(AColGroup grp : _colGroups) - total += grp.estimateInMemorySize(); + if(cachedMemorySize <= -1L) { + + long total = baseSizeInMemory(); + // take into consideration duplicate dictionaries + Set dicts = new HashSet<>(); + for(AColGroup grp : _colGroups){ + if(grp instanceof ADictBasedColGroup){ + IDictionary dg = ((ADictBasedColGroup) grp).getDictionary(); + if(dicts.contains(dg)) + total -= dg.getInMemorySize(); + dicts.add(dg); + } + total += grp.estimateInMemorySize(); + } + cachedMemorySize = total; + return total; + } + else + return cachedMemorySize; - return total; } public static long baseSizeInMemory() { @@ -392,6 +427,7 @@ public static long baseSizeInMemory() { total += 8; // Col Group Ref total += 8; // v reference total += 8; // soft reference to decompressed version + total += 8; // long cached memory size total += 1 + 7; // Booleans plus padding total += 40; // Col Group Array List @@ -431,6 +467,7 @@ public long estimateSizeOnDisk() { @Override public void readFields(DataInput in) throws IOException { + cachedMemorySize = -1; // deserialize compressed block rlen = in.readInt(); clen = in.readInt(); @@ -521,8 +558,8 @@ public MatrixBlock binaryOperationsLeft(BinaryOperator op, MatrixValue thatValue @Override public MatrixBlock append(MatrixBlock[] that, MatrixBlock ret, boolean cbind) { - if(cbind && that.length == 1) - return CLALibAppend.append(this, that[0], InfrastructureAnalyzer.getLocalParallelism()); + if(cbind) + return CLALibCBind.cbind(this, that, InfrastructureAnalyzer.getLocalParallelism()); else { MatrixBlock left = getUncompressed("append list or r-bind not supported in compressed"); MatrixBlock[] thatUC = new MatrixBlock[that.length]; @@ -541,8 +578,7 @@ public void append(MatrixValue v2, ArrayList outlist, int bl } @Override - public MatrixBlock chainMatrixMultOperations(MatrixBlock v, MatrixBlock w, MatrixBlock out, ChainType ctype, - int k) { + 
public MatrixBlock chainMatrixMultOperations(MatrixBlock v, MatrixBlock w, MatrixBlock out, ChainType ctype, int k) { checkMMChain(ctype, v, w); // multi-threaded MMChain of single uncompressed ColGroup @@ -595,45 +631,12 @@ public MatrixBlock transposeSelfMatrixMultOperations(MatrixBlock out, MMTSJType @Override public MatrixBlock replaceOperations(MatrixValue result, double pattern, double replacement) { - if(Double.isInfinite(pattern)) { - LOG.info("Ignoring replace infinite in compression since it does not contain this value"); - return this; - } - else if(isOverlapping()) { - final String message = "replaceOperations " + pattern + " -> " + replacement; - return getUncompressed(message).replaceOperations(result, pattern, replacement); - } - else { - - CompressedMatrixBlock ret = new CompressedMatrixBlock(getNumRows(), getNumColumns()); - final List prev = getColGroups(); - final int colGroupsLength = prev.size(); - final List retList = new ArrayList<>(colGroupsLength); - for(int i = 0; i < colGroupsLength; i++) - retList.add(prev.get(i).replace(pattern, replacement)); - ret.allocateColGroupList(retList); - ret.recomputeNonZeros(); - return ret; - } + return CLALibReplace.replace(this, (MatrixBlock) result, pattern, replacement, InfrastructureAnalyzer.getLocalParallelism()); } @Override public MatrixBlock reorgOperations(ReorgOperator op, MatrixValue ret, int startRow, int startColumn, int length) { - if(op.fn instanceof SwapIndex && this.getNumColumns() == 1) { - MatrixBlock tmp = decompress(op.getNumThreads()); - long nz = tmp.setNonZeros(tmp.getNonZeros()); - tmp = new MatrixBlock(tmp.getNumColumns(), tmp.getNumRows(), tmp.getDenseBlockValues()); - tmp.setNonZeros(nz); - return tmp; - } - else { - // Allow transpose to be compressed output. In general we need to have a transposed flag on - // the compressed matrix. 
https://issues.apache.org/jira/browse/SYSTEMDS-3025 - String message = op.getClass().getSimpleName() + " -- " + op.fn.getClass().getSimpleName(); - MatrixBlock tmp = getUncompressed(message, op.getNumThreads()); - return tmp.reorgOperations(op, ret, startRow, startColumn, length); - } - + return CLALibReorg.reorg(this, op, (MatrixBlock) ret, startRow, startColumn, length); } public boolean isOverlapping() { @@ -658,6 +661,11 @@ public void slice(ArrayList outlist, IndexRange range, int r tmp.slice(outlist, range, rowCut, colCut, blen, boundaryRlen, boundaryClen); } + @Override + public MatrixBlock reshape(int rows,int cols, boolean byRow){ + return CLALibReshape.reshape(this, rows, cols, byRow); + } + @Override public MatrixBlock unaryOperations(UnaryOperator op, MatrixValue result) { return CLALibUnary.unaryOperations(this, op, result); @@ -736,8 +744,22 @@ public MatrixBlock rexpandOperations(MatrixBlock ret, double max, boolean rows, @Override public boolean isEmptyBlock(boolean safe) { - final long nonZeros = getNonZeros(); - return _colGroups == null || nonZeros == 0 || (nonZeros == -1 && recomputeNonZeros() == 0); + if(nonZeros > 1) + return false; + else if(_colGroups == null || nonZeros == 0) + return true; + else{ + if(nonZeros == -1){ + // try to use column groups + for(AColGroup g : _colGroups) + if(!g.isEmpty()) + return false; + // Otherwise recompute non zeros. + recomputeNonZeros(); + } + + return getNonZeros() == 0; + } } @Override @@ -1045,6 +1067,7 @@ public void copy(int rl, int ru, int cl, int cu, MatrixBlock src, boolean awareD } private void copyCompressedMatrix(CompressedMatrixBlock that) { + cachedMemorySize = -1; this.rlen = that.getNumRows(); this.clen = that.getNumColumns(); this.sparseBlock = null; @@ -1059,7 +1082,7 @@ private void copyCompressedMatrix(CompressedMatrixBlock that) { } public SoftReference getSoftReferenceToDecompressed() { - return decompressedVersion; + return allowCachingUncompressed ? 
decompressedVersion : null; } public void clearSoftReferenceToDecompressed() { @@ -1127,8 +1150,7 @@ public void appendRow(int r, SparseRow row, boolean deep) { } @Override - public void appendRowToSparse(SparseBlock dest, MatrixBlock src, int i, int rowoffset, int coloffset, - boolean deep) { + public void appendRowToSparse(SparseBlock dest, MatrixBlock src, int i, int rowoffset, int coloffset, boolean deep) { throw new DMLCompressionException("Can't append row to compressed Matrix"); } @@ -1183,12 +1205,12 @@ public void examSparsity(boolean allowCSR, int k) { } @Override - public void sparseToDense(int k) { - // do nothing + public MatrixBlock sparseToDense(int k) { + return this; // do nothing } @Override - public void denseToSparse(boolean allowCSR, int k){ + public void denseToSparse(boolean allowCSR, int k) { // do nothing } @@ -1282,11 +1304,6 @@ public MatrixBlock transpose(int k) { return getUncompressed().transpose(k); } - @Override - public MatrixBlock reshape(int rows,int cols, boolean byRow){ - return CLALibReshape.reshape(this, rows, cols, byRow); - } - @Override public String toString() { StringBuilder sb = new StringBuilder(); diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java index 83b21ab2359..7966a6cfdca 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java +++ b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java @@ -37,6 +37,7 @@ import org.apache.sysds.runtime.compress.colgroup.ColGroupFactory; import org.apache.sysds.runtime.compress.colgroup.ColGroupUncompressed; import org.apache.sysds.runtime.compress.cost.ACostEstimate; +import org.apache.sysds.runtime.compress.cost.ComputationCostEstimator; import org.apache.sysds.runtime.compress.cost.CostEstimatorBuilder; import org.apache.sysds.runtime.compress.cost.CostEstimatorFactory; import 
org.apache.sysds.runtime.compress.cost.InstructionTypeCounter; @@ -86,6 +87,9 @@ public class CompressedMatrixBlockFactory { /** Compression information gathered through the sampling, used for the actual compression decided */ private CompressedSizeInfo compressionGroups; + // /** Indicate if the compression aborts we should decompress*/ + // private boolean shouldDecompress = false; + private CompressedMatrixBlockFactory(MatrixBlock mb, int k, CompressionSettingsBuilder compSettings, ACostEstimate costEstimator) { this(mb, k, compSettings.create(), costEstimator); @@ -178,6 +182,7 @@ public static Future compressAsync(ExecutionContext ec, String varName, In ExecutionContext.createCacheableData(mb); mo.acquireModify(mbc); mo.release(); + mbc.sum(); // calculate sum to forcefully materialize counts } } } @@ -288,11 +293,13 @@ else if(mb instanceof CompressedMatrixBlock && ((CompressedMatrixBlock) mb).isOv _stats.originalSize = mb.getInMemorySize(); _stats.originalCost = costEstimator.getCost(mb); + final double orgSum = mb.sum(k).getDouble(0, 0); + if(mb.isEmpty()) // empty input return empty compression return createEmpty(); res = new CompressedMatrixBlock(mb); // copy metadata and allocate soft reference - + logInit(); classifyPhase(); if(compressionGroups == null) return abortCompression(); @@ -308,6 +315,12 @@ else if(mb instanceof CompressedMatrixBlock && ((CompressedMatrixBlock) mb).isOv if(res == null) return abortCompression(); + final double afterComp = mb.sum(k).getDouble(0, 0); + + final double deltaSum = Math.abs(orgSum - afterComp); + + LOG.debug("compression Sum: Before:" + orgSum + " after: " + afterComp + " |delta|: " + deltaSum); + return new ImmutablePair<>(res, _stats); } @@ -334,7 +347,9 @@ private void classifyPhase() { final double scale = Math.sqrt(nCols); final double threshold = _stats.estimatedCostCols / scale; - if(threshold < _stats.originalCost) { + if(threshold < _stats.originalCost * ( + (costEstimator instanceof 
ComputationCostEstimator) && !(mb instanceof CompressedMatrixBlock) + ? 15 : 0.8)) { if(nCols > 1) coCodePhase(); else // LOG a short cocode phase (since there is one column we don't cocode) @@ -406,7 +421,7 @@ private void transposeHeuristics() { compSettings.transposed = false; break; default: - compSettings.transposed = transposeHeuristics(compressionGroups.getNumberColGroups() , mb); + compSettings.transposed = transposeHeuristics(compressionGroups.getNumberColGroups(), mb); } } @@ -442,20 +457,20 @@ private void finalizePhase() { _stats.compressedSize = res.getInMemorySize(); _stats.compressedCost = costEstimator.getCost(res.getColGroups(), res.getNumRows()); - - final double ratio = _stats.getRatio(); - final double denseRatio = _stats.getDenseRatio(); - _stats.setColGroupsCounts(res.getColGroups()); - if(ratio < 1 && denseRatio < 100.0) { + + if(_stats.compressedCost > _stats.originalCost) { LOG.info("--dense size: " + _stats.denseSize); LOG.info("--original size: " + _stats.originalSize); LOG.info("--compressed size: " + _stats.compressedSize); - LOG.info("--compression ratio: " + ratio); + LOG.info("--compression ratio: " + _stats.getRatio()); + LOG.info("--original Cost: " + _stats.originalCost); + LOG.info("--Compressed Cost: " + _stats.compressedCost); + LOG.info("--Cost Ratio: " + _stats.getCostRatio()); LOG.debug("--col groups types " + _stats.getGroupsTypesString()); LOG.debug("--col groups sizes " + _stats.getGroupsSizesString()); logLengths(); - LOG.info("Abort block compression because compression ratio is less than 1."); + LOG.info("Abort block compression because cost ratio is less than 1. 
"); res = null; setNextTimePhase(time.stop()); DMLCompressionStatistics.addCompressionTime(getLastTimePhase(), phase); @@ -472,9 +487,23 @@ private void finalizePhase() { private Pair abortCompression() { LOG.warn("Compression aborted at phase: " + phase); + if(mb instanceof CompressedMatrixBlock) { + MatrixBlock ucmb = ((CompressedMatrixBlock) mb).getUncompressed("Decompressing for abort: ", k); + return new ImmutablePair<>(ucmb, _stats); + } return new ImmutablePair<>(mb, _stats); } + private void logInit() { + if(LOG.isDebugEnabled()) { + LOG.debug("--Seed used for comp : " + compSettings.seed); + LOG.debug(String.format("--number columns to compress: %10d", mb.getNumColumns())); + LOG.debug(String.format("--number rows to compress : %10d", mb.getNumRows())); + LOG.debug(String.format("--sparsity : %10.5f", mb.getSparsity())); + LOG.debug(String.format("--nonZeros : %10d", mb.getNonZeros())); + } + } + private void logPhase() { setNextTimePhase(time.stop()); DMLCompressionStatistics.addCompressionTime(getLastTimePhase(), phase); @@ -486,7 +515,6 @@ private void logPhase() { else { switch(phase) { case 0: - LOG.debug("--Seed used for comp : " + compSettings.seed); LOG.debug("--compression phase " + phase + " Classify : " + getLastTimePhase()); LOG.debug("--Individual Columns Estimated Compression: " + _stats.estimatedSizeCols); if(mb instanceof CompressedMatrixBlock) { diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressionSettings.java b/src/main/java/org/apache/sysds/runtime/compress/CompressionSettings.java index 062ccfc1201..e9a5782d03e 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/CompressionSettings.java +++ b/src/main/java/org/apache/sysds/runtime/compress/CompressionSettings.java @@ -128,6 +128,9 @@ public class CompressionSettings { /** The sorting type used in sorting/joining offsets to create SDC groups */ public final SORT_TYPE sdcSortType; + + private static boolean printedStatus = false; + protected 
CompressionSettings(double samplingRatio, double samplePower, boolean allowSharedDictionary, String transposeInput, int seed, boolean lossy, EnumSet validCompressions, boolean sortValuesByLength, PartitionerType columnPartitioner, int maxColGroupCoCode, double coCodePercentage, @@ -151,8 +154,10 @@ protected CompressionSettings(double samplingRatio, double samplePower, boolean this.minimumCompressionRatio = minimumCompressionRatio; this.isInSparkInstruction = isInSparkInstruction; this.sdcSortType = sdcSortType; - if(LOG.isDebugEnabled()) + if(!printedStatus && LOG.isDebugEnabled()){ + printedStatus = true; LOG.debug(this.toString()); + } } public boolean isRLEAllowed(){ diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressionSettingsBuilder.java b/src/main/java/org/apache/sysds/runtime/compress/CompressionSettingsBuilder.java index ec5512266e8..dc0908dc9bf 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/CompressionSettingsBuilder.java +++ b/src/main/java/org/apache/sysds/runtime/compress/CompressionSettingsBuilder.java @@ -35,10 +35,7 @@ */ public class CompressionSettingsBuilder { private double samplingRatio; - // private double samplePower = 0.6; private double samplePower = 0.65; - // private double samplePower = 0.68; - // private double samplePower = 0.7; private boolean allowSharedDictionary = false; private String transposeInput; private int seed = -1; diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressionStatistics.java b/src/main/java/org/apache/sysds/runtime/compress/CompressionStatistics.java index d54eb2c3525..01e7c8bc1a4 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/CompressionStatistics.java +++ b/src/main/java/org/apache/sysds/runtime/compress/CompressionStatistics.java @@ -108,6 +108,10 @@ public double getRatio() { return compressedSize == 0.0 ? Double.POSITIVE_INFINITY : (double) originalSize / compressedSize; } + public double getCostRatio() { + return compressedSize == 0.0 ? 
Double.POSITIVE_INFINITY : (double) originalCost / compressedCost; + } + public double getDenseRatio() { return compressedSize == 0.0 ? Double.POSITIVE_INFINITY : (double) denseSize / compressedSize; } @@ -121,7 +125,7 @@ public String toString() { sb.append("\nCompressed Size : " + compressedSize); sb.append("\nCompressionRatio : " + getRatio()); sb.append("\nDenseCompressionRatio : " + getDenseRatio()); - + if(colGroupCounts != null) { sb.append("\nCompressionTypes : " + getGroupsTypesString()); sb.append("\nCompressionGroupSizes : " + getGroupsSizesString()); diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java index 86ebb4400e4..5b8c271a27e 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java +++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java @@ -26,8 +26,8 @@ import java.util.List; import java.util.concurrent.ExecutorService; -// import jdk.incubator.vector.DoubleVector; -// import jdk.incubator.vector.VectorSpecies; +import jdk.incubator.vector.DoubleVector; +import jdk.incubator.vector.VectorSpecies; import org.apache.commons.lang3.NotImplementedException; import org.apache.sysds.runtime.DMLRuntimeException; import org.apache.sysds.runtime.compress.CompressedMatrixBlock; @@ -75,7 +75,7 @@ public class ColGroupDDC extends APreAgg implements IMapToDataGroup { protected final AMapToData _data; - // static final VectorSpecies SPECIES = DoubleVector.SPECIES_PREFERRED; + static final VectorSpecies SPECIES = DoubleVector.SPECIES_PREFERRED; private ColGroupDDC(IColIndex colIndexes, IDictionary dict, AMapToData data, int[] cachedCounts) { super(colIndexes, dict, cachedCounts); @@ -606,8 +606,8 @@ public void rightDecompressingMult(MatrixBlock right, MatrixBlock ret, int rl, i final double[] c = ret.getDenseBlockValues(); final int kd = _colIndexes.size(); final int jd = right.getNumColumns(); - // 
final DoubleVector vVec = DoubleVector.zero(SPECIES); - final int vLen = 8; + final DoubleVector vVec = DoubleVector.zero(SPECIES); + final int vLen = SPECIES.length(); final int blkzI = 32; final int blkzK = 24; @@ -623,32 +623,31 @@ public void rightDecompressingMult(MatrixBlock right, MatrixBlock ret, int rl, i for(int k = bk; k < bke; k++) { final double aa = a[offi + k]; final int k_right = _colIndexes.get(k); - vectMM(aa, b, c, end, jd, crl, cru, offOut, k_right, vLen); + vectMM(aa, b, c, end, jd, crl, cru, offOut, k_right, vLen, vVec); } } } } } - final void vectMM(double aa, double[] b, double[] c, int endT, int jd, int crl, int cru, int offOut, int k, - int vLen) { - // vVec = vVec.broadcast(aa); + final void vectMM(double aa, double[] b, double[] c, int endT, int jd, int crl, int cru, int offOut, int k, int vLen, DoubleVector vVec) { + vVec = vVec.broadcast(aa); final int offj = k * jd; final int end = endT + offj; for(int j = offj + crl; j < end; j += vLen, offOut += vLen) { - // DoubleVector res = DoubleVector.fromArray(SPECIES, c, offOut); - // DoubleVector bVec = DoubleVector.fromArray(SPECIES, b, j); - // res = vVec.fma(bVec, res); - // res.intoArray(c, offOut); - - c[offOut] += aa * b[j]; - c[offOut + 1] += aa * b[j + 1]; - c[offOut + 2] += aa * b[j + 2]; - c[offOut + 3] += aa * b[j + 3]; - c[offOut + 4] += aa * b[j + 4]; - c[offOut + 5] += aa * b[j + 5]; - c[offOut + 6] += aa * b[j + 6]; - c[offOut + 7] += aa * b[j + 7]; + DoubleVector res = DoubleVector.fromArray(SPECIES, c, offOut); + DoubleVector bVec = DoubleVector.fromArray(SPECIES, b, j); + res = vVec.fma(bVec, res); + res.intoArray(c, offOut); + + // c[offOut] += aa * b[j]; + // c[offOut + 1] += aa * b[j + 1]; + // c[offOut + 2] += aa * b[j + 2]; + // c[offOut + 3] += aa * b[j + 3]; + // c[offOut + 4] += aa * b[j + 4]; + // c[offOut + 5] += aa * b[j + 5]; + // c[offOut + 6] += aa * b[j + 6]; + // c[offOut + 7] += aa * b[j + 7]; } for(int j = end; j < cru + offj; j++, offOut++) { double bb = 
b[j]; diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/MatrixBlockDictionary.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/MatrixBlockDictionary.java index 12a063ad2a8..b225fd5a024 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/MatrixBlockDictionary.java +++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/dictionary/MatrixBlockDictionary.java @@ -28,6 +28,8 @@ import java.util.HashSet; import java.util.Set; +import jdk.incubator.vector.DoubleVector; +import jdk.incubator.vector.VectorSpecies; import org.apache.commons.lang3.NotImplementedException; import org.apache.sysds.runtime.compress.DMLCompressionException; import org.apache.sysds.runtime.compress.colgroup.indexes.ArrayIndex; @@ -63,6 +65,8 @@ public class MatrixBlockDictionary extends ADictionary { final private MatrixBlock _data; + static final VectorSpecies SPECIES = DoubleVector.SPECIES_PREFERRED; + /** * Unsafe private constructor that does not check the data validity. USE WITH CAUTION. 
* @@ -2007,7 +2011,102 @@ private void preaggValuesFromDenseDictDenseAggArray(final int numVals, final ICo private void preaggValuesFromDenseDictDenseAggRange(final int numVals, final IColIndex colIndexes, final int s, final int e, final double[] b, final int cut, final double[] ret) { - preaggValuesFromDenseDictDenseAggRangeGeneric(numVals, colIndexes, s, e, b, cut, ret); + if(colIndexes instanceof RangeIndex) { + RangeIndex ri = (RangeIndex) colIndexes; + preaggValuesFromDenseDictDenseAggRangeRange(numVals, ri.get(0), ri.get(0) + ri.size(), s, e, b, cut, ret); + } + else + preaggValuesFromDenseDictDenseAggRangeGeneric(numVals, colIndexes, s, e, b, cut, ret); + } + + private void preaggValuesFromDenseDictDenseAggRangeRange(final int numVals, final int ls, final int le, final int rs, + final int re, final double[] b, final int cut, final double[] ret) { + final int cz = le - ls; + final int az = re - rs; + // final int nCells = numVals * cz; + final double[] values = _data.getDenseBlockValues(); + // Correctly named ikj matrix multiplication . 
+ + final int blkzI = 32; + final int blkzK = 24; + final int blkzJ = 1024; + for(int bi = 0; bi < numVals; bi += blkzI) { + final int bie = Math.min(numVals, bi + blkzI); + for(int bk = 0; bk < cz; bk += blkzK) { + final int bke = Math.min(cz, bk + blkzK); + for(int bj = 0; bj < az; bj += blkzJ) { + final int bje = Math.min(az, bj + blkzJ); + final int sOffT = rs + bj; + final int eOffT = rs + bje; + preaggValuesFromDenseDictBlockedIKJ(values, b, ret, bi, bk, bj, bie, bke, cz, az, ls, cut, sOffT, eOffT); + // preaggValuesFromDenseDictBlockedIJK(values, b, ret, bi, bk, bj, bie, bke, bje, cz, az, ls, cut, sOffT, eOffT); + } + } + } + } + + // private static void preaggValuesFromDenseDictBlockedIJK(double[] a, double[] b, double[] ret, int bi, int bk, int bj, + // int bie, int bke, int bje, int cz, int az, int ls, int cut, int sOffT, int eOffT) { + // final int vLen = SPECIES.length(); + // final DoubleVector vVec = DoubleVector.zero(SPECIES); + // for(int i = bi; i < bie; i++) { + // final int offI = i * cz; + // final int offOutT = i * az + bj; + // int offOut = offOutT; + // final int end = (bje - bj) % vLen; + // for(int j = bj + sOffT; j < end + sOffT; j += vLen, offOut += vLen) { + // final DoubleVector res = DoubleVector.fromArray(SPECIES, ret, offOut); + // for(int k = bk; k < bke; k++) { + // final int idb = (k + ls) * cut; + // final double v = a[offI + k]; + // vVec.broadcast(v); + // DoubleVector bVec = DoubleVector.fromArray(SPECIES, b, idb + j); + // vVec.fma(bVec, res); + // } + // res.intoArray(ret, offOut); + // } + // for(int j = end + sOffT; j < bje + sOffT; j++, offOut++) { + // for(int k = bk; k < bke; k++) { + // final int idb = (k + ls) * cut; + // final double v = a[offI + k]; + // ret[offOut] += v * b[idb + j]; + // } + // } + // } + // } + + private static void preaggValuesFromDenseDictBlockedIKJ(double[] a, double[] b, double[] ret, int bi, int bk, int bj, + int bie, int bke, int cz, int az, int ls, int cut, int sOffT, int eOffT) { + final 
int vLen = SPECIES.length(); + final DoubleVector vVec = DoubleVector.zero(SPECIES); + final int leftover = sOffT - eOffT % vLen; // leftover not vectorized + for(int i = bi; i < bie; i++) { + final int offI = i * cz; + final int offOutT = i * az + bj; + for(int k = bk; k < bke; k++) { + final int idb = (k + ls) * cut; + final int sOff = sOffT + idb; + final int eOff = eOffT + idb; + final double v = a[offI + k]; + vecInnerLoop(v, b, ret, offOutT, eOff, sOff, leftover, vLen, vVec); + } + } + } + + private static void vecInnerLoop(final double v, final double[] b, final double[] ret, final int offOutT, + final int eOff, final int sOff, final int leftover, final int vLen, DoubleVector vVec) { + int offOut = offOutT; + vVec = vVec.broadcast(v); + final int end = eOff - leftover; + for(int j = sOff; j < end; j += vLen, offOut += vLen) { + DoubleVector res = DoubleVector.fromArray(SPECIES, ret, offOut); + DoubleVector bVec = DoubleVector.fromArray(SPECIES, b, j); + vVec.fma(bVec, res).intoArray(ret, offOut); + } + for(int j = end; j < eOff; j++, offOut++) { + ret[offOut] += v * b[j]; + } + } private void preaggValuesFromDenseDictDenseAggRangeGeneric(final int numVals, final IColIndex colIndexes, diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibAppend.java b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibCBind.java similarity index 56% rename from src/main/java/org/apache/sysds/runtime/compress/lib/CLALibAppend.java rename to src/main/java/org/apache/sysds/runtime/compress/lib/CLALibCBind.java index cedf98494c6..49533e4bccc 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibAppend.java +++ b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibCBind.java @@ -21,27 +21,60 @@ import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import org.apache.commons.logging.Log; import 
org.apache.commons.logging.LogFactory; import org.apache.sysds.runtime.compress.CompressedMatrixBlock; import org.apache.sysds.runtime.compress.CompressedMatrixBlockFactory; +import org.apache.sysds.runtime.compress.DMLCompressionException; import org.apache.sysds.runtime.compress.colgroup.AColGroup; import org.apache.sysds.runtime.compress.colgroup.ColGroupEmpty; import org.apache.sysds.runtime.compress.colgroup.ColGroupUncompressed; import org.apache.sysds.runtime.compress.colgroup.indexes.ColIndexFactory; import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex; import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.util.CommonThreadPool; -public final class CLALibAppend { +public final class CLALibCBind { - private CLALibAppend(){ + private CLALibCBind() { // private constructor. } - private static final Log LOG = LogFactory.getLog(CLALibAppend.class.getName()); + private static final Log LOG = LogFactory.getLog(CLALibCBind.class.getName()); - public static MatrixBlock append(MatrixBlock left, MatrixBlock right, int k) { + public static MatrixBlock cbind(MatrixBlock left, MatrixBlock[] right, int k) { + try { + + if(right.length == 1) { + return cbind(left, right[0], k); + } + else { + boolean allCompressed = true; + for(int i = 0; i < right.length && allCompressed; i++) + allCompressed = right[i] instanceof CompressedMatrixBlock; + if(allCompressed) + return cbindAllCompressed((CompressedMatrixBlock) left, right, k); + else + return cbindAllNormalCompressed(left, right, k); + } + } + catch(Exception e) { + throw new DMLCompressionException("Failed to Cbind with compressed input", e); + } + } + + private static MatrixBlock cbindAllNormalCompressed(MatrixBlock left, MatrixBlock[] right, int k) { + for(int i = 0; i < right.length; i++) { + left = cbind(left, right[i], k); + } + return left; + } + + public static MatrixBlock cbind(MatrixBlock left, MatrixBlock right, int k) { final int m = left.getNumRows(); final int 
n = left.getNumColumns() + right.getNumColumns(); @@ -66,6 +99,9 @@ else if(right.isEmpty() && left instanceof CompressedMatrixBlock) final double spar = (left.getNonZeros() + right.getNonZeros()) / ((double) m * n); final double estSizeUncompressed = MatrixBlock.estimateSizeInMemory(m, n, spar); final double estSizeCompressed = left.getInMemorySize() + right.getInMemorySize(); + // if(isAligned((CompressedMatrixBlock) left, (CompressedMatrixBlock) right)) + // return combineCompressed((CompressedMatrixBlock) left, (CompressedMatrixBlock) right); + // else if(estSizeUncompressed < estSizeCompressed) return uc(left).append(uc(right), null); else if(left instanceof CompressedMatrixBlock) @@ -73,8 +109,86 @@ else if(left instanceof CompressedMatrixBlock) else return appendLeftUncompressed(left, (CompressedMatrixBlock) right, m, n); } + if(isAligned((CompressedMatrixBlock) left, (CompressedMatrixBlock) right)) + return combineCompressed((CompressedMatrixBlock) left, (CompressedMatrixBlock) right); + else + return append((CompressedMatrixBlock) left, (CompressedMatrixBlock) right, m, n); + } + + private static MatrixBlock cbindAllCompressed(CompressedMatrixBlock left, MatrixBlock[] right, int k) + throws InterruptedException, ExecutionException { + + final int nCol = left.getNumColumns(); + for(int i = 0; i < right.length; i++) { + CompressedMatrixBlock rightCM = ((CompressedMatrixBlock) right[i]); + if(nCol != right[i].getNumColumns() || !isAligned(left, rightCM)) + return cbindAllNormalCompressed(left, right, k); + } + return cbindAllCompressedAligned(left, right, k); + + } + + private static boolean isAligned(CompressedMatrixBlock left, CompressedMatrixBlock right) { + final List gl = left.getColGroups(); + for(int j = 0; j < gl.size(); j++) { + final AColGroup glj = gl.get(j); + final int aColumnInGroup = glj.getColIndices().get(0); + final AColGroup grj = right.getColGroupForColumn(aColumnInGroup); + + if(!glj.sameIndexStructure(grj) || glj.getNumCols() != 
grj.getNumCols()) + return false; + + } + return true; + } + + private static CompressedMatrixBlock combineCompressed(CompressedMatrixBlock left, CompressedMatrixBlock right) { + final List gl = left.getColGroups(); + final List retCG = new ArrayList<>(gl.size()); + for(int j = 0; j < gl.size(); j++) { + AColGroup glj = gl.get(j); + int aColumnInGroup = glj.getColIndices().get(0); + AColGroup grj = right.getColGroupForColumn(aColumnInGroup); + // parallel combine... + retCG.add(glj.combineWithSameIndex(left.getNumRows(), left.getNumColumns(), grj)); + } + return new CompressedMatrixBlock(left.getNumRows(), left.getNumColumns() + right.getNumColumns(), + left.getNonZeros() + right.getNonZeros(), false, retCG); + } + + private static CompressedMatrixBlock cbindAllCompressedAligned(CompressedMatrixBlock left, MatrixBlock[] right, + final int k) throws InterruptedException, ExecutionException { + + final ExecutorService pool = CommonThreadPool.get(k); + try { + final List gl = left.getColGroups(); + final List> tasks = new ArrayList<>(); + final int nCol = left.getNumColumns(); + final int nRow = left.getNumRows(); + for(int i = 0; i < gl.size(); i++) { + final AColGroup gli = gl.get(i); + tasks.add(pool.submit(() -> { + List combines = new ArrayList<>(); + final int cId = gli.getColIndices().get(0); + for(int j = 0; j < right.length; j++) { + combines.add(((CompressedMatrixBlock) right[j]).getColGroupForColumn(cId)); + } + return gli.combineWithSameIndex(nRow, nCol, combines); + })); + } + + final List retCG = new ArrayList<>(gl.size()); + for(Future t : tasks) + retCG.add(t.get()); + + int totalCol = nCol + right.length * nCol; + + return new CompressedMatrixBlock(left.getNumRows(), totalCol, -1, false, retCG); + } + finally { + pool.shutdown(); + } - return append((CompressedMatrixBlock) left, (CompressedMatrixBlock) right, m, n); } private static MatrixBlock appendLeftUncompressed(MatrixBlock left, CompressedMatrixBlock right, final int m, @@ -123,17 +237,17 @@ 
private static MatrixBlock append(CompressedMatrixBlock left, CompressedMatrixBl ret.setNonZeros(left.getNonZeros() + right.getNonZeros()); ret.setOverlapping(left.isOverlapping() || right.isOverlapping()); - final double compressedSize = ret.getInMemorySize(); - final double uncompressedSize = MatrixBlock.estimateSizeInMemory(m, n, ret.getSparsity()); + // final double compressedSize = ret.getInMemorySize(); + // final double uncompressedSize = MatrixBlock.estimateSizeInMemory(m, n, ret.getSparsity()); - if(compressedSize < uncompressedSize) - return ret; - else { - final double ratio = uncompressedSize / compressedSize; - String message = String.format("Decompressing c bind matrix because it had to small compression ratio: %2.3f", - ratio); - return ret.getUncompressed(message); - } + // if(compressedSize < uncompressedSize) + return ret; + // else { + // final double ratio = uncompressedSize / compressedSize; + // String message = String.format("Decompressing c bind matrix because it had to small compression ratio: %2.3f", + // ratio); + // return ret.getUncompressed(message); + // } } private static MatrixBlock appendRightEmpty(CompressedMatrixBlock left, MatrixBlock right, int m, int n) { diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibMMChain.java b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibMMChain.java index 6207460d3d2..d82d58e323e 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibMMChain.java +++ b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibMMChain.java @@ -34,6 +34,7 @@ import org.apache.sysds.runtime.matrix.data.LibMatrixReorg; import org.apache.sysds.runtime.matrix.data.MatrixBlock; import org.apache.sysds.runtime.matrix.operators.BinaryOperator; +import org.apache.sysds.utils.stats.Timing; /** * Support compressed MM chain operation to fuse the following cases : @@ -53,6 +54,9 @@ public final class CLALibMMChain { static final Log LOG = 
LogFactory.getLog(CLALibMMChain.class.getName()); + /** Reusable cache intermediate double array for temporary decompression */ + private static ThreadLocal cacheIntermediate = null; + private CLALibMMChain() { // private constructor } @@ -87,20 +91,31 @@ private CLALibMMChain() { public static MatrixBlock mmChain(CompressedMatrixBlock x, MatrixBlock v, MatrixBlock w, MatrixBlock out, ChainType ctype, int k) { + Timing t = new Timing(); if(x.isEmpty()) return returnEmpty(x, out); // Morph the columns to efficient types for the operation. x = filterColGroups(x); + double preFilterTime = t.stop(); // Allow overlapping intermediate if the intermediate is guaranteed not to be overlapping. final boolean allowOverlap = x.getColGroups().size() == 1 && isOverlappingAllowed(); // Right hand side multiplication - MatrixBlock tmp = CLALibRightMultBy.rightMultByMatrix(x, v, null, k, allowOverlap); + MatrixBlock tmp = CLALibRightMultBy.rightMultByMatrix(x, v, null, k, true); + + double rmmTime = t.stop(); - if(ctype == ChainType.XtwXv) // Multiply intermediate with vector if needed + if(ctype == ChainType.XtwXv) { // Multiply intermediate with vector if needed tmp = binaryMultW(tmp, w, k); + } + + if(!allowOverlap && tmp instanceof CompressedMatrixBlock) { + tmp = decompressIntermediate((CompressedMatrixBlock) tmp, k); + } + + double decompressTime = t.stop(); if(tmp instanceof CompressedMatrixBlock) // Compressed Compressed Matrix Multiplication @@ -109,12 +124,50 @@ public static MatrixBlock mmChain(CompressedMatrixBlock x, MatrixBlock v, Matrix // LMM with Compressed - uncompressed multiplication. 
CLALibLeftMultBy.leftMultByMatrixTransposed(x, tmp, out, k); + double lmmTime = t.stop(); if(out.getNumColumns() != 1) // transpose the output to make it a row output if needed out = LibMatrixReorg.transposeInPlace(out, k); + if(LOG.isDebugEnabled()) { + StringBuilder sb = new StringBuilder("\n"); + sb.append("\nPreFilter Time : " + preFilterTime); + sb.append("\nChain RMM : " + rmmTime); + sb.append("\nChain RMM Decompress: " + decompressTime); + sb.append("\nChain LMM : " + lmmTime); + sb.append("\nChain Transpose : " + t.stop()); + LOG.debug(sb.toString()); + } + return out; } + private static MatrixBlock decompressIntermediate(CompressedMatrixBlock tmp, int k) { + // cacheIntermediate + final int rows = tmp.getNumRows(); + final int cols = tmp.getNumColumns(); + final int nCells = rows * cols; + final double[] tmpArr; + if(cacheIntermediate == null) { + tmpArr = new double[nCells]; + cacheIntermediate = new ThreadLocal<>(); + cacheIntermediate.set(tmpArr); + } + else { + double[] cachedArr = cacheIntermediate.get(); + if(cachedArr == null || cachedArr.length < nCells) { + tmpArr = new double[nCells]; + cacheIntermediate.set(tmpArr); + } + else { + tmpArr = cachedArr; + } + } + + final MatrixBlock tmpV = new MatrixBlock(tmp.getNumRows(), tmp.getNumColumns(), tmpArr); + CLALibDecompress.decompressTo((CompressedMatrixBlock) tmp, tmpV, 0, 0, k, false, true); + return tmpV; + } + private static boolean isOverlappingAllowed() { return ConfigurationManager.getDMLConfig().getBooleanValue(DMLConfig.COMPRESSED_OVERLAPPING); } @@ -146,6 +199,8 @@ private static CompressedMatrixBlock filterColGroups(CompressedMatrixBlock x) { final List groups = x.getColGroups(); final boolean shouldFilter = CLALibUtils.shouldPreFilter(groups); if(shouldFilter) { + if(CLALibUtils.alreadyPreFiltered(groups, x.getNumColumns())) + return x; final int nCol = x.getNumColumns(); final double[] constV = new double[nCol]; final List filteredGroups = CLALibUtils.filterGroups(groups, constV); diff 
--git a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibMatrixMult.java b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibMatrixMult.java index 92594000458..237c943ca3b 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibMatrixMult.java +++ b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibMatrixMult.java @@ -41,7 +41,7 @@ public static MatrixBlock matrixMultiply(MatrixBlock m1, MatrixBlock m2, MatrixB public static MatrixBlock matrixMultiply(MatrixBlock m1, MatrixBlock m2, MatrixBlock ret, int k, boolean transposeLeft, boolean transposeRight) { - + if(m1 instanceof CompressedMatrixBlock && m2 instanceof CompressedMatrixBlock) { return doubleCompressedMatrixMultiply((CompressedMatrixBlock) m1, (CompressedMatrixBlock) m2, ret, k, transposeLeft, transposeRight); diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibReorg.java b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibReorg.java new file mode 100644 index 00000000000..9a869453adb --- /dev/null +++ b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibReorg.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.sysds.runtime.compress.lib; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.sysds.runtime.compress.CompressedMatrixBlock; +import org.apache.sysds.runtime.compress.DMLCompressionException; +import org.apache.sysds.runtime.compress.colgroup.AColGroup; +import org.apache.sysds.runtime.data.DenseBlock; +import org.apache.sysds.runtime.data.SparseBlock; +import org.apache.sysds.runtime.data.SparseBlockMCSR; +import org.apache.sysds.runtime.functionobjects.SwapIndex; +import org.apache.sysds.runtime.matrix.data.LibMatrixReorg; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.matrix.operators.ReorgOperator; +import org.apache.sysds.runtime.util.CommonThreadPool; + +public class CLALibReorg { + + protected static final Log LOG = LogFactory.getLog(CLALibReorg.class.getName()); + + public static boolean warned = false; + + public static MatrixBlock reorg(CompressedMatrixBlock cmb, ReorgOperator op, MatrixBlock ret, int startRow, + int startColumn, int length) { + // SwapIndex is transpose + if(op.fn instanceof SwapIndex && cmb.getNumColumns() == 1) { + MatrixBlock tmp = cmb.decompress(op.getNumThreads()); + long nz = tmp.setNonZeros(tmp.getNonZeros()); + if(tmp.isInSparseFormat()) + return LibMatrixReorg.transpose(tmp); // edge case... + else + tmp = new MatrixBlock(tmp.getNumColumns(), tmp.getNumRows(), tmp.getDenseBlockValues()); + tmp.setNonZeros(nz); + return tmp; + } + else if(op.fn instanceof SwapIndex) { + if(cmb.getCachedDecompressed() != null) + return cmb.getCachedDecompressed().reorgOperations(op, ret, startRow, startColumn, length); + + return transpose(cmb, ret, op.getNumThreads()); + } + else { + // Allow transpose to be compressed output. 
In general we need to have a transposed flag on + // the compressed matrix. https://issues.apache.org/jira/browse/SYSTEMDS-3025 + String message = !warned ? op.getClass().getSimpleName() + " -- " + op.fn.getClass().getSimpleName() : null; + MatrixBlock tmp = cmb.getUncompressed(message, op.getNumThreads()); + warned = true; + return tmp.reorgOperations(op, ret, startRow, startColumn, length); + } + } + + private static MatrixBlock transpose(CompressedMatrixBlock cmb, MatrixBlock ret, int k) { + + final long nnz = cmb.getNonZeros(); + final int nRow = cmb.getNumRows(); + final int nCol = cmb.getNumColumns(); + final boolean sparseOut = MatrixBlock.evalSparseFormatInMemory(nRow, nCol, nnz); + if(sparseOut) + return transposeSparse(cmb, ret, k, nRow, nCol, nnz); + else + return transposeDense(cmb, ret, k, nRow, nCol, nnz); + } + + private static MatrixBlock transposeSparse(CompressedMatrixBlock cmb, MatrixBlock ret, int k, int nRow, int nCol, + long nnz) { + if(ret == null) + ret = new MatrixBlock(nCol, nRow, true, nnz); + else + ret.reset(nCol, nRow, true, nnz); + + ret.allocateAndResetSparseBlock(true, SparseBlock.Type.MCSR); + + final int nColOut = ret.getNumColumns(); + + if(k > 1) + decompressToTransposedSparseParallel((SparseBlockMCSR) ret.getSparseBlock(), cmb.getColGroups(), nColOut, k); + else + decompressToTransposedSparseSingleThread((SparseBlockMCSR) ret.getSparseBlock(), cmb.getColGroups(), nColOut); + + return ret; + } + + private static MatrixBlock transposeDense(CompressedMatrixBlock cmb, MatrixBlock ret, int k, int nRow, int nCol, + long nnz) { + if(ret == null) + ret = new MatrixBlock(nCol, nRow, false, nnz); + else + ret.reset(nCol, nRow, false, nnz); + + // TODO: parallelize + ret.allocateDenseBlock(); + + decompressToTransposedDense(ret.getDenseBlock(), cmb.getColGroups(), nRow, 0, nRow); + return ret; + } + + private static void decompressToTransposedDense(DenseBlock ret, List groups, int rlen, int rl, int ru) { + for(int i = 0; i < 
groups.size(); i++) { + AColGroup g = groups.get(i); + g.decompressToDenseBlockTransposed(ret, rl, ru); + } + } + + private static void decompressToTransposedSparseSingleThread(SparseBlockMCSR ret, List groups, + int nColOut) { + for(int i = 0; i < groups.size(); i++) { + AColGroup g = groups.get(i); + g.decompressToSparseBlockTransposed(ret, nColOut); + } + } + + private static void decompressToTransposedSparseParallel(SparseBlockMCSR ret, List groups, int nColOut, + int k) { + final ExecutorService pool = CommonThreadPool.get(k); + try { + final List> tasks = new ArrayList<>(groups.size()); + + for(int i = 0; i < groups.size(); i++) { + final AColGroup g = groups.get(i); + tasks.add(pool.submit(() -> g.decompressToSparseBlockTransposed(ret, nColOut))); + } + + for(Future f : tasks) + f.get(); + + } + catch(Exception e) { + throw new DMLCompressionException("Failed to parallel decompress transpose sparse", e); + } + finally { + pool.shutdown(); + } + } +} diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibReplace.java b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibReplace.java new file mode 100644 index 00000000000..8121a82f4f9 --- /dev/null +++ b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibReplace.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.runtime.compress.lib; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.sysds.runtime.compress.CompressedMatrixBlock; +import org.apache.sysds.runtime.compress.colgroup.AColGroup; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.util.CommonThreadPool; + +public class CLALibReplace { + private static final Log LOG = LogFactory.getLog(CLALibReplace.class.getName()); + + public static MatrixBlock replace(CompressedMatrixBlock in, MatrixBlock out, double pattern, double replacement, + int k) { + if(Double.isInfinite(pattern)) { + LOG.info("Ignoring replace infinite in compression since it does not contain this value"); + return in; + } + else if(in.isOverlapping()) { + final String message = "replaceOperations " + pattern + " -> " + replacement; + return in.getUncompressed(message).replaceOperations(out, pattern, replacement); + } + else + return replaceNormal(in, out, pattern, replacement, k); + } + + private static MatrixBlock replaceNormal(CompressedMatrixBlock in, MatrixBlock out, double pattern, + double replacement, int k) { + CompressedMatrixBlock ret = new CompressedMatrixBlock(in.getNumRows(), in.getNumColumns()); + final List prev = in.getColGroups(); + final int colGroupsLength = prev.size(); + final List retList = new ArrayList<>(colGroupsLength); + + if(k <= 0) { + for(int i = 0; i < colGroupsLength; i++) + retList.add(prev.get(i).replace(pattern, replacement)); + } + else { + ExecutorService pool = CommonThreadPool.get(k); + + try { + List> tasks = new ArrayList<>(colGroupsLength); + for(int i = 0; i < colGroupsLength; i++) { + final int j = i; + tasks.add(pool.submit(() -> 
prev.get(j).replace(pattern, replacement))); + } + for(int i = 0; i < colGroupsLength; i++) { + retList.add(tasks.get(i).get()); + } + } + catch(Exception e) { + throw new RuntimeException("Failed parallel replace", e); + } + finally { + pool.shutdown(); + } + } + + ret.allocateColGroupList(retList); + if(replacement == 0) + ret.recomputeNonZeros(); + else if( pattern == 0) + ret.setNonZeros(((long)in.getNumRows()) * in.getNumColumns()); + else + ret.setNonZeros(in.getNonZeros()); + return ret; + } +} diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibReshape.java b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibReshape.java index f91779385dc..33ffd34f3cd 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibReshape.java +++ b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibReshape.java @@ -161,7 +161,7 @@ private void checkValidity() { private boolean shouldItBeCompressedOutputs() { // The number of rows in the reshaped allocations is fairly large.
-return rlen > COMPRESSED_RESHAPE_THRESHOLD && rowwise && + return rlen > COMPRESSED_RESHAPE_THRESHOLD && // the reshape is a clean multiplier of number of rows, meaning each column group cleanly reshape into x others (double) rlen / rows % 1.0 == 0.0; } diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibRightMultBy.java b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibRightMultBy.java index 966051cd8bd..2b652131602 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibRightMultBy.java +++ b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibRightMultBy.java @@ -34,19 +34,17 @@ import org.apache.sysds.runtime.compress.CompressedMatrixBlock; import org.apache.sysds.runtime.compress.colgroup.AColGroup; import org.apache.sysds.runtime.compress.colgroup.ColGroupConst; -import org.apache.sysds.runtime.compress.colgroup.ColGroupDDC; +import org.apache.sysds.runtime.compress.colgroup.ColGroupUncompressed; import org.apache.sysds.runtime.compress.colgroup.indexes.ColIndexFactory; import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex; -import org.apache.sysds.runtime.functionobjects.Plus; import org.apache.sysds.runtime.matrix.data.LibMatrixMult; import org.apache.sysds.runtime.matrix.data.MatrixBlock; -import org.apache.sysds.runtime.matrix.operators.BinaryOperator; import org.apache.sysds.runtime.util.CommonThreadPool; public final class CLALibRightMultBy { private static final Log LOG = LogFactory.getLog(CLALibRightMultBy.class.getName()); - private CLALibRightMultBy(){ + private CLALibRightMultBy() { // private constructor } @@ -74,6 +72,11 @@ public static MatrixBlock rightMultByMatrix(CompressedMatrixBlock m1, MatrixBloc if(m2 instanceof CompressedMatrixBlock) m2 = ((CompressedMatrixBlock) m2).getUncompressed("Uncompressed right side of right MM", k); + if(betterIfDecompressed(m1)) { + // perform uncompressed
multiplication. + return decompressingMatrixMult(m1, m2, k); + } + if(!allowOverlap) { LOG.trace("Overlapping output not allowed in call to Right MM"); return RMM(m1, m2, k); @@ -87,14 +90,67 @@ public static MatrixBlock rightMultByMatrix(CompressedMatrixBlock m1, MatrixBloc if(retC.isOverlapping()) retC.setNonZeros((long) rr * rc); // set non zeros to fully dense in case of overlapping. else - retC.recomputeNonZeros(); // recompute if non overlapping compressed out. + retC.recomputeNonZeros(k); // recompute if non overlapping compressed out. return retC; } } + } + + private static MatrixBlock decompressingMatrixMult(CompressedMatrixBlock m1, MatrixBlock m2, int k) { + ExecutorService pool = CommonThreadPool.get(k); + try { + final int rl = m1.getNumRows(); + final int cr = m2.getNumColumns(); + // final int rr = m2.getNumRows(); // shared dim + final MatrixBlock ret = new MatrixBlock(rl, cr, false); + ret.allocateBlock(); + + // MatrixBlock m1uc = m1.decompress(k); + final List> tasks = new ArrayList<>(); + final List groups = m1.getColGroups(); + final int blkI = Math.max((int) Math.ceil((double) rl / k), 16); + final int blkJ = blkI > 16 ? cr : Math.max((cr / k), 512); // make it a multiplicative of 8. 
+ for(int i = 0; i < rl; i += blkI) { + final int startI = i; + final int endI = Math.min(i + blkI, rl); + for(int j = 0; j < cr; j += blkJ){ + final int startJ = j; + final int endJ = Math.min(j + blkJ, cr); + tasks.add(pool.submit(() -> { + for(AColGroup g : groups) + g.rightDecompressingMult(m2, ret, startI, endI, rl, startJ, endJ); + return ret.recomputeNonZeros(startI, endI - 1, startJ, endJ-1); + })); + } + } + long nnz = 0; + for(Future t : tasks) + nnz += t.get(); + + ret.setNonZeros(nnz); + ret.examSparsity(); + return ret; + } + catch(InterruptedException | ExecutionException e) { + throw new DMLRuntimeException(e); + } + finally { + pool.shutdown(); + } } + private static boolean betterIfDecompressed(CompressedMatrixBlock m) { + for(AColGroup g : m.getColGroups()) { + if(!(g instanceof ColGroupUncompressed) && g.getNumValues() * 2 >= m.getNumRows()) { + return true; + } + } + return false; + } + private static CompressedMatrixBlock RMMOverlapping(CompressedMatrixBlock m1, MatrixBlock that, int k) { + final int rl = m1.getNumRows(); final int cr = that.getNumColumns(); final int rr = that.getNumRows(); // shared dim @@ -103,13 +159,19 @@ private static CompressedMatrixBlock RMMOverlapping(CompressedMatrixBlock m1, Ma final CompressedMatrixBlock ret = new CompressedMatrixBlock(rl, cr); final boolean shouldFilter = CLALibUtils.shouldPreFilter(colGroups); + final double[] constV; + final List filteredGroups; - double[] constV = shouldFilter ? 
new double[rr] : null; - final List filteredGroups = CLALibUtils.filterGroups(colGroups, constV); - if(colGroups == filteredGroups) + if(shouldFilter) { + constV = new double[rr]; + filteredGroups = CLALibUtils.filterGroups(colGroups, constV); + } + else { + filteredGroups = colGroups; constV = null; + } - if(k == 1) + if(k == 1 || filteredGroups.size() == 1) RMMSingle(filteredGroups, that, retCg); else RMMParallel(filteredGroups, that, retCg, k); @@ -117,7 +179,7 @@ private static CompressedMatrixBlock RMMOverlapping(CompressedMatrixBlock m1, Ma if(constV != null) { final MatrixBlock cb = new MatrixBlock(1, constV.length, constV); final MatrixBlock cbRet = new MatrixBlock(1, that.getNumColumns(), false); - LibMatrixMult.matrixMult(cb, that, cbRet); + LibMatrixMult.matrixMult(cb, that, cbRet); // mm on row vector left. if(!cbRet.isEmpty()) addConstant(cbRet, retCg); } @@ -133,35 +195,39 @@ private static CompressedMatrixBlock RMMOverlapping(CompressedMatrixBlock m1, Ma } private static void addConstant(MatrixBlock constantRow, List out) { - final int nCol = constantRow.getNumColumns(); - int bestCandidate = -1; - int bestCandidateValuesSize = Integer.MAX_VALUE; - for(int i = 0; i < out.size(); i++) { - AColGroup g = out.get(i); - if(g instanceof ColGroupDDC && g.getNumCols() == nCol && g.getNumValues() < bestCandidateValuesSize) - bestCandidate = i; - } + // it is fairly safe to add the constant row to a column group. + // but it is not necessary the fastest. 
+ + // final int nCol = constantRow.getNumColumns(); + // int bestCandidate = -1; + // int bestCandidateValuesSize = Integer.MAX_VALUE; + // for(int i = 0; i < out.size(); i++) { + // AColGroup g = out.get(i); + // if(g instanceof ColGroupDDC && g.getNumCols() == nCol && g.getNumValues() < bestCandidateValuesSize) + // bestCandidate = i; + // } constantRow.sparseToDense(); - if(bestCandidate != -1) { - AColGroup bc = out.get(bestCandidate); - out.remove(bestCandidate); - AColGroup ng = bc.binaryRowOpRight(new BinaryOperator(Plus.getPlusFnObject(), 1), - constantRow.getDenseBlockValues(), true); - out.add(ng); - } - else - out.add(ColGroupConst.create(constantRow.getDenseBlockValues())); + // if(bestCandidate != -1) { + // AColGroup bc = out.get(bestCandidate); + // out.remove(bestCandidate); + // AColGroup ng = bc.binaryRowOpRight(new BinaryOperator(Plus.getPlusFnObject(), 1), + // constantRow.getDenseBlockValues(), true); + // out.add(ng); + // } + // else + out.add(ColGroupConst.create(constantRow.getDenseBlockValues())); } private static MatrixBlock RMM(CompressedMatrixBlock m1, MatrixBlock that, int k) { + + // Timing t = new Timing(); // this version returns a decompressed result. final int rl = m1.getNumRows(); final int cr = that.getNumColumns(); final int rr = that.getNumRows(); // shared dim final List colGroups = m1.getColGroups(); - final List retCg = new ArrayList<>(); final boolean shouldFilter = CLALibUtils.shouldPreFilter(colGroups); @@ -169,16 +235,32 @@ private static MatrixBlock RMM(CompressedMatrixBlock m1, MatrixBlock that, int k MatrixBlock ret = new MatrixBlock(rl, cr, false); final Future f = ret.allocateBlockAsync(); - double[] constV = shouldFilter ? 
new double[rr] : null; - final List filteredGroups = CLALibUtils.filterGroups(colGroups, constV); - if(colGroups == filteredGroups) + double[] constV; + final List filteredGroups; + + if(shouldFilter) { + if(CLALibUtils.alreadyPreFiltered(colGroups, cr)) { + filteredGroups = new ArrayList<>(colGroups.size() - 1); + constV = CLALibUtils.filterGroupsAndSplitPreAggOneConst(colGroups, filteredGroups); + } + else { + constV = new double[rr]; + filteredGroups = CLALibUtils.filterGroups(colGroups, constV); + } + } + else { + filteredGroups = colGroups; constV = null; + } + + final List retCg = new ArrayList<>(filteredGroups.size()); if(k == 1) RMMSingle(filteredGroups, that, retCg); else RMMParallel(filteredGroups, that, retCg, k); + if(constV != null) { MatrixBlock constVMB = new MatrixBlock(1, constV.length, constV); MatrixBlock mmTemp = new MatrixBlock(1, cr, false); @@ -233,7 +315,7 @@ private static boolean RMMParallel(List filteredGroups, MatrixBlock t catch(InterruptedException | ExecutionException e) { throw new DMLRuntimeException(e); } - finally{ + finally { pool.shutdown(); } return containsNull; diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibTSMM.java b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibTSMM.java index 5f5e63c9ac0..a1d47a9b150 100644 --- a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibTSMM.java +++ b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibTSMM.java @@ -52,8 +52,15 @@ private CLALibTSMM() { * @param k The parallelization degree allowed */ public static void leftMultByTransposeSelf(CompressedMatrixBlock cmb, MatrixBlock ret, int k) { + final List groups = cmb.getColGroups(); + final int numColumns = cmb.getNumColumns(); + if(groups.size() >= numColumns) { + MatrixBlock m = cmb.getUncompressed("TSMM to many columngroups", k); + LibMatrixMult.matrixMultTransposeSelf(m, ret, true, k); + return; + } final int numRows = cmb.getNumRows(); final boolean shouldFilter = 
CLALibUtils.shouldPreFilter(groups); final boolean overlapping = cmb.isOverlapping(); @@ -63,8 +70,10 @@ public static void leftMultByTransposeSelf(CompressedMatrixBlock cmb, MatrixBloc tsmmColGroups(filteredGroups, ret, numRows, overlapping, k); addCorrectionLayer(filteredGroups, ret, numRows, numColumns, constV); } - else + else { + tsmmColGroups(groups, ret, numRows, overlapping, k); + } ret.setNonZeros(LibMatrixMult.copyUpperToLowerTriangle(ret)); ret.examSparsity(); @@ -77,10 +86,7 @@ private static void addCorrectionLayer(List filteredGroups, MatrixBlo addCorrectionLayer(constV, filteredColSum, nRows, retV); } - public static void addCorrectionLayer(double[] constV, double[] correctedSum, int nRow, double[] ret) { - outerProductUpperTriangle(constV, correctedSum, ret); - outerProductUpperTriangleWithScaling(correctedSum, constV, nRow, ret); - } + private static void tsmmColGroups(List groups, MatrixBlock ret, int nRows, boolean overlapping, int k) { if(k <= 1) @@ -108,7 +114,7 @@ private static void tsmmColGroupsMultiThreadOverlapping(List groups, } private static void tsmmColGroupsMultiThread(List groups, MatrixBlock ret, int nRows, int k) { - final ExecutorService pool = CommonThreadPool.get(k); + final ExecutorService pool = CommonThreadPool.get(k); try { final ArrayList> tasks = new ArrayList<>((groups.size() * (1 + groups.size())) / 2); for(int i = 0; i < groups.size(); i++) { @@ -123,31 +129,19 @@ private static void tsmmColGroupsMultiThread(List groups, MatrixBlock catch(InterruptedException | ExecutionException e) { throw new DMLRuntimeException(e); } - finally{ + finally { pool.shutdown(); } } - private static void outerProductUpperTriangle(final double[] leftRowSum, final double[] rightColumnSum, - final double[] result) { - for(int row = 0; row < leftRowSum.length; row++) { - final int offOut = rightColumnSum.length * row; - final double vLeft = leftRowSum[row]; - for(int col = row; col < rightColumnSum.length; col++) { - result[offOut + col] += 
vLeft * rightColumnSum[col]; - } - } - } - - private static void outerProductUpperTriangleWithScaling(final double[] leftRowSum, final double[] rightColumnSum, - final int scale, final double[] result) { - // note this scaling is a bit different since it is encapsulating two scalar multiplications via an addition in - // the outer loop. - for(int row = 0; row < leftRowSum.length; row++) { - final int offOut = rightColumnSum.length * row; - final double vLeft = leftRowSum[row] + rightColumnSum[row] * scale; - for(int col = row; col < rightColumnSum.length; col++) { - result[offOut + col] += vLeft * rightColumnSum[col]; + public static void addCorrectionLayer(double[] constV, double[] filteredColSum, int nRow, double[] ret) { + final int nColRow = constV.length; + for(int row = 0; row < nColRow; row++){ + int offOut = nColRow * row; + final double v1l = constV[row]; + final double v2l = filteredColSum[row] + constV[row] * nRow; + for(int col = row; col < nColRow; col++){ + ret[offOut + col] += v1l * filteredColSum[col] + v2l * constV[col]; } } } diff --git a/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibTable.java b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibTable.java new file mode 100644 index 00000000000..aa3d384263c --- /dev/null +++ b/src/main/java/org/apache/sysds/runtime/compress/lib/CLALibTable.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysds.runtime.compress.lib; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.compress.CompressedMatrixBlock; +import org.apache.sysds.runtime.compress.colgroup.AColGroup; +import org.apache.sysds.runtime.compress.colgroup.ColGroupDDC; +import org.apache.sysds.runtime.compress.colgroup.dictionary.ADictionary; +import org.apache.sysds.runtime.compress.colgroup.dictionary.IdentityDictionary; +import org.apache.sysds.runtime.compress.colgroup.indexes.ColIndexFactory; +import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex; +import org.apache.sysds.runtime.compress.colgroup.mapping.AMapToData; +import org.apache.sysds.runtime.compress.colgroup.mapping.MapToFactory; +import org.apache.sysds.runtime.matrix.data.MatrixBlock; +import org.apache.sysds.runtime.util.CommonThreadPool; +import org.apache.sysds.runtime.util.UtilFunctions; +import org.apache.sysds.utils.stats.InfrastructureAnalyzer; + +public class CLALibTable { + + protected static final Log LOG = LogFactory.getLog(CLALibTable.class.getName()); + + private CLALibTable() { + // empty constructor + } + + public static MatrixBlock tableSeqOperations(int seqHeight, MatrixBlock A, int nColOut){ + + int k = InfrastructureAnalyzer.getLocalParallelism(); + final int[] map = new int[seqHeight]; + int 
maxCol = constructInitialMapping(map, A, k); + boolean containsNull = maxCol < 0; + maxCol = Math.abs(maxCol); + + if(nColOut == -1) + nColOut = maxCol; + else if(nColOut < maxCol) + throw new DMLRuntimeException("invalid nColOut, requested: " + nColOut + " but have to be : " + maxCol); + + final int nNulls = containsNull ? correctNulls(map, nColOut) : 0; + if(nColOut == 0) // edge case of empty zero dimension block. + return new MatrixBlock(seqHeight, 0, 0.0); + return createCompressedReturn(map, nColOut, seqHeight, nNulls, containsNull, k); + } + + private static CompressedMatrixBlock createCompressedReturn(int[] map, int nColOut, int seqHeight, int nNulls, + boolean containsNull, int k) { + // create a single DDC Column group. + final IColIndex i = ColIndexFactory.create(0, nColOut); + final ADictionary d = new IdentityDictionary(nColOut, containsNull); + final AMapToData m = MapToFactory.create(seqHeight, map, nColOut + (containsNull ? 1 : 0), k); + final AColGroup g = ColGroupDDC.create(i, d, m, null); + + final CompressedMatrixBlock cmb = new CompressedMatrixBlock(seqHeight, nColOut); + cmb.allocateColGroup(g); + cmb.setNonZeros(seqHeight - nNulls); + return cmb; + } + + private static int correctNulls(int[] map, int nColOut) { + int nNulls = 0; + for(int i = 0; i < map.length; i++) { + if(map[i] == -1) { + map[i] = nColOut; + nNulls++; + } + } + return nNulls; + } + + private static int constructInitialMapping(int[] map, MatrixBlock A, int k) { + if(A.isEmpty() || A.isInSparseFormat()) + throw new DMLRuntimeException("not supported empty or sparse construction of seq table"); + + ExecutorService pool = CommonThreadPool.get(k); + try { + + int blkz = Math.max((map.length / k), 1000); + List> tasks = new ArrayList<>(); + for(int i = 0; i < map.length; i+= blkz){ + final int start = i; + final int end = Math.min(i + blkz, map.length); + tasks.add(pool.submit(() -> partialMapping(map, A, start, end))); + } + + int maxCol = 0; + for( Future f : tasks){ + int tmp 
= f.get(); + if(Math.abs(tmp) >Math.abs(maxCol)) + maxCol = tmp; + } + return maxCol; + } + catch(Exception e) { + throw new DMLRuntimeException(e); + } + finally { + pool.shutdown(); + } + + } + + private static int partialMapping(int[] map, MatrixBlock A, int start, int end) { + + int maxCol = 0; + boolean containsNull = false; + final double[] aVals = A.getDenseBlockValues(); + + for(int i = start; i < end; i++) { + final double v2 = aVals[i]; + if(Double.isNaN(v2)) { + map[i] = -1; // assign temporarily to -1 + containsNull = true; + } + else { + // safe casts to long for consistent behavior with indexing + int col = UtilFunctions.toInt(v2); + if(col <= 0) + throw new DMLRuntimeException( + "Erroneous input while computing the contingency table (value <= zero): " + v2); + + map[i] = col - 1; + // maintain max seen col + maxCol = Math.max(col, maxCol); + } + } + + return containsNull ? maxCol * -1 : maxCol; + } + +} diff --git a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java index b055a6848fc..9036813ad9d 100644 --- a/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java +++ b/src/main/java/org/apache/sysds/runtime/controlprogram/federated/FederatedWorker.java @@ -85,6 +85,8 @@ public FederatedWorker(int port, boolean debug) { else _fan = null; + log.debug("Running federated worker " + (_fan == null ? "": " with AWARE")); + _port = (port == -1) ? 
DMLConfig.DEFAULT_FEDERATED_PORT : port; _debug = debug; diff --git a/src/main/java/org/apache/sysds/runtime/instructions/SPInstructionParser.java b/src/main/java/org/apache/sysds/runtime/instructions/SPInstructionParser.java index 5c72b854362..5014c0ac30e 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/SPInstructionParser.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/SPInstructionParser.java @@ -39,6 +39,7 @@ import org.apache.sysds.lops.WeightedUnaryMM; import org.apache.sysds.lops.WeightedUnaryMMR; import org.apache.sysds.runtime.DMLRuntimeException; +import org.apache.sysds.runtime.instructions.cp.CPInstruction.CPType; import org.apache.sysds.runtime.instructions.cp.CPOperand; import org.apache.sysds.runtime.instructions.spark.AggregateTernarySPInstruction; import org.apache.sysds.runtime.instructions.spark.AggregateUnarySPInstruction; @@ -195,6 +196,7 @@ public class SPInstructionParser extends InstructionParser String2SPInstructionType.put( "freplicate", SPType.Binary); String2SPInstructionType.put( "mapdropInvalidLength", SPType.Binary); String2SPInstructionType.put( "valueSwap", SPType.Binary); + String2SPInstructionType.put( "applySchema" , SPType.Binary); String2SPInstructionType.put( "_map", SPType.Ternary); // _map refers to the operation map // Relational Instruction Opcodes String2SPInstructionType.put( "==" , SPType.Binary); diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/BinaryMatrixMatrixCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/BinaryMatrixMatrixCPInstruction.java index cff0650235e..2ec23037385 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/cp/BinaryMatrixMatrixCPInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/BinaryMatrixMatrixCPInstruction.java @@ -93,8 +93,10 @@ public void processInstruction(ExecutionContext ec) { // Release the memory occupied by input matrices ec.releaseMatrixInput(input1.getName(), 
input2.getName()); // Ensure right dense/sparse output representation (guarded by released input memory) - if(checkGuardedRepresentationChange(inBlock1, inBlock2, retBlock)) - retBlock.examSparsity(); + if(checkGuardedRepresentationChange(inBlock1, inBlock2, retBlock)){ + int k = (_optr instanceof BinaryOperator) ? ((BinaryOperator) _optr).getNumThreads() : 1; + retBlock.examSparsity(k); + } } // Attach result matrix with MatrixObject associated with output_name diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/CtableCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/CtableCPInstruction.java index 69b24ebc2b0..c94a0e96689 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/cp/CtableCPInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/CtableCPInstruction.java @@ -96,6 +96,8 @@ public void processInstruction(ExecutionContext ec) { MatrixBlock matBlock1 =! _isExpand ? ec.getMatrixInput(input1): null; MatrixBlock matBlock2 = null, wtBlock=null; double cst1, cst2; + if(!input1.isScalar()) + matBlock1 = ec.getMatrixInput(input1.getName()); CTableMap resultMap = new CTableMap(EntryType.INT); MatrixBlock resultBlock = null; @@ -115,7 +117,6 @@ public void processInstruction(ExecutionContext ec) { if( !sparse ) resultBlock = new MatrixBlock((int)outputDim1, (int)outputDim2, false); } - switch(ctableOp) { case CTABLE_TRANSFORM: //(VECTOR) // F=ctable(A,B,W) diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/MatrixAppendCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/MatrixAppendCPInstruction.java index 9027d4514aa..198ecc61a4a 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/cp/MatrixAppendCPInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/MatrixAppendCPInstruction.java @@ -22,7 +22,7 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.sysds.runtime.DMLRuntimeException; import 
org.apache.sysds.runtime.compress.CompressedMatrixBlock; -import org.apache.sysds.runtime.compress.lib.CLALibAppend; +import org.apache.sysds.runtime.compress.lib.CLALibCBind; import org.apache.sysds.runtime.controlprogram.context.ExecutionContext; import org.apache.sysds.runtime.lineage.LineageItem; import org.apache.sysds.runtime.lineage.LineageItemUtils; @@ -46,8 +46,8 @@ public void processInstruction(ExecutionContext ec) { validateInput(matBlock1, matBlock2); MatrixBlock ret; - if(matBlock1 instanceof CompressedMatrixBlock || matBlock2 instanceof CompressedMatrixBlock) - ret = CLALibAppend.append(matBlock1, matBlock2, InfrastructureAnalyzer.getLocalParallelism()); + if(_type == AppendType.CBIND && (matBlock1 instanceof CompressedMatrixBlock || matBlock2 instanceof CompressedMatrixBlock) ) + ret = CLALibCBind.cbind(matBlock1, matBlock2, InfrastructureAnalyzer.getLocalParallelism()); else ret = matBlock1.append(matBlock2, new MatrixBlock(), _type == AppendType.CBIND); diff --git a/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnParameterizedBuiltinCPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnParameterizedBuiltinCPInstruction.java index e5e486752d0..09280513083 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnParameterizedBuiltinCPInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/cp/MultiReturnParameterizedBuiltinCPInstruction.java @@ -104,6 +104,10 @@ public void processInstruction(ExecutionContext ec) { ec.releaseFrameInput(input1.getName()); ec.setMatrixOutput(getOutput(0).getName(), data); ec.setFrameOutput(getOutput(1).getName(), meta); + + if(LOG.isDebugEnabled()) + // debug the size of the output metadata. 
+ LOG.debug("Memory size of metadata: " + meta.getInMemorySize()); } @Override diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/BinaryFrameFrameSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/BinaryFrameFrameSPInstruction.java index 6f6232e71af..dfad7a165e7 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/spark/BinaryFrameFrameSPInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/BinaryFrameFrameSPInstruction.java @@ -59,6 +59,11 @@ else if(getOpcode().equals("valueSwap")) { // Attach result frame with FrameBlock associated with output_name sec.releaseFrameInput(input2.getName()); } + else if(getOpcode().equals("applySchema")){ + Broadcast fb = sec.getSparkContext().broadcast(sec.getFrameInput(input2.getName())); + out = in1.mapValues(new applySchema(fb.getValue())); + sec.releaseFrameInput(input2.getName()); + } else { JavaPairRDD in2 = sec.getFrameBinaryBlockRDDHandleForVariable(input2.getName()); // create output frame @@ -70,7 +75,9 @@ else if(getOpcode().equals("valueSwap")) { //set output RDD and maintain dependencies sec.setRDDHandleForVariable(output.getName(), out); sec.addLineageRDD(output.getName(), input1.getName()); - if( !getOpcode().equals("dropInvalidType") && !getOpcode().equals("valueSwap")) + if(!getOpcode().equals("dropInvalidType") && // + !getOpcode().equals("valueSwap") && // + !getOpcode().equals("applySchema")) sec.addLineageRDD(output.getName(), input2.getName()); } @@ -116,4 +123,20 @@ public FrameBlock call(FrameBlock arg0) throws Exception { return arg0.valueSwap(schema_frame); } } + + + private static class applySchema implements Function{ + private static final long serialVersionUID = 58504021316402L; + + private FrameBlock schema; + + public applySchema(FrameBlock schema ) { + this.schema = schema; + } + + @Override + public FrameBlock call(FrameBlock arg0) throws Exception { + return arg0.applySchema(schema); + } + } } diff --git 
a/src/main/java/org/apache/sysds/runtime/instructions/spark/WriteSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/WriteSPInstruction.java index c6ff8c7a384..0ad5432ff9e 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/spark/WriteSPInstruction.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/WriteSPInstruction.java @@ -356,16 +356,13 @@ private static void customSaveTextFile(JavaRDD rdd, String fname, boolea } rdd.saveAsTextFile(randFName); - HDFSTool.mergeIntoSingleFile(randFName, fname); // Faster version :) - - // rdd.coalesce(1, true).saveAsTextFile(randFName); - // MapReduceTool.copyFileOnHDFS(randFName + "/part-00000", fname); + HDFSTool.mergeIntoSingleFile(randFName, fname); } catch (IOException e) { throw new DMLRuntimeException("Cannot merge the output into single file: " + e.getMessage()); } finally { try { - // This is to make sure that we donot create random files on HDFS + // This is to make sure that we do not create random files on HDFS HDFSTool.deleteFileIfExistOnHDFS( randFName ); } catch (IOException e) { throw new DMLRuntimeException("Cannot merge the output into single file: " + e.getMessage()); diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/utils/FrameRDDConverterUtils.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/utils/FrameRDDConverterUtils.java index 9371d43094c..a5974640cc5 100644 --- a/src/main/java/org/apache/sysds/runtime/instructions/spark/utils/FrameRDDConverterUtils.java +++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/utils/FrameRDDConverterUtils.java @@ -90,10 +90,7 @@ public static JavaPairRDD csvToBinaryBlock(JavaSparkContext sc JavaRDD tmp = input.values() .map(new TextToStringFunction()); String tmpStr = tmp.first(); - boolean metaHeader = tmpStr.startsWith(TfUtils.TXMTD_MVPREFIX) - || tmpStr.startsWith(TfUtils.TXMTD_NDPREFIX); - tmpStr = (metaHeader) ? 
tmpStr.substring(tmpStr.indexOf(delim)+1) : tmpStr; - long rlen = tmp.count() - (hasHeader ? 1 : 0) - (metaHeader ? 2 : 0); + long rlen = tmp.count() ; long clen = IOUtilFunctions.splitCSV(tmpStr, delim).length; mc.set(rlen, clen, mc.getBlocksize(), -1); } @@ -582,14 +579,14 @@ public Iterator> call(Iterator> arg0) _colnames = row.split(_delim); continue; } - if( row.startsWith(TfUtils.TXMTD_MVPREFIX) ) { - _mvMeta = Arrays.asList(Arrays.copyOfRange(IOUtilFunctions.splitCSV(row, _delim), 1, (int)_clen+1)); - continue; - } - else if( row.startsWith(TfUtils.TXMTD_NDPREFIX) ) { - _ndMeta = Arrays.asList(Arrays.copyOfRange(IOUtilFunctions.splitCSV(row, _delim), 1, (int)_clen+1)); - continue; - } + // if( row.startsWith(TfUtils.TXMTD_MVPREFIX) ) { + // _mvMeta = Arrays.asList(Arrays.copyOfRange(IOUtilFunctions.splitCSV(row, _delim), 1, (int)_clen+1)); + // continue; + // } + // else if( row.startsWith(TfUtils.TXMTD_NDPREFIX) ) { + // _ndMeta = Arrays.asList(Arrays.copyOfRange(IOUtilFunctions.splitCSV(row, _delim), 1, (int)_clen+1)); + // continue; + // } //adjust row index for header and meta data rowix += (_hasHeader ? 0 : 1) - ((_mvMeta == null) ? 0 : 2); @@ -670,18 +667,18 @@ public Iterator call(Tuple2 arg0) ret.add(sb.toString()); sb.setLength(0); //reset } - if( !blk.isColumnMetadataDefault() ) { - sb.append(TfUtils.TXMTD_MVPREFIX + _props.getDelim()); - for( int j=0; j