Skip to content

Commit

Permalink
prevent recursive update exception thrown from CHM
Browse files Browse the repository at this point in the history
  • Loading branch information
popduke committed Dec 17, 2024
1 parent d0c02b8 commit 6d88bb0
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@

import com.baidu.bifromq.basecrdt.service.ICRDTService;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import lombok.extern.slf4j.Slf4j;

import java.time.Duration;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;

@Slf4j
class BaseKVMetaService implements IBaseKVMetaService {
Expand All @@ -43,7 +45,8 @@ public Observable<Set<String>> clusterIds() {
.filter(NameUtil::isLandscapeURI)
.map(NameUtil::parseClusterId)
.collect(Collectors.toSet()))
.distinctUntilChanged();
.distinctUntilChanged()
.observeOn(Schedulers.single());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.baidu.bifromq.basecrdt.service.ICRDTService;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -30,11 +31,13 @@ public RPCServiceTrafficService(ICRDTService crdtService) {

@Override
public Observable<Set<String>> services() {
return crdtService.aliveCRDTs().map(crdtUris -> crdtUris.stream()
return crdtService.aliveCRDTs()
.map(crdtUris -> crdtUris.stream()
.filter(NameUtil::isLandscapeURI)
.map(NameUtil::parseServiceUniqueName)
.collect(Collectors.toSet()))
.distinctUntilChanged();
.distinctUntilChanged()
.observeOn(Schedulers.single());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ public void channelInactive(ChannelHandlerContext ctx) {
@Override
public final void channelRead(ChannelHandlerContext ctx, Object msg) {
MqttMessage mqttMessage = (MqttMessage) msg;
log.trace("Received {}", mqttMessage);
if (mqttMessage.fixedHeader().messageType() == MqttMessageType.CONNECT) {
MqttConnectMessage connMsg = (MqttConnectMessage) msg;
GoAway goAway = sanityCheck(connMsg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,50 +13,6 @@

package com.baidu.bifromq.mqtt.handler.v5;

import static com.baidu.bifromq.metrics.TenantMetric.MqttAuthFailureCount;
import static com.baidu.bifromq.mqtt.handler.MQTTConnectHandler.AuthResult.goAway;
import static com.baidu.bifromq.mqtt.handler.MQTTConnectHandler.AuthResult.ok;
import static com.baidu.bifromq.mqtt.handler.condition.ORCondition.or;
import static com.baidu.bifromq.mqtt.handler.v5.MQTT5MessageUtils.authData;
import static com.baidu.bifromq.mqtt.handler.v5.MQTT5MessageUtils.authMethod;
import static com.baidu.bifromq.mqtt.handler.v5.MQTT5MessageUtils.isUTF8Payload;
import static com.baidu.bifromq.mqtt.handler.v5.MQTT5MessageUtils.maximumPacketSize;
import static com.baidu.bifromq.mqtt.handler.v5.MQTT5MessageUtils.requestProblemInformation;
import static com.baidu.bifromq.mqtt.handler.v5.MQTT5MessageUtils.requestResponseInformation;
import static com.baidu.bifromq.mqtt.handler.v5.MQTT5MessageUtils.toUserProperties;
import static com.baidu.bifromq.mqtt.handler.v5.MQTT5MessageUtils.toWillMessage;
import static com.baidu.bifromq.mqtt.handler.v5.MQTT5MessageUtils.topicAliasMaximum;
import static com.baidu.bifromq.mqtt.utils.AuthUtil.buildConnAction;
import static com.baidu.bifromq.mqtt.utils.MQTT5MessageSizer.MIN_CONTROL_PACKET_SIZE;
import static com.baidu.bifromq.plugin.eventcollector.ThreadLocalEventPool.getLocal;
import static com.baidu.bifromq.type.MQTTClientInfoConstants.MQTT_CHANNEL_ID_KEY;
import static com.baidu.bifromq.type.MQTTClientInfoConstants.MQTT_CLIENT_ADDRESS_KEY;
import static com.baidu.bifromq.type.MQTTClientInfoConstants.MQTT_CLIENT_BROKER_KEY;
import static com.baidu.bifromq.type.MQTTClientInfoConstants.MQTT_CLIENT_ID_KEY;
import static com.baidu.bifromq.type.MQTTClientInfoConstants.MQTT_PROTOCOL_VER_5_VALUE;
import static com.baidu.bifromq.type.MQTTClientInfoConstants.MQTT_PROTOCOL_VER_KEY;
import static com.baidu.bifromq.type.MQTTClientInfoConstants.MQTT_RESPONSE_INFO;
import static com.baidu.bifromq.type.MQTTClientInfoConstants.MQTT_TYPE_VALUE;
import static com.baidu.bifromq.type.MQTTClientInfoConstants.MQTT_USER_ID_KEY;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_BANNED;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_CLIENT_IDENTIFIER_NOT_VALID;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_IMPLEMENTATION_SPECIFIC;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_MALFORMED_PACKET;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED_5;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_PACKET_TOO_LARGE;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_PAYLOAD_FORMAT_INVALID;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_PROTOCOL_ERROR;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_QOS_NOT_SUPPORTED;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_QUOTA_EXCEEDED;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_RETAIN_NOT_SUPPORTED;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_MOVED;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_TOPIC_NAME_INVALID;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_UNSPECIFIED_ERROR;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_UNSUPPORTED_PROTOCOL_VERSION;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_USE_ANOTHER_SERVER;
import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.SESSION_EXPIRY_INTERVAL;

import com.baidu.bifromq.inbox.storage.proto.LWT;
import com.baidu.bifromq.metrics.ITenantMeter;
import com.baidu.bifromq.mqtt.handler.ChannelAttrs;
Expand All @@ -70,28 +26,12 @@
import com.baidu.bifromq.mqtt.utils.AuthUtil;
import com.baidu.bifromq.mqtt.utils.IMQTTMessageSizer;
import com.baidu.bifromq.plugin.authprovider.IAuthProvider;
import com.baidu.bifromq.plugin.authprovider.type.Continue;
import com.baidu.bifromq.plugin.authprovider.type.Failed;
import com.baidu.bifromq.plugin.authprovider.type.MQTT5AuthData;
import com.baidu.bifromq.plugin.authprovider.type.MQTT5ExtendedAuthData;
import com.baidu.bifromq.plugin.authprovider.type.MQTTAction;
import com.baidu.bifromq.plugin.authprovider.type.Success;
import com.baidu.bifromq.plugin.authprovider.type.*;
import com.baidu.bifromq.plugin.clientbalancer.IClientBalancer;
import com.baidu.bifromq.plugin.clientbalancer.Redirection;
import com.baidu.bifromq.plugin.eventcollector.OutOfTenantResource;
import com.baidu.bifromq.plugin.eventcollector.mqttbroker.channelclosed.AuthError;
import com.baidu.bifromq.plugin.eventcollector.mqttbroker.channelclosed.EnhancedAuthAbortByClient;
import com.baidu.bifromq.plugin.eventcollector.mqttbroker.channelclosed.MalformedClientIdentifier;
import com.baidu.bifromq.plugin.eventcollector.mqttbroker.channelclosed.MalformedUserName;
import com.baidu.bifromq.plugin.eventcollector.mqttbroker.channelclosed.MalformedWillTopic;
import com.baidu.bifromq.plugin.eventcollector.mqttbroker.channelclosed.NotAuthorizedClient;
import com.baidu.bifromq.plugin.eventcollector.mqttbroker.channelclosed.ProtocolError;
import com.baidu.bifromq.plugin.eventcollector.mqttbroker.channelclosed.UnauthenticatedClient;
import com.baidu.bifromq.plugin.eventcollector.mqttbroker.clientdisconnect.InboxTransientError;
import com.baidu.bifromq.plugin.eventcollector.mqttbroker.clientdisconnect.InvalidTopic;
import com.baidu.bifromq.plugin.eventcollector.mqttbroker.clientdisconnect.ProtocolViolation;
import com.baidu.bifromq.plugin.eventcollector.mqttbroker.clientdisconnect.Redirect;
import com.baidu.bifromq.plugin.eventcollector.mqttbroker.clientdisconnect.ResourceThrottled;
import com.baidu.bifromq.plugin.eventcollector.mqttbroker.channelclosed.*;
import com.baidu.bifromq.plugin.eventcollector.mqttbroker.clientdisconnect.*;
import com.baidu.bifromq.sysprops.props.MaxMqtt5ClientIdLength;
import com.baidu.bifromq.type.ClientInfo;
import com.baidu.bifromq.type.QoS;
Expand All @@ -100,18 +40,25 @@
import com.bifromq.plugin.resourcethrottler.TenantResourceType;
import com.google.common.base.Strings;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageBuilders;
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttReasonCodeAndPropertiesVariableHeader;
import io.netty.handler.codec.mqtt.*;
import lombok.extern.slf4j.Slf4j;

import javax.annotation.Nullable;
import java.net.InetSocketAddress;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;

import static com.baidu.bifromq.metrics.TenantMetric.MqttAuthFailureCount;
import static com.baidu.bifromq.mqtt.handler.MQTTConnectHandler.AuthResult.goAway;
import static com.baidu.bifromq.mqtt.handler.MQTTConnectHandler.AuthResult.ok;
import static com.baidu.bifromq.mqtt.handler.condition.ORCondition.or;
import static com.baidu.bifromq.mqtt.handler.v5.MQTT5MessageUtils.*;
import static com.baidu.bifromq.mqtt.utils.AuthUtil.buildConnAction;
import static com.baidu.bifromq.mqtt.utils.MQTT5MessageSizer.MIN_CONTROL_PACKET_SIZE;
import static com.baidu.bifromq.plugin.eventcollector.ThreadLocalEventPool.getLocal;
import static com.baidu.bifromq.type.MQTTClientInfoConstants.*;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.*;
import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.SESSION_EXPIRY_INTERVAL;

@Slf4j
public class MQTT5ConnectHandler extends MQTTConnectHandler {
Expand Down Expand Up @@ -257,9 +204,18 @@ protected CompletableFuture<AuthResult> authenticate(MqttConnectMessage message)
.build(),
getLocal(UnauthenticatedClient.class).peerAddress(clientAddress));
}
case BadAuthMethod -> {
return goAway(MqttMessageBuilders
.connAck()
.returnCode(CONNECTION_REFUSED_BAD_AUTHENTICATION_METHOD)
.build(),
getLocal(AuthError.class)
.cause(failed.getReason())
.peerAddress(clientAddress));
}
// fallthrough
default -> {
log.error("Unexpected error from auth provider:{}", failed.getReason());
log.error("Unexpected auth error:{}", failed.getReason());
return goAway(MqttMessageBuilders
.connAck()
.returnCode(CONNECTION_REFUSED_UNSPECIFIED_ERROR)
Expand Down Expand Up @@ -387,7 +343,7 @@ private void extendedAuth(MQTT5ExtendedAuthData authData) {
.peerAddress(clientAddress)));
// fallthrough
default -> {
log.error("Unexpected error from auth provider:{}", failed.getReason());
log.error("Unexpected ext-auth error:{}", failed.getReason());
extendedAuthFuture.complete(goAway(MqttMessageBuilders
.connAck()
.returnCode(CONNECTION_REFUSED_UNSPECIFIED_ERROR)
Expand Down

0 comments on commit 6d88bb0

Please sign in to comment.