diff --git a/scripts/perftest/python/io/load_numpy.py b/scripts/perftest/python/io/load_numpy.py index 11622b37226..3395bbffc5f 100644 --- a/scripts/perftest/python/io/load_numpy.py +++ b/scripts/perftest/python/io/load_numpy.py @@ -28,7 +28,12 @@ [ "from systemds.script_building.script import DMLScript", "import numpy as np", - "array = np.loadtxt(src, delimiter=',')", + "import os", + "if os.path.isdir(src):", + " files = [os.path.join(src, f) for f in os.listdir(src)]", + " array = np.concatenate([np.loadtxt(f, delimiter=',') for f in files])", + "else:", + " array = np.loadtxt(src, delimiter=',')", "if dtype is not None:", " array = array.astype(dtype)", ] diff --git a/scripts/perftest/python/io/load_pandas.py b/scripts/perftest/python/io/load_pandas.py index 60cb46f0cce..c6bd4e182f6 100644 --- a/scripts/perftest/python/io/load_pandas.py +++ b/scripts/perftest/python/io/load_pandas.py @@ -27,7 +27,12 @@ [ "from systemds.script_building.script import DMLScript", "import pandas as pd", - "df = pd.read_csv(src, header=None)", + "import os", + "if os.path.isdir(src):", + " files = [os.path.join(src, f) for f in os.listdir(src)]", + " df = pd.concat([pd.read_csv(f, header=None) for f in files])", + "else:", + " df = pd.read_csv(src, header=None)", "if dtype is not None:", " df = df.astype(dtype)", ] diff --git a/src/main/java/org/apache/sysds/runtime/frame/data/FrameBlock.java b/src/main/java/org/apache/sysds/runtime/frame/data/FrameBlock.java index f8c90fcdeec..63cadb43cf4 100644 --- a/src/main/java/org/apache/sysds/runtime/frame/data/FrameBlock.java +++ b/src/main/java/org/apache/sysds/runtime/frame/data/FrameBlock.java @@ -555,6 +555,17 @@ public void reset() { reset(0, true); } + /** + * Sets row at position r to the input array of objects, corresponding to the schema. + * @param r row index + * @param row array of objects + */ + public void setRow(int r, Object[] row) { + for (int i = 0; i < row.length; i++) { + set(r, i, row[i]); + } + } + /** * Append a row to the end of the data frame, where all row fields are boxed objects according to the schema. * @@ -753,6 +764,55 @@ else if(column != null && column.size() != _nRow) _msize = -1; } + /** + * Appends a chunk of data to the end of a specified column. + * + * @param c column index + * @param chunk chunk of data to append + */ + public void appendColumnChunk(int c, Array chunk) { + if (_coldata == null) { + _coldata = new Array[getNumColumns()]; + } + + if (_coldata[c] == null) { + _coldata[c] = chunk; + _nRow = chunk.size(); + } else { + _coldata[c] = ArrayFactory.append(_coldata[c], chunk); + _nRow += chunk.size(); + } + + _msize = -1; + } + + /** + * Sets a chunk of data to a specified column, starting at the specified offset. + * + * @param c column index + * @param chunk chunk of data to set + * @param offset offset position where it should set the chunk + * @param colSize size of columns, in case columns aren't initialized yet + */ + public void setColumnChunk(int c, Array chunk, int offset, int colSize) { + if (_coldata == null) { + _coldata = new Array[getNumColumns()]; + _nRow = colSize; + } + + if (_coldata[c] == null) { + _coldata[c] = ArrayFactory.allocate(chunk.getValueType(), _nRow); + } + + if (_coldata[c].getValueType() != chunk.getValueType()) { + throw new DMLRuntimeException("ValueType mismatch in setColumnChunk: expected " + + _coldata[c].getValueType() + " but got " + chunk.getValueType()); + } + + ArrayFactory.set(_coldata[c], chunk, offset, offset + chunk.size() - 1, _nRow); + } + + @Override public void write(DataOutput out) throws IOException { final boolean isDefaultMeta = isColNamesDefault() && isColumnMetadataDefault(); diff --git a/src/main/java/org/apache/sysds/runtime/util/Py4jConverterUtils.java b/src/main/java/org/apache/sysds/runtime/util/Py4jConverterUtils.java index 75c03b31a1c..7faee722d04 100644 --- a/src/main/java/org/apache/sysds/runtime/util/Py4jConverterUtils.java +++ b/src/main/java/org/apache/sysds/runtime/util/Py4jConverterUtils.java @@ -128,36 +128,60 @@ public static Array convert(byte[] data, int numElements, Types.ValueType val buffer.order(ByteOrder.LITTLE_ENDIAN); Array array = ArrayFactory.allocate(valueType, numElements); + readBufferIntoArray(buffer, array, valueType, numElements); - // Process the data based on the value type - switch(valueType) { - case UINT8: - for(int i = 0; i < numElements; i++) { + return array; + } + + // Right now row conversion is only supported for if all columns have the same datatype, so this is a placeholder for now that essentially just casts to Object[] + public static Object[] convertRow(byte[] data, int numElements, Types.ValueType valueType) { + Array converted = convert(data, numElements, valueType); + + Object[] row = new Object[numElements]; + for(int i = 0; i < numElements; i++) { + row[i] = converted.get(i); + } + + return row; + } + + public static Array[] convertFused(byte[] data, int numElements, Types.ValueType[] valueTypes) { + int numOperations = valueTypes.length; + + ByteBuffer buffer = ByteBuffer.wrap(data); + buffer.order(ByteOrder.LITTLE_ENDIAN); + + Array[] arrays = new Array[numOperations]; + + for (int i = 0; i < numOperations; i++) { + arrays[i] = ArrayFactory.allocate(valueTypes[i], numElements); + readBufferIntoArray(buffer, arrays[i], valueTypes[i], numElements); + } + + return arrays; + } + + private static void readBufferIntoArray(ByteBuffer buffer, Array array, Types.ValueType valueType, int numElements) { + for (int i = 0; i < numElements; i++) { + switch (valueType) { + case UINT8: array.set(i, (int) (buffer.get() & 0xFF)); - } - break; - case INT32: - for(int i = 0; i < numElements; i++) { - array.set(i, buffer.getInt()); - } - break; - case INT64: - for(int i = 0; i < numElements; i++) { - array.set(i, buffer.getLong()); - } - break; - case FP32: - for(int i = 0; i < numElements; i++) { + break; + case INT32: + case HASH32: + array.set(i, buffer.getInt()); + break; + case INT64: + case HASH64: + array.set(i, buffer.getLong()); + break; + case FP32: array.set(i, buffer.getFloat()); - } - break; - case FP64: - for(int i = 0; i < numElements; i++) { + break; + case FP64: array.set(i, buffer.getDouble()); - } - break; - case BOOLEAN: - for(int i = 0; i < numElements; i++) { + break; + case BOOLEAN: if (array instanceof BooleanArray) { ((BooleanArray) array).set(i, buffer.get() != 0); } else if (array instanceof BitSetArray) { @@ -165,38 +189,20 @@ public static Array convert(byte[] data, int numElements, Types.ValueType val } else { throw new DMLRuntimeException("Array factory returned invalid array type for boolean values."); } - } - break; - case STRING: - for(int i = 0; i < numElements; i++) { - buffer.order(ByteOrder.BIG_ENDIAN); - int strLen = buffer.getInt(); - buffer.order(ByteOrder.LITTLE_ENDIAN); - byte[] strBytes = new byte[strLen]; + break; + case STRING: + int strLength = buffer.getInt(); + byte[] strBytes = new byte[strLength]; buffer.get(strBytes); array.set(i, new String(strBytes, StandardCharsets.UTF_8)); - } - break; - case CHARACTER: - for(int i = 0; i < numElements; i++) { + break; + case CHARACTER: array.set(i, buffer.getChar()); - } - break; - case HASH32: - for(int i = 0; i < numElements; i++) { - array.set(i, buffer.getInt()); - } - break; - case HASH64: - for(int i = 0; i < numElements; i++) { - array.set(i, buffer.getLong()); - } - break; - default: - throw new DMLRuntimeException("Unsupported value type: " + valueType); + break; + default: + throw new DMLRuntimeException("Unsupported value type: " + valueType); + } } - - return array; } public static byte[] convertMBtoPy4JDenseArr(MatrixBlock mb) { diff --git a/src/main/python/systemds/utils/converters.py b/src/main/python/systemds/utils/converters.py index 38fdab8ca70..61a4769e806 100644 --- a/src/main/python/systemds/utils/converters.py +++ b/src/main/python/systemds/utils/converters.py @@ -56,7 +56,7 @@ def numpy_to_matrix_block(sds, np_arr: np.array): else: arr = np_arr.ravel().astype(np.float64) value_type = jvm.org.apache.sysds.common.Types.ValueType.FP64 - buf = bytearray(arr.tobytes()) + buf = arr.tobytes() # Send data to java. try: @@ -82,31 +82,38 @@ def matrix_block_to_numpy(jvm: JVMView, mb: JavaObject): ) -def convert_column(jvm, rows, j, col_type, pd_col, fb, col_name): - """Converts a given pandas column to a FrameBlock representation. +def convert(jvm, fb, idx, num_elements, value_type, pd_series, conversion="column"): + """Converts a given pandas column or row to a FrameBlock representation. :param jvm: The JVMView of the current SystemDS context. - :param rows: The number of rows in the pandas DataFrame. - :param j: The current column index. - :param col_type: The ValueType of the column. - :param pd_col: The pandas column to convert. + :param fb: The FrameBlock to add the column to. + :param idx: The current column/row index. + :param num_elements: The number of rows/columns in the pandas DataFrame. + :param value_type: The ValueType of the column/row. + :param pd_series: The pandas column or row to convert. + :param conversion: The type of conversion to perform. Can be either "column" or "row". """ - if col_type == jvm.org.apache.sysds.common.Types.ValueType.STRING: + if pd_series.dtype == "string" or pd_series.dtype == "object": byte_data = bytearray() - for value in pd_col.astype(str): + for value in pd_series.astype(str): encoded_value = value.encode("utf-8") - byte_data.extend(struct.pack(">I", len(encoded_value))) + byte_data.extend(struct.pack(" 4: - for i in range(len(schema)): - j_valueTypeArray[i] = schema[i] - - fb = jc_FrameBlock(j_valueTypeArray, rows) + # Row conversion if more columns than rows and all columns have the same type, otherwise column + conversion_type = ( + "row" if cols > rows and len(set(pd_df.dtypes)) == 1 else "column" + ) + if conversion_type == "row": + pd_df = pd_df.transpose() + col_names = pd_df.columns.tolist() # re-calculate col names + + fb = jc_FrameBlock( + j_valueTypeArray, + j_colNameArray, + rows if conversion_type == "column" else None, + ) + if conversion_type == "row": + fb.ensureAllocatedColumns(rows) + # We use .submit() with explicit .result() calling to properly propagate exceptions with concurrent.futures.ThreadPoolExecutor() as executor: - executor.map( - lambda j, col_name: convert_column( - jvm, rows, j, schema[j], pd_df[col_name], fb, col_name - ), - range(len(col_names)), - col_names, - ) + futures = [ + executor.submit( + convert, + jvm, + fb, + i, + rows if conversion_type == "column" else cols, + schema[i], + pd_df[col_name], + conversion_type, + ) + for i, col_name in enumerate(col_names) + ] + + for future in concurrent.futures.as_completed(futures): + future.result() return fb else: j_dataArray = java_gate.new_array(jc_String, rows, cols) - j_colNameArray = java_gate.new_array(jc_String, len(col_names)) for j, col_name in enumerate(col_names): - j_valueTypeArray[j] = schema[j] - j_colNameArray[j] = str(col_names[j]) col_data = pd_df[col_name].fillna("").to_numpy(dtype=str) for i in range(col_data.shape[0]): diff --git a/src/main/python/tests/iotests/test_io_pandas_systemds.py b/src/main/python/tests/iotests/test_io_pandas_systemds.py index 0ddbf63a5d6..7f599ea163d 100644 --- a/src/main/python/tests/iotests/test_io_pandas_systemds.py +++ b/src/main/python/tests/iotests/test_io_pandas_systemds.py @@ -27,18 +27,22 @@ from systemds.context import SystemDSContext +def create_dataframe(n_rows, n_cols, mixed=True): + return pd.DataFrame( + { + f"C{i+1}": [ + f"col{i+1}_string_{j}" if i == 0 and mixed else j + i + for j in range(n_rows) + ] + for i in range(n_cols) + } + ) + + class TestPandasFromToSystemds(unittest.TestCase): sds: SystemDSContext = None temp_dir: str = "tests/iotests/temp_write_csv/" - n_cols = 3 - n_rows = 5 - df = pd.DataFrame( - { - "C1": [f"col1_string_{i}" for i in range(n_rows)], - "C2": [i for i in range(n_rows)], - } - ) @classmethod def setUpClass(cls): @@ -52,22 +56,36 @@ def tearDownClass(cls): shutil.rmtree(cls.temp_dir, ignore_errors=True) def test_into_systemds(self): - # Transfer into SystemDS and write to CSV - frame = self.sds.from_pandas(self.df) - frame.write( - self.temp_dir + "into_systemds.csv", format="csv", header=True - ).compute(verbose=True) + combinations = [ # (n_rows, n_cols, mixed) + (3, 2, True), # Test un-parallelized code (rows <= 4) + (10, 5, True), # Test parallelized column-wise code + (5, 10, True), # Test parallelized column-wise mixed code + (5, 10, False), # Test parallelized row-wise code + ] - # Read the CSV file using pandas - result_df = pd.read_csv(self.temp_dir + "into_systemds.csv") + for n_rows, n_cols, mixed in combinations: + df = create_dataframe(n_rows, n_cols, mixed) - # Verify the data - self.assertTrue(isinstance(result_df, pd.DataFrame)) - self.assertTrue(self.df.equals(result_df)) + # Transfer into SystemDS and write to CSV + frame = self.sds.from_pandas(df) + frame.write( + self.temp_dir + "into_systemds.csv", format="csv", header=True + ).compute(verbose=True) + + # Read the CSV file using pandas + result_df = pd.read_csv(self.temp_dir + "into_systemds.csv") + + # Verify the data + self.assertTrue(isinstance(result_df, pd.DataFrame)) + self.assertTrue(df.equals(result_df)) def test_out_of_systemds(self): + n_rows = 3 + n_cols = 2 + df = create_dataframe(n_rows, n_cols) + # Create a CSV file to read into SystemDS - self.df.to_csv(self.temp_dir + "out_of_systemds.csv", header=False, index=False) + df.to_csv(self.temp_dir + "out_of_systemds.csv", header=False, index=False) # Read the CSV file into SystemDS and then compute back to pandas frame = self.sds.read( @@ -79,7 +97,7 @@ def test_out_of_systemds(self): result_df["C2"] = result_df["C2"].astype(int) self.assertTrue(isinstance(result_df, pd.DataFrame)) - self.assertTrue(self.df.equals(result_df)) + self.assertTrue(df.equals(result_df)) if __name__ == "__main__": diff --git a/src/test/java/org/apache/sysds/test/component/frame/FrameGetSetTest.java b/src/test/java/org/apache/sysds/test/component/frame/FrameGetSetTest.java index 63f617711a2..b5c57341e0d 100644 --- a/src/test/java/org/apache/sysds/test/component/frame/FrameGetSetTest.java +++ b/src/test/java/org/apache/sysds/test/component/frame/FrameGetSetTest.java @@ -19,6 +19,10 @@ package org.apache.sysds.test.component.frame; +import static org.junit.Assert.assertEquals; + +import org.apache.sysds.runtime.frame.data.columns.Array; +import org.apache.sysds.runtime.frame.data.columns.ArrayFactory; import org.junit.Assert; import org.junit.Test; import org.apache.sysds.common.Types.ValueType; @@ -171,4 +175,43 @@ else if( itype == InitType.ROW_STRING ) { throw new RuntimeException(ex); } } + + + @Test + public void testSetRow() { + FrameBlock frame = new FrameBlock(schemaMixed, "0", rows); + + frame.setRow(2, new Object[] {"2", 2.0, 2L, true}); + + assertEquals(frame.get(2, 0), "2"); + assertEquals(frame.get(2, 1), 2.0); + assertEquals(frame.get(2, 2), 2L); + assertEquals(frame.get(2, 3), true); + } + + @Test + public void testAppendColumnChunk() { + FrameBlock frame = new FrameBlock(schemaMixed, rows); + + Array chunk = ArrayFactory.create(new double[] {1.0, 2.0}); + Array chunk2 = ArrayFactory.create(new double[] {3.0, 4.0}); + frame.appendColumnChunk(1, chunk); + frame.appendColumnChunk(1, chunk2); + + assertEquals(frame.get(0, 1), 1.0); + assertEquals(frame.get(1, 1), 2.0); + assertEquals(frame.get(2, 1), 3.0); + assertEquals(frame.get(3, 1), 4.0); + } + + @Test + public void testSetColumnChunk() { + FrameBlock frame = new FrameBlock(schemaMixed, "0", rows); + + Array chunk = ArrayFactory.create(new double[] {1.0, 2.0}); + frame.setColumnChunk(1, chunk, 5, rows); + + assertEquals(frame.get(5, 1), 1.0); + assertEquals(frame.get(6, 1), 2.0); + } } diff --git a/src/test/java/org/apache/sysds/test/component/frame/array/Py4jConverterUtilsTest.java b/src/test/java/org/apache/sysds/test/component/frame/array/Py4jConverterUtilsTest.java index 980165c3abd..466c3337d83 100644 --- a/src/test/java/org/apache/sysds/test/component/frame/array/Py4jConverterUtilsTest.java +++ b/src/test/java/org/apache/sysds/test/component/frame/array/Py4jConverterUtilsTest.java @@ -170,9 +170,7 @@ public void testConvertString() { ByteBuffer buffer = ByteBuffer.allocate(4 + strings[0].length() + 4 + strings[1].length()); buffer.order(ByteOrder.LITTLE_ENDIAN); for(String s : strings) { - buffer.order(ByteOrder.BIG_ENDIAN); buffer.putInt(s.length()); - buffer.order(ByteOrder.LITTLE_ENDIAN); buffer.put(s.getBytes(StandardCharsets.UTF_8)); } Array result = Py4jConverterUtils.convert(buffer.array(), numElements, Types.ValueType.STRING); @@ -199,6 +197,32 @@ public void testConvertChar() { } } + @Test + public void testConvertRow() { + int numElements = 4; + byte[] data = {1, 2, 3, 4}; + Object[] row = Py4jConverterUtils.convertRow(data, numElements, Types.ValueType.UINT8); + assertNotNull(row); + assertEquals(4, row.length); + assertEquals(1, row[0]); + assertEquals(2, row[1]); + assertEquals(3, row[2]); + assertEquals(4, row[3]); + } + + @Test + public void testConvertFused() { + int numElements = 1; + byte[] data = {1, 2, 3, 4}; + Types.ValueType[] valueTypes = {ValueType.UINT8, ValueType.UINT8, ValueType.UINT8, ValueType.UINT8}; + Array[] arrays = Py4jConverterUtils.convertFused(data, numElements, valueTypes); + assertNotNull(arrays); + assertEquals(4, arrays.length); + for(int i = 0; i < 4; i++) { + assertEquals(1 + i, arrays[i].get(0)); + } + } + @Test(expected = Exception.class) public void nullData() { Py4jConverterUtils.convert(null, 14, ValueType.BOOLEAN);