Skip to content

Commit

Permalink
try adding both interface in the CacheManager
Browse files Browse the repository at this point in the history
  • Loading branch information
jja725 committed Apr 22, 2024
1 parent 7574cfe commit 8bd15c4
Show file tree
Hide file tree
Showing 13 changed files with 296 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
package alluxio.client.file.cache;

import alluxio.client.file.CacheContext;
import alluxio.client.file.cache.store.PageReadTargetBuffer;
import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.exception.PageNotFoundException;
Expand Down Expand Up @@ -281,6 +282,21 @@ default int get(PageId pageId, int pageOffset, int bytesToRead, byte[] buffer, i
cacheContext);
}

/**
* Reads a part of a page if the queried page is found in the cache, stores the result in buffer.
*
* @param pageId page identifier
* @param pageOffset offset into the page
* @param bytesToRead number of bytes to read in this page
* @param buffer destination buffer to write
* @param cacheContext cache related context
* @return number of bytes read, 0 if page is not found, -1 on errors
* @deprecated only for compatibility reason with PagedBlockStore
*/
@Deprecated
int get(PageId pageId, int pageOffset, int bytesToRead, PageReadTargetBuffer buffer,
CacheContext cacheContext);

/**
* Reads a part of a page if the queried page is found in the cache, stores the result in buffer.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import static alluxio.client.file.CacheContext.StatsUnit.BYTE;

import alluxio.client.file.CacheContext;
import alluxio.client.file.cache.store.PageReadTargetBuffer;
import alluxio.client.quota.CacheScope;
import alluxio.conf.AlluxioConfiguration;
import alluxio.exception.PageNotFoundException;
Expand Down Expand Up @@ -61,6 +62,21 @@ public boolean put(PageId pageId, ByteBuffer page, CacheContext cacheContext) {
return mCacheManager.put(pageId, page, cacheContext);
}

@Override
public int get(PageId pageId, int pageOffset, int bytesToRead, PageReadTargetBuffer target,
CacheContext cacheContext) {
int nread = mShadowCacheManager.get(pageId, bytesToRead, getCacheScope(cacheContext));
if (nread > 0) {
Metrics.SHADOW_CACHE_PAGES_HIT.inc();
Metrics.SHADOW_CACHE_BYTES_HIT.inc(nread);
} else {
updateShadowCache(pageId, bytesToRead, cacheContext);
}
Metrics.SHADOW_CACHE_PAGES_READ.inc();
Metrics.SHADOW_CACHE_BYTES_READ.inc(bytesToRead);
return mCacheManager.get(pageId, pageOffset, bytesToRead, target, cacheContext);
}

@Override
public int get(PageId pageId, int pageOffset, int bytesToRead, ReadTargetBuffer target,
CacheContext cacheContext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static java.util.concurrent.TimeUnit.SECONDS;

import alluxio.client.file.CacheContext;
import alluxio.client.file.cache.store.PageReadTargetBuffer;
import alluxio.client.file.cache.store.PageStoreDir;
import alluxio.client.quota.CacheQuota;
import alluxio.client.quota.CacheScope;
Expand Down Expand Up @@ -600,6 +601,48 @@ public int get(PageId pageId, int pageOffset, ReadTargetBuffer buffer,
return get(pageId, pageOffset, (int) pageSize, buffer, cacheContext);
}

@Override
public int get(PageId pageId, int pageOffset, int bytesToRead, PageReadTargetBuffer buffer,
CacheContext cacheContext) {
Preconditions.checkArgument(pageOffset <= mOptions.getPageSize(),
"Read exceeds page boundary: offset=%s size=%s", pageOffset, mOptions.getPageSize());
Preconditions.checkArgument(bytesToRead <= buffer.remaining(),
"buffer does not have enough space: bufferRemaining=%s bytesToRead=%s",
buffer.remaining(), bytesToRead);
LOG.debug("get({},pageOffset={}) enters", pageId, pageOffset);
if (mState.get() == NOT_IN_USE) {
Metrics.GET_NOT_READY_ERRORS.inc();
Metrics.GET_ERRORS.inc();
return -1;
}
ReadWriteLock pageLock = getPageLock(pageId);
try (LockResource r = new LockResource(pageLock.readLock())) {
PageInfo pageInfo;
try (LockResource r2 = new LockResource(mPageMetaStore.getLock().readLock())) {
pageInfo = mPageMetaStore.getPageInfo(pageId); //check if page exists and refresh LRU items
} catch (PageNotFoundException e) {
LOG.debug("get({},pageOffset={}) fails due to page not found", pageId, pageOffset);
return 0;
}
int bytesRead =
getPage(pageInfo, pageOffset, bytesToRead, buffer, cacheContext);
if (bytesRead <= 0) {
Metrics.GET_ERRORS.inc();
Metrics.GET_STORE_READ_ERRORS.inc();
// something is wrong to read this page, let's remove it from meta store
try (LockResource r2 = new LockResource(mPageMetaStore.getLock().writeLock())) {
mPageMetaStore.removePage(pageId);
} catch (PageNotFoundException e) {
// best effort to remove this page from meta store and ignore the exception
Metrics.CLEANUP_GET_ERRORS.inc();
}
return -1;
}
LOG.debug("get({},pageOffset={}) exits", pageId, pageOffset);
return bytesRead;
}
}

@Override
public int get(PageId pageId, int pageOffset, int bytesToRead, ReadTargetBuffer buffer,
CacheContext cacheContext) {
Expand Down Expand Up @@ -968,6 +1011,25 @@ private boolean deletePage(PageInfo pageInfo, boolean isTemporary) {
return true;
}

private int getPage(PageInfo pageInfo, int pageOffset, int bytesToRead,
PageReadTargetBuffer target, CacheContext cacheContext) {
try {
int ret = pageInfo.getLocalCacheDir().getPageStore()
.get(pageInfo.getPageId(), pageOffset, bytesToRead, target,
cacheContext.isTemporary());
if (ret != bytesToRead) {
// data read from page store is inconsistent from the metastore
LOG.error("Failed to read page {}: supposed to read {} bytes, {} bytes actually read",
pageInfo.getPageId(), bytesToRead, ret);
return -1;
}
} catch (IOException | PageNotFoundException e) {
LOG.debug("Failed to get existing page {} from pageStore", pageInfo.getPageId(), e);
return -1;
}
return bytesToRead;
}

private int getPage(PageInfo pageInfo, int pageOffset, int bytesToRead,
ReadTargetBuffer target, CacheContext cacheContext) {
int originOffset = target.offset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
package alluxio.client.file.cache;

import alluxio.client.file.CacheContext;
import alluxio.client.file.cache.store.PageReadTargetBuffer;
import alluxio.exception.PageNotFoundException;
import alluxio.file.ReadTargetBuffer;
import alluxio.metrics.MetricKey;
Expand Down Expand Up @@ -143,6 +144,20 @@ public int get(PageId pageId, int pageOffset, int bytesToRead, ReadTargetBuffer
}
}

@Override
public int get(PageId pageId, int pageOffset, int bytesToRead, PageReadTargetBuffer buffer,
CacheContext cacheContext) {
try {
return mCacheManager
.get(pageId, pageOffset, bytesToRead, buffer, cacheContext);
} catch (Exception e) {
LOG.error("Failed to get page {}, offset {} cacheContext {}", pageId, pageOffset,
cacheContext, e);
Metrics.GET_ERRORS.inc();
return -1;
}
}

@Override
public int getAndLoad(PageId pageId, int pageOffset, int bytesToRead,
ReadTargetBuffer buffer, CacheContext cacheContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import alluxio.Constants;
import alluxio.client.file.cache.store.LocalPageStore;
import alluxio.client.file.cache.store.MemoryPageStore;
import alluxio.client.file.cache.store.PageReadTargetBuffer;
import alluxio.client.file.cache.store.PageStoreOptions;
import alluxio.exception.PageNotFoundException;
import alluxio.exception.status.ResourceExhaustedException;
Expand Down Expand Up @@ -118,6 +119,54 @@ void put(PageId pageId,
ByteBuffer page,
boolean isTemporary) throws ResourceExhaustedException, IOException;

/**
* Gets a page from the store to the destination buffer.
*
* @param pageId page identifier
* @param buffer destination buffer
* @return the number of bytes read
* @throws IOException when the store fails to read this page
* @throws PageNotFoundException when the page isn't found in the store
*/
default int get(PageId pageId, PageReadTargetBuffer buffer)
throws IOException, PageNotFoundException {
return get(pageId, 0, (int) buffer.remaining(), buffer, false);
}

/**
* Gets part of a page from the store to the destination buffer.
*
* @param pageId page identifier
* @param pageOffset offset within page
* @param bytesToRead bytes to read in this page
* @param buffer destination buffer
* @return the number of bytes read
* @throws IOException when the store fails to read this page
* @throws PageNotFoundException when the page isn't found in the store
* @throws IllegalArgumentException when the page offset exceeds the page size
*/
default int get(PageId pageId, int pageOffset, int bytesToRead, PageReadTargetBuffer buffer)
throws IOException, PageNotFoundException {
return get(pageId, pageOffset, bytesToRead, buffer, false);
}

/**
* Gets part of a page from the store to the destination buffer.
*
* @param pageId page identifier
* @param pageOffset offset within page
* @param bytesToRead bytes to read in this page
* @param buffer destination buffer
* @param isTemporary is page data temporary
* @return the number of bytes read
* @throws IOException when the store fails to read this page
* @throws PageNotFoundException when the page isn't found in the store
* @throws IllegalArgumentException when the page offset exceeds the page size
*/
int get(PageId pageId, int pageOffset, int bytesToRead, PageReadTargetBuffer buffer,
boolean isTemporary)
throws IOException, PageNotFoundException;

/**
* Gets a page from the store to the destination buffer.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

package alluxio.client.file.cache;

import alluxio.client.file.cache.store.PageReadTargetBuffer;
import alluxio.client.file.cache.store.PageStoreOptions;
import alluxio.exception.PageNotFoundException;
import alluxio.exception.status.ResourceExhaustedException;
Expand Down Expand Up @@ -88,6 +89,30 @@ public void put(PageId pageId,
}
}

@Override
public int get(PageId pageId, int pageOffset, int bytesToRead, PageReadTargetBuffer target,
boolean isTemporary) throws IOException, PageNotFoundException {
Callable<Integer> callable = () ->
mPageStore.get(pageId, pageOffset, bytesToRead, target, isTemporary);
try {
return mTimeLimter.callWithTimeout(callable, mTimeoutMs, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// Task got cancelled by others, interrupt the current thread
// and then throw a runtime ex to make the higher level stop.
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} catch (TimeoutException e) {
Metrics.STORE_GET_TIMEOUT.inc();
throw new IOException(e);
} catch (RejectedExecutionException e) {
Metrics.STORE_THREADS_REJECTED.inc();
throw new IOException(e);
} catch (Throwable t) {
Throwables.propagateIfPossible(t, IOException.class, PageNotFoundException.class);
throw new IOException(t);
}
}

@Override
public int get(PageId pageId, int pageOffset, int bytesToRead, ReadTargetBuffer target,
boolean isTemporary) throws IOException, PageNotFoundException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,39 @@ public void put(PageId pageId,
}
}

@Override
public int get(PageId pageId, int pageOffset, int bytesToRead, PageReadTargetBuffer target,
boolean isTemporary) throws IOException, PageNotFoundException {
Preconditions.checkArgument(pageOffset >= 0, "page offset should be non-negative");
Path pagePath = getPagePath(pageId, isTemporary);
if (!Files.exists(pagePath)) {
throw new PageNotFoundException(pagePath.toString());
}
long pageLength = pagePath.toFile().length();
Preconditions.checkArgument(pageOffset <= pageLength, "page offset %s exceeded page size %s",
pageOffset, pageLength);
try (RandomAccessFile localFile = new RandomAccessFile(pagePath.toString(), "r")) {
int bytesSkipped = localFile.skipBytes(pageOffset);
if (pageOffset != bytesSkipped) {
throw new IOException(
String.format("Failed to read page %s (%s) from offset %s: %s bytes skipped",
pageId, pagePath, pageOffset, bytesSkipped));
}
int bytesRead = 0;
int bytesLeft = (int) Math.min(pageLength - pageOffset, target.remaining());
bytesLeft = Math.min(bytesLeft, bytesToRead);
while (bytesLeft > 0) {
int bytes = target.readFromFile(localFile, bytesLeft);
if (bytes <= 0) {
break;
}
bytesRead += bytes;
bytesLeft -= bytes;
}
return bytesRead;
}
}

@Override
public int get(PageId pageId, int pageOffset, int bytesToRead, ReadTargetBuffer target,
boolean isTemporary) throws IOException, PageNotFoundException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,24 @@ public void put(PageId pageId, ByteBuffer page, boolean isTemporary) throws IOEx
}
}

@Override
public int get(PageId pageId, int pageOffset, int bytesToRead, PageReadTargetBuffer target,
boolean isTemporary) throws IOException, PageNotFoundException {
Preconditions.checkArgument(target != null, "buffer is null");
Preconditions.checkArgument(pageOffset >= 0, "page offset should be non-negative");
PageId pageKey = getKeyFromPageId(pageId);
if (!mPageStoreMap.containsKey(pageKey)) {
throw new PageNotFoundException(pageId.getFileId() + "_" + pageId.getPageIndex());
}
MemPage page = mPageStoreMap.get(pageKey);
Preconditions.checkArgument(pageOffset <= page.getPageLength(),
"page offset %s exceeded page size %s", pageOffset, page.getPageLength());
int bytesLeft = (int) Math.min(page.getPageLength() - pageOffset, target.remaining());
bytesLeft = Math.min(bytesLeft, bytesToRead);
target.writeBytes(page.getPage(), pageOffset, bytesLeft);
return bytesLeft;
}

@Override
public int get(PageId pageId, int pageOffset, int bytesToRead, ReadTargetBuffer target,
boolean isTemporary) throws IOException, PageNotFoundException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,27 @@ public void put(PageId pageId, ByteBuffer page, boolean isTemporary) throws IOEx
}
}

@Override
public int get(PageId pageId, int pageOffset, int bytesToRead, PageReadTargetBuffer target,
boolean isTemporary) throws IOException, PageNotFoundException {
Preconditions.checkArgument(pageOffset >= 0, "page offset should be non-negative");
try {
byte[] key = getKeyFromPageId(pageId, false).array();
byte[] page = mDb.get(mPageColumnHandle, key);
if (page == null) {
throw new PageNotFoundException(new String(key));
}
Preconditions.checkArgument(pageOffset <= page.length,
"page offset %s exceeded page size %s", pageOffset, page.length);
int bytesLeft =
Math.min(page.length - pageOffset, Math.min((int) target.remaining(), bytesToRead));
System.arraycopy(page, pageOffset, target.byteArray(), (int) target.offset(), bytesLeft);
return bytesLeft;
} catch (RocksDBException e) {
throw new IOException("Failed to retrieve page", e);
}
}

@Override
public int get(PageId pageId, int pageOffset, int bytesToRead, ReadTargetBuffer target,
boolean isTemporary) throws IOException, PageNotFoundException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import alluxio.Constants;
import alluxio.client.file.CacheContext;
import alluxio.client.file.cache.store.PageReadTargetBuffer;
import alluxio.conf.Configuration;
import alluxio.conf.InstancedConfiguration;
import alluxio.conf.PropertyKey;
Expand Down Expand Up @@ -230,6 +231,20 @@ public boolean put(PageId pageId, ByteBuffer page, CacheContext cacheContext) {
return true;
}

@Override
public int get(PageId pageId, int pageOffset, int bytesToRead, PageReadTargetBuffer buffer,
CacheContext cacheContext) {
if (!mCache.containsKey(pageId)) {
return 0;
}
byte[] page = mCache.get(pageId);
if (bytesToRead >= 0) {
System.arraycopy(page, pageOffset + 0, buffer.byteArray(), (int) buffer.offset(),
bytesToRead);
}
return bytesToRead;
}

@Override
public int get(PageId pageId, int pageOffset, int bytesToRead, ReadTargetBuffer buffer,
CacheContext cacheContext) {
Expand Down
Loading

0 comments on commit 8bd15c4

Please sign in to comment.