Skip to content

Commit

Permalink
Add the capability to query the Publisher information of dataInfoId i…
Browse files Browse the repository at this point in the history
…n the Session
  • Loading branch information
hui-cha committed Dec 23, 2024
1 parent 0d7342c commit 593ab10
Show file tree
Hide file tree
Showing 7 changed files with 306 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,18 @@
package com.alipay.sofa.registry.common.model;

import com.alipay.sofa.registry.common.model.dataserver.DatumSummary;
import com.alipay.sofa.registry.common.model.sessionserver.SimplePublisher;
import com.alipay.sofa.registry.common.model.slot.filter.SyncAcceptorRequest;
import com.alipay.sofa.registry.common.model.slot.filter.SyncSlotAcceptorManager;
import com.alipay.sofa.registry.common.model.store.Publisher;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.*;
import org.apache.commons.collections.CollectionUtils;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/**
* @author xiaojian.xj
Expand Down Expand Up @@ -92,4 +99,23 @@ public static Publisher clonePublisher(Publisher publisher) {
newPub.setSessionProcessId(publisher.getSessionProcessId());
return newPub;
}

public static SimplePublisher convert(Publisher publisher) {
return new SimplePublisher(
publisher.getClientId(),
publisher.getSourceAddress().buildAddressString(),
publisher.getAppName()
);
}

public static List<SimplePublisher> convert(Collection<Publisher> publishers) {
if (CollectionUtils.isEmpty(publishers)) {
return Collections.emptyList();
}
List<SimplePublisher> ret = Lists.newArrayListWithCapacity(publishers.size());
for (Publisher publisher : publishers) {
ret.add(convert(publisher));
}
return ret;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.alipay.sofa.registry.common.model.sessionserver;

import com.alipay.sofa.registry.util.StringFormatter;

import java.io.Serializable;

/**
* @author huicha
* @date 2024/12/23
*/
public class QueryPublisherRequest implements Serializable {

private static final long serialVersionUID = 5295572570779995725L;

private final String dataInfoId;

public QueryPublisherRequest(String dataInfoId) {
this.dataInfoId = dataInfoId;
}

public String getDataInfoId() {
return dataInfoId;
}

@Override
public String toString() {
return StringFormatter.format("QueryPublisherRequest={}}", dataInfoId);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package com.alipay.sofa.registry.common.model.sessionserver;

import com.alipay.sofa.registry.util.StringFormatter;

import java.io.Serializable;

/**
* @author huicha
* @date 2024/12/23
*/
public final class SimplePublisher implements Serializable {

private static final long serialVersionUID = 6861155219172594665L;

private final String clientId;

private final String sourceAddress;

private final String appName;

public SimplePublisher(String clientId, String sourceAddress, String appName) {
this.clientId = clientId;
this.sourceAddress = sourceAddress;
this.appName = appName;
}

public String getClientId() {
return clientId;
}

public String getSourceAddress() {
return sourceAddress;
}

public String getAppName() {
return appName;
}

@Override
public String toString() {
return StringFormatter.format(
"SimplePublisher{app={},clientId={},add={}}", appName, clientId, sourceAddress);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import com.alipay.sofa.registry.server.session.remoting.console.handler.FilterSubscriberIPsHandler;
import com.alipay.sofa.registry.server.session.remoting.console.handler.GetClientManagerRequestHandler;
import com.alipay.sofa.registry.server.session.remoting.console.handler.PubSubDataInfoIdRequestHandler;
import com.alipay.sofa.registry.server.session.remoting.console.handler.QueryPublisherRequestHandler;
import com.alipay.sofa.registry.server.session.remoting.console.handler.QuerySubscriberRequestHandler;
import com.alipay.sofa.registry.server.session.remoting.console.handler.StopPushRequestHandler;
import com.alipay.sofa.registry.server.session.remoting.handler.AppRevisionSliceHandler;
Expand Down Expand Up @@ -301,6 +302,7 @@ public Collection<AbstractServerHandler> serverSyncHandlers() {
public Collection<AbstractServerHandler> consoleHandlers() {
Collection<AbstractServerHandler> list = new ArrayList<>();
list.add(querySubscriberRequestHandler());
list.add(queryPublisherRequestHandler());
list.add(clientOffRequestHandler());
list.add(clientOnRequestHandler());
list.add(getClientManagerRequestHandler());
Expand Down Expand Up @@ -341,6 +343,11 @@ public AbstractServerHandler querySubscriberRequestHandler() {
return new QuerySubscriberRequestHandler();
}

@Bean
public AbstractServerHandler queryPublisherRequestHandler() {
return new QueryPublisherRequestHandler();
}

@Bean
public AbstractServerHandler filterSubscriberIPsHandler() {
return new FilterSubscriberIPsHandler();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package com.alipay.sofa.registry.server.session.remoting.console.handler;

import com.alipay.sofa.registry.common.model.GenericResponse;
import com.alipay.sofa.registry.common.model.PublisherUtils;
import com.alipay.sofa.registry.common.model.sessionserver.QueryPublisherRequest;
import com.alipay.sofa.registry.common.model.store.Publisher;
import com.alipay.sofa.registry.remoting.Channel;
import com.alipay.sofa.registry.server.session.bootstrap.ExecutorManager;
import com.alipay.sofa.registry.server.session.store.DataStore;
import com.google.common.annotations.VisibleForTesting;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.Collection;

/**
* @author huicha
* @date 2024/12/23
*/
public class QueryPublisherRequestHandler extends AbstractConsoleHandler<QueryPublisherRequest> {

@Autowired
protected DataStore sessionDataStore;

@Override
public Object doHandle(Channel channel, QueryPublisherRequest request) {
Collection<Publisher> publishers = sessionDataStore.getDatas(request.getDataInfoId());
return new GenericResponse().fillSucceed(PublisherUtils.convert(publishers));
}

@Override
public Object buildFailedResponse(String msg) {
return new GenericResponse().fillFailed(msg);
}

@Override
public Class interest() {
return QueryPublisherRequest.class;
}

@VisibleForTesting
public QueryPublisherRequestHandler setSessionDataStore(DataStore sessionDataStore) {
this.sessionDataStore = sessionDataStore;
return this;
}

@VisibleForTesting
public QueryPublisherRequestHandler setExecutorManager(ExecutorManager executorManager) {
this.executorManager = executorManager;
return this;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,16 @@
*/
package com.alipay.sofa.registry.server.session.resource;

import static com.alipay.sofa.registry.common.model.constants.ValueConstants.CONNECT_ID_SPLIT;

import com.alipay.sofa.registry.common.model.CommonResponse;
import com.alipay.sofa.registry.common.model.ConnectId;
import com.alipay.sofa.registry.common.model.GenericResponse;
import com.alipay.sofa.registry.common.model.PublisherUtils;
import com.alipay.sofa.registry.common.model.Tuple;
import com.alipay.sofa.registry.common.model.appmeta.InterfaceMapping;
import com.alipay.sofa.registry.common.model.sessionserver.PubSubDataInfoIdRequest;
import com.alipay.sofa.registry.common.model.sessionserver.PubSubDataInfoIdResp;
import com.alipay.sofa.registry.common.model.sessionserver.QueryPublisherRequest;
import com.alipay.sofa.registry.common.model.sessionserver.SimplePublisher;
import com.alipay.sofa.registry.common.model.store.Publisher;
import com.alipay.sofa.registry.common.model.store.StoreData;
import com.alipay.sofa.registry.common.model.store.Subscriber;
Expand Down Expand Up @@ -52,9 +53,9 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ThreadPoolExecutor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.CollectionUtils;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import javax.ws.rs.GET;
Expand All @@ -64,8 +65,20 @@
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.CollectionUtils;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Collectors;

import static com.alipay.sofa.registry.common.model.constants.ValueConstants.CONNECT_ID_SPLIT;

/**
* @author shangyu.wh
Expand Down Expand Up @@ -422,4 +435,43 @@ public List<String> getDataServerList() {
public List<String> getMetaServerLeader() {
return Lists.newArrayList(mataNodeService.getMetaServerLeader());
}

@GET
@Path("/data/zone/queryPublisher")
@Produces(MediaType.APPLICATION_JSON)
public GenericResponse<List<SimplePublisher>> queryZonePublisher(@QueryParam("dataInfoId") String dataInfoId) {
Collection<Publisher> publishers = this.sessionDataStore.getDatas(dataInfoId);
List<SimplePublisher> allPublishers = publishers.stream().map(PublisherUtils::convert).collect(Collectors.toList());

List<URL> otherSessions = Sdks.getOtherConsoleServers(null, this.sessionServerConfig, this.metaNodeService);
if (!CollectionUtils.isEmpty(otherSessions)) {
Map<URL, CommonResponse> respMap = Sdks.concurrentSdkSend(
pubSubQueryZoneExecutor,
otherSessions,
(URL url) -> {
final QueryPublisherRequest req = new QueryPublisherRequest(dataInfoId);
return (CommonResponse) sessionConsoleExchanger.request(new SimpleRequest(req, url)).getResult();
},
5000
);

for (Entry<URL, CommonResponse> entry : respMap.entrySet()) {
CommonResponse response = entry.getValue();
if (response instanceof GenericResponse) {
GenericResponse<List<SimplePublisher>> genericResponse = (GenericResponse<List<SimplePublisher>>) response;
if (genericResponse.isSuccess()) {
List<SimplePublisher> subPublishers = genericResponse.getData();
allPublishers.addAll(subPublishers);
} else {
LOGGER.error("url={} query publishers fail, response:{}.",
entry.getKey().getIpAddress(), entry.getValue());
}
} else {
LOGGER.error("url={} query publishers fail, unexpect response type, response:{}.",
entry.getKey().getIpAddress(), entry.getValue());
}
}
}
return new GenericResponse<List<SimplePublisher>>().fillSucceed(allPublishers);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package com.alipay.sofa.registry.server.session.remoting.console.handler;

import com.alipay.sofa.registry.common.model.CommonResponse;
import com.alipay.sofa.registry.common.model.GenericResponse;
import com.alipay.sofa.registry.common.model.Node;
import com.alipay.sofa.registry.common.model.sessionserver.QueryPublisherRequest;
import com.alipay.sofa.registry.common.model.sessionserver.SimplePublisher;
import com.alipay.sofa.registry.common.model.store.Publisher;
import com.alipay.sofa.registry.common.model.store.URL;
import com.alipay.sofa.registry.remoting.ChannelHandler;
import com.alipay.sofa.registry.server.session.TestUtils;
import com.alipay.sofa.registry.server.session.bootstrap.ExecutorManager;
import com.alipay.sofa.registry.server.session.store.DataStore;
import org.apache.commons.collections.CollectionUtils;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

import java.util.ArrayList;
import java.util.List;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
* @author huicha
* @date 2024/12/23
*/
public class QueryPublisherRequestHandlerTest {


@Test
public void testHandle() {
String dataInfoId = "test-data-info-id";

List<Publisher> mockPublishers = new ArrayList<>();
for (int index = 0; index < 3; index++) {
Publisher mockPublisher = new Publisher();
mockPublisher.setDataInfoId(dataInfoId);
mockPublisher.setClientId("ClientId-" + index);
mockPublisher.setSourceAddress(URL.valueOf("127.0.0." + index + ":1234"));
mockPublisher.setAppName("App");
mockPublishers.add(mockPublisher);
}

DataStore dataStore = mock(DataStore.class);
when(dataStore.getDatas(Mockito.eq(dataInfoId))).thenReturn(mockPublishers);

QueryPublisherRequestHandler handler = new QueryPublisherRequestHandler();
handler
.setExecutorManager(new ExecutorManager(TestUtils.newSessionConfig("testDc")))
.setSessionDataStore(dataStore);

Assert.assertNotNull(handler.getExecutor());
Assert.assertEquals(handler.interest(), QueryPublisherRequest.class);
Assert.assertEquals(handler.getConnectNodeType(), Node.NodeType.CONSOLE);
Assert.assertEquals(handler.getType(), ChannelHandler.HandlerType.PROCESSER);
Assert.assertEquals(handler.getInvokeType(), ChannelHandler.InvokeType.SYNC);
Assert.assertFalse(((CommonResponse) handler.buildFailedResponse("msg")).isSuccess());

QueryPublisherRequest notExistReq = new QueryPublisherRequest("not-exist");
GenericResponse<List<SimplePublisher>> notExistResp = (GenericResponse) handler.doHandle(null, notExistReq);
Assert.assertTrue(notExistResp.isSuccess());
List<SimplePublisher> notExistPublishers = notExistResp.getData();
Assert.assertTrue(CollectionUtils.isEmpty(notExistPublishers));

QueryPublisherRequest existReq = new QueryPublisherRequest(dataInfoId);
GenericResponse<List<SimplePublisher>> existResp = (GenericResponse) handler.doHandle(null, existReq);
Assert.assertTrue(existResp.isSuccess());
List<SimplePublisher> existPublishers = existResp.getData();
Assert.assertFalse(CollectionUtils.isEmpty(existPublishers));
Assert.assertEquals(3, existPublishers.size());

for (int index = 0; index < existPublishers.size(); index++) {
SimplePublisher existPublisher = existPublishers.get(index);

String clientId = existPublisher.getClientId();
String sourceAddr = existPublisher.getSourceAddress();
String appName = existPublisher.getAppName();

Assert.assertEquals("ClientId-" + index, clientId);
Assert.assertEquals("127.0.0." + index + ":1234", sourceAddr);
Assert.assertEquals("App", appName);
}
}

}

0 comments on commit 593ab10

Please sign in to comment.