Skip to content

Commit

Permalink
Clarity revisions
Browse files Browse the repository at this point in the history
Signed-off-by: owenhalpert <[email protected]>
  • Loading branch information
owenhalpert committed Mar 6, 2025
1 parent 2a80d32 commit fbb2d1b
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 63 deletions.
8 changes: 8 additions & 0 deletions src/main/java/org/opensearch/knn/index/KNNSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -783,6 +783,14 @@ public static String getRemoteBuildServiceEndpoint() {
return KNNSettings.state().getSettingValue(KNNSettings.KNN_REMOTE_BUILD_SERVICE_ENDPOINT);
}

public static TimeValue getRemoteBuildClientTimeout() {
return KNNSettings.state().getSettingValue(KNNSettings.KNN_REMOTE_BUILD_CLIENT_TIMEOUT);
}

public static TimeValue getRemoteBuildClientPollInterval() {
return KNNSettings.state().getSettingValue(KNNSettings.KNN_REMOTE_BUILD_CLIENT_POLL_INTERVAL);
}

public static boolean isFaissAVX2Disabled() {
try {
return KNNSettings.state().getSettingValue(KNNSettings.KNN_FAISS_AVX2_DISABLED);
Expand Down
14 changes: 13 additions & 1 deletion src/main/java/org/opensearch/knn/index/engine/faiss/Faiss.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.knn.index.engine.faiss;

import com.google.common.collect.ImmutableMap;
import org.apache.lucene.index.FieldInfo;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.xcontent.DeprecationHandler;
import org.opensearch.core.xcontent.NamedXContentRegistry;
Expand Down Expand Up @@ -138,15 +139,26 @@ public boolean supportsRemoteIndexBuild(Map<String, String> attributes) throws I
return false;
}

/**
* Get method name from a {@link FieldInfo} formatted attributes map
* Example:
* {
* "index_description": "HNSW12,Flat",
* "spaceType": "l2",
* "name": "hnsw",
* ...
* }
*/
private String getMethodName(String parametersJson) throws IOException {
XContentParser parser = XContentType.JSON.xContent()
.createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, parametersJson.getBytes());

while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
if (parser.currentToken() == XContentParser.Token.FIELD_NAME) {
String fieldName = parser.currentName();
parser.nextToken();
if (NAME.equals(fieldName)) {
// Matched field name (key), next line will move to the value
parser.nextToken();
return parser.text();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.knn.index.engine.faiss;

import com.google.common.collect.ImmutableSet;
import org.apache.lucene.index.FieldInfo;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.xcontent.DeprecationHandler;
import org.opensearch.core.xcontent.NamedXContentRegistry;
Expand Down Expand Up @@ -197,7 +198,7 @@ private static int getMFromIndexDescription(String indexDescription) {

/**
* Return whether this engine/method supports remote build.
* @param attributes
* @param attributes Map of {@link FieldInfo} formatted attributes
* @return true if remote build is supported, false otherwise
* @throws IOException
*/
Expand All @@ -208,11 +209,30 @@ static boolean supportsRemoteIndexBuild(Map<String, String> attributes) throws I
}

/**
* Gets encoder name from a {@FieldInfo parameters} map
* Gets encoder name from a {@FieldInfo parameters} map.
* Needs to use a JSON parser since FieldInfo.attributes() is a Map of String, String.
*
* @param parametersJson
* @return encoder name
* @throws IOException
* Example:
* {
* "index_description": "HNSW12,Flat",
* "spaceType": "l2",
* "name": "hnsw",
* "data_type": "float",
* --------------------
* "parameters": {
* "ef_search": 24,
* "ef_construction": 28,
* "encoder": {
* "name": "flat",
* "parameters": {}
* }
* }
* --------------------
* }
*
* @param parametersJson json string of parameters (inner parameter map above)
* @return encoder name or null if not found
* @throws IOException if the json string is not valid
*/
private static String getEncoderName(String parametersJson) throws IOException {
XContentParser parser = XContentType.JSON.xContent()
Expand All @@ -221,22 +241,27 @@ private static String getEncoderName(String parametersJson) throws IOException {
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
if (parser.currentToken() == XContentParser.Token.FIELD_NAME) {
String fieldName = parser.currentName();
parser.nextToken();

if (PARAMETERS.equals(fieldName) && parser.currentToken() == XContentParser.Token.START_OBJECT) {
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
if (parser.currentToken() == XContentParser.Token.FIELD_NAME) {
String paramName = parser.currentName();
parser.nextToken();

if (METHOD_ENCODER_PARAMETER.equals(paramName) && parser.currentToken() == XContentParser.Token.START_OBJECT) {
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
if (parser.currentToken() == XContentParser.Token.FIELD_NAME) {
String encoderField = parser.currentName();
parser.nextToken();

if (NAME.equals(encoderField)) {
return parser.text();

if (PARAMETERS.equals(fieldName)) {
parser.nextToken();
if (parser.currentToken() == XContentParser.Token.START_OBJECT) {
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
if (parser.currentToken() == XContentParser.Token.FIELD_NAME) {
String paramName = parser.currentName();

if (METHOD_ENCODER_PARAMETER.equals(paramName)) {
parser.nextToken();
if (parser.currentToken() == XContentParser.Token.START_OBJECT) {
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
if (parser.currentToken() == XContentParser.Token.FIELD_NAME) {
String encoderField = parser.currentName();

if (NAME.equals(encoderField)) {
// .nextToken to move from the key `name` to the value.
parser.nextToken();
return parser.text();
}
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
package org.opensearch.knn.index.remote;

import org.apache.commons.lang.StringUtils;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.knn.index.KNNSettings;

import java.io.IOException;
Expand Down Expand Up @@ -42,18 +41,17 @@ class RemoteIndexPoller implements RemoteIndexWaiter {
@SuppressWarnings("BusyWait")
public RemoteBuildStatusResponse awaitVectorBuild(RemoteBuildStatusRequest remoteBuildStatusRequest) throws InterruptedException,
IOException {
long startTime = System.currentTimeMillis();
long timeout = ((TimeValue) KNNSettings.state().getSettingValue(KNNSettings.KNN_REMOTE_BUILD_CLIENT_TIMEOUT)).getMillis();
long pollInterval = ((TimeValue) (KNNSettings.state().getSettingValue(KNNSettings.KNN_REMOTE_BUILD_CLIENT_POLL_INTERVAL)))
.getMillis();
long startTime = System.nanoTime();
long timeout = KNNSettings.getRemoteBuildClientTimeout().getNanos();
// Thread.sleep expects millis
long pollInterval = KNNSettings.getRemoteBuildClientPollInterval().getMillis();

// Initial delay to allow build service to process the job and store the ID before getting its status.
// TODO tune default based on benchmarking
Thread.sleep(pollInterval * INITIAL_DELAY_FACTOR);

while (System.currentTimeMillis() - startTime < timeout) {
while (System.nanoTime() - startTime < timeout) {
RemoteBuildStatusResponse remoteBuildStatusResponse = client.getBuildStatus(remoteBuildStatusRequest);
Duration d = Duration.ofMillis(System.currentTimeMillis() - startTime);
String taskStatus = remoteBuildStatusResponse.getTaskStatus();
if (StringUtils.isBlank(taskStatus)) {
throw new IOException(String.format("Invalid response format, missing %s", TASK_STATUS));
Expand All @@ -67,6 +65,7 @@ public RemoteBuildStatusResponse awaitVectorBuild(RemoteBuildStatusRequest remot
}
case FAILED_INDEX_BUILD -> {
String errorMessage = remoteBuildStatusResponse.getErrorMessage();
Duration d = Duration.ofNanos(System.nanoTime() - startTime);
throw new InterruptedException(
String.format("Remote index build failed after %d minutes. %s", d.toMinutesPart(), errorMessage)
);
Expand All @@ -75,8 +74,8 @@ public RemoteBuildStatusResponse awaitVectorBuild(RemoteBuildStatusRequest remot
default -> throw new IOException(String.format("Server returned invalid task status %s", taskStatus));
}
}
Duration waitedDuration = Duration.ofMillis(System.currentTimeMillis() - startTime);
Duration timeoutDuration = Duration.ofMillis(timeout);
Duration waitedDuration = Duration.ofNanos(System.nanoTime() - startTime);
Duration timeoutDuration = Duration.ofNanos(timeout);
throw new InterruptedException(
String.format(
"Remote index build timed out after %d minutes, timeout is set to %d minutes. Falling back to CPU build",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.opensearch.knn.index.KNNSettings.KNN_REMOTE_BUILD_CLIENT_POLL_INTERVAL_SETTING;
import static org.opensearch.knn.index.KNNSettings.KNN_REMOTE_BUILD_CLIENT_TIMEOUT_SETTING;
import static org.opensearch.knn.index.remote.KNNRemoteConstants.COMPLETED_INDEX_BUILD;
import static org.opensearch.knn.index.remote.KNNRemoteConstants.FAILED_INDEX_BUILD;
import static org.opensearch.knn.index.remote.KNNRemoteConstants.FILE_NAME;
Expand Down Expand Up @@ -63,12 +61,8 @@ public void testAwaitVectorBuildTimeout() {
try (MockedStatic<KNNSettings> knnSettingsStaticMock = Mockito.mockStatic(KNNSettings.class)) {
knnSettingsStaticMock.when(KNNSettings::state).thenReturn(knnSettingsMock);

when(knnSettingsMock.getSettingValue(KNN_REMOTE_BUILD_CLIENT_TIMEOUT_SETTING.getKey())).thenReturn(
TimeValue.timeValueMillis(100)
);
when(knnSettingsMock.getSettingValue(KNN_REMOTE_BUILD_CLIENT_POLL_INTERVAL_SETTING.getKey())).thenReturn(
TimeValue.timeValueMillis(10)
);
when(KNNSettings.getRemoteBuildClientTimeout()).thenReturn(TimeValue.timeValueMillis(10));
when(KNNSettings.getRemoteBuildClientPollInterval()).thenReturn(TimeValue.timeValueMillis(10));

RemoteBuildStatusResponse runningResponse = new RemoteBuildStatusResponse(RUNNING_INDEX_BUILD, null, null);
when(mockClient.getBuildStatus(mockStatusRequest)).thenReturn(runningResponse);
Expand All @@ -85,12 +79,8 @@ public void testAwaitVectorBuildCompleted() {
try (MockedStatic<KNNSettings> knnSettingsStaticMock = Mockito.mockStatic(KNNSettings.class)) {
knnSettingsStaticMock.when(KNNSettings::state).thenReturn(knnSettingsMock);

when(knnSettingsMock.getSettingValue(KNN_REMOTE_BUILD_CLIENT_TIMEOUT_SETTING.getKey())).thenReturn(
TimeValue.timeValueSeconds(5)
);
when(knnSettingsMock.getSettingValue(KNN_REMOTE_BUILD_CLIENT_POLL_INTERVAL_SETTING.getKey())).thenReturn(
TimeValue.timeValueMillis(10)
);
when(KNNSettings.getRemoteBuildClientTimeout()).thenReturn(TimeValue.timeValueMillis(100));
when(KNNSettings.getRemoteBuildClientPollInterval()).thenReturn(TimeValue.timeValueMillis(10));

RemoteBuildStatusResponse completedResponse = new RemoteBuildStatusResponse(COMPLETED_INDEX_BUILD, MOCK_FILE_NAME, null);
when(mockClient.getBuildStatus(mockStatusRequest)).thenReturn(completedResponse);
Expand All @@ -110,12 +100,8 @@ public void testAwaitVectorBuildFailed() {
try (MockedStatic<KNNSettings> knnSettingsStaticMock = Mockito.mockStatic(KNNSettings.class)) {
knnSettingsStaticMock.when(KNNSettings::state).thenReturn(knnSettingsMock);

when(knnSettingsMock.getSettingValue(KNN_REMOTE_BUILD_CLIENT_TIMEOUT_SETTING.getKey())).thenReturn(
TimeValue.timeValueSeconds(5)
);
when(knnSettingsMock.getSettingValue(KNN_REMOTE_BUILD_CLIENT_POLL_INTERVAL_SETTING.getKey())).thenReturn(
TimeValue.timeValueMillis(10)
);
when(KNNSettings.getRemoteBuildClientTimeout()).thenReturn(TimeValue.timeValueMillis(100));
when(KNNSettings.getRemoteBuildClientPollInterval()).thenReturn(TimeValue.timeValueMillis(10));

String errorMessage = "Failed to build index due to insufficient resources";

Expand Down Expand Up @@ -143,12 +129,9 @@ public void testMissingIndexPathForCompletedStatus() {
try (MockedStatic<KNNSettings> knnSettingsStaticMock = Mockito.mockStatic(KNNSettings.class)) {
knnSettingsStaticMock.when(KNNSettings::state).thenReturn(knnSettingsMock);

when(knnSettingsMock.getSettingValue(KNN_REMOTE_BUILD_CLIENT_TIMEOUT_SETTING.getKey())).thenReturn(
TimeValue.timeValueSeconds(5)
);
when(knnSettingsMock.getSettingValue(KNN_REMOTE_BUILD_CLIENT_POLL_INTERVAL_SETTING.getKey())).thenReturn(
TimeValue.timeValueMillis(10)
);
when(KNNSettings.getRemoteBuildClientTimeout()).thenReturn(TimeValue.timeValueMillis(100));
when(KNNSettings.getRemoteBuildClientPollInterval()).thenReturn(TimeValue.timeValueMillis(10));

RemoteBuildStatusResponse invalidResponse = new RemoteBuildStatusResponse(COMPLETED_INDEX_BUILD, null, null);
RemoteIndexPoller poller = new RemoteIndexPoller(mockClient);

Expand All @@ -167,12 +150,9 @@ public void testMissingTaskStatus() {
try (MockedStatic<KNNSettings> knnSettingsStaticMock = Mockito.mockStatic(KNNSettings.class)) {
knnSettingsStaticMock.when(KNNSettings::state).thenReturn(knnSettingsMock);

when(knnSettingsMock.getSettingValue(KNN_REMOTE_BUILD_CLIENT_TIMEOUT_SETTING.getKey())).thenReturn(
TimeValue.timeValueSeconds(5)
);
when(knnSettingsMock.getSettingValue(KNN_REMOTE_BUILD_CLIENT_POLL_INTERVAL_SETTING.getKey())).thenReturn(
TimeValue.timeValueMillis(10)
);
when(KNNSettings.getRemoteBuildClientTimeout()).thenReturn(TimeValue.timeValueMillis(100));
when(KNNSettings.getRemoteBuildClientPollInterval()).thenReturn(TimeValue.timeValueMillis(10));

RemoteBuildStatusResponse invalidResponse = new RemoteBuildStatusResponse(null, null, null);
RemoteIndexPoller poller = new RemoteIndexPoller(mockClient);

Expand Down

0 comments on commit fbb2d1b

Please sign in to comment.