Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add metrics for repository interactions to RemoteIndexBuildStrategy #2566

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Features
* [Remote Vector Index Build] Introduce Remote Native Index Build feature flag, settings, and initial skeleton [#2525](https://github.com/opensearch-project/k-NN/pull/2525)
* [Remote Vector Index Build] Implement vector data upload and vector data size threshold setting [#2550](https://github.com/opensearch-project/k-NN/pull/2550)
* [Remote Vector Index Build] Add metrics for repository interactions to RemoteIndexBuildStrategy [#2566](https://github.com/opensearch-project/k-NN/pull/2566)
### Enhancements
* Introduce node level circuit breakers for k-NN [#2509](https://github.com/opensearch-project/k-NN/pull/2509)
### Bug Fixes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.knn.index.KNNSettings;
import org.opensearch.knn.index.codec.nativeindex.NativeIndexBuildStrategy;
import org.opensearch.knn.index.codec.nativeindex.model.BuildIndexParams;
import org.opensearch.knn.index.vectorvalues.KNNVectorValues;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.RepositoryMissingException;
Expand All @@ -25,6 +26,19 @@
import static org.opensearch.knn.index.KNNSettings.KNN_INDEX_REMOTE_VECTOR_BUILD_SETTING;
import static org.opensearch.knn.index.KNNSettings.KNN_INDEX_REMOTE_VECTOR_BUILD_THRESHOLD_SETTING;
import static org.opensearch.knn.index.KNNSettings.KNN_REMOTE_VECTOR_REPO_SETTING;
import static org.opensearch.knn.index.codec.util.KNNCodecUtil.initializeVectorValues;
import static org.opensearch.knn.plugin.stats.KNNRemoteIndexBuildValue.BUILD_REQUEST_FAILURE_COUNT;
import static org.opensearch.knn.plugin.stats.KNNRemoteIndexBuildValue.BUILD_REQUEST_SUCCESS_COUNT;
import static org.opensearch.knn.plugin.stats.KNNRemoteIndexBuildValue.READ_FAILURE_COUNT;
import static org.opensearch.knn.plugin.stats.KNNRemoteIndexBuildValue.READ_SUCCESS_COUNT;
import static org.opensearch.knn.plugin.stats.KNNRemoteIndexBuildValue.READ_TIME;
import static org.opensearch.knn.plugin.stats.KNNRemoteIndexBuildValue.REMOTE_INDEX_BUILD_CURRENT_OPERATIONS;
import static org.opensearch.knn.plugin.stats.KNNRemoteIndexBuildValue.REMOTE_INDEX_BUILD_CURRENT_SIZE;
import static org.opensearch.knn.plugin.stats.KNNRemoteIndexBuildValue.REMOTE_INDEX_BUILD_TIME;
import static org.opensearch.knn.plugin.stats.KNNRemoteIndexBuildValue.WAITING_TIME;
import static org.opensearch.knn.plugin.stats.KNNRemoteIndexBuildValue.WRITE_FAILURE_COUNT;
import static org.opensearch.knn.plugin.stats.KNNRemoteIndexBuildValue.WRITE_SUCCESS_COUNT;
import static org.opensearch.knn.plugin.stats.KNNRemoteIndexBuildValue.WRITE_TIME;

/**
* This class orchestrates building vector indices. It handles uploading data to a repository, submitting a remote
Expand Down Expand Up @@ -110,9 +124,17 @@ public static boolean shouldBuildIndexRemotely(IndexSettings indexSettings, long
public void buildAndWriteIndex(BuildIndexParams indexInfo) throws IOException {
StopWatch stopWatch;
long time_in_millis;
final VectorRepositoryAccessor vectorRepositoryAccessor;

StopWatch remoteBuildTimeStopwatch = new StopWatch();
KNNVectorValues<?> knnVectorValues = indexInfo.getKnnVectorValuesSupplier().get();
initializeVectorValues(knnVectorValues);
startRemoteIndexBuildStats((long) indexInfo.getTotalLiveDocs() * knnVectorValues.bytesPerVector(), remoteBuildTimeStopwatch);

// 1. Write required data to repository
stopWatch = new StopWatch().start();
try {
VectorRepositoryAccessor vectorRepositoryAccessor = new DefaultVectorRepositoryAccessor(getRepository(), indexSettings);
stopWatch = new StopWatch().start();
vectorRepositoryAccessor = new DefaultVectorRepositoryAccessor(getRepository(), indexSettings);
// We create a new time based UUID per file in order to avoid conflicts across shards. It is also very difficult to get the
// shard id in this context.
String blobName = UUIDs.base64UUID() + "_" + indexInfo.getFieldName() + "_" + indexInfo.getSegmentWriteState().segmentInfo.name;
Expand All @@ -123,27 +145,62 @@ public void buildAndWriteIndex(BuildIndexParams indexInfo) throws IOException {
indexInfo.getKnnVectorValuesSupplier()
);
time_in_millis = stopWatch.stop().totalTime().millis();
WRITE_SUCCESS_COUNT.increment();
WRITE_TIME.incrementBy(time_in_millis);
log.debug("Repository write took {} ms for vector field [{}]", time_in_millis, indexInfo.getFieldName());
} catch (Exception e) {
time_in_millis = stopWatch.stop().totalTime().millis();
WRITE_FAILURE_COUNT.increment();
log.error("Repository write failed after {} ms for vector field [{}]", time_in_millis, indexInfo.getFieldName(), e);
handleFailure(indexInfo, knnVectorValues.bytesPerVector(), remoteBuildTimeStopwatch);
return;
}

stopWatch = new StopWatch().start();
// 2. Triggers index build
stopWatch = new StopWatch().start();
try {
submitVectorBuild();
time_in_millis = stopWatch.stop().totalTime().millis();
BUILD_REQUEST_SUCCESS_COUNT.increment();
log.debug("Submit vector build took {} ms for vector field [{}]", time_in_millis, indexInfo.getFieldName());
} catch (Exception e) {
BUILD_REQUEST_FAILURE_COUNT.increment();
log.error("Submit vector failed after {} ms for vector field [{}]", time_in_millis, indexInfo.getFieldName(), e);
handleFailure(indexInfo, knnVectorValues.bytesPerVector(), remoteBuildTimeStopwatch);
return;
}

stopWatch = new StopWatch().start();
// 3. Awaits on vector build to complete
stopWatch = new StopWatch().start();
try {
awaitVectorBuild();
time_in_millis = stopWatch.stop().totalTime().millis();
WAITING_TIME.incrementBy(time_in_millis);
log.debug("Await vector build took {} ms for vector field [{}]", time_in_millis, indexInfo.getFieldName());
} catch (Exception e) {
log.debug("Await vector build failed after {} ms for vector field [{}]", time_in_millis, indexInfo.getFieldName());
handleFailure(indexInfo, knnVectorValues.bytesPerVector(), remoteBuildTimeStopwatch);
return;
}

stopWatch = new StopWatch().start();
// 4. Downloads index file and writes to indexOutput
stopWatch = new StopWatch().start();
try {
assert vectorRepositoryAccessor != null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why would this be null? Is this just defensive?

vectorRepositoryAccessor.readFromRepository();
time_in_millis = stopWatch.stop().totalTime().millis();
READ_SUCCESS_COUNT.increment();
READ_TIME.incrementBy(time_in_millis);
log.debug("Repository read took {} ms for vector field [{}]", time_in_millis, indexInfo.getFieldName());
} catch (Exception e) {
// TODO: This needs more robust failure handling
log.warn("Failed to build index remotely", e);
fallbackStrategy.buildAndWriteIndex(indexInfo);
time_in_millis = stopWatch.stop().totalTime().millis();
READ_FAILURE_COUNT.increment();
log.error("Repository read failed after {} ms for vector field [{}]", time_in_millis, indexInfo.getFieldName(), e);
handleFailure(indexInfo, knnVectorValues.bytesPerVector(), remoteBuildTimeStopwatch);
return;
}

endRemoteIndexBuildStats((long) indexInfo.getTotalLiveDocs() * knnVectorValues.bytesPerVector(), stopWatch);
}

/**
Expand Down Expand Up @@ -178,4 +235,28 @@ private void submitVectorBuild() {
private void awaitVectorBuild() {
throw new NotImplementedException();
}

/**
 * Marks the beginning of a remote index build for metrics purposes: starts the supplied stopwatch and
 * registers the build in the "current operations" and "current size" counters.
 *
 * @param size      amount to add to {@code REMOTE_INDEX_BUILD_CURRENT_SIZE}; the caller in this class passes
 *                  total live docs multiplied by bytes per vector
 * @param stopWatch stopwatch that measures the whole remote build; started here
 */
private void startRemoteIndexBuildStats(long size, StopWatch stopWatch) {
    // Begin timing first so the overall build time also covers the bookkeeping below.
    stopWatch.start();

    // One more build in flight, carrying `size` worth of vector data.
    REMOTE_INDEX_BUILD_CURRENT_OPERATIONS.increment();
    REMOTE_INDEX_BUILD_CURRENT_SIZE.incrementBy(size);
}

/**
 * Marks the end of a remote index build for metrics purposes. Stops the supplied stopwatch, removes the
 * build from the "current operations" and "current size" counters and adds the elapsed time to the
 * accumulated build time.
 *
 * @param size      amount to subtract from {@code REMOTE_INDEX_BUILD_CURRENT_SIZE}; should equal the amount
 *                  that was added when the build started
 * @param stopWatch stopwatch to stop and read the elapsed time from
 *                  (NOTE(review): presumably it must still be running when passed in; confirm against StopWatch)
 */
private void endRemoteIndexBuildStats(long size, StopWatch stopWatch) {
    final long elapsedMillis = stopWatch.stop().totalTime().millis();

    // The build is no longer in flight.
    REMOTE_INDEX_BUILD_CURRENT_OPERATIONS.decrement();
    REMOTE_INDEX_BUILD_CURRENT_SIZE.decrementBy(size);

    // Accumulate how long this build took.
    REMOTE_INDEX_BUILD_TIME.incrementBy(elapsedMillis);
}

/**
 * Closes out the remote index build metrics after a failed step and then delegates the build to the
 * fallback strategy.
 *
 * @param indexParams    parameters of the index build; supplies the live doc count for the size metric and
 *                       is handed unchanged to the fallback strategy
 * @param bytesPerVector size of a single vector in bytes, used to compute the size to remove from the
 *                       current-size metric
 * @param stopWatch      stopwatch timing the overall remote build; it is stopped as part of ending the stats
 * @throws IOException if the fallback strategy fails to build and write the index
 */
private void handleFailure(BuildIndexParams indexParams, long bytesPerVector, StopWatch stopWatch) throws IOException {
    // Same quantity that was registered when the build started: live docs * bytes per vector.
    final long vectorDataSize = indexParams.getTotalLiveDocs() * bytesPerVector;
    endRemoteIndexBuildStats(vectorDataSize, stopWatch);

    // Metrics are settled; build the index with the fallback strategy instead.
    fallbackStrategy.buildAndWriteIndex(indexParams);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.knn.plugin.stats;

import lombok.Getter;

import java.util.concurrent.atomic.LongAdder;

/**
 * Counters that back the remote index build statistics.
 * <p>
 * Every constant pairs the name it is reported under with its own {@link LongAdder}. Callers update the
 * adder through the increment/decrement methods and read its current sum through {@link #getValue()}.
 */
public enum KNNRemoteIndexBuildValue {

    // Repository Accumulating Stats
    WRITE_SUCCESS_COUNT("write_success_count"),
    WRITE_FAILURE_COUNT("write_failure_count"),
    WRITE_TIME("successful_write_time_in_millis"),
    READ_SUCCESS_COUNT("read_success_count"),
    READ_FAILURE_COUNT("read_failure_count"),
    READ_TIME("successful_read_time_in_millis"),

    // Remote Index Build Stats
    REMOTE_INDEX_BUILD_CURRENT_OPERATIONS("remote_index_build_current_operations"),
    REMOTE_INDEX_BUILD_CURRENT_SIZE("remote_index_build_current_size"),
    REMOTE_INDEX_BUILD_TIME("remote_index_build_time_in_millis"),

    // Client Stats
    BUILD_REQUEST_SUCCESS_COUNT("build_request_success_count"),
    BUILD_REQUEST_FAILURE_COUNT("build_request_failure_count"),
    STATUS_REQUEST_SUCCESS_COUNT("status_request_success_count"),
    STATUS_REQUEST_FAILURE_COUNT("status_request_failure_count"),
    INDEX_BUILD_SUCCESS_COUNT("index_build_success_count"),
    INDEX_BUILD_FAILURE_COUNT("index_build_failure_count"),
    WAITING_TIME("waiting_time_in_ms");

    /** Name this value is reported under. */
    @Getter
    private final String name;

    /** Running total for this value; starts at zero. */
    private final LongAdder value = new LongAdder();

    /**
     * Creates a value with the given reporting name and a zeroed counter.
     *
     * @param name name of the remote index build value
     */
    KNNRemoteIndexBuildValue(String name) {
        this.name = name;
    }

    /**
     * Reads the current total of this value.
     *
     * @return current sum of the underlying adder
     */
    public Long getValue() {
        return value.sum();
    }

    /**
     * Adds one to this value.
     */
    public void increment() {
        value.add(1L);
    }

    /**
     * Subtracts one from this value.
     */
    public void decrement() {
        value.add(-1L);
    }

    /**
     * Adds the given amount to this value.
     *
     * @param delta amount to add
     */
    public void incrementBy(long delta) {
        value.add(delta);
    }

    /**
     * Subtracts the given amount from this value.
     *
     * @param delta amount to subtract
     */
    public void decrementBy(long delta) {
        value.add(-delta);
    }
}
75 changes: 73 additions & 2 deletions src/main/java/org/opensearch/knn/plugin/stats/KNNStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@
import com.google.common.cache.CacheStats;
import com.google.common.collect.ImmutableMap;
import org.opensearch.knn.common.KNNConstants;
import org.opensearch.knn.index.memory.NativeMemoryCacheManager;
import org.opensearch.knn.common.featureflags.KNNFeatureFlags;
import org.opensearch.knn.index.engine.KNNEngine;
import org.opensearch.knn.index.memory.NativeMemoryCacheManager;
import org.opensearch.knn.indices.ModelCache;
import org.opensearch.knn.indices.ModelDao;
import org.opensearch.knn.plugin.stats.suppliers.EventOccurredWithinThresholdSupplier;
Expand All @@ -24,6 +25,7 @@
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;

/**
Expand Down Expand Up @@ -71,8 +73,15 @@ private Map<String, KNNStat<?>> getClusterOrNodeStats(Boolean getClusterStats) {
Map<String, KNNStat<?>> statsMap = new HashMap<>();

for (Map.Entry<String, KNNStat<?>> entry : knnStats.entrySet()) {
// knnStats is initialized at node bootup, so we need to do feature flag enforcement when retrieving the stats instead
if (entry.getValue().isClusterLevel() == getClusterStats) {
statsMap.put(entry.getKey(), entry.getValue());
if (Objects.equals(entry.getKey(), StatNames.REMOTE_VECTOR_INDEX_BUILD_STATS.getName())) {
if (KNNFeatureFlags.isKNNRemoteVectorBuildEnabled()) {
statsMap.put(entry.getKey(), entry.getValue());
}
} else {
statsMap.put(entry.getKey(), entry.getValue());
}
}
}
return statsMap;
Expand All @@ -86,6 +95,7 @@ private Map<String, KNNStat<?>> buildStatsMap() {
addScriptStats(builder);
addModelStats(builder);
addGraphStats(builder);
addRemoteIndexBuildStats(builder);
return builder.build();
}

Expand Down Expand Up @@ -218,4 +228,65 @@ private Map<String, Map<String, Object>> createGraphStatsMap() {
graphStatsMap.put(StatNames.REFRESH.getName(), refreshMap);
return graphStatsMap;
}

/**
 * Registers the remote vector index build stats in the stats map being built.
 *
 * @param builder builder of the name-to-stat map; receives one entry keyed by
 *                {@code StatNames.REMOTE_VECTOR_INDEX_BUILD_STATS}
 */
private void addRemoteIndexBuildStats(ImmutableMap.Builder<String, KNNStat<?>> builder) {
    final String statName = StatNames.REMOTE_VECTOR_INDEX_BUILD_STATS.getName();
    // NOTE(review): the `false` flag presumably marks this as a node-level (not cluster-level) stat;
    // confirm against the KNNStat constructor.
    final KNNStat<?> remoteIndexBuildStat = new KNNStat<>(false, this::createRemoteIndexStatsMap);
    builder.put(statName, remoteIndexBuildStat);
}

/**
 * Builds the nested map reported for remote index build stats. The outer map has one entry per category
 * (build, client, repository); each inner map holds the current value of every counter in that category,
 * keyed by the counter's name.
 *
 * @return category name to (counter name to current value) map
 */
private Map<String, Map<String, Object>> createRemoteIndexStatsMap() {
    final Map<String, Object> clientStats = snapshotRemoteIndexBuildValues(
        KNNRemoteIndexBuildValue.BUILD_REQUEST_SUCCESS_COUNT,
        KNNRemoteIndexBuildValue.BUILD_REQUEST_FAILURE_COUNT,
        KNNRemoteIndexBuildValue.STATUS_REQUEST_SUCCESS_COUNT,
        KNNRemoteIndexBuildValue.STATUS_REQUEST_FAILURE_COUNT,
        KNNRemoteIndexBuildValue.INDEX_BUILD_SUCCESS_COUNT,
        KNNRemoteIndexBuildValue.INDEX_BUILD_FAILURE_COUNT,
        KNNRemoteIndexBuildValue.WAITING_TIME
    );

    final Map<String, Object> repositoryStats = snapshotRemoteIndexBuildValues(
        KNNRemoteIndexBuildValue.WRITE_SUCCESS_COUNT,
        KNNRemoteIndexBuildValue.WRITE_FAILURE_COUNT,
        KNNRemoteIndexBuildValue.WRITE_TIME,
        KNNRemoteIndexBuildValue.READ_SUCCESS_COUNT,
        KNNRemoteIndexBuildValue.READ_FAILURE_COUNT,
        KNNRemoteIndexBuildValue.READ_TIME
    );

    final Map<String, Object> buildStats = snapshotRemoteIndexBuildValues(
        KNNRemoteIndexBuildValue.REMOTE_INDEX_BUILD_CURRENT_OPERATIONS,
        KNNRemoteIndexBuildValue.REMOTE_INDEX_BUILD_CURRENT_SIZE,
        KNNRemoteIndexBuildValue.REMOTE_INDEX_BUILD_TIME
    );

    final Map<String, Map<String, Object>> statsByCategory = new HashMap<>();
    statsByCategory.put(StatNames.BUILD_STATS.getName(), buildStats);
    statsByCategory.put(StatNames.CLIENT_STATS.getName(), clientStats);
    statsByCategory.put(StatNames.REPOSITORY_STATS.getName(), repositoryStats);
    return statsByCategory;
}

/**
 * Reads the current value of each given counter into a fresh map keyed by the counter's name.
 *
 * @param counters counters to read
 * @return mutable map of counter name to the value read
 */
private static Map<String, Object> snapshotRemoteIndexBuildValues(KNNRemoteIndexBuildValue... counters) {
    final Map<String, Object> snapshot = new HashMap<>();
    for (KNNRemoteIndexBuildValue counter : counters) {
        snapshot.put(counter.getName(), counter.getValue());
    }
    return snapshot;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ public enum StatNames {
GRAPH_STATS("graph_stats"),
REFRESH("refresh"),
MERGE("merge"),
REMOTE_VECTOR_INDEX_BUILD_STATS("remote_vector_index_build_stats"),
CLIENT_STATS("client_stats"),
REPOSITORY_STATS("repository_stats"),
BUILD_STATS("build_stats"),
MIN_SCORE_QUERY_REQUESTS(KNNCounter.MIN_SCORE_QUERY_REQUESTS.getName()),
MIN_SCORE_QUERY_WITH_FILTER_REQUESTS(KNNCounter.MIN_SCORE_QUERY_WITH_FILTER_REQUESTS.getName()),
MAX_DISTANCE_QUERY_REQUESTS(KNNCounter.MAX_DISTANCE_QUERY_REQUESTS.getName()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.opensearch.core.index.Index;
import org.opensearch.index.IndexSettings;
import org.opensearch.knn.index.KNNSettings;
import org.opensearch.knn.plugin.stats.KNNRemoteIndexBuildValue;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.RepositoryMissingException;

Expand Down Expand Up @@ -41,6 +42,13 @@ public void testRemoteIndexBuildStrategyFallback() throws IOException {
);
objectUnderTest.buildAndWriteIndex(buildIndexParams);
assertTrue(fallback.get());
assertEquals(0L, (long) KNNRemoteIndexBuildValue.WRITE_SUCCESS_COUNT.getValue());
assertEquals(1L, (long) KNNRemoteIndexBuildValue.WRITE_FAILURE_COUNT.getValue()); // Repository is first accessed during write
assertEquals(0L, (long) KNNRemoteIndexBuildValue.WRITE_TIME.getValue());
assertEquals(0L, (long) KNNRemoteIndexBuildValue.READ_SUCCESS_COUNT.getValue());
assertEquals(0L, (long) KNNRemoteIndexBuildValue.READ_FAILURE_COUNT.getValue());
assertEquals(0L, (long) KNNRemoteIndexBuildValue.READ_TIME.getValue());
assertTrue(KNNRemoteIndexBuildValue.REMOTE_INDEX_BUILD_TIME.getValue() > 0L);
}

public void testShouldBuildIndexRemotely() {
Expand Down
Loading