上次咱們經過分析KafkaProducer的源碼瞭解了生產端的主要流程,今天學習下服務端的網絡層主要作了什麼,先看下 KafkaServer的總體架構圖api
由圖可見Kafka的服務端主要包括網絡層、API層、日誌子系統、副本子系統這幾個大模塊。當client端發起請求時,網絡層會收到請求,並把請求放到共享請求隊列中,而後由API層的Handler線程從隊列中取出請求,並執行請求。好比是 KafkaProducer發過來的生產消息的請求,會把消息寫到磁盤日誌中,最後把響應返回給client網絡
從上面的圖中,能夠看到Kafka服務端作的事情仍是不少的,也有不少優秀的設計,咱們後面再慢慢介紹,今天主要學習網絡層架構
網絡層主要完成和客戶端、其餘Broker的網絡鏈接,採用了Reactor模式,一種基於事件驅動的模式,以前寫過相關文章,能夠參考下(連接)這裏再也不贅述socket
網絡層的核心類是SocketServer,包含一個Acceptor用來接收新的鏈接,Acceptor對應多個Processor線程,每一個 Processor線程都有本身的Selector,用來從鏈接中讀取請求並寫回響應ide
同時一個Acceptor線程對應多個Handler線程,這纔是真正處理請求的線程,Handler線程處理完請求後把響應返回給 Processor線程,其中Processor線程和Handler線程經過RequestChannel傳遞數據,ReqeustChannel包括共享的RequestQueue和Processor私有的ResponseQueue,下面附上一張圖源碼分析
上面說的有些抽象,咱們深刻到源碼中看看Kafka服務端是如何接收請求並把響應返回給客戶端的學習
KafkaServer是Kafka服務端的主類,KafkaServer中和網絡層相關的組件包括SockerServer、KafkaApis和KafkaRequestHandlerPool。其中KafkaApis和KafkaRequestHandlerPool都是經過SocketServer提供的RequestChannel來處理網絡請求,和本次介紹的網絡層相關的代碼以下所示ui
class KafkaServer(...) {
...
var dataPlaneRequestProcessor: KafkaApis = null
...
def startup(): Unit = {
...
// SocketServer主要關注網絡層相關事宜
socketServer = new SocketServer(...)
// Processor涉及到權限相關,因此調用SocketServer.startup時,只先啓動Acceptor,初始化完權限相關的憑證後,再啓動Processor,因此startupProcessors爲false
socketServer.startup(startupProcessors = false)
...
// KafkaApis進行真正的業務邏輯處理
dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel ,...)
// 建立KafkaRequestHandler線程池,其中KafkaRequestHandler主要用來從請求通道中取請求
dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, config.numIoThreadsx,...)
...
// 初始化結束
...
// Processor涉及到權限相關的操做,因此要等初始化完成,再啓動Processor
socketServer.startDataPlaneProcessors()
...
}
}複製代碼
咱們把KafkaServer網絡層再用一張帶有具體的類的圖展現下this
具體步驟spa
1.客戶端(NetworkClient)發送請求被接收器(Acceptor)轉發給處理器(Processor)處理
2.處理器把請求放到請求通道(RequestChannel)的共享請求隊列中
3.請求處理器線程(KafkaRequestHandler)從請求通道的共享請求隊列中取出請求
4.業務邏輯處理器(KafkaApis)進行業務邏輯處理
5.業務邏輯處理器把響應發到請求通道中與各個處理器對應的響應隊列中
6.處理器從對應的響應隊列中取出響應
7.處理器將響應的結果返回給客戶端複製代碼
經過KafkaServer相關源碼咱們知道了總體的大概處理流程,既然今天主要學習網絡鏈接相關源碼,下面咱們看下SocketServer.startup都作了什麼
// SocketServer.startup
def startup(startupProcessors: Boolean = true): Unit = {
this.synchronized {
// 初始化控制每一個IP上的最大鏈接數的對象,底層是Map
connectionQuotas = new ConnectionQuotas(config, time)
...
// 建立接收器和處理器
createDataPlaneAcceptorsAndProcessors(config.numNetworkThreads, config.dataPlaneListeners)
// 判斷是否須要啓動處理器,在KafkaServer初始化代用SocketServer.startup方法時,startipProcessors傳
// 的是false,緣由請看上面介紹KafkaServer的源碼
if (startupProcessors) {
// 啓動處理器
startControlPlaneProcessor()
...
}
}
...
} 複製代碼
// 建立接收器和處理器,同時並啓動接收器
private def createDataPlaneAcceptorsAndProcessors(dataProcessorsPerListener: Int,endpoints: Seq[EndPoint]): Unit = synchronized {
// 遍歷broker節點,爲每一個broker節點都建立接收器
endpoints.foreach { endpoint =>
connectionQuotas.addListener(config, endpoint.listenerName)
// 建立接收器
val dataPlaneAcceptor = createAcceptor(endpoint, DataPlaneMetricPrefix)
// 建立處理器,並把處理器添加到RequestChannel和Acceptor的處理器列表中
addDataPlaneProcessors(dataPlaneAcceptor, endpoint, dataProcessorsPerListener)
// 啓動接收器線程
KafkaThread.nonDaemon(s"data-plane-kafka-socket-acceptor-${endpoint.listenerName}-${endpoint.securityProtocol}-${endpoint.port}", dataPlaneAcceptor).start()
// 等待啓動完成
dataPlaneAcceptor.awaitStartup()
// 把啓動好的處理器添加到map中,並和broker作好了映射
dataPlaneAcceptors.put(endpoint, dataPlaneAcceptor)
}
}複製代碼
從上面能夠總結出SocketServer.startup方法主要建立了接收器和處理器,同時把接收器啓動,但並未啓動處理器,由於處理器會用到權限,須要等KafkaServer初始化完成,會單獨啓動處理器。把建立好的處理器添加到請求通道和接收器的處理器列表中
既然前面建立並啓動了接收器,那我們看下接收器都作了什麼?
def run(): Unit = {
// 將ServerSocketChannel註冊到Selector的OP_ACCEPT事件上
serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
// 標誌啓動完成
startupComplete()
try {
var currentProcessorIndex = 0
while (isRunning) {
try {
// 選擇器輪訓請求
val ready = nioSelector.select(500)
if (ready > 0) {
// 獲取全部的選擇鍵
val keys = nioSelector.selectedKeys()
// 遍歷選擇鍵
val iter = keys.iterator()
while (iter.hasNext && isRunning) {
try {
val key = iter.next
iter.remove()
if (key.isAcceptable) {
//OP_ACCEPT事件發生,獲取註冊到選擇鍵上的ServerSocketChannel,而後調用其accept方法,創建一個客戶端和服務端的鏈接通道
accept(key).foreach { socketChannel =>
// 重試的次數=該接收器對應的處理器的個數
var retriesLeft = synchronized(processors.length)
var processor: Processor = null
do {
// 重試次數減一
retriesLeft -= 1
processor = synchronized {
// 採用輪訓的方式把客戶端的鏈接通道分配給處理器即每一個處理器都會有多個 SocketChannel,對應多個客戶端的鏈接
currentProcessorIndex = currentProcessorIndex % processors.length
// 根據索引獲取對應的處理器
processors(currentProcessorIndex)
}
currentProcessorIndex += 1
// 若是新鏈接的隊列滿了,一直阻塞直到最後一個處理器能夠可以接收鏈接
} while (!assignNewConnection(socketChannel, processor, retriesLeft == 0))
}
...複製代碼
服務端的接收器主要負責接收客戶端的鏈接,由上面的源碼可知,接收器線程啓動的時候,就註冊了OPACCEPT事件,當客戶端發起鏈接時,接收器線程就能監聽到OPACCEPT事件,而後獲取綁定到選擇鍵上的ServerSocketChannel,並調用ServerSocketChannel的accept方法,創建一個客戶端和服務端的鏈接通道,看下面的圖
1.服務端ServerSocketChannel在選擇器上註冊OP_ACCEPT事件
2.客戶端在選擇器上註冊OP_CONNECT事件
3.服務端的選擇器輪訓到OP_ACCEPT事件,接收客戶端的鏈接
4.服務端的ServerSocketChannel調用accept方法建立和客戶端的通道SocketChannel複製代碼
經過前面的介紹,咱們知道了接收器會經過輪訓的方式把客戶端的SocketChannel分配給處理器,這樣每一個處理器會對應多個SocketChannel,從而也就對應多個客戶端。處理器會把接收器分配的SocketChannel放到本身的阻塞隊列中,而後喚醒其對應的選擇器工做,下面是對應的源碼
do {
processor = synchronized {
// 採用輪訓的方式把客戶端的鏈接通道分配給處理器即每一個處理器都會有多個SocketChannel,對應多個客戶端的鏈接
currentProcessorIndex = currentProcessorIndex % processors.length
// 根據索引獲取對應的處理器
processors(currentProcessorIndex)
}
currentProcessorIndex += 1
} while (!assignNewConnection(socketChannel, processor, retriesLeft == 0)) //把SocketChannel分配給處理器複製代碼
private def assignNewConnection(socketChannel: SocketChannel, processor: Processor, mayBlock: Boolean): Boolean = {
if (processor.accept(socketChannel, mayBlock, blockedPercentMeter)) {
...
}複製代碼
def accept(socketChannel: SocketChannel, mayBlock: Boolean,
acceptorIdlePercentMeter: com.yammer.metrics.core.Meter): Boolean = {
val accepted = {
// 往處理器的阻塞隊列中添加SocketChannel
if (newConnections.offer(socketChannel))
true
...
if (accepted)
// 喚醒選擇器線程開始輪詢,原來的輪詢由於沒有事件到來被阻塞
wakeup()
accepted
}複製代碼
下面咱們再看下Processor的run方法
override def run(): Unit = {
startupComplete()
try {
while (isRunning) {
// 把新的SocketChannel註冊OP_READ事件到選擇器上
configureNewConnections()
// 註冊OP_WRITE事件,用來給客戶端寫回響應
processNewResponses()
// 選擇器輪訓事件,用來讀取請求、發送響應,默認超時時間爲300ms
poll()
// 處理已經接收完成的客戶端請求
processCompletedReceives()
// 處理已經完成的響應
processCompletedSends()
...![file](https://user-gold-cdn.xitu.io/2019/9/15/16d34913299554be?w=600&h=600&f=jpeg&s=84948)複製代碼
private def configureNewConnections(): Unit = {var connectionsProcessed = 0
// 從隊列中取出SocketChannel
val channel = newConnections.poll()
// 註冊OP_READ事件
// 根據本地的地址和端口、遠程客戶端的地址和端口構建惟一的connectionId
selector.register(connectionId(channel.socket), channel)
connectionsProcessed += 1
...複製代碼
看下register方法,一直跟到最裏面
public void register(String id, SocketChannel socketChannel) throws IOException {
registerChannel(id, socketChannel, SelectionKey.OP_READ);
}複製代碼
protected SelectionKey registerChannel(String id, SocketChannel socketChannel, int interestedOps) throws IOException {
// 把SocketChannel註冊OP_READ到選擇器上,同時綁定SelectionKey
SelectionKey key = socketChannel.register(nioSelector, interestedOps);
// 構建Kafka通道KafkaChannel並將其綁定到選擇鍵上
KafkaChannel channel = buildAndAttachKafkaChannel(socketChannel, id, key);
...
return key;
}複製代碼
private KafkaChannel buildAndAttachKafkaChannel(SocketChannel socketChannel, String id, SelectionKey key) throws IOException {
// 構建Kafka通道
KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize, memoryPool);
// 將Kafka通道綁定到選擇鍵上,這樣就能夠根據選擇鍵獲取到對應的通道
key.attach(channel);
return channel;
...
}複製代碼
到這裏configureNewConnections()的任務(把新的SocketChannel註冊OP_READ事件到選擇器上)已經完成
下面看下處理器如何響應結果的
private def processNewResponses(): Unit = {var currentResponse: RequestChannel.Response = null
// 處理器還有響應要發送
while ({currentResponse = dequeueResponse(); currentResponse != null}) {
// KafkaChannel惟一標識
val channelId = currentResponse.request.context.connectionId
try {
currentResponse match {
case response: NoOpResponse =>
...
// 沒有響應發送給客戶端,須要讀取更多請求,註冊讀事件
tryUnmuteChannel(channelId)
case response: SendResponse =>
// 有響應要發送給客戶端,註冊寫事件
sendResponse(response, response.responseSend)
...複製代碼
protected[network] def sendResponse(response: RequestChannel.Response, responseSend: Send): Unit = {
val connectionId = response.request.context.connectionId
...
if (openOrClosingChannel(connectionId).isDefined) {
// 將響應經過Selector標記爲Send,實際發送經過poll輪詢完成
selector.send(responseSend)
// 添加到 inflightResponses 底層是可變的Map key:connectionId value:response
inflightResponses += (connectionId -> response)
}
}複製代碼
configureNewConnections()和processNewResponses()方法結束後,開始執行poll()方法,選擇器輪詢事件,讀取請求,寫回響應,而後處理已經完成的接收和響應
private def processCompletedReceives(): Unit = {
selector.completedReceives.asScala.foreach { receive =>
try {
openOrClosingChannel(receive.source) match {
case Some(channel) =>
// 解析ByteBuffer 到ReuestHeader
val header = RequestHeader.parse(receive.payload)
val connectionId = receive.source
// 組裝RequestContext對象
val context = new RequestContext(header, connectionId, channel.socketAddress,...)
// 構建 RequestChannel.Request對象 包含了處理器的編號,響應對象中能夠獲取到,這樣就能保證請求和響應的處理都是在同一個處理器中完成
val req = new RequestChannel.Request(processor = id, context = context,...)
// 發送給 RequestChannel 處理
requestChannel.sendRequest(req)
// 移除OP_READ讀事件,已經接收到請求了,因此就不用再讀了
selector.mute(connectionId)
...複製代碼
private def processCompletedSends(): Unit = {
selector.completedSends.asScala.foreach { send =>
try {
// 結束的寫請求要從inflightResponses移出
val response = inflightResponses.remove(send.destination).getOrElse {...}
...
// 添加 OP_READ 讀事件,這樣就能夠繼續讀取客戶端發來的請求
tryUnmuteChannel(send.destination)
... 複製代碼
到這裏服務端和網絡鏈接相關的源碼已經介紹完了,咱們知道處理器把請求放到了請求隊列裏,同時從響應隊列裏獲取響應返回給客戶端,那誰去處理另外請求隊列裏的請求?又是誰把響應放到了處理器的響應隊列裏呢?
在前面的介紹中,咱們知道KafkaServer在初始化時會建立請求處理線程池(KafkaRequestHandlerPool),請求處理線程池會建立並啓動請求處理線程(KafkaRequestHandler),每一個請求處理線程都能能夠訪問到共享的請求隊列,這樣請求處理線程就能夠從請求隊列裏獲取請求,而後交給KafkaApis處理。
// KafkaServer 會建立 KafkaRequestHandlerPool,同時把請求隊列傳過去
dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, ...)
複製代碼
// 請求處理線程的數量取決於配置 num.io.threads,默認是8
val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads)
for (i <- 0 until numThreads) {
// 建立 KafkaRequestHandler Kafka請求處理線程
createHandler(i)
}
def createHandler(id: Int): Unit = synchronized {
// 每一個請求處理線程都是共享同一個 RequestChannel
runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time)
// 啓動請求處理線程
KafkaThread.daemon(logAndThreadNamePrefix + "-kafka-request-handler-" + id, runnables(id)).start()
}複製代碼
def run(): Unit = {
while (!stopped) {
val startSelectTime = time.nanoseconds
// 從請求隊列裏獲取下一個請求或者阻塞到超時
val req = requestChannel.receiveRequest(300)
...
req match {
...
case request: RequestChannel.Request =>
try {
// 交給KafkaApis處理
apis.handle(request)
...複製代碼
經過源碼分析能夠看出請求處理線程任務很簡單,就是從共享的請求隊列裏取出請求,而後調用KafakaApis處理請求
在KafkaServer初始化時,初始化了真正的服務端請求處理器
// 全局的服務端入口
dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel,...)複製代碼
請求處理線程調用KafkaApis的handle方法處理業務邏輯
def request.header.apiKey match {
// 處理客戶端生產消息的請求
case ApiKeys.PRODUCE => handleProduceRequest(request)
case ApiKeys.FETCH => handleFetchRequest(request)
...複製代碼
可見Kafka服務端的請求處理入口KafkaApis根據請求的類型選擇不一樣的處理器,至於服務端對這些請求作了什麼,咱們下次再分享
1.《Kafka技術內幕》
2.《Apache Kafka源碼剖析》
3.Kafka最新的trunk分支代碼