[SYSTEMDS-3548] Optimize python dataframe transfer
This commit optimizes how the pandas_to_frame_block function accesses Java types.
It also fixes a small regression where exceptions from the parallelization threads were not propagated properly (see the sketch after the change list below).

- Fix perftests not working with large, split-up datasets: the IO datagen splits large datasets into multiple files (for example 100k_1k), and this commit makes load_pandas.py and load_numpy.py able to read those.
- Add row-wise parallel processing to the pandas-to-FrameBlock conversion for the case cols > rows, along with a few other small, currently unused utility methods.
- Add javadocs
- Adjust Py4jConverterUtilsTest to reflect the code changes in the main class.
- Add missing tests for the code added in SYSTEMDS-3548, covering the FrameBlock and Py4jConverterUtils functions as well as Python pandas-to-SystemDS IO end-to-end tests.
- Fix pandas IO test (rows have to be > 4)
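
On the exception fix: the standard way to surface failures from a thread pool is to keep each task's Future and call get() on it, which rethrows anything raised inside the worker. Below is a minimal sketch of that pattern in Java; the pool setup and the convertChunk task are illustrative assumptions, not the actual pandas_to_frame_block internals (which live on the Python side).

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PropagateWorkerExceptions {
	public static void main(String[] args) {
		ExecutorService pool = Executors.newFixedThreadPool(4);
		try {
			List<Future<?>> futures = new ArrayList<>();
			for (int part = 0; part < 8; part++) {
				final int p = part;
				// Illustrative worker; the real tasks convert dataframe chunks.
				futures.add(pool.submit(() -> convertChunk(p)));
			}
			// Without get(), an exception thrown in a worker stays inside the
			// Future and the failure is silently lost -- the regression described above.
			for (Future<?> f : futures) {
				try {
					f.get();
				}
				catch (ExecutionException e) {
					throw new RuntimeException("Chunk conversion failed", e.getCause());
				}
				catch (InterruptedException e) {
					Thread.currentThread().interrupt();
					throw new RuntimeException(e);
				}
			}
		}
		finally {
			pool.shutdown();
		}
	}

	private static void convertChunk(int part) {
		if (part == 5) // hypothetical failure to demonstrate propagation
			throw new IllegalStateException("bad chunk " + part);
	}
}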

Closes #2189
Nakroma authored and Baunsgaard committed Feb 5, 2025
1 parent 22642a1 commit bea9c96
Showing 8 changed files with 321 additions and 129 deletions.
7 changes: 6 additions & 1 deletion scripts/perftest/python/io/load_numpy.py
@@ -28,7 +28,12 @@
 [
 	"from systemds.script_building.script import DMLScript",
 	"import numpy as np",
-	"array = np.loadtxt(src, delimiter=',')",
+	"import os",
+	"if os.path.isdir(src):",
+	"    files = [os.path.join(src, f) for f in os.listdir(src)]",
+	"    array = np.concatenate([np.loadtxt(f, delimiter=',') for f in files])",
+	"else:",
+	"    array = np.loadtxt(src, delimiter=',')",
 	"if dtype is not None:",
 	"    array = array.astype(dtype)",
 ]
7 changes: 6 additions & 1 deletion scripts/perftest/python/io/load_pandas.py
@@ -27,7 +27,12 @@
 [
 	"from systemds.script_building.script import DMLScript",
 	"import pandas as pd",
-	"df = pd.read_csv(src, header=None)",
+	"import os",
+	"if os.path.isdir(src):",
+	"    files = [os.path.join(src, f) for f in os.listdir(src)]",
+	"    df = pd.concat([pd.read_csv(f, header=None) for f in files])",
+	"else:",
+	"    df = pd.read_csv(src, header=None)",
 	"if dtype is not None:",
 	"    df = df.astype(dtype)",
 ]
60 changes: 60 additions & 0 deletions src/main/java/org/apache/sysds/runtime/frame/data/FrameBlock.java
@@ -555,6 +555,17 @@ public void reset() {
 		reset(0, true);
 	}
 
+	/**
+	 * Sets row at position r to the input array of objects, corresponding to the schema.
+	 * @param r row index
+	 * @param row array of objects
+	 */
+	public void setRow(int r, Object[] row) {
+		for (int i = 0; i < row.length; i++) {
+			set(r, i, row[i]);
+		}
+	}
+
 	/**
 	 * Append a row to the end of the data frame, where all row fields are boxed objects according to the schema.
 	 *
@@ -753,6 +764,55 @@ else if(column != null && column.size() != _nRow)
 		_msize = -1;
 	}
 
+	/**
+	 * Appends a chunk of data to the end of a specified column.
+	 *
+	 * @param c column index
+	 * @param chunk chunk of data to append
+	 */
+	public void appendColumnChunk(int c, Array<?> chunk) {
+		if (_coldata == null) {
+			_coldata = new Array[getNumColumns()];
+		}
+
+		if (_coldata[c] == null) {
+			_coldata[c] = chunk;
+			_nRow = chunk.size();
+		} else {
+			_coldata[c] = ArrayFactory.append(_coldata[c], chunk);
+			_nRow += chunk.size();
+		}
+
+		_msize = -1;
+	}
+
+	/**
+	 * Sets a chunk of data in a specified column, starting at the given offset.
+	 *
+	 * @param c column index
+	 * @param chunk chunk of data to set
+	 * @param offset row offset at which to place the chunk
+	 * @param colSize total column size, used when the columns are not allocated yet
+	 */
+	public void setColumnChunk(int c, Array<?> chunk, int offset, int colSize) {
+		if (_coldata == null) {
+			_coldata = new Array[getNumColumns()];
+			_nRow = colSize;
+		}
+
+		if (_coldata[c] == null) {
+			_coldata[c] = ArrayFactory.allocate(chunk.getValueType(), _nRow);
+		}
+
+		if (_coldata[c].getValueType() != chunk.getValueType()) {
+			throw new DMLRuntimeException("ValueType mismatch in setColumnChunk: expected " +
+				_coldata[c].getValueType() + " but got " + chunk.getValueType());
+		}
+
+		ArrayFactory.set(_coldata[c], chunk, offset, offset + chunk.size() - 1, _nRow);
+	}
+
+
 	@Override
 	public void write(DataOutput out) throws IOException {
 		final boolean isDefaultMeta = isColNamesDefault() && isColumnMetadataDefault();
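To illustrate the new chunk APIs, a minimal usage sketch (column contents invented; it reuses only calls visible in this diff, such as ArrayFactory.allocate and Array.set): appendColumnChunk grows a column at its end, while setColumnChunk writes a chunk at a row offset and can pre-size a not-yet-allocated column via colSize.

import org.apache.sysds.common.Types.ValueType;
import org.apache.sysds.runtime.frame.data.FrameBlock;
import org.apache.sysds.runtime.frame.data.columns.Array;
import org.apache.sysds.runtime.frame.data.columns.ArrayFactory;

public class FrameChunkSketch {
	public static void main(String[] args) {
		// One INT64 column, filled through the two chunk APIs.
		FrameBlock fb = new FrameBlock(new ValueType[] {ValueType.INT64});

		// Build a 3-element chunk with the same factory the runtime uses.
		Array<?> chunk = ArrayFactory.allocate(ValueType.INT64, 3);
		for (int i = 0; i < 3; i++)
			chunk.set(i, (long) (i + 1));

		// Case 1: append at the end; the frame now has 3 rows.
		fb.appendColumnChunk(0, chunk);

		// Case 2: on a fresh frame, place the same chunk at rows 3..5 of a
		// 6-row column; colSize=6 pre-sizes the unallocated column.
		FrameBlock fb2 = new FrameBlock(new ValueType[] {ValueType.INT64});
		fb2.setColumnChunk(0, chunk, 3, 6);
	}
}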
114 changes: 60 additions & 54 deletions src/main/java/org/apache/sysds/runtime/util/Py4jConverterUtils.java
@@ -128,75 +128,81 @@ public static Array<?> convert(byte[] data, int numElements, Types.ValueType val
 		buffer.order(ByteOrder.LITTLE_ENDIAN);
 
 		Array<?> array = ArrayFactory.allocate(valueType, numElements);
+		readBufferIntoArray(buffer, array, valueType, numElements);
 
-		// Process the data based on the value type
-		switch(valueType) {
-			case UINT8:
-				for(int i = 0; i < numElements; i++) {
-					array.set(i, (int) (buffer.get() & 0xFF));
-				}
-				break;
-			case INT32:
-				for(int i = 0; i < numElements; i++) {
-					array.set(i, buffer.getInt());
-				}
-				break;
-			case INT64:
-				for(int i = 0; i < numElements; i++) {
-					array.set(i, buffer.getLong());
-				}
-				break;
-			case FP32:
-				for(int i = 0; i < numElements; i++) {
-					array.set(i, buffer.getFloat());
-				}
-				break;
-			case FP64:
-				for(int i = 0; i < numElements; i++) {
-					array.set(i, buffer.getDouble());
-				}
-				break;
-			case BOOLEAN:
-				for(int i = 0; i < numElements; i++) {
-					if (array instanceof BooleanArray) {
-						((BooleanArray) array).set(i, buffer.get() != 0);
-					} else if (array instanceof BitSetArray) {
-						((BitSetArray) array).set(i, buffer.get() != 0);
-					} else {
-						throw new DMLRuntimeException("Array factory returned invalid array type for boolean values.");
-					}
-				}
-				break;
-			case STRING:
-				for(int i = 0; i < numElements; i++) {
-					buffer.order(ByteOrder.BIG_ENDIAN);
-					int strLen = buffer.getInt();
-					buffer.order(ByteOrder.LITTLE_ENDIAN);
-					byte[] strBytes = new byte[strLen];
-					buffer.get(strBytes);
-					array.set(i, new String(strBytes, StandardCharsets.UTF_8));
-				}
-				break;
-			case CHARACTER:
-				for(int i = 0; i < numElements; i++) {
-					array.set(i, buffer.getChar());
-				}
-				break;
-			case HASH32:
-				for(int i = 0; i < numElements; i++) {
-					array.set(i, buffer.getInt());
-				}
-				break;
-			case HASH64:
-				for(int i = 0; i < numElements; i++) {
-					array.set(i, buffer.getLong());
-				}
-				break;
-			default:
-				throw new DMLRuntimeException("Unsupported value type: " + valueType);
-		}
-
 		return array;
 	}
 
+	// Right now row conversion is only supported if all columns have the same datatype,
+	// so this is a placeholder that essentially just casts to Object[].
+	public static Object[] convertRow(byte[] data, int numElements, Types.ValueType valueType) {
+		Array<?> converted = convert(data, numElements, valueType);
+
+		Object[] row = new Object[numElements];
+		for(int i = 0; i < numElements; i++) {
+			row[i] = converted.get(i);
+		}
+
+		return row;
+	}
+
+	public static Array<?>[] convertFused(byte[] data, int numElements, Types.ValueType[] valueTypes) {
+		int numOperations = valueTypes.length;
+
+		ByteBuffer buffer = ByteBuffer.wrap(data);
+		buffer.order(ByteOrder.LITTLE_ENDIAN);
+
+		Array<?>[] arrays = new Array<?>[numOperations];
+
+		for (int i = 0; i < numOperations; i++) {
+			arrays[i] = ArrayFactory.allocate(valueTypes[i], numElements);
+			readBufferIntoArray(buffer, arrays[i], valueTypes[i], numElements);
+		}
+
+		return arrays;
+	}
+
+	private static void readBufferIntoArray(ByteBuffer buffer, Array<?> array, Types.ValueType valueType, int numElements) {
+		for (int i = 0; i < numElements; i++) {
+			switch (valueType) {
+				case UINT8:
+					array.set(i, (int) (buffer.get() & 0xFF));
+					break;
+				case INT32:
+				case HASH32:
+					array.set(i, buffer.getInt());
+					break;
+				case INT64:
+				case HASH64:
+					array.set(i, buffer.getLong());
+					break;
+				case FP32:
+					array.set(i, buffer.getFloat());
+					break;
+				case FP64:
+					array.set(i, buffer.getDouble());
+					break;
+				case BOOLEAN:
+					if (array instanceof BooleanArray) {
+						((BooleanArray) array).set(i, buffer.get() != 0);
+					} else if (array instanceof BitSetArray) {
+						((BitSetArray) array).set(i, buffer.get() != 0);
+					} else {
+						throw new DMLRuntimeException("Array factory returned invalid array type for boolean values.");
+					}
+					break;
+				case STRING:
+					int strLength = buffer.getInt();
+					byte[] strBytes = new byte[strLength];
+					buffer.get(strBytes);
+					array.set(i, new String(strBytes, StandardCharsets.UTF_8));
+					break;
+				case CHARACTER:
+					array.set(i, buffer.getChar());
+					break;
+				default:
+					throw new DMLRuntimeException("Unsupported value type: " + valueType);
+			}
+		}
+	}
+
 	public static byte[] convertMBtoPy4JDenseArr(MatrixBlock mb) {
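A small usage sketch of convertFused (values invented): the caller packs each column's elements back-to-back into one little-endian buffer, and convertFused returns one Array per entry in valueTypes, reading them in order with the shared readBufferIntoArray helper.

import java.nio.ByteBuffer;
import java.nio.ByteOrder;

import org.apache.sysds.common.Types;
import org.apache.sysds.runtime.frame.data.columns.Array;
import org.apache.sysds.runtime.util.Py4jConverterUtils;

public class ConvertFusedSketch {
	public static void main(String[] args) {
		// Two columns of two rows each: an INT32 column followed by an FP64 column,
		// packed back-to-back in little-endian order.
		ByteBuffer buf = ByteBuffer.allocate(2 * 4 + 2 * 8).order(ByteOrder.LITTLE_ENDIAN);
		buf.putInt(7).putInt(8);           // column 0 (INT32)
		buf.putDouble(1.5).putDouble(2.5); // column 1 (FP64)

		Array<?>[] cols = Py4jConverterUtils.convertFused(buf.array(), 2,
			new Types.ValueType[] {Types.ValueType.INT32, Types.ValueType.FP64});

		System.out.println(cols[0].get(1)); // 8
		System.out.println(cols[1].get(0)); // 1.5
	}
}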