摘要:不少人喜歡把RocketMQ與Kafka作對比,其實這兩款消息隊列的網絡通訊層仍是比較類似的,本文就爲你們簡要地介紹下Kafka的NIO網絡通訊模型,經過對Kafka源碼的分析來簡述其Reactor的多線程網絡通訊模型和整體框架結構,同時簡要介紹Kafka網絡通訊層的設計與具體實現。segmentfault
1、Kafka網絡通訊模型的總體框架概述後端
Kafka的網絡通訊模型是基於NIO的Reactor多線程模型來設計的。這裏先引用Kafka源碼中註釋的一段話:緩存
相信你們看了上面的這段引文註釋後,大體能夠了解到Kafka的網絡通訊層模型,主要採用了 1(1個Acceptor線程)+N(N個Processor線程)+M(M個業務處理線程) 。下面的表格簡要的列舉了下(這裏先簡單的看下後面還會詳細說明):安全
線程數線程名線程具體說明1kafka-socket-acceptor_%xAcceptor線程,負責監聽Client端發起的請求Nkafka-network-thread_%dProcessor線程,負責對Socket進行讀寫Mkafka-request-handler-_%dWorker線程,處理具體的業務邏輯並生成Response返回網絡
Kafka網絡通訊層的完整框架圖以下圖所示:數據結構
Kafka消息隊列的通訊層模型—1+N+M模型.png多線程
剛開始看到上面的這個框架圖可能會有一些不太理解,並沒關係,這裏能夠先對Kafka的網絡通訊層框架結構有一個大體瞭解。本文後面會結合Kafka的部分重要源碼來詳細闡述上面的過程。這裏能夠簡單總結一下其網絡通訊模型中的幾個重要概念:併發
(1), Acceptor :1個接收線程,負責監聽新的鏈接請求,同時註冊OP_ACCEPT 事件,將新的鏈接按照 "round robin" 方式交給對應的 Processor 線程處理;框架
(2), Processor :N個處理器線程,其中每一個 Processor 都有本身的 selector,它會向 Acceptor 分配的 SocketChannel 註冊相應的 OP_READ 事件,N 的大小由 「num.networker.threads」 決定;socket
(3), KafkaRequestHandler :M個請求處理線程,包含在線程池—KafkaRequestHandlerPool內部,從RequestChannel的全局請求隊列—requestQueue中獲取請求數據並交給KafkaApis處理,M的大小由 「num.io.threads」 決定;
(4), RequestChannel :其爲Kafka服務端的請求通道,該數據結構中包含了一個全局的請求隊列 requestQueue和多個與Processor處理器相對應的響應隊列responseQueue,提供給Processor與請求處理線程KafkaRequestHandler和KafkaApis交換數據的地方。
(5), NetworkClient :其底層是對 Java NIO 進行相應的封裝,位於Kafka的網絡接口層。Kafka消息生產者對象—KafkaProducer的send方法主要調用NetworkClient完成消息發送;
(6), SocketServer :其是一個NIO的服務,它同時啓動一個Acceptor接收線程和多個Processor處理器線程。提供了一種典型的Reactor多線程模式,將接收客戶端請求和處理請求相分離;
(7), KafkaServer :表明了一個Kafka Broker的實例;其startup方法爲實例啓動的入口;
(8), KafkaApis :Kafka的業務邏輯處理Api,負責處理不一樣類型的請求;好比 「發送消息」、 「獲取消息偏移量—offset」 和 「處理心跳請求」 等;
2、Kafka網絡通訊層的設計與具體實現
這一節將結合Kafka網絡通訊層的源碼來分析其設計與實現,這裏主要詳細介紹網絡通訊層的幾個重要元素—SocketServer、Acceptor、Processor、RequestChannel和KafkaRequestHandler。本文分析的源碼部分均基於Kafka的0.11.0版本。
一、SocketServer
SocketServer是接收客戶端Socket請求鏈接、處理請求並返回處理結果的核心類,Acceptor及Processor的初始化、處理邏輯都是在這裏實現的。在KafkaServer實例啓動時會調用其startup的初始化方法,會初始化1個 Acceptor和N個Processor線程(每一個EndPoint都會初始化,通常來講一個Server只會設置一個端口),其實現以下:
二、Acceptor
Acceptor是一個繼承自抽象類AbstractServerThread的線程類。Acceptor的主要任務是監聽而且接收客戶端的請求,同時創建數據傳輸通道—SocketChannel,而後以輪詢的方式交給一個後端的Processor線程處理(具體的方式是添加socketChannel至併發隊列並喚醒Processor線程處理)。
在該線程類中主要能夠關注如下兩個重要的變量:
(1), nioSelector :經過NSelector.open()方法建立的變量,封裝了JAVA NIO Selector的相關操做;
(2), serverChannel :用於監聽端口的服務端Socket套接字對象;
下面來看下Acceptor主要的run方法的源碼:
在上面源碼中能夠看到,Acceptor線程啓動後,首先會向用於監聽端口的服務端套接字對象—ServerSocketChannel上註冊OP_ACCEPT 事件。而後以輪詢的方式等待所關注的事件發生。若是該事件發生,則調用accept()方法對OP_ACCEPT事件進行處理。這裏,Processor是經過 round robin 方法選擇的,這樣能夠保證後面多個Processor線程的負載基本均勻。
Acceptor的accept()方法的做用主要以下:
(1)經過SelectionKey取得與之對應的serverSocketChannel實例,並調用它的accept()方法與客戶端創建鏈接;
(2)調用connectionQuotas.inc()方法增長鏈接統計計數;並同時設置第(1)步中建立返回的socketChannel屬性(如sendBufferSize、KeepAlive、TcpNoDelay、configureBlocking等)
(3)將socketChannel交給processor.accept()方法進行處理。這裏主要是將socketChannel加入Processor處理器的併發隊列newConnections隊列中,而後喚醒Processor線程從隊列中獲取socketChannel並處理。其中,newConnections會被Acceptor線程和Processor線程併發訪問操做,因此newConnections是ConcurrentLinkedQueue隊列(一個基於連接節點的無界線程安全隊列)
三、Processor
Processor同Acceptor同樣,也是一個線程類,繼承了抽象類AbstractServerThread。其主要是從客戶端的請求中讀取數據和將KafkaRequestHandler處理完響應結果返回給客戶端。在該線程類中主要關注如下幾個重要的變量:
(1), newConnections :在上面的 Acceptor 一節中已經提到過,它是一種ConcurrentLinkedQueue[SocketChannel]類型的隊列,用於保存新鏈接交由Processor處理的socketChannel;
(2), inflightResponses :是一個Map[String, RequestChannel.Response]類型的集合,用於記錄還沒有發送的響應;
(3), selector :是一個類型爲KSelector變量,用於管理網絡鏈接;
下面先給出Processor處理器線程run方法執行的流程圖:
Kafk_Processor線程的處理流程圖.png
從上面的流程圖中可以能夠看出Processor處理器線程在其主流程中主要完成了這樣子幾步操做:
(1), 處理newConnections隊列中的socketChannel 。遍歷取出隊列中的每一個socketChannel並將其在selector上註冊OP_READ事件;
(2), 處理RequestChannel中與當前Processor對應響應隊列中的Response 。在這一步中會根據responseAction的類型(NoOpAction/SendAction/CloseConnectionAction)進行判斷,若爲「NoOpAction」,表示該鏈接對應的請求無需響應;若爲「SendAction」,表示該Response須要發送給客戶端,則會經過「selector.send」註冊OP_WRITE事件,而且將該Response從responseQueue響應隊列中移至inflightResponses集合中;「CloseConnectionAction」,表示該鏈接是要關閉的;
(3), 調用selector.poll()方法進行處理 。該方法底層即爲調用nioSelector.select()方法進行處理。
(4), 處理已接受完成的數據包隊列—completedReceives 。在processCompletedReceives方法中調用「requestChannel.sendRequest」方法將請求Request添加至requestChannel的全局請求隊列—requestQueue中,等待KafkaRequestHandler來處理。同時,調用「selector.mute」方法取消與該請求對應的鏈接通道上的OP_READ事件;
(5), 處理已發送完的隊列—completedSends 。當已經完成將response發送給客戶端,則將其從inflightResponses移除,同時經過調用「selector.unmute」方法爲對應的鏈接通道從新註冊OP_READ事件;
(6), 處理斷開鏈接的隊列 。將該response從inflightResponses集合中移除,同時將connectionQuotas統計計數減1;
四、RequestChannel
在Kafka的網絡通訊層中,RequestChannel爲Processor處理器線程與KafkaRequestHandler線程之間的數據交換提供了一個數據緩衝區,是通訊過程當中Request和Response緩存的地方。所以,其做用就是在通訊中起到了一個數據緩衝隊列的做用。Processor線程將讀取到的請求添加至RequestChannel的全局請求隊列—requestQueue中;KafkaRequestHandler線程從請求隊列中獲取並處理,處理完之後將Response添加至RequestChannel的響應隊列—responseQueue中,並經過responseListeners喚醒對應的Processor線程,最後Processor線程從響應隊列中取出後發送至客戶端。
五、KafkaRequestHandler
KafkaRequestHandler也是一種線程類,在KafkaServer實例啓動時候會實例化一個線程池—KafkaRequestHandlerPool對象(包含了若干個KafkaRequestHandler線程),這些線程以守護線程的方式在後臺運行。在KafkaRequestHandler的run方法中會循環地從RequestChannel中阻塞式讀取request,讀取後再交由KafkaApis來具體處理。
六、KafkaApis
KafkaApis是用於處理對通訊網絡傳輸過來的業務消息請求的中心轉發組件。該組件反映出Kafka Broker Server能夠提供哪些服務。
3、總結
仔細閱讀Kafka的NIO網絡通訊層的源碼過程當中仍是能夠收穫很多關於NIO網絡通訊模塊的關鍵技術。Apache的任何一款開源中間件都有其設計獨到之處,值得借鑑和學習。對於任何一位使用Kafka這款分佈式消息隊列的同窗來講,若是可以在必定實踐的基礎上,再經過閱讀其源碼能起到更爲深刻理解的效果,對於大規模Kafka集羣的性能調優和問題定位都大有裨益。
對於剛接觸Kafka的同窗來講,想要本身掌握其NIO網絡通訊層模型的關鍵設計,還須要不斷地使用本地環境進行debug調試和閱讀源碼反覆思考。
文章來源:https://segmentfault.com/a/1190000016555478
推薦閱讀:https://www.roncoo.com/course/view/a398edee6308413f904f9c82b1dfc122