-
Notifications
You must be signed in to change notification settings - Fork 2.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix cacheMissPercentage metric #18208
Changes from 8 commits
1f08bae
bb4a5aa
ba4b9b9
e99322e
cf2957e
2e74aa5
2c42671
2ca2b2f
11dddd6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
package alluxio.worker.block.io; | ||
|
||
import alluxio.metrics.MetricKey; | ||
import alluxio.metrics.MetricsSystem; | ||
|
||
import io.netty.buffer.ByteBuf; | ||
|
||
import java.io.IOException; | ||
import java.nio.ByteBuffer; | ||
import java.nio.channels.ReadableByteChannel; | ||
|
||
/** | ||
* An reader class with metrics. | ||
*/ | ||
public class DeStoreBlockReader extends BlockReader { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
private final BlockReader mDeBlockReader; | ||
|
||
/** | ||
* A decorator of BlockReader. | ||
* @param deblockReader block reader | ||
*/ | ||
public DeStoreBlockReader(BlockReader deblockReader) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Requiring the reader to be a Since we are dealing with the bytes-read-cache metric here, requiring the argument to be of type |
||
mDeBlockReader = deblockReader; | ||
} | ||
|
||
@Override | ||
public ByteBuffer read(long offset, long length) throws IOException { | ||
return mDeBlockReader.read(offset, length); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. also update the metric here |
||
} | ||
|
||
@Override | ||
public long getLength() { | ||
return mDeBlockReader.getLength(); | ||
} | ||
|
||
@Override | ||
public ReadableByteChannel getChannel() { | ||
return mDeBlockReader.getChannel(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the channel also needs to be wrapped to add the metric updating logic. return new ReadableByteChannel() {
private final ReadableByteChannel mDelegate = mDeBlockReader.getChannel();
@Override
public int read(ByteBuffer dst) throws IOException {
int bytesRead = mDelegate.read(dst);
// update metric here
return bytesRead;
}
} |
||
} | ||
|
||
@Override | ||
public int transferTo(ByteBuf buf) throws IOException { | ||
int bytesReadFromCache = mDeBlockReader.transferTo(buf); | ||
MetricsSystem.counter(MetricKey.WORKER_BYTES_READ_CACHE.getName()).inc(bytesReadFromCache); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
return bytesReadFromCache; | ||
} | ||
|
||
@Override | ||
public boolean isClosed() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. also override |
||
return mDeBlockReader.isClosed(); | ||
} | ||
|
||
@Override | ||
public String getLocation() { | ||
return mDeBlockReader.getLocation(); | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return mDeBlockReader.toString(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,16 +12,24 @@ | |
package alluxio.worker.page; | ||
|
||
import alluxio.conf.PropertyKey; | ||
import alluxio.exception.runtime.AlluxioRuntimeException; | ||
import alluxio.exception.status.NotFoundException; | ||
import alluxio.exception.status.UnavailableException; | ||
import alluxio.metrics.MetricInfo; | ||
import alluxio.metrics.MetricKey; | ||
import alluxio.metrics.MetricsSystem; | ||
import alluxio.network.protocol.databuffer.NioDirectBufferPool; | ||
import alluxio.resource.CloseableResource; | ||
import alluxio.underfs.UfsManager; | ||
import alluxio.underfs.UnderFileSystem; | ||
import alluxio.underfs.options.OpenOptions; | ||
import alluxio.util.IdUtils; | ||
import alluxio.worker.block.UfsInputStreamCache; | ||
import alluxio.worker.block.UnderFileSystemBlockStore.BytesReadMetricKey; | ||
import alluxio.worker.block.io.BlockReader; | ||
import alluxio.worker.block.meta.BlockMeta; | ||
|
||
import com.codahale.metrics.Counter; | ||
import com.google.common.base.Preconditions; | ||
import io.netty.buffer.ByteBuf; | ||
|
||
|
@@ -31,6 +39,8 @@ | |
import java.nio.channels.Channels; | ||
import java.nio.channels.ClosedChannelException; | ||
import java.nio.channels.ReadableByteChannel; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.ConcurrentMap; | ||
|
||
/** | ||
* Block reader that reads from UFS. | ||
|
@@ -47,6 +57,9 @@ public class PagedUfsBlockReader extends BlockReader { | |
private long mLastPageIndex = -1; | ||
private boolean mClosed = false; | ||
private long mPosition; | ||
private final ConcurrentMap<BytesReadMetricKey, Counter> mUfsBytesReadMetrics = | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. elements are only inserted into this map, but never queried There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This map is used by mUfsBytesReadMetrics.computeIfAbsent(). |
||
new ConcurrentHashMap<>(); | ||
private final Counter mUfsBytesRead; | ||
|
||
/** | ||
* @param ufsManager | ||
|
@@ -70,6 +83,23 @@ public PagedUfsBlockReader(UfsManager ufsManager, | |
mInitialOffset = offset; | ||
mLastPage = ByteBuffer.allocateDirect((int) mPageSize); | ||
mPosition = offset; | ||
try { | ||
UfsManager.UfsClient ufsClient = mUfsManager.get(mUfsBlockOptions.getMountId()); | ||
mUfsBytesRead = mUfsBytesReadMetrics.computeIfAbsent( | ||
new BytesReadMetricKey(ufsClient.getUfsMountPointUri(), mUfsBlockOptions.getUser()), | ||
key -> key.mUser == null | ||
? MetricsSystem.counterWithTags( | ||
MetricKey.WORKER_BYTES_READ_UFS.getName(), | ||
MetricKey.WORKER_BYTES_READ_UFS.isClusterAggregated(), | ||
MetricInfo.TAG_UFS, MetricsSystem.escape(key.mUri)) | ||
: MetricsSystem.counterWithTags( | ||
MetricKey.WORKER_BYTES_READ_UFS.getName(), | ||
MetricKey.WORKER_BYTES_READ_UFS.isClusterAggregated(), | ||
MetricInfo.TAG_UFS, MetricsSystem.escape(key.mUri), | ||
MetricInfo.TAG_USER, key.mUser)); | ||
} catch (UnavailableException | NotFoundException e) { | ||
throw AlluxioRuntimeException.from(e); | ||
} | ||
} | ||
|
||
@Override | ||
|
@@ -145,6 +175,7 @@ public int readPageAtIndex(ByteBuffer buffer, long pageIndex) throws IOException | |
mLastPage.flip(); | ||
mLastPageIndex = pageIndex; | ||
fillWithCachedPage(buffer, pageIndex * mPageSize, totalBytesRead); | ||
mUfsBytesRead.inc(totalBytesRead); | ||
return totalBytesRead; | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see the metrics are updated in page store only. If the user uses the block store, then these
bytesReadCache + bytesReadUfs
will be zero. Can you also update the metrics in the block store?