Netty 框架

    Netty是Reactor模式事件驅動網絡框架,Netty不只用法簡單,並且性能和可靠性都很好,因此被不少的中間件做爲網絡層使用,像dubbo, RocketMQ底層都採用了Netty。
    Netty採用的是Reactor模式,由一個boss線程池充當main reactor,worker線程池充當sub reactor,main reactor負責監聽端口,接收socket鏈接;sub reactor負責socket數據的讀寫,main reactor將接收的socket鏈接分配給sub reactor,sub reactor負責管理socket,並調用Handlet處理socket的IO事件。
    Netty爲何這麼高效?由於Netty使用了Nio,Nio主要由三部分構成:Channel, Buffer和Selector,相比於Bio服務器,Nio實現了非阻塞io和多路複用,充分利用多CPU的性能,因此Netty具備很好的性能。
    java

啓動Netty服務器:react

EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();

try {
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    serverBootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                 //Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ch.pipeline().addLast(new EventHandler());
                 }
            })
            .option(ChannelOption.SO_BACKLOG, 128)
            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .childOption(ChannelOption.SO_KEEPALIVE, true);

    // bind and start to accept incoming connections.
    ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();

    channelFuture.channel().closeFuture().sync();
} finally {
   workerGroup.shutdownGracefully();
   bossGroup.shutdownGracefully();
}


主要類分析:
類圖:

  

ServerBootstrap:
ServerBootstrap啓動類,經過初始化boos和worker線程池,建立ServerSocketChannel綁定端口,並初始化ServerSocketChannel配置,啓動Socket Server。數組

關鍵類和方法:
    1. group(group)
        group方法初始化boss和work工做線程都爲group
    2. group(parentGroup, childGroup)
        group方法初始化boss和work工做線程,parentGroup爲boss,childGroup爲worker
    3. bind( )
        AbstractBootstrap經過bind方法用boss線程綁定端口並返回ChannelFuture
    4. ServerBootstrapAcceptor
        ServerBootstrapAcceptor繼承ChannelInboundHandlerAdapter,做爲ServerSocketChannel的ChannelHandler處理事件
    5. ServerBootstrapAcceptor channelRead( )
        ServerBootstrapAcceptor的channelRead方法,處理ServerSocketChannel的read事件,將新的鏈接NioSocketChannel註冊到childGroup中,從而實現了boss分配鏈接到work。

NioEventLoopGroup:
NioEventLoopGroup線程池類,繼承MultithreadEventLoopGroup和MultithreadEventExecutorGroup,MultithreadEventExecutorGroup其中包含一個EventExecutor數組,NioEventLoopGroup經過實現newChild( )方法實例一個NioEventLoop工做線程池。關鍵類和方法:
    1. newChild( )
        newChild方法實現了MultithreadEventExecutorGroup的抽象方法,返回一個NioEventLoop工做
    2. MultithreadEventLoopGroup register(channel)
        MultithreadEventLoopGroup的register一個channel到一個NioEventLoop上
    3. MultithreadEventLoopGroup next( )
        MultithreadEventLoopGroup的next方法返回一個NioEventLoop

NioEventLoop
NioEventLoop線程類,繼承SingleThreadEventLoop和SingleThreadEventExecutor,NioEventLoop包含一個Selector負責監聽加入的socket事件,SingleThreadEventExecutor包含一個Thread和task隊列,execute第一個任務時纔會啓動工做線程,線程負責依次處理監聽事件和任務隊列,因此boss只有一個線程,worker有CPU x 2個線程。
關鍵類和方法:
    1. register(SelectableChannel ch)
        register方法註冊一個channel到NioEventLoop的Selector上
    2. run( )
        NioEventLoop線程方法,依次處理Selector上的非IO事件和IO事件
    3. SingleThreadEventExecutor execute(task)
        SingleThreadEventExecutor的execute方法第一次被調用時纔會啓動本線程

ServerBootstrap: 將AbstractBootstrap.ServerBootstrapAcceptor綁定到ServerChannelpromise

void init(Channel channel) throws Exception {
    Map options = this.options();
    synchronized(options) {
           channel.config().setOptions(options);
    }

    Map attrs = this.attrs();
    synchronized(attrs) {
       Iterator currentChildGroup = attrs.entrySet().iterator();

       while(true) {
           if(!currentChildGroup.hasNext()) {
               break;
           }
           Entry currentChildHandler = (Entry)currentChildGroup.next();
           AttributeKey currentChildOptions = (AttributeKey)currentChildHandler.getKey();
           channel.attr(currentChildOptions).set(currentChildHandler.getValue());
       }
    }

    ChannelPipeline p = channel.pipeline();
    if(this.handler() != null) {
        p.addLast(new ChannelHandler[]{this.handler()});
    } 
    
    final EventLoopGroup currentChildGroup1 = this.childGroup;
    final ChannelHandler currentChildHandler1 = this.childHandler;
    Map var9 = this.childOptions;
    final Entry[] currentChildOptions1;
    synchronized(this.childOptions) {
        currentChildOptions1 =(Entry[])this.childOptions.entrySet().toArray(newOptionArray(this.childOptions.size()));
    }

    var9 = this.childAttrs;
    final Entry[] currentChildAttrs;
    synchronized(this.childAttrs) {
        currentChildAttrs = (Entry[])this.childAttrs.entrySet().toArray(newAttrArray(this.childAttrs.size()));
    }

    /**
     * 將ServerBootstrapAcceptor做爲ChannlelHandler綁定到ServerChannel,
     * ServerChannel上的Selector事件將經過ServerBootstrapAcceptor處理。
     */
    p.addLast(new ChannelHandler[]{new ChannelInitializer() {
        public void initChannel(Channel ch) throws Exception {
            ch.pipeline().addLast(new ChannelHandler[]{new ServerBootstrap.ServerBootstrapAcceptor(currentChildGroup1, currentChildHandler1, currentChildOptions1, currentChildAttrs)});
        }
    }}); 
}

 

private static void doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) {
    channel.eventLoop().execute(new Runnable() {
        public void run() {
            if(regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}


AbstractBootstrap.ServerBootstrapAcceptor:綁定新channel到workGroup服務器

public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;
    child.pipeline().addLast(new ChannelHandler[]{this.childHandler});
    Entry[] t = this.childOptions;
    int len$ = t.length;

    int i$;
    Entry e;
    for (i$ = 0; i$ < len$; ++i$) {
        e = t[i$];

        try {
            if (!child.config().setOption((ChannelOption) e.getKey(), e.getValue())) {
                ServerBootstrap.logger.warn("Unknown channel option: " + e);
            }
        } catch (Throwable var10) {
            ServerBootstrap.logger.warn("Failed to set a channel option: " + child, var10);
        }
     }

    t = this.childAttrs;
    len$ = t.length;
    for (i$ = 0; i$ < len$; ++i$) {
        e = t[i$];
        child.attr((AttributeKey) e.getKey()).set(e.getValue());
    }
        
    try {
        /**
         * Acceptor接受新的channel後,將channel註冊到childGroup中去,
         * childGroup則經過next方法返回一個eventLoop,將channel註冊到eventLoop的selector上。
         */
        this.childGroup.register(child).addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()){               
                    ServerBootstrap.ServerBootstrapAcceptor.forceClose(child, future.cause());
                }
            }   
        }); 
    } catch (Throwable var9) {
        forceClose(child, var9);
    }
}


SingleThreadEventExecutors: 啓動線程網絡

public void execute(Runnable task) {
    if(task == null) {
        throw new NullPointerException("task");
    } else {
        /**
         * 若是在本線程中,則添加新的任務到eventLoop任務隊列;
         * 若是不在本線程中,則檢測線程是否啓動並添加任務到eventLoop任務隊列。
         */
        boolean inEventLoop = this.inEventLoop();
        if(inEventLoop) {
            this.addTask(task);
        } else {
            this.startThread(); 
            this.addTask(task);
            if(this.isShutdown() && this.removeTask(task)) {
                reject();
            }
        }

        if(!this.addTaskWakesUp) {
            this.wakeup(inEventLoop);
        } 
    }
}


NioEventLoop:線程處理框架

protected void run() {
    while(true) {
        this.oldWakenUp = this.wakenUp.getAndSet(false);

        try {
            /**
             * 若是task隊列上有任務,則直接wakeup。
             * 若是task隊列上沒有任務,則進入select狀態,等待事件wakeup。
             * task隊列上的任務主要是註冊新的Channel。
             */
            if(this.hasTasks()) {
                this.selectNow();
            } else {
                this.select();
                if(this.wakenUp.get()) {
                    this.selector.wakeup();
                }
            }

            /**
             * ioRatio默認爲50,nio線程用一半時間處理selector上的事件,一半處理task隊列上的任務。
             */
            this.cancelledKeys = 0;
            long t = System.nanoTime();
            this.needsToSelectAgain = false;
            if(this.selectedKeys != null) {
                this.processSelectedKeysOptimized(this.selectedKeys.flip());
            } else {
                this.processSelectedKeysPlain(this.selector.selectedKeys());
            }

            long ioTime = System.nanoTime() - t;
            int ioRatio = this.ioRatio;
            this.runAllTasks(ioTime * (long)(100 - ioRatio) / (long)ioRatio);
            if(this.isShuttingDown()) {
                this.closeAll();
                if(this.confirmShutdown()) {
                    return;
                }
            }
        } catch (Throwable var7) {
            logger.warn("Unexpected exception in the selector loop.", var7);
            
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException var6) {
                   
            }
        }
    }
}
相關文章
相關標籤/搜索