Skip to content

Commit

Permalink
[INLONG-11706][SDK] Optimize HTTP Sender implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
gosonzhang committed Jan 22, 2025
1 parent 59211cc commit b6782c9
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 27 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.inlong.sdk.dataproxy.MsgSenderMultiFactory;
import org.apache.inlong.sdk.dataproxy.MsgSenderSingleFactory;
import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.sender.http.InLongHttpMsgSender;
import org.apache.inlong.sdk.dataproxy.sender.tcp.InLongTcpMsgSender;
import org.apache.inlong.sdk.dataproxy.sender.tcp.TcpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
Expand Down Expand Up @@ -74,6 +76,22 @@ public static void main(String[] args) throws Exception {
ProxyUtils.sleepSomeTime(10000L);
tcpMsgSender.close();

// report data by http
HttpMsgSenderConfig httpMsgSenderConfig = new HttpMsgSenderConfig(
false, managerIp, managerPort, groupId, secretId, secretKey);
InLongHttpMsgSender httpMsgSender =
singleFactory.genHttpSenderByGroupId(httpMsgSenderConfig);
if (!httpMsgSender.start(procResult)) {
System.out.println("Start http sender failure: process result=" + procResult);
}
ExampleUtils.sendHttpMessages(httpMsgSender, false, false,
groupId, streamId, reqCnt, msgSize, msgCnt, procResult);
ExampleUtils.sendHttpMessages(httpMsgSender, false, true,
groupId, streamId, reqCnt, msgSize, msgCnt, procResult);
ProxyUtils.sleepSomeTime(10000L);
httpMsgSender.close();
System.out.println("Cur singleton factory sender count is " + singleFactory.getMsgSenderCount());

// report data use multi-factory
MsgSenderMultiFactory multiFactory1 = new MsgSenderMultiFactory();
MsgSenderMultiFactory multiFactory2 = new MsgSenderMultiFactory();
Expand Down Expand Up @@ -104,6 +122,26 @@ public static void main(String[] args) throws Exception {
System.out.println("Multi-1.2 Cur multiFactory1 sender count = "
+ multiFactory1.getMsgSenderCount()
+ ", cur multiFactory2 sender count is " + multiFactory2.getMsgSenderCount());
// report data by http
InLongHttpMsgSender httpMsgSender1 =
multiFactory1.genHttpSenderByGroupId(httpMsgSenderConfig);
HttpMsgSenderConfig httpConfg2 = new HttpMsgSenderConfig(false,
managerIp, managerPort, groupId, secretId, secretKey);
InLongHttpMsgSender httpMsgSender2 =
multiFactory2.genHttpSenderByGroupId(httpConfg2);
ExampleUtils.sendHttpMessages(httpMsgSender1, false, false,
groupId, streamId, reqCnt, msgSize, msgCnt, procResult);
ExampleUtils.sendHttpMessages(httpMsgSender2, false, true,
groupId, streamId, reqCnt, msgSize, msgCnt, procResult);
ProxyUtils.sleepSomeTime(10000L);
httpMsgSender1.close();
System.out.println("Multi-2.1 Cur multiFactory1 sender count = "
+ multiFactory1.getMsgSenderCount()
+ ", cur multiFactory2 sender count is " + multiFactory2.getMsgSenderCount());
httpMsgSender2.close();
System.out.println("Multi-2.2 Cur multiFactory1 sender count = "
+ multiFactory1.getMsgSenderCount()
+ ", cur multiFactory2 sender count is " + multiFactory2.getMsgSenderCount());

// test self DefineFactory
ThreadFactory selfDefineFactory = new DefaultThreadFactory("test_self_thread_factory");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sdk.dataproxy.example;

import org.apache.inlong.sdk.dataproxy.common.ProcessResult;
import org.apache.inlong.sdk.dataproxy.sender.http.HttpMsgSenderConfig;
import org.apache.inlong.sdk.dataproxy.sender.http.InLongHttpMsgSender;
import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InLongHttpClientExample {

protected static final Logger logger = LoggerFactory.getLogger(InLongHttpClientExample.class);

public static void main(String[] args) throws Exception {

String managerIp = args[0];
String managerPort = args[1];
String groupId = args[2];
String streamId = args[3];
String secretId = args[4];
String secretKey = args[5];
int reqCnt = Integer.parseInt(args[6]);
int msgSize = 1024;
int msgCnt = 1;
if (args.length > 7) {
msgSize = Integer.parseInt(args[7]);
msgCnt = Integer.parseInt(args[8]);
}

String managerAddr = "http://" + managerIp + ":" + managerPort;

HttpMsgSenderConfig dataProxyConfig =
new HttpMsgSenderConfig(managerAddr, groupId, secretId, secretKey);
InLongHttpMsgSender messageSender = new InLongHttpMsgSender(dataProxyConfig);

ProcessResult procResult = new ProcessResult();
if (!messageSender.start(procResult)) {
System.out.println("Start http sender failure: process result=" + procResult);
}

System.out.println("InLongHttpMsgSender start, nodes="
+ messageSender.getProxyNodeInfos());

ExampleUtils.sendHttpMessages(messageSender, true, false,
groupId, streamId, reqCnt, msgSize, msgCnt, procResult);
ExampleUtils.sendHttpMessages(messageSender, true, true,
groupId, streamId, reqCnt, msgSize, msgCnt, procResult);
ExampleUtils.sendHttpMessages(messageSender, false, false,
groupId, streamId, reqCnt, msgSize, msgCnt, procResult);
ExampleUtils.sendHttpMessages(messageSender, false, true,
groupId, streamId, reqCnt, msgSize, msgCnt, procResult);

ProxyUtils.sleepSomeTime(10000L);
}
}

0 comments on commit b6782c9

Please sign in to comment.