Skip to content

Commit

Permalink
Improve Flint Index Creation Reliability by Adjusting Socket Timeout (#…
Browse files Browse the repository at this point in the history
…177)

* Improve Flint Index Creation Reliability by Adjusting Socket Timeout

This PR addresses an issue where creating a Flint index could fail due to a socket timeout. We have increased the default socket timeout from 30 seconds (the default defined in the OpenSearch RESTful Java client) to 60 seconds. Additionally, we've made the socket timeout configurable so that it can be adapted to various network conditions.

Test enhancements include:
- E2E testing with simulated socket timeout to validate the robustness of index creation under constrained network scenarios.

Signed-off-by: Kaituo Li <[email protected]>

* rename class and remove unused code

Signed-off-by: Kaituo Li <[email protected]>

---------

Signed-off-by: Kaituo Li <[email protected]>
  • Loading branch information
kaituo authored Nov 28, 2023
1 parent 0616865 commit a352f67
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 1 deletion.
1 change: 1 addition & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,7 @@ In the index mapping, the `_meta` and `properties`field stores meta and schema i
- `spark.flint.optimizer.enabled`: default is true.
- `spark.flint.index.hybridscan.enabled`: default is false.
- `spark.flint.index.checkpoint.mandatory`: default is true.
- `spark.datasource.flint.socket_timeout_millis`: socket timeout in milliseconds. Default value is 60000.

#### Data Type Mapping

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ public class FlintOptions implements Serializable {
*/
public static final String DEFAULT_REFRESH_POLICY = "wait_for";

/**
 * Option key under which the socket timeout, in milliseconds, is looked up.
 */
public static final String SOCKET_TIMEOUT_MILLIS = "socket_timeout_millis";

/**
 * Socket timeout, in milliseconds, used when {@link #SOCKET_TIMEOUT_MILLIS} is not set.
 */
public static final int DEFAULT_SOCKET_TIMEOUT_MILLIS = 60000;

public FlintOptions(Map<String, String> options) {
this.options = options;
this.retryOptions = new FlintRetryOptions(options);
Expand Down Expand Up @@ -123,4 +127,8 @@ public String getUsername() {
/**
 * Looks up the password option.
 *
 * @return the value stored under {@code PASSWORD}, or {@code "flint"} when the option is absent
 */
public String getPassword() {
  final String fallback = "flint";
  return options.getOrDefault(PASSWORD, fallback);
}

/**
 * Reads the socket timeout from the options.
 *
 * @return the value stored under {@code SOCKET_TIMEOUT_MILLIS} parsed as an int, or
 *     {@code DEFAULT_SOCKET_TIMEOUT_MILLIS} when the option is absent
 * @throws NumberFormatException if the configured value is not a parsable integer
 */
public int getSocketTimeoutMillis() {
  final String fallback = Integer.toString(DEFAULT_SOCKET_TIMEOUT_MILLIS);
  final String configured = options.getOrDefault(SOCKET_TIMEOUT_MILLIS, fallback);
  return Integer.parseInt(configured);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,10 @@ public RestHighLevelClient createClient() {
restClientBuilder.setHttpClientConfigCallback(delegate ->
RetryableHttpAsyncClient.builder(delegate, options));
}

final RequestConfigurator callback = new RequestConfigurator(options);
restClientBuilder.setRequestConfigCallback(callback);

return new RestHighLevelClient(restClientBuilder);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.storage;

import org.apache.http.client.config.RequestConfig;
import org.opensearch.client.RestClientBuilder;
import org.opensearch.flint.core.FlintOptions;

/**
 * Request-config callback that replaces the REST client's default socket timeout
 * (RestClientBuilder.DEFAULT_SOCKET_TIMEOUT_MILLIS) with the value supplied by
 * {@link FlintOptions#getSocketTimeoutMillis()}.
 */
public class RequestConfigurator implements RestClientBuilder.RequestConfigCallback {

  /** Source of the socket timeout; read each time the callback is invoked. */
  private final FlintOptions flintOptions;

  /**
   * @param options Flint options from which the socket timeout is read
   */
  public RequestConfigurator(FlintOptions options) {
    this.flintOptions = options;
  }

  /**
   * Applies the configured socket timeout to the given builder.
   *
   * @param requestConfigBuilder builder handed in by the REST client builder
   * @return the builder returned by {@code setSocketTimeout}
   */
  @Override
  public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) {
    final int socketTimeoutMillis = flintOptions.getSocketTimeoutMillis();
    return requestConfigBuilder.setSocketTimeout(socketTimeoutMillis);
  }
}

Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,13 @@ object FlintSparkConf {
// Whether a checkpoint location is mandatory for an incremental refresh index.
// Stored as a string; defaults to "true".
val CHECKPOINT_MANDATORY = FlintConfig("spark.flint.index.checkpoint.mandatory")
.doc("Checkpoint location for incremental refresh index will be mandatory if enabled")
.createWithDefault("true")

// Socket timeout in milliseconds. Both the key suffix and the default value are taken
// from FlintOptions, so the Spark conf and the Java options share one definition.
// NOTE(review): datasourceOption() presumably marks this as passable as a data source
// option — confirm against FlintConfig.
val SOCKET_TIMEOUT_MILLIS =
FlintConfig(s"spark.datasource.flint.${FlintOptions.SOCKET_TIMEOUT_MILLIS}")
.datasourceOption()
.doc("socket duration in milliseconds")
.createWithDefault(String.valueOf(FlintOptions.DEFAULT_SOCKET_TIMEOUT_MILLIS))

}

/**
Expand Down Expand Up @@ -188,7 +195,8 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable
REGION,
CUSTOM_AWS_CREDENTIALS_PROVIDER,
USERNAME,
PASSWORD)
PASSWORD,
SOCKET_TIMEOUT_MILLIS)
.map(conf => (conf.optionKey, conf.readFrom(reader)))
.toMap

Expand Down

0 comments on commit a352f67

Please sign in to comment.