Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-11678][SDK] Optimize the ProxyClientConfig class #11679

Merged
merged 1 commit into from
Jan 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,12 @@
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.HttpManager;
import org.apache.inlong.agent.utils.ThreadUtils;
import org.apache.inlong.common.constant.ProtocolType;
import org.apache.inlong.common.enums.ComponentTypeEnum;
import org.apache.inlong.common.enums.NodeSrvStatus;
import org.apache.inlong.common.heartbeat.AbstractHeartbeatManager;
import org.apache.inlong.common.heartbeat.HeartbeatMsg;
import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
import org.apache.inlong.sdk.dataproxy.TcpMsgSenderConfig;

import io.netty.util.concurrent.DefaultThreadFactory;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -194,13 +193,12 @@ private void createMessageSender() {
String managerAddr = conf.get(AGENT_MANAGER_ADDR);
String authSecretId = conf.get(AGENT_MANAGER_AUTH_SECRET_ID);
String authSecretKey = conf.get(AGENT_MANAGER_AUTH_SECRET_KEY);
ProxyClientConfig proxyClientConfig = null;
TcpMsgSenderConfig proxyClientConfig = null;
try {
proxyClientConfig = new ProxyClientConfig(managerAddr, INLONG_AGENT_SYSTEM, authSecretId, authSecretKey);
proxyClientConfig = new TcpMsgSenderConfig(managerAddr, INLONG_AGENT_SYSTEM, authSecretId, authSecretKey);
proxyClientConfig.setTotalAsyncCallbackSize(CommonConstants.DEFAULT_PROXY_TOTAL_ASYNC_PROXY_SIZE);
proxyClientConfig.setAliveConnections(CommonConstants.DEFAULT_PROXY_ALIVE_CONNECTION_NUM);
proxyClientConfig.setIoThreadNum(CommonConstants.DEFAULT_PROXY_CLIENT_IO_THREAD_NUM);
proxyClientConfig.setProtocolType(ProtocolType.TCP);
proxyClientConfig.setNettyWorkerThreadNum(CommonConstants.DEFAULT_PROXY_CLIENT_IO_THREAD_NUM);
proxyClientConfig.setRequestTimeoutMs(30000L);
ThreadFactory SHARED_FACTORY = new DefaultThreadFactory("agent-sender-manager-heartbeat",
Thread.currentThread().isDaemon());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,9 @@
import org.apache.inlong.agent.plugin.message.SequentialID;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.ThreadUtils;
import org.apache.inlong.common.constant.ProtocolType;
import org.apache.inlong.common.metric.MetricRegister;
import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
import org.apache.inlong.sdk.dataproxy.TcpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
import org.apache.inlong.sdk.dataproxy.common.SendResult;
import org.apache.inlong.sdk.dataproxy.exception.ProxySdkException;
Expand Down Expand Up @@ -199,15 +198,14 @@ private AgentMetricItem getMetricItem(String groupId, String streamId) {
* createMessageSender
*/
private void createMessageSender() throws Exception {
ProxyClientConfig proxyClientConfig = new ProxyClientConfig(managerAddr, inlongGroupId, authSecretId,
authSecretKey);
TcpMsgSenderConfig proxyClientConfig = new TcpMsgSenderConfig(
managerAddr, inlongGroupId, authSecretId, authSecretKey);
proxyClientConfig.setTotalAsyncCallbackSize(totalAsyncBufSize);
proxyClientConfig.setAliveConnections(aliveConnectionNum);
proxyClientConfig.setRequestTimeoutMs(maxSenderTimeout * 1000L);

proxyClientConfig.setIoThreadNum(ioThreadNum);
proxyClientConfig.setEnableBusyWait(enableBusyWait);
proxyClientConfig.setProtocolType(ProtocolType.TCP);
proxyClientConfig.setNettyWorkerThreadNum(ioThreadNum);
proxyClientConfig.setEnableEpollBusyWait(enableBusyWait);

SHARED_FACTORY = new DefaultThreadFactory("agent-sender-manager-" + sourcePath,
Thread.currentThread().isDaemon());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@

package org.apache.inlong.sdk.dataproxy;

import org.apache.inlong.common.constant.ProtocolType;
import org.apache.inlong.common.msg.AttributeConstants;
import org.apache.inlong.common.util.MessageUtils;
import org.apache.inlong.sdk.dataproxy.codec.EncodeObject;
import org.apache.inlong.sdk.dataproxy.common.SdkConsts;
import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
import org.apache.inlong.sdk.dataproxy.common.SendResult;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
Expand Down Expand Up @@ -55,58 +55,54 @@ public class DefaultMessageSender implements MessageSender {
/* Store index <groupId_streamId,cnt> */
private final Map<String, Long> storeIndex = new ConcurrentHashMap<String, Long>();
private String groupId;
private int msgtype = ConfigConstants.MSG_TYPE;
private int msgtype = SdkConsts.MSG_TYPE;
private boolean isCompress = true;
private boolean isGroupIdTransfer = false;
private boolean isReport = false;
private boolean isSupportLF = false;
private int maxPacketLength = -1;
private int cpsSize = ConfigConstants.COMPRESS_SIZE;
private int cpsSize = SdkConsts.COMPRESS_SIZE;
private final int senderMaxAttempt;

public DefaultMessageSender(ProxyClientConfig configure) throws Exception {
public DefaultMessageSender(TcpMsgSenderConfig configure) throws Exception {
this(configure, null);
}

public DefaultMessageSender(ProxyClientConfig configure, ThreadFactory selfDefineFactory) throws Exception {
public DefaultMessageSender(TcpMsgSenderConfig configure, ThreadFactory selfDefineFactory) throws Exception {
ProxyUtils.validClientConfig(configure);
sender = new Sender(configure, selfDefineFactory);
sender.start();
groupId = configure.getInlongGroupId();
indexCol = new IndexCollectThread(storeIndex);
senderMaxAttempt = configure.getSenderMaxAttempt();
senderMaxAttempt = configure.getMaxSyncSendAttempt();
indexCol.start();

}

/**
* generate by cluster id
*
* @param configure - sender
* @param tcpConfig - sender
* @return - sender
*/
public static DefaultMessageSender generateSenderByClusterId(
ProxyClientConfig configure) throws Exception {
TcpMsgSenderConfig tcpConfig) throws Exception {

return generateSenderByClusterId(configure, null);
return generateSenderByClusterId(tcpConfig, null);
}

/**
* generate by cluster id
*
* @param configure - sender
* @param tcpConfig - sender
* @param selfDefineFactory - sender factory
* @return - sender
*/
public static DefaultMessageSender generateSenderByClusterId(ProxyClientConfig configure,
public static DefaultMessageSender generateSenderByClusterId(TcpMsgSenderConfig tcpConfig,
ThreadFactory selfDefineFactory) throws Exception {
// correct ProtocolType settings
if (!ProtocolType.TCP.equals(configure.getProtocolType())) {
configure.setProtocolType(ProtocolType.TCP);
}
LOGGER.info("Initial tcp sender, configure is {}", configure);
LOGGER.info("Initial tcp sender, configure is {}", tcpConfig);
// initial sender object
ProxyConfigManager proxyConfigManager = new ProxyConfigManager(configure);
ProxyConfigManager proxyConfigManager = new ProxyConfigManager(tcpConfig);
Tuple2<ProxyConfigEntry, String> result =
proxyConfigManager.getGroupIdConfigure(true);
if (result.getF0() == null) {
Expand All @@ -117,7 +113,7 @@ public static DefaultMessageSender generateSenderByClusterId(ProxyClientConfig c
return sender;
} else {
DefaultMessageSender tmpMessageSender =
new DefaultMessageSender(configure, selfDefineFactory);
new DefaultMessageSender(tcpConfig, selfDefineFactory);
tmpMessageSender.setMaxPacketLength(result.getF0().getMaxPacketLength());
CACHE_SENDER.put(result.getF0().getClusterId(), tmpMessageSender);
return tmpMessageSender;
Expand All @@ -144,8 +140,8 @@ public void close() {
shutdownInternalThreads();
}

public ProxyClientConfig getProxyClientConfig() {
return sender.getConfigure();
public TcpMsgSenderConfig getProxyClientConfig() {
return sender.getTcpConfig();
}

public boolean isSupportLF() {
Expand Down Expand Up @@ -213,7 +209,7 @@ public void setMaxPacketLength(int maxPacketLength) {
}

public String getSDKVersion() {
return ConfigConstants.PROXY_SDK_VERSION;
return SdkConsts.PROXY_SDK_VERSION;
}

private SendResult attemptSendMessage(Function<Sender, SendResult> sendOperation) {
Expand Down
Loading
Loading