Skip to content

Commit

Permalink
Log detailed reason when server cannot handle client requests due to …
Browse files Browse the repository at this point in the history
…service unavailability #1254
  • Loading branch information
JamesChenX committed Jul 2, 2023
1 parent 8ff4b99 commit ccc0e19
Show file tree
Hide file tree
Showing 12 changed files with 190 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import im.turms.server.common.infra.exception.ResponseException;
import im.turms.server.common.infra.exception.ThrowableInfo;
import im.turms.server.common.infra.healthcheck.ServerStatusManager;
import im.turms.server.common.infra.healthcheck.ServiceAvailability;
import im.turms.server.common.infra.logging.core.logger.Logger;
import im.turms.server.common.infra.logging.core.logger.LoggerFactory;
import im.turms.server.common.infra.property.TurmsPropertiesManager;
Expand Down Expand Up @@ -115,10 +116,12 @@ public Mono<ByteBuf> handleRequest(
// Check if it is a heartbeat request
if (!serviceRequestBuffer.isReadable()) {
serviceRequestBuffer.release();
if (!serverStatusManager.isActive()) {
ServiceAvailability serviceAvailability = serverStatusManager.getServiceAvailability();
if (!serviceAvailability.isOk()) {
return Mono.just(ClientMessageEncoder.encodeResponse(System.currentTimeMillis(),
HEARTBEAT_FAILURE_REQUEST_ID,
ResponseStatusCode.SERVER_UNAVAILABLE));
ResponseStatusCode.SERVER_UNAVAILABLE,
serviceAvailability.reason()));
}
return handleHeartbeatRequest(sessionWrapper);
}
Expand Down Expand Up @@ -237,8 +240,10 @@ public Mono<TurmsNotification> handleServiceRequest(
requestId));
}
// Check server status
if (!serverStatusManager.isActive()) {
ServiceAvailability serviceAvailability = serverStatusManager.getServiceAvailability();
if (!serviceAvailability.isOk()) {
return Mono.just(NotificationFactory.create(ResponseStatusCode.SERVER_UNAVAILABLE,
serviceAvailability.reason(),
requestId));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import im.turms.gateway.domain.session.service.SessionService;
import im.turms.server.common.domain.blocklist.service.BlocklistService;
import im.turms.server.common.infra.healthcheck.ServerStatusManager;
import im.turms.server.common.infra.healthcheck.ServiceAvailability;
import im.turms.server.common.infra.lang.ByteArrayWrapper;

/**
Expand All @@ -54,7 +55,8 @@ public ServiceAvailabilityHandler(

@Override
public void channelRegistered(ChannelHandlerContext ctx) {
if (serverStatusManager.isActive()) {
ServiceAvailability serviceAvailability = serverStatusManager.getServiceAvailability();
if (serviceAvailability.isOk()) {
SocketAddress socketAddress = ctx.channel()
.remoteAddress();
if (socketAddress instanceof InetSocketAddress address
Expand Down
20 changes: 13 additions & 7 deletions turms-gateway/src/test/java/integration/access/TcpServerIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import im.turms.gateway.domain.session.service.SessionService;
import im.turms.server.common.domain.blocklist.service.BlocklistService;
import im.turms.server.common.infra.healthcheck.ServerStatusManager;
import im.turms.server.common.infra.healthcheck.ServiceAvailability;
import im.turms.server.common.infra.property.env.gateway.TcpProperties;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -58,10 +59,15 @@ void shouldCloseOrAcceptConnection_accordingTo_ServerStatusManager_isActive()
when(blocklistService.isIpBlocked(any(byte[].class))).thenReturn(false);

ServerStatusManager serverStatusManager = mock(ServerStatusManager.class);
List<Boolean> isActiveReturnValues = List.of(true, false, true, false);
OngoingStubbing<Boolean> isActiveStubbing = when(serverStatusManager.isActive());
for (Boolean returnValue : isActiveReturnValues) {
isActiveStubbing = isActiveStubbing.thenReturn(returnValue);
List<ServiceAvailability> serviceAvailabilities = List.of(ServiceAvailability.OK,
ServiceAvailability.SHUTTING_DOWN,
ServiceAvailability.OK,
ServiceAvailability.SHUTTING_DOWN);
OngoingStubbing<ServiceAvailability> serviceAvailabilityStubbing =
when(serverStatusManager.getServiceAvailability());
for (ServiceAvailability serviceAvailability : serviceAvailabilities) {
serviceAvailabilityStubbing =
serviceAvailabilityStubbing.thenReturn(serviceAvailability);
}

SessionService sessionService = mock(SessionService.class);
Expand All @@ -73,7 +79,7 @@ void shouldCloseOrAcceptConnection_accordingTo_ServerStatusManager_isActive()
8 * KB);

int i = 0;
for (Boolean isActive : isActiveReturnValues) {
for (ServiceAvailability serviceAvailability : serviceAvailabilities) {
System.out.printf("The client with index (%d) is connecting", i);
Connection connection = TcpClient.create()
.host(server.host())
Expand All @@ -91,9 +97,9 @@ void shouldCloseOrAcceptConnection_accordingTo_ServerStatusManager_isActive()
// Wait for the server to close the connection
Thread.sleep(200);
boolean isConnected = !connection.isDisposed();
assertThat(isConnected).isEqualTo(isActive);
assertThat(isConnected).isEqualTo(serviceAvailability.isOk());
i++;
}
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ public static ByteBuf encodeResponse(long timestamp, long requestId, ResponseSta
return encodeResponse(timestamp, requestId, code.getBusinessCode(), null);
}

/**
 * Encodes a response whose status is given as a {@link ResponseStatusCode} and
 * which may carry a reason string.
 * <p>
 * Only the business code of {@code code} is forwarded; all other arguments are
 * passed through unchanged to the business-code-based overload.
 *
 * @param timestamp the timestamp forwarded to the delegate overload
 * @param requestId the request ID forwarded to the delegate overload
 * @param code      the status code whose business code is encoded
 * @param reason    the optional reason forwarded to the delegate overload, may be {@code null}
 * @return the buffer returned by the delegate overload
 */
public static ByteBuf encodeResponse(
        long timestamp,
        long requestId,
        ResponseStatusCode code,
        @Nullable String reason) {
    var businessCode = code.getBusinessCode();
    return encodeResponse(timestamp, requestId, businessCode, reason);
}

public static ByteBuf encodeResponse(
long timestamp,
long requestId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ public static ServiceResponse of(@NotNull ResponseStatusCode statusCode) {
return POOL.get(statusCode);
}

public static ServiceResponse of(@NotNull ResponseStatusCode statusCode, String message) {
public static ServiceResponse of(
@NotNull ResponseStatusCode statusCode,
@Nullable String message) {
return message == null
? POOL.get(statusCode)
: new ServiceResponse(statusCode, null, message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package im.turms.server.common.infra.healthcheck;

import java.lang.management.ManagementFactory;
import jakarta.annotation.Nullable;

import com.sun.management.OperatingSystemMXBean;

Expand All @@ -40,6 +41,8 @@ public final class CpuHealthChecker extends HealthChecker {

private boolean isCpuHealthy;
private int currentUnhealthyTimes;
@Nullable
private String unhealthyReason;

public CpuHealthChecker(CpuHealthCheckProperties properties) {
operatingSystemBean = ManagementFactory.getPlatformMXBean(OperatingSystemMXBean.class);
Expand All @@ -63,6 +66,12 @@ public boolean isHealthy() {
return isCpuHealthy;
}

/**
 * Returns the unhealthy reason currently recorded by this CPU health checker.
 *
 * @return the recorded reason, or {@code null} when no reason is recorded
 */
@Nullable
@Override
public String getUnhealthyReason() {
return unhealthyReason;
}

@Override
public void updateHealthStatus() {
if (!isCpuHealthCheckAvailable) {
Expand All @@ -71,11 +80,16 @@ public void updateHealthStatus() {
boolean wasCpuHealthy = isCpuHealthy;
double cpuLoad = operatingSystemBean.getCpuLoad();
if (cpuLoad > unhealthyLoadThreshold) {
currentUnhealthyTimes++;
if (currentUnhealthyTimes > cpuCheckRetries) {
if (++currentUnhealthyTimes > cpuCheckRetries) {
unhealthyReason =
"The CPU usage is too high, and it has failed the CPU health check "
+ currentUnhealthyTimes
+ " times, which exceeds max retries "
+ cpuCheckRetries;
isCpuHealthy = false;
}
} else {
unhealthyReason = null;
currentUnhealthyTimes = 0;
isCpuHealthy = true;
}
Expand All @@ -91,4 +105,4 @@ public void updateHealthStatus() {
}
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import jakarta.annotation.Nullable;

import io.netty.util.concurrent.DefaultThreadFactory;
import org.springframework.context.annotation.Lazy;
Expand Down Expand Up @@ -54,6 +55,16 @@ public HealthCheckManager(@Lazy Node node, TurmsPropertiesManager propertiesMana
startHealthCheck(properties.getCheckIntervalSeconds());
}

/**
 * Returns the unhealthy reason reported by the CPU health checker.
 *
 * @return the reason reported by the CPU health checker, or {@code null} if it reports none
 */
@Nullable
public String getCpuUnhealthyReason() {
return cpuHealthChecker.getUnhealthyReason();
}

/**
 * Returns the unhealthy reason reported by the memory health checker.
 *
 * @return the reason reported by the memory health checker, or {@code null} if it reports none
 */
@Nullable
public String getMemoryUnhealthyReason() {
return memoryHealthChecker.getUnhealthyReason();
}

/**
 * Reports whether both the CPU and the memory health checkers consider themselves healthy.
 * <p>
 * The memory checker is consulted only when the CPU checker reports healthy.
 *
 * @return {@code true} only if both checkers report healthy
 */
public boolean isHealthy() {
    if (!cpuHealthChecker.isHealthy()) {
        return false;
    }
    return memoryHealthChecker.isHealthy();
}
Expand Down Expand Up @@ -81,4 +92,4 @@ private void startHealthCheck(int intervalSeconds) {
}, 0, intervalSeconds, TimeUnit.SECONDS);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,18 @@

package im.turms.server.common.infra.healthcheck;

import jakarta.annotation.Nullable;

/**
 * Base class for health checkers. A checker exposes its current health state,
 * an optional reason describing why it is unhealthy, and an operation that
 * refreshes that state.
 *
 * @author James Chen
 */
public abstract class HealthChecker {

/**
 * @return {@code true} if this checker currently reports a healthy state
 */
public abstract boolean isHealthy();

/**
 * @return the reason why this checker is unhealthy, or {@code null} if no
 *         reason is available
 */
@Nullable
public abstract String getUnhealthyReason();

/**
 * Refreshes the health state exposed by {@link #isHealthy()} and
 * {@link #getUnhealthyReason()}.
 * <p>
 * NOTE(review): presumably invoked periodically by the health check
 * scheduler; confirm against the caller.
 */
public abstract void updateHealthStatus();

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.lang.management.MemoryMXBean;
import java.util.List;
import java.util.Optional;
import jakarta.annotation.Nullable;

import com.sun.management.HotSpotDiagnosticMXBean;
import com.sun.management.OperatingSystemMXBean;
Expand Down Expand Up @@ -66,7 +67,6 @@ public final class MemoryHealthChecker extends HealthChecker {
private long usedHeapMemory;
private long usedNonHeapMemory;
private long usedSystemMemory;
private long freeSystemMemory;

private final BufferPoolMXBean directBufferPoolBean;
private final MemoryMXBean memoryMXBean;
Expand All @@ -81,6 +81,10 @@ public final class MemoryHealthChecker extends HealthChecker {
private final int minHeapMemoryGcIntervalMillis;
private long lastHeapMemoryGcTimestamp;

private boolean isMemoryHealthy;
@Nullable
private String unhealthyReason;

public MemoryHealthChecker(MemoryHealthCheckProperties properties) {
operatingSystemBean = ManagementFactory.getPlatformMXBean(OperatingSystemMXBean.class);

Expand Down Expand Up @@ -166,9 +170,13 @@ public MemoryHealthChecker(MemoryHealthCheckProperties properties) {

@Override
public boolean isHealthy() {
return usedAvailableMemory < maxAvailableMemory
|| usedDirectMemory < maxAvailableDirectMemory
|| freeSystemMemory > minFreeSystemMemory;
return isMemoryHealthy;
}

/**
 * Returns the unhealthy reason currently recorded by this memory health checker.
 *
 * @return the recorded reason, or {@code null} when no reason is recorded
 */
@Nullable
@Override
public String getUnhealthyReason() {
return unhealthyReason;
}

@Override
Expand All @@ -177,21 +185,51 @@ public void updateHealthStatus() {
// and "PooledByteBufAllocator.DEFAULT.metric().usedDirectMemory()"
// because we have requested Netty to create DirectBuffer instances via its constructor with
// the counter supported by JDK
usedDirectMemory = directBufferPoolBean.getMemoryUsed();
usedHeapMemory = memoryMXBean.getHeapMemoryUsage()
long localUsedDirectMemory = directBufferPoolBean.getMemoryUsed();
usedDirectMemory = localUsedDirectMemory;
long localUsedHeapMemory = memoryMXBean.getHeapMemoryUsage()
.getUsed();
usedHeapMemory = localUsedHeapMemory;
// Non-heap memory pools: [CodeHeap 'non-nmethods', CodeHeap 'non-profiled nmethods',
// CodeHeap 'profiled nmethods',
// Compressed Class Space, Metaspace]
// via ManagementFactory.getMemoryPoolMXBeans().stream().filter(bean -> bean.getType() ==
// MemoryType.NON_HEAP)
// .map(MemoryPoolMXBean::getName).sorted().toList().toString()
usedNonHeapMemory = memoryMXBean.getNonHeapMemoryUsage()
long localUsedNonHeapMemory = memoryMXBean.getNonHeapMemoryUsage()
.getUsed();
usedAvailableMemory = usedDirectMemory + usedHeapMemory + usedNonHeapMemory;
freeSystemMemory = operatingSystemBean.getFreeMemorySize();
usedSystemMemory = totalPhysicalMemorySize - freeSystemMemory;
usedNonHeapMemory = localUsedNonHeapMemory;
long localUsedAvailableMemory =
localUsedDirectMemory + localUsedHeapMemory + localUsedNonHeapMemory;
usedAvailableMemory = localUsedAvailableMemory;
long localFreeSystemMemory = operatingSystemBean.getFreeMemorySize();
long localUsedSystemMemory = totalPhysicalMemorySize - localFreeSystemMemory;
usedSystemMemory = localUsedSystemMemory;

if (localUsedAvailableMemory < maxAvailableMemory
|| localUsedDirectMemory < maxAvailableDirectMemory
|| localFreeSystemMemory > minFreeSystemMemory) {
unhealthyReason = null;
isMemoryHealthy = true;
} else {
unhealthyReason =
"The memory is insufficient. The insufficient memory usage snapshot is: "
+ "Used system memory: "
+ asMbString(localUsedSystemMemory)
+ "/"
+ asMbString(totalPhysicalMemorySize)
+ "; "
+ "Used available memory: "
+ asMbString(localUsedAvailableMemory)
+ "/"
+ asMbString(maxAvailableMemory)
+ "; "
+ "Used direct memory: "
+ asMbString(localUsedDirectMemory)
+ "/"
+ asMbString(maxAvailableDirectMemory);
isMemoryHealthy = false;
}
tryLog();
}

Expand Down Expand Up @@ -266,4 +304,4 @@ private String asMbString(long bytes) {
+ "MB";
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,26 @@ public ServerStatusManager(
this.healthCheckManager = healthCheckManager;
}

public boolean isActive() {
return !context.isClosing() && node.isActive() && healthCheckManager.isHealthy();
/**
 * Determines whether the server can currently serve requests and, if not, why.
 * <p>
 * The checks are evaluated in the following order and the first match wins:
 * <ol>
 * <li>the context is closing: {@link ServiceAvailability#SHUTTING_DOWN}</li>
 * <li>the node is not active: {@link ServiceAvailability#INACTIVE}</li>
 * <li>a CPU unhealthy reason is present: a new instance with status
 * {@code HIGH_CPU_USAGE} carrying that reason</li>
 * <li>a memory unhealthy reason is present: a new instance with status
 * {@code INSUFFICIENT_MEMORY} carrying that reason</li>
 * <li>otherwise: {@link ServiceAvailability#OK}</li>
 * </ol>
 *
 * @return the current service availability
 */
public ServiceAvailability getServiceAvailability() {
    if (context.isClosing()) {
        return ServiceAvailability.SHUTTING_DOWN;
    } else if (!node.isActive()) {
        return ServiceAvailability.INACTIVE;
    }
    // The CPU reason takes precedence; the memory reason is only queried when
    // the CPU checker reports none.
    final String cpuReason = healthCheckManager.getCpuUnhealthyReason();
    if (cpuReason != null) {
        return new ServiceAvailability(ServiceAvailability.Status.HIGH_CPU_USAGE, cpuReason);
    }
    final String memoryReason = healthCheckManager.getMemoryUnhealthyReason();
    return memoryReason == null
            ? ServiceAvailability.OK
            : new ServiceAvailability(
                    ServiceAvailability.Status.INSUFFICIENT_MEMORY,
                    memoryReason);
}

}
Loading

0 comments on commit ccc0e19

Please sign in to comment.