Skip to content

Commit

Permalink
channel add get latency interface
Browse files Browse the repository at this point in the history
  • Loading branch information
xxo1shine committed Oct 27, 2022
1 parent 7e6f0d9 commit 58df2ed
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 8 deletions.
4 changes: 3 additions & 1 deletion src/main/java/org/tron/p2p/base/Parameter.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ public class Parameter {

public static final int NODE_CONNECTION_TIMEOUT = 2000;

public static final int KEEP_ALIVE_PERIOD = 20_000;
public static final int KEEP_ALIVE_TIMEOUT = 20_000;

public static final int PING_TIMEOUT = 20_000;

public static final int NETWORK_TIME_DIFF = 1000;

Expand Down
11 changes: 10 additions & 1 deletion src/main/java/org/tron/p2p/connection/Channel.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
public class Channel {

public volatile boolean waitForPong = false;
public volatile long pingSent;
public volatile long pingSent = System.currentTimeMillis();;

private ChannelHandlerContext ctx;
@Getter
Expand Down Expand Up @@ -59,6 +59,9 @@ public class Channel {
private String nodeId;
@Getter
private boolean discoveryMode;
@Getter
private long latency;
private long count;

public void init(ChannelPipeline pipeline, String nodeId, boolean discoveryMode) {
this.discoveryMode = discoveryMode;
Expand Down Expand Up @@ -135,6 +138,12 @@ private void send(ByteBuf byteBuf, byte type) {
setLastSendTime(System.currentTimeMillis());
}

public void updateLatency(long latency) {
long total = this.latency * this.count;
this.count++;
this.latency = (total + latency) / this.count;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
7 changes: 5 additions & 2 deletions src/main/java/org/tron/p2p/connection/ChannelManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public class ChannelManager {

private static byte tronHelloMessageType = 32;

private static byte tronDisconnectType = 33;

private static PeerServer peerServer;

@Getter
Expand Down Expand Up @@ -125,7 +127,8 @@ public static synchronized DisconnectCode processPeer(Channel channel) {
}

channels.put(channel.getInetSocketAddress(), channel);
log.info("Add peer {}, total peers: {}", channel, channels.size());
log.info("Add peer {}, total peers: {}",
channel.getInetSocketAddress(), channels.size());
return DisconnectCode.NORMAL;
}

Expand Down Expand Up @@ -171,7 +174,7 @@ private static void handMessage(Channel channel, byte[] data) throws P2pExceptio
}

if (!channel.isFinishHandshake()) {
if (tronHelloMessageType != data[0]) {
if (!channel.isFinishHandshake() && tronDisconnectType != data[0]) {
throw new P2pException(P2pException.TypeEnum.BAD_PROTOCOL, "type: " + data[0]);
}
channel.setFinishHandshake(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public void processMessage(Channel channel, Message message) {
sendHelloMsg(channel, DisconnectCode.NORMAL);
}

channel.updateLatency(System.currentTimeMillis() - channel.getStartTime());
Parameter.handlerList.forEach(h -> h.onConnect(channel));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.tron.p2p.connection.business.keepalive;

import static org.tron.p2p.base.Parameter.KEEP_ALIVE_PERIOD;
import static org.tron.p2p.base.Parameter.KEEP_ALIVE_TIMEOUT;
import static org.tron.p2p.base.Parameter.PING_TIMEOUT;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -27,11 +28,11 @@ public void init() {
.filter(p -> !p.isDisconnect())
.forEach(p -> {
if (p.waitForPong) {
if (now - p.pingSent > KEEP_ALIVE_PERIOD) {
if (now - p.pingSent > KEEP_ALIVE_TIMEOUT) {
p.close();
}
} else {
if (now - p.getLastSendTime() > KEEP_ALIVE_PERIOD) {
if (now - p.getLastSendTime() > PING_TIMEOUT) {
p.send(new PingMessage());
p.waitForPong = true;
p.pingSent = now;
Expand All @@ -55,6 +56,7 @@ public void processMessage(Channel channel, Message message) {
channel.send(new PongMessage());
break;
case KEEP_ALIVE_PONG:
channel.updateLatency(System.currentTimeMillis() - channel.pingSent);
channel.waitForPong = false;
break;
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public void initChannel(NioSocketChannel ch) {

// be aware of channel closing
ch.closeFuture().addListener((ChannelFutureListener) future -> {
log.info("Close channel:{}", channel);
log.info("Close channel:{}", channel.getInetSocketAddress());
if (!peerDiscoveryMode) {
ChannelManager.notifyDisconnect(channel);
}
Expand Down

0 comments on commit 58df2ed

Please sign in to comment.