diff --git a/.gitignore b/.gitignore index cf7b324..3b7bf2e 100644 --- a/.gitignore +++ b/.gitignore @@ -1,7 +1,6 @@ -.gitignore target .project .classpath .settings -.iml -*~ +*.iml +.idea \ No newline at end of file diff --git a/pom.xml b/pom.xml index ede0210..18f01a0 100644 --- a/pom.xml +++ b/pom.xml @@ -1,178 +1,178 @@ - 4.0.0 + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + 4.0.0 - org.msgpack - msgpack-rpc - jar - 0.7.1-SNAPSHOT + org.msgpack + msgpack-rpc + jar + 0.7.1-SNAPSHOT - msgpack-rpc - http://msgpack.org/ + msgpack-rpc + http://msgpack.org/ - - - The Apache Software License, Version 2.0 - http://www.apache.org/licenses/LICENSE-2.0.txt - repo - - + + + The Apache Software License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0.txt + repo + + - - scm:git:git://github.com/msgpack/msgpack-rpc.git - scm:git:git://github.com/msgpack/msgpack-rpc.git - + + scm:git:git://github.com/msgpack/msgpack-rpc.git + scm:git:git://github.com/msgpack/msgpack-rpc.git + - - - - src/main/resources - - - - - src/test/resources - - + + + + src/main/resources + + + + + src/test/resources + + - - - maven-compiler-plugin - 2.3.2 - - 1.5 - 1.5 - - + + + maven-compiler-plugin + 2.3.2 + + 1.5 + 1.5 + + - - maven-eclipse-plugin - 2.5.1 - + + maven-eclipse-plugin + 2.5.1 + - - maven-release-plugin - 2.1 - - - deploy - scm:git:git://github.com/msgpack/msgpack-rpc.git - - - - + + maven-release-plugin + 2.1 + + + deploy + scm:git:git://github.com/msgpack/msgpack-rpc.git + + + + - - - - - org.apache.maven.plugins - maven-javadoc-plugin - 2.8.1 - - ${project.name} ${project.version} API - true - en_US - UTF-8 - - + + + + + org.apache.maven.plugins + maven-javadoc-plugin + 2.8.1 + + ${project.name} ${project.version} API + true + en_US + UTF-8 + + - - org.apache.maven.plugins - maven-jxr-plugin - 2.2 - + + org.apache.maven.plugins + maven-jxr-plugin + 2.2 + - - org.apache.maven.plugins - maven-surefire-report-plugin - 2.11 - - - + + org.apache.maven.plugins + maven-surefire-report-plugin + 2.11 + + + - - - repository.jboss.org - https://repository.jboss.org/nexus/content/groups/public/ - - false - - - + + + repository.jboss.org + https://repository.jboss.org/nexus/content/groups/public/ + + false + + + - - - org.msgpack - msgpack - 0.6.6 - compile - - - org.jboss.netty - netty - 3.2.1.Final - - - javax.servlet - servlet-api - - - commons-logging - commons-logging - - - compile - - - org.slf4j - slf4j-api - 1.6.1 - - - org.slf4j - slf4j-log4j12 - 1.6.1 - - - junit - junit - 4.8.2 - test - - + + + org.msgpack + msgpack + 0.6.6 + compile + + + io.netty + netty-all + 4.0.14.Final + + + javax.servlet + servlet-api + + + commons-logging + commons-logging + + + compile + + + org.slf4j + slf4j-api + 1.6.1 + + + org.slf4j + slf4j-log4j12 + 1.6.1 + + + junit + junit + 4.8.2 + test + + - - - false - msgpack.org - Repository at msgpack.org - file://${project.build.directory}/website/maven2/ - - - true - msgpack.org - Repository at msgpack.org - file://${project.build.directory}/website/maven2/ - - + + + false + msgpack.org + Repository at msgpack.org + file://${project.build.directory}/website/maven2/ + + + true + msgpack.org + Repository at msgpack.org + file://${project.build.directory}/website/maven2/ + + - - - release - - - - true - org.apache.maven.plugins - maven-deploy-plugin - 2.4 - - true - - - - - - - + + + release + + + + true + org.apache.maven.plugins + maven-deploy-plugin + 2.4 + + true + + + + + + + \ No newline at end of file diff --git a/settings.xml b/settings.xml new file mode 100644 index 0000000..616c3f4 --- /dev/null +++ b/settings.xml @@ -0,0 +1,4 @@ + + + /Users/huzhou/maven.repo + \ No newline at end of file diff --git a/src/main/java/org/msgpack/rpc/Request.java b/src/main/java/org/msgpack/rpc/Request.java index a0007e1..8ef4759 100644 --- a/src/main/java/org/msgpack/rpc/Request.java +++ b/src/main/java/org/msgpack/rpc/Request.java @@ -22,7 +22,9 @@ import org.msgpack.rpc.transport.MessageSendable; public class Request implements Callback { + private MessageSendable channel; // TODO #SF synchronized? + private int msgid; private String method; private Value args; @@ -66,9 +68,11 @@ public void sendError(Object error, Object data) { } public synchronized void sendResponse(Object result, Object error) { + if (channel == null) { return; } + ResponseMessage msg = new ResponseMessage(msgid, error, result); channel.sendMessage(msg); channel = null; diff --git a/src/main/java/org/msgpack/rpc/Server.java b/src/main/java/org/msgpack/rpc/Server.java index ba027d5..9502504 100644 --- a/src/main/java/org/msgpack/rpc/Server.java +++ b/src/main/java/org/msgpack/rpc/Server.java @@ -106,7 +106,8 @@ public void onRequest(MessageSendable channel, int msgid, String method, Value a Request request = new Request(channel, msgid, method, args); try { dp.dispatch(request); - } catch (RPCError e) { + } + catch (RPCError e) { // FIXME request.sendError(e.getCode(), e); } catch (Exception e) { diff --git a/src/main/java/org/msgpack/rpc/dispatcher/Dispatcher.java b/src/main/java/org/msgpack/rpc/dispatcher/Dispatcher.java index 3d9fdbb..2d5ccff 100644 --- a/src/main/java/org/msgpack/rpc/dispatcher/Dispatcher.java +++ b/src/main/java/org/msgpack/rpc/dispatcher/Dispatcher.java @@ -20,5 +20,6 @@ import org.msgpack.rpc.Request; public interface Dispatcher { + void dispatch(Request request) throws Exception; } diff --git a/src/main/java/org/msgpack/rpc/loop/netty/ChannelAdaptor.java b/src/main/java/org/msgpack/rpc/loop/netty/ChannelAdaptor.java index 41f2c5a..11a95a6 100644 --- a/src/main/java/org/msgpack/rpc/loop/netty/ChannelAdaptor.java +++ b/src/main/java/org/msgpack/rpc/loop/netty/ChannelAdaptor.java @@ -17,22 +17,25 @@ // package org.msgpack.rpc.loop.netty; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.Channels; +import io.netty.channel.Channel; import org.msgpack.rpc.transport.ClientTransport; class ChannelAdaptor implements ClientTransport { - private Channel channel; - ChannelAdaptor(Channel channel) { - this.channel = channel; + private final Channel _channel; + + protected ChannelAdaptor(final Channel channel) { + + _channel = channel; } public void sendMessage(Object msg) { - Channels.write(channel, msg); + + _channel.writeAndFlush(msg); } public void close() { - channel.close(); + + _channel.close(); } } diff --git a/src/main/java/org/msgpack/rpc/loop/netty/MessageHandler.java b/src/main/java/org/msgpack/rpc/loop/netty/MessageHandler.java index 5141b5c..56df020 100644 --- a/src/main/java/org/msgpack/rpc/loop/netty/MessageHandler.java +++ b/src/main/java/org/msgpack/rpc/loop/netty/MessageHandler.java @@ -1,53 +1,28 @@ -// -// MessagePack-RPC for Java -// -// Copyright (C) 2010 FURUHASHI Sadayuki -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// package org.msgpack.rpc.loop.netty; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.ChannelStateEvent; -import org.jboss.netty.channel.MessageEvent; -import org.jboss.netty.channel.SimpleChannelUpstreamHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; import org.msgpack.rpc.transport.RpcMessageHandler; import org.msgpack.type.Value; -class MessageHandler extends SimpleChannelUpstreamHandler { - private RpcMessageHandler handler; - private ChannelAdaptor adaptor; +class MessageHandler extends ChannelInboundHandlerAdapter { - MessageHandler(RpcMessageHandler handler) { - this.handler = handler; + private final RpcMessageHandler _rpcHandler; + + public MessageHandler(final RpcMessageHandler rpcHandler){ + _rpcHandler = rpcHandler; } @Override - public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) - throws Exception { - this.adaptor = new ChannelAdaptor(e.getChannel()); - ctx.sendUpstream(e); + public void channelRead(ChannelHandlerContext ctx, Object msg) { + + _rpcHandler.handleMessage(new ChannelAdaptor(ctx.channel()), (Value) msg); } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { - Object m = e.getMessage(); - if (!(m instanceof Value)) { - ctx.sendUpstream(e); - return; - } - - Value msg = (Value) m; - handler.handleMessage(adaptor, msg); + public void exceptionCaught(final ChannelHandlerContext ctx, Throwable cause) throws Exception { + + cause.printStackTrace(); + ctx.close(); } -} +} \ No newline at end of file diff --git a/src/main/java/org/msgpack/rpc/loop/netty/MessagePackDecoder.java b/src/main/java/org/msgpack/rpc/loop/netty/MessagePackDecoder.java index f977328..8d34b90 100644 --- a/src/main/java/org/msgpack/rpc/loop/netty/MessagePackDecoder.java +++ b/src/main/java/org/msgpack/rpc/loop/netty/MessagePackDecoder.java @@ -1,64 +1,40 @@ -// -// MessagePack-RPC for Java -// -// Copyright (C) 2010 FURUHASHI Sadayuki -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// package org.msgpack.rpc.loop.netty; -import java.nio.ByteBuffer; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.handler.codec.oneone.OneToOneDecoder; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; import org.msgpack.MessagePack; -import org.msgpack.type.Value; +import org.msgpack.unpacker.Unpacker; + +import java.io.EOFException; +import java.nio.ByteBuffer; +import java.util.List; -public class MessagePackDecoder extends OneToOneDecoder { +class MessagePackDecoder extends ByteToMessageDecoder { - MessagePack messagePack; + private final MessagePack _msgpack; - public MessagePackDecoder(MessagePack messagePack) { - super(); - this.messagePack = messagePack; + public MessagePackDecoder(final MessagePack msgpack){ + _msgpack = msgpack; } @Override - protected Object decode(ChannelHandlerContext ctx, Channel channel, - Object msg) throws Exception { - if (!(msg instanceof ChannelBuffer)) { - return msg; - } + protected void decode(final ChannelHandlerContext channelHandlerContext, + final ByteBuf byteBuf, + final List out) throws Exception { - ChannelBuffer source = (ChannelBuffer) msg; + final ByteBuffer buffer = byteBuf.markReaderIndex().nioBuffer().slice(); - ByteBuffer buffer = source.toByteBuffer(); - if (!buffer.hasRemaining()) { - return null; - } + try{ + Unpacker unpacker = _msgpack.createBufferUnpacker(buffer); - byte[] bytes = buffer.array(); // FIXME buffer must has array - int offset = buffer.arrayOffset() + buffer.position(); - int length = buffer.arrayOffset() + buffer.limit(); + out.add(unpacker.readValue()); - Value v = messagePack.read(bytes, offset, length); - return v; + byteBuf.skipBytes(buffer.position()); + } + catch(EOFException e){ - // TODO MessagePack.unpack() - /* - * Unpacker pac = new Unpacker(); pac.wrap(bytes, offset, length); - * return pac.unpackObject(); - */ + byteBuf.resetReaderIndex(); + } } -} +} \ No newline at end of file diff --git a/src/main/java/org/msgpack/rpc/loop/netty/MessagePackEncoder.java b/src/main/java/org/msgpack/rpc/loop/netty/MessagePackEncoder.java index 2a8cc27..2472e5e 100644 --- a/src/main/java/org/msgpack/rpc/loop/netty/MessagePackEncoder.java +++ b/src/main/java/org/msgpack/rpc/loop/netty/MessagePackEncoder.java @@ -1,59 +1,23 @@ -// -// MessagePack-RPC for Java -// -// Copyright (C) 2010 FURUHASHI Sadayuki -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// package org.msgpack.rpc.loop.netty; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.buffer.ChannelBufferOutputStream; -import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.handler.codec.oneone.OneToOneEncoder; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufOutputStream; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToByteEncoder; import org.msgpack.MessagePack; +import org.msgpack.type.Value; -public class MessagePackEncoder extends OneToOneEncoder { - private final int estimatedLength; +class MessagePackEncoder extends MessageToByteEncoder { - private MessagePack messagePack; + private final MessagePack _msgpack; - public MessagePackEncoder(MessagePack messagePack) { - this(1024, messagePack); - } - - public MessagePackEncoder(int estimatedLength, MessagePack messagePack) { - this.estimatedLength = estimatedLength; - this.messagePack = messagePack; + public MessagePackEncoder(final MessagePack msgpack){ + _msgpack = msgpack; } @Override - protected Object encode(ChannelHandlerContext ctx, Channel channel, - Object msg) throws Exception { - if (msg instanceof ChannelBuffer) { - return msg; - } - - ChannelBufferOutputStream out = new ChannelBufferOutputStream( - ChannelBuffers.dynamicBuffer(estimatedLength, ctx.getChannel() - .getConfig().getBufferFactory())); - - // MessagePack.pack(out, msg); - messagePack.write(out, msg); + protected void encode(ChannelHandlerContext ctx, Value msg, ByteBuf out) throws Exception { - ChannelBuffer result = out.buffer(); - return result; + _msgpack.createPacker(new ByteBufOutputStream(out)).write(msg); } -} +} \ No newline at end of file diff --git a/src/main/java/org/msgpack/rpc/loop/netty/MessagePackStreamDecoder.java b/src/main/java/org/msgpack/rpc/loop/netty/MessagePackStreamDecoder.java deleted file mode 100644 index edfdb7e..0000000 --- a/src/main/java/org/msgpack/rpc/loop/netty/MessagePackStreamDecoder.java +++ /dev/null @@ -1,67 +0,0 @@ -// -// MessagePack-RPC for Java -// -// Copyright (C) 2010 FURUHASHI Sadayuki -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// -package org.msgpack.rpc.loop.netty; - -import java.io.ByteArrayInputStream; -import java.io.EOFException; -import java.nio.ByteBuffer; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.handler.codec.frame.FrameDecoder; -import org.msgpack.MessagePack; -import org.msgpack.type.Value; -import org.msgpack.unpacker.Unpacker; - -public class MessagePackStreamDecoder extends FrameDecoder { - protected MessagePack msgpack; - - public MessagePackStreamDecoder(MessagePack msgpack) { - super(); - this.msgpack = msgpack; - } - - @Override - protected Object decode(ChannelHandlerContext ctx, Channel channel, - ChannelBuffer source) throws Exception { - // TODO #MN will modify the body with MessagePackBufferUnpacker. - ByteBuffer buffer = source.toByteBuffer(); - if (!buffer.hasRemaining()) { - return null; - } - source.markReaderIndex(); - - byte[] bytes = buffer.array(); // FIXME buffer must has array - int offset = buffer.arrayOffset() + buffer.position(); - int length = buffer.arrayOffset() + buffer.limit(); - ByteArrayInputStream stream = new ByteArrayInputStream(bytes, offset, - length); - int startAvailable = stream.available(); - try{ - Unpacker unpacker = msgpack.createUnpacker(stream); - Value v = unpacker.readValue(); - source.skipBytes(startAvailable - stream.available()); - return v; - }catch( EOFException e ){ - // not enough buffers. - // So retry reading - source.resetReaderIndex(); - return null; - } - } -} diff --git a/src/main/java/org/msgpack/rpc/loop/netty/MessagePackableEncoder.java b/src/main/java/org/msgpack/rpc/loop/netty/MessagePackableEncoder.java new file mode 100644 index 0000000..65ef611 --- /dev/null +++ b/src/main/java/org/msgpack/rpc/loop/netty/MessagePackableEncoder.java @@ -0,0 +1,25 @@ +package org.msgpack.rpc.loop.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufOutputStream; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import io.netty.handler.codec.MessageToByteEncoder; +import org.msgpack.MessagePack; +import org.msgpack.MessagePackable; + +class MessagePackableEncoder extends MessageToByteEncoder { + + private final MessagePack _msgpack; + + public MessagePackableEncoder(final MessagePack msgpack){ + + _msgpack = msgpack; + } + + @Override + protected void encode(ChannelHandlerContext ctx, MessagePackable msg, ByteBuf out) throws Exception { + + msg.writeTo(_msgpack.createPacker(new ByteBufOutputStream(out))); + } +} \ No newline at end of file diff --git a/src/main/java/org/msgpack/rpc/loop/netty/NettyEventLoop.java b/src/main/java/org/msgpack/rpc/loop/netty/NettyEventLoop.java index cf71a25..f6c4308 100644 --- a/src/main/java/org/msgpack/rpc/loop/netty/NettyEventLoop.java +++ b/src/main/java/org/msgpack/rpc/loop/netty/NettyEventLoop.java @@ -19,10 +19,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; -import org.jboss.netty.channel.socket.ClientSocketChannelFactory; -import org.jboss.netty.channel.socket.ServerSocketChannelFactory; -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; -import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; import org.msgpack.MessagePack; import org.msgpack.rpc.Session; import org.msgpack.rpc.Server; @@ -39,27 +35,6 @@ public NettyEventLoop(ExecutorService workerExecutor, super(workerExecutor, ioExecutor, scheduledExecutor, messagePack); } - private ClientSocketChannelFactory clientFactory = null; - private ServerSocketChannelFactory serverFactory = null; - - public synchronized ClientSocketChannelFactory getClientFactory() { - if (clientFactory == null) { - clientFactory = new NioClientSocketChannelFactory(getIoExecutor(), - getWorkerExecutor()); // TODO: workerCount - } - return clientFactory; - } - - public synchronized ServerSocketChannelFactory getServerFactory() { - if (serverFactory == null) { - serverFactory = new NioServerSocketChannelFactory(getIoExecutor(), - getWorkerExecutor()); // TODO: workerCount - // messages will be dispatched to worker thread on server. - // see useThread(true) in NettyTcpClientTransport(). - } - return serverFactory; - } - protected ClientTransport openTcpTransport(TcpClientConfig config, Session session) { return new NettyTcpClientTransport(config, session, this); @@ -67,6 +42,13 @@ protected ClientTransport openTcpTransport(TcpClientConfig config, protected ServerTransport listenTcpTransport(TcpServerConfig config, Server server) { - return new NettyTcpServerTransport(config, server, this); + + try{ + return new NettyTcpServerTransport(config, server, this); + } + catch(Exception ex){ + ex.printStackTrace(); + return null; + } } } diff --git a/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpClientTransport.java b/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpClientTransport.java index b9115fd..709cbf4 100644 --- a/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpClientTransport.java +++ b/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpClientTransport.java @@ -17,101 +17,115 @@ // package org.msgpack.rpc.loop.netty; -import java.util.Map; - -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.Channels; -import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.ChannelFutureListener; -import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.buffer.ChannelBufferOutputStream; -import org.jboss.netty.buffer.HeapChannelBufferFactory; -import org.jboss.netty.bootstrap.ClientBootstrap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; import org.msgpack.rpc.Session; import org.msgpack.rpc.config.TcpClientConfig; +import org.msgpack.rpc.transport.ClientTransport; import org.msgpack.rpc.transport.RpcMessageHandler; -import org.msgpack.rpc.transport.PooledStreamClientTransport; -class NettyTcpClientTransport extends PooledStreamClientTransport { - private static final String TCP_NO_DELAY = "tcpNoDelay"; +class NettyTcpClientTransport implements ClientTransport { - private final ClientBootstrap bootstrap; + private final Session _session; + private final Bootstrap _bootstrap; + private final AtomicInteger _availables = new AtomicInteger(1024); + private final ConcurrentLinkedQueue _writables; + + NettyTcpClientTransport(final TcpClientConfig config, + final Session session, + final NettyEventLoop loop) { - NettyTcpClientTransport(TcpClientConfig config, Session session, - NettyEventLoop loop) { // TODO check session.getAddress() instanceof IPAddress - super(config, session); + final RpcMessageHandler handler = new RpcMessageHandler(session); + + _bootstrap = new Bootstrap() + .group(new NioEventLoopGroup(/*2*/)) + .channel(NioSocketChannel.class) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.round((float) config.getConnectTimeout())) + .option(ChannelOption.TCP_NODELAY, !Boolean.FALSE.equals(config.getOption(ChannelOption.TCP_NODELAY.name()))) + .option(ChannelOption.SO_KEEPALIVE, !Boolean.FALSE.equals(config.getOption(ChannelOption.SO_KEEPALIVE.name()))) + .handler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addLast( + new MessagePackDecoder(loop.getMessagePack()), + new MessageHandler(handler), + new MessagePackEncoder(loop.getMessagePack()), + new MessagePackableEncoder(loop.getMessagePack())); + } + }); + + _session = session; + _writables = new ConcurrentLinkedQueue(); + } - RpcMessageHandler handler = new RpcMessageHandler(session); + protected ChannelFuture startConnection() { - bootstrap = new ClientBootstrap(loop.getClientFactory()); - bootstrap.setPipelineFactory(new StreamPipelineFactory(loop.getMessagePack(), handler)); - Map options = config.getOptions(); - setIfNotPresent(options, TCP_NO_DELAY, Boolean.TRUE, bootstrap); - bootstrap.setOptions(options); + return _bootstrap.connect(_session.getAddress().getSocketAddress()); } - private final ChannelFutureListener connectListener = new ChannelFutureListener() { - public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess()) { - onConnectFailed(future.getChannel(), future.getCause()); - return; - } - Channel c = future.getChannel(); - c.getCloseFuture().addListener(closeListener); - onConnected(c); - } - }; + public void sendMessage(final Object msg) { + + if(_writables.isEmpty() && _availables.getAndDecrement() > 0){ + + startConnection().addListener(new ChannelFutureListener() { + + public void operationComplete(ChannelFuture future) throws Exception { - private final ChannelFutureListener closeListener = new ChannelFutureListener() { - public void operationComplete(ChannelFuture future) throws Exception { - Channel c = future.getChannel(); - onClosed(c); + final Channel connected = future.channel(); + + sendMessageChannel(connected, msg); + + connected.closeFuture().addListener(new ChannelFutureListener() { + public void operationComplete(ChannelFuture channelFuture) throws Exception { + _availables.incrementAndGet(); + } + }); + } + }); } - }; + else{ - @Override - protected void startConnection() { - ChannelFuture f = bootstrap.connect(session.getAddress().getSocketAddress()); - f.addListener(connectListener); - } + final Channel writable = _writables.poll(); - @Override - protected ChannelBufferOutputStream newPendingBuffer() { - return new ChannelBufferOutputStream( - ChannelBuffers.dynamicBuffer(HeapChannelBufferFactory.getInstance())); - } + if(writable != null){ - @Override - protected void resetPendingBuffer(ChannelBufferOutputStream b) { - b.buffer().clear(); - } + sendMessageChannel(writable, msg); + } + else{ - @Override - protected void flushPendingBuffer(ChannelBufferOutputStream b, Channel c) { - Channels.write(c, b.buffer()); - b.buffer().clear(); + Thread.yield(); + sendMessage(msg); + } + } } - @Override - protected void closePendingBuffer(ChannelBufferOutputStream b) { - b.buffer().clear(); - } + public void close(){ - @Override - protected void sendMessageChannel(Channel c, Object msg) { - Channels.write(c, msg); - } + while(!_writables.isEmpty()){ + _writables.poll().close(); + } - @Override - protected void closeChannel(Channel c) { - c.close(); } - private static void setIfNotPresent(Map options, - String key, Object value, ClientBootstrap bootstrap) { - if (!options.containsKey(key)) { - bootstrap.setOption(key, value); - } + protected ChannelFuture sendMessageChannel(Channel c, Object msg) { + + //System.out.println("[client transport] send message"); + + return c.writeAndFlush(msg).addListener(new ChannelFutureListener() { + + public void operationComplete(ChannelFuture future) throws Exception { + + //System.out.println("[client transport] message sent!!!"); + + _writables.offer(future.channel()); + } + }); } } diff --git a/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpServerTransport.java b/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpServerTransport.java index cab6cfe..fffaf09 100644 --- a/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpServerTransport.java +++ b/src/main/java/org/msgpack/rpc/loop/netty/NettyTcpServerTransport.java @@ -17,10 +17,11 @@ // package org.msgpack.rpc.loop.netty; -import java.util.Map; - -import org.jboss.netty.channel.Channel; -import org.jboss.netty.bootstrap.ServerBootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; import org.msgpack.rpc.Server; import org.msgpack.rpc.config.TcpServerConfig; import org.msgpack.rpc.transport.RpcMessageHandler; @@ -28,36 +29,46 @@ import org.msgpack.rpc.address.Address; class NettyTcpServerTransport implements ServerTransport { - private Channel listenChannel; - private final static String CHILD_TCP_NODELAY = "child.tcpNoDelay"; - private final static String REUSE_ADDRESS = "reuseAddress"; - NettyTcpServerTransport(TcpServerConfig config, Server server, NettyEventLoop loop) { + private final ChannelFuture channelFuture; + + NettyTcpServerTransport(final TcpServerConfig config, + final Server server, + final NettyEventLoop loop) throws InterruptedException { + if (server == null) { throw new IllegalArgumentException("Server must not be null"); } - Address address = config.getListenAddress(); - RpcMessageHandler handler = new RpcMessageHandler(server); + final Address address = config.getListenAddress(); + final RpcMessageHandler handler = new RpcMessageHandler(server); + handler.useThread(true); - ServerBootstrap bootstrap = new ServerBootstrap(loop.getServerFactory()); - bootstrap.setPipelineFactory(new StreamPipelineFactory(loop.getMessagePack(), handler)); - final Map options = config.getOptions(); - setIfNotPresent(options, CHILD_TCP_NODELAY, Boolean.TRUE, bootstrap); - setIfNotPresent(options, REUSE_ADDRESS, Boolean.TRUE, bootstrap); - bootstrap.setOptions(options); - this.listenChannel = bootstrap.bind(address.getSocketAddress()); - } + final EventLoopGroup bossGroup = new NioEventLoopGroup(/*1*/); // (1) + final EventLoopGroup workerGroup = new NioEventLoopGroup(/*4*/); + final ServerBootstrap b = new ServerBootstrap() + .group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) // (3) + .childHandler(new ChannelInitializer() { // (4) + @Override + public void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addLast( + new MessagePackDecoder(loop.getMessagePack()), + new MessageHandler(handler), + new MessagePackEncoder(loop.getMessagePack()), + new MessagePackableEncoder(loop.getMessagePack())); + } + }) + .option(ChannelOption.SO_BACKLOG, 128) // (5) + .childOption(ChannelOption.TCP_NODELAY, !Boolean.FALSE.equals(config.getOption(ChannelOption.TCP_NODELAY.name()))) + .childOption(ChannelOption.SO_KEEPALIVE, !Boolean.FALSE.equals(config.getOption(ChannelOption.SO_KEEPALIVE.name()))); - public void close() { - listenChannel.close(); + // Bind and start to accept incoming connections. + channelFuture = b.bind(address.getSocketAddress()).sync(); // (7) } - private static void setIfNotPresent(Map options, - String key, Object value, ServerBootstrap bootstrap) { - if (!options.containsKey(key)) { - bootstrap.setOption(key, value); - } + public void close() { + channelFuture.channel().close(); } } diff --git a/src/main/java/org/msgpack/rpc/loop/netty/StreamPipelineFactory.java b/src/main/java/org/msgpack/rpc/loop/netty/StreamPipelineFactory.java deleted file mode 100644 index 64a1cae..0000000 --- a/src/main/java/org/msgpack/rpc/loop/netty/StreamPipelineFactory.java +++ /dev/null @@ -1,42 +0,0 @@ -// -// MessagePack-RPC for Java -// -// Copyright (C) 2010 FURUHASHI Sadayuki -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// -package org.msgpack.rpc.loop.netty; - -import org.jboss.netty.channel.Channels; -import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.ChannelPipelineFactory; -import org.msgpack.MessagePack; -import org.msgpack.rpc.transport.RpcMessageHandler; - -class StreamPipelineFactory implements ChannelPipelineFactory { - private RpcMessageHandler handler; - private MessagePack messagePack; - - StreamPipelineFactory(MessagePack messagePack, RpcMessageHandler handler) { - this.handler = handler; - this.messagePack = messagePack; - } - - public ChannelPipeline getPipeline() throws Exception { - ChannelPipeline p = Channels.pipeline(); - p.addLast("msgpack-decode-stream", new MessagePackStreamDecoder(messagePack)); - p.addLast("msgpack-encode", new MessagePackEncoder(messagePack)); - p.addLast("message", new MessageHandler(handler)); - return p; - } -} diff --git a/src/main/java/org/msgpack/rpc/reflect/ReflectionProxyBuilder.java b/src/main/java/org/msgpack/rpc/reflect/ReflectionProxyBuilder.java index 8d81c17..50d6c27 100644 --- a/src/main/java/org/msgpack/rpc/reflect/ReflectionProxyBuilder.java +++ b/src/main/java/org/msgpack/rpc/reflect/ReflectionProxyBuilder.java @@ -23,7 +23,6 @@ import java.lang.reflect.*; import org.msgpack.rpc.*; import org.msgpack.*; -import org.msgpack.rpc.loop.netty.MessagePackEncoder; import org.msgpack.template.*; import org.msgpack.type.Value; import org.msgpack.unpacker.Converter; diff --git a/src/main/java/org/msgpack/rpc/transport/ClientTransport.java b/src/main/java/org/msgpack/rpc/transport/ClientTransport.java index f37abcf..5c86e6d 100644 --- a/src/main/java/org/msgpack/rpc/transport/ClientTransport.java +++ b/src/main/java/org/msgpack/rpc/transport/ClientTransport.java @@ -20,6 +20,7 @@ import java.io.Closeable; public interface ClientTransport extends Closeable, MessageSendable { + public void sendMessage(Object obj); public void close(); diff --git a/src/main/java/org/msgpack/rpc/transport/MessageSendable.java b/src/main/java/org/msgpack/rpc/transport/MessageSendable.java index c09f23b..969e069 100644 --- a/src/main/java/org/msgpack/rpc/transport/MessageSendable.java +++ b/src/main/java/org/msgpack/rpc/transport/MessageSendable.java @@ -18,5 +18,7 @@ package org.msgpack.rpc.transport; public interface MessageSendable { + public void sendMessage(Object obj); + } diff --git a/src/main/java/org/msgpack/rpc/transport/PooledStreamClientTransport.java b/src/main/java/org/msgpack/rpc/transport/PooledStreamClientTransport.java index 6a3e029..90d32d3 100644 --- a/src/main/java/org/msgpack/rpc/transport/PooledStreamClientTransport.java +++ b/src/main/java/org/msgpack/rpc/transport/PooledStreamClientTransport.java @@ -22,8 +22,9 @@ import java.util.ArrayList; import java.util.List; -import org.jboss.netty.logging.InternalLogger; -import org.jboss.netty.logging.InternalLoggerFactory; +import io.netty.channel.ChannelFuture; +import io.netty.util.internal.logging.InternalLogger; +import io.netty.util.internal.logging.InternalLoggerFactory; import org.msgpack.rpc.Session; import org.msgpack.rpc.config.StreamClientConfig; import org.msgpack.MessagePack; @@ -71,9 +72,11 @@ public void sendMessage(Object msg) { if (pool.isEmpty()) { // may be already connected try { messagePack.write(getPendingBuffer(), msg); - } catch (IOException e) { + } + catch (IOException e) { // FIXME } + flushPendingBuffer(getPendingBuffer(), pool.get(0)); return; } } @@ -171,9 +174,9 @@ protected PendingBuffer getPendingBuffer() { protected abstract void closePendingBuffer(PendingBuffer b); - protected abstract void startConnection(); + protected abstract ChannelFuture startConnection(); - protected abstract void sendMessageChannel(Channel c, Object msg); + protected abstract ChannelFuture sendMessageChannel(Channel c, Object msg); protected abstract void closeChannel(Channel c); } diff --git a/src/main/java/org/msgpack/rpc/transport/RpcMessageHandler.java b/src/main/java/org/msgpack/rpc/transport/RpcMessageHandler.java index f26a5c9..15600c2 100644 --- a/src/main/java/org/msgpack/rpc/transport/RpcMessageHandler.java +++ b/src/main/java/org/msgpack/rpc/transport/RpcMessageHandler.java @@ -24,6 +24,7 @@ import org.msgpack.rpc.loop.EventLoop; public class RpcMessageHandler { + protected final Session session; protected final Server server; protected final EventLoop loop; @@ -42,7 +43,8 @@ public RpcMessageHandler(Session session, Server server) { this.server = server; if (session == null) { this.loop = server.getEventLoop(); - } else { + } + else { this.loop = session.getEventLoop(); } } @@ -77,6 +79,7 @@ public void handleMessage(MessageSendable channel, Value msg) { } private void handleMessageImpl(MessageSendable channel, Value msg) { + Value[] array = msg.asArrayValue().getElementArray(); // TODO check array.length @@ -88,20 +91,23 @@ private void handleMessageImpl(MessageSendable channel, Value msg) { Value args = array[3]; handleRequest(channel, msgid, method, args); - } else if (type == Messages.RESPONSE) { + } + else if (type == Messages.RESPONSE) { // RESPONSE int msgid = array[1].asIntegerValue().getInt(); Value error = array[2]; Value result = array[3]; handleResponse(channel, msgid, result, error); - } else if (type == Messages.NOTIFY) { + } + else if (type == Messages.NOTIFY) { // NOTIFY String method = array[1].asRawValue().getString(); Value args = array[2]; handleNotify(channel, method, args); - } else { + } + else { // FIXME error result throw new RuntimeException("unknown message type: " + type); } diff --git a/src/main/java/org/msgpack/rpc/transport/ServerTransport.java b/src/main/java/org/msgpack/rpc/transport/ServerTransport.java index 6dfc247..ec7cc80 100644 --- a/src/main/java/org/msgpack/rpc/transport/ServerTransport.java +++ b/src/main/java/org/msgpack/rpc/transport/ServerTransport.java @@ -20,5 +20,7 @@ import java.io.Closeable; public interface ServerTransport extends Closeable { + public void close(); + } diff --git a/src/test/java/org/msgpack/rpc/BigDataTest.java b/src/test/java/org/msgpack/rpc/BigDataTest.java index 383afa6..7782123 100644 --- a/src/test/java/org/msgpack/rpc/BigDataTest.java +++ b/src/test/java/org/msgpack/rpc/BigDataTest.java @@ -12,54 +12,62 @@ public class BigDataTest extends TestCase { private static String getBigString() { - StringBuilder sb = new StringBuilder(1024 * 1024); // 1M + StringBuilder sb = new StringBuilder(1024); // 1M Random random = new Random(); - for(int i = 0;i < 1024 * 1024;i++){ + for(int i = 0;i < 1024;i++){ sb.append( (char)('a' + random.nextInt(26))); } return sb.toString(); } private static Value BIG_DATA = ValueFactory.createRawValue(getBigString()); + public static class BigDataDispatcher implements Dispatcher { - public void dispatch(Request request) { - assertEquals(BIG_DATA,request.getArguments().asArrayValue().get(0) ); + + public void dispatch(Request request) { + + assertEquals(BIG_DATA, request.getArguments().asArrayValue().get(0) ); + request.sendResult(BIG_DATA); } } @Test public void testSyncBigDataLoad() throws Exception { + MessagePack messagePack = new MessagePack(); EventLoop loop = EventLoop.start(messagePack); Server svr = new Server(loop); Client c = new Client("127.0.0.1", 19851, loop); - c.setRequestTimeout(10); - + c.setRequestTimeout(100); try { svr.serve(new BigDataDispatcher()); svr.listen(19851); - int num = 5; + //warmup + assertEquals(BIG_DATA, c.callApply("test", new Object[]{BIG_DATA})); + + int num = 10; long start = System.currentTimeMillis(); for(int i=0; i < num; i++) { - Value result = c.callApply("test", new Object[]{BIG_DATA}); - assertEquals(BIG_DATA, result); + assertEquals(BIG_DATA, c.callApply("test", new Object[]{BIG_DATA})); } long finish = System.currentTimeMillis(); double result = num / ((double)(finish - start) / 1000); - System.out.println("sync: "+result+" calls per sec"); + System.out.printf("sync: %f calls per sec, and avg: %fms per call", result, (double) (finish - start) / num); - } finally { + } + finally { svr.close(); c.close(); loop.shutdown(); } } - @Test + + @Test public void testAsyncBigDataLoad() throws Exception { EventLoop loop = EventLoop.start(); Server svr = new Server(loop); @@ -70,10 +78,13 @@ public void testAsyncBigDataLoad() throws Exception { svr.serve(new BigDataDispatcher()); svr.listen(19852); - int num = 10; + //warmup + assertEquals(BIG_DATA, c.callApply("test", new Object[]{BIG_DATA})); + + int num = 10000; long start = System.currentTimeMillis(); - for(int i=0; i < num-1; i++) { + for(int i = 0; i < num - 1; i++) { c.notifyApply("test", new Object[]{BIG_DATA}); } c.callApply("test", new Object[]{BIG_DATA}); @@ -82,7 +93,8 @@ public void testAsyncBigDataLoad() throws Exception { double result = num / ((double)(finish - start) / 1000); System.out.println("async: "+result+" calls per sec"); - } finally { + } + finally { svr.close(); c.close(); loop.shutdown();