diff --git a/java/hive/src/main/java/io/v6d/hadoop/fs/FileSystem.java b/java/hive/src/main/java/io/v6d/hadoop/fs/FileSystem.java index 761e2027b..355c1154a 100644 --- a/java/hive/src/main/java/io/v6d/hadoop/fs/FileSystem.java +++ b/java/hive/src/main/java/io/v6d/hadoop/fs/FileSystem.java @@ -17,47 +17,39 @@ import com.google.common.base.StopwatchContext; import io.v6d.core.client.Context; import io.v6d.core.client.IPCClient; -import io.v6d.core.client.ds.ObjectFactory; -import io.v6d.core.client.ds.ObjectMeta; import io.v6d.core.common.util.ObjectID; -import io.v6d.core.common.util.VineyardException.ObjectNotExists; import io.v6d.hive.ql.io.CloseableReentrantLock; -import io.v6d.modules.basic.arrow.SchemaBuilder; -import io.v6d.modules.basic.arrow.Table; -import io.v6d.modules.basic.arrow.TableBuilder; +import io.v6d.modules.basic.filesystem.VineyardFile; +import io.v6d.modules.basic.filesystem.VineyardFileStat; +import io.v6d.modules.basic.filesystem.VineyardFileUtils; import java.io.FileNotFoundException; import java.io.IOException; +import java.io.OutputStream; import java.net.URI; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.Collection; import java.util.EnumSet; +import java.util.HashSet; import java.util.List; -import java.util.Map; -import java.util.Queue; +import java.util.Set; import lombok.*; -import org.apache.arrow.vector.types.pojo.Schema; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.util.Progressable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -class VineyardOutputStream extends FSDataOutputStream { - private FileChannel channel; +class VineyardOutputStream extends OutputStream { + private final VineyardFile file; - public VineyardOutputStream(FileChannel channel) throws IOException { - super(new DataOutputBuffer(), null); - this.channel = channel; + public VineyardOutputStream(Path filePath, boolean overwrite) throws IOException { + file = new VineyardFile(filePath.toString(), false, VineyardFile.Mode.WRITE, overwrite); } @Override public void close() throws IOException { - this.channel.close(); + file.close(); } @Override @@ -67,30 +59,25 @@ public String toString() { @Override public void write(int b) throws IOException { - throw new UnsupportedOperationException("should not call this function."); - } - - @Override - public void write(byte b[], int off, int len) throws IOException { - channel.write(java.nio.ByteBuffer.wrap(b, off, len)); + file.write(b); } } class VineyardInputStream extends FSInputStream { - private FileChannel channel; + private final VineyardFile file; - public VineyardInputStream(FileChannel channel) throws IOException { - this.channel = channel; + public VineyardInputStream(Path filePath) throws IOException { + file = new VineyardFile(filePath.toString(), false, VineyardFile.Mode.READ, false); } @Override public void seek(long offset) throws IOException { - throw new UnsupportedOperationException("Vineyard input stream not support seek."); + file.seek(offset); } @Override public long getPos() throws IOException { - throw new UnsupportedOperationException("Vineyard input stream not support getPos."); + return file.getPos(); } @Override @@ -101,17 +88,18 @@ public boolean seekToNewSource(long l) throws IOException { @Override public int read() throws IOException { - ByteBuffer buffer = ByteBuffer.allocate(1); - int ret = channel.read(buffer); - if (ret <= 0) { - return -1; - } - return buffer.get(0); + return file.read(); } @Override public void close() throws IOException { - channel.close(); + file.close(); + } +} + +class VineyardDataInputStream extends FSDataInputStream { + public VineyardDataInputStream(Path filePath) throws IOException { + super(new VineyardInputStream(filePath)); } } @@ -124,9 +112,10 @@ public class FileSystem extends org.apache.hadoop.fs.FileSystem { static final CloseableReentrantLock lock = new CloseableReentrantLock(); private Configuration conf = null; - static RawLocalFileSystem fs = null; - static boolean enablePrintAllFiles = false; - static boolean enablePrintAllObjects = false; + private boolean enablePrintAllFiles = false; + + private IPCClient client; + private static final int DIR_LEN = 1; Path workingDir = new Path("vineyard:/"); @@ -134,48 +123,6 @@ public FileSystem() { super(); } - private static void printAllFiles(Path p) throws IOException { - FileStatus[] status = fs.listStatus(p); - Queue queue = new java.util.LinkedList(); - for (FileStatus s : status) { - queue.add(s); - } - while (!queue.isEmpty()) { - FileStatus p1 = queue.poll(); - Context.println(p1.getPath().toString()); - if (p1.isDirectory()) { - FileStatus[] statusTemp = fs.listStatus(p1.getPath()); - for (FileStatus s : statusTemp) { - queue.add(s); - } - } - } - } - - private void printAllObjectsWithName() throws IOException { - if (enablePrintAllObjects) { - IPCClient client = Context.getClient(); - Context.println("print all objects with name"); - Context.println("===================================="); - Map objects = client.listNames(".*", true, 255); - for (val object : objects.entrySet()) { - Context.println( - "object name:" - + object.getKey() - + ", object id:" - + object.getValue().value()); - } - Context.println("===================================="); - } - } - - private static void printAllFiles() throws IOException { - if (enablePrintAllFiles) { - Context.println("------------------"); - printAllFiles(new Path("/opt/hive/data/warehouse")); - } - } - @Override public String getScheme() { return SCHEME; @@ -199,19 +146,28 @@ protected URI canonicalizeUri(URI uri) { public void initialize(URI name, Configuration conf) throws IOException { Context.println("Initialize vineyard file system: " + name.toString()); super.initialize(name, conf); - this.conf = conf; this.uri = name; + this.conf = conf; + this.client = Context.getClient(); - fs = new RawLocalFileSystem(); - fs.initialize(URI.create("file:///"), conf); - mkdirs(new Path(uri.toString().replaceAll("/+", "/"))); + mkdirs(new Path(uri.toString()), new FsPermission((short) 777)); + } + + private void printAllFiles() throws IOException { + if (this.enablePrintAllFiles) { + VineyardFileUtils.printAllFiles(this.client); + } } @Override public FSDataInputStream open(Path path, int i) throws IOException { - Path newPath = new Path(path.toString().replaceAll("vineyard", "file")); - FSDataInputStream result = fs.open(newPath); - return result; + try (val lock = this.lock.open()) { + FSDataInputStream result = + new FSDataInputStream( + new VineyardInputStream( + new Path(path.toString().replaceAll("/+", "/")))); + return result; + } } @Override @@ -226,7 +182,7 @@ public FSDataOutputStream create( throws IOException { try (val lock = this.lock.open()) { return createInternal( - path, + new Path(path.toString().replaceAll("/+", "/")), fsPermission, overwrite, bufferSize, @@ -245,28 +201,15 @@ private FSDataOutputStream createInternal( long blockSize, Progressable progressable) throws IOException { - Path newPath = new Path(path.toString().replaceAll("vineyard", "file")); - Path parentPath = newPath.getParent(); + Path parentPath = path.getParent(); try { - FileStatus parentStatus = fs.getFileStatus(parentPath); - if (!parentStatus.isDirectory()) { - throw new IOException("Parent path is not a directory:" + parentPath.toString()); - } + getFileStatusInternal(parentPath); } catch (FileNotFoundException e) { - // parent path not exist - Context.println("Parent dir not exists. Create parent dir first!"); - fs.mkdirs(parentPath); + // parent not exist + mkdirsInternal(parentPath, new FsPermission((short) 777)); } - printAllFiles(); FSDataOutputStream result = - fs.create( - newPath, - fsPermission, - overwrite, - bufferSize, - replication, - blockSize, - progressable); + new FSDataOutputStream(new VineyardOutputStream(path, overwrite), null); return result; } @@ -279,250 +222,193 @@ public FSDataOutputStream append(Path path, int i, Progressable progressable) @Override public boolean delete(Path path, boolean b) throws IOException { try (val lock = this.lock.open()) { - Path newPath = new Path(path.toString().replaceAll("vineyard", "file")); - return this.deleteInternal(newPath, b); + return this.deleteInternal(new Path(path.toString().replaceAll("/+", "/")), b); + } + } + + private void deleteVineyardObjectWithName(String[] names) throws IOException { + IPCClient client = Context.getClient(); + Set objectIDs = new HashSet(); + for (String name : names) { + ObjectID objectID = client.getName(name); + objectIDs.add(objectID); + client.dropName(name); } + client.delete(objectIDs, true, true); } - public void cleanObjectInVineyard(Path filePath) throws IOException { + private void deleteVineyardObjectWithObjectIDStr(String[] objectIDStrs) throws IOException { IPCClient client = Context.getClient(); - Queue queue = new java.util.LinkedList(); - Collection objectIDs = new ArrayList(); - queue.add(filePath); - while (!queue.isEmpty()) { - Path path = queue.peek(); + Set objectIDs = new HashSet(); + for (String objectIDStr : objectIDStrs) { try { - FileStatus fileStatus = fs.getFileStatus(path); - if (fileStatus.isDirectory()) { - FileStatus[] fileStatusArray = fs.listStatus(path); - for (FileStatus s : fileStatusArray) { - if (s.getPath().toString().compareTo(filePath.toString()) == 0) { - continue; - } - queue.add(s.getPath()); - } - } - - String objectName = path.toString().substring(path.toString().indexOf(":") + 1); - ObjectID objectID = client.getName(objectName); + ObjectID objectID = ObjectID.fromString(objectIDStr); objectIDs.add(objectID); - client.dropName(objectName); - } catch (FileNotFoundException e) { - // file not exist, skip - Context.println("File: " + path.toString() + " not exist."); - continue; - } catch (ObjectNotExists e) { - // object not exist - Context.println("Object of file: " + path.toString() + " not exist."); - continue; - } finally { - queue.poll(); + } catch (Exception e) { + // Skip some invalid object id. + Context.println("Failed to parse object id: " + e.getMessage()); + break; } } - client.delete(objectIDs, false, false); - printAllObjectsWithName(); + client.delete(objectIDs, true, true); } private boolean deleteInternal(Path path, boolean b) throws IOException { - cleanObjectInVineyard(path); - return fs.delete(path, b); + FileStatus fileStatus; + try { + fileStatus = getFileStatusInternal(path); + } catch (FileNotFoundException e) { + // file not exist + Context.println("File not exist."); + return false; + } + + if (fileStatus.isDirectory()) { + FileStatus[] childFileStatus = listStatusInternal(path); + if (childFileStatus.length > 0 && !b) { + throw new IOException("Directory is not empty."); + } + for (FileStatus child : childFileStatus) { + deleteInternal(child.getPath(), b); + } + deleteVineyardObjectWithName(new String[] {path.toString()}); + printAllFiles(); + return true; + } + + try { + FSDataInputStream in = open(path, 0); + byte[] objectIDByteArray = new byte[(int) fileStatus.getLen()]; + int len = in.read(objectIDByteArray); + String[] objectIDStrs = + new String(objectIDByteArray, 0, len, StandardCharsets.US_ASCII).split("\n"); + deleteVineyardObjectWithObjectIDStr(objectIDStrs); + deleteVineyardObjectWithName(new String[] {path.toString()}); + } catch (Exception e) { + Context.println("Failed to delete file: " + e.getMessage()); + return false; + } + printAllFiles(); + return true; } @Override public boolean rename(Path src, Path dst) throws IOException { try (val lock = this.lock.open()) { val watch = StopwatchContext.create(); - val renamed = this.renameInternal(src, dst); + String srcString = src.toString().replaceAll("/+", "/"); + String dstString = dst.toString().replaceAll("/+", "/"); + val renamed = this.renameInternal(new Path(srcString), new Path(dstString)); Context.println("Filesystem rename uses: " + watch.stop()); return renamed; } } - private void mergeFile(Path src, Path dst) throws IOException { - FSDataInputStream srcInput = fs.open(src); - FSDataInputStream dstInput = fs.open(dst); - byte[] objectIDByteArray = new byte[255]; - - int len = srcInput.read(objectIDByteArray); - String srcObjectIDStr = - new String(objectIDByteArray, 0, len, StandardCharsets.UTF_8).replaceAll("\n", ""); - - objectIDByteArray = new byte[255]; - len = dstInput.read(objectIDByteArray); - String dstObjectIDStr = - new String(objectIDByteArray, 0, len, StandardCharsets.UTF_8).replaceAll("\n", ""); - - ObjectID mergedTableObjectID = null; - try { - IPCClient client = Context.getClient(); - ObjectID srcObjectID = ObjectID.fromString(srcObjectIDStr); - ObjectID dstObjectID = ObjectID.fromString(dstObjectIDStr); - Table srcTable = - (Table) ObjectFactory.getFactory().resolve(client.getMetaData(srcObjectID)); - Table dstTable = - (Table) ObjectFactory.getFactory().resolve(client.getMetaData(dstObjectID)); - - // merge table - Schema schema = srcTable.getSchema().getSchema(); - SchemaBuilder mergedSchemaBuilder = SchemaBuilder.fromSchema(schema); - TableBuilder mergedTableBuilder = new TableBuilder(client, mergedSchemaBuilder); - - for (int i = 0; i < srcTable.getBatches().size(); i++) { - mergedTableBuilder.addBatch(srcTable.getBatches().get(i)); - } - - for (int i = 0; i < dstTable.getBatches().size(); i++) { - mergedTableBuilder.addBatch(dstTable.getBatches().get(i)); - } - - ObjectMeta meta = mergedTableBuilder.seal(client); - Context.println("Table id in vineyard:" + meta.getId().value()); - client.persist(meta.getId()); - Context.println("Table persisted, name:" + dst); - client.putName(meta.getId(), dst.toString().substring(dst.toString().indexOf(":") + 1)); - client.dropName(src.toString()); - mergedTableObjectID = meta.getId(); - - // drop old table - Collection ids = new ArrayList(); - ids.add(srcObjectID); - ids.add(dstObjectID); - client.delete(ids, false, false); - } catch (ObjectNotExists e) { - // Skip invalid file. - } finally { - srcInput.close(); - dstInput.close(); - if (mergedTableObjectID != null) { - FSDataOutputStream out = fs.create(dst); - String mergedTableIDStr = mergedTableObjectID.toString() + "\n"; - out.write((mergedTableIDStr + "\n").getBytes(StandardCharsets.UTF_8)); - out.close(); - } - } + private void deleteFileWithoutObject(Path path) throws IOException { + ObjectID objectID = client.getName(path.toString()); + Set objectIDs = new HashSet(); + objectIDs.add(objectID); + client.delete(objectIDs, true, true); + client.dropName(path.toString()); } - public boolean renameInternal(Path src, Path dst) throws IOException { - - Path newSrc = new Path(src.toString().replaceAll("vineyard", "file")); - Path newDst = new Path(dst.toString().replaceAll("vineyard", "file")); - String newTableName = - dst.toString().substring(dst.toString().indexOf(":") + 1).replaceAll("/+", "/"); - String oldTableName = - src.toString().substring(src.toString().indexOf(":") + 1).replaceAll("/+", "/"); + private void mergeFile(Path src, Path dst) throws IOException { + FSDataInputStream srcInput = open(src, 0); + FSDataInputStream dstInput = open(dst, 0); + FileStatus srcStatus = getFileStatusInternal(src); + FileStatus dstStatus = getFileStatusInternal(dst); + byte[] srcContent = new byte[(int) srcStatus.getLen()]; + byte[] dstContent = new byte[(int) dstStatus.getLen()]; + srcInput.read(srcContent); + dstInput.read(dstContent); + srcInput.close(); + dstInput.close(); + + FSDataOutputStream out = + createInternal(dst, new FsPermission((short) 777), true, 0, (short) 1, 1, null); + out.write(srcContent); + out.write(dstContent); + out.close(); + } + + private boolean renameInternal(Path src, Path dst) throws IOException { FileStatus srcStatus; try { - srcStatus = fs.getFileStatus(newSrc); + srcStatus = getFileStatusInternal(src); } catch (FileNotFoundException e) { // src file not exist Context.println("Src file not exist"); return false; } + Path dstParentPath = dst.getParent(); + try { + getFileStatusInternal(dstParentPath); + } catch (FileNotFoundException e) { + // dst parent not exist, create first + mkdirsInternal(dstParentPath, new FsPermission((short) 777)); + } + if (srcStatus.isDirectory()) { - FileStatus[] status = fs.listStatus(newSrc); + ObjectID objectID = client.getName(src.toString()); + client.putName(objectID, dst.toString()); + + FileStatus[] status = listStatusInternal(src); for (FileStatus s : status) { renameInternal( - s.getPath(), new Path(newDst.toString() + "/" + s.getPath().getName())); + s.getPath(), + new Path( + (dst.toString() + "/" + s.getPath().getName()) + .replaceAll("/+", "/"))); } - fs.delete(newSrc, true); + client.dropName(src.toString()); return true; } else { try { - fs.getFileStatus(newDst); + getFileStatusInternal(dst); } catch (FileNotFoundException e) { // dst file not exist - fs.rename(newSrc, newDst); - FSDataInputStream in = fs.open(newDst); - byte[] objectIDByteArray = new byte[255]; - int len = in.read(objectIDByteArray); - if (len > 0) { - String objectIDStr = - new String(objectIDByteArray, 0, len, StandardCharsets.UTF_8) - .replaceAll("\n", ""); - IPCClient client = Context.getClient(); - try { - client.putName(ObjectID.fromString(objectIDStr), newTableName); - client.dropName(oldTableName); - } catch (Exception e1) { - // Skip some invalid file. - // File content may be not a valid object id. - Context.println("Failed to put name to vineyard: " + e1.getMessage()); - } - } + + ObjectID objectID = client.getName(src.toString()); + client.putName(objectID, dst.toString()); + client.dropName(src.toString()); printAllFiles(); return true; } // dst file exist - mergeFile(newSrc, newDst); - deleteInternal(newSrc, true); + mergeFile(src, dst); + deleteFileWithoutObject(src); printAllFiles(); return true; } } - public void syncWithVineyard(String prefix) throws IOException { - IPCClient client = Context.getClient(); - String reg = "^" + prefix + ".*"; - Map objects = client.listNames(reg, true, 255); - for (val object : objects.entrySet()) { - try { - fs.getFileStatus(new Path("file://" + object.getKey())); - } catch (FileNotFoundException e) { - // file not exist - Path path = new Path("file://" + object.getKey()); - FSDataOutputStream out = fs.create(path); - ObjectID id = object.getValue(); - out.write((id.toString() + "\n").getBytes(StandardCharsets.UTF_8)); - out.close(); - } + @Override + public FileStatus[] listStatus(Path path) throws FileNotFoundException, IOException { + try (val lock = this.lock.open()) { + return listStatusInternal(new Path(path.toString().replaceAll("/+", "/"))); } } - @Override - public FileStatus[] listStatus(Path path) throws FileNotFoundException, IOException { + private FileStatus[] listStatusInternal(Path path) throws FileNotFoundException, IOException { List result = new ArrayList(); - try (val lock = this.lock.open()) { - String prefix = - path.toString() - .substring(path.toString().indexOf(":") + 1) - .replaceAll("/+", "/"); - syncWithVineyard(prefix); - try { - FileStatus status = - fs.getFileStatus(new Path(path.toString().replaceAll("vineyard", "file"))); - FileStatus[] statusArray = - fs.listStatus(new Path(path.toString().replaceAll("vineyard", "file"))); - for (FileStatus s : statusArray) { - FileStatus temp = - new FileStatus( - s.getLen(), - s.isDirectory(), - s.getReplication(), - s.getBlockSize(), - s.getModificationTime(), - s.getAccessTime(), - new FsPermission((short) 777), - s.getOwner(), - s.getGroup(), - new Path( - SCHEME - + ":///" - + s.getPath() - .toString() - .substring( - s.getPath() - .toString() - .indexOf(":") - + 1) - .replaceAll("/+", "/"))); - result.add(temp); - } - } catch (FileNotFoundException e) { - // file not exist - return new FileStatus[0]; - } + VineyardFileStat[] vineyardFileStats = + VineyardFileUtils.listFileStats(client, path.toString()); + for (VineyardFileStat vineyardFileStat : vineyardFileStats) { + FileStatus temp = + new FileStatus( + vineyardFileStat.getLength(), + vineyardFileStat.isDir(), + vineyardFileStat.getBlockReplication(), + vineyardFileStat.getBlockSize(), + vineyardFileStat.getModifyTime(), + vineyardFileStat.getAccessTime(), + new FsPermission(vineyardFileStat.getPermission()), + null, + null, + new Path(vineyardFileStat.getFilePath())); + result.add(temp); } printAllFiles(); return result.toArray(new FileStatus[result.size()]); @@ -541,20 +427,24 @@ public Path getWorkingDirectory() { @Override public boolean mkdirs(Path path, FsPermission fsPermission) throws IOException { try (val lock = this.lock.open()) { - return this.mkdirsInternal(path, fsPermission); + return this.mkdirsInternal( + new Path(path.toString().replaceAll("/+", "/")), fsPermission); } } private boolean mkdirsInternal(Path path, FsPermission fsPermission) throws IOException { - - Path newPath = new Path(path.toString().replaceAll("vineyard", "file")); try { - fs.getFileStatus(newPath); + getFileStatusInternal(path); } catch (FileNotFoundException e) { // file not exist - boolean result = fs.mkdirs(newPath); + Path parentPath = path.getParent(); + if (parentPath != null) { + mkdirsInternal(parentPath, fsPermission); + } + + new VineyardFile(path.toString(), true, VineyardFile.Mode.WRITE, false); printAllFiles(); - return result; + return true; } return false; } @@ -562,33 +452,26 @@ private boolean mkdirsInternal(Path path, FsPermission fsPermission) throws IOEx @Override public FileStatus getFileStatus(Path path) throws IOException { try (val lock = this.lock.open()) { - return this.getFileStatusInternal(path); + return this.getFileStatusInternal(new Path(path.toString().replaceAll("/+", "/"))); } } public FileStatus getFileStatusInternal(Path path) throws IOException { printAllFiles(); - FileStatus temp = - fs.getFileStatus(new Path(path.toString().replaceAll("vineyard", "file"))); + VineyardFileStat vineyardFileStat = + VineyardFileUtils.getFileStatus(client, path.toString()); FileStatus result = new FileStatus( - temp.getLen(), - temp.isDirectory(), - temp.getReplication(), - temp.getBlockSize(), - temp.getModificationTime(), - temp.getAccessTime(), - new FsPermission((short) 777), - temp.getOwner(), - temp.getGroup(), - new Path( - SCHEME - + ":///" - + temp.getPath() - .toString() - .substring( - temp.getPath().toString().indexOf(":") + 1) - .replaceAll("/+", "/"))); + vineyardFileStat.getLength(), + vineyardFileStat.isDir(), + vineyardFileStat.getBlockReplication(), + vineyardFileStat.getBlockSize(), + vineyardFileStat.getModifyTime(), + vineyardFileStat.getAccessTime(), + new FsPermission(vineyardFileStat.getPermission()), + null, + null, + path); return result; } diff --git a/java/hive/src/main/java/io/v6d/hive/ql/io/VineyardInputFormat.java b/java/hive/src/main/java/io/v6d/hive/ql/io/VineyardInputFormat.java index 22162d7ca..06720e7d4 100644 --- a/java/hive/src/main/java/io/v6d/hive/ql/io/VineyardInputFormat.java +++ b/java/hive/src/main/java/io/v6d/hive/ql/io/VineyardInputFormat.java @@ -103,7 +103,7 @@ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { if (len <= 0) { continue; } - String[] objectIDs = new String(buffer, StandardCharsets.UTF_8).split("\n"); + String[] objectIDs = new String(buffer, StandardCharsets.US_ASCII).split("\n"); for (val objectID : objectIDs) { try { ObjectID tableID = ObjectID.fromString(objectID); @@ -118,7 +118,7 @@ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { "Skipping invalid file: " + tableFilePath + ", content: " - + new String(buffer, StandardCharsets.UTF_8)); + + new String(buffer, StandardCharsets.US_ASCII)); break; } } @@ -184,7 +184,7 @@ class VineyardRecordReader implements RecordReader objectIDs = new HashSet(); + objectIDs.add(fileObjectID); + client.delete(objectIDs, false, false); + client.dropName(filePath); + } else { + throw new IOException("File already exist."); + } + } + + public void close() throws IOException { + if (this.isDir || this.mode == Mode.READ) { + // do nothing + return; + } + + // Write to vineyard + ObjectMeta fileMeta = ObjectMeta.empty(); + fileMeta.setTypename("vineyard::File"); + fileMeta.setValue("is_dir_", false); + fileMeta.setValue("modify_time_", System.currentTimeMillis()); + fileMeta.setValue("access_time_", (long) -1); + + content = Arrays.copyOfRange(content, 0, length); + byte[] base64EncodedContent = Base64.getEncoder().encode(content); + fileMeta.setValue( + "base64_content_", new String(base64EncodedContent, StandardCharsets.UTF_8)); + fileMeta.setValue("length_", length); + + fileMeta = client.createMetaData(fileMeta); + client.persist(fileMeta.getId()); + client.putName(fileMeta.getId(), filePath); + } + + private void expandContent() { + byte[] newContent = new byte[content.length * 2]; + System.arraycopy(content, 0, newContent, 0, length); + content = newContent; + } + + public void write(int b) throws IOException { + byte byteValue = (byte) (b & 0xff); + if (length >= content.length) { + expandContent(); + } + content[length] = byteValue; + length++; + } + + public void seek(long offset) throws IOException { + if (offset > content.length || offset < 0) { + throw new IOException("Seek offset is out of range."); + } + pos = (int) offset; + } + + public long getPos() throws IOException { + return pos; + } + + public int read() throws IOException { + int result = -1; + if (pos >= content.length) { + return result; + } + result = content[pos] & 0xff; + pos++; + return result; + } +} diff --git a/java/modules/basic/src/main/java/io/v6d/modules/basic/filesystem/VineyardFileStat.java b/java/modules/basic/src/main/java/io/v6d/modules/basic/filesystem/VineyardFileStat.java new file mode 100644 index 000000000..4d190f2af --- /dev/null +++ b/java/modules/basic/src/main/java/io/v6d/modules/basic/filesystem/VineyardFileStat.java @@ -0,0 +1,131 @@ +/** Copyright 2020-2023 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.v6d.modules.basic.filesystem; + +public class VineyardFileStat { + private String filePath; + private boolean isDir; + private long modifyTime; + private long length; + private long accessTime; + private int blockReplication; + private long blockSize; + private String owner; + private String group; + private short permission; + + public VineyardFileStat( + String filePath, + boolean isDir, + long modifyTime, + long length, + long accessTime, + int blockReplication, + long blockSize, + String owner, + String group, + short permission) { + this.filePath = filePath; + this.isDir = isDir; + this.modifyTime = modifyTime; + this.length = length; + this.accessTime = accessTime; + this.blockReplication = blockReplication; + this.blockSize = blockSize; + this.owner = owner; + this.group = group; + this.permission = permission; + } + + public String getFilePath() { + return filePath; + } + + public boolean isDir() { + return isDir; + } + + public long getModifyTime() { + return modifyTime; + } + + public long getLength() { + return length; + } + + public long getAccessTime() { + return accessTime; + } + + public int getBlockReplication() { + return blockReplication; + } + + public long getBlockSize() { + return blockSize; + } + + public String getOwner() { + return owner; + } + + public String getGroup() { + return group; + } + + public short getPermission() { + return permission; + } + + public void setFilePath(String filePath) { + this.filePath = filePath; + } + + public void setDir(boolean dir) { + isDir = dir; + } + + public void setModifyTime(long modifyTime) { + this.modifyTime = modifyTime; + } + + public void setLength(long length) { + this.length = length; + } + + public void setAccessTime(long accessTime) { + this.accessTime = accessTime; + } + + public void setBlockReplication(int blockReplication) { + this.blockReplication = blockReplication; + } + + public void setBlockSize(long blockSize) { + this.blockSize = blockSize; + } + + public void setOwner(String owner) { + this.owner = owner; + } + + public void setGroup(String group) { + this.group = group; + } + + public void setPermission(short permission) { + this.permission = permission; + } +} diff --git a/java/modules/basic/src/main/java/io/v6d/modules/basic/filesystem/VineyardFileUtils.java b/java/modules/basic/src/main/java/io/v6d/modules/basic/filesystem/VineyardFileUtils.java new file mode 100644 index 000000000..43f49b9e1 --- /dev/null +++ b/java/modules/basic/src/main/java/io/v6d/modules/basic/filesystem/VineyardFileUtils.java @@ -0,0 +1,114 @@ +/** Copyright 2020-2023 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.v6d.modules.basic.filesystem; + +import io.v6d.core.client.Context; +import io.v6d.core.client.IPCClient; +import io.v6d.core.client.ds.ObjectMeta; +import io.v6d.core.common.util.ObjectID; +import io.v6d.core.common.util.VineyardException.ObjectNotExists; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import lombok.val; + +public class VineyardFileUtils { + public static VineyardFileStat getFileStatus(IPCClient client, String path) throws IOException { + ObjectID fileObjectID; + try { + fileObjectID = client.getName(path); + } catch (ObjectNotExists e) { + throw new FileNotFoundException(path + " is not found."); + } + ObjectMeta meta = client.getMetaData(fileObjectID); + boolean isDir = meta.getBooleanValue("is_dir_"); + int len = meta.getIntValue("length_"); + long modifyTime = meta.getLongValue("modify_time_"); + long accessTime = meta.getLongValue("access_time_"); + return new VineyardFileStat( + path, isDir, modifyTime, len, accessTime, 1, 1, null, null, (short) 777); + } + + public static VineyardFileStat[] listFileStats(IPCClient client, String path) + throws IOException { + List result = new ArrayList(); + VineyardFileStat fileStat; + try { + fileStat = VineyardFileUtils.getFileStatus(client, path.toString()); + } catch (FileNotFoundException e) { + throw new FileNotFoundException(path.toString() + " is not found."); + } + if (!fileStat.isDir()) { + // file + result.add(fileStat); + } else { + // dir + String pattern = "^" + path + "/[^/]*"; + Map objects = Context.getClient().listNames(pattern, true, 255); + for (val object : objects.entrySet()) { + ObjectID objectID = object.getValue(); + ObjectMeta meta = client.getMetaData(objectID); + if (meta.getTypename().compareTo("vineyard::File") != 0) { + continue; + } + boolean isDir = meta.getBooleanValue("is_dir_"); + int len = meta.getIntValue("length_"); + long modifyTime = meta.getLongValue("modify_time_"); + long accessTime = meta.getLongValue("access_time_"); + VineyardFileStat temp = + new VineyardFileStat( + object.getKey(), + isDir, + modifyTime, + len, + accessTime, + 1, + 1, + null, + null, + (short) 777); + result.add(temp); + } + } + return result.toArray(new VineyardFileStat[result.size()]); + } + + public static void printAllFiles(IPCClient client) throws IOException { + Context.println("-----------------------------------"); + Map objects; + try { + objects = client.listNames(".*", true, 255); + } catch (Exception e) { + Context.println("Failed to list names: " + e.getMessage()); + return; + } + for (val object : objects.entrySet()) { + try { + ObjectMeta meta = client.getMetaData(object.getValue()); + if (meta.getTypename().compareTo("vineyard::File") == 0) { + String type = meta.getBooleanValue("is_dir_") ? "dir" : "file"; + Context.println("Type:" + type + " " + object.getKey()); + } + } catch (Exception e) { + // Skip some invalid object id. + Context.println("Failed to get object meta: " + e.getMessage()); + continue; + } + } + Context.println("-----------------------------------"); + } +}