From 4271e79d7a7bd4121566a5a859161ce4ab86fd53 Mon Sep 17 00:00:00 2001 From: "445092967@qq.com" <445092967@qq.com> Date: Tue, 17 Oct 2023 23:03:10 +0800 Subject: [PATCH] WIP reuse opened file in worker --- .../file/cache/store/LocalPageStore.java | 23 +++++++++++++++++-- .../databuffer/CompositeDataBuffer.java | 2 +- .../protocol/databuffer/DataFileChannel.java | 14 ++++++++--- .../alluxio/cli/fs/command/CatCommand.java | 4 ++-- 4 files changed, 35 insertions(+), 8 deletions(-) diff --git a/dora/core/client/fs/src/main/java/alluxio/client/file/cache/store/LocalPageStore.java b/dora/core/client/fs/src/main/java/alluxio/client/file/cache/store/LocalPageStore.java index 7bedd9a7e6bd..5dd99a8d7624 100644 --- a/dora/core/client/fs/src/main/java/alluxio/client/file/cache/store/LocalPageStore.java +++ b/dora/core/client/fs/src/main/java/alluxio/client/file/cache/store/LocalPageStore.java @@ -215,11 +215,30 @@ public DataFileChannel getDataFileChannel( if (pageOffset + bytesToRead > fileLength) { bytesToRead = (int) (fileLength - (long) pageOffset); } - - DataFileChannel dataFileChannel = new DataFileChannel(pageFile, pageOffset, bytesToRead); + DataFileChannel lastDataFileChannel = mLastDataFileChannel; + final RandomAccessFile raf; + if (lastDataFileChannel != null && lastDataFileChannel.mPageIndex == pageId.getPageIndex()) { + raf = lastDataFileChannel.mFile; + } else { + try { + raf = new RandomAccessFile(pageFile, "r") { + @Override + public void close() throws IOException { + super.close(); + } + }; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + DataFileChannel dataFileChannel = new DataFileChannel( + pageId.getPageIndex(), raf, pageOffset, bytesToRead); + mLastDataFileChannel = dataFileChannel; return dataFileChannel; } + volatile DataFileChannel mLastDataFileChannel; + @Override public void close() { // no-op diff --git a/dora/core/common/src/main/java/alluxio/network/protocol/databuffer/CompositeDataBuffer.java b/dora/core/common/src/main/java/alluxio/network/protocol/databuffer/CompositeDataBuffer.java index d803ed7b550e..d2c756b49d0f 100644 --- a/dora/core/common/src/main/java/alluxio/network/protocol/databuffer/CompositeDataBuffer.java +++ b/dora/core/common/src/main/java/alluxio/network/protocol/databuffer/CompositeDataBuffer.java @@ -21,7 +21,7 @@ */ public final class CompositeDataBuffer implements DataBuffer { - private final List mDataBufferList; + public final List mDataBufferList; /** * CompositeDataBuffer wraps multiple {@link DataBuffer}. diff --git a/dora/core/common/src/main/java/alluxio/network/protocol/databuffer/DataFileChannel.java b/dora/core/common/src/main/java/alluxio/network/protocol/databuffer/DataFileChannel.java index 5a0cab898826..3850495908ff 100644 --- a/dora/core/common/src/main/java/alluxio/network/protocol/databuffer/DataFileChannel.java +++ b/dora/core/common/src/main/java/alluxio/network/protocol/databuffer/DataFileChannel.java @@ -17,6 +17,7 @@ import java.io.File; import java.io.IOException; import java.io.OutputStream; +import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; @@ -24,9 +25,10 @@ * A DataBuffer with the underlying data being a {@link FileChannel}. */ public final class DataFileChannel implements DataBuffer { - private final File mFile; + public final RandomAccessFile mFile; private final long mOffset; private final long mLength; + public final long mPageIndex; /** * @@ -34,15 +36,21 @@ public final class DataFileChannel implements DataBuffer { * @param offset The offset into the FileChannel * @param length The length of the data to read */ - public DataFileChannel(File file, long offset, long length) { + public DataFileChannel(long pageIndex, RandomAccessFile file, long offset, long length) { mFile = Preconditions.checkNotNull(file, "file"); + try { + mFile.seek(offset); + } catch (Exception e) { + throw new RuntimeException(e); + } mOffset = offset; mLength = length; + mPageIndex = pageIndex; } @Override public Object getNettyOutput() { - return new DefaultFileRegion(mFile, mOffset, mLength); + return new DefaultFileRegion(mFile.getChannel(), mOffset, mLength); } @Override diff --git a/dora/shell/src/main/java/alluxio/cli/fs/command/CatCommand.java b/dora/shell/src/main/java/alluxio/cli/fs/command/CatCommand.java index 51125212931b..e33652a4f5ca 100644 --- a/dora/shell/src/main/java/alluxio/cli/fs/command/CatCommand.java +++ b/dora/shell/src/main/java/alluxio/cli/fs/command/CatCommand.java @@ -54,11 +54,11 @@ protected void runPlainPath(AlluxioURI path, CommandLine cl) if (status.isFolder()) { throw new FileDoesNotExistException(ExceptionMessage.PATH_MUST_BE_FILE.getMessage(path)); } - byte[] buf = new byte[Constants.MB]; + byte[] buf = new byte[Constants.MB * 16]; try (FileInStream is = mFileSystem.openFile(path)) { int read = is.read(buf); while (read != -1) { - System.out.write(buf, 0, read); +// System.out.write(buf, 0, read); read = is.read(buf); } }