Skip to content

Commit

Permalink
Add option to configure the deadline of getSystemInfo (#2240)
Browse files Browse the repository at this point in the history
Add get system info timeout
  • Loading branch information
Quinn-With-Two-Ns authored Sep 25, 2024
1 parent 238c5e1 commit 52aa9e8
Show file tree
Hide file tree
Showing 4 changed files with 222 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ public Supplier<Capabilities> getServerCapabilities() {
SystemInfoInterceptor.getServerCapabilitiesWithRetryOrThrow(
serverCapabilitiesFuture,
interceptedChannel,
deadlineFrom(options.getHealthCheckAttemptTimeout()));
deadlineFrom(options.getSystemInfoTimeout()));
}

private static Deadline deadlineFrom(Duration duration) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ public class ServiceStubsOptions {
*/
protected final Duration healthCheckAttemptTimeout;

/**
* SystemInfoTimeout specifies how to long to wait for service response on each health check
* attempt. Default: 5s.
*/
protected final Duration systemInfoTimeout;

/**
* HealthCheckTimeout defines how long client should be sending health check requests to the
* server before concluding that it is unavailable. Defaults to 10s.
Expand Down Expand Up @@ -128,6 +134,7 @@ public class ServiceStubsOptions {
this.enableHttps = that.enableHttps;
this.sslContext = that.sslContext;
this.healthCheckAttemptTimeout = that.healthCheckAttemptTimeout;
this.systemInfoTimeout = that.systemInfoTimeout;
this.healthCheckTimeout = that.healthCheckTimeout;
this.enableKeepAlive = that.enableKeepAlive;
this.keepAliveTime = that.keepAliveTime;
Expand All @@ -150,6 +157,7 @@ public class ServiceStubsOptions {
SslContext sslContext,
Duration healthCheckAttemptTimeout,
Duration healthCheckTimeout,
Duration systemInfoTimeout,
boolean enableKeepAlive,
Duration keepAliveTime,
Duration keepAliveTimeout,
Expand All @@ -168,6 +176,7 @@ public class ServiceStubsOptions {
this.sslContext = sslContext;
this.healthCheckAttemptTimeout = healthCheckAttemptTimeout;
this.healthCheckTimeout = healthCheckTimeout;
this.systemInfoTimeout = systemInfoTimeout;
this.enableKeepAlive = enableKeepAlive;
this.keepAliveTime = keepAliveTime;
this.keepAliveTimeout = keepAliveTimeout;
Expand Down Expand Up @@ -233,6 +242,13 @@ public Duration getHealthCheckAttemptTimeout() {
return healthCheckAttemptTimeout;
}

/**
* @return The timeout for the RPC made by the client to fetch server capabilities.
*/
public Duration getSystemInfoTimeout() {
return systemInfoTimeout;
}

/**
* @return duration of time to wait while checking server connection when creating new client
*/
Expand Down Expand Up @@ -337,6 +353,7 @@ public boolean equals(Object o) {
&& Objects.equals(sslContext, that.sslContext)
&& Objects.equals(healthCheckAttemptTimeout, that.healthCheckAttemptTimeout)
&& Objects.equals(healthCheckTimeout, that.healthCheckTimeout)
&& Objects.equals(systemInfoTimeout, that.systemInfoTimeout)
&& Objects.equals(keepAliveTime, that.keepAliveTime)
&& Objects.equals(keepAliveTimeout, that.keepAliveTimeout)
&& Objects.equals(rpcTimeout, that.rpcTimeout)
Expand All @@ -358,6 +375,7 @@ public int hashCode() {
sslContext,
healthCheckAttemptTimeout,
healthCheckTimeout,
systemInfoTimeout,
enableKeepAlive,
keepAliveTime,
keepAliveTimeout,
Expand Down Expand Up @@ -389,6 +407,8 @@ public String toString() {
+ healthCheckAttemptTimeout
+ ", healthCheckTimeout="
+ healthCheckTimeout
+ ", systemInfoTimeout="
+ systemInfoTimeout
+ ", enableKeepAlive="
+ enableKeepAlive
+ ", keepAliveTime="
Expand Down Expand Up @@ -421,6 +441,7 @@ public static class Builder<T extends Builder<T>> {
private String target;
private Consumer<ManagedChannelBuilder<?>> channelInitializer;
private Duration healthCheckAttemptTimeout;
private Duration systemInfoTimeout;
private Duration healthCheckTimeout;
private boolean enableKeepAlive = true;
private Duration keepAliveTime = Duration.ofSeconds(30);
Expand All @@ -444,6 +465,7 @@ protected Builder(ServiceStubsOptions options) {
this.sslContext = options.sslContext;
this.healthCheckAttemptTimeout = options.healthCheckAttemptTimeout;
this.healthCheckTimeout = options.healthCheckTimeout;
this.systemInfoTimeout = options.systemInfoTimeout;
this.enableKeepAlive = options.enableKeepAlive;
this.keepAliveTime = options.keepAliveTime;
this.keepAliveTimeout = options.keepAliveTimeout;
Expand Down Expand Up @@ -713,6 +735,17 @@ public T setHealthCheckTimeout(Duration healthCheckTimeout) {
return self();
}

/**
* Set a SystemInfoTimeout that specifies how long the client tries to fetch server
* capabilities.
*
* @return {@code this}
*/
public T setSystemInfoTimeout(Duration systemInfoTimeout) {
this.systemInfoTimeout = systemInfoTimeout;
return self();
}

/**
* Enables keep alive ping from client to the server, which can help drop abruptly closed
* connections faster.
Expand Down Expand Up @@ -796,6 +829,7 @@ public ServiceStubsOptions build() {
this.sslContext,
this.healthCheckAttemptTimeout,
this.healthCheckTimeout,
this.systemInfoTimeout,
this.enableKeepAlive,
this.keepAliveTime,
this.keepAliveTimeout,
Expand Down Expand Up @@ -847,6 +881,8 @@ public ServiceStubsOptions validateAndBuildWithDefaults() {
Duration healthCheckTimeout =
this.healthCheckTimeout != null ? this.healthCheckTimeout : Duration.ofSeconds(10);

Duration systemInfoTimeout =
this.systemInfoTimeout != null ? this.systemInfoTimeout : Duration.ofSeconds(5);
return new ServiceStubsOptions(
this.channel,
target,
Expand All @@ -855,6 +891,7 @@ public ServiceStubsOptions validateAndBuildWithDefaults() {
this.sslContext,
healthCheckAttemptTimeout,
healthCheckTimeout,
systemInfoTimeout,
this.enableKeepAlive,
this.keepAliveTime,
this.keepAliveTimeout,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,14 +134,14 @@ public void setUp() throws Exception {
}

@After
public void tearDown() throws Exception {
public void tearDown() {
if (channelManager != null) {
channelManager.shutdownNow();
}
}

@Test
public void testGetServerCapabilities() throws Exception {
public void testGetServerCapabilities() {
Capabilities capabilities = channelManager.getServerCapabilities().get();
assertEquals(CAPABILITIES, capabilities);
assertEquals(1, getSystemInfoCount.get());
Expand All @@ -150,7 +150,7 @@ public void testGetServerCapabilities() throws Exception {
}

@Test
public void testGetServerCapabilitiesRetry() throws Exception {
public void testGetServerCapabilitiesRetry() {
getSystemInfoUnavailable.set(2);
Capabilities capabilities = channelManager.getServerCapabilities().get();
assertEquals(CAPABILITIES, capabilities);
Expand All @@ -160,7 +160,7 @@ public void testGetServerCapabilitiesRetry() throws Exception {
}

@Test
public void testGetServerCapabilitiesUnavailable() throws Exception {
public void testGetServerCapabilitiesUnavailable() {
getSystemInfoUnavailable.set(Integer.MAX_VALUE);
try {
Capabilities unused = channelManager.getServerCapabilities().get();
Expand All @@ -174,7 +174,7 @@ public void testGetServerCapabilitiesUnavailable() throws Exception {
}

@Test
public void testGetServerCapabilitiesUnimplemented() throws Exception {
public void testGetServerCapabilitiesUnimplemented() {
getSystemInfoUnimplemented.set(1);
Capabilities capabilities = channelManager.getServerCapabilities().get();
assertEquals(Capabilities.getDefaultInstance(), capabilities);
Expand All @@ -184,7 +184,7 @@ public void testGetServerCapabilitiesUnimplemented() throws Exception {
}

@Test
public void testGetServerCapabilitiesWithConnect() throws Exception {
public void testGetServerCapabilitiesWithConnect() {
channelManager.connect(HEALTH_CHECK_NAME, Duration.ofMillis(100));
Capabilities capabilities = channelManager.getServerCapabilities().get();
assertEquals(CAPABILITIES, capabilities);
Expand All @@ -194,7 +194,7 @@ public void testGetServerCapabilitiesWithConnect() throws Exception {
}

@Test
public void testGetServerCapabilitiesRetryWithConnect() throws Exception {
public void testGetServerCapabilitiesRetryWithConnect() {
getSystemInfoUnavailable.set(2);
channelManager.connect(HEALTH_CHECK_NAME, Duration.ofMillis(100));
Capabilities capabilities = channelManager.getServerCapabilities().get();
Expand All @@ -205,7 +205,7 @@ public void testGetServerCapabilitiesRetryWithConnect() throws Exception {
}

@Test
public void testGetServerCapabilitiesUnavailableWithConnect() throws Exception {
public void testGetServerCapabilitiesUnavailableWithConnect() {
getSystemInfoUnavailable.set(Integer.MAX_VALUE);
try {
channelManager.connect(HEALTH_CHECK_NAME, Duration.ofMillis(100));
Expand All @@ -220,7 +220,7 @@ public void testGetServerCapabilitiesUnavailableWithConnect() throws Exception {
}

@Test
public void testGetServerCapabilitiesUnimplementedWithConnect() throws Exception {
public void testGetServerCapabilitiesUnimplementedWithConnect() {
getSystemInfoUnimplemented.set(1);
channelManager.connect(HEALTH_CHECK_NAME, Duration.ofMillis(100));
Capabilities capabilities = channelManager.getServerCapabilities().get();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/*
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this material except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.temporal.serviceclient;

import static org.junit.Assert.assertEquals;

import io.grpc.ClientInterceptor;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.GrpcCleanupRule;
import io.temporal.api.workflowservice.v1.GetSystemInfoRequest;
import io.temporal.api.workflowservice.v1.GetSystemInfoResponse;
import io.temporal.api.workflowservice.v1.WorkflowServiceGrpc;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.*;

public class SystemInfoTimeoutTest {

private static final GetSystemInfoResponse.Capabilities CAPABILITIES =
GetSystemInfoResponse.Capabilities.newBuilder().setInternalErrorDifferentiation(true).build();

private static final GetSystemInfoResponse GET_SYSTEM_INFO_RESPONSE =
GetSystemInfoResponse.newBuilder().setCapabilities(CAPABILITIES).build();

private static final RpcRetryOptions RPC_RETRY_OPTIONS =
RpcRetryOptions.newBuilder()
.setInitialInterval(Duration.ofMillis(10))
.setBackoffCoefficient(1.0)
.setMaximumAttempts(3)
.setExpiration(Duration.ofMillis(100))
.validateBuildWithDefaults();

@Rule public final GrpcCleanupRule grpcCleanupRule = new GrpcCleanupRule();
private final AtomicInteger getSystemInfoCount = new AtomicInteger(0);
private final AbstractQueue<Duration> getSystemInfoTimeout = new ArrayBlockingQueue<Duration>(10);

private final WorkflowServiceGrpc.WorkflowServiceImplBase workflowImpl =
new WorkflowServiceGrpc.WorkflowServiceImplBase() {
@Override
public void getSystemInfo(
GetSystemInfoRequest request, StreamObserver<GetSystemInfoResponse> responseObserver) {
Duration timeout = getSystemInfoTimeout.poll();
if (timeout != null) {
try {
Thread.sleep(timeout.toMillis());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
getSystemInfoCount.getAndIncrement();
responseObserver.onNext(GET_SYSTEM_INFO_RESPONSE);
responseObserver.onCompleted();
}
};

private ManagedChannel managedChannel;

@Before
public void setUp() throws Exception {
getSystemInfoCount.set(0);
String serverName = InProcessServerBuilder.generateName();
grpcCleanupRule.register(
InProcessServerBuilder.forName(serverName)
.directExecutor()
.addService(workflowImpl)
.build()
.start());
managedChannel =
grpcCleanupRule.register(
InProcessChannelBuilder.forName(serverName).directExecutor().build());
}

@Test
public void testGetServerCapabilitiesTimeoutExceeded() {
WorkflowServiceStubsOptions serviceStubsOptions =
WorkflowServiceStubsOptions.newBuilder()
.setChannel(managedChannel)
.setRpcRetryOptions(RPC_RETRY_OPTIONS)
.setSystemInfoTimeout(Duration.ofSeconds(1))
.validateAndBuildWithDefaults();

ClientInterceptor deadlineInterceptor =
new GrpcDeadlineInterceptor(
serviceStubsOptions.getRpcTimeout(),
serviceStubsOptions.getRpcLongPollTimeout(),
serviceStubsOptions.getRpcQueryTimeout());

ChannelManager channelManager =
new ChannelManager(serviceStubsOptions, Collections.singletonList(deadlineInterceptor));

getSystemInfoTimeout.add(Duration.ofSeconds(2));

StatusRuntimeException sre =
Assert.assertThrows(
StatusRuntimeException.class, () -> channelManager.getServerCapabilities().get());
assertEquals(Status.Code.DEADLINE_EXCEEDED, sre.getStatus().getCode());
}

@Test
public void testGetServerCapabilitiesRetry() {
WorkflowServiceStubsOptions serviceStubsOptions =
WorkflowServiceStubsOptions.newBuilder()
.setChannel(managedChannel)
.setRpcRetryOptions(RPC_RETRY_OPTIONS)
.setRpcTimeout(Duration.ofMillis(500))
.setSystemInfoTimeout(Duration.ofSeconds(5))
.validateAndBuildWithDefaults();

ClientInterceptor deadlineInterceptor =
new GrpcDeadlineInterceptor(
serviceStubsOptions.getRpcTimeout(),
serviceStubsOptions.getRpcLongPollTimeout(),
serviceStubsOptions.getRpcQueryTimeout());

ChannelManager channelManager =
new ChannelManager(serviceStubsOptions, Collections.singletonList(deadlineInterceptor));

getSystemInfoTimeout.add(Duration.ofSeconds(1));
getSystemInfoTimeout.add(Duration.ofSeconds(1));

GetSystemInfoResponse.Capabilities capabilities = channelManager.getServerCapabilities().get();
assertEquals(CAPABILITIES, capabilities);
assertEquals(3, getSystemInfoCount.get());
}

@Test
public void testGetServerCapabilitiesTimeout() {
WorkflowServiceStubsOptions serviceStubsOptions =
WorkflowServiceStubsOptions.newBuilder()
.setChannel(managedChannel)
.setRpcRetryOptions(RPC_RETRY_OPTIONS)
.setSystemInfoTimeout(Duration.ofSeconds(10))
.validateAndBuildWithDefaults();

ClientInterceptor deadlineInterceptor =
new GrpcDeadlineInterceptor(
serviceStubsOptions.getRpcTimeout(),
serviceStubsOptions.getRpcLongPollTimeout(),
serviceStubsOptions.getRpcQueryTimeout());

ChannelManager channelManager =
new ChannelManager(serviceStubsOptions, Collections.singletonList(deadlineInterceptor));

getSystemInfoTimeout.add(Duration.ofSeconds(6));

GetSystemInfoResponse.Capabilities capabilities = channelManager.getServerCapabilities().get();
assertEquals(CAPABILITIES, capabilities);
assertEquals(1, getSystemInfoCount.get());
}
}

0 comments on commit 52aa9e8

Please sign in to comment.