Improve page loading performance
Cherry-pick of existing commit.
orig-pr: #18389
orig-commit: 5421aa4
orig-commit-author: elega <[email protected]>

pr-link: #18411
change-id: cid-46e89dcfd22dcbbb4090df7f6c8861408ef8cb67
alluxio-bot authored Nov 10, 2023
1 parent 08254a0 commit c2fa1f7
Showing 8 changed files with 111 additions and 34 deletions.
@@ -160,7 +160,7 @@ public static BlockWorkerClient create(UserState userState, GrpcServerAddress ad
* @param request the cache request
* @return listenable future of CacheDataResponse
*/
- ListenableFuture<CacheDataResponse> cacheData(CacheDataRequest request);
+ CacheDataResponse cacheData(CacheDataRequest request);

/**
* Free this worker.
@@ -314,9 +314,9 @@ public void cache(CacheRequest request) {
}

@Override
- public ListenableFuture<CacheDataResponse> cacheData(CacheDataRequest request) {
+ public CacheDataResponse cacheData(CacheDataRequest request) {
try {
- return mRpcFutureStub.withDeadlineAfter(mRpcTimeoutMs, TimeUnit.MILLISECONDS)
+ return mRpcBlockingStub.withDeadlineAfter(mRpcTimeoutMs, TimeUnit.MILLISECONDS)
.cacheData(request);
} catch (Exception e) {
LOG.warn("Error sending cache data request {} to worker {}.", request, mAddress, e);
@@ -13,6 +13,7 @@

import alluxio.PositionReader;
import alluxio.client.file.dora.DoraCacheClient;
+ import alluxio.collections.ConcurrentHashSet;
import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;
import alluxio.exception.PreconditionMessage;
@@ -47,6 +48,10 @@ public class PositionReadFileInStream extends FileInStream {
private final boolean mDataPreloadEnabled;
private final long mNumPreloadedDataSize =
Configuration.getBytes(PropertyKey.USER_POSITION_READER_PRELOAD_DATA_SIZE);
+ private final ConcurrentHashSet<Long> mPreloadingPages = new ConcurrentHashSet<>();
+ // TODO(elega) this should be retrieved from the uri status.
+ private final long mWorkerPageSize =
+     Configuration.getBytes(PropertyKey.WORKER_PAGE_STORE_PAGE_SIZE);

private class PrefetchCache implements AutoCloseable {
private final long mFileLength;
@@ -104,13 +109,15 @@ private int fillWithCache(long targetStartPos, ByteBuffer outBuffer) {
* @return number of bytes that's been prefetched, 0 if exception occurs
*/
private int prefetch(PositionReader reader, long pos, int minBytesToRead) {
- if (mDataPreloadEnabled) {
+ if (mDataPreloadEnabled
+     && mPreloadingPages.addIfAbsent(pos / mWorkerPageSize)) {
try {
mClient.cacheData(
mURIStatus.getUfsPath(), pos,
Math.min(mURIStatus.getLength() - pos, mNumPreloadedDataSize));
} catch (Throwable t) {
LOG.warn("Preload data failed for {}", mURIStatus.getUfsPath(), t);
+ mPreloadingPages.remove(pos / mWorkerPageSize);
}
}
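The hunk above makes each preload idempotent per worker page: before issuing cacheData, the reader claims the page index (pos divided by the worker page size) in a concurrent set, and releases the claim only if the preload call fails, so concurrent positioned reads cannot fire duplicate preloads for the same page. A minimal standalone sketch of that claim-and-release pattern, using java.util.concurrent's newKeySet in place of Alluxio's ConcurrentHashSet; the preload Runnable stands in for the client.cacheData call and is purely illustrative:

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class PreloadDeduper {
  private final Set<Long> mInFlightPages = ConcurrentHashMap.newKeySet();
  private final long mPageSize;

  public PreloadDeduper(long pageSize) {
    mPageSize = pageSize;
  }

  /** Starts the preload for the page containing pos unless it was already started. */
  public void maybePreload(long pos, Runnable preload) {
    long pageIndex = pos / mPageSize;
    if (!mInFlightPages.add(pageIndex)) {
      return; // another reader already claimed this page
    }
    try {
      preload.run(); // placeholder for client.cacheData(ufsPath, pos, length)
    } catch (RuntimeException e) {
      // release the claim so a later read can retry this page
      mInFlightPages.remove(pageIndex);
    }
  }
}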

@@ -75,19 +75,18 @@ public void put(PageId pageId,
Path parent = Preconditions.checkNotNull(pagePath.getParent(),
"parent of cache file should not be null");
Files.createDirectories(parent);
- Files.createFile(pagePath);
}
// extra try to ensure output stream is closed
try (FileOutputStream fos = new FileOutputStream(pagePath.toFile(), false)) {
fos.getChannel().write(page);
}
- } catch (Exception e) {
+ } catch (Throwable t) {
Files.deleteIfExists(pagePath);
- if (e.getMessage() != null && e.getMessage().contains(ERROR_NO_SPACE_LEFT)) {
+ if (t.getMessage() != null && t.getMessage().contains(ERROR_NO_SPACE_LEFT)) {
throw new ResourceExhaustedException(
- String.format("%s is full, configured with %d bytes", mRoot, mCapacity), e);
+ String.format("%s is full, configured with %d bytes", mRoot, mCapacity), t);
}
- throw new IOException("Failed to write file " + pagePath + " for page " + pageId, e);
+ throw new IOException("Failed to write file " + pagePath + " for page " + pageId, t);
}
}
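Two things change in put above: the page file is no longer pre-created with Files.createFile (the FileOutputStream creates it, and any stale file is simply truncated and overwritten), and the cleanup path now catches Throwable instead of Exception, so even errors such as OutOfMemoryError from the buffer write still delete the partial page. A self-contained sketch of the same write-then-clean-up pattern with plain java.nio.file; the method name and the generic IOException wrapping are illustrative, not the Alluxio API:

import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;

public class PageFileWriter {
  private static final String ERROR_NO_SPACE_LEFT = "No space left on device";

  /** Writes page to pagePath, removing any partial file if the write fails. */
  static void writePage(Path pagePath, ByteBuffer page) throws IOException {
    try {
      Files.createDirectories(pagePath.getParent());
      // try-with-resources closes the stream even if the channel write throws
      try (FileOutputStream fos = new FileOutputStream(pagePath.toFile(), false)) {
        fos.getChannel().write(page);
      }
    } catch (Throwable t) {
      Files.deleteIfExists(pagePath); // never leave a truncated page behind
      if (t.getMessage() != null && t.getMessage().contains(ERROR_NO_SPACE_LEFT)) {
        throw new IOException("page store is out of space", t);
      }
      throw new IOException("failed to write page file " + pagePath, t);
    }
  }
}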

@@ -34,7 +34,6 @@
import alluxio.exception.InvalidPathException;
import alluxio.exception.status.PermissionDeniedException;
import alluxio.grpc.CacheDataRequest;
- import alluxio.grpc.CacheDataResponse;
import alluxio.grpc.CompleteFilePOptions;
import alluxio.grpc.CompleteFilePRequest;
import alluxio.grpc.CreateDirectoryPOptions;
@@ -63,10 +62,6 @@
import alluxio.resource.CloseableResource;
import alluxio.wire.WorkerNetAddress;

- import com.google.common.util.concurrent.FutureCallback;
- import com.google.common.util.concurrent.Futures;
- import com.google.common.util.concurrent.ListenableFuture;
- import com.google.common.util.concurrent.MoreExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -481,17 +476,7 @@ public void cacheData(String ufsPath, long pos, long length) {
.setLength(length)
.setAsync(true)
.build();
- ListenableFuture<CacheDataResponse> future = client.get().cacheData(request);
- Futures.addCallback(future, new FutureCallback<CacheDataResponse>() {
-   @Override
-   public void onSuccess(CacheDataResponse result) {
-   }
-
-   @Override
-   public void onFailure(Throwable t) {
-     LOG.warn("Preloading {} failed", ufsPath, t);
-   }
- }, MoreExecutors.directExecutor());
+ client.get().cacheData(request);
} catch (IOException e) {
throw new RuntimeException(e);
}
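With cacheData now a blocking call on the client (and the request still built with setAsync(true), so the worker presumably keeps doing the actual caching in the background), the Guava future wiring above becomes unnecessary: a failure surfaces as an ordinary exception at the call site instead of inside an onFailure callback. A compile-only sketch of the two calling conventions; the Callable-based rpc parameter is a placeholder, not the Alluxio API:

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.Callable;

public class BlockingVsCallback {
  // Old style: the RPC handed back a ListenableFuture, so the caller attached a
  // callback purely to log failures.
  static void callbackStyle(ListenableFuture<String> future) {
    Futures.addCallback(future, new FutureCallback<String>() {
      @Override
      public void onSuccess(String result) {
        // nothing to do on success
      }

      @Override
      public void onFailure(Throwable t) {
        System.err.println("cache request failed: " + t);
      }
    }, MoreExecutors.directExecutor());
  }

  // New style: the RPC blocks until the worker has accepted the request, so a plain
  // try/catch around the call replaces the callback.
  static void blockingStyle(Callable<String> rpc) {
    try {
      rpc.call();
    } catch (Exception e) {
      System.err.println("cache request failed: " + e);
    }
  }

  public static void main(String[] args) {
    callbackStyle(Futures.immediateFuture("ok"));
    blockingStyle(() -> "ok");
  }
}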
12 changes: 12 additions & 0 deletions dora/core/common/src/main/java/alluxio/conf/PropertyKey.java
@@ -4008,6 +4008,16 @@ public String toString() {
.setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
.setScope(Scope.WORKER)
.build();
+ public static final PropertyKey WORKER_FAST_DATA_LOAD_ENABLED =
+     booleanBuilder(Name.WORKER_FAST_DATA_LOAD_ENABLED)
+         .setDescription("If enabled, instead of creating a reader to load the data, "
+             + "we just get the data from UFS and put it into the page store. This is an "
+             + "experimental feature.")
+         .setIsHidden(true)
+         .setDefaultValue(false)
+         .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN)
+         .setScope(Scope.WORKER)
+         .build();
public static final PropertyKey WORKER_DATA_SERVER_DOMAIN_SOCKET_AS_UUID =
booleanBuilder(Name.WORKER_DATA_SERVER_DOMAIN_SOCKET_AS_UUID)
.setDefaultValue(false)
@@ -7934,6 +7944,8 @@ public static final class Name {
"alluxio.worker.data.server.domain.socket.address";
public static final String WORKER_DATA_SERVER_DOMAIN_SOCKET_AS_UUID =
"alluxio.worker.data.server.domain.socket.as.uuid";
+ public static final String WORKER_FAST_DATA_LOAD_ENABLED =
+     "alluxio.worker.fast.data.load.enabled";
public static final String WORKER_FUSE_ENABLED =
"alluxio.worker.fuse.enabled";
public static final String WORKER_FUSE_MOUNT_ALLUXIO_PATH =
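The description of the new key explains the fast path: skip the block-reader machinery and copy UFS data straight into the page store. The flag is hidden and off by default; a minimal sketch of turning it on from Java, mirroring what the test further below does (the alluxio-site.properties line in the comment is the equivalent site configuration):

import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;

public class EnableFastDataLoad {
  public static void main(String[] args) {
    // Equivalent to setting alluxio.worker.fast.data.load.enabled=true in alluxio-site.properties.
    Configuration.set(PropertyKey.WORKER_FAST_DATA_LOAD_ENABLED, true);
  }
}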
@@ -172,6 +172,7 @@ public class PagedDoraWorker extends AbstractWorker implements DoraWorker {
private final ConcurrentHashSet<PageId> mLoadingPages = new ConcurrentHashSet<>();
private final ExecutorService mCacheDataExecutor = Executors.newFixedThreadPool(
Configuration.getInt(PropertyKey.WORKER_PRELOAD_DATA_THREAD_POOL_SIZE));
+ private final boolean mFastDataLoadEnabled;

/**
* Constructor.
@@ -216,6 +217,7 @@ public PagedDoraWorker(
mClientWriteToUFSEnabled = mConf
.getBoolean(PropertyKey.CLIENT_WRITE_TO_UFS_ENABLED);
mXAttrWriteToUFSEnabled = mConf.getBoolean(PropertyKey.UNDERFS_XATTR_CHANGE_ENABLED);
+ mFastDataLoadEnabled = mConf.getBoolean(PropertyKey.WORKER_FAST_DATA_LOAD_ENABLED);
}

/**
@@ -540,9 +542,6 @@ public void cacheData(String ufsPath, long length, long pos, boolean isAsync)
// TODO(yimin) As an optimization, data does not need to load on a page basis.
// Can implement a bulk load mechanism and load a couple of pages at the same time,
// to improve the performance.
- if (mLoadingPages.contains(pageId)) {
-   continue;
- }
if (mCacheManager.hasPageUnsafe(pageId)) {
continue;
}
@@ -551,16 +550,20 @@ public void cacheData(String ufsPath, long length, long pos, boolean isAsync)
if (loadLength == 0) {
continue;
}
- mLoadingPages.add(pageId);
+ if (!mLoadingPages.addIfAbsent(pageId)) {
+   continue;
+ }

futures.add(CompletableFuture.runAsync(() -> {
try {
if (mCacheManager.hasPageUnsafe(pageId)) {
return;
}
LOG.debug("Preloading {} pos: {} length: {}", ufsPath, loadPos, loadLength);
loadData(ufsPath, 0, loadPos, loadLength, fi.getLength());
LOG.debug("Preloading {} pos: {} length: {} started", ufsPath, loadPos, loadLength);
loadPages(ufsPath, Collections.singletonList(pageId), fi.getLength());
LOG.debug("Preloading {} pos: {} length: {} finished", ufsPath, loadPos, loadLength);
} catch (Exception e) {
LOG.debug("Preloading failed for {} page: {}", ufsPath, pageId, e);
LOG.info("Preloading failed for {} page: {}", ufsPath, pageId, e);
} finally {
mLoadingPages.remove(pageId);
}
@@ -604,8 +607,15 @@ private ListenableFuture<Void> submitLoadDataSubTask(
}
}
else {
- loadData(subTask.getUfsPath(), 0, subTask.getOffsetInFile(), subTask.getLength(),
-     fileLength);
+ if (mFastDataLoadEnabled) {
+   loadPages(
+       subTask.getUfsPath(), 0, subTask.getOffsetInFile(), subTask.getLength(),
+       fileLength);
+ } else {
+   loadData(
+       subTask.getUfsPath(), 0, subTask.getOffsetInFile(), subTask.getLength(),
+       fileLength);
+ }
}
} catch (Throwable e) {
LOG.error("Loading {} failed", subTask, e);
@@ -660,6 +670,47 @@ private void loadMetadata(UfsStatus status, List<LoadFailure> errors) {
}
}

+ private void loadPages(String ufsPath, List<PageId> pageIds, long fileLength)
+     throws AccessControlException, IOException {
+   Optional<UnderFileSystem> ufs = mUfsManager.get(new AlluxioURI(ufsPath));
+   if (!ufs.isPresent()) {
+     throw new RuntimeException("Ufs not found for " + ufsPath);
+   }
+   long lastPageId = fileLength / mPageSize;
+   // TODO(elega) can batch multiple pages together to speed up the loading.
+   for (PageId pageId : pageIds) {
+     int lengthToLoad = (int)
+         (pageId.getPageIndex() == lastPageId ? fileLength % mPageSize : mPageSize);
+     long offset = pageId.getPageIndex() * mPageSize;
+     ByteBuf buf = PooledByteBufAllocator.DEFAULT.directBuffer(lengthToLoad);
+     try (PositionReader reader = ufs.get().openPositionRead(ufsPath, fileLength)) {
+       int bytesRead = reader.read(offset, buf, lengthToLoad);
+       if (lengthToLoad != bytesRead) {
+         throw new RuntimeException(
+             "Page load failed, expected: " + lengthToLoad + " actual " + bytesRead);
+       }
+       mCacheManager.put(pageId, buf.nioBuffer());
+     } finally {
+       buf.release();
+     }
+   }
+ }

+ private void loadPages(
+     String ufsPath, long mountId, long offset, long lengthToLoad, long fileLength)
+     throws AccessControlException, IOException {
+   if (lengthToLoad == 0) {
+     return;
+   }
+   lengthToLoad = Math.min(lengthToLoad, fileLength - offset);
+   List<PageId> pagesToLoad = new ArrayList<>();
+   String fileId = new AlluxioURI(ufsPath).hash();
+   for (long current = offset; current < offset + lengthToLoad; current += mPageSize) {
+     pagesToLoad.add(new PageId(fileId, current / mPageSize));
+   }
+   loadPages(ufsPath, pagesToLoad, fileLength);
+ }
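Taken together, the two overloads above split a byte range into page-sized UFS reads: the range [offset, offset + length) is mapped to page IDs, each page is read at pageIndex * pageSize, and only the file's last page is trimmed to fileLength % pageSize. A tiny standalone sketch of just that arithmetic, with illustrative names, matching the not-page-aligned test added below:

public class PageMath {
  /** Prints the (pageIndex, offset, bytesToRead) triples covering a byte range of a file. */
  static void planPages(long offset, long length, long fileLength, long pageSize) {
    length = Math.min(length, fileLength - offset);
    long lastPageIndex = fileLength / pageSize;
    for (long current = offset; current < offset + length; current += pageSize) {
      long pageIndex = current / pageSize;
      long bytes = pageIndex == lastPageIndex ? fileLength % pageSize : pageSize;
      System.out.printf("page %d: offset=%d bytes=%d%n", pageIndex, pageIndex * pageSize, bytes);
    }
  }

  public static void main(String[] args) {
    // 10 pages of 1024 bytes minus one byte: pages 0-8 hold 1024 bytes, page 9 holds 1023.
    planPages(0, 10 * 1024 - 1, 10 * 1024 - 1, 1024);
  }
}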

protected void loadData(String ufsPath, long mountId, long offset, long lengthToLoad,
long fileLength) throws AccessControlException, IOException {
Protocol.OpenUfsBlockOptions options =
@@ -93,6 +93,7 @@ public class PagedDoraWorkerTest {

@Before
public void before() throws Exception {
+ Configuration.set(PropertyKey.WORKER_FAST_DATA_LOAD_ENABLED, true);
Configuration.set(PropertyKey.DORA_WORKER_METASTORE_ROCKSDB_DIR,
mTestFolder.newFolder("rocks"));
Configuration.set(PropertyKey.WORKER_PAGE_STORE_PAGE_SIZE, 10);
@@ -216,6 +217,28 @@ public void testCacheData() throws Exception {
}
}

+ @Test
+ public void testCacheDataNotPageAligned() throws Exception {
+   int numPages = 10;
+   long length = mPageSize * numPages - 1;
+   String ufsPath = mTestFolder.newFile("test").getAbsolutePath();
+   byte[] buffer = BufferUtils.getIncreasingByteArray((int) length);
+   BufferUtils.writeBufferToFile(ufsPath, buffer);
+
+   mWorker.cacheData(ufsPath, length, 0, false);
+   List<PageId> cachedPages =
+       mCacheManager.getCachedPageIdsByFileId(new AlluxioURI(ufsPath).hash(), length);
+   assertEquals(numPages, cachedPages.size());
+   int start = 0;
+   for (PageId pageId : cachedPages) {
+     long size = numPages == pageId.getPageIndex() + 1 ? length % mPageSize : mPageSize;
+     byte[] buff = new byte[(int) size];
+     mCacheManager.get(pageId, (int) size, buff, 0);
+     assertTrue(BufferUtils.equalIncreasingByteArray(start, (int) size, buff));
+     start += mPageSize;
+   }
+ }

@Test
public void testCacheDataPartial() throws Exception {
int numPages = 10;