Skip to content
This repository has been archived by the owner on Jun 8, 2020. It is now read-only.

Added fragmented text message handling in WebSocketClientHandler #360

Merged
merged 6 commits into from
Aug 21, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,25 +1,26 @@
package info.bitrich.xchangestream.service.netty;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.ContinuationWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketHandshakeException;
import io.netty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> {
private static final Logger LOG = LoggerFactory.getLogger(WebSocketClientHandler.class);
private final StringBuilder currentMessage = new StringBuilder();

public interface WebSocketMessageHandler {
public void onMessage(String message);
Expand Down Expand Up @@ -76,8 +77,9 @@ public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception

WebSocketFrame frame = (WebSocketFrame)msg;
if (frame instanceof TextWebSocketFrame) {
TextWebSocketFrame textFrame = (TextWebSocketFrame)frame;
handler.onMessage(textFrame.text());
dealWithTextFrame((TextWebSocketFrame) frame);
} else if (frame instanceof ContinuationWebSocketFrame) {
dealWithContinuation((ContinuationWebSocketFrame) frame);
} else if (frame instanceof PingWebSocketFrame) {
LOG.debug("WebSocket Client received ping");
ch.writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
Expand All @@ -89,6 +91,22 @@ public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception
}
}

private void dealWithTextFrame(TextWebSocketFrame frame) {
if (frame.isFinalFragment()) {
handler.onMessage(frame.text());
return;
}
currentMessage.append(frame.text());
}

private void dealWithContinuation(ContinuationWebSocketFrame frame) {
currentMessage.append(frame.text());
if (frame.isFinalFragment()) {
handler.onMessage(currentMessage.toString());
currentMessage.setLength(0);
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
LOG.error("WebSocket client encountered exception ({} - {}). Closing", cause.getClass().getSimpleName(), cause.getMessage());
Expand Down