Skip to content

Commit

Permalink
Support cleaning Publisher data based on DataID & ignore Publisher re…
Browse files Browse the repository at this point in the history
…quests from clients
  • Loading branch information
hui-cha committed Dec 16, 2024
1 parent 632e0d6 commit 0d7342c
Show file tree
Hide file tree
Showing 11 changed files with 1,110 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ public class ValueConstants {
DataInfo.toDataInfoId(
"session.blacklist.data", SESSION_PROVIDE_DATA_INSTANCE_ID, SESSION_PROVIDE_DATA_GROUP);

public static final String SESSION_DATAID_BLACKLIST_DATA_ID =
DataInfo.toDataInfoId(
"session.dataid.blacklist", SESSION_PROVIDE_DATA_INSTANCE_ID, SESSION_PROVIDE_DATA_GROUP);

public static final String CLIENT_OFF_ADDRESS_DATA_ID =
DataInfo.toDataInfoId(
"registry.client.off.list", SESSION_PROVIDE_DATA_INSTANCE_ID, SESSION_PROVIDE_DATA_GROUP);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import com.alipay.sofa.registry.server.meta.resource.CircuitBreakerResources;
import com.alipay.sofa.registry.server.meta.resource.ClientManagerResource;
import com.alipay.sofa.registry.server.meta.resource.CompressResource;
import com.alipay.sofa.registry.server.meta.resource.DataInfoIDBlacklistResource;
import com.alipay.sofa.registry.server.meta.resource.HealthResource;
import com.alipay.sofa.registry.server.meta.resource.MetaCenterResource;
import com.alipay.sofa.registry.server.meta.resource.MetaDigestResource;
Expand All @@ -78,6 +79,14 @@
import com.alipay.sofa.registry.util.NamedThreadFactory;
import com.alipay.sofa.registry.util.OsUtils;
import com.alipay.sofa.registry.util.PropertySplitter;
import org.glassfish.jersey.jackson.JacksonFeature;
import org.glassfish.jersey.server.ResourceConfig;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.ExecutorService;
Expand All @@ -86,13 +95,6 @@
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.glassfish.jersey.jackson.JacksonFeature;
import org.glassfish.jersey.server.ResourceConfig;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

/**
* @author shangyu.wh
Expand Down Expand Up @@ -394,6 +396,12 @@ public BlacklistDataResource blacklistDataResource() {
return new BlacklistDataResource();
}

@Bean
@ConditionalOnMissingBean
public DataInfoIDBlacklistResource dataInfoIDBlacklistResource() {
return new DataInfoIDBlacklistResource();
}

@Bean
public ClientManagerResource clientManagerResource() {
return new ClientManagerResource();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
package com.alipay.sofa.registry.server.meta.resource;

import com.alipay.sofa.registry.common.model.Tuple;
import com.alipay.sofa.registry.common.model.console.PersistenceData;
import com.alipay.sofa.registry.common.model.console.PersistenceDataBuilder;
import com.alipay.sofa.registry.common.model.constants.ValueConstants;
import com.alipay.sofa.registry.common.model.metaserver.ProvideDataChangeEvent;
import com.alipay.sofa.registry.common.model.store.DataInfo;
import com.alipay.sofa.registry.core.model.Result;
import com.alipay.sofa.registry.jdbc.constant.TableEnum;
import com.alipay.sofa.registry.log.Logger;
import com.alipay.sofa.registry.log.LoggerFactory;
import com.alipay.sofa.registry.server.meta.provide.data.ProvideDataNotifier;
import com.alipay.sofa.registry.server.meta.provide.data.ProvideDataService;
import com.alipay.sofa.registry.server.meta.resource.filter.AuthRestController;
import com.alipay.sofa.registry.server.meta.resource.filter.LeaderAwareRestController;
import com.alipay.sofa.registry.store.api.DBResponse;
import com.alipay.sofa.registry.store.api.OperationStatus;
import com.alipay.sofa.registry.store.api.config.DefaultCommonConfig;
import com.alipay.sofa.registry.util.JsonUtils;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;

import javax.ws.rs.FormParam;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import java.util.HashSet;
import java.util.Set;

/**
* @author huicha
* @date 2024/12/13
*/
@Path("datainfoid/blacklist")
@AuthRestController
@LeaderAwareRestController
public class DataInfoIDBlacklistResource {
private static final Logger LOGGER = LoggerFactory.getLogger(DataInfoIDBlacklistResource.class);

@Autowired
private ProvideDataService provideDataService;

@Autowired
private ProvideDataNotifier provideDataNotifier;

@Autowired
private DefaultCommonConfig defaultCommonConfig;

@POST
@Path("add")
@Produces(MediaType.APPLICATION_JSON)
public Result addBlackList(@FormParam("dataCenter") String dataCenter,
@FormParam("dataId") String dataId,
@FormParam("group") String group,
@FormParam("instanceId") String instanceId) {
try {
return process(dataCenter, dataId, group, instanceId, Operation.ADD);
} catch (Throwable throwable) {
LOGGER.error("Save dataid black list exception", throwable);
return Result.failed("Save dataid black list exception");
}
}

@POST
@Path("delete")
@Produces(MediaType.APPLICATION_JSON)
public Result deleteBlackList(@FormParam("dataCenter") String dataCenter,
@FormParam("dataId") String dataId,
@FormParam("group") String group,
@FormParam("instanceId") String instanceId) {
try {
return process(dataCenter, dataId, group, instanceId, Operation.DELETE);
} catch (Throwable throwable) {
LOGGER.error("Delete dataid black list exception", throwable);
return Result.failed("Delete dataid black list exception");
}
}

private Result process(String dataCenter, String dataId, String group, String instanceId, Operation operation) {
// 1. 参数检查
// 1.1. 检查 DataCenter 是否就是当前 Meta 的所属 DataCenter
String clusterId = defaultCommonConfig.getClusterId(TableEnum.PROVIDE_DATA.getTableName(), ValueConstants.SESSION_DATAID_BLACKLIST_DATA_ID);
if (!StringUtils.equals(dataCenter, clusterId)) {
// 给定的机房不是当前机房,那么拒绝添加黑名单,直接返回
return Result.failed("Invalid data center");
}

// 1.2. 检查要处理的 DataId 以及 Group 是否符合规则
DataInfo dataInfo = new DataInfo(instanceId, dataId, group);
Tuple<Boolean, String> checkResult = this.checkDataInfoId(dataInfo);
if (!checkResult.o1) {
// 不符合规则,那么拒绝添加黑名单,直接返回
return Result.failed("Invalid dataid: " + checkResult.o2);
}

// 2. 查询出当前黑名单列表
DBResponse<PersistenceData> queryResponse =
this.provideDataService.queryProvideData(ValueConstants.SESSION_DATAID_BLACKLIST_DATA_ID);

// 3. 根据操作类型,添加 DataID 到列表中,或者删除列表中的 DataID,并保存
Tuple<PersistenceData, Long> tuple = this.createNewPersistenceData(queryResponse, dataInfo, operation);
PersistenceData newPersistenceData = tuple.o1;
Long oldVersion = tuple.o2;
if (!this.provideDataService.saveProvideData(newPersistenceData, oldVersion)) {
// 保存失败
return Result.failed("Save new black list fail");
}

// 4. 保存成功则通知 Session 黑名单变化了
ProvideDataChangeEvent provideDataChangeEvent =
new ProvideDataChangeEvent(ValueConstants.SESSION_DATAID_BLACKLIST_DATA_ID,
newPersistenceData.getVersion());
this.provideDataNotifier.notifyProvideDataChange(provideDataChangeEvent);

return Result.success();
}

private Tuple<PersistenceData, Long> createNewPersistenceData(DBResponse<PersistenceData> queryResponse, DataInfo dataInfo, Operation operation) {
OperationStatus operationStatus = queryResponse.getOperationStatus();
if (OperationStatus.SUCCESS.equals(operationStatus)) {
// 读取旧数据成功,其格式为 Json 字符串,解析出来
PersistenceData oldPersistenceData = queryResponse.getEntity();
String oldBlackListJson = oldPersistenceData.getData();
Set<String> oldDataIdBlackList = JsonUtils.read(oldBlackListJson, new TypeReference<Set<String>>() {});

// 添加或删除新的需要拉黑的数据
if (Operation.ADD.equals(operation)) {
oldDataIdBlackList.add(dataInfo.getDataInfoId());
} else {
oldDataIdBlackList.remove(dataInfo.getDataInfoId());
}

// 创建新数据,并返回新数据以及旧数据的版本号
PersistenceData newPersistenceData = PersistenceDataBuilder
.createPersistenceData(ValueConstants.SESSION_DATAID_BLACKLIST_DATA_ID,
JsonUtils.writeValueAsString(oldDataIdBlackList));
return new Tuple<>(newPersistenceData, oldPersistenceData.getVersion());
} else {
// 没有旧数据旧直接创建新的,旧数据的版本号设置为 0
Set<String> dataIdBlackList = new HashSet<>();
if (Operation.ADD.equals(operation)) {
dataIdBlackList.add(dataInfo.getDataInfoId());
}
PersistenceData newPersistenceData = PersistenceDataBuilder
.createPersistenceData(ValueConstants.SESSION_DATAID_BLACKLIST_DATA_ID,
JsonUtils.writeValueAsString(dataIdBlackList));
return new Tuple<>(newPersistenceData, 0L);
}
}

protected Tuple<Boolean, String> checkDataInfoId(DataInfo dataInfo) {
return new Tuple<>(true, "");
}

@VisibleForTesting
public DataInfoIDBlacklistResource setProvideDataService(ProvideDataService provideDataService) {
this.provideDataService = provideDataService;
return this;
}

@VisibleForTesting
public DataInfoIDBlacklistResource setProvideDataNotifier(ProvideDataNotifier provideDataNotifier) {
this.provideDataNotifier = provideDataNotifier;
return this;
}

@VisibleForTesting
public DataInfoIDBlacklistResource setDefaultCommonConfig(DefaultCommonConfig defaultCommonConfig) {
this.defaultCommonConfig = defaultCommonConfig;
return this;
}
}

enum Operation {
ADD,
DELETE
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package com.alipay.sofa.registry.server.meta.resource;

import com.alipay.sofa.registry.common.model.Node.NodeType;
import com.alipay.sofa.registry.common.model.console.PersistenceData;
import com.alipay.sofa.registry.common.model.constants.ValueConstants;
import com.alipay.sofa.registry.core.model.Result;
import com.alipay.sofa.registry.server.meta.AbstractMetaServerTestBase;
import com.alipay.sofa.registry.server.meta.provide.data.ProvideDataNotifier;
import com.alipay.sofa.registry.server.meta.provide.data.ProvideDataService;
import com.alipay.sofa.registry.store.api.DBResponse;
import com.alipay.sofa.registry.store.api.OperationStatus;
import com.alipay.sofa.registry.store.api.config.DefaultCommonConfig;
import com.alipay.sofa.registry.util.JsonUtils;
import com.fasterxml.jackson.core.type.TypeReference;
import org.junit.Assert;
import org.junit.Test;

import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

/**
* @author huicha
* @date 2024/12/13
*/
public class DataInfoIDBlacklistResourceTest extends AbstractMetaServerTestBase {

private ProvideDataService createProvideDataService() {
return spy(new InMemoryProvideDataRepo());
}

private DataInfoIDBlacklistResource createDataIDBlacklistResource(ProvideDataService provideDataService) {
ProvideDataNotifier provideDataNotifier = mock(ProvideDataNotifier.class);
DefaultCommonConfig defaultCommonConfig = mock(DefaultCommonConfig.class);
when(defaultCommonConfig.getClusterId(anyString(), anyString())).thenReturn("DEFAULT_DATACENTER");

return new DataInfoIDBlacklistResource()
.setProvideDataNotifier(provideDataNotifier)
.setProvideDataService(provideDataService)
.setDefaultCommonConfig(defaultCommonConfig);
}

private DataInfoIDBlacklistResource createDataIDBlacklistResource(ProvideDataService provideDataService,
ProvideDataNotifier provideDataNotifier) {
DefaultCommonConfig defaultCommonConfig = mock(DefaultCommonConfig.class);
when(defaultCommonConfig.getClusterId(anyString(), anyString())).thenReturn("DEFAULT_DATACENTER");

return new DataInfoIDBlacklistResource()
.setProvideDataNotifier(provideDataNotifier)
.setProvideDataService(provideDataService)
.setDefaultCommonConfig(defaultCommonConfig);
}

@Test
public void testAddAndDelete() {
ProvideDataService provideDataService = createProvideDataService();
DataInfoIDBlacklistResource resource = this.createDataIDBlacklistResource(provideDataService);

String dataCenter = "DEFAULT_DATACENTER";
String dataIdOne = "dataid.black.list.test";
String group = "dataid-black-list-test-group";
String instanceId = "DEFAULT_INSTANCE_ID";

// 添加了两个数据
Result resultOne = resource.addBlackList(dataCenter, dataIdOne, group, instanceId);
Assert.assertTrue(resultOne.isSuccess());

String dataIdTwo = "dataid.black.list.test2";
Result resultTwo = resource.addBlackList(dataCenter, dataIdTwo, group, instanceId);
Assert.assertTrue(resultTwo.isSuccess());

// 因此这里的查询结果也应该是两条
DBResponse<PersistenceData> queryResult = provideDataService.queryProvideData(ValueConstants.SESSION_DATAID_BLACKLIST_DATA_ID);
Assert.assertEquals(OperationStatus.SUCCESS, queryResult.getOperationStatus());
Assert.assertNotNull(queryResult.getEntity());
PersistenceData persistenceData = queryResult.getEntity();
String dataJson = persistenceData.getData();
Set<String> data = JsonUtils.read(dataJson, new TypeReference<Set<String>>() {});
Assert.assertEquals(2, data.size());
Assert.assertTrue(data.contains(String.format("%s#@#%s#@#%s", dataIdOne, instanceId, group)));
Assert.assertTrue(data.contains(String.format("%s#@#%s#@#%s", dataIdTwo, instanceId, group)));

// 删除了第一条数据以及一条不存在的数据
Result deleteResultOne = resource.deleteBlackList(dataCenter, dataIdOne, group, instanceId);
Assert.assertTrue(deleteResultOne.isSuccess());

String notExistDataId = "not.exist";
Result deleteResultTwo = resource.deleteBlackList(dataCenter, notExistDataId, group, instanceId);
Assert.assertTrue(deleteResultTwo.isSuccess());

// 因此这里的查询结果应该是只有一条数据,且是第二条数据
DBResponse<PersistenceData> queryResultTwo = provideDataService.queryProvideData(ValueConstants.SESSION_DATAID_BLACKLIST_DATA_ID);
Assert.assertEquals(OperationStatus.SUCCESS, queryResultTwo.getOperationStatus());
Assert.assertNotNull(queryResultTwo.getEntity());
PersistenceData persistenceDataTwo = queryResultTwo.getEntity();
String dataJsonTwo = persistenceDataTwo.getData();
Set<String> dataTwo = JsonUtils.read(dataJsonTwo, new TypeReference<Set<String>>() {});
Assert.assertEquals(1, dataTwo.size());
Assert.assertTrue(data.contains(String.format("%s#@#%s#@#%s", dataIdTwo, instanceId, group)));
}

@Test
public void testNotify() {
String dataCenter = "DEFAULT_DATACENTER";
String dataId = "dataid.black.list.test";
String group = "dataid-black-list-test-group";
String instanceId = "DEFAULT_INSTANCE_ID";

AtomicInteger counter = new AtomicInteger(0);

ProvideDataService provideDataService = createProvideDataService();
DataInfoIDBlacklistResource resource = this.createDataIDBlacklistResource(provideDataService, event -> {
// 这个数据是提供给 Session 消费的,因此消费的节点类型有且只有 Session
Set<NodeType> nodeTypes = event.getNodeTypes();
Assert.assertEquals(1, nodeTypes.size());
Assert.assertTrue(nodeTypes.contains(NodeType.SESSION));

// 检查 DataInfoId 是否是预期的
String dataInfoId = event.getDataInfoId();
Assert.assertEquals(ValueConstants.SESSION_DATAID_BLACKLIST_DATA_ID, dataInfoId);

// 增加计数
counter.addAndGet(1);
});

Result result = resource.addBlackList(dataCenter, dataId, group, instanceId);
Assert.assertTrue(result.isSuccess());
Assert.assertEquals(1, counter.get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -209,4 +209,6 @@ public interface SessionServerConfig extends ServerShareConfig {
int getScanExecutorQueueSize();

long getScanTimeoutMills();

int getScanPublisherInDataInfoIdBlackListIntervalMillis();
}
Loading

0 comments on commit 0d7342c

Please sign in to comment.