Skip to content

Commit

Permalink
merge main
Browse files Browse the repository at this point in the history
  • Loading branch information
JiamingMai committed Oct 26, 2023
2 parents ad2ac8b + 7a5734f commit 8d057f8
Show file tree
Hide file tree
Showing 38 changed files with 618 additions and 207 deletions.
1 change: 0 additions & 1 deletion .github/workflows/java8_integration_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ jobs:
mkdir -p ~/.m2
ALLUXIO_DOCKER_NO_TTY=true \
ALLUXIO_DOCKER_GIT_CLEAN=true \
ALLUXIO_DOCKER_MVN_PROJECT_LIST=dora/tests \
ALLUXIO_DOCKER_MVN_TESTS=${{ matrix.modules }} \
dev/github/run_docker.sh
timeout-minutes: 60
Expand Down
1 change: 0 additions & 1 deletion .github/workflows/java8_integration_tests_ft.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ jobs:
ALLUXIO_DOCKER_FORK_COUNT=1 \
ALLUXIO_DOCKER_NO_TTY=true \
ALLUXIO_DOCKER_GIT_CLEAN=true \
ALLUXIO_DOCKER_MVN_PROJECT_LIST=dora/tests \
ALLUXIO_DOCKER_MVN_TESTS=${{ matrix.modules }} \
dev/github/run_docker.sh
timeout-minutes: 60
Expand Down
1 change: 0 additions & 1 deletion .github/workflows/java8_integration_tests_webui.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ jobs:
mkdir -p ~/.m2
ALLUXIO_DOCKER_NO_TTY=true \
ALLUXIO_DOCKER_GIT_CLEAN=true \
ALLUXIO_DOCKER_MVN_PROJECT_LIST=dora/tests,webui \
ALLUXIO_DOCKER_MVN_TESTS=${{ matrix.modules }} \
dev/github/run_docker.sh
timeout-minutes: 60
Expand Down
25 changes: 14 additions & 11 deletions common/transport/src/main/proto/grpc/block_worker.proto
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,20 @@ message LoadFileRequest {

// A subtask of a load file request. either a load data or load metadata.
message LoadSubTask {
optional Block block = 1;
optional UfsStatus ufs_status = 2;
optional LoadDataSubTask load_data_subtask = 1;
optional LoadMetadataSubTask load_metadata_subtask = 2;
}

message LoadDataSubTask {
required int64 length = 1;
optional string ufs_path = 2;
optional int64 offset_in_file = 3;
optional UfsStatus ufs_status = 4;
optional WorkerNetAddress main_worker = 5;
}

message LoadMetadataSubTask {
optional UfsStatus ufs_status = 1;
}

message File{
Expand Down Expand Up @@ -303,15 +315,6 @@ message LoadFailure {
optional bool retryable = 4;
}

message FileFailure {
required File file = 1;
// The status code, which should be an enum value of [google.rpc.Code][google.rpc.Code].
required int32 code = 2;
// A developer-facing error message
optional string message = 3;
optional bool retryable = 4;
}

// Response for an async cache request
message AsyncCacheResponse {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,7 @@ message LoadJobPOptions {
optional bool loadMetadataOnly = 4;
optional bool skipIfExists = 5;
optional string fileFilterRegx = 6;
optional int32 replicas = 7;
}

message CopyJobPOptions {
Expand Down
1 change: 1 addition & 0 deletions common/transport/src/main/proto/proto/journal/job.proto
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ message LoadJobEntry {
optional bool load_metadata_only = 9;
optional bool skip_if_exists = 10;
optional string file_filter_regx = 11;
optional int32 replicas = 12;
}

// next available id: 13
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.client.file;

import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;

/**
* An improved implementation of the prefetch cache policy that only halves the prefetch size,
* on cache miss.
*/
public class AdaptivePrefetchCachePolicy implements PrefetchCachePolicy {
private int mPrefetchSize = 0;
private long mLastCallEndPos = -1;
private final int mMaxPrefetchSize =
(int) Configuration.getBytes(PropertyKey.USER_POSITION_READER_STREAMING_PREFETCH_MAX_SIZE);

@Override
public void addTrace(long pos, int size) {
if (pos == mLastCallEndPos) {
// increase the prefetch size by the size of cumulative, consecutive reads
mPrefetchSize = Math.min(mMaxPrefetchSize, mPrefetchSize + size);
}
mLastCallEndPos = pos + size;
}

@Override
public void onCacheHitRead() {
// Noop
}

@Override
public void onCacheMissRead() {
// on prefetch cache miss, there may be a chance that the read position is
// not consecutive, e.g. the reader seeks to a position far away from the
// previous position
// halve the prefetch size to be conservative
mPrefetchSize /= 2;
}

@Override
public int getPrefetchSize() {
return mPrefetchSize;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.client.file;

import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;

import com.google.common.collect.EvictingQueue;

/**
* A base prefetch cache policy that increases the prefetch window if the read pattern is
* contiguous and reduce the window down to the read size if it is not.
*/
public class BasePrefetchCachePolicy implements PrefetchCachePolicy {
private int mPrefetchSize = 0;
private final EvictingQueue<CallTrace> mCallHistory = EvictingQueue.create(
Configuration.getInt(PropertyKey.USER_POSITION_READER_STREAMING_MULTIPLIER));

@Override
public void addTrace(long pos, int size) {
mCallHistory.add(new CallTrace(pos, size));
int consecutiveReadLength = 0;
long lastReadEnd = -1;
for (CallTrace trace : mCallHistory) {
if (trace.mPosition == lastReadEnd) {
lastReadEnd += trace.mLength;
consecutiveReadLength += trace.mLength;
} else {
lastReadEnd = trace.mPosition + trace.mLength;
consecutiveReadLength = trace.mLength;
}
}
mPrefetchSize = consecutiveReadLength;
}

@Override
public void onCacheHitRead() {
// Noop
}

@Override
public void onCacheMissRead() {
// Noop
}

@Override
public int getPrefetchSize() {
return mPrefetchSize;
}

private static class CallTrace {
final long mPosition;
final int mLength;

private CallTrace(long pos, int length) {
mPosition = pos;
mLength = length;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.amazonaws.annotation.NotThreadSafe;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.EvictingQueue;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.slf4j.Logger;
Expand Down Expand Up @@ -51,35 +50,18 @@ public class PositionReadFileInStream extends FileInStream {

private class PrefetchCache implements AutoCloseable {
private final long mFileLength;
private final EvictingQueue<CallTrace> mCallHistory;
private int mPrefetchSize = 0;

private ByteBuf mCache = Unpooled.wrappedBuffer(new byte[0]);
private long mCacheStartPos = 0;
private final PrefetchCachePolicy mPolicy;

PrefetchCache(int prefetchMultiplier, long fileLength) {
mCallHistory = EvictingQueue.create(prefetchMultiplier);
PrefetchCache(PrefetchCachePolicy policy, long fileLength) {
mPolicy = policy;
mFileLength = fileLength;
}

private void update() {
int consecutiveReadLength = 0;
long lastReadEnd = -1;
for (CallTrace trace : mCallHistory) {
if (trace.mPosition == lastReadEnd) {
lastReadEnd += trace.mLength;
consecutiveReadLength += trace.mLength;
} else {
lastReadEnd = trace.mPosition + trace.mLength;
consecutiveReadLength = trace.mLength;
}
}
mPrefetchSize = consecutiveReadLength;
}

private void addTrace(long pos, int size) {
mCallHistory.add(new CallTrace(pos, size));
update();
mPolicy.addTrace(pos, size);
}

/**
Expand All @@ -99,13 +81,16 @@ private int fillWithCache(long targetStartPos, ByteBuffer outBuffer) {
slice.limit(size);
mCache.getBytes(posInCache, slice);
outBuffer.position(outBuffer.position() + size);
mPolicy.onCacheHitRead();
return size;
} else {
// the position is beyond the cache end position
mPolicy.onCacheMissRead();
return 0;
}
} else {
// the position is behind the cache start position
mPolicy.onCacheMissRead();
return 0;
}
}
Expand All @@ -129,13 +114,17 @@ private int prefetch(PositionReader reader, long pos, int minBytesToRead) {
}
}

int prefetchSize = Math.max(mPrefetchSize, minBytesToRead);
int prefetchSize = Math.max((int) mPolicy.getPrefetchSize(), minBytesToRead);
// cap to remaining file length
prefetchSize = (int) Math.min(mFileLength - pos, prefetchSize);

if (mCache.capacity() < prefetchSize) {
mCache.release();
mCache = PooledDirectNioByteBuf.allocate(prefetchSize);
try {
mCache = PooledDirectNioByteBuf.allocate(prefetchSize);
} catch (OutOfMemoryError oom) {
mCache = Unpooled.wrappedBuffer(new byte[0]);
}
mCacheStartPos = 0;
}
mCache.clear();
Expand All @@ -161,16 +150,6 @@ public void close() {
}
}

private static class CallTrace {
final long mPosition;
final int mLength;

private CallTrace(long pos, int length) {
mPosition = pos;
mLength = length;
}
}

/**
* Constructor.
* @param reader the position reader
Expand All @@ -187,7 +166,7 @@ public PositionReadFileInStream(
mPositionReader = reader;
mLength = uriStatus.getLength();
mCache = new PrefetchCache(
Configuration.getInt(PropertyKey.USER_POSITION_READER_STREAMING_MULTIPLIER), mLength);
PrefetchCachePolicy.Factory.create(), mLength);
long dataPreloadFileSizeThreshold =
Configuration.getBytes(PropertyKey.USER_POSITION_READER_PRELOAD_DATA_FILE_SIZE_THRESHOLD);
mDataPreloadEnabled =
Expand All @@ -213,7 +192,7 @@ long getBufferedPosition() {

@VisibleForTesting
int getPrefetchSize() {
return mCache.mPrefetchSize;
return mCache.mPolicy.getPrefetchSize();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.client.file;

import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;

import com.amazonaws.annotation.NotThreadSafe;

/**
* The prefetch cache policy to determine the prefetch size.
*/
@NotThreadSafe
public interface PrefetchCachePolicy {
/**
* Adds the trace of a read request.
* @param pos the position
* @param size the size
*/
void addTrace(long pos, int size);

/**
* Called when a read hits the cache.
*/
void onCacheHitRead();

/**
* Called when a read does not hit the cache.
*/
void onCacheMissRead();

/**
* @return the expected prefetch size
*/
int getPrefetchSize();

/**
* The factory class.
*/
class Factory {
/**
* @return a prefetch cache policy
*/
public static PrefetchCachePolicy create() {
if (Configuration.getBoolean(
PropertyKey.USER_POSITION_READER_STREAMING_ADAPTIVE_POLICY_ENABLED)) {
return new AdaptivePrefetchCachePolicy();
}
return new BasePrefetchCachePolicy();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public void commitFile(String fileId) {
try {
mCacheManager.commitFile(fileId);
} catch (Exception e) {
LOG.error("Failed to commit file {}", fileId);
LOG.error("Failed to commit file {}", fileId, e);
}
}

Expand Down
Loading

0 comments on commit 8d057f8

Please sign in to comment.