diff --git a/docs/sdk/dataproxy-sdk/java.md b/docs/sdk/dataproxy-sdk/java.md index ad955527bd8..d8d09633aea 100644 --- a/docs/sdk/dataproxy-sdk/java.md +++ b/docs/sdk/dataproxy-sdk/java.md @@ -10,7 +10,7 @@ Create a task on the Dashboard or through the command line, and use `Auto Push` ## Import Java SDK The library of the SDK need to be imported into the project before using the SDK. The library can be obtained in the following two ways: -- Get the source code and compile it yourself and deploy the SDK package to the local warehouse, see [How to Build](https://inlong.apache.org/docs/next/quick_start/how_to_build/). +- Get the source code and compile it yourself and deploy the SDK package to the local warehouse, see [How to Build](https://inlong.apache.org/docs/next/development/how_to_build/). - Imported through maven dependency like this:

 {`
@@ -22,106 +22,112 @@ The library of the SDK need to be imported into the project before using the SDK
 
## Data report process -After import the SDK, you can instantiate a [MessageSender](https://github.com/apache/inlong/blob/master/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java) object, call sync(`sendMessage()`) or async(`asyncSendMessage()`) interface to report single or multiple(batch) data. see [Send Demo](https://github.com/apache/inlong/blob/master/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java). +After import the SDK, you can instantiate a [TcpMsgSender](https://github.com/apache/inlong/blob/master/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpMsgSender.java) object, call sync(`sendMessage()`) or async(`asyncSendMessage()`) interface to report single or multiple(batch) data. see [TcpClientExample.java](https://github.com/apache/inlong/blob/master/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java). The overall process includes the following three steps: ### Initialize SDK From the demo code, we can see that the client initialization is mainly done in the `getMessageSender()` function: ```java -public DefaultMessageSender getMessageSender(String localIP, String inLongManagerAddr, String inLongManagerPort, - String inlongGroupId, boolean requestByHttp, boolean isReadProxyIPFromLocal, - String configBasePath, int msgType) { - ProxyClientConfig dataProxyConfig = null; - DefaultMessageSender messageSender = null; - try { - // Initialize client configuration. 'admin', 'inlong' is default username and password of InLong-Manager, which need to be replaced according to the environment configuration in actual use. - dataProxyConfig = new ProxyClientConfig(localIP, requestByHttp, inLongManagerAddr, - Integer.valueOf(inLongManagerPort), inlongGroupId, "admin", "inlong"); - // Set the local save path of the configuration. This setting is optional. By default, the SDK will create a "/.inlong/" directory under the current user's working directory to store the configuration. - if (StringUtils.isNotEmpty(configBasePath)) { - dataProxyConfig.setConfStoreBasePath(configBasePath); - } - // Set whether to use the local saved configuration or not. This setting is optional. By default, do not use. - dataProxyConfig.setReadProxyIPFromLocal(isReadProxyIPFromLocal); - // Set the TCP protocol for transmission - dataProxyConfig.setProtocolType(ProtocolType.TCP); - // Initialize MessageSender object, if there is an exception, an exception will be thrown. - messageSender = DefaultMessageSender.generateSenderByClusterId(dataProxyConfig); - // Set message type to send. This setting is optional. By default, send data in binary format. - messageSender.setMsgtype(msgType); - } catch (Exception e) { - logger.error("getMessageSender has exception e = {}", e); - } - // Return the sender. - return messageSender; + public TcpMsgSender getMessageSender(MsgSenderFactory senderFactory, boolean visitMgrByHttps, + String managerAddr, String managerPort, String inlongGroupId, int msgType, + boolean useLocalMetaConfig, String configBasePath) { + TcpMsgSender messageSender = null; + try { + // build sender configure + // 'admin', 'inlong' is default username and password of InLong-Manager, which need to be replaced according to the environment configuration in actual use. + TcpMsgSenderConfig tcpConfig = + new TcpMsgSenderConfig(visitMgrByHttps, managerAddr, + Integer.parseInt(managerPort), inlongGroupId, "admin", "inlong"); + // Set the local save path of the configuration. This setting is optional. By default, the SDK will create a "/.inlong/" directory under the current user's working directory to store the configuration. + tcpConfig.setMetaStoreBasePath(configBasePath); + // Set whether to use the local saved configuration or not. This setting is optional. By default, do not use. + tcpConfig.setOnlyUseLocalProxyConfig(useLocalMetaConfig); + // Set message type to send. This setting is optional. By default, send data in binary format. + tcpConfig.setSdkMsgType(MsgType.valueOf(msgType)); + tcpConfig.setRequestTimeoutMs(20000L); + // build sender object + messageSender = senderFactory.genTcpSenderByClusterId(tcpConfig); + } catch (Throwable ex) { + System.out.println("Get MessageSender throw exception, " + ex); } + return messageSender; +} ``` -### ProxyClientConfig configuration -| parameter name | Parameter Description | default value | -| ------ | ------ | -------| -| inlongGroupId | inlongGroupId | not null | -| inlongStreamId | inlongStreamId | not null | -| username | username | not null| -| password | password | not null| -|requestByHttp| request inlong manager protocol | https: false , http: true| -|isReadProxyIPFromLocal|whether to read DataProxy ip from local|false| +### TcpMsgSenderConfig configuration +| parameter name | Parameter Description | default value | +|---------------------| ------ |---------------------------| +| inlongGroupId | inlongGroupId | not null | +| inlongStreamId | inlongStreamId | not null | +| username | username | not null | +| password | password | not null | +| visitMgrByHttps | request inlong manager protocol | https: true , http: false | +| useLocalMetaConfig |whether to read DataProxy ip from local| false | ### Call the send interface to report data The SDK data send interface is thread safe, support send single or multiple messages by sync and async two ways. The following demo uses a single sync way to send, and the message does not contain property information: + ```java -public void sendTcpMessage(DefaultMessageSender sender, String inlongGroupId, String inlongStreamId, String messageBody, long dt) { - SendResult result = null; + public void sendTcpMessage(TcpMsgSender sender, + String inlongGroupId, String inlongStreamId, long dt, String messageBody) { + ProcessResult procResult = new ProcessResult(); try { - // Sends a single message in sync mode, and does not contain property information - result = sender.sendMessage(messageBody.getBytes("utf8"), inlongGroupId, inlongStreamId, - 0, String.valueOf(dt), 20, TimeUnit.SECONDS); - } catch (UnsupportedEncodingException e) { - e.printStackTrace(); + // Sends a single message in sync mode, and does not contain property information + sender.sendMessage(new TcpEventInfo(inlongGroupId, inlongStreamId, + dt, null, messageBody.getBytes(StandardCharsets.UTF_8)), procResult); + } catch (Throwable ex) { + System.out.println("Message sent throw exception, " + ex); + return; } - logger.info("messageSender {}", result); + System.out.println("Message sent result = " + procResult); } ``` -You can also choose different send interfaces to report data according to your business needs. For the details of the interface, please refer to the definition in the [MessageSender](https://github.com/apache/inlong/blob/master/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java) interface file, which has a detailed introduction, no additional explanation here. +You can also choose different send interfaces to report data according to your business needs. For the details of the interface, please refer to the definition in the [TcpMsgSender](https://github.com/apache/inlong/blob/master/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpMsgSender.java) interface file, which has a detailed introduction, no additional explanation here. ### Close SDK - -You can call the `close()` function of the MessageSender interface object to stop data reporting: +Since we create and reuse Sender objects through the Sender object factory, we close the data reporting service by calling the shutdownAll() function of the factory when exiting. ```java -sender.close(); // close the sender +senderFactory.shutdownAll(); ``` ## Warning - The `MessageSender` interface object is initialized based on the `inlongGroupId`, so each `MessageSender` object can be used differently based on the `inlongGroupId`, and multiple `MessageSender` objects can be created in the same process. -- The SDK provides three different network interaction ways: TCP, HTTP, and UDP. Examples of these three ways are given in the [example](https://github.com/apache/inlong/blob/master/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example) (refer to `TcpClientExample.java`, `HttpClientExample.java`, `UdpClientExample.java`), and the business can be customized according to its own needs to initialize different `MessageSender` object. +- The SDK provides three different network interaction ways: TCP, HTTP. Examples of these three ways are given in the [example](https://github.com/apache/inlong/blob/master/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example) (refer to `TcpClientExample.java`, `HttpClientExample.java`, `UdpClientExample.java`), and the business can be customized according to its own needs to initialize different `MessageSender` object. - The SDK contains complex network interactions, `MessageSender` should be used as a resident object. Avoid frequent initialization and shutdown of `MessageSender` (frequent initialization and shutdown will have a large resource overhead and will affect the timeliness of data reporting). - The SDK does not resend the failed message. When using the SDK to report data, if send fails, you need to decide whether to resend according to your own needs. ## Error Code Introduction Common error codes are as follows. -| Code | Explain | Remarks | -|---------------------------------------|------------------------------|-----------------------------------------------| -| SendResult.OK | Successfully sent | | -| SendResult.TIMEOUT | Request response timeout | | -| SendResult.CONNECTION_BREAK | Connection is breaked | | -| SendResult.THREAD_INTERRUPT | Interrupt | | -| SendResult.ASYNC_CALLBACK_BUFFER_FULL | Async callback buffer full | In this case, generally, it is caused by the speed of production data exceeding the response speed of the server. It is recommended to properly sleep when send to avoid blocking. | -| SendResult.NO_CONNECTION | No available connection | In this case, it is recommended to increase the number of available connections. | -| SendResult.INVALID_DATA | Invalid data, failed to report data to DataProxy via HTTP | | -| SendResult.INVALID_ATTRIBUTES | The packets sent are incorrect, such as empty packets or packets containing predefined attributes of the system | | -| SendResult.UNKOWN_ERROR | Unknown error | | - -## ProxyClientConfig Configuration Introduction - -| Parameter | Explain | Adjustment Suggestion | -|-----------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------| -| setAliveConnections(int aliveConnections) | Set the number of DataProxy connections. Default: 3. | 1) If the amount of data is large or sensitive to delay, increase this parameter appropriately; 2) According to the size of the DataProxy cluster, adjust this parameter appropriately. For example, if the cluster size is 30, this value can be set to 5 ~ 10; 3) Experience value 15 ~ 20. | -| setTotalAsyncCallbackSize(int asyncCallbackSize) | Set the size of SDK internal buffer queue during async send. The buffer is used to store packets that have been sent but have not received an Ack from the dataProxy. When the buffer reaches this threshold, continue to send data, and will receive an ASYNC_CALLBACK_BUFFER_FULL exception. Default: 50000. | 1) Normally, there is no need to adjust this parameter; 2) When the amount of data is very large or the load of DataProxy is high, it can be increased appropriately. Be careful not to be too large, which may cause OOM. | -| setConnectTimeoutMillis(long connectTimeoutMillis) | Set the connection timeout interval. Unit: ms, Default: 40000. | Set according to the actual environment. | -| setRequestTimeoutMillis(long requestTimeoutMillis) | Set request timeout interval. Unit: ms, Default: 40000. | Adjust settings as needed. | -| setMaxTimeoutCnt(int maxTimeoutCnt) | Set the number of timeout disconnections of a single DataProxy connection. The SDK will internally count the DataProxy connections that have timed out and have not received an Ack. If the timeout times of a connection reach the value within a short period of time, the SDK automatically disconnects the connection and selects another DataProxy to create a new connection for data reporting. Default value: 3.| If the size of the DataProxy cluster is small, you can appropriately increase this parameter to avoid frequent disconnection in a short time. | -| setManagerConnectionTimeout(int managerConnectionTimeout) | Set the timeout interval for SDK connection to InLong Manager. Unit: ms, Default: 10000. | 1) When the network environment is not good, the value can be increased appropriately; 2) When the client takes a long time to resolve the domain name, the value can be increased appropriately. | -| setManagerSocketTimeout(int managerSocketTimeout) | Sets the timeout for the SDK to get the DataProxy list from the InLong Manager connection, Unit: ms, Default: 30000. | When the network environment is not good, the value can be increased appropriately. | \ No newline at end of file +| Code | Explain | Remarks | +|---------------------------------------|----------------------------------------|-----------------------------------------------| +| ErrorCode.OK | Successfully sent | | +| ErrorCode.SDK_CLOSED | SDK has closed | | +| ErrorCode.FETCH_PROXY_META_FAILURE | SDK failed to obtain metadata | | +| ErrorCode.EMPTY_ACTIVE_NODE_SET | No active nodes available | | +| ErrorCode.EMPTY_WRITABLE_NODE_SET | All nodes are not writable | | +| ErrorCode.NO_VALID_REMOTE_NODE | No available connection | In this case, it is recommended to increase the number of available connections. | +| ErrorCode.REPORT_INFO_EXCEED_MAX_LEN | The reported data exceeds the maximum allowed length | | +| ErrorCode.CONNECTION_UNAVAILABLE | Connection unavailable | | +| ErrorCode.CONNECTION_BREAK | Connection is breaked | | +| ErrorCode.CONNECTION_UNWRITABLE | Connection not writable | This is usually caused by the front-end producing data faster than the server's response speed. It is recommended to sleep appropriately when sending to avoid blocking. | +| ErrorCode.CONNECTION_WRITE_EXCEPTION | Write report request process exception | | +| ErrorCode.SEND_WAIT_INTERRUPT | Interrupt | | +| ErrorCode.SEND_WAIT_TIMEOUT | Request response timeout | | +| ErrorCode.SEND_ON_EXCEPTION | Send request exception | | +| ErrorCode.UNKOWN_ERROR | Unknown error | | + + +## TcpMsgSenderConfig Configuration Introduction + +| Parameter | Explain | Adjustment Suggestion | +|-----------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------| +| setAliveConnections(int aliveConnections) | Set the number of DataProxy connections. Default: 7. | 1) If the amount of data is large or sensitive to delay, increase this parameter appropriately; 2) According to the size of the DataProxy cluster, adjust this parameter appropriately. For example, if the cluster size is 30, this value can be set to 5 ~ 10; 3) Experience value 15 ~ 20. | +| setSendBufferSize(int sendBufferSize) | Set the size of SDK internal buffer queue during async send. The buffer is used to store packets that have been sent but have not received an Ack from the dataProxy. When the buffer reaches this threshold, continue to send data, and will receive an ErrorCode.CONNECTION_UNWRITABLE exception. Default: 16 * 1024 * 1024 Bytes. | 1) Normally, there is no need to adjust this parameter; 2) When the amount of data is very large or the load of DataProxy is high, it can be increased appropriately. Be careful not to be too large, which may cause OOM. | +| setConnectTimeoutMs(int connectTimeoutMs) | Set the connection timeout interval. Unit: ms, Default: 8000. | Set according to the actual environment. | +| setRequestTimeoutMs(long requestTimeoutMs) | Set request timeout interval. Unit: ms, Default: 10000. | Adjust settings as needed. | +| setMaxAllowedSyncMsgTimeoutCnt(int maxAllowedSyncMsgTimeoutCnt) | Set the number of timeout disconnections of a single DataProxy connection. The SDK will internally count the DataProxy connections that have timed out and have not received an Ack. If the timeout times of a connection reach the value within a short period of time, the SDK automatically disconnects the connection and selects another DataProxy to create a new connection for data reporting. Default value: 10. | If the size of the DataProxy cluster is small, you can appropriately increase this parameter to avoid frequent disconnection in a short time. | +| setMgrConnTimeoutMs(int mgrConnTimeoutMs) | Set the timeout interval for SDK connection to InLong Manager. Unit: ms, Default: 8000. | 1) When the network environment is not good, the value can be increased appropriately; 2) When the client takes a long time to resolve the domain name, the value can be increased appropriately. | +| setMgrSocketTimeoutMs(int mgrSocketTimeoutMs) | Sets the timeout for the SDK to get the DataProxy list from the InLong Manager connection, Unit: ms, Default: 15000. | When the network environment is not good, the value can be increased appropriately. | \ No newline at end of file diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/sdk/dataproxy-sdk/java.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/sdk/dataproxy-sdk/java.md index db64a6e5de9..6b2e5a55d35 100644 --- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/sdk/dataproxy-sdk/java.md +++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/sdk/dataproxy-sdk/java.md @@ -10,8 +10,8 @@ import {siteVariables} from '../../version'; ## 引入 Java SDK 需要在项目中包含 SDK 的头文件和库,进行 SDK 的使用。头文件和库提供以下两种获取方式: -- 获取源码自行编译并将 SDK 包部署到本地仓库,见[如何编译](https://inlong.apache.org/docs/next/quick_start/how_to_build/); -- 直接引用Apache仓库里的已有库,见 +- 获取源码自行编译并将 SDK 包部署到本地仓库,见[如何编译](https://inlong.apache.org/zh-CN/docs/next/development/how_to_build); +- 直接引用 Apache 仓库里的已有库,见

 {`
     org.apache.inlong
@@ -22,104 +22,104 @@ import {siteVariables} from '../../version';
 
## 数据上报流程 -引入 SDK 后,通过实例化一个[MessageSender](https://github.com/apache/inlong/blob/master/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java)接口对象后,调用相关的同步(sendMessage())或 异步(asyncSendMessage())接口来完成单条或多条(批量)数据的上报任务。发送Demo可参考 [TcpClientExample.java](https://github.com/apache/inlong/blob/master/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java)。 +引入 SDK 后,通过实例化一个 [TcpMsgSender](https://github.com/apache/inlong/blob/master/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpMsgSender.java) 接口对象后,调用相关的同步(sendMessage())或 异步(asyncSendMessage())接口来完成单条或多条(批量)数据的上报任务。发送 Demo 可参考 [TcpClientExample.java](https://github.com/apache/inlong/blob/master/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/TcpClientExample.java)。 整体流程包括以下三个步骤: ### 初始化 SDK -从Demo示例代码我们可以看到,客户端初始化主要是在 `getMessageSender()` 函数中完成: +从 Demo 示例代码我们可以看到,客户端初始化主要是在 `getMessageSender()` 函数中完成: ```java -public DefaultMessageSender getMessageSender(String localIP, String inLongManagerAddr, String inLongManagerPort, - String inlongGroupId, boolean requestByHttp, boolean isReadProxyIPFromLocal, - String configBasePath, int msgType) { - ProxyClientConfig dataProxyConfig = null; - DefaultMessageSender messageSender = null; - try { - // 初始化客户端配置,其中“admin,“inlong”是InLong-Manager的用户名和密码,实际使用时需要根据环境配置进行更替 - dataProxyConfig = new ProxyClientConfig(localIP, requestByHttp, inLongManagerAddr, - Integer.valueOf(inLongManagerPort), inlongGroupId, "admin", "inlong"); - // 设置配置信息的本地保存路径,该设置可选,缺省情况下 SDK 会在当前用户工作目录下构造一个"/.inlong/"目录存储配置数据 - if (StringUtils.isNotEmpty(configBasePath)) { - dataProxyConfig.setConfStoreBasePath(configBasePath); - } - // 设置是否允许使用本地保存的配置信息,该设置可选,缺省不启用 - dataProxyConfig.setReadProxyIPFromLocal(isReadProxyIPFromLocal); - // 设置采用TCP协议进行数据传输 - - dataProxyConfig.setProtocolType(ProtocolType.TCP); - // 初始化MessageSender对象,异常将抛异常 - messageSender = DefaultMessageSender.generateSenderByClusterId(dataProxyConfig); - // 设置 SDK 与DataProxy间消息发送的消息类型,该设置可选,缺省默认为7以二进制形式进行数据发送 - messageSender.setMsgtype(msgType); - } catch (Exception e) { - logger.error("getMessageSender has exception e = {}", e); - } - // 返回sender - return messageSender; + public TcpMsgSender getMessageSender(MsgSenderFactory senderFactory, boolean visitMgrByHttps, + String managerAddr, String managerPort, String inlongGroupId, int msgType, + boolean useLocalMetaConfig, String configBasePath) { + TcpMsgSender messageSender = null; + try { + // build sender configure + TcpMsgSenderConfig tcpConfig = + new TcpMsgSenderConfig(visitMgrByHttps, managerAddr, + Integer.parseInt(managerPort), inlongGroupId, "admin", "inlong"); + tcpConfig.setMetaStoreBasePath(configBasePath); + tcpConfig.setOnlyUseLocalProxyConfig(useLocalMetaConfig); + tcpConfig.setSdkMsgType(MsgType.valueOf(msgType)); + tcpConfig.setRequestTimeoutMs(20000L); + // build sender object + messageSender = senderFactory.genTcpSenderByClusterId(tcpConfig); + } catch (Throwable ex) { + System.out.println("Get MessageSender throw exception, " + ex); } + return messageSender; +} ``` -### 配置参数 -| 参数名 | 参数说明 | 默认值 | -| ------ | ------ | -------| -| inlongGroupId | inlongGroupId | not null | -| inlongStreamId | inlongStreamId | not null | -| username | 用户名 |not null| -| password | 密码 |not null| -| requestByHttp| 请求inlong Manager协议 |https: false , http: true| -|isReadProxyIPFromLocal|是否从本地读取 DataProxy Ip|false| +### TcpMsgSenderConfig 配置参数 +| 参数名 | 参数说明 | 默认值 | +|----------------------|----------------------|---------------------------| +| inlongGroupId | inlongGroupId | not null | +| inlongStreamId | inlongStreamId | not null | +| username | 用户名 | not null | +| password | 密码 | not null | +| visitMgrByHttps | 请求 inlong Manager 协议 | https: true , http: false | +| useLocalMetaConfig | 是否从本地读取 DataProxy IP | false | ### 调用发送接口进行数据上报 -SDK 的数据发送接口时线程安全的,支持以同步或者异步模式发送单条或多条消息。Demo里采用的是单条同步消息发送,并且消息中不包含属性信息: +SDK 的数据发送接口时线程安全的,支持以同步或者异步模式发送单条或多条消息。Demo 里采用的是单条同步消息发送,并且消息中不包含属性信息: ```java -public void sendTcpMessage(DefaultMessageSender sender, String inlongGroupId, String inlongStreamId, String messageBody, long dt) { - SendResult result = null; + public void sendTcpMessage(TcpMsgSender sender, + String inlongGroupId, String inlongStreamId, long dt, String messageBody) { + ProcessResult procResult = new ProcessResult(); try { - // 以同步模式发送单条消息,不携带属性信息 - result = sender.sendMessage(messageBody.getBytes("utf8"), inlongGroupId, inlongStreamId, - 0, String.valueOf(dt), 20, TimeUnit.SECONDS); - } catch (UnsupportedEncodingException e) { - e.printStackTrace(); + sender.sendMessage(new TcpEventInfo(inlongGroupId, inlongStreamId, + dt, null, messageBody.getBytes(StandardCharsets.UTF_8)), procResult); + } catch (Throwable ex) { + System.out.println("Message sent throw exception, " + ex); + return; } - logger.info("messageSender {}", result); + System.out.println("Message sent result = " + procResult); + logger.info("Message sent result = {}", procResult); } ``` -大家还可以根据业务需要选择不同的发送接口进行数据上报,具体接口细节可以参考[MessageSender](https://github.com/apache/inlong/blob/master/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/MessageSender.java)接口文件中的定义,里面有详细的接口使用及参数定义介绍,这里不做额外说明。 +大家还可以根据业务需要选择不同的发送接口进行数据上报,具体接口细节可以参考 [TcpMsgSender](https://github.com/apache/inlong/blob/master/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/sender/tcp/TcpMsgSender.java) 接口文件中的定义,里面有详细的接口使用及参数定义介绍,这里不做额外说明。 ### 关闭 SDK -可以调用MessageSender接口对象的close()函数关闭数据上报服务: +由于我们通过 Sender 对象工厂来创建和复用 Sender 对象,在退出上报服务时我们通过调用工厂的 shutdownAll() 函数关闭数据上报服务: ```java -sender.close(); +senderFactory.shutdownAll(); ``` ## 注意事项 -- `MessageSender` 接口对象是基于 `inlongGroupId` 进行初始化,因而每个 `MessageSender` 对象基于 `inlongGroupId` 区别使用,同一个进程内允许创建多个 `MessageSender` 对象; -- SDK 封装了TCP、HTTP、UDP共三种不同的网络交互方式,并在[example](https://github.com/apache/inlong/blob/master/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example)目录里给出了3种方式的不同示例(参考TcpClientExample.java,HttpClientExample.java,UdpClientExample.java实现),业务可以根据自身需要来初始化不同的MessageSender对象; -- SDK 中包含了复杂的网络交互,使用时需要将 SDK 作为常驻服务对象来使用,避免同个进程中途频繁地初始化和关闭MessageSender对象(重复初始化和关闭会带来很大的资源开销,并且影响数据上报的时效性); +- `MessageSender` 接口对象是基于 `inlongGroupId` 进行初始化,因而每个 `TcpMsgSender` 对象基于 `inlongGroupId` 区别使用,同一个进程内允许创建多个 `MessageSender` 对象; +- SDK 封装了 TCP、HTTP、UDP 共三种不同的网络交互方式,并在[example](https://github.com/apache/inlong/blob/master/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example)目录里给出了 3 种方式的不同示例(参考 TcpClientExample.java,HttpClientExample.java,UdpClientExample.java 实现),业务可以根据自身需要来初始化不同的 MessageSender 对象; +- SDK 中包含了复杂的网络交互,使用时需要将 SDK 作为常驻服务对象来使用,避免同个进程中途频繁地初始化和关闭 MessageSender 对象(重复初始化和关闭会带来很大的资源开销,并且影响数据上报的时效性); - SDK 不对发送失败的消息做重发处理,用户在使用 SDK 上报数据时遇到发送失败,业务要根据自身数据要求来决定是否重发消息,避免数据丢失。 ## 错误码介绍 -常见result会有以下几种值 +错误码参考 ErrorCode 枚举类定义,常见错误码会有以下几种值 -| 返回值 | 含义 | 备注 | -|---------------------------------------|------------------------------|-----------------------------------------------| -| SendResult.OK | 消息发送成功 | | -| SendResult.TIMEOUT | 请求响应超时 | | -| SendResult.CONNECTION_BREAK | 链接被断开 | | -| SendResult.THREAD_INTERRUPT | 中断 | | -| SendResult.ASYNC_CALLBACK_BUFFER_FULL | SDK 待回包请求消息满 | 这种情况一般为前端生产数据的速度超过服务端的响应速度导致,建议发送时适当sleep避免阻塞 | -| SendResult.NO_CONNECTION | 没有可用链接 | 这种情况建议业务增大可用链接数 | -| SendResult.INVALID_DATA | 数据无效,通过HTTP上报数据DataProxy返回失败 | | -| SendResult.INVALID_ATTRIBUTES | 发送的数据包不合理,比如为空数据包或包含了系统预定义属性 | | -| SendResult.UNKOWN_ERROR | 未知错误 | | +| 返回值 | 含义 | 备注 | +|----------------------------------------|----------------------|-------------------| +| ErrorCode.OK | 消息发送成功 | | +| ErrorCode.SDK_CLOSED | SDK 已关闭 | | +| ErrorCode.FETCH_PROXY_META_FAILURE | SDK 获取元数据失败 | | +| ErrorCode.EMPTY_ACTIVE_NODE_SET | 无可用 active 节点 | | +| ErrorCode.EMPTY_WRITABLE_NODE_SET | 所有节点都不可写 | | +| ErrorCode.NO_VALID_REMOTE_NODE | 所有 active 节点都无效,无可用节点 | 这种情况建议业务增大可用链接数 | +| ErrorCode.REPORT_INFO_EXCEED_MAX_LEN | 上报数据超最大允许长度 | | +| ErrorCode.CONNECTION_UNAVAILABLE | 连接已无效 | 一般为前端生产数据的速度超过服务端的响应速度导致,建议发送时适当sleep避免阻塞 | +| ErrorCode.CONNECTION_BREAK | 链接被断开 | | +| ErrorCode.CONNECTION_UNWRITABLE | 链接不可写 | 一般为前端生产数据的速度超过服务端的响应速度导致,建议发送时适当sleep避免阻塞 | +| ErrorCode.CONNECTION_WRITE_EXCEPTION | 上报请求过程异常 | | +| ErrorCode.SEND_WAIT_INTERRUPT | 等待响应被中断 | | +| ErrorCode.SEND_WAIT_TIMEOUT | 请求响应超时 | | +| ErrorCode.SEND_ON_EXCEPTION | 发送请求异常 | | +| ErrorCode.UNKOWN_ERROR | 未知错误 | | -## ProxyClientConfig相关配置项介绍 +## TcpMsgSenderConfig 相关配置项介绍 -| 参数设置 | 说明 | 调整建议 | -|-----------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------| -| setAliveConnections(int aliveConnections) | 设置DataProxy连接数大小;默认值:3 | 1)数据量大或对时延敏感,适当增大该参数;2)根据DataProxy集群大小,适当调整该参数,比如集群规模为30,该值可设为5~10;3)现网经验值15~20 | -| setTotalAsyncCallbackSize(int asyncCallbackSize) | 设置异步发送时 SDK 内部缓冲队列大小;缓存队列用于暂存已发送但未收到服务端Ack的数据包。当缓冲数据达到该值,业务继续异步上报数据,会收到ASYNC_CALLBACK_BUFFER_FULL异常;默认值: 50000 | 1)通常无需调整该参数;2)数据量非常大或者DataProxy服务端负载较高情况下,可适当增大,注意不要太大导致OOM | -| setConnectTimeoutMillis(long connectTimeoutMillis) | 设置连接超时时长,单位ms,缺省40000 | 根据实际环境需要设置 | -| setRequestTimeoutMillis(long requestTimeoutMillis) | 设置请求超时时长,单位ms,缺省40000 | 根据需要调整设置 | -| setMaxTimeoutCnt(int maxTimeoutCnt) | 设置单个DataProxy连接超时断连次数; SDK 内部会对超时未收到Ack的DataProxy连接进行计数,短时间内同一个连接超时数达到该值,会主动断开该连接,选择其他DataProxy创建新的连接进行数据发送。默认值:3 | 如果DataProxy集群本身规模较小,可适当调大该参数,避免短时间频繁断连 | -| setManagerConnectionTimeout(int managerConnectionTimeout) | 设置 SDK 连接Manager的超时时长,单位ms,默认10000ms | 1)网络环境不好的情况下可适当增大该值;2)客户端解析域名时间较长情况下可适当增大该值 | -| setManagerSocketTimeout(int managerSocketTimeout) | 设置 SDK 从Manager连接读取DataProxy列表的超时时间,单位ms,默认值30000 | 网络环境不好的情况下可适当增大该值 | \ No newline at end of file +| 参数设置 | 说明 | 调整建议 | +|-----------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------| +| setAliveConnections(int aliveConnections) | 设置 DataProxy 连接数大小;默认值:7 | 1)数据量大或对时延敏感,适当增大该参数;2)根据 DataProxy 集群大小,适当调整该参数,比如集群规模为 30,该值可设为 5~10;3)现网经验值 15~20 | +| setSendBufferSize(int sendBufferSize) | 设置异步发送时 SDK netty 的发送缓冲队列大小;缓存队列用于暂存已发送但未收到服务端 Ack 的数据包。当缓冲区满时会收到 ErrorCode.CONNECTION_UNWRITABLE 异常;默认值: 16 * 1024 * 1024字节 | 1)通常无需调整该参数;2)数据量非常大或者 DataProxy 服务端负载较高情况下,可适当增大,注意不要太大导致 OOM | +| setConnectTimeoutMs(int connectTimeoutMs) | 设置连接超时时长,单位 ms,缺省 8000 | 根据实际环境需要设置 | +| setRequestTimeoutMs(long requestTimeoutMs) | 设置请求超时时长,单位 ms,缺省 10000 | 根据需要调整设置 | +| setMaxAllowedSyncMsgTimeoutCnt(int maxAllowedSyncMsgTimeoutCnt) | 设置单个 DataProxy 连接同步超时次数; SDK 内部会对每个 DataProxy 连接上的超时未收到 Ack 的同步请求进行计数,短时间内同一个连接超时数达到该值,会主动断开该连接,选择其他 DataProxy 创建新的连接进行数据发送。默认值:10 | 如果 DataProxy 集群本身规模较小,可适当调大该参数,避免短时间频繁断连 | +| setMgrConnTimeoutMs(int mgrConnTimeoutMs) | 设置 SDK 连接 Manager 的超时时长,单位 ms,默认 8000 | 1)网络环境不好的情况下可适当增大该值;2)客户端解析域名时间较长情况下可适当增大该值 | +| setMgrSocketTimeoutMs(int mgrSocketTimeoutMs) | 设置 SDK 从 Manager 连接读取 DataProxy 列表的超时时间,单位 ms,默认值 15000 | 网络环境不好的情况下可适当增大该值 | \ No newline at end of file