1、爲什麼要使用Netty做爲高性能的通訊庫? 在看RocketMQ的RPC通訊部分時候,可能有很多同窗有這樣子的疑問,RocketMQ爲什麼要選擇Netty而不直接使用JDK的NIO進行網絡編程呢?這裏有必要先來簡要介紹下Netty。react
Netty是一個封裝了JDK的NIO庫的高性能網絡通訊開源框架。它提供異步的、事件驅動的網絡應用程序框架和工具,用以快速開發高性能、高可靠性的網絡服務器和客戶端程序。算法
下面主要列舉了下通常系統的RPC通訊模塊會選擇Netty做爲底層通訊庫的理由(做者認爲RocketMQ的RPC一樣也是基於此選擇了Netty):編程
(1)Netty的編程API使用簡單,開發門檻低,無需編程者去關注和了解太多的NIO編程模型和概念;後端
(2)對於編程者來講,可根據業務的要求進行定製化地開發,經過Netty的ChannelHandler對通訊框架進行靈活的定製化擴展;設計模式
(3)Netty框架自己支持拆包/解包,異常檢測等機制,讓編程者能夠從JAVA NIO的繁瑣細節中解脫,而只須要關注業務處理邏輯;緩存
(4)Netty解決了(準確地說應該是採用了另外一種方式完美規避了)JDK NIO的Bug(Epoll bug,會致使Selector空輪詢,最終致使CPU 100%);bash
(5)Netty框架內部對線程,selector作了一些細節的優化,精心設計的reactor多線程模型,能夠實現很是高效地併發處理;服務器
6)Netty已經在多個開源項目(Hadoop的RPC框架avro使用Netty做爲通訊框架)中都獲得了充分驗證,健壯性/可靠性比較好。網絡
2、RocketMQ中RPC通訊的Netty多線程模型多線程
RocketMQ的RPC通訊部分採用了"1+N+M1+M2"的Reactor多線程模式,對網絡通訊部分進行了必定的擴展與優化,這一節主要讓咱們來看下這一部分的具體設計與實現內容。
2.一、Netty的Reactor多線程模型設計概念與簡述
這裏有必要先來簡要介紹下Netty的Reactor多線程模型。Reactor多線程模型的設計思想是分而治之+事件驅動。
(1)分而治之
通常來講,一個網絡請求鏈接的完整處理過程能夠分爲接受(accept)、數據讀取(read)、解碼/編碼(decode/encode)、業務處理(process)、發送響應(send)這幾步驟。Reactor模型將每一個步驟都映射成爲一個任務,服務端線程執行的最小邏輯單元再也不是一次完整的網絡請求,而是這個任務,且採用以非阻塞方式執行。
(2)事件驅動
每一個任務對應特定網絡事件。當任務準備就緒時,Reactor收到對應的網絡事件通知,並將任務分發給綁定了對應網絡事件的Handler執行。
2.二、RocketMQ中RPC通訊的1+N+M1+M2的Reactor多線程設計與實現
(1)RocketMQ中RPC通訊的Reactor多線程設計與流程
RocketMQ的RPC通訊採用Netty組件做爲底層通訊庫,一樣也遵循了Reactor多線程模型,同時又在這之上作了一些擴展和優化。下面先給出一張RocketMQ的RPC通訊層的Netty多線程模型框架圖,讓你們對RocketMQ的RPC通訊中的多線程分離設計有一個大體的瞭解。
從上面的框圖中能夠大體瞭解RocketMQ中NettyRemotingServer的Reactor 多線程模型。一個 Reactor 主線程(eventLoopGroupBoss,即爲上面的1)負責監聽 TCP網絡鏈接請求,創建好鏈接後丟給Reactor 線程池(eventLoopGroupSelector,即爲上面的「N」,源碼中默認設置爲3),它負責將創建好鏈接的socket 註冊到 selector上去(RocketMQ的源碼中會自動根據OS的類型選擇NIO和Epoll,也能夠經過參數配置),而後監聽真正的網絡數據。拿到網絡數據後,再丟給Worker線程池(defaultEventExecutorGroup,即爲上面的「M1」,源碼中默認設置爲8)。 爲了更爲高效的處理RPC的網絡請求,這裏的Worker線程池是專門用於處理Netty網絡通訊相關的(包括編碼/解碼、空閒連接管理、網絡鏈接管理以及網絡請求處理)。而處理業務操做放在業務線程池中執行,根據 RomotingCommand 的業務請求碼code去processorTable這個本地緩存變量中找到對應的 processor,而後封裝成task任務後,提交給對應的業務processor處理線程池來執行(sendMessageExecutor,以發送消息爲例,即爲上面的 「M2」)。
下面以表格的方式列舉了下上面所述的「1+N+M1+M2」Reactor多線程模型
(2)RocketMQ中RPC通訊的Reactor多線程的代碼具體實現
說完了Reactor多線程總體的設計與流程,你們應該就對RocketMQ的RPC通訊的Netty部分有了一個比較全面的理解了,那接下來就從源碼上來看下一些細節部分(在看該部分代碼時候須要讀者對JAVA NIO和Netty的相關概念與技術點有所瞭解)。 在NettyRemotingServer的實例初始化時,會初始化各個相關的變量包括serverBootstrap、nettyServerConfig參數、channelEventListener監聽器並同時初始化eventLoopGroupBoss和eventLoopGroupSelector兩個Netty的EventLoopGroup線程池(這裏須要注意的是,若是是Linux平臺,而且開啓了native epoll,就用EpollEventLoopGroup,這個也就是用JNI,調的c寫的epoll;不然,就用Java NIO的NioEventLoopGroup。),具體代碼以下:
public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
final ChannelEventListener channelEventListener) {
super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
this.serverBootstrap = new ServerBootstrap();
this.nettyServerConfig = nettyServerConfig;
this.channelEventListener = channelEventListener;
//省略部分代碼
//初始化時候nThreads設置爲1,說明RemotingServer端的Disptacher連接管理和分發請求的線程爲1,用於接收客戶端的TCP鏈接
this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyBoss_%d", this.threadIndex.incrementAndGet()));
}
});
/**
* 根據配置設置NIO仍是Epoll來做爲Selector線程池
* 若是是Linux平臺,而且開啓了native epoll,就用EpollEventLoopGroup,這個也就是用JNI,調的c寫的epoll;不然,就用Java NIO的NioEventLoopGroup。
*
*/
if (useEpoll()) {
this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
private int threadTotal = nettyServerConfig.getServerSelectorThreads();
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
}
});
} else {
this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
private int threadTotal = nettyServerConfig.getServerSelectorThreads();
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
}
});
}
//省略部分代碼
複製代碼
在NettyRemotingServer實例初始化完成後,就會將其啓動。Server端在啓動階段會將以前實例化好的1個acceptor線程(eventLoopGroupBoss),N個IO線程(eventLoopGroupSelector),M1個worker 線程(defaultEventExecutorGroup)綁定上去。前面部分也已經介紹過各個線程池的做用了。 這裏須要說明的是,Worker線程拿到網絡數據後,就交給Netty的ChannelPipeline(其採用責任鏈設計模式),從Head到Tail的一個個Handler執行下去,這些 Handler是在建立NettyRemotingServer實例時候指定的。NettyEncoder和NettyDecoder 負責網絡傳輸數據和 RemotingCommand 之間的編解碼。NettyServerHandler 拿到解碼獲得的 RemotingCommand 後,根據 RemotingCommand.type 來判斷是 request 仍是 response來進行相應處理,根據業務請求碼封裝成不一樣的task任務後,提交給對應的業務processor處理線程池處理。
@Override
public void start() {
//默認的處理線程池組,使用默認的處理線程池組用於處理後面的多個Netty Handler的邏輯操做
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyServerConfig.getServerWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
}
});
/**
* 首先來看下 RocketMQ NettyServer 的 Reactor 線程模型,
* 一個 Reactor 主線程負責監聽 TCP 鏈接請求;
* 創建好鏈接後丟給 Reactor 線程池,它負責將創建好鏈接的 socket 註冊到 selector
* 上去(這裏有兩種方式,NIO和Epoll,可配置),而後監聽真正的網絡數據;
* 拿到網絡數據後,再丟給 Worker 線程池;
*
*/
//RocketMQ-> Java NIO的1+N+M模型:1個acceptor線程,N個IO線程,M1個worker 線程。
ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
//服務端處理客戶端鏈接請求是順序處理的,因此同一時間只能處理一個客戶端鏈接,多個客戶端來的時候,服務端將不能處理的客戶端鏈接請求放在隊列中等待處理,backlog參數指定了隊列的大小
.option(ChannelOption.SO_REUSEADDR, true)//這個參數表示容許重複使用本地地址和端口
.option(ChannelOption.SO_KEEPALIVE, false)//當設置該選項之後,若是在兩小時內沒有數據的通訊時,TCP會自動發送一個活動探測數據報文。
.childOption(ChannelOption.TCP_NODELAY, true)//該參數的做用就是禁止使用Nagle算法,使用於小數據即時傳輸
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())//這兩個參數用於操做接收緩衝區和發送緩衝區
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
new HandshakeHandler(TlsSystemConfig.tlsMode))
.addLast(defaultEventExecutorGroup,
new NettyEncoder(),//rocketmq解碼器,他們分別覆蓋了父類的encode和decode方法
new NettyDecoder(),//rocketmq編碼器
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),//Netty自帶的心跳管理器
new NettyConnectManageHandler(),//鏈接管理器,他負責捕獲新鏈接、鏈接斷開、異常等事件,而後統一調度到NettyEventExecuter處理器處理。
new NettyServerHandler()//當一個消息通過前面的解碼等步驟後,而後調度到channelRead0方法,而後根據消息類型進行分發
);
}
});
if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}
try {
ChannelFuture sync = this.serverBootstrap.bind().sync();
InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
this.port = addr.getPort();
} catch (InterruptedException e1) {
throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
}
if (this.channelEventListener != null) {
this.nettyEventExecutor.start();
}
//定時掃描responseTable,獲取返回結果,而且處理超時
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
NettyRemotingServer.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
}
複製代碼
從上面的描述中能夠歸納得出RocketMQ的RPC通訊部分的Reactor線程池模型框圖。
總體能夠看出RocketMQ的RPC通訊藉助Netty的多線程模型,其服務端監聽線程和IO線程分離,同時將RPC通訊層的業務邏輯與處理具體業務的線程進一步相分離。時間可控的簡單業務都直接放在RPC通訊部分來完成,複雜和時間不可控的業務提交至後端業務線程池中處理,這樣提升了通訊效率和MQ總體的性能。(ps:其中抽象出NioEventLoop來表示一個不斷循環執行處理任務的線程,每一個NioEventLoop有一個selector,用於監聽綁定在其上的socket鏈路。)
note:如今很痛苦,等過陣子回頭看看,會發現其實那都不算事。