消息隊列(六)RocketMQ-RPC通訊Netty多線程模型

1、爲什麼要使用Netty做爲高性能的通訊庫? 在看RocketMQ的RPC通訊部分時候,可能有很多同窗有這樣子的疑問,RocketMQ爲什麼要選擇Netty而不直接使用JDK的NIO進行網絡編程呢?這裏有必要先來簡要介紹下Netty。react

Netty是一個封裝了JDK的NIO庫的高性能網絡通訊開源框架。它提供異步的、事件驅動的網絡應用程序框架和工具,用以快速開發高性能、高可靠性的網絡服務器和客戶端程序。算法

下面主要列舉了下通常系統的RPC通訊模塊會選擇Netty做爲底層通訊庫的理由(做者認爲RocketMQ的RPC一樣也是基於此選擇了Netty):編程

(1)Netty的編程API使用簡單,開發門檻低,無需編程者去關注和了解太多的NIO編程模型和概念;後端

(2)對於編程者來講,可根據業務的要求進行定製化地開發,經過Netty的ChannelHandler對通訊框架進行靈活的定製化擴展;設計模式

(3)Netty框架自己支持拆包/解包,異常檢測等機制,讓編程者能夠從JAVA NIO的繁瑣細節中解脫,而只須要關注業務處理邏輯;緩存

(4)Netty解決了(準確地說應該是採用了另外一種方式完美規避了)JDK NIO的Bug(Epoll bug,會致使Selector空輪詢,最終致使CPU 100%);bash

(5)Netty框架內部對線程,selector作了一些細節的優化,精心設計的reactor多線程模型,能夠實現很是高效地併發處理;服務器

6)Netty已經在多個開源項目(Hadoop的RPC框架avro使用Netty做爲通訊框架)中都獲得了充分驗證,健壯性/可靠性比較好。網絡

2、RocketMQ中RPC通訊的Netty多線程模型多線程

RocketMQ的RPC通訊部分採用了"1+N+M1+M2"的Reactor多線程模式,對網絡通訊部分進行了必定的擴展與優化,這一節主要讓咱們來看下這一部分的具體設計與實現內容。

2.一、Netty的Reactor多線程模型設計概念與簡述

這裏有必要先來簡要介紹下Netty的Reactor多線程模型。Reactor多線程模型的設計思想是分而治之+事件驅動。

(1)分而治之

通常來講,一個網絡請求鏈接的完整處理過程能夠分爲接受(accept)、數據讀取(read)、解碼/編碼(decode/encode)、業務處理(process)、發送響應(send)這幾步驟。Reactor模型將每一個步驟都映射成爲一個任務,服務端線程執行的最小邏輯單元再也不是一次完整的網絡請求,而是這個任務,且採用以非阻塞方式執行。

(2)事件驅動

每一個任務對應特定網絡事件。當任務準備就緒時,Reactor收到對應的網絡事件通知,並將任務分發給綁定了對應網絡事件的Handler執行。

2.二、RocketMQ中RPC通訊的1+N+M1+M2的Reactor多線程設計與實現

(1)RocketMQ中RPC通訊的Reactor多線程設計與流程

RocketMQ的RPC通訊採用Netty組件做爲底層通訊庫,一樣也遵循了Reactor多線程模型,同時又在這之上作了一些擴展和優化。下面先給出一張RocketMQ的RPC通訊層的Netty多線程模型框架圖,讓你們對RocketMQ的RPC通訊中的多線程分離設計有一個大體的瞭解。

從上面的框圖中能夠大體瞭解RocketMQ中NettyRemotingServer的Reactor 多線程模型。一個 Reactor 主線程(eventLoopGroupBoss,即爲上面的1)負責監聽 TCP網絡鏈接請求,創建好鏈接後丟給Reactor 線程池(eventLoopGroupSelector,即爲上面的「N」,源碼中默認設置爲3),它負責將創建好鏈接的socket 註冊到 selector上去(RocketMQ的源碼中會自動根據OS的類型選擇NIO和Epoll,也能夠經過參數配置),而後監聽真正的網絡數據。拿到網絡數據後,再丟給Worker線程池(defaultEventExecutorGroup,即爲上面的「M1」,源碼中默認設置爲8)。 爲了更爲高效的處理RPC的網絡請求,這裏的Worker線程池是專門用於處理Netty網絡通訊相關的(包括編碼/解碼、空閒連接管理、網絡鏈接管理以及網絡請求處理)。而處理業務操做放在業務線程池中執行,根據 RomotingCommand 的業務請求碼code去processorTable這個本地緩存變量中找到對應的 processor,而後封裝成task任務後,提交給對應的業務processor處理線程池來執行(sendMessageExecutor,以發送消息爲例,即爲上面的 「M2」)。

下面以表格的方式列舉了下上面所述的「1+N+M1+M2」Reactor多線程模型

(2)RocketMQ中RPC通訊的Reactor多線程的代碼具體實現

說完了Reactor多線程總體的設計與流程,你們應該就對RocketMQ的RPC通訊的Netty部分有了一個比較全面的理解了,那接下來就從源碼上來看下一些細節部分(在看該部分代碼時候須要讀者對JAVA NIO和Netty的相關概念與技術點有所瞭解)。 在NettyRemotingServer的實例初始化時,會初始化各個相關的變量包括serverBootstrap、nettyServerConfig參數、channelEventListener監聽器並同時初始化eventLoopGroupBoss和eventLoopGroupSelector兩個Netty的EventLoopGroup線程池(這裏須要注意的是,若是是Linux平臺,而且開啓了native epoll,就用EpollEventLoopGroup,這個也就是用JNI,調的c寫的epoll;不然,就用Java NIO的NioEventLoopGroup。),具體代碼以下:

public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
        final ChannelEventListener channelEventListener) {
        super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
        this.serverBootstrap = new ServerBootstrap();
        this.nettyServerConfig = nettyServerConfig;
        this.channelEventListener = channelEventListener;
      //省略部分代碼
      //初始化時候nThreads設置爲1,說明RemotingServer端的Disptacher連接管理和分發請求的線程爲1,用於接收客戶端的TCP鏈接
        this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyBoss_%d", this.threadIndex.incrementAndGet()));
            }
        });

        /**
         * 根據配置設置NIO仍是Epoll來做爲Selector線程池
         * 若是是Linux平臺,而且開啓了native epoll,就用EpollEventLoopGroup,這個也就是用JNI,調的c寫的epoll;不然,就用Java NIO的NioEventLoopGroup。
         * 
         */
        if (useEpoll()) {
            this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
                private int threadTotal = nettyServerConfig.getServerSelectorThreads();

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
                }
            });
        } else {
            this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
                private int threadTotal = nettyServerConfig.getServerSelectorThreads();

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
                }
            });
        }
        //省略部分代碼 
複製代碼

在NettyRemotingServer實例初始化完成後,就會將其啓動。Server端在啓動階段會將以前實例化好的1個acceptor線程(eventLoopGroupBoss),N個IO線程(eventLoopGroupSelector),M1個worker 線程(defaultEventExecutorGroup)綁定上去。前面部分也已經介紹過各個線程池的做用了。 這裏須要說明的是,Worker線程拿到網絡數據後,就交給Netty的ChannelPipeline(其採用責任鏈設計模式),從Head到Tail的一個個Handler執行下去,這些 Handler是在建立NettyRemotingServer實例時候指定的。NettyEncoder和NettyDecoder 負責網絡傳輸數據和 RemotingCommand 之間的編解碼。NettyServerHandler 拿到解碼獲得的 RemotingCommand 後,根據 RemotingCommand.type 來判斷是 request 仍是 response來進行相應處理,根據業務請求碼封裝成不一樣的task任務後,提交給對應的業務processor處理線程池處理。

@Override
    public void start() {
        //默認的處理線程池組,使用默認的處理線程池組用於處理後面的多個Netty Handler的邏輯操做

        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
                nettyServerConfig.getServerWorkerThreads(),
                new ThreadFactory() {

                    private AtomicInteger threadIndex = new AtomicInteger(0);

                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
                    }
                });
        /**
         * 首先來看下 RocketMQ NettyServer 的 Reactor 線程模型,
         * 一個 Reactor 主線程負責監聽 TCP 鏈接請求;
         * 創建好鏈接後丟給 Reactor 線程池,它負責將創建好鏈接的 socket 註冊到 selector
         * 上去(這裏有兩種方式,NIO和Epoll,可配置),而後監聽真正的網絡數據;
         * 拿到網絡數據後,再丟給 Worker 線程池;
         *
         */
        //RocketMQ-> Java NIO的1+N+M模型:1個acceptor線程,N個IO線程,M1個worker 線程。
        ServerBootstrap childHandler =
                this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                        .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                        .option(ChannelOption.SO_BACKLOG, 1024)
                        //服務端處理客戶端鏈接請求是順序處理的,因此同一時間只能處理一個客戶端鏈接,多個客戶端來的時候,服務端將不能處理的客戶端鏈接請求放在隊列中等待處理,backlog參數指定了隊列的大小
                        .option(ChannelOption.SO_REUSEADDR, true)//這個參數表示容許重複使用本地地址和端口
                        .option(ChannelOption.SO_KEEPALIVE, false)//當設置該選項之後,若是在兩小時內沒有數據的通訊時,TCP會自動發送一個活動探測數據報文。
                        .childOption(ChannelOption.TCP_NODELAY, true)//該參數的做用就是禁止使用Nagle算法,使用於小數據即時傳輸
                        .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())//這兩個參數用於操做接收緩衝區和發送緩衝區
                        .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                        .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel ch) throws Exception {

                                ch.pipeline()
                                        .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
                                                new HandshakeHandler(TlsSystemConfig.tlsMode))
                                        .addLast(defaultEventExecutorGroup,
                                                new NettyEncoder(),//rocketmq解碼器,他們分別覆蓋了父類的encode和decode方法
                                                new NettyDecoder(),//rocketmq編碼器
                                                new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),//Netty自帶的心跳管理器
                                                new NettyConnectManageHandler(),//鏈接管理器,他負責捕獲新鏈接、鏈接斷開、異常等事件,而後統一調度到NettyEventExecuter處理器處理。
                                                new NettyServerHandler()//當一個消息通過前面的解碼等步驟後,而後調度到channelRead0方法,而後根據消息類型進行分發 
                                        );
                            }
                        });

        if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
            childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        }

        try {
            ChannelFuture sync = this.serverBootstrap.bind().sync();
            InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
            this.port = addr.getPort();
        } catch (InterruptedException e1) {
            throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
        }

        if (this.channelEventListener != null) {
            this.nettyEventExecutor.start();
        }

        //定時掃描responseTable,獲取返回結果,而且處理超時
        this.timer.scheduleAtFixedRate(new TimerTask() {

            @Override
            public void run() {
                try {
                    NettyRemotingServer.this.scanResponseTable();
                } catch (Throwable e) {
                    log.error("scanResponseTable exception", e);
                }
            }
        }, 1000 * 3, 1000);
    }
複製代碼

從上面的描述中能夠歸納得出RocketMQ的RPC通訊部分的Reactor線程池模型框圖。

總體能夠看出RocketMQ的RPC通訊藉助Netty的多線程模型,其服務端監聽線程和IO線程分離,同時將RPC通訊層的業務邏輯與處理具體業務的線程進一步相分離。時間可控的簡單業務都直接放在RPC通訊部分來完成,複雜和時間不可控的業務提交至後端業務線程池中處理,這樣提升了通訊效率和MQ總體的性能。(ps:其中抽象出NioEventLoop來表示一個不斷循環執行處理任務的線程,每一個NioEventLoop有一個selector,用於監聽綁定在其上的socket鏈路。)

note:如今很痛苦,等過陣子回頭看看,會發現其實那都不算事。

相關文章
相關標籤/搜索