可根據須要配置線程模型:單線程Reactor、多線程Reactor、多層線程Reactorbootstrap
不管幾個線程,都經過單一的Acceptor接收客戶端請求,能夠建立更多的NioEventLoop來處理IO操做。promise
EventLoop和EventLoopGroup實際繼承了Java的ScheduledExecutorService,使其具有了線程池的特性,其線程數量可動態配置。例如配置單線程模型,設置線程數量爲1便可。安全
Future即異步操做
future操做能夠被close,但結果是未知的;調用get能夠獲取操做結果,可是會被阻塞;isDone可判斷是否完成操做。
ChannelFuture是爲了獲取異步返回結果而設計
能夠經過ChannelFutureListener接口得到回調,無需等待get方法返回。多線程
public interface ChannelFutureListener extends GenericFutureListener<ChannelFuture> { ChannelFutureListener CLOSE = new ChannelFutureListener() { public void operationComplete(ChannelFuture future) { future.channel().close(); } }; ChannelFutureListener CLOSE_ON_FAILURE = new ChannelFutureListener() { public void operationComplete(ChannelFuture future) { if (!future.isSuccess()) { future.channel().close(); } } }; ChannelFutureListener FIRE_EXCEPTION_ON_FAILURE = new ChannelFutureListener() { public void operationComplete(ChannelFuture future) { if (!future.isSuccess()) { future.channel().pipeline().fireExceptionCaught(future.cause()); } } }; }
鏈接超時和channel超時配置
Bootstrap bootstrap = new Bootstrap();
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,10000);
channelFutrue.awaitUninterruptibly(10, TimeUnit.SECONDS);異步
注意:
一、謹慎調用await,可能致使死鎖。
二、ChannelFuture超時後若是調用了業務代碼重連,而此時IO未超時,將可能致使多條鏈接並存,設置IO超時時間建議小於業務代碼超時時間。ide
升級版的future,可寫可操做(對回調過程)。future比如古代飛鴿傳書,只能等鴿子回來或者不回來,不可控;promise就像現代快遞員,送快遞送一半能夠打電話給他叫他不要送了或者中途請他幫忙買個餅。
例如:
DefaultPromise類
awaitUninterruptibly()可手動打斷回調,使進程等待。oop
public Promise<V> awaitUninterruptibly() { if (this.isDone()) { return this; } else { boolean interrupted = false; synchronized(this) { while(!this.isDone()) { this.checkDeadLock(); this.incWaiters(); try { this.wait(); } catch (InterruptedException var9) { interrupted = true; } finally { this.decWaiters(); } } } if (interrupted) { Thread.currentThread().interrupt(); } return this; } }
進行了死鎖判斷,避免已存在相同任務;並限制了最大等待數量32767this
protected void checkDeadLock() { EventExecutor e = this.executor(); if (e != null && e.inEventLoop()) { throw new BlockingOperationException(this.toString()); } } private void incWaiters() { if (this.waiters == 32767) { throw new IllegalStateException("too many waiters: " + this); } else { ++this.waiters; } }
Channel負責對外提供操做IO的接口,而UnSafe是Channel的內部接口類,如其名同樣是不安全的操做,因此封裝在接口內部不讓外部調用,而實際的操做IO最終都是在Unsafe中執行。線程
//Channel調用鏈接爲例,跟蹤實現鏈接請求的過程 ChannelFuture connect(SocketAddress var1); //DefaultChannelPipeline中執行,實際是調用尾部的pipeline public ChannelFuture connect(SocketAddress remoteAddress) { return this.tail.connect(remoteAddress); } //AbstractChannelHandlerContext是Pipeline容器中的對象, //持續尋找全部handler執行對象,直到所有被調用 public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { AbstractChannelHandlerContext next = this.findContextOutbound(); next.invoker().invokeConnect(next, remoteAddress, localAddress, promise); return promise; } private AbstractChannelHandlerContext findContextOutbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.prev; } while(!ctx.outbound); return ctx; } //而真實的執行是尋找到UnSafe的Invoker public ChannelHandlerInvoker invoker() { return this.invoker == null ? this.channel().unsafe().invoker() : this.invoker; } public void invokeConnect(final ChannelHandlerContext ctx, final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { if (remoteAddress == null) { throw new NullPointerException("remoteAddress"); } else if (ChannelHandlerInvokerUtil.validatePromise(ctx, promise, false)) { if (this.executor.inEventLoop()) { ChannelHandlerInvokerUtil.invokeConnectNow(ctx, remoteAddress, localAddress, promise); } else { this.safeExecuteOutbound(new OneTimeTask() { public void run() { ChannelHandlerInvokerUtil.invokeConnectNow(ctx, remoteAddress, localAddress, promise); } }, promise); } } }