From 8bd15c42d1b7855a36e23acef6bf6030ba695f51 Mon Sep 17 00:00:00 2001 From: Jianjian Date: Sun, 21 Apr 2024 21:40:22 -0700 Subject: [PATCH] try adding both interface in the CacheManager --- .../client/file/cache/CacheManager.java | 16 +++++ .../cache/CacheManagerWithShadowCache.java | 16 +++++ .../client/file/cache/LocalCacheManager.java | 62 +++++++++++++++++++ .../file/cache/NoExceptionCacheManager.java | 15 +++++ .../alluxio/client/file/cache/PageStore.java | 49 +++++++++++++++ .../client/file/cache/TimeBoundPageStore.java | 25 ++++++++ .../file/cache/store/LocalPageStore.java | 33 ++++++++++ .../file/cache/store/MemoryPageStore.java | 18 ++++++ .../file/cache/store/RocksPageStore.java | 21 +++++++ .../CacheManagerWithShadowCacheTest.java | 15 +++++ .../cache/LocalCacheFileInStreamTest.java | 12 ++++ .../alluxio/worker/page/PagedBlockReader.java | 5 +- .../worker/page/ByteArrayCacheManager.java | 12 ++++ 13 files changed, 296 insertions(+), 3 deletions(-) diff --git a/core/client/fs/src/main/java/alluxio/client/file/cache/CacheManager.java b/core/client/fs/src/main/java/alluxio/client/file/cache/CacheManager.java index 1269d0e68338..d3a0d7cbb2f6 100644 --- a/core/client/fs/src/main/java/alluxio/client/file/cache/CacheManager.java +++ b/core/client/fs/src/main/java/alluxio/client/file/cache/CacheManager.java @@ -12,6 +12,7 @@ package alluxio.client.file.cache; import alluxio.client.file.CacheContext; +import alluxio.client.file.cache.store.PageReadTargetBuffer; import alluxio.conf.AlluxioConfiguration; import alluxio.conf.PropertyKey; import alluxio.exception.PageNotFoundException; @@ -281,6 +282,21 @@ default int get(PageId pageId, int pageOffset, int bytesToRead, byte[] buffer, i cacheContext); } + /** + * Reads a part of a page if the queried page is found in the cache, stores the result in buffer. + * + * @param pageId page identifier + * @param pageOffset offset into the page + * @param bytesToRead number of bytes to read in this page + * @param buffer destination buffer to write + * @param cacheContext cache related context + * @return number of bytes read, 0 if page is not found, -1 on errors + * @deprecated only for compatibility reason with PagedBlockStore + */ + @Deprecated + int get(PageId pageId, int pageOffset, int bytesToRead, PageReadTargetBuffer buffer, + CacheContext cacheContext); + /** * Reads a part of a page if the queried page is found in the cache, stores the result in buffer. * diff --git a/core/client/fs/src/main/java/alluxio/client/file/cache/CacheManagerWithShadowCache.java b/core/client/fs/src/main/java/alluxio/client/file/cache/CacheManagerWithShadowCache.java index d2039423ad03..357be31064ee 100644 --- a/core/client/fs/src/main/java/alluxio/client/file/cache/CacheManagerWithShadowCache.java +++ b/core/client/fs/src/main/java/alluxio/client/file/cache/CacheManagerWithShadowCache.java @@ -14,6 +14,7 @@ import static alluxio.client.file.CacheContext.StatsUnit.BYTE; import alluxio.client.file.CacheContext; +import alluxio.client.file.cache.store.PageReadTargetBuffer; import alluxio.client.quota.CacheScope; import alluxio.conf.AlluxioConfiguration; import alluxio.exception.PageNotFoundException; @@ -61,6 +62,21 @@ public boolean put(PageId pageId, ByteBuffer page, CacheContext cacheContext) { return mCacheManager.put(pageId, page, cacheContext); } + @Override + public int get(PageId pageId, int pageOffset, int bytesToRead, PageReadTargetBuffer target, + CacheContext cacheContext) { + int nread = mShadowCacheManager.get(pageId, bytesToRead, getCacheScope(cacheContext)); + if (nread > 0) { + Metrics.SHADOW_CACHE_PAGES_HIT.inc(); + Metrics.SHADOW_CACHE_BYTES_HIT.inc(nread); + } else { + updateShadowCache(pageId, bytesToRead, cacheContext); + } + Metrics.SHADOW_CACHE_PAGES_READ.inc(); + Metrics.SHADOW_CACHE_BYTES_READ.inc(bytesToRead); + return mCacheManager.get(pageId, pageOffset, bytesToRead, target, cacheContext); + } + @Override public int get(PageId pageId, int pageOffset, int bytesToRead, ReadTargetBuffer target, CacheContext cacheContext) { diff --git a/core/client/fs/src/main/java/alluxio/client/file/cache/LocalCacheManager.java b/core/client/fs/src/main/java/alluxio/client/file/cache/LocalCacheManager.java index 4874820581cc..ab3644ee3a40 100644 --- a/core/client/fs/src/main/java/alluxio/client/file/cache/LocalCacheManager.java +++ b/core/client/fs/src/main/java/alluxio/client/file/cache/LocalCacheManager.java @@ -20,6 +20,7 @@ import static java.util.concurrent.TimeUnit.SECONDS; import alluxio.client.file.CacheContext; +import alluxio.client.file.cache.store.PageReadTargetBuffer; import alluxio.client.file.cache.store.PageStoreDir; import alluxio.client.quota.CacheQuota; import alluxio.client.quota.CacheScope; @@ -600,6 +601,48 @@ public int get(PageId pageId, int pageOffset, ReadTargetBuffer buffer, return get(pageId, pageOffset, (int) pageSize, buffer, cacheContext); } + @Override + public int get(PageId pageId, int pageOffset, int bytesToRead, PageReadTargetBuffer buffer, + CacheContext cacheContext) { + Preconditions.checkArgument(pageOffset <= mOptions.getPageSize(), + "Read exceeds page boundary: offset=%s size=%s", pageOffset, mOptions.getPageSize()); + Preconditions.checkArgument(bytesToRead <= buffer.remaining(), + "buffer does not have enough space: bufferRemaining=%s bytesToRead=%s", + buffer.remaining(), bytesToRead); + LOG.debug("get({},pageOffset={}) enters", pageId, pageOffset); + if (mState.get() == NOT_IN_USE) { + Metrics.GET_NOT_READY_ERRORS.inc(); + Metrics.GET_ERRORS.inc(); + return -1; + } + ReadWriteLock pageLock = getPageLock(pageId); + try (LockResource r = new LockResource(pageLock.readLock())) { + PageInfo pageInfo; + try (LockResource r2 = new LockResource(mPageMetaStore.getLock().readLock())) { + pageInfo = mPageMetaStore.getPageInfo(pageId); //check if page exists and refresh LRU items + } catch (PageNotFoundException e) { + LOG.debug("get({},pageOffset={}) fails due to page not found", pageId, pageOffset); + return 0; + } + int bytesRead = + getPage(pageInfo, pageOffset, bytesToRead, buffer, cacheContext); + if (bytesRead <= 0) { + Metrics.GET_ERRORS.inc(); + Metrics.GET_STORE_READ_ERRORS.inc(); + // something is wrong to read this page, let's remove it from meta store + try (LockResource r2 = new LockResource(mPageMetaStore.getLock().writeLock())) { + mPageMetaStore.removePage(pageId); + } catch (PageNotFoundException e) { + // best effort to remove this page from meta store and ignore the exception + Metrics.CLEANUP_GET_ERRORS.inc(); + } + return -1; + } + LOG.debug("get({},pageOffset={}) exits", pageId, pageOffset); + return bytesRead; + } + } + @Override public int get(PageId pageId, int pageOffset, int bytesToRead, ReadTargetBuffer buffer, CacheContext cacheContext) { @@ -968,6 +1011,25 @@ private boolean deletePage(PageInfo pageInfo, boolean isTemporary) { return true; } + private int getPage(PageInfo pageInfo, int pageOffset, int bytesToRead, + PageReadTargetBuffer target, CacheContext cacheContext) { + try { + int ret = pageInfo.getLocalCacheDir().getPageStore() + .get(pageInfo.getPageId(), pageOffset, bytesToRead, target, + cacheContext.isTemporary()); + if (ret != bytesToRead) { + // data read from page store is inconsistent from the metastore + LOG.error("Failed to read page {}: supposed to read {} bytes, {} bytes actually read", + pageInfo.getPageId(), bytesToRead, ret); + return -1; + } + } catch (IOException | PageNotFoundException e) { + LOG.debug("Failed to get existing page {} from pageStore", pageInfo.getPageId(), e); + return -1; + } + return bytesToRead; + } + private int getPage(PageInfo pageInfo, int pageOffset, int bytesToRead, ReadTargetBuffer target, CacheContext cacheContext) { int originOffset = target.offset(); diff --git a/core/client/fs/src/main/java/alluxio/client/file/cache/NoExceptionCacheManager.java b/core/client/fs/src/main/java/alluxio/client/file/cache/NoExceptionCacheManager.java index 7e30bbe68abd..130b24b990ca 100644 --- a/core/client/fs/src/main/java/alluxio/client/file/cache/NoExceptionCacheManager.java +++ b/core/client/fs/src/main/java/alluxio/client/file/cache/NoExceptionCacheManager.java @@ -12,6 +12,7 @@ package alluxio.client.file.cache; import alluxio.client.file.CacheContext; +import alluxio.client.file.cache.store.PageReadTargetBuffer; import alluxio.exception.PageNotFoundException; import alluxio.file.ReadTargetBuffer; import alluxio.metrics.MetricKey; @@ -143,6 +144,20 @@ public int get(PageId pageId, int pageOffset, int bytesToRead, ReadTargetBuffer } } + @Override + public int get(PageId pageId, int pageOffset, int bytesToRead, PageReadTargetBuffer buffer, + CacheContext cacheContext) { + try { + return mCacheManager + .get(pageId, pageOffset, bytesToRead, buffer, cacheContext); + } catch (Exception e) { + LOG.error("Failed to get page {}, offset {} cacheContext {}", pageId, pageOffset, + cacheContext, e); + Metrics.GET_ERRORS.inc(); + return -1; + } + } + @Override public int getAndLoad(PageId pageId, int pageOffset, int bytesToRead, ReadTargetBuffer buffer, CacheContext cacheContext, diff --git a/core/client/fs/src/main/java/alluxio/client/file/cache/PageStore.java b/core/client/fs/src/main/java/alluxio/client/file/cache/PageStore.java index a73cd96a8939..57411f02fb57 100644 --- a/core/client/fs/src/main/java/alluxio/client/file/cache/PageStore.java +++ b/core/client/fs/src/main/java/alluxio/client/file/cache/PageStore.java @@ -14,6 +14,7 @@ import alluxio.Constants; import alluxio.client.file.cache.store.LocalPageStore; import alluxio.client.file.cache.store.MemoryPageStore; +import alluxio.client.file.cache.store.PageReadTargetBuffer; import alluxio.client.file.cache.store.PageStoreOptions; import alluxio.exception.PageNotFoundException; import alluxio.exception.status.ResourceExhaustedException; @@ -118,6 +119,54 @@ void put(PageId pageId, ByteBuffer page, boolean isTemporary) throws ResourceExhaustedException, IOException; + /** + * Gets a page from the store to the destination buffer. + * + * @param pageId page identifier + * @param buffer destination buffer + * @return the number of bytes read + * @throws IOException when the store fails to read this page + * @throws PageNotFoundException when the page isn't found in the store + */ + default int get(PageId pageId, PageReadTargetBuffer buffer) + throws IOException, PageNotFoundException { + return get(pageId, 0, (int) buffer.remaining(), buffer, false); + } + + /** + * Gets part of a page from the store to the destination buffer. + * + * @param pageId page identifier + * @param pageOffset offset within page + * @param bytesToRead bytes to read in this page + * @param buffer destination buffer + * @return the number of bytes read + * @throws IOException when the store fails to read this page + * @throws PageNotFoundException when the page isn't found in the store + * @throws IllegalArgumentException when the page offset exceeds the page size + */ + default int get(PageId pageId, int pageOffset, int bytesToRead, PageReadTargetBuffer buffer) + throws IOException, PageNotFoundException { + return get(pageId, pageOffset, bytesToRead, buffer, false); + } + + /** + * Gets part of a page from the store to the destination buffer. + * + * @param pageId page identifier + * @param pageOffset offset within page + * @param bytesToRead bytes to read in this page + * @param buffer destination buffer + * @param isTemporary is page data temporary + * @return the number of bytes read + * @throws IOException when the store fails to read this page + * @throws PageNotFoundException when the page isn't found in the store + * @throws IllegalArgumentException when the page offset exceeds the page size + */ + int get(PageId pageId, int pageOffset, int bytesToRead, PageReadTargetBuffer buffer, + boolean isTemporary) + throws IOException, PageNotFoundException; + /** * Gets a page from the store to the destination buffer. * diff --git a/core/client/fs/src/main/java/alluxio/client/file/cache/TimeBoundPageStore.java b/core/client/fs/src/main/java/alluxio/client/file/cache/TimeBoundPageStore.java index 553708c56e4e..83ec3e04afe6 100644 --- a/core/client/fs/src/main/java/alluxio/client/file/cache/TimeBoundPageStore.java +++ b/core/client/fs/src/main/java/alluxio/client/file/cache/TimeBoundPageStore.java @@ -11,6 +11,7 @@ package alluxio.client.file.cache; +import alluxio.client.file.cache.store.PageReadTargetBuffer; import alluxio.client.file.cache.store.PageStoreOptions; import alluxio.exception.PageNotFoundException; import alluxio.exception.status.ResourceExhaustedException; @@ -88,6 +89,30 @@ public void put(PageId pageId, } } + @Override + public int get(PageId pageId, int pageOffset, int bytesToRead, PageReadTargetBuffer target, + boolean isTemporary) throws IOException, PageNotFoundException { + Callable callable = () -> + mPageStore.get(pageId, pageOffset, bytesToRead, target, isTemporary); + try { + return mTimeLimter.callWithTimeout(callable, mTimeoutMs, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + // Task got cancelled by others, interrupt the current thread + // and then throw a runtime ex to make the higher level stop. + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } catch (TimeoutException e) { + Metrics.STORE_GET_TIMEOUT.inc(); + throw new IOException(e); + } catch (RejectedExecutionException e) { + Metrics.STORE_THREADS_REJECTED.inc(); + throw new IOException(e); + } catch (Throwable t) { + Throwables.propagateIfPossible(t, IOException.class, PageNotFoundException.class); + throw new IOException(t); + } + } + @Override public int get(PageId pageId, int pageOffset, int bytesToRead, ReadTargetBuffer target, boolean isTemporary) throws IOException, PageNotFoundException { diff --git a/core/client/fs/src/main/java/alluxio/client/file/cache/store/LocalPageStore.java b/core/client/fs/src/main/java/alluxio/client/file/cache/store/LocalPageStore.java index b972d944a129..b97609821835 100644 --- a/core/client/fs/src/main/java/alluxio/client/file/cache/store/LocalPageStore.java +++ b/core/client/fs/src/main/java/alluxio/client/file/cache/store/LocalPageStore.java @@ -91,6 +91,39 @@ public void put(PageId pageId, } } + @Override + public int get(PageId pageId, int pageOffset, int bytesToRead, PageReadTargetBuffer target, + boolean isTemporary) throws IOException, PageNotFoundException { + Preconditions.checkArgument(pageOffset >= 0, "page offset should be non-negative"); + Path pagePath = getPagePath(pageId, isTemporary); + if (!Files.exists(pagePath)) { + throw new PageNotFoundException(pagePath.toString()); + } + long pageLength = pagePath.toFile().length(); + Preconditions.checkArgument(pageOffset <= pageLength, "page offset %s exceeded page size %s", + pageOffset, pageLength); + try (RandomAccessFile localFile = new RandomAccessFile(pagePath.toString(), "r")) { + int bytesSkipped = localFile.skipBytes(pageOffset); + if (pageOffset != bytesSkipped) { + throw new IOException( + String.format("Failed to read page %s (%s) from offset %s: %s bytes skipped", + pageId, pagePath, pageOffset, bytesSkipped)); + } + int bytesRead = 0; + int bytesLeft = (int) Math.min(pageLength - pageOffset, target.remaining()); + bytesLeft = Math.min(bytesLeft, bytesToRead); + while (bytesLeft > 0) { + int bytes = target.readFromFile(localFile, bytesLeft); + if (bytes <= 0) { + break; + } + bytesRead += bytes; + bytesLeft -= bytes; + } + return bytesRead; + } + } + @Override public int get(PageId pageId, int pageOffset, int bytesToRead, ReadTargetBuffer target, boolean isTemporary) throws IOException, PageNotFoundException { diff --git a/core/client/fs/src/main/java/alluxio/client/file/cache/store/MemoryPageStore.java b/core/client/fs/src/main/java/alluxio/client/file/cache/store/MemoryPageStore.java index 7fde058b6c27..634615ac2588 100644 --- a/core/client/fs/src/main/java/alluxio/client/file/cache/store/MemoryPageStore.java +++ b/core/client/fs/src/main/java/alluxio/client/file/cache/store/MemoryPageStore.java @@ -59,6 +59,24 @@ public void put(PageId pageId, ByteBuffer page, boolean isTemporary) throws IOEx } } + @Override + public int get(PageId pageId, int pageOffset, int bytesToRead, PageReadTargetBuffer target, + boolean isTemporary) throws IOException, PageNotFoundException { + Preconditions.checkArgument(target != null, "buffer is null"); + Preconditions.checkArgument(pageOffset >= 0, "page offset should be non-negative"); + PageId pageKey = getKeyFromPageId(pageId); + if (!mPageStoreMap.containsKey(pageKey)) { + throw new PageNotFoundException(pageId.getFileId() + "_" + pageId.getPageIndex()); + } + MemPage page = mPageStoreMap.get(pageKey); + Preconditions.checkArgument(pageOffset <= page.getPageLength(), + "page offset %s exceeded page size %s", pageOffset, page.getPageLength()); + int bytesLeft = (int) Math.min(page.getPageLength() - pageOffset, target.remaining()); + bytesLeft = Math.min(bytesLeft, bytesToRead); + target.writeBytes(page.getPage(), pageOffset, bytesLeft); + return bytesLeft; + } + @Override public int get(PageId pageId, int pageOffset, int bytesToRead, ReadTargetBuffer target, boolean isTemporary) throws IOException, PageNotFoundException { diff --git a/core/client/fs/src/main/java/alluxio/client/file/cache/store/RocksPageStore.java b/core/client/fs/src/main/java/alluxio/client/file/cache/store/RocksPageStore.java index 1fb83c5394f6..f5b41a11a8c2 100644 --- a/core/client/fs/src/main/java/alluxio/client/file/cache/store/RocksPageStore.java +++ b/core/client/fs/src/main/java/alluxio/client/file/cache/store/RocksPageStore.java @@ -177,6 +177,27 @@ public void put(PageId pageId, ByteBuffer page, boolean isTemporary) throws IOEx } } + @Override + public int get(PageId pageId, int pageOffset, int bytesToRead, PageReadTargetBuffer target, + boolean isTemporary) throws IOException, PageNotFoundException { + Preconditions.checkArgument(pageOffset >= 0, "page offset should be non-negative"); + try { + byte[] key = getKeyFromPageId(pageId, false).array(); + byte[] page = mDb.get(mPageColumnHandle, key); + if (page == null) { + throw new PageNotFoundException(new String(key)); + } + Preconditions.checkArgument(pageOffset <= page.length, + "page offset %s exceeded page size %s", pageOffset, page.length); + int bytesLeft = + Math.min(page.length - pageOffset, Math.min((int) target.remaining(), bytesToRead)); + System.arraycopy(page, pageOffset, target.byteArray(), (int) target.offset(), bytesLeft); + return bytesLeft; + } catch (RocksDBException e) { + throw new IOException("Failed to retrieve page", e); + } + } + @Override public int get(PageId pageId, int pageOffset, int bytesToRead, ReadTargetBuffer target, boolean isTemporary) throws IOException, PageNotFoundException { diff --git a/core/client/fs/src/test/java/alluxio/client/file/cache/CacheManagerWithShadowCacheTest.java b/core/client/fs/src/test/java/alluxio/client/file/cache/CacheManagerWithShadowCacheTest.java index 2a4fd57d8dfd..88d2e7b48954 100644 --- a/core/client/fs/src/test/java/alluxio/client/file/cache/CacheManagerWithShadowCacheTest.java +++ b/core/client/fs/src/test/java/alluxio/client/file/cache/CacheManagerWithShadowCacheTest.java @@ -17,6 +17,7 @@ import alluxio.Constants; import alluxio.client.file.CacheContext; +import alluxio.client.file.cache.store.PageReadTargetBuffer; import alluxio.conf.Configuration; import alluxio.conf.InstancedConfiguration; import alluxio.conf.PropertyKey; @@ -230,6 +231,20 @@ public boolean put(PageId pageId, ByteBuffer page, CacheContext cacheContext) { return true; } + @Override + public int get(PageId pageId, int pageOffset, int bytesToRead, PageReadTargetBuffer buffer, + CacheContext cacheContext) { + if (!mCache.containsKey(pageId)) { + return 0; + } + byte[] page = mCache.get(pageId); + if (bytesToRead >= 0) { + System.arraycopy(page, pageOffset + 0, buffer.byteArray(), (int) buffer.offset(), + bytesToRead); + } + return bytesToRead; + } + @Override public int get(PageId pageId, int pageOffset, int bytesToRead, ReadTargetBuffer buffer, CacheContext cacheContext) { diff --git a/core/client/fs/src/test/java/alluxio/client/file/cache/LocalCacheFileInStreamTest.java b/core/client/fs/src/test/java/alluxio/client/file/cache/LocalCacheFileInStreamTest.java index 910225b940c8..f3bf69e51d59 100644 --- a/core/client/fs/src/test/java/alluxio/client/file/cache/LocalCacheFileInStreamTest.java +++ b/core/client/fs/src/test/java/alluxio/client/file/cache/LocalCacheFileInStreamTest.java @@ -23,6 +23,7 @@ import alluxio.client.file.MockFileInStream; import alluxio.client.file.URIStatus; import alluxio.client.file.cache.context.CachePerThreadContext; +import alluxio.client.file.cache.store.PageReadTargetBuffer; import alluxio.conf.AlluxioConfiguration; import alluxio.conf.Configuration; import alluxio.conf.InstancedConfiguration; @@ -773,6 +774,17 @@ public boolean put(PageId pageId, ByteBuffer page, CacheContext cacheContext) { return true; } + @Override + public int get(PageId pageId, int pageOffset, int bytesToRead, PageReadTargetBuffer target, + CacheContext cacheContext) { + if (!mPages.containsKey(pageId)) { + return 0; + } + mPagesServed++; + target.writeBytes(mPages.get(pageId), pageOffset, bytesToRead); + return bytesToRead; + } + @Override public int get(PageId pageId, int pageOffset, int bytesToRead, ReadTargetBuffer target, CacheContext cacheContext) { diff --git a/core/server/worker/src/main/java/alluxio/worker/page/PagedBlockReader.java b/core/server/worker/src/main/java/alluxio/worker/page/PagedBlockReader.java index 0b9fc83bb975..41b7a95d4016 100644 --- a/core/server/worker/src/main/java/alluxio/worker/page/PagedBlockReader.java +++ b/core/server/worker/src/main/java/alluxio/worker/page/PagedBlockReader.java @@ -14,8 +14,8 @@ import alluxio.client.file.CacheContext; import alluxio.client.file.cache.CacheManager; import alluxio.client.file.cache.PageId; +import alluxio.client.file.cache.store.PageReadTargetBuffer; import alluxio.exception.runtime.AlluxioRuntimeException; -import alluxio.file.ByteBufferTargetBuffer; import alluxio.grpc.ErrorType; import alluxio.metrics.MetricKey; import alluxio.metrics.MetricsSystem; @@ -105,8 +105,7 @@ private long read(ByteBuf byteBuf, long offset, long length) throws IOException Preconditions.checkArgument(byteBuf.writableBytes() >= length, "buffer overflow, trying to write %s bytes, only %s writable", length, byteBuf.writableBytes()); - ByteBufferTargetBuffer target = - new ByteBufferTargetBuffer(byteBuf.nioBuffer()); + PageReadTargetBuffer target = new NettyBufTargetBuffer(byteBuf); long bytesRead = 0; while (bytesRead < length) { long pos = offset + bytesRead; diff --git a/core/server/worker/src/test/java/alluxio/worker/page/ByteArrayCacheManager.java b/core/server/worker/src/test/java/alluxio/worker/page/ByteArrayCacheManager.java index b7aa85325f9b..7842c33c4eef 100644 --- a/core/server/worker/src/test/java/alluxio/worker/page/ByteArrayCacheManager.java +++ b/core/server/worker/src/test/java/alluxio/worker/page/ByteArrayCacheManager.java @@ -15,6 +15,7 @@ import alluxio.client.file.cache.CacheManager; import alluxio.client.file.cache.CacheUsage; import alluxio.client.file.cache.PageId; +import alluxio.client.file.cache.store.PageReadTargetBuffer; import alluxio.file.ReadTargetBuffer; import alluxio.network.protocol.databuffer.DataFileChannel; @@ -54,6 +55,17 @@ public boolean put(PageId pageId, ByteBuffer page, CacheContext cacheContext) { return true; } + @Override + public int get(PageId pageId, int pageOffset, int bytesToRead, PageReadTargetBuffer target, + CacheContext cacheContext) { + if (!mPages.containsKey(pageId)) { + return 0; + } + mPagesServed++; + target.writeBytes(mPages.get(pageId), pageOffset, bytesToRead); + return bytesToRead; + } + @Override public int get(PageId pageId, int pageOffset, int bytesToRead, ReadTargetBuffer target, CacheContext cacheContext) {