Skip to content

Commit

Permalink
better abort frame compressiong
Browse files Browse the repository at this point in the history
  • Loading branch information
Baunsgaard committed Dec 2, 2023
1 parent 874b8ce commit d452cb5
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,10 @@ public boolean containsNull() {

public abstract boolean possiblyContainsNaN();

public Array<?> changeType(ValueType t, boolean containsNull) {
return containsNull ? changeTypeWithNulls(t) : changeType(t);
}

public Array<?> changeTypeWithNulls(ValueType t) {

final ABooleanArray nulls = getNulls();
Expand Down Expand Up @@ -720,7 +724,11 @@ public ArrayCompressionStatistics statistics(int nSamples) {
freq[id++] = e.getValue();

int estDistinct = SampleEstimatorFactory.distinctCount(freq, size(), nSamples);
long memSize = getInMemorySize(); // uncompressed size
// memory size is different depending on valuetype.
long memSize = vt.getKey() != getValueType() ? //
ArrayFactory.getInMemorySize(vt.getKey(), size(), containsNull()) : //
getInMemorySize(); // uncompressed size

int memSizePerElement;
switch(vt.getKey()) {
case UINT4:
Expand All @@ -742,15 +750,17 @@ public ArrayCompressionStatistics statistics(int nSamples) {
case UNKNOWN:
case STRING:
default:
memSizePerElement = (int) ((memSize * 8L) / size());
memSizePerElement = (int) (memSize / size());
}

long ddcSize = DDCArray.estimateInMemorySize(memSizePerElement, estDistinct, size());

if(ddcSize < memSize)
return new ArrayCompressionStatistics(memSizePerElement, //
estDistinct, true, vt.getKey(), vt.getValue(), FrameArrayType.DDC, memSize, ddcSize);

estDistinct, true, vt.getKey(), vt.getValue(), FrameArrayType.DDC, getInMemorySize(), ddcSize);
else if(vt.getKey() != getValueType() )
return new ArrayCompressionStatistics(memSizePerElement, //
estDistinct, true, vt.getKey(), vt.getValue(), null, getInMemorySize(), memSize);
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.compress.DMLCompressionException;
import org.apache.sysds.runtime.compress.colgroup.mapping.AMapToData;
import org.apache.sysds.runtime.compress.colgroup.mapping.MapToCharPByte;
import org.apache.sysds.runtime.compress.colgroup.mapping.MapToFactory;
import org.apache.sysds.runtime.frame.data.FrameBlock;
import org.apache.sysds.runtime.frame.data.columns.ArrayFactory.FrameArrayType;
Expand Down Expand Up @@ -71,24 +72,34 @@ public DDCArray<T> nullDict() {
return new DDCArray<T>(null, map);
}

private static int getTryThreshold(ValueType t, int allRows) {
private static int getTryThreshold(ValueType t, int allRows, long inMemSize) {
switch(t) {
case BOOLEAN:
return 1; // booleans do not compress well unless all constant.
case UINT4:
case UINT8:
return 2;
case CHARACTER:
return 256;
case FP32:
case UINT4:
case UINT8:
case INT32:
return 65536;
return 65536; // char distinct
case HASH64:
case FP64:
case INT64:
case STRING:
case UNKNOWN:
default:
return allRows;
long MapSize = MapToFactory.estimateInMemorySize(allRows, allRows);
int i = 2;

while(allRows/i >= 1 && inMemSize - MapSize < ArrayFactory.getInMemorySize(t, allRows/i, false)){
i = i*2;
}

int d = Math.max(0, allRows/i);
return d;

}
}

Expand All @@ -107,7 +118,7 @@ public static <T> Array<T> compressToDDC(Array<T> arr) {
// or if the instance if RaggedArray where all values typically are unique.
if(s <= 10 || arr instanceof RaggedArray)
return arr;
final int t = getTryThreshold(arr.getValueType(), s);
final int t = getTryThreshold(arr.getValueType(), s, arr.getInMemorySize());

// Two pass algorithm
// 1.full iteration: Get unique
Expand Down Expand Up @@ -383,7 +394,7 @@ protected Map<T, Integer> getDictionary() {
}

public static long estimateInMemorySize(int memSizeBitPerElement, int estDistinct, int nRow) {
return (estDistinct * memSizeBitPerElement) / 8 + MapToFactory.estimateInMemorySize(nRow, estDistinct);
return (long)estDistinct * memSizeBitPerElement + MapToFactory.estimateInMemorySize(nRow, estDistinct);
}

protected DDCArray<T> allocateLarger(int nRow) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ public class ArrayCompressionStatistics {
public final ValueType valueType;
public final boolean containsNull;
public final FrameArrayType bestType;
public final int bitPerValue;
public final int bytePerValue;
public final int nUnique;

public ArrayCompressionStatistics(int bitPerValue, int nUnique, boolean shouldCompress, ValueType valueType,
public ArrayCompressionStatistics(int bytePerValue, int nUnique, boolean shouldCompress, ValueType valueType,
boolean containsNull, FrameArrayType bestType, long originalSize, long compressedSizeEstimate) {
this.bitPerValue = bitPerValue;
this.bytePerValue = bytePerValue;
this.nUnique = nUnique;
this.shouldCompress = shouldCompress;
this.valueType = valueType;
Expand All @@ -49,7 +49,7 @@ public ArrayCompressionStatistics(int bitPerValue, int nUnique, boolean shouldCo
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(String.format("Compressed Stats: size:%8d->%8d, Use:%10s, Unique:%6d, ValueType:%7s", originalSize,
compressedSizeEstimate, bestType.toString(), nUnique, valueType));
compressedSizeEstimate, bestType == null ? "None" : bestType.toString(), nUnique, valueType));
return sb.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,15 +114,23 @@ private void encodeParallel() {
private void compressCol(int i) {
stats[i] = in.getColumn(i).statistics(nSamples);
if(stats[i] != null) {
// commented out because no other encodings are supported yet
// switch(stats[i].bestType) {
// case DDC:
compressedColumns[i] = DDCArray.compressToDDC(in.getColumn(i), stats[i].valueType, stats[i].containsNull);
// break;
// default:
// compressedColumns[i] = in.getColumn(i);
// break;
// }
if(stats[i].bestType == null){
// just cast to other value type.
compressedColumns[i] = in.getColumn(i).changeType(stats[i].valueType, stats[i].containsNull);
}
else{
// commented out because no other encodings are supported yet
switch(stats[i].bestType) {
case DDC:
compressedColumns[i] = DDCArray.compressToDDC(in.getColumn(i), stats[i].valueType,
stats[i].containsNull);
break;
default:
LOG.error("Unsupported encoding default to do nothing: " + stats[i].bestType);
compressedColumns[i] = in.getColumn(i);
break;
}
}
}
else
compressedColumns[i] = in.getColumn(i);
Expand Down

0 comments on commit d452cb5

Please sign in to comment.