Skip to content

Commit

Permalink
[SYSTEMDS-3814] Fix invalid rename of csv input to output files
Browse files Browse the repository at this point in the history
This patch fixes a remaining invalid rename of persistently read input
csv files to csv output files, which "deletes" the input file. So far
we based this information on the PREAD variable name, but certain
assignments lose this information. We now properly capture this
information at createvar instructions, preserve them inside all
matrices, frames, and tensors, and thus ensure robustness for all
kinds of programs.
  • Loading branch information
mboehm7 committed Jan 17, 2025
1 parent 5b71a03 commit 6272b0e
Show file tree
Hide file tree
Showing 4 changed files with 182 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ public enum CacheStatus {

/** The name of HDFS file in which the data is backed up. */
protected String _hdfsFileName = null; // file name and path
protected boolean _isPRead = false; //persistent read, must not be deleted

/**
* Flag that indicates whether or not hdfs file exists.It is used
Expand Down Expand Up @@ -285,6 +286,14 @@ public String getFileName() {
return _hdfsFileName;
}

/**
 * Indicates whether this object originates from a persistent read (pread),
 * in which case the backing input file must not be deleted or renamed.
 *
 * @return true if this object is backed by a persistently read input file
 */
public boolean isPersistentRead() {
	return _isPRead;
}

/**
 * Sets the persistent-read status of this object. Set to true at createvar
 * time for pread-prefixed variables so the backing input file is preserved.
 *
 * @param pread true if backed by a persistently read input file
 */
public void setPersistentRead(boolean pread) {
	_isPRead = pread;
}

/**
 * Returns the unique ID of this cacheable data object.
 *
 * @return unique object ID
 */
public long getUniqueID() {
	return _uniqueID;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -706,7 +706,7 @@ private void processCreateVariableInstruction(ExecutionContext ec){
case MATRIX: {
String fname = createUniqueFilename();
MatrixObject obj = new MatrixObject(getInput1().getValueType(), fname);
setCacheableDataFields(obj);
setCacheableDataFields(obj, getInput1().getName());
obj.setUpdateType(_updateType);
obj.setMarkForLinCache(true);
ec.setVariable(getInput1().getName(), obj);
Expand All @@ -717,14 +717,14 @@ private void processCreateVariableInstruction(ExecutionContext ec){
case TENSOR: {
String fname = createUniqueFilename();
TensorObject obj = new TensorObject(getInput1().getValueType(), fname);
setCacheableDataFields(obj);
setCacheableDataFields(obj, getInput1().getName());
ec.setVariable(getInput1().getName(), obj);
break;
}
case FRAME: {
String fname = createUniqueFilename();
FrameObject fobj = new FrameObject(fname);
setCacheableDataFields(fobj);
setCacheableDataFields(fobj, getInput1().getName());
if( _schema != null )
fobj.setSchema(_schema); //after metadata
ec.setVariable(getInput1().getName(), fobj);
Expand Down Expand Up @@ -757,13 +757,14 @@ private String createUniqueFilename(){
return fname;
}

private void setCacheableDataFields(CacheableData<?> obj){
private void setCacheableDataFields(CacheableData<?> obj, String varname){
//clone metadata because it is updated on copy-on-write, otherwise there
//is potential for hidden side effects between variables.
obj.setMetaData((MetaData)metadata.clone());
obj.enableCleanup(!getInput1().getName()
.startsWith(org.apache.sysds.lops.Data.PREAD_PREFIX));
obj.setFileFormatProperties(_formatProperties);
obj.setPersistentRead(varname.startsWith(org.apache.sysds.lops.Data.PREAD_PREFIX));
}

/**
Expand Down Expand Up @@ -960,7 +961,7 @@ private void processCastAsMatrixVariableInstruction(ExecutionContext ec) {

/**
* Handler for CastAsFrameVariable instruction
*
*
* @param ec execution context
*/
private void processCastAsFrameVariableInstruction(ExecutionContext ec){
Expand Down Expand Up @@ -1018,6 +1019,7 @@ private void processReadInstruction(ExecutionContext ec){
* @param ec execution context
*/
private void processCopyInstruction(ExecutionContext ec) {

// get source variable
Data dd = ec.getVariable(getInput1().getName());

Expand Down Expand Up @@ -1142,9 +1144,7 @@ private void writeCSVFile(ExecutionContext ec, String fname) {
try {
FileFormat fmt = ((MetaDataFormat)mo.getMetaData()).getFileFormat();
DataCharacteristics dc = (mo.getMetaData()).getDataCharacteristics();
if( fmt == FileFormat.CSV
&& !getInput1().getName().startsWith(org.apache.sysds.lops.Data.PREAD_PREFIX) )
{
if( fmt == FileFormat.CSV && !mo.isPersistentRead() ) {
WriterTextCSV writer = new WriterTextCSV((FileFormatPropertiesCSV)fprop);
writer.addHeaderToCSV(mo.getFileName(), fname, dc.getRows(), dc.getCols());
}
Expand Down
134 changes: 134 additions & 0 deletions src/test/java/org/apache/sysds/test/functions/io/RenameIssueTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.sysds.test.functions.io;

import java.io.File;
import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sysds.common.Types.ExecMode;
import org.apache.sysds.common.Types.FileFormat;
import org.apache.sysds.common.Types.ValueType;
import org.apache.sysds.runtime.io.MatrixWriter;
import org.apache.sysds.runtime.io.MatrixWriterFactory;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.meta.MatrixCharacteristics;
import org.apache.sysds.runtime.util.DataConverter;
import org.apache.sysds.runtime.util.HDFSTool;
import org.apache.sysds.test.AutomatedTestBase;
import org.apache.sysds.test.TestConfiguration;
import org.apache.sysds.test.TestUtils;
import org.junit.Assert;
import org.junit.Test;

public class RenameIssueTest extends AutomatedTestBase {

protected static final Log LOG = LogFactory.getLog(RenameIssueTest.class.getName());

private final static String TEST_NAME1 = "Rename";
private final static String TEST_DIR = "functions/io/";
private final static String TEST_CLASS_DIR = TEST_DIR + RenameIssueTest.class.getSimpleName() + "/";

@Override
public void setUp() {
TestUtils.clearAssertionInformation();
addTestConfiguration(TEST_NAME1, new TestConfiguration(TEST_CLASS_DIR, TEST_NAME1, new String[] {"L","R1"}) );
}

@Test
public void testCSVSinglenode() {
runRameTest(FileFormat.CSV, ExecMode.SINGLE_NODE);
}

@Test
public void testCSVHybrid() {
runRameTest(FileFormat.CSV, ExecMode.HYBRID);
}

@Test
public void testCSVSpark() {
runRameTest(FileFormat.CSV, ExecMode.SPARK);
}

@Test
public void testTextSinglenode() {
runRameTest(FileFormat.TEXT, ExecMode.SINGLE_NODE);
}

@Test
public void testTextHybrid() {
runRameTest(FileFormat.TEXT, ExecMode.HYBRID);
}

@Test
public void testTextSpark() {
runRameTest(FileFormat.TEXT, ExecMode.SPARK);
}

@Test
public void testBinarySinglenode() {
runRameTest(FileFormat.BINARY, ExecMode.SINGLE_NODE);
}

@Test
public void testBinaryHybrid() {
runRameTest(FileFormat.BINARY, ExecMode.HYBRID);
}

@Test
public void testBinarySpark() {
runRameTest(FileFormat.BINARY, ExecMode.SPARK);
}

private void runRameTest(FileFormat fmt, ExecMode mode)
{
ExecMode modeOld = setExecMode(mode);

try {
TestConfiguration config = getTestConfiguration(TEST_NAME1);
loadTestConfiguration(config);

MatrixBlock a = DataConverter.convertToMatrixBlock(getRandomMatrix(7, 7, -1, 1, 0.5, -1));
MatrixWriter writer = MatrixWriterFactory.createMatrixWriter(fmt);
writer.writeMatrixToHDFS(a, input("A"),
(long)a.getNumRows(), (long)a.getNumColumns(), (int)a.getNonZeros(), 1000);
HDFSTool.writeMetaDataFile(input("A")+".mtd", ValueType.FP64,
new MatrixCharacteristics(7,7,1000), fmt);

String HOME = SCRIPT_DIR + TEST_DIR;
fullDMLScriptName = HOME + TEST_NAME1 + ".dml";
programArgs = new String[]{"-explain",
"-args", input("A"), fmt.toString().toLowerCase(), output("B")};
runTest(true, false, null, -1);

//check file existence (no rename to output)
Assert.assertTrue(new File(input("A")).exists());
Assert.assertTrue(new File(output("B")).exists());
}
catch (IOException e) {
e.printStackTrace();
Assert.fail();
}
finally {
resetExecMode(modeOld);
}
}
}
31 changes: 31 additions & 0 deletions src/test/scripts/functions/io/Rename.dml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#-------------------------------------------------------------
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
#-------------------------------------------------------------

# Read the input matrix ($1), which is a persistent read whose backing
# file must never be deleted or renamed to the output.
X1 = read($1);

# Plain assignment: the copy no longer carries the pread variable-name
# prefix, which is exactly the scenario of SYSTEMDS-3814.
Xa = X1;
for(i in 1:2) {
# Repeatedly write the copy ($3, format $2); this must not rename the
# input file of X1 to the output path.
write(Xa, $3, format=$2);
while(FALSE){} #write first
Xa = rbind(Xa, X1);
print("Creating and writing replicated dataset ["+i+"]");
}

0 comments on commit 6272b0e

Please sign in to comment.