Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SOLR-16505: Switch UpdateShardHandler.getRecoveryOnlyHttpClient to Jetty HTTP2 #2276

Merged
Show file tree
Hide file tree
Changes from 33 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
c2e578e
SOLR-16505: Switch UpdateShardHandler.getRecoveryOnlyHttpClient to Je…
iamsanjay Feb 18, 2024
d5bc8c4
Merge main
iamsanjay Feb 20, 2024
3572dec
SOLR-16505: Switch UpdateShardHandler.getRecoveryOnlyHttpClient to Je…
iamsanjay Feb 20, 2024
9588fdf
Merge main
iamsanjay Feb 27, 2024
0655f11
Using FutureTask to send PREPRECOVERY, without executor
iamsanjay Feb 27, 2024
e7c346f
Merge main
iamsanjay Feb 29, 2024
0611782
Null check for FutureTask, removed try-catch
iamsanjay Feb 29, 2024
91f3c9d
code format, added test case
iamsanjay Mar 1, 2024
6a0b54b
Merge branch 'main' into SOLR-16367_getRecoveryOnlyHttpClient_to_Jett…
iamsanjay Mar 3, 2024
1c6798b
Remove comment, create method for cancel recovery
iamsanjay Mar 3, 2024
5a62766
Merge main
iamsanjay Mar 7, 2024
ae368b5
Update IndexFetcher Class to Use Http2SolrClient
iamsanjay Mar 7, 2024
9e9b5f7
Adding header for compression to SolrRequests
iamsanjay Mar 7, 2024
8deebcd
Merge branch 'main' into SOLR-16367_getRecoveryOnlyHttpClient_to_Jett…
iamsanjay Mar 12, 2024
625a364
Enable testing resplication handler for externalCompression
iamsanjay Mar 12, 2024
901ef51
Renaming method to more appropriate name
iamsanjay Mar 12, 2024
d574b42
Merge main
iamsanjay Mar 13, 2024
cc4011b
Merge branch 'main' into SOLR-16367_getRecoveryOnlyHttpClient_to_Jett…
iamsanjay Mar 13, 2024
26a4c0e
Resolve conflicts Http2SolrClient
iamsanjay Mar 13, 2024
de5a40c
Merge branch 'main' into SOLR-16367_getRecoveryOnlyHttpClient_to_Jett…
iamsanjay Mar 18, 2024
43dda16
Restoring the old auth of IndexFetcher
iamsanjay Mar 18, 2024
b48c0b9
Merge main
iamsanjay Mar 27, 2024
851109f
Fix retry fetch() IndexFetcher
iamsanjay Mar 27, 2024
6af3d76
Merge main
iamsanjay Mar 30, 2024
73c5ba8
Avoid closing InputStream before receiving zero-length Data field
iamsanjay Mar 30, 2024
4c16404
Read till end-of-file
iamsanjay Mar 30, 2024
19ec489
read till end-of-file
iamsanjay Mar 30, 2024
917509f
Merge main
iamsanjay Apr 18, 2024
c54cd5b
Added Test case for User managed replication with basic auth enabled
iamsanjay Apr 18, 2024
bb1c3b3
Removed isContentDownloaded and updated listener factory setting mech…
iamsanjay Apr 22, 2024
38f532a
Merge main
iamsanjay Apr 22, 2024
377eaa3
Merge branch 'main' into SOLR-16367_getRecoveryOnlyHttpClient_to_Jett…
iamsanjay Apr 30, 2024
1b9b7fc
Change return code when downloaded successfully
iamsanjay Apr 30, 2024
fdb1d1f
Merge branch 'main' into SOLR-16367_getRecoveryOnlyHttpClient_to_Jett…
iamsanjay May 8, 2024
ef92b6b
tidy code
iamsanjay May 8, 2024
6279656
Update basic-authentication-plugin.adoc (#2446)
gspgsp May 8, 2024
b789a71
group operators together (#2450)
epugh May 8, 2024
f39e8ba
SOLR-17192: Add "field-limiting" URP to catch ill-designed schemas (#…
gerlowskija May 8, 2024
6bde352
CHANGES.txt
dsmiley May 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions solr/CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,8 @@ Other Changes

* SOLR-17222: QueryResponseWriterUtil.NonFlushingStream should support bulk write methods (Michael Gibney)

* SOLR-16505: Switch internal replica recovery commands to Jetty HTTP2 (Sanjay Dutt, David Smiley)

================== 9.5.0 ==================
New Features
---------------------
Expand Down
45 changes: 17 additions & 28 deletions solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,17 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.store.Directory;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient.HttpUriRequestResponse;
import org.apache.solr.client.solrj.impl.Http2SolrClient;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.CoreAdminRequest.WaitForState;
import org.apache.solr.client.solrj.request.UpdateRequest;
Expand Down Expand Up @@ -124,7 +124,7 @@ public static interface RecoveryListener {
private int retries;
private boolean recoveringAfterStartup;
private CoreContainer cc;
private volatile HttpUriRequest prevSendPreRecoveryHttpUriRequest;
private volatile FutureTask<NamedList<Object>> prevSendPreRecoveryHttpUriRequest;
private final Replica.Type replicaType;

private CoreDescriptor coreDescriptor;
Expand Down Expand Up @@ -175,25 +175,18 @@ public final void setRecoveringAfterStartup(boolean recoveringAfterStartup) {
this.recoveringAfterStartup = recoveringAfterStartup;
}

/** Builds a new HttpSolrClient for use in recovery. Caller must close */
private HttpSolrClient.Builder recoverySolrClientBuilder(String baseUrl, String leaderCoreName) {
// workaround for SOLR-13605: get the configured timeouts & set them directly
// (even though getRecoveryOnlyHttpClient() already has them set)
dsmiley marked this conversation as resolved.
Show resolved Hide resolved
private Http2SolrClient.Builder recoverySolrClientBuilder(String baseUrl, String leaderCoreName) {
iamsanjay marked this conversation as resolved.
Show resolved Hide resolved
final UpdateShardHandlerConfig cfg = cc.getConfig().getUpdateShardHandlerConfig();
return (new HttpSolrClient.Builder(baseUrl)
return new Http2SolrClient.Builder(baseUrl)
.withDefaultCollection(leaderCoreName)
.withConnectionTimeout(cfg.getDistributedConnectionTimeout(), TimeUnit.MILLISECONDS)
.withSocketTimeout(cfg.getDistributedSocketTimeout(), TimeUnit.MILLISECONDS)
.withHttpClient(cc.getUpdateShardHandler().getRecoveryOnlyHttpClient()));
.withHttpClient(cc.getUpdateShardHandler().getRecoveryOnlyHttpClient());
dsmiley marked this conversation as resolved.
Show resolved Hide resolved
}

// make sure any threads stop retrying
@Override
public final void close() {
close = true;
if (prevSendPreRecoveryHttpUriRequest != null) {
prevSendPreRecoveryHttpUriRequest.abort();
}
cancelPrepRecoveryCmd();
log.warn("Stopping recovery for core=[{}] coreNodeName=[{}]", coreName, coreZkNodeName);
}

Expand Down Expand Up @@ -634,11 +627,7 @@ public final void doSyncOrReplicateRecovery(SolrCore core) throws Exception {
.getCollection(cloudDesc.getCollectionName())
.getSlice(cloudDesc.getShardId());

try {
prevSendPreRecoveryHttpUriRequest.abort();
} catch (NullPointerException e) {
// okay
}
cancelPrepRecoveryCmd();

if (isClosed()) {
log.info("RecoveryStrategy has been closed");
Expand Down Expand Up @@ -894,7 +883,6 @@ public final boolean isClosed() {

private final void sendPrepRecoveryCmd(String leaderBaseUrl, String leaderCoreName, Slice slice)
throws SolrServerException, IOException, InterruptedException, ExecutionException {

WaitForState prepCmd = new WaitForState();
prepCmd.setCoreName(leaderCoreName);
prepCmd.setNodeName(zkController.getNodeName());
Expand All @@ -915,18 +903,19 @@ private final void sendPrepRecoveryCmd(String leaderBaseUrl, String leaderCoreNa
int readTimeout =
conflictWaitMs
+ Integer.parseInt(System.getProperty("prepRecoveryReadTimeoutExtraWait", "8000"));
iamsanjay marked this conversation as resolved.
Show resolved Hide resolved
try (HttpSolrClient client =
try (SolrClient client =
recoverySolrClientBuilder(
leaderBaseUrl,
null) // leader core omitted since client only used for 'admin' request
.withSocketTimeout(readTimeout, TimeUnit.MILLISECONDS)
.withIdleTimeout(readTimeout, TimeUnit.MILLISECONDS)
.build()) {
HttpUriRequestResponse mrr = client.httpUriRequest(prepCmd);
prevSendPreRecoveryHttpUriRequest = mrr.httpUriRequest;

prevSendPreRecoveryHttpUriRequest = new FutureTask<>(() -> client.request(prepCmd));
log.info("Sending prep recovery command to [{}]; [{}]", leaderBaseUrl, prepCmd);

mrr.future.get();
prevSendPreRecoveryHttpUriRequest.run();
}
}

private void cancelPrepRecoveryCmd() {
Optional.ofNullable(prevSendPreRecoveryHttpUriRequest).ifPresent(req -> req.cancel(true));
}
}
118 changes: 50 additions & 68 deletions solr/core/src/java/org/apache/solr/handler/IndexFetcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@
import java.util.zip.Adler32;
import java.util.zip.Checksum;
import java.util.zip.InflaterInputStream;
import org.apache.http.client.HttpClient;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriter;
Expand All @@ -97,9 +96,9 @@
import org.apache.lucene.store.IndexOutput;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.Http2SolrClient;
import org.apache.solr.client.solrj.impl.HttpClientUtil;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient.Builder;
import org.apache.solr.client.solrj.impl.InputStreamResponseParser;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.cloud.ZkController;
Expand Down Expand Up @@ -128,12 +127,12 @@
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.security.AllowListUrlChecker;
import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.UpdateShardHandler;
import org.apache.solr.util.FileUtils;
import org.apache.solr.util.IndexOutputOutputStream;
import org.apache.solr.util.RTimer;
import org.apache.solr.util.RefCounted;
import org.apache.solr.util.TestInjection;
import org.apache.solr.util.stats.InstrumentedHttpRequestExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -186,7 +185,7 @@ public class IndexFetcher {

boolean fetchFromLeader = false;

private final HttpClient myHttpClient;
private final SolrClient solrClient;

private Integer connTimeout;

Expand Down Expand Up @@ -261,22 +260,22 @@ public String getMessage() {
}
}

private static HttpClient createHttpClient(
SolrCore core,
String httpBasicAuthUser,
String httpBasicAuthPassword,
boolean useCompression) {
final ModifiableSolrParams httpClientParams = new ModifiableSolrParams();
httpClientParams.set(HttpClientUtil.PROP_BASIC_AUTH_USER, httpBasicAuthUser);
httpClientParams.set(HttpClientUtil.PROP_BASIC_AUTH_PASS, httpBasicAuthPassword);
httpClientParams.set(HttpClientUtil.PROP_ALLOW_COMPRESSION, useCompression);
// no metrics, just tracing
InstrumentedHttpRequestExecutor executor = new InstrumentedHttpRequestExecutor(null);
return HttpClientUtil.createClient(
httpClientParams,
core.getCoreContainer().getUpdateShardHandler().getRecoveryOnlyConnectionManager(),
true,
executor);
// It's crucial not to remove the authentication credentials as they are essential for User
// managed replication.
// GitHub PR #2276
private SolrClient createSolrClient(
SolrCore core, String httpBasicAuthUser, String httpBasicAuthPassword, String leaderBaseUrl) {
final UpdateShardHandler updateShardHandler = core.getCoreContainer().getUpdateShardHandler();
Http2SolrClient httpClient =
new Http2SolrClient.Builder(leaderBaseUrl)
.withHttpClient(updateShardHandler.getRecoveryOnlyHttpClient())
.withListenerFactory(
updateShardHandler.getRecoveryOnlyHttpClient().getListenerFactory())
.withBasicAuthCredentials(httpBasicAuthUser, httpBasicAuthPassword)
.withIdleTimeout(soTimeout, TimeUnit.MILLISECONDS)
.withConnectionTimeout(connTimeout, TimeUnit.MILLISECONDS)
.build();
return httpClient;
}

public IndexFetcher(
Expand Down Expand Up @@ -318,12 +317,10 @@ public IndexFetcher(
if (soTimeout == -1) {
soTimeout = getParameter(initArgs, HttpClientUtil.PROP_SO_TIMEOUT, 120000, null);
}

String httpBasicAuthUser = (String) initArgs.get(HttpClientUtil.PROP_BASIC_AUTH_USER);
String httpBasicAuthPassword = (String) initArgs.get(HttpClientUtil.PROP_BASIC_AUTH_PASS);
myHttpClient =
createHttpClient(
solrCore, httpBasicAuthUser, httpBasicAuthPassword, useExternalCompression);
solrClient =
createSolrClient(solrCore, httpBasicAuthUser, httpBasicAuthPassword, leaderBaseUrl);
}

private void setLeaderCoreUrl(String leaderCoreUrl) {
Expand Down Expand Up @@ -380,16 +377,10 @@ public NamedList<Object> getLatestVersion() throws IOException {
params.set(CommonParams.WT, JAVABIN);
params.set(CommonParams.QT, ReplicationHandler.PATH);
QueryRequest req = new QueryRequest(params);

req.setBasePath(leaderBaseUrl);
// TODO modify to use shardhandler
try (SolrClient client =
new Builder(leaderBaseUrl)
.withHttpClient(myHttpClient)
.withConnectionTimeout(connTimeout, TimeUnit.MILLISECONDS)
.withSocketTimeout(soTimeout, TimeUnit.MILLISECONDS)
.build()) {

return client.request(req, leaderCoreName);
try {
return solrClient.request(req, leaderCoreName);
} catch (SolrServerException e) {
throw new SolrException(ErrorCode.SERVER_ERROR, e.getMessage(), e);
}
Expand All @@ -407,15 +398,10 @@ private void fetchFileList(long gen) throws IOException {
params.set(CommonParams.WT, JAVABIN);
params.set(CommonParams.QT, ReplicationHandler.PATH);
QueryRequest req = new QueryRequest(params);

req.setBasePath(leaderBaseUrl);
// TODO modify to use shardhandler
try (SolrClient client =
new HttpSolrClient.Builder(leaderBaseUrl)
.withHttpClient(myHttpClient)
.withConnectionTimeout(connTimeout, TimeUnit.MILLISECONDS)
.withSocketTimeout(soTimeout, TimeUnit.MILLISECONDS)
.build()) {
NamedList<?> response = client.request(req, leaderCoreName);
try {
NamedList<?> response = solrClient.request(req, leaderCoreName);

List<Map<String, Object>> files = (List<Map<String, Object>>) response.get(CMD_GET_FILE_LIST);
if (files != null) filesToDownload = Collections.synchronizedList(files);
Expand Down Expand Up @@ -1804,10 +1790,10 @@ public void fetchFile() throws Exception {
private void fetch() throws Exception {
try {
while (true) {
int result;
try (FastInputStream is = getStream()) {
try (FastInputStream fis = getStream()) {
int result;
// fetch packets one by one in a single request
result = fetchPackets(is);
result = fetchPackets(fis);
if (result == 0 || result == NO_CONTENT) {
return;
}
Expand All @@ -1833,18 +1819,25 @@ private int fetchPackets(FastInputStream fis) throws Exception {
byte[] longbytes = new byte[8];
try {
while (true) {
if (fis.peek() == -1) {
if (bytesDownloaded == 0) {
log.warn("No content received for file: {}", fileName);
return NO_CONTENT;
}
return 0;
}
if (stop) {
stop = false;
aborted = true;
throw new ReplicationHandlerException("User aborted replication");
}
long checkSumServer = -1;

fis.readFully(intbytes);
// read the size of the packet
int packetSize = readInt(intbytes);
if (packetSize <= 0) {
log.warn("No content received for file: {}", fileName);
return NO_CONTENT;
Comment on lines -1847 to -1848
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

at least an assert (or comment) communicating this is unexpected

continue;
}
// TODO consider recoding the remaining logic to not use/need buf[]; instead use the
// internal buffer of fis
Expand Down Expand Up @@ -1878,7 +1871,6 @@ private int fetchPackets(FastInputStream fis) throws Exception {
log.debug("Fetched and wrote {} bytes of file: {}", bytesDownloaded, fileName);
// errorCount is always set to zero after a successful packet
errorCount = 0;
if (bytesDownloaded >= size) return 0;
iamsanjay marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did a git blame and found: SOLR-14299 IndexFetcher doesn't reset count to 0 after the last packet is received

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That issue merely moved this code slightly; it didn't introduce it. Anyway, I suppose this code potentially prematurely exited before reading the whole stream, and that's why you removed it?

}
} catch (ReplicationHandlerException e) {
throw e;
Expand Down Expand Up @@ -1967,7 +1959,7 @@ private void cleanup() {
private FastInputStream getStream() throws IOException {
ModifiableSolrParams params = new ModifiableSolrParams();

// //the method is command=filecontent
// the method is command=filecontent
params.set(COMMAND, CMD_GET_FILE);
params.set(GENERATION, Long.toString(indexGen));
params.set(CommonParams.QT, ReplicationHandler.PATH);
Expand All @@ -1990,17 +1982,13 @@ private FastInputStream getStream() throws IOException {

NamedList<?> response;
InputStream is = null;

// TODO use shardhandler
try (SolrClient client =
new Builder(leaderBaseUrl)
.withHttpClient(myHttpClient)
.withResponseParser(null)
.withConnectionTimeout(connTimeout, TimeUnit.MILLISECONDS)
.withSocketTimeout(soTimeout, TimeUnit.MILLISECONDS)
.build()) {
try {
QueryRequest req = new QueryRequest(params);
response = client.request(req, leaderCoreName);
req.setResponseParser(new InputStreamResponseParser(FILE_STREAM));
req.setBasePath(leaderBaseUrl);
if (useExternalCompression) req.addHeader("Accept-Encoding", "gzip");
response = solrClient.request(req, leaderCoreName);
is = (InputStream) response.get("stream");
if (useInternalCompression) {
is = new InflaterInputStream(is);
Expand Down Expand Up @@ -2124,21 +2112,15 @@ NamedList<Object> getDetails() throws IOException, SolrServerException {
params.set("follower", false);
params.set(CommonParams.QT, ReplicationHandler.PATH);

QueryRequest request = new QueryRequest(params);
request.setBasePath(leaderBaseUrl);
// TODO use shardhandler
try (SolrClient client =
new HttpSolrClient.Builder(leaderBaseUrl)
.withHttpClient(myHttpClient)
.withConnectionTimeout(connTimeout, TimeUnit.MILLISECONDS)
.withSocketTimeout(soTimeout, TimeUnit.MILLISECONDS)
.build()) {
QueryRequest request = new QueryRequest(params);
return client.request(request, leaderCoreName);
}
return solrClient.request(request, leaderCoreName);
}

public void destroy() {
abortFetch();
HttpClientUtil.close(myHttpClient);
IOUtils.closeQuietly(solrClient);
}

String getLeaderCoreUrl() {
Expand Down
Loading
Loading