From 2ab654e03b3c75c2b5d87541e75637512de9be78 Mon Sep 17 00:00:00 2001 From: skywalker Date: Fri, 17 Feb 2017 10:08:56 +0800 Subject: [PATCH] =?UTF-8?q?=E7=BE=8E=E5=8C=96=E4=BB=A3=E7=A0=81,=20?= =?UTF-8?q?=E8=A1=A5=E5=85=85ReadMe?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .classpath | 27 -- .gitignore | 2 + .project | 23 - .settings/org.eclipse.core.resources.prefs | 4 - .settings/org.eclipse.jdt.core.prefs | 2 - .settings/org.eclipse.m2e.core.prefs | 4 - README.md | 64 ++- pom.xml | 72 +-- src/main/java/context/HandlerContext.java | 298 ++++++------- src/main/java/event/ChannelActiveEvent.java | 2 +- src/main/java/event/ChannelInActiveEvent.java | 19 +- src/main/java/event/ChannelReadEvent.java | 21 +- src/main/java/event/ChannelWriteEvent.java | 19 +- src/main/java/event/Event.java | 48 +- .../java/handler/DefaultOutBoundHandler.java | 61 ++- src/main/java/handler/Handler.java | 4 +- src/main/java/handler/HandlerChain.java | 58 +-- src/main/java/handler/HandlerInitializer.java | 38 +- src/main/java/handler/InBoundHandler.java | 30 +- .../java/handler/InBoundHandlerAdapter.java | 26 +- src/main/java/handler/OutBoundHandler.java | 13 +- .../java/handler/OutBoundHandlerAdapter.java | 10 +- .../decoder/DelimiterBasedDecoder.java | 165 ++++--- .../decoder/LengthFieldBasedDecoder.java | 409 +++++++++--------- .../handler/decoder/LineBasedDecoder.java | 20 +- .../java/handler/decoder/StringDecoder.java | 77 ++-- .../java/handler/encoder/StringEncoder.java | 20 +- src/main/java/lifecycle/LifeCycle.java | 12 +- .../java/manager/AbstractChooseStrategy.java | 68 ++- src/main/java/manager/AbstractManager.java | 100 ++--- src/main/java/manager/ChooseStrategy.java | 12 +- .../java/manager/DefaultChooseStrategy.java | 15 +- src/main/java/manager/Manager.java | 15 +- .../java/manager/PortBasedChooseStrategy.java | 42 +- src/main/java/selector/QueuedSelector.java | 405 +++++++++-------- src/main/java/selector/SelectorManager.java | 68 +-- src/main/java/server/Server.java | 212 +++++---- src/main/java/worker/Worker.java | 93 ++-- src/main/java/worker/WorkerManager.java | 26 +- src/main/resource/logback.xml | 20 +- src/test/java/bootstrap/Bootstrap.java | 62 +-- src/test/java/client/Client.java | 93 ++-- src/test/java/handler/ResponseHandler.java | 12 +- .../java/handler/SimpleInBoundHandler.java | 35 +- src/test/java/util/DataUtils.java | 28 +- 45 files changed, 1450 insertions(+), 1404 deletions(-) delete mode 100644 .classpath delete mode 100644 .project delete mode 100644 .settings/org.eclipse.core.resources.prefs delete mode 100644 .settings/org.eclipse.jdt.core.prefs delete mode 100644 .settings/org.eclipse.m2e.core.prefs diff --git a/.classpath b/.classpath deleted file mode 100644 index 7613fa1..0000000 --- a/.classpath +++ /dev/null @@ -1,27 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/.gitignore b/.gitignore index 7949b73..faccf4c 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,7 @@ *.class /target/ +/.idea/ +/MiniNetty.iml # Mobile Tools for Java (J2ME) .mtj.tmp/ diff --git a/.project b/.project deleted file mode 100644 index a0e7d8e..0000000 --- a/.project +++ /dev/null @@ -1,23 +0,0 @@ - - - MiniNetty - - - - - - org.eclipse.jdt.core.javabuilder - - - - - org.eclipse.m2e.core.maven2Builder - - - - - - org.eclipse.jdt.core.javanature - org.eclipse.m2e.core.maven2Nature - - diff --git a/.settings/org.eclipse.core.resources.prefs b/.settings/org.eclipse.core.resources.prefs deleted file mode 100644 index f9fe345..0000000 --- a/.settings/org.eclipse.core.resources.prefs +++ /dev/null @@ -1,4 +0,0 @@ -eclipse.preferences.version=1 -encoding//src/main/java=UTF-8 -encoding//src/test/java=UTF-8 -encoding/=UTF-8 diff --git a/.settings/org.eclipse.jdt.core.prefs b/.settings/org.eclipse.jdt.core.prefs deleted file mode 100644 index 4ede96d..0000000 --- a/.settings/org.eclipse.jdt.core.prefs +++ /dev/null @@ -1,2 +0,0 @@ -eclipse.preferences.version=1 -org.eclipse.jdt.core.compiler.problem.forbiddenReference=warning diff --git a/.settings/org.eclipse.m2e.core.prefs b/.settings/org.eclipse.m2e.core.prefs deleted file mode 100644 index f897a7f..0000000 --- a/.settings/org.eclipse.m2e.core.prefs +++ /dev/null @@ -1,4 +0,0 @@ -activeProfiles= -eclipse.preferences.version=1 -resolveWorkspaceProjects=true -version=1 diff --git a/README.md b/README.md index 360db43..64835ca 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,4 @@ -# MiniNetty - -## 实现的功能 +# 实现的功能 - channelActive/channelInActive/channelRead/channelWrite事件处理 @@ -16,7 +14,65 @@ - LineBasedDecoder -## 线程模型 +# 线程模型 ![ThreadMode](images/thread_mode.jpg) +# 示例 + +## 服务器启动 + +以定长解码器为例: + +```java +@Test +public void lengthFieldBasedDecoder() { + Server server = new Server(); + server.bind(8080).setHandlers(new HandlerInitializer() { + @Override + public Handler[] init() { + return new Handler[] {new LengthFieldBasedDecoder(0, 4), + new StringDecoder(), new SimpleInBoundHandler()}; + } + }).start(); +} +``` + +## SimpleInBoundHandler + +简单地打印出事件触发以及收到的消息: + +```java +public class SimpleInBoundHandler extends InBoundHandlerAdapter { + @Override + public void channelActive(HandlerContext context) { + System.out.println("channel active"); + } + @Override + public void channelInActive(HandlerContext context) { + System.out.println("channel inActive"); + } + @Override + public void channelRead(Object message, HandlerContext context) { + System.out.println(message.toString()); + } +} +``` + +## 客户端 + +数据发送代码: + +```java +@Test +public void lengthFieldBasedDecoder() throws IOException, InterruptedException { + byte[] result = new byte[35]; + System.arraycopy(DataUtils.int2Bytes(31), 0, result, 0, 4); + System.arraycopy("org.apache.commons.lang.builder".getBytes(), 0, result, 4, 31); + for (int i = 0; i < 6; i++) { + bos.write(result); + } + TimeUnit.SECONDS.sleep(6); +} +``` + diff --git a/pom.xml b/pom.xml index 021ef9c..31504ac 100644 --- a/pom.xml +++ b/pom.xml @@ -1,37 +1,47 @@ - 4.0.0 + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + 4.0.0 - skywalker - MiniNetty - 0.0.1-SNAPSHOT - jar + skywalker + MiniNetty + 0.0.1-SNAPSHOT + + + + org.apache.maven.plugins + maven-compiler-plugin + + 1.8 + 1.8 + + + + + jar - MiniNetty - http://maven.apache.org + MiniNetty + http://maven.apache.org - - UTF-8 - + + UTF-8 + - - - - ch.qos.logback - logback-classic - 1.1.7 - - - - io.netty - netty-all - 4.1.4.Final - - - - junit - junit - 4.12 - - + + + ch.qos.logback + logback-classic + 1.1.7 + + + io.netty + netty-all + 4.1.4.Final + + + junit + junit + 4.12 + test + + diff --git a/src/main/java/context/HandlerContext.java b/src/main/java/context/HandlerContext.java index 1db0a39..d8a32b4 100644 --- a/src/main/java/context/HandlerContext.java +++ b/src/main/java/context/HandlerContext.java @@ -18,154 +18,154 @@ */ public class HandlerContext { - private final List inBoundHandlers; - private final List outBoundHandlers; - private int index; - private int inBoundSize, outBoundSize; - private WorkerManager workerManager; - private SocketChannel channel; - private final boolean isInBound; - - public HandlerContext(List inBoundHandlers, - List outBoundHandlers, boolean isInBound) { - this.inBoundHandlers = inBoundHandlers; - this.outBoundHandlers = outBoundHandlers; - this.inBoundSize = inBoundHandlers.size(); - this.outBoundSize = outBoundHandlers.size(); - this.isInBound = isInBound; - reset(); - } - - public void setWorkerManager(WorkerManager workerManager) { - this.workerManager = workerManager; - } - - /** - * 直接使用Channel原生方法返回方法数据不会被OutBoundHandler处理 - */ - public SocketChannel getChannel() { - return channel; - } - - public void setChannel(SocketChannel channel) { - this.channel = channel; - } - - /** - * 添加Handler,用于HandlerInitializer,对HandlerContext的添加并不会影响HandlerChain - * - * @param handler - * {@link Handler} - * @return this {@link HandlerContext} - */ - public HandlerContext addHandler(Handler handler) { - if (handler instanceof InBoundHandler) { - inBoundHandlers.add((InBoundHandler) handler); - ++inBoundSize; - } else if (handler instanceof OutBoundHandler) { - outBoundHandlers.add((OutBoundHandler) handler); - ++outBoundSize; - } - return this; - } - - /** - * 移除HandlerInitializer - * - * @param handlerInitializer - * {@link HandlerInitializer} - */ - public void removeHandlerInitializer(HandlerInitializer handlerInitializer) { - inBoundHandlers.remove(handlerInitializer); - --inBoundSize; - --index; - } - - /** - * reset the index to original value. - */ - public void reset() { - this.index = isInBound ? 0 : (outBoundSize - 1); - } - - /** - * 触发连接建立事件 - */ - public void fireChannelActive() { - if (index >= inBoundSize) - return; - InBoundHandler handler = inBoundHandlers.get(index); - ++index; - handler.channelActive(this); - } - - /** - * 触发连接断开事件 - */ - public void fireChannelInActive() { - if (index >= inBoundSize) - return; - InBoundHandler handler = inBoundHandlers.get(index); - ++index; - handler.channelInActive(this); - } - - /** - * 触发消息读取事件 - * - * @param message - * {@link Object} 消息 - */ - public void fireChannelRead(Object message) { - if (index >= inBoundSize) - return; - InBoundHandler handler = inBoundHandlers.get(index); - ++index; - handler.channelRead(message, this); - } - - /** - * 多次触发后续Handler channelRead事件 - * - * @param messages - * {@link List} - */ - public void fireChannelReads(List messages) { - if (messages != null) { - int oldIndex = index; - for (int i = 0, s = messages.size(); i < s; i++) { - index = oldIndex; - fireChannelRead(messages.get(i)); - } - } - } - - /** - * 触发消息写出事件 - * - * @param message - * {@link Object} 消息 - */ - public void fireChannelWrite(Object message) { - if (index < 0) - return; - OutBoundHandler handler = outBoundHandlers.get(index); - --index; - handler.channelWrite(message, this); - } - - /** - * 向客户端返回数据,触发OutBound事件 - * - * @param message - * {@link Object} 数据 - */ - public void writeFlush(Object message) { - HandlerContext context = new HandlerContext(inBoundHandlers, - outBoundHandlers, false); - context.setChannel(channel); - context.setWorkerManager(workerManager); - workerManager.chooseOne(channel).submit(new ChannelWriteEvent(context, message)); - } + private final List inBoundHandlers; + private final List outBoundHandlers; + private int index; + private int inBoundSize, outBoundSize; + private WorkerManager workerManager; + private SocketChannel channel; + private final boolean isInBound; + + public HandlerContext(List inBoundHandlers, + List outBoundHandlers, boolean isInBound) { + this.inBoundHandlers = inBoundHandlers; + this.outBoundHandlers = outBoundHandlers; + this.inBoundSize = inBoundHandlers.size(); + this.outBoundSize = outBoundHandlers.size(); + this.isInBound = isInBound; + reset(); + } + + public void setWorkerManager(WorkerManager workerManager) { + this.workerManager = workerManager; + } + + /** + * 直接使用Channel原生方法返回方法数据不会被OutBoundHandler处理 + */ + public SocketChannel getChannel() { + return channel; + } + + public void setChannel(SocketChannel channel) { + this.channel = channel; + } + + /** + * 添加Handler,用于HandlerInitializer,对HandlerContext的添加并不会影响HandlerChain + * + * @param handler + * {@link Handler} + * @return this {@link HandlerContext} + */ + public HandlerContext addHandler(Handler handler) { + if (handler instanceof InBoundHandler) { + inBoundHandlers.add((InBoundHandler) handler); + ++inBoundSize; + } else if (handler instanceof OutBoundHandler) { + outBoundHandlers.add((OutBoundHandler) handler); + ++outBoundSize; + } + return this; + } + + /** + * 移除HandlerInitializer + * + * @param handlerInitializer + * {@link HandlerInitializer} + */ + public void removeHandlerInitializer(HandlerInitializer handlerInitializer) { + inBoundHandlers.remove(handlerInitializer); + --inBoundSize; + --index; + } + + /** + * reset the index to original value. + */ + public void reset() { + this.index = isInBound ? 0 : (outBoundSize - 1); + } + + /** + * 触发连接建立事件 + */ + public void fireChannelActive() { + if (index >= inBoundSize) + return; + InBoundHandler handler = inBoundHandlers.get(index); + ++index; + handler.channelActive(this); + } + + /** + * 触发连接断开事件 + */ + public void fireChannelInActive() { + if (index >= inBoundSize) + return; + InBoundHandler handler = inBoundHandlers.get(index); + ++index; + handler.channelInActive(this); + } + + /** + * 触发消息读取事件 + * + * @param message + * {@link Object} 消息 + */ + public void fireChannelRead(Object message) { + if (index >= inBoundSize) + return; + InBoundHandler handler = inBoundHandlers.get(index); + ++index; + handler.channelRead(message, this); + } + + /** + * 多次触发后续Handler channelRead事件 + * + * @param messages + * {@link List} + */ + public void fireChannelReads(List messages) { + if (messages != null) { + int oldIndex = index; + for (int i = 0, s = messages.size(); i < s; i++) { + index = oldIndex; + fireChannelRead(messages.get(i)); + } + } + } + + /** + * 触发消息写出事件 + * + * @param message + * {@link Object} 消息 + */ + public void fireChannelWrite(Object message) { + if (index < 0) + return; + OutBoundHandler handler = outBoundHandlers.get(index); + --index; + handler.channelWrite(message, this); + } + + /** + * 向客户端返回数据,触发OutBound事件 + * + * @param message + * {@link Object} 数据 + */ + public void writeFlush(Object message) { + HandlerContext context = new HandlerContext(inBoundHandlers, + outBoundHandlers, false); + context.setChannel(channel); + context.setWorkerManager(workerManager); + workerManager.chooseOne(channel).submit(new ChannelWriteEvent(context, message)); + } } diff --git a/src/main/java/event/ChannelActiveEvent.java b/src/main/java/event/ChannelActiveEvent.java index 0b6764f..8315d04 100644 --- a/src/main/java/event/ChannelActiveEvent.java +++ b/src/main/java/event/ChannelActiveEvent.java @@ -12,5 +12,5 @@ public ChannelActiveEvent(HandlerContext ctx) { protected void doRun(HandlerContext context, Object message) { context.fireChannelActive(); } - + } diff --git a/src/main/java/event/ChannelInActiveEvent.java b/src/main/java/event/ChannelInActiveEvent.java index e29dc34..6c06b70 100644 --- a/src/main/java/event/ChannelInActiveEvent.java +++ b/src/main/java/event/ChannelInActiveEvent.java @@ -3,19 +3,20 @@ import context.HandlerContext; /** - * 连接断开事件 + * 连接断开事件. + * * @author skywalker * */ public class ChannelInActiveEvent extends Event { - public ChannelInActiveEvent(HandlerContext ctx) { - super(ctx); - } + public ChannelInActiveEvent(HandlerContext ctx) { + super(ctx); + } + + @Override + protected void doRun(HandlerContext context, Object message) { + context.fireChannelInActive(); + } - @Override - protected void doRun(HandlerContext context, Object message) { - context.fireChannelInActive(); - } - } diff --git a/src/main/java/event/ChannelReadEvent.java b/src/main/java/event/ChannelReadEvent.java index 9c0d27f..daf72b9 100644 --- a/src/main/java/event/ChannelReadEvent.java +++ b/src/main/java/event/ChannelReadEvent.java @@ -2,15 +2,20 @@ import context.HandlerContext; +/** + * {@link Event}实现,数据读取事件. + * + * @author skywalker + */ public class ChannelReadEvent extends Event { - public ChannelReadEvent(HandlerContext context, Object message) { - super(context, message); - } + public ChannelReadEvent(HandlerContext context, Object message) { + super(context, message); + } + + @Override + protected void doRun(HandlerContext context, Object message) { + context.fireChannelRead(message); + } - @Override - protected void doRun(HandlerContext context, Object message) { - context.fireChannelRead(message); - } - } diff --git a/src/main/java/event/ChannelWriteEvent.java b/src/main/java/event/ChannelWriteEvent.java index a780762..c8da82c 100644 --- a/src/main/java/event/ChannelWriteEvent.java +++ b/src/main/java/event/ChannelWriteEvent.java @@ -3,19 +3,20 @@ import context.HandlerContext; /** - * 数据发送事件 + * 数据发送事件. + * * @author skywalker * */ public class ChannelWriteEvent extends Event { - public ChannelWriteEvent(HandlerContext context, Object message) { - super(context, message); - } + public ChannelWriteEvent(HandlerContext context, Object message) { + super(context, message); + } + + @Override + protected void doRun(HandlerContext context, Object message) { + context.fireChannelWrite(message); + } - @Override - protected void doRun(HandlerContext context, Object message) { - context.fireChannelWrite(message); - } - } diff --git a/src/main/java/event/Event.java b/src/main/java/event/Event.java index 3a11225..71423fc 100644 --- a/src/main/java/event/Event.java +++ b/src/main/java/event/Event.java @@ -3,34 +3,34 @@ import context.HandlerContext; /** - * 事件 + * 事件. * * @author skywalker * */ public abstract class Event implements Runnable { - private HandlerContext ctx; - private Object message; - - public Event(HandlerContext context, Object message) { - this.ctx = context; - this.message = message; - } - - public Event(HandlerContext ctx) { - this.ctx = ctx; - } - - @Override - public final void run() { - ctx.reset(); - doRun(ctx, message); - } - - /** - * 子类真正的运行方法 - */ - protected abstract void doRun(HandlerContext context, Object message); - + private HandlerContext ctx; + private Object message; + + public Event(HandlerContext context, Object message) { + this.ctx = context; + this.message = message; + } + + public Event(HandlerContext ctx) { + this.ctx = ctx; + } + + @Override + public final void run() { + ctx.reset(); + doRun(ctx, message); + } + + /** + * 子类真正的运行方法. + */ + protected abstract void doRun(HandlerContext context, Object message); + } diff --git a/src/main/java/handler/DefaultOutBoundHandler.java b/src/main/java/handler/DefaultOutBoundHandler.java index 3af8044..f4b22c2 100644 --- a/src/main/java/handler/DefaultOutBoundHandler.java +++ b/src/main/java/handler/DefaultOutBoundHandler.java @@ -10,40 +10,39 @@ import context.HandlerContext; /** - * 负责最后真正向客户端发送数据 - * - * @author skywalker + * 负责最后真正向客户端发送数据. * + * @author skywalker */ public class DefaultOutBoundHandler implements OutBoundHandler { - - private static final Logger logger = LoggerFactory.getLogger(DefaultOutBoundHandler.class); - @Override - public void channelWrite(Object message, HandlerContext context) { - if (message == null) return; - SocketChannel channel = context.getChannel(); - try { - ByteBuffer result = null; - if (message instanceof ByteBuffer) { - result = (ByteBuffer) message; - } else if (message instanceof byte[]) { - result = ByteBuffer.wrap((byte[]) message); - } else if (message instanceof String) { - result = ByteBuffer.wrap(message.toString().getBytes()); - } - if (result == null) { - logger.debug("Unsupported type: " + message.getClass().getName()); - } else { - channel.write(result); - } - } catch (IOException e) { - try { - logger.error("Write to " + channel.getRemoteAddress().toString() + " failed."); - } catch (IOException e1) { - logger.error(e1.getMessage()); - } - } - } + private static final Logger logger = LoggerFactory.getLogger(DefaultOutBoundHandler.class); + + @Override + public void channelWrite(Object message, HandlerContext context) { + if (message == null) return; + SocketChannel channel = context.getChannel(); + try { + ByteBuffer result = null; + if (message instanceof ByteBuffer) { + result = (ByteBuffer) message; + } else if (message instanceof byte[]) { + result = ByteBuffer.wrap((byte[]) message); + } else if (message instanceof String) { + result = ByteBuffer.wrap(message.toString().getBytes()); + } + if (result == null) { + logger.debug("Unsupported type: " + message.getClass().getName()); + } else { + channel.write(result); + } + } catch (IOException e) { + try { + logger.error("Write to " + channel.getRemoteAddress().toString() + " failed."); + } catch (IOException e1) { + logger.error(e1.getMessage()); + } + } + } } diff --git a/src/main/java/handler/Handler.java b/src/main/java/handler/Handler.java index 7b2fa41..c856641 100644 --- a/src/main/java/handler/Handler.java +++ b/src/main/java/handler/Handler.java @@ -1,8 +1,8 @@ package handler; /** - * 事件处理器 - * Mark Interface + * 事件处理器(Mark Interface). + * * @author skywalker * */ diff --git a/src/main/java/handler/HandlerChain.java b/src/main/java/handler/HandlerChain.java index e65e77a..adf8df2 100644 --- a/src/main/java/handler/HandlerChain.java +++ b/src/main/java/handler/HandlerChain.java @@ -4,39 +4,39 @@ import java.util.List; /** - * Handler调用链 + * {@link Handler}调用链. * * @author skywalker * */ public class HandlerChain { - private final List inBoundHandlers; - private final List outBoundHandlers; - - public HandlerChain() { - this.inBoundHandlers = new ArrayList<>(); - this.outBoundHandlers = new ArrayList<>(); - this.outBoundHandlers.add(new DefaultOutBoundHandler()); - } - - /** - * 添加处理器 - */ - public void addHandler(Handler handler) { - if (handler instanceof InBoundHandler) { - inBoundHandlers.add((InBoundHandler) handler); - } else if (handler instanceof OutBoundHandler) { - outBoundHandlers.add((OutBoundHandler) handler); - } - } - - public List getInBoundHandlers() { - return new ArrayList(inBoundHandlers); - } - - public List getOutBoundHandlers() { - return new ArrayList(outBoundHandlers); - } - + private final List inBoundHandlers; + private final List outBoundHandlers; + + public HandlerChain() { + this.inBoundHandlers = new ArrayList<>(); + this.outBoundHandlers = new ArrayList<>(); + this.outBoundHandlers.add(new DefaultOutBoundHandler()); + } + + /** + * 添加处理器 + */ + public void addHandler(Handler handler) { + if (handler instanceof InBoundHandler) { + inBoundHandlers.add((InBoundHandler) handler); + } else if (handler instanceof OutBoundHandler) { + outBoundHandlers.add((OutBoundHandler) handler); + } + } + + public List getInBoundHandlers() { + return new ArrayList(inBoundHandlers); + } + + public List getOutBoundHandlers() { + return new ArrayList(outBoundHandlers); + } + } diff --git a/src/main/java/handler/HandlerInitializer.java b/src/main/java/handler/HandlerInitializer.java index e40bba0..221540a 100644 --- a/src/main/java/handler/HandlerInitializer.java +++ b/src/main/java/handler/HandlerInitializer.java @@ -5,30 +5,30 @@ import context.HandlerContext; /** - * 用以在客户端Channel建立时想HandlerChain添加Handler - * 这样可以实现对于每一次客户端调用,HandlerChain中的Handler对象都是不同的, 否则都是同一个对象。 + * 用以在客户端Channel建立时想HandlerChain添加Handler. + *

这样可以实现对于每一次客户端调用,HandlerChain中的Handler对象都是不同的, 否则都是同一个对象。

* * @author skywalker * */ public abstract class HandlerInitializer extends InBoundHandlerAdapter { - @Override - public void channelActive(HandlerContext context) { - Handler[] handlers = init(); - Objects.requireNonNull(handlers); - context.removeHandlerInitializer(this); - for (int i = 0, l = handlers.length; i < l; i++) { - context.addHandler(handlers[i]); - } - context.fireChannelActive(); - } - - /** - * 返回想要添加的Handler数组 - * - * @return a {@link Handler} array - */ - public abstract Handler[] init(); + @Override + public void channelActive(HandlerContext context) { + Handler[] handlers = init(); + Objects.requireNonNull(handlers); + context.removeHandlerInitializer(this); + for (int i = 0, l = handlers.length; i < l; i++) { + context.addHandler(handlers[i]); + } + context.fireChannelActive(); + } + + /** + * 返回想要添加的Handler数组. + * + * @return {@link Handler} array + */ + public abstract Handler[] init(); } diff --git a/src/main/java/handler/InBoundHandler.java b/src/main/java/handler/InBoundHandler.java index 75a9e15..9fcbe16 100644 --- a/src/main/java/handler/InBoundHandler.java +++ b/src/main/java/handler/InBoundHandler.java @@ -3,16 +3,32 @@ import context.HandlerContext; /** - * 数据读取事件处理器 + * 数据读取事件处理器. + * * @author skywalker * */ public interface InBoundHandler extends Handler { - public void channelActive(HandlerContext context); - - public void channelInActive(HandlerContext context); - - public void channelRead(Object message, HandlerContext context); - + /** + * 连接建立. + * + * @param context {@link HandlerContext} 处理器上下文 + */ + void channelActive(HandlerContext context); + + /** + * 连接断开. + * + * @param context {@link HandlerContext} 处理器上下文 + */ + void channelInActive(HandlerContext context); + + /** + * 数据读取. + * + * @param context {@link HandlerContext} 处理器上下文 + */ + void channelRead(Object message, HandlerContext context); + } diff --git a/src/main/java/handler/InBoundHandlerAdapter.java b/src/main/java/handler/InBoundHandlerAdapter.java index b466a02..55745f3 100644 --- a/src/main/java/handler/InBoundHandlerAdapter.java +++ b/src/main/java/handler/InBoundHandlerAdapter.java @@ -3,26 +3,26 @@ import context.HandlerContext; /** - * 除了向下转发什么也不做,自己实现的InBoundHandlerAdapter可继承此类 + * 除了向下转发什么也不做,自己实现的InBoundHandlerAdapter可继承此类。 * * @author skywalker * */ public class InBoundHandlerAdapter implements InBoundHandler { - @Override - public void channelActive(HandlerContext context) { - context.fireChannelActive(); - } + @Override + public void channelActive(HandlerContext context) { + context.fireChannelActive(); + } - @Override - public void channelInActive(HandlerContext context) { - context.fireChannelInActive(); - } + @Override + public void channelInActive(HandlerContext context) { + context.fireChannelInActive(); + } - @Override - public void channelRead(Object message, HandlerContext context) { - context.fireChannelRead(message); - } + @Override + public void channelRead(Object message, HandlerContext context) { + context.fireChannelRead(message); + } } diff --git a/src/main/java/handler/OutBoundHandler.java b/src/main/java/handler/OutBoundHandler.java index 8b6b216..4b5a461 100644 --- a/src/main/java/handler/OutBoundHandler.java +++ b/src/main/java/handler/OutBoundHandler.java @@ -4,12 +4,17 @@ /** * 数据输出事件处理器 - * - * @author skywalker * + * @author skywalker */ public interface OutBoundHandler extends Handler { - public void channelWrite(Object message, HandlerContext context); - + /** + * 向客户端返回数据. + * + * @param message 数据(消息) + * @param context {@link HandlerContext} 处理器上下文 + */ + void channelWrite(Object message, HandlerContext context); + } diff --git a/src/main/java/handler/OutBoundHandlerAdapter.java b/src/main/java/handler/OutBoundHandlerAdapter.java index 3b85d16..e2140a0 100644 --- a/src/main/java/handler/OutBoundHandlerAdapter.java +++ b/src/main/java/handler/OutBoundHandlerAdapter.java @@ -3,16 +3,16 @@ import context.HandlerContext; /** - * 除了向下转发什么也不做,自己实现的OutBoundHandlerAdapter可继承此类 + * 除了向下转发什么也不做,自己实现的OutBoundHandlerAdapter可继承此类。 * * @author skywalker * */ public class OutBoundHandlerAdapter implements OutBoundHandler { - @Override - public void channelWrite(Object message, HandlerContext context) { - context.fireChannelWrite(message); - } + @Override + public void channelWrite(Object message, HandlerContext context) { + context.fireChannelWrite(message); + } } diff --git a/src/main/java/handler/decoder/DelimiterBasedDecoder.java b/src/main/java/handler/decoder/DelimiterBasedDecoder.java index f9ce1bc..985b65e 100644 --- a/src/main/java/handler/decoder/DelimiterBasedDecoder.java +++ b/src/main/java/handler/decoder/DelimiterBasedDecoder.java @@ -14,92 +14,91 @@ /** * 基于分割符(ASCII)的decoder,应该配合{@link HandlerInitializer}使用. * 以byte数组的形式传向下一个Handler,分隔符不含在内。 - * - * @author skywalker * + * @author skywalker */ public class DelimiterBasedDecoder extends InBoundHandlerAdapter { - private final byte delimiter; - //上次处理的剩余 - private byte[] todo; - private static final int defaultMaxLength = 1024; - private final int maxLength; - private static final Logger logger = LoggerFactory.getLogger(DelimiterBasedDecoder.class); + private final byte delimiter; + //上次处理的剩余 + private byte[] todo; + private static final int defaultMaxLength = 1024; + private final int maxLength; + private static final Logger logger = LoggerFactory.getLogger(DelimiterBasedDecoder.class); + + public DelimiterBasedDecoder(char delimiter) { + this(delimiter, 0); + } + + public DelimiterBasedDecoder(char delimiter, int maxLength) { + if (delimiter > 127) { + throw new IllegalArgumentException("We support ASCII code only."); + } + this.delimiter = (byte) delimiter; + if (maxLength > 0) { + this.maxLength = maxLength; + } else { + this.maxLength = defaultMaxLength; + } + } + + @Override + public void channelRead(Object message, HandlerContext context) { + if (message instanceof ByteBuffer) { + ByteBuffer buffer = (ByteBuffer) message; + if (buffer.hasArray()) { + buffer.flip(); + byte[] array = buffer.array(); + List out = new ArrayList<>(); + int start = 0, i = 0; + for (int l = buffer.limit(); i < l; i++) { + if (array[i] == delimiter) { + byte[] result = null; + if (start == 0 && todo != null) { + //处理上次读取的剩余 + int tl = todo.length, length = tl + i; + if (check(length)) return; + result = new byte[length]; + System.arraycopy(todo, 0, result, 0, tl); + System.arraycopy(array, 0, result, tl, i); + todo = null; + } else { + int length = i - start; + if (check(length)) return; + result = new byte[length]; + System.arraycopy(array, start, result, 0, length); + } + out.add(result); + start = i + 1; + } + } + if (array[i - 1] != delimiter) { + int length = i - start; + if (check(length)) return; + todo = new byte[length]; + System.arraycopy(array, start, todo, 0, length); + } + context.fireChannelReads(out); + } else { + logger.debug("We support heap buffer only."); + } + } else { + context.fireChannelRead(message); + } + } + + /** + * 检查内容长度是否达到最大值。 + * + * @param i 内容长度 + * @return true, 如果达到 + */ + private boolean check(int i) { + boolean result = i > maxLength; + if (result) { + logger.debug("The content length " + i + "exceeds the max length " + maxLength); + } + return result; + } - public DelimiterBasedDecoder(char delimiter) { - this(delimiter, 0); - } - - public DelimiterBasedDecoder(char delimiter, int maxLength) { - if (delimiter > 127) { - throw new IllegalArgumentException("We support ASCII code only."); - } - this.delimiter = (byte) delimiter; - if (maxLength > 0) { - this.maxLength = maxLength; - } else { - this.maxLength = defaultMaxLength; - } - } - - @Override - public void channelRead(Object message, HandlerContext context) { - if (message instanceof ByteBuffer) { - ByteBuffer buffer = (ByteBuffer) message; - if (buffer.hasArray()) { - buffer.flip(); - byte[] array = buffer.array(); - List out = new ArrayList<>(); - int start = 0, i = 0; - for (int l = buffer.limit(); i < l; i++) { - if (array[i] == delimiter) { - byte[] result = null; - if (start == 0 && todo != null) { - //处理上次读取的剩余 - int tl = todo.length, length = tl + i; - if (check(length)) return; - result = new byte[length]; - System.arraycopy(todo, 0, result, 0, tl); - System.arraycopy(array, 0, result, tl, i); - todo = null; - } else { - int length = i - start; - if (check(length)) return; - result = new byte[length]; - System.arraycopy(array, start, result, 0, length); - } - out.add(result); - start = i + 1; - } - } - if (array[i - 1] != delimiter) { - int length = i - start; - if (check(length)) return; - todo = new byte[length]; - System.arraycopy(array, start, todo, 0, length); - } - context.fireChannelReads(out); - } else { - logger.debug("We support heap buffer only."); - } - } else { - context.fireChannelRead(message); - } - } - - /** - * 检查内容长度是否达到最大值 - * - * @param i 内容长度 - * @return true, 如果达到 - */ - private boolean check(int i) { - boolean result = i > maxLength; - if (result) { - logger.debug("The content length " + i + "exceeds the max length " + maxLength); - } - return result; - } - } diff --git a/src/main/java/handler/decoder/LengthFieldBasedDecoder.java b/src/main/java/handler/decoder/LengthFieldBasedDecoder.java index 6c7c7a8..8e436da 100644 --- a/src/main/java/handler/decoder/LengthFieldBasedDecoder.java +++ b/src/main/java/handler/decoder/LengthFieldBasedDecoder.java @@ -14,218 +14,217 @@ /** * 根据指定的长度读取相应的数据, 解决粘包和半包的问题. - * 处理的结果以byte[]的形式传至下一个Handler. - * 此Handler应该配合 {@link HandlerInitializer}使用. + *

处理的结果以byte[]的形式传至下一个Handler,此Handler应该配合{@link HandlerInitializer}使用

. * * @author skywalker * */ public class LengthFieldBasedDecoder extends InBoundHandlerAdapter { - - private static final Logger logger = LoggerFactory.getLogger(LengthFieldBasedDecoder.class); - - //字节序 - private final ByteOrder byteOrder; - //长度标志字节起始位置 - private final int offset; - //长度标志位长度 - private final int length; - //内容的最大长度 - private final int maxLength; - private final int dataOffset; - private State state = State.INIT; - private byte[] todo; - //还需要的数目 - private int needed = 0; - private int neededOffset = 0; - private static final int defaultMaxLength = 2048; - public LengthFieldBasedDecoder(int offset, int length) { - this(offset, length, defaultMaxLength); - } - - public LengthFieldBasedDecoder(ByteOrder byteOrder, int offset, int length) { - this(byteOrder, offset, length, defaultMaxLength); - } + private static final Logger logger = LoggerFactory.getLogger(LengthFieldBasedDecoder.class); - public LengthFieldBasedDecoder(int offset, int length, int maxLength) { - this(ByteOrder.LITTLE_ENDIAN, offset, length, maxLength); - } + //字节序 + private final ByteOrder byteOrder; + //长度标志字节起始位置 + private final int offset; + //长度标志位长度 + private final int length; + //内容的最大长度 + private final int maxLength; + private final int dataOffset; + private State state = State.INIT; + private byte[] todo; + //还需要的数目 + private int needed = 0; + private int neededOffset = 0; + private static final int defaultMaxLength = 2048; - public LengthFieldBasedDecoder(ByteOrder byteOrder, int offset, int length, - int maxLength) { - if (byteOrder == null) byteOrder = ByteOrder.LITTLE_ENDIAN; - this.byteOrder = byteOrder; - this.offset = offset; - this.length = length; - this.dataOffset = offset + length; - this.maxLength = maxLength - this.dataOffset; - } + public LengthFieldBasedDecoder(int offset, int length) { + this(offset, length, defaultMaxLength); + } + + public LengthFieldBasedDecoder(ByteOrder byteOrder, int offset, int length) { + this(byteOrder, offset, length, defaultMaxLength); + } + + public LengthFieldBasedDecoder(int offset, int length, int maxLength) { + this(ByteOrder.LITTLE_ENDIAN, offset, length, maxLength); + } + + public LengthFieldBasedDecoder(ByteOrder byteOrder, int offset, int length, + int maxLength) { + if (byteOrder == null) byteOrder = ByteOrder.LITTLE_ENDIAN; + this.byteOrder = byteOrder; + this.offset = offset; + this.length = length; + this.dataOffset = offset + length; + this.maxLength = maxLength - this.dataOffset; + } + + @Override + public void channelRead(Object message, HandlerContext context) { + if (message instanceof ByteBuffer) { + ByteBuffer buffer = (ByteBuffer) message; + if (buffer.hasArray()) { + buffer.flip(); + List out = new ArrayList<>(); + int remaining = buffer.remaining(); + switch (state) { + case INIT: + process(buffer, 0, out); + break; + case HEAD_NEEDED: + //补齐head + if (remaining < needed) { + //头部依然不足 + System.arraycopy(buffer.array(), 0, todo, neededOffset, remaining); + neededOffset += remaining; + } else if (remaining == needed) { + System.arraycopy(buffer.array(), 0, todo, neededOffset, remaining); + state = State.CONTENT_NEEDED; + int contentLength = bytes2Int(todo, offset, length); + if (contentLength > maxLength) { + logger.debug("The content length " + contentLength + "exceeds the max length " + maxLength); + break; + } + byte[] arr = new byte[dataOffset + contentLength]; + System.arraycopy(todo, 0, arr, 0, dataOffset); + todo = arr; + needed = contentLength; + neededOffset = dataOffset; + } else { + System.arraycopy(buffer.array(), 0, todo, neededOffset, remaining); + int contentLength = bytes2Int(todo, offset, length); + if (contentLength > maxLength) { + logger.debug("The content length " + contentLength + "exceeds the max length " + maxLength); + break; + } + byte[] arr = new byte[dataOffset + contentLength]; + System.arraycopy(todo, 0, arr, 0, dataOffset); + remaining -= dataOffset; + if (remaining < contentLength) { + System.arraycopy(buffer.array(), needed, arr, dataOffset, remaining); + state = State.CONTENT_NEEDED; + todo = arr; + needed = contentLength - remaining; + neededOffset = remaining; + } else if (remaining == contentLength) { + System.arraycopy(buffer.array(), needed, arr, dataOffset, remaining); + out.add(arr); + todo = null; + state = State.INIT; + } else { + System.arraycopy(buffer.array(), needed, arr, dataOffset, contentLength); + out.add(arr); + todo = null; + process(buffer, needed + contentLength, out); + } + } + break; + case CONTENT_NEEDED: + if (remaining < needed) { + //内容不足 + System.arraycopy(buffer.array(), 0, todo, neededOffset, remaining); + neededOffset += remaining; + needed -= remaining; + } else if (remaining == needed) { + System.arraycopy(buffer.array(), 0, todo, neededOffset, remaining); + state = State.INIT; + out.add(todo); + todo = null; + } else { + System.arraycopy(buffer.array(), 0, todo, neededOffset, needed); + out.add(todo); + process(buffer, needed, out); + } + } + context.fireChannelReads(out); + } else { + logger.debug("We support heap ByteBuffer only."); + } + } else { + context.fireChannelRead(message); + } + } + + /** + * 处理从head开始的一组数据. + * + * @param buffer 缓冲区 + * @param begin 开始处理的位置 + * @param out 结果集 + */ + private void process(ByteBuffer buffer, int begin, List out) { + //读取到的数据总量 + int limit = buffer.limit(), remaining = limit - begin; + byte[] array = buffer.array(); + while (begin < limit) { + //头部不足 + if (remaining < dataOffset) { + byte[] head = new byte[dataOffset]; + System.arraycopy(array, begin, head, 0, remaining); + state = State.HEAD_NEEDED; + needed = dataOffset - remaining; + neededOffset = remaining; + todo = head; + break; + } + int contentLength = bytes2Int(array, begin + offset, length); + if (contentLength > maxLength) { + logger.debug("The content length " + contentLength + "exceeds the max length " + maxLength); + break; + } + remaining -= dataOffset; + byte[] result = new byte[dataOffset + contentLength]; + if (remaining < contentLength) { + System.arraycopy(array, begin, result, 0, dataOffset + remaining); + state = State.CONTENT_NEEDED; + todo = result; + needed = contentLength - remaining; + neededOffset = dataOffset + remaining; + break; + } + if (remaining == contentLength) { + System.arraycopy(array, begin, result, 0, dataOffset + remaining); + state = State.INIT; + needed = 0; + todo = null; + out.add(result); + break; + } + int total = dataOffset + contentLength; + System.arraycopy(array, begin, result, 0, total); + out.add(result); + remaining -= contentLength; + begin += total; + } + } + + /** + * 将byte数组转为int值. + * + * @param data byte数组 + * @return int + */ + private int bytes2Int(byte[] data, int offset, int length) { + int result = 0; + if (byteOrder == ByteOrder.LITTLE_ENDIAN) { + for (int i = 0; i < length; i++) { + result |= ((data[offset + i] & 0xff) << (i << 3)); + } + } else { + for (int i = 0; i < length; i++) { + result |= ((data[offset + i] & 0xff) << ((length - 1 - i) << 3)); + } + } + return result; + } + + private enum State { + //初始状态,需要读取头部 + INIT, + CONTENT_NEEDED, + HEAD_NEEDED + } - @Override - public void channelRead(Object message, HandlerContext context) { - if (message instanceof ByteBuffer) { - ByteBuffer buffer = (ByteBuffer) message; - if (buffer.hasArray()) { - buffer.flip(); - List out = new ArrayList<>(); - int remaining = buffer.remaining(); - switch (state) { - case INIT: - process(buffer, 0, out); - break; - case HEAD_NEEDED: - //补齐head - if (remaining < needed) { - //头部依然不足 - System.arraycopy(buffer.array(), 0, todo, neededOffset, remaining); - neededOffset += remaining; - } else if (remaining == needed) { - System.arraycopy(buffer.array(), 0, todo, neededOffset, remaining); - state = State.CONTENT_NEEDED; - int contentLength = bytes2Int(todo, offset, length); - if (contentLength > maxLength) { - logger.debug("The content length " + contentLength + "exceeds the max length " + maxLength); - break; - } - byte[] arr = new byte[dataOffset + contentLength]; - System.arraycopy(todo, 0, arr, 0, dataOffset); - todo = arr; - needed = contentLength; - neededOffset = dataOffset; - } else { - System.arraycopy(buffer.array(), 0, todo, neededOffset, remaining); - int contentLength = bytes2Int(todo, offset, length); - if (contentLength > maxLength) { - logger.debug("The content length " + contentLength + "exceeds the max length " + maxLength); - break; - } - byte[] arr = new byte[dataOffset + contentLength]; - System.arraycopy(todo, 0, arr, 0, dataOffset); - remaining -= dataOffset; - if (remaining < contentLength) { - System.arraycopy(buffer.array(), needed, arr, dataOffset, remaining); - state = State.CONTENT_NEEDED; - todo = arr; - needed = contentLength - remaining; - neededOffset = remaining; - } else if (remaining == contentLength) { - System.arraycopy(buffer.array(), needed, arr, dataOffset, remaining); - out.add(arr); - todo = null; - state = State.INIT; - } else { - System.arraycopy(buffer.array(), needed, arr, dataOffset, contentLength); - out.add(arr); - todo = null; - process(buffer, needed + contentLength, out); - } - } - break; - case CONTENT_NEEDED: - if (remaining < needed) { - //内容不足 - System.arraycopy(buffer.array(), 0, todo, neededOffset, remaining); - neededOffset += remaining; - needed -= remaining; - } else if (remaining == needed) { - System.arraycopy(buffer.array(), 0, todo, neededOffset, remaining); - state = State.INIT; - out.add(todo); - todo = null; - } else { - System.arraycopy(buffer.array(), 0, todo, neededOffset, needed); - out.add(todo); - process(buffer, needed, out); - } - } - context.fireChannelReads(out); - } else { - logger.debug("We support heap ByteBuffer only."); - } - } else { - context.fireChannelRead(message); - } - } - - /** - * 处理从head开始的一组数据 - * - * @param buffer 缓冲区 - * @param begin 开始处理的位置 - * @param out 结果集 - */ - private void process(ByteBuffer buffer, int begin, List out) { - //读取到的数据总量 - int limit = buffer.limit(), remaining = limit - begin; - byte[] array = buffer.array(); - while (begin < limit) { - //头部不足 - if (remaining < dataOffset) { - byte[] head = new byte[dataOffset]; - System.arraycopy(array, begin, head, 0, remaining); - state = State.HEAD_NEEDED; - needed = dataOffset - remaining; - neededOffset = remaining; - todo = head; - break; - } - int contentLength = bytes2Int(array, begin + offset, length); - if (contentLength > maxLength) { - logger.debug("The content length " + contentLength + "exceeds the max length " + maxLength); - break; - } - remaining -= dataOffset; - byte[] result = new byte[dataOffset + contentLength]; - if (remaining < contentLength) { - System.arraycopy(array, begin, result, 0, dataOffset + remaining); - state = State.CONTENT_NEEDED; - todo = result; - needed = contentLength - remaining; - neededOffset = dataOffset + remaining; - break; - } - if (remaining == contentLength) { - System.arraycopy(array, begin, result, 0, dataOffset + remaining); - state = State.INIT; - needed = 0; - todo = null; - out.add(result); - break; - } - int total = dataOffset + contentLength; - System.arraycopy(array, begin, result, 0, total); - out.add(result); - remaining -= contentLength; - begin += total; - } - } - - /** - * 将byte数组转为int值 - * - * @param data byte数组 - * @return int - */ - private int bytes2Int(byte[] data, int offset, int length) { - int result = 0; - if (byteOrder == ByteOrder.LITTLE_ENDIAN) { - for (int i = 0; i < length; i++) { - result |= ((data[offset + i] & 0xff) << (i << 3)); - } - } else { - for (int i = 0; i < length; i++) { - result |= ((data[offset + i] & 0xff) << ((length - 1 - i) << 3)); - } - } - return result; - } - - private static enum State { - //初始状态,需要读取头部 - INIT, - CONTENT_NEEDED, - HEAD_NEEDED - } - } diff --git a/src/main/java/handler/decoder/LineBasedDecoder.java b/src/main/java/handler/decoder/LineBasedDecoder.java index f46d953..c524d82 100644 --- a/src/main/java/handler/decoder/LineBasedDecoder.java +++ b/src/main/java/handler/decoder/LineBasedDecoder.java @@ -10,15 +10,15 @@ * */ public class LineBasedDecoder extends DelimiterBasedDecoder { - - public LineBasedDecoder() { - //bug? - //DelimiterBasedDecoder只支持ASCII码,而\r\n不是一个字符,所以暂且这样处理 - this('\n'); - } - private LineBasedDecoder(char delimiter) { - super(delimiter); - } - + public LineBasedDecoder() { + //bug? + //DelimiterBasedDecoder只支持ASCII码,而\r\n不是一个字符,所以暂且这样处理 + this('\n'); + } + + private LineBasedDecoder(char delimiter) { + super(delimiter); + } + } \ No newline at end of file diff --git a/src/main/java/handler/decoder/StringDecoder.java b/src/main/java/handler/decoder/StringDecoder.java index 2b1ea01..c533ed6 100644 --- a/src/main/java/handler/decoder/StringDecoder.java +++ b/src/main/java/handler/decoder/StringDecoder.java @@ -10,48 +10,47 @@ import handler.InBoundHandlerAdapter; /** - * 将ByteBuffer或byte[]转为字符串 - * 仅支持Heap ByteBuffer + * 将ByteBuffer或byte[]转为字符串, 仅支持Heap ByteBuffer. * * @author skywalker * */ public class StringDecoder extends InBoundHandlerAdapter { - - private static final Charset defaultCharSet = Charset.forName("UTF-8"); - private static final Logger logger = LoggerFactory.getLogger(StringDecoder.class); - private final Charset charset; - - public StringDecoder() { - this(null); - } - - public StringDecoder(Charset charset) { - if (charset != null) { - this.charset = charset; - } else { - this.charset = defaultCharSet; - } - } - - @Override - public void channelRead(Object message, HandlerContext context) { - if (message == null) return; - byte[] array = null; - if (message instanceof ByteBuffer) { - ByteBuffer buffer = (ByteBuffer) message; - if (buffer.hasArray()) { - array = buffer.array(); - } else { - logger.debug("We support heap ByteBuffer only."); - } - } else if (message instanceof byte[]) { - array = (byte[]) message; - } - if (array != null) { - message = new String(array, charset); - } - context.fireChannelRead(message); - } - + + private static final Charset defaultCharSet = Charset.forName("UTF-8"); + private static final Logger logger = LoggerFactory.getLogger(StringDecoder.class); + private final Charset charset; + + public StringDecoder() { + this(null); + } + + public StringDecoder(Charset charset) { + if (charset != null) { + this.charset = charset; + } else { + this.charset = defaultCharSet; + } + } + + @Override + public void channelRead(Object message, HandlerContext context) { + if (message == null) return; + byte[] array = null; + if (message instanceof ByteBuffer) { + ByteBuffer buffer = (ByteBuffer) message; + if (buffer.hasArray()) { + array = buffer.array(); + } else { + logger.debug("We support heap ByteBuffer only."); + } + } else if (message instanceof byte[]) { + array = (byte[]) message; + } + if (array != null) { + message = new String(array, charset); + } + context.fireChannelRead(message); + } + } diff --git a/src/main/java/handler/encoder/StringEncoder.java b/src/main/java/handler/encoder/StringEncoder.java index 799a3ca..3a0a6b9 100644 --- a/src/main/java/handler/encoder/StringEncoder.java +++ b/src/main/java/handler/encoder/StringEncoder.java @@ -6,20 +6,20 @@ import handler.OutBoundHandlerAdapter; /** - * 将String转为{@link ByteBuffer} + * 将String转为{@link ByteBuffer}. * * @author skywalker * */ public class StringEncoder extends OutBoundHandlerAdapter { - @Override - public void channelWrite(Object message, HandlerContext context) { - if (message instanceof String) { - context.fireChannelWrite(ByteBuffer.wrap(((String) message).getBytes())); - } else { - context.fireChannelWrite(message); - } - } - + @Override + public void channelWrite(Object message, HandlerContext context) { + if (message instanceof String) { + context.fireChannelWrite(ByteBuffer.wrap(((String) message).getBytes())); + } else { + context.fireChannelWrite(message); + } + } + } diff --git a/src/main/java/lifecycle/LifeCycle.java b/src/main/java/lifecycle/LifeCycle.java index 326a442..af6814a 100644 --- a/src/main/java/lifecycle/LifeCycle.java +++ b/src/main/java/lifecycle/LifeCycle.java @@ -1,7 +1,15 @@ package lifecycle; +/** + * 组件的生命周期. + * + * @author skywalker + */ public interface LifeCycle { - public void start(); - + /** + * 启动. + */ + void start(); + } diff --git a/src/main/java/manager/AbstractChooseStrategy.java b/src/main/java/manager/AbstractChooseStrategy.java index 652b251..309b24c 100644 --- a/src/main/java/manager/AbstractChooseStrategy.java +++ b/src/main/java/manager/AbstractChooseStrategy.java @@ -5,46 +5,44 @@ import java.util.concurrent.locks.ReentrantLock; /** - * 为ChooseStrategy提供骨架功能 - * - * @author skywalker + * 为ChooseStrategy提供骨架功能. * + * @author skywalker */ public abstract class AbstractChooseStrategy implements ChooseStrategy { - protected List candidates; - protected int index = 0; - protected int length; - private final Lock lock = new ReentrantLock(); + protected List candidates; + protected int index = 0; + protected int length; + private final Lock lock = new ReentrantLock(); + + @Override + public final T choose(Object param) { + lock.lock(); + T result; + try { + result = doChoose(param); + ++index; + if (index >= length) + index = 0; + } finally { + lock.unlock(); + } + return result; + } - @Override - public final T choose(Object param) { - lock.lock(); - T result; - try { - result = doChoose(param); - ++index; - if (index >= length) - index = 0; - } finally { - lock.unlock(); - } - return result; - } - - @Override - public void setCandidates(List candidates) { - this.candidates = candidates; - this.length = candidates.size(); - } + @Override + public void setCandidates(List candidates) { + this.candidates = candidates; + this.length = candidates.size(); + } - /** - * 实际选择操作,子类实现 - * - * @param param - * {@link Object} 参数 - * @return T - */ - public abstract T doChoose(Object param); + /** + * 实际选择操作, 子类实现. + * + * @param param {@link Object} 参数 + * @return T + */ + public abstract T doChoose(Object param); } diff --git a/src/main/java/manager/AbstractManager.java b/src/main/java/manager/AbstractManager.java index a85390f..f73fd6d 100644 --- a/src/main/java/manager/AbstractManager.java +++ b/src/main/java/manager/AbstractManager.java @@ -7,58 +7,58 @@ import lifecycle.LifeCycle; /** - * Manager骨架实现 - * - * @author skywalker + * {@link Manager}骨架实现. * + * @author skywalker */ public abstract class AbstractManager implements Manager { - protected List candidates; - protected final ExecutorService executor; - private ChooseStrategy chooseStrategy; - private final int s; - - public AbstractManager(int s, ExecutorService executor) { - this(s, executor, null); - } - - public AbstractManager(int s, ExecutorService executor, ChooseStrategy chooseStrategy) { - if (s < 1) { - throw new IllegalArgumentException("The candidates count cant't be less than 1."); - } - candidates = new ArrayList(s); - this.s = s; - this.executor = executor; - if (chooseStrategy != null) { - this.chooseStrategy = chooseStrategy; - } else { - this.chooseStrategy = new DefaultChooseStrategy(); - } - } - - @Override - public void start() { - for (int i = 0; i < s; i++) { - T candidate = newCandidate(); - candidates.add(candidate); - if (candidate instanceof LifeCycle) { - LifeCycle lifeCycle = (LifeCycle) candidate; - lifeCycle.start(); - } - } - chooseStrategy.setCandidates(candidates); - } - - /** - * 生成一个候选人 - * @return - */ - protected abstract T newCandidate(); - - @Override - public T chooseOne(Object param) { - return chooseStrategy.choose(param); - } - + protected List candidates; + protected final ExecutorService executor; + private ChooseStrategy chooseStrategy; + private final int s; + + public AbstractManager(int s, ExecutorService executor) { + this(s, executor, null); + } + + public AbstractManager(int s, ExecutorService executor, ChooseStrategy chooseStrategy) { + if (s < 1) { + throw new IllegalArgumentException("The candidates count cant't be less than 1."); + } + candidates = new ArrayList(s); + this.s = s; + this.executor = executor; + if (chooseStrategy != null) { + this.chooseStrategy = chooseStrategy; + } else { + this.chooseStrategy = new DefaultChooseStrategy(); + } + } + + @Override + public void start() { + for (int i = 0; i < s; i++) { + T candidate = newCandidate(); + candidates.add(candidate); + if (candidate instanceof LifeCycle) { + LifeCycle lifeCycle = (LifeCycle) candidate; + lifeCycle.start(); + } + } + chooseStrategy.setCandidates(candidates); + } + + /** + * 生成一个候选人. + * + * @return 候选者 + */ + protected abstract T newCandidate(); + + @Override + public T chooseOne(Object param) { + return chooseStrategy.choose(param); + } + } diff --git a/src/main/java/manager/ChooseStrategy.java b/src/main/java/manager/ChooseStrategy.java index 6964d89..52ce58d 100644 --- a/src/main/java/manager/ChooseStrategy.java +++ b/src/main/java/manager/ChooseStrategy.java @@ -3,14 +3,14 @@ import java.util.List; /** - * 线程选取策略 - * @author skywalker + * 线程选取策略。 * + * @author skywalker */ public interface ChooseStrategy { - public T choose(Object param); - - public void setCandidates(List candidates); - + T choose(Object param); + + void setCandidates(List candidates); + } diff --git a/src/main/java/manager/DefaultChooseStrategy.java b/src/main/java/manager/DefaultChooseStrategy.java index 809fc5b..da5c2cd 100644 --- a/src/main/java/manager/DefaultChooseStrategy.java +++ b/src/main/java/manager/DefaultChooseStrategy.java @@ -1,16 +1,15 @@ package manager; /** - * 默认线程选取策略--简单的递增 - * @author skywalker + * 默认线程选取策略--简单的递增。 * - * @param + * @author skywalker */ public class DefaultChooseStrategy extends AbstractChooseStrategy { - @Override - public T doChoose(Object param) { - return candidates.get(index); - } - + @Override + public T doChoose(Object param) { + return candidates.get(index); + } + } diff --git a/src/main/java/manager/Manager.java b/src/main/java/manager/Manager.java index 1693052..512b83d 100644 --- a/src/main/java/manager/Manager.java +++ b/src/main/java/manager/Manager.java @@ -3,16 +3,15 @@ import lifecycle.LifeCycle; /** - * 对一组线程进行管理 - * @author skywalker + * 对一组线程进行管理。 * + * @author skywalker */ public interface Manager extends LifeCycle { - /** - * 从管理的线程中选取一个 - * @return - */ - public T chooseOne(Object param); - + /** + * 从管理的线程中选取一个。 + */ + T chooseOne(Object param); + } diff --git a/src/main/java/manager/PortBasedChooseStrategy.java b/src/main/java/manager/PortBasedChooseStrategy.java index 77e139e..afbcbf6 100644 --- a/src/main/java/manager/PortBasedChooseStrategy.java +++ b/src/main/java/manager/PortBasedChooseStrategy.java @@ -10,33 +10,33 @@ import worker.Worker; /** - * 基于远程端口的选取策略,可以保证同一个连接的事件被分发到同一个线程 + * 基于远程端口的选取策略,可以保证同一个连接的事件被分发到同一个线程。 * * @author skywalker * */ public class PortBasedChooseStrategy extends AbstractChooseStrategy { - private static final Logger logger = LoggerFactory.getLogger(PortBasedChooseStrategy.class); + private static final Logger logger = LoggerFactory.getLogger(PortBasedChooseStrategy.class); - @Override - public Worker doChoose(Object param) { - if (!(param instanceof SocketChannel)) { - throw new IllegalArgumentException( - "The param must be SocketChannel."); - } - Worker worker; - try { - SocketChannel channel = (SocketChannel) param; - InetSocketAddress address = (InetSocketAddress) channel.getRemoteAddress(); - int port = address.getPort(); - worker = candidates.get(port % length); - } catch (IOException e) { - logger.debug("Remote connection has closed."); - // 采用简单的递增策略 - worker = candidates.get(index); - } - return worker; - } + @Override + public Worker doChoose(Object param) { + if (!(param instanceof SocketChannel)) { + throw new IllegalArgumentException( + "The param must be SocketChannel."); + } + Worker worker; + try { + SocketChannel channel = (SocketChannel) param; + InetSocketAddress address = (InetSocketAddress) channel.getRemoteAddress(); + int port = address.getPort(); + worker = candidates.get(port % length); + } catch (IOException e) { + logger.debug("Remote connection has closed."); + // 采用简单的递增策略 + worker = candidates.get(index); + } + return worker; + } } diff --git a/src/main/java/selector/QueuedSelector.java b/src/main/java/selector/QueuedSelector.java index d4b32fa..cd0d9fe 100644 --- a/src/main/java/selector/QueuedSelector.java +++ b/src/main/java/selector/QueuedSelector.java @@ -25,215 +25,208 @@ import lifecycle.LifeCycle; /** - * 拥有自己的工作队列的Selector - * - * @author skywalker + * 拥有自己的工作队列的Selector. * + * @author skywalker */ public final class QueuedSelector implements Runnable, LifeCycle { - private Selector selector; - private final ArrayDeque jobs; - private final static int defaultQueueSize = 100; - // 默认ByteBuffer分配大小 - private static final int defaultAllocateSize = 1024; - private final ExecutorService executor; - private boolean closed = false; - private static final Logger logger = LoggerFactory - .getLogger(QueuedSelector.class); - private final Runnable eventProcessor = new EventProcessor(); - private final Lock lock = new ReentrantLock(); - private SelectorManager selectorManager; - private WorkerManager workerManager; - - public QueuedSelector(ExecutorService executor) { - this(0, executor); - } - - public QueuedSelector(int capacity, ExecutorService executor) { - if (capacity < 1) - capacity = defaultQueueSize; - jobs = new ArrayDeque<>(defaultQueueSize); - this.executor = executor; - } - - public void setSelectorManager(SelectorManager selectorManager) { - this.selectorManager = selectorManager; - } - - public void setWorkerManager(WorkerManager workerManager) { - this.workerManager = workerManager; - } - - /** - * 启动Selector - */ - @Override - public void start() { - try { - selector = Selector.open(); - executor.execute(this); - } catch (IOException e) { - logger.error("Selector open failed: " + e.getMessage()); - } - } - - /** - * Register the channel to the selector with the interested ops. - * - * @return boolean 是否提交成功 - */ - public boolean register(SocketChannel channel, int ops) { - Register register = new Register(channel, ops); - boolean result = false; - lock.lock(); - try { - result = jobs.offer(register); - } finally { - lock.unlock(); - } - if (result) { - // 唤醒Selector - selector.wakeup(); - } - return result; - } - - @Override - public void run() { - while (!closed) { - Runnable task; - lock.lock(); - try { - task = jobs.poll(); - } finally { - lock.unlock(); - } - if (task == null) - eventProcessor.run(); - else - task.run(); - } - } - - /** - * 向Selector注册感兴趣的事件 - * - * @author skywalker - * - */ - private class Register implements Runnable { - - private final SocketChannel channel; - private final int ops; - - public Register(SocketChannel channel, int ops) { - this.channel = channel; - this.ops = ops; - } - - @Override - public void run() { - try { - SelectionKey key = channel.register(selector, ops); - // fire the ChannelActiveEvent - HandlerChain handlerChain = selectorManager.getHandlerChain(); - HandlerContext context = new HandlerContext( - handlerChain.getInBoundHandlers(), - handlerChain.getOutBoundHandlers(), true); - context.setChannel(channel); - context.setWorkerManager(workerManager); - key.attach(context); - workerManager.chooseOne(channel).submit(new ChannelActiveEvent(context)); - } catch (ClosedChannelException e) { - logger.error("Channel register failed: " + e.getMessage()); - } - } - - } - - /** - * 处理Selector事件 - * - * @author skywalker - * - */ - private class EventProcessor implements Runnable { - - @Override - public void run() { - try { - int i = selector.select(); - if (i > 0) { - Set keys = selector.selectedKeys(); - for (SelectionKey key : keys) { - if (key.isReadable()) { - SocketChannel channel = (SocketChannel) key - .channel(); - ByteBuffer buffer = ByteBuffer - .allocate(defaultAllocateSize); - int n = channel.read(buffer); - if (n == -1) { - processInActive(key); - } else { - processRead(buffer, key); - } - } - } - keys.clear(); - } - } catch (IOException e) { - logger.debug("Selector was closed."); - closed = true; - } - } - - /** - * 如果SelectionKey.attachment()返回空,那么重新构造一个HandlerContext, 否则使用原有的。 - * - * @param attachment - * {@link Object} - * @param channel - * {@link SocketChannel} - * @return {@link HandlerContext} - */ - private HandlerContext checkAttachment(Object attachment, SocketChannel channel) { - HandlerContext context; - if (attachment == null) { - HandlerChain handlerChain = selectorManager.getHandlerChain(); - context = new HandlerContext(handlerChain.getInBoundHandlers(), - handlerChain.getOutBoundHandlers(), true); - context.setChannel(channel); - context.setWorkerManager(workerManager); - } else { - context = (HandlerContext) attachment; - } - return context; - } - - /** - * 客户端断开连接 - */ - private void processInActive(SelectionKey key) { - HandlerContext context = checkAttachment(key.attachment(), (SocketChannel) key.channel()); - workerManager.chooseOne(context.getChannel()).submit(new ChannelInActiveEvent(context)); - key.cancel(); - } - - /** - * 处理读事件 - * - * @param buffer - * {@link ByteBuffer} 读取的数据 - * @param key - * {@link SelectionKey} - * @throws IOException - */ - private void processRead(ByteBuffer buffer, SelectionKey key) - throws IOException { - HandlerContext context = checkAttachment(key.attachment(), (SocketChannel) key.channel()); - workerManager.chooseOne(context.getChannel()).submit(new ChannelReadEvent(context, buffer)); - } - } + private Selector selector; + private final ArrayDeque jobs; + private final static int defaultQueueSize = 100; + // 默认ByteBuffer分配大小 + private static final int defaultAllocateSize = 1024; + private final ExecutorService executor; + private boolean closed = false; + private static final Logger logger = LoggerFactory + .getLogger(QueuedSelector.class); + private final Runnable eventProcessor = new EventProcessor(); + private final Lock lock = new ReentrantLock(); + private SelectorManager selectorManager; + private WorkerManager workerManager; + + public QueuedSelector(ExecutorService executor) { + this(0, executor); + } + + public QueuedSelector(int capacity, ExecutorService executor) { + if (capacity < 1) + capacity = defaultQueueSize; + jobs = new ArrayDeque<>(defaultQueueSize); + this.executor = executor; + } + + public void setSelectorManager(SelectorManager selectorManager) { + this.selectorManager = selectorManager; + } + + public void setWorkerManager(WorkerManager workerManager) { + this.workerManager = workerManager; + } + + /** + * 启动Selector. + */ + @Override + public void start() { + try { + selector = Selector.open(); + executor.execute(this); + } catch (IOException e) { + logger.error("Selector open failed: " + e.getMessage()); + } + } + + /** + * Register the channel to the selector with the interested ops. + * + * @return boolean 是否提交成功 + */ + public boolean register(SocketChannel channel, int ops) { + Register register = new Register(channel, ops); + boolean result = false; + lock.lock(); + try { + result = jobs.offer(register); + } finally { + lock.unlock(); + } + if (result) { + // 唤醒Selector + selector.wakeup(); + } + return result; + } + + @Override + public void run() { + while (!closed) { + Runnable task; + lock.lock(); + try { + task = jobs.poll(); + } finally { + lock.unlock(); + } + if (task == null) + eventProcessor.run(); + else + task.run(); + } + } + + /** + * 向Selector注册感兴趣的事件. + * + * @author skywalker + */ + private class Register implements Runnable { + + private final SocketChannel channel; + private final int ops; + + public Register(SocketChannel channel, int ops) { + this.channel = channel; + this.ops = ops; + } + + @Override + public void run() { + try { + SelectionKey key = channel.register(selector, ops); + // fire the ChannelActiveEvent + HandlerChain handlerChain = selectorManager.getHandlerChain(); + HandlerContext context = new HandlerContext( + handlerChain.getInBoundHandlers(), + handlerChain.getOutBoundHandlers(), true); + context.setChannel(channel); + context.setWorkerManager(workerManager); + key.attach(context); + workerManager.chooseOne(channel).submit(new ChannelActiveEvent(context)); + } catch (ClosedChannelException e) { + logger.error("Channel register failed: " + e.getMessage()); + } + } + + } + + /** + * 处理Selector事件. + * + * @author skywalker + */ + private class EventProcessor implements Runnable { + + @Override + public void run() { + try { + int i = selector.select(); + if (i > 0) { + Set keys = selector.selectedKeys(); + for (SelectionKey key : keys) { + if (key.isReadable()) { + SocketChannel channel = (SocketChannel) key + .channel(); + ByteBuffer buffer = ByteBuffer + .allocate(defaultAllocateSize); + int n = channel.read(buffer); + if (n == -1) { + processInActive(key); + } else { + processRead(buffer, key); + } + } + } + keys.clear(); + } + } catch (IOException e) { + logger.debug("Selector was closed."); + closed = true; + } + } + + /** + * 如果SelectionKey.attachment()返回空,那么重新构造一个HandlerContext, 否则使用原有的。 + * + * @param attachment {@link Object} + * @param channel {@link SocketChannel} + * @return {@link HandlerContext} + */ + private HandlerContext checkAttachment(Object attachment, SocketChannel channel) { + HandlerContext context; + if (attachment == null) { + HandlerChain handlerChain = selectorManager.getHandlerChain(); + context = new HandlerContext(handlerChain.getInBoundHandlers(), + handlerChain.getOutBoundHandlers(), true); + context.setChannel(channel); + context.setWorkerManager(workerManager); + } else { + context = (HandlerContext) attachment; + } + return context; + } + + /** + * 客户端断开连接. + */ + private void processInActive(SelectionKey key) { + HandlerContext context = checkAttachment(key.attachment(), (SocketChannel) key.channel()); + workerManager.chooseOne(context.getChannel()).submit(new ChannelInActiveEvent(context)); + key.cancel(); + } + + /** + * 处理读事件. + * + * @param buffer {@link ByteBuffer} 读取的数据 + * @param key {@link SelectionKey} + * @throws IOException + */ + private void processRead(ByteBuffer buffer, SelectionKey key) + throws IOException { + HandlerContext context = checkAttachment(key.attachment(), (SocketChannel) key.channel()); + workerManager.chooseOne(context.getChannel()).submit(new ChannelReadEvent(context, buffer)); + } + } } diff --git a/src/main/java/selector/SelectorManager.java b/src/main/java/selector/SelectorManager.java index 3fdb15d..6e912b9 100644 --- a/src/main/java/selector/SelectorManager.java +++ b/src/main/java/selector/SelectorManager.java @@ -9,43 +9,43 @@ import manager.ChooseStrategy; /** - * Selector管理器,负责对Selector的启动、负载均衡处理 + * Selector管理器,负责对Selector的启动、负载均衡处理. * * @author skywalker * */ public class SelectorManager extends AbstractManager { - - private HandlerChain handlerChain; - private WorkerManager workerManager; - - public SelectorManager(int s, ExecutorService executor) { - super(s, executor); - } - - public SelectorManager(int s, ExecutorService executor, - ChooseStrategy chooseStrategy) { - super(s, executor, chooseStrategy); - } - - public void setHandlerChain(HandlerChain handlerChain) { - this.handlerChain = handlerChain; - } - - public HandlerChain getHandlerChain() { - return handlerChain; - } - - public void setWorkerManager(WorkerManager workerManager) { - this.workerManager = workerManager; - } - - @Override - protected QueuedSelector newCandidate() { - QueuedSelector selector = new QueuedSelector(executor); - selector.setSelectorManager(this); - selector.setWorkerManager(workerManager); - return selector; - } - + + private HandlerChain handlerChain; + private WorkerManager workerManager; + + public SelectorManager(int s, ExecutorService executor) { + super(s, executor); + } + + public SelectorManager(int s, ExecutorService executor, + ChooseStrategy chooseStrategy) { + super(s, executor, chooseStrategy); + } + + public void setHandlerChain(HandlerChain handlerChain) { + this.handlerChain = handlerChain; + } + + public HandlerChain getHandlerChain() { + return handlerChain; + } + + public void setWorkerManager(WorkerManager workerManager) { + this.workerManager = workerManager; + } + + @Override + protected QueuedSelector newCandidate() { + QueuedSelector selector = new QueuedSelector(executor); + selector.setSelectorManager(this); + selector.setWorkerManager(workerManager); + return selector; + } + } diff --git a/src/main/java/server/Server.java b/src/main/java/server/Server.java index 9840579..4e8c093 100644 --- a/src/main/java/server/Server.java +++ b/src/main/java/server/Server.java @@ -19,125 +19,123 @@ import worker.WorkerManager; /** - * Server - * - * @author skywalker + * Server. * + * @author skywalker */ public final class Server implements LifeCycle { - private final ExecutorService executor; - // 默认以阻塞的方式监听 - private boolean block = true; - private ServerSocketChannel serverSocketChannel; - private final SelectorManager selectorManager; - private final WorkerManager workerManager; - private final int acceptors; - private final HandlerChain handlerChain; - private final static Logger logger = LoggerFactory.getLogger(Server.class); + private final ExecutorService executor; + // 默认以阻塞的方式监听 + private boolean block = true; + private ServerSocketChannel serverSocketChannel; + private final SelectorManager selectorManager; + private final WorkerManager workerManager; + private final int acceptors; + private final HandlerChain handlerChain; + private final static Logger logger = LoggerFactory.getLogger(Server.class); - public Server() { - this(0, 0); - } + public Server() { + this(0, 0); + } - public Server(int acceptors, int workers) { - int cores = Runtime.getRuntime().availableProcessors(); - if (acceptors <= 1) { - acceptors = Math.max(2, Math.min(4, cores / 8)); - } - if (workers <= 0) { - workers = Math.max(2, Math.min(4, cores / 8)); - } - if (acceptors > cores) { - throw new IllegalArgumentException( - "The acceptors count should be less than cores."); - } - if (workers > cores) { - throw new IllegalArgumentException( - "The workers count should be less than cores."); - } - executor = Executors.newFixedThreadPool(acceptors + workers); - int selectors = (acceptors /= 2); - selectorManager = new SelectorManager(selectors, executor); - workerManager = new WorkerManager(workers, executor, new PortBasedChooseStrategy()); - this.acceptors = acceptors; - this.handlerChain = new HandlerChain(); - selectorManager.setHandlerChain(handlerChain); - selectorManager.setWorkerManager(workerManager); - } + public Server(int acceptors, int workers) { + int cores = Runtime.getRuntime().availableProcessors(); + if (acceptors <= 1) { + acceptors = Math.max(2, Math.min(4, cores / 8)); + } + if (workers <= 0) { + workers = Math.max(2, Math.min(4, cores / 8)); + } + if (acceptors > cores) { + throw new IllegalArgumentException( + "The acceptors count should be less than cores."); + } + if (workers > cores) { + throw new IllegalArgumentException( + "The workers count should be less than cores."); + } + executor = Executors.newFixedThreadPool(acceptors + workers); + int selectors = (acceptors /= 2); + selectorManager = new SelectorManager(selectors, executor); + workerManager = new WorkerManager(workers, executor, new PortBasedChooseStrategy()); + this.acceptors = acceptors; + this.handlerChain = new HandlerChain(); + selectorManager.setHandlerChain(handlerChain); + selectorManager.setWorkerManager(workerManager); + } - /** - * 设置监听方式 - */ - public Server configureBlocking(boolean block) { - this.block = block; - return this; - } + /** + * 设置监听方式. + */ + public Server configureBlocking(boolean block) { + this.block = block; + return this; + } - /** - * 监听到指定的端口 - */ - public Server bind(int port) { - if (port < 1) - throw new IllegalArgumentException( - "The param port can't be negative."); - try { - serverSocketChannel = ServerSocketChannel.open(); - serverSocketChannel.socket().setReuseAddress(true); - serverSocketChannel.bind(new InetSocketAddress(port)); - serverSocketChannel.configureBlocking(block); - } catch (IOException e) { - logger.error("The port " + port + "bind failed."); - System.exit(1); - } - return this; - } + /** + * 监听到指定的端口. + */ + public Server bind(int port) { + if (port < 1) + throw new IllegalArgumentException( + "The param port can't be negative."); + try { + serverSocketChannel = ServerSocketChannel.open(); + serverSocketChannel.socket().setReuseAddress(true); + serverSocketChannel.bind(new InetSocketAddress(port)); + serverSocketChannel.configureBlocking(block); + } catch (IOException e) { + logger.error("The port " + port + "bind failed."); + System.exit(1); + } + return this; + } - public Server setHandlers(Handler... handlers) { - for (int i = 0, l = handlers.length; i < l; i++) { - handlerChain.addHandler(handlers[i]); - } - return this; - } + public Server setHandlers(Handler... handlers) { + for (int i = 0, l = handlers.length; i < l; i++) { + handlerChain.addHandler(handlers[i]); + } + return this; + } - /** - * 启动服务器 - */ - @Override - public void start() { - workerManager.start(); - selectorManager.start(); - for (int i = 0; i < acceptors; i++) - executor.execute(new Acceptor()); - logger.info("Server starts successfully."); - } + /** + * 启动服务器 + */ + @Override + public void start() { + workerManager.start(); + selectorManager.start(); + for (int i = 0; i < acceptors; i++) + executor.execute(new Acceptor()); + logger.info("Server starts successfully."); + } - /** - * 接收客户端连接 - * - * @author skywalker - * - */ - private class Acceptor implements Runnable { + /** + * 接收客户端连接. + * + * @author skywalker + */ + private class Acceptor implements Runnable { - @Override - public void run() { - while (true) { - try { - SocketChannel channel = serverSocketChannel.accept(); - channel.configureBlocking(false); - boolean result = selectorManager.chooseOne(null).register( - channel, SelectionKey.OP_READ); - if (!result) { - // register operation failed - logger.debug("Register channel to selector failed."); - } - } catch (IOException e) { - logger.debug("Client accepted failded: " + e.getMessage()); - } - } - } + @Override + public void run() { + while (true) { + try { + SocketChannel channel = serverSocketChannel.accept(); + channel.configureBlocking(false); + boolean result = selectorManager.chooseOne(null).register( + channel, SelectionKey.OP_READ); + if (!result) { + // register operation failed + logger.debug("Register channel to selector failed."); + } + } catch (IOException e) { + logger.debug("Client accepted failed: " + e.getMessage()); + } + } + } - } + } } diff --git a/src/main/java/worker/Worker.java b/src/main/java/worker/Worker.java index 62c2ee1..e529970 100644 --- a/src/main/java/worker/Worker.java +++ b/src/main/java/worker/Worker.java @@ -4,60 +4,61 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; +import handler.Handler; import lifecycle.LifeCycle; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * 负责Handler链的调用执行 - * - * @author skywalker + * 负责{@link Handler}链的调用执行. * + * @author skywalker */ public class Worker implements Runnable, LifeCycle { - private final BlockingQueue jobs; - private static final int defaultQueueSize = 100; - private final ExecutorService executors; - private static final Logger logger = LoggerFactory.getLogger(Worker.class); - - protected Worker(ExecutorService executors) { - this(0, executors); - } - - protected Worker(int queueSize, ExecutorService executors) { - if (queueSize < 1) { - queueSize = defaultQueueSize; - } - this.jobs = new ArrayBlockingQueue<>(queueSize); - this.executors = executors; - } - - @Override - public void start() { - executors.execute(this); - } - - /** - * 向队列添加任务 - * @param task - * @return true,如果添加成功 - */ - public boolean submit(Runnable task) { - return jobs.offer(task); - } - - @Override - public void run() { - try { - while (true) { - Runnable task = jobs.take(); - task.run(); - } - } catch (InterruptedException e) { - logger.error("Thread was interrupted."); - } - } - + private final BlockingQueue jobs; + private static final int defaultQueueSize = 100; + private final ExecutorService executors; + private static final Logger logger = LoggerFactory.getLogger(Worker.class); + + protected Worker(ExecutorService executors) { + this(0, executors); + } + + protected Worker(int queueSize, ExecutorService executors) { + if (queueSize < 1) { + queueSize = defaultQueueSize; + } + this.jobs = new ArrayBlockingQueue<>(queueSize); + this.executors = executors; + } + + @Override + public void start() { + executors.execute(this); + } + + /** + * 向队列添加任务. + * + * @param task {@link Runnable} 任务 + * @return true,如果添加成功 + */ + public boolean submit(Runnable task) { + return jobs.offer(task); + } + + @Override + public void run() { + try { + while (true) { + Runnable task = jobs.take(); + task.run(); + } + } catch (InterruptedException e) { + logger.error("Thread was interrupted."); + } + } + } diff --git a/src/main/java/worker/WorkerManager.java b/src/main/java/worker/WorkerManager.java index ef0da78..1e1bd3a 100644 --- a/src/main/java/worker/WorkerManager.java +++ b/src/main/java/worker/WorkerManager.java @@ -6,25 +6,25 @@ import manager.ChooseStrategy; /** - * 管理工作线程 + * 管理工作线程。 * * @author skywalker * */ public class WorkerManager extends AbstractManager { - public WorkerManager(int s, ExecutorService executor) { - super(s, executor); - } - - public WorkerManager(int s, ExecutorService executor, - ChooseStrategy chooseStrategy) { - super(s, executor, chooseStrategy); - } + public WorkerManager(int s, ExecutorService executor) { + super(s, executor); + } - @Override - protected Worker newCandidate() { - return new Worker(executor); - } + public WorkerManager(int s, ExecutorService executor, + ChooseStrategy chooseStrategy) { + super(s, executor, chooseStrategy); + } + + @Override + protected Worker newCandidate() { + return new Worker(executor); + } } diff --git a/src/main/resource/logback.xml b/src/main/resource/logback.xml index 8564b17..a91077e 100644 --- a/src/main/resource/logback.xml +++ b/src/main/resource/logback.xml @@ -1,13 +1,13 @@ - - - - %d{yyyy-MM-dd HH:mm:ss}[%thread] %-5level %logger{36} -%msg%n - - - - - - + + + + %d{yyyy-MM-dd HH:mm:ss}[%thread] %-5level %logger{36} -%msg%n + + + + + + \ No newline at end of file diff --git a/src/test/java/bootstrap/Bootstrap.java b/src/test/java/bootstrap/Bootstrap.java index 934bd1e..a8195da 100644 --- a/src/test/java/bootstrap/Bootstrap.java +++ b/src/test/java/bootstrap/Bootstrap.java @@ -15,41 +15,41 @@ import server.Server; /** - * 启动 + * 启动。 * * @author skywalker * */ public class Bootstrap { - @Test - public void lengthFieldBasedDecoder() { - Server server = new Server(); - server.bind(8080).setHandlers(new HandlerInitializer() { - @Override - public Handler[] init() { - return new Handler[] {new LengthFieldBasedDecoder(0, 4), new StringDecoder(), new SimpleInBoundHandler()}; - } - }).start(); - } - - @Test - public void delimiterBasedDecoder() throws InterruptedException { - Server server = new Server(); - server.bind(8080).setHandlers(new HandlerInitializer() { - @Override - public Handler[] init() { - return new Handler[] {new DelimiterBasedDecoder('a'), new StringDecoder(), new SimpleInBoundHandler()}; - } - }).start(); - TimeUnit.MINUTES.sleep(10); - } - - @Test - public void response() throws InterruptedException { - Server server = new Server(); - server.bind(8080).setHandlers(new StringDecoder(), new ResponseHandler(), new StringEncoder()).start(); - TimeUnit.MINUTES.sleep(10); - } - + @Test + public void lengthFieldBasedDecoder() { + Server server = new Server(); + server.bind(8080).setHandlers(new HandlerInitializer() { + @Override + public Handler[] init() { + return new Handler[] {new LengthFieldBasedDecoder(0, 4), new StringDecoder(), new SimpleInBoundHandler()}; + } + }).start(); + } + + @Test + public void delimiterBasedDecoder() throws InterruptedException { + Server server = new Server(); + server.bind(8080).setHandlers(new HandlerInitializer() { + @Override + public Handler[] init() { + return new Handler[] {new DelimiterBasedDecoder('a'), new StringDecoder(), new SimpleInBoundHandler()}; + } + }).start(); + TimeUnit.MINUTES.sleep(10); + } + + @Test + public void response() throws InterruptedException { + Server server = new Server(); + server.bind(8080).setHandlers(new StringDecoder(), new ResponseHandler(), new StringEncoder()).start(); + TimeUnit.MINUTES.sleep(10); + } + } diff --git a/src/test/java/client/Client.java b/src/test/java/client/Client.java index 8334908..ca8ae89 100644 --- a/src/test/java/client/Client.java +++ b/src/test/java/client/Client.java @@ -14,51 +14,54 @@ import util.DataUtils; +/** + * 测试-客户端角色. + */ public class Client { - - private Socket socket; - private BufferedOutputStream bos; - - @Before - public void init() throws IOException { - socket = new Socket(); - socket.connect(new InetSocketAddress(8080)); - bos = new BufferedOutputStream(socket.getOutputStream()); - } - @Test - public void lengthFieldBasedDecoder() throws IOException, InterruptedException { - byte[] result = new byte[35]; - System.arraycopy(DataUtils.int2Bytes(31), 0, result, 0, 4); - System.arraycopy("org.apache.commons.lang.builder".getBytes(), 0, result, 4, 31); - for (int i = 0; i < 6; i++) { - bos.write(result); - } - TimeUnit.SECONDS.sleep(6); - } - - @Test - public void delimiterBasedDecoder() throws IOException { - byte[] data = "This is a beautiful world.\n".getBytes(); - for (int i = 0; i < 12; i++) { - bos.write(data); - } - bos.flush(); - } - - @Test - public void response() throws IOException { - BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream())); - bos.write("skywalker".getBytes()); - bos.flush(); - System.out.println(br.readLine()); - br.close(); - } - - @After - public void close() throws IOException { - bos.close(); - socket.close(); - } - + private Socket socket; + private BufferedOutputStream bos; + + @Before + public void init() throws IOException { + socket = new Socket(); + socket.connect(new InetSocketAddress(8080)); + bos = new BufferedOutputStream(socket.getOutputStream()); + } + + @Test + public void lengthFieldBasedDecoder() throws IOException, InterruptedException { + byte[] result = new byte[35]; + System.arraycopy(DataUtils.int2Bytes(31), 0, result, 0, 4); + System.arraycopy("org.apache.commons.lang.builder".getBytes(), 0, result, 4, 31); + for (int i = 0; i < 6; i++) { + bos.write(result); + } + TimeUnit.SECONDS.sleep(6); + } + + @Test + public void delimiterBasedDecoder() throws IOException { + byte[] data = "This is a beautiful world.\n".getBytes(); + for (int i = 0; i < 12; i++) { + bos.write(data); + } + bos.flush(); + } + + @Test + public void response() throws IOException { + BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream())); + bos.write("skywalker".getBytes()); + bos.flush(); + System.out.println(br.readLine()); + br.close(); + } + + @After + public void close() throws IOException { + bos.close(); + socket.close(); + } + } diff --git a/src/test/java/handler/ResponseHandler.java b/src/test/java/handler/ResponseHandler.java index cac6eb8..89e8dda 100644 --- a/src/test/java/handler/ResponseHandler.java +++ b/src/test/java/handler/ResponseHandler.java @@ -3,16 +3,16 @@ import context.HandlerContext; /** - * 测试回应情况 + * 测试回应情况. * * @author skywalker * */ public class ResponseHandler extends InBoundHandlerAdapter { - @Override - public void channelRead(Object message, HandlerContext context) { - context.writeFlush("Hello: " + (String) message + "!\n"); - } - + @Override + public void channelRead(Object message, HandlerContext context) { + context.writeFlush("Hello: " + (String) message + "!\n"); + } + } diff --git a/src/test/java/handler/SimpleInBoundHandler.java b/src/test/java/handler/SimpleInBoundHandler.java index dbbe4eb..9289517 100644 --- a/src/test/java/handler/SimpleInBoundHandler.java +++ b/src/test/java/handler/SimpleInBoundHandler.java @@ -2,21 +2,26 @@ import context.HandlerContext; +/** + * 简单的{@link InBoundHandlerAdapter}实现,简单地打印出事件触发以及收到的消息. + * + * @author skywalker + */ public class SimpleInBoundHandler extends InBoundHandlerAdapter { - @Override - public void channelActive(HandlerContext context) { - System.out.println("channel active"); - } - - @Override - public void channelInActive(HandlerContext context) { - System.out.println("channel inActive"); - } - - @Override - public void channelRead(Object message, HandlerContext context) { - System.out.println(message.toString()); - } - + @Override + public void channelActive(HandlerContext context) { + System.out.println("channel active"); + } + + @Override + public void channelInActive(HandlerContext context) { + System.out.println("channel inActive"); + } + + @Override + public void channelRead(Object message, HandlerContext context) { + System.out.println(message.toString()); + } + } diff --git a/src/test/java/util/DataUtils.java b/src/test/java/util/DataUtils.java index 83930e4..1826f64 100644 --- a/src/test/java/util/DataUtils.java +++ b/src/test/java/util/DataUtils.java @@ -1,14 +1,24 @@ package util; +/** + * 数据相关操作工具类. + * + * @author skywalker + */ public class DataUtils { - public static byte[] int2Bytes(int i) { - byte[] arr = new byte[4]; - arr[0] = (byte) (i & 0xff); - arr[1] = (byte) ((i >> 8) & 0xff); - arr[2] = (byte) ((i >> 16) & 0xff); - arr[3] = (byte) ((i >> 24) & 0xff); - return arr; - } - + private DataUtils() {} + + /** + * 将int数字转为byte数组. + */ + public static byte[] int2Bytes(int i) { + byte[] arr = new byte[4]; + arr[0] = (byte) (i & 0xff); + arr[1] = (byte) ((i >> 8) & 0xff); + arr[2] = (byte) ((i >> 16) & 0xff); + arr[3] = (byte) ((i >> 24) & 0xff); + return arr; + } + }