Netty是Reactor模式事件驅動網絡框架,Netty不只用法簡單,並且性能和可靠性都很好,因此被不少的中間件做爲網絡層使用,像dubbo, RocketMQ底層都採用了Netty。
Netty採用的是Reactor模式,由一個boss線程池充當main reactor,worker線程池充當sub reactor,main reactor負責監聽端口,接收socket鏈接;sub reactor負責socket數據的讀寫,main reactor將接收的socket鏈接分配給sub reactor,sub reactor負責管理socket,並調用Handlet處理socket的IO事件。
Netty爲何這麼高效?由於Netty使用了Nio,Nio主要由三部分構成:Channel, Buffer和Selector,相比於Bio服務器,Nio實現了非阻塞io和多路複用,充分利用多CPU的性能,因此Netty具備很好的性能。
java
啓動Netty服務器:react
EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { //Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new EventHandler()); } }) .option(ChannelOption.SO_BACKLOG, 128) .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .childOption(ChannelOption.SO_KEEPALIVE, true); // bind and start to accept incoming connections. ChannelFuture channelFuture = serverBootstrap.bind(8080).sync(); channelFuture.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); }
主要類分析:
類圖:
ServerBootstrap:
ServerBootstrap啓動類,經過初始化boos和worker線程池,建立ServerSocketChannel綁定端口,並初始化ServerSocketChannel配置,啓動Socket Server。數組
關鍵類和方法:
1. group(group)
group方法初始化boss和work工做線程都爲group
2. group(parentGroup, childGroup)
group方法初始化boss和work工做線程,parentGroup爲boss,childGroup爲worker
3. bind( )
AbstractBootstrap經過bind方法用boss線程綁定端口並返回ChannelFuture
4. ServerBootstrapAcceptor
ServerBootstrapAcceptor繼承ChannelInboundHandlerAdapter,做爲ServerSocketChannel的ChannelHandler處理事件
5. ServerBootstrapAcceptor channelRead( )
ServerBootstrapAcceptor的channelRead方法,處理ServerSocketChannel的read事件,將新的鏈接NioSocketChannel註冊到childGroup中,從而實現了boss分配鏈接到work。
NioEventLoopGroup:
NioEventLoopGroup線程池類,繼承MultithreadEventLoopGroup和MultithreadEventExecutorGroup,MultithreadEventExecutorGroup其中包含一個EventExecutor數組,NioEventLoopGroup經過實現newChild( )方法實例一個NioEventLoop工做線程池。關鍵類和方法:
1. newChild( )
newChild方法實現了MultithreadEventExecutorGroup的抽象方法,返回一個NioEventLoop工做
2. MultithreadEventLoopGroup register(channel)
MultithreadEventLoopGroup的register一個channel到一個NioEventLoop上
3. MultithreadEventLoopGroup next( )
MultithreadEventLoopGroup的next方法返回一個NioEventLoop
NioEventLoop:
NioEventLoop線程類,繼承SingleThreadEventLoop和SingleThreadEventExecutor,NioEventLoop包含一個Selector負責監聽加入的socket事件,SingleThreadEventExecutor包含一個Thread和task隊列,execute第一個任務時纔會啓動工做線程,線程負責依次處理監聽事件和任務隊列,因此boss只有一個線程,worker有CPU x 2個線程。
關鍵類和方法:
1. register(SelectableChannel ch)
register方法註冊一個channel到NioEventLoop的Selector上
2. run( )
NioEventLoop線程方法,依次處理Selector上的非IO事件和IO事件
3. SingleThreadEventExecutor execute(task)
SingleThreadEventExecutor的execute方法第一次被調用時纔會啓動本線程
ServerBootstrap: 將AbstractBootstrap.ServerBootstrapAcceptor綁定到ServerChannelpromise
void init(Channel channel) throws Exception { Map options = this.options(); synchronized(options) { channel.config().setOptions(options); } Map attrs = this.attrs(); synchronized(attrs) { Iterator currentChildGroup = attrs.entrySet().iterator(); while(true) { if(!currentChildGroup.hasNext()) { break; } Entry currentChildHandler = (Entry)currentChildGroup.next(); AttributeKey currentChildOptions = (AttributeKey)currentChildHandler.getKey(); channel.attr(currentChildOptions).set(currentChildHandler.getValue()); } } ChannelPipeline p = channel.pipeline(); if(this.handler() != null) { p.addLast(new ChannelHandler[]{this.handler()}); } final EventLoopGroup currentChildGroup1 = this.childGroup; final ChannelHandler currentChildHandler1 = this.childHandler; Map var9 = this.childOptions; final Entry[] currentChildOptions1; synchronized(this.childOptions) { currentChildOptions1 =(Entry[])this.childOptions.entrySet().toArray(newOptionArray(this.childOptions.size())); } var9 = this.childAttrs; final Entry[] currentChildAttrs; synchronized(this.childAttrs) { currentChildAttrs = (Entry[])this.childAttrs.entrySet().toArray(newAttrArray(this.childAttrs.size())); } /** * 將ServerBootstrapAcceptor做爲ChannlelHandler綁定到ServerChannel, * ServerChannel上的Selector事件將經過ServerBootstrapAcceptor處理。 */ p.addLast(new ChannelHandler[]{new ChannelInitializer() { public void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new ChannelHandler[]{new ServerBootstrap.ServerBootstrapAcceptor(currentChildGroup1, currentChildHandler1, currentChildOptions1, currentChildAttrs)}); } }}); }
private static void doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { channel.eventLoop().execute(new Runnable() { public void run() { if(regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
AbstractBootstrap.ServerBootstrapAcceptor:綁定新channel到workGroup服務器
public void channelRead(ChannelHandlerContext ctx, Object msg) { final Channel child = (Channel) msg; child.pipeline().addLast(new ChannelHandler[]{this.childHandler}); Entry[] t = this.childOptions; int len$ = t.length; int i$; Entry e; for (i$ = 0; i$ < len$; ++i$) { e = t[i$]; try { if (!child.config().setOption((ChannelOption) e.getKey(), e.getValue())) { ServerBootstrap.logger.warn("Unknown channel option: " + e); } } catch (Throwable var10) { ServerBootstrap.logger.warn("Failed to set a channel option: " + child, var10); } } t = this.childAttrs; len$ = t.length; for (i$ = 0; i$ < len$; ++i$) { e = t[i$]; child.attr((AttributeKey) e.getKey()).set(e.getValue()); } try { /** * Acceptor接受新的channel後,將channel註冊到childGroup中去, * childGroup則經過next方法返回一個eventLoop,將channel註冊到eventLoop的selector上。 */ this.childGroup.register(child).addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()){ ServerBootstrap.ServerBootstrapAcceptor.forceClose(child, future.cause()); } } }); } catch (Throwable var9) { forceClose(child, var9); } }
SingleThreadEventExecutors: 啓動線程網絡
public void execute(Runnable task) { if(task == null) { throw new NullPointerException("task"); } else { /** * 若是在本線程中,則添加新的任務到eventLoop任務隊列; * 若是不在本線程中,則檢測線程是否啓動並添加任務到eventLoop任務隊列。 */ boolean inEventLoop = this.inEventLoop(); if(inEventLoop) { this.addTask(task); } else { this.startThread(); this.addTask(task); if(this.isShutdown() && this.removeTask(task)) { reject(); } } if(!this.addTaskWakesUp) { this.wakeup(inEventLoop); } } }
NioEventLoop:線程處理框架
protected void run() { while(true) { this.oldWakenUp = this.wakenUp.getAndSet(false); try { /** * 若是task隊列上有任務,則直接wakeup。 * 若是task隊列上沒有任務,則進入select狀態,等待事件wakeup。 * task隊列上的任務主要是註冊新的Channel。 */ if(this.hasTasks()) { this.selectNow(); } else { this.select(); if(this.wakenUp.get()) { this.selector.wakeup(); } } /** * ioRatio默認爲50,nio線程用一半時間處理selector上的事件,一半處理task隊列上的任務。 */ this.cancelledKeys = 0; long t = System.nanoTime(); this.needsToSelectAgain = false; if(this.selectedKeys != null) { this.processSelectedKeysOptimized(this.selectedKeys.flip()); } else { this.processSelectedKeysPlain(this.selector.selectedKeys()); } long ioTime = System.nanoTime() - t; int ioRatio = this.ioRatio; this.runAllTasks(ioTime * (long)(100 - ioRatio) / (long)ioRatio); if(this.isShuttingDown()) { this.closeAll(); if(this.confirmShutdown()) { return; } } } catch (Throwable var7) { logger.warn("Unexpected exception in the selector loop.", var7); try { Thread.sleep(1000L); } catch (InterruptedException var6) { } } } }