本文分析的Kafka代碼爲kafka-0.8.2.1。另外,因爲Kafka目前提供了兩套Producer代碼,一套是Scala版的舊版本;一套是Java版的新版本。雖然Kafka社區極力推薦你們使用Java版本的producer,但目前不少已有的程序仍是調用了Scala版的API。今天咱們就分析一下舊版producer的代碼。java
val requestRequiredAcksOpt = parser.accepts("request-required-acks", "The required acks of the producer requests") .withRequiredArg .describedAs("request required acks") .ofType(classOf[java.lang.Integer]) .defaultsTo(0) // 此處默認設置爲0
do { message = reader.readMessage() // 從LineMessageReader類中讀取消息。該類接收鍵盤輸入的一行文本做爲消息 if(message != null) producer.send(message.topic, message.key, message.message) // key默認是空,若是想要指定,需傳入參數parse.key=true,默認key和消息文本之間的分隔符是'\t' } while(message != null) // 循環接收消息,除非Ctrl+C或其餘其餘引起IOException操做跳出循環
下面代碼是Producer.scala中的發送方法: api
def send(messages: KeyedMessage[K,V]*) { lock synchronized { if (hasShutdown.get) //若是producer已經關閉了拋出異常退出 throw new ProducerClosedException recordStats(messages //更新producer統計信息 sync match { case true => eventHandler.handle(messages) //若是是同步發送,直接使用DefaultEventHandler的handle方法發送 case false => asyncSend(messages) // 不然,使用ayncSend方法異步發送消息——本文不考慮這種狀況 } } }
由上面的分析能夠看出,真正的發送邏輯實際上是由DefaultEventHandler類的handle方法來完成的。下面咱們重點分析一下這個類的代碼結構。數組
5、DefaultEventHandler與消息發送緩存
這個類的handler方法能夠同時支持同步和異步的消息發送。咱們這裏只考慮同步的代碼路徑。下面是消息發送的完整流程圖:併發
如下代碼是發送消息的核心邏輯:app
while (remainingRetries > 0 && outstandingProduceRequests.size > 0) { // 屬性message.send.max.retries指定了消息發送的重試次數,而outstandingProducerRequests就是序列化以後待發送的消息集合 topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic) //將待發送消息所屬topic加入到待刷新元數據的topic集合 if (topicMetadataRefreshInterval >= 0 && SystemTime.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) { //查看是否已過刷新元數據時間間隔 Utils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement)) // 更新topic元數據信息 sendPartitionPerTopicCache.clear() //若是消息key是空,代碼隨機選擇一個分區並記住該分區,之後該topic的消息都會往這個分區裏面發送。sendPartitionPerTopicCache就是這個緩存 topicMetadataToRefresh.clear //清空待刷新topic集合 lastTopicMetadataRefreshTime = SystemTime.milliseconds } outstandingProduceRequests = dispatchSerializedData(outstandingProduceRequests) // 真正的消息發送方法 if (outstandingProduceRequests.size > 0) { // 若是還有未發送成功的消息 info("Back off for %d ms before retrying send. Remaining retries = %d".format(config.retryBackoffMs, remainingRetries-1)) // back off and update the topic metadata cache before attempting another send operation Thread.sleep(config.retryBackoffMs) // 等待一段時間並重試 // get topics of the outstanding produce requests and refresh metadata for those Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet, correlationId.getAndIncrement)) sendPartitionPerTopicCache.clear() remainingRetries -= 1 // 更新剩餘重試次數 producerStats.resendRate.mark() } }
下面具體說說各個子模塊的代碼邏輯: dom
serializedMessages += new KeyedMessage[K,Message]( topic = e.topic, key = e.key, partKey = e.partKey, message = new Message(bytes = encoder.toBytes(e.message))) // new Message時沒有指定key
構建完KeyedMessage以後返回對應的消息集合便可。異步
def updateInfo(topics: Set[String], correlationId: Int) { var topicsMetadata: Seq[TopicMetadata] = Nil // TopicMetadata = topic信息+ 一組PartitionMetadata (partitionId + leader + AR + ISR) val topicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, brokers, producerConfig, correlationId) //構造TopicMetadataRequest並隨機排列全部broker,而後從第一個broker開始嘗試發送請求。一旦成功就終止後面的請求發送嘗試。 topicsMetadata = topicMetadataResponse.topicsMetadata //從response中取出zookeeper中保存的對應topic元數據信息 // throw partition specific exception topicsMetadata.foreach(tmd =>{ trace("Metadata for topic %s is %s".format(tmd.topic, tmd)) if(tmd.errorCode == ErrorMapping.NoError) { topicPartitionInfo.put(tmd.topic, tmd) //更新到broker的topic元數據緩存中 } else warn("Error while fetching metadata [%s] for topic [%s]: %s ".format(tmd, tmd.topic, ErrorMapping.exceptionFor(tmd.errorCode).getClass)) tmd.partitionsMetadata.foreach(pmd =>{ if (pmd.errorCode != ErrorMapping.NoError && pmd.errorCode == ErrorMapping.LeaderNotAvailableCode) { warn("Error while fetching metadata %s for topic partition [%s,%d]: [%s]".format(pmd, tmd.topic, pmd.partitionId, ErrorMapping.exceptionFor(pmd.errorCode).getClass)) } // any other error code (e.g. ReplicaNotAvailable) can be ignored since the producer does not need to access the replica and isr metadata }) }) producerPool.updateProducer(topicsMetadata) }
關於上面代碼中的最後一行, 咱們須要着重說一下。每一個producer應用程序都會保存一個producer池對象來緩存每一個broker上對應的同步producer實例。具體格式爲brokerId -> SyncProducer。SyncProducer表示一個同步producer,其主要的方法是send,支持兩種請求的發送:ProducerRequest和TopicMetadataRequest。前者是發送消息的請求,後者是更新topic元數據信息的請求。爲何須要這份緩存呢?咱們知道,每一個topic分區都應該有一個leader副本在某個broker上,而只有leader副本才能接收客戶端發來的讀寫消息請求。對producer而言,即只有這個leader副本所在的broker才能接收ProducerRequest請求。在發送消息時候,咱們會首先找出這個消息要發給哪一個topic,而後發送更新topic元數據請求給任意broker去獲取最新的元數據信息——這部分信息中比較重要的就是要獲取topic各個分區的leader副本都在哪些broker上,這樣咱們稍後會建立鏈接那些broker的阻塞通道(blocking channel)去實現真正的消息發送。Kafka目前的作法就是重建全部topic分區的leader副本所屬broker上對應的SyncProducer實例——雖然我以爲這樣實現有線沒有必要,只更新消息所屬分區的緩存信息應該就夠了(固然,這只是個人觀點,若是有不一樣意見歡迎拍磚)。如下是更新producer緩存的一些關鍵代碼:socket
val newBrokers = new collection.mutable.HashSet[Broker] topicMetadata.foreach(tmd => { tmd.partitionsMetadata.foreach(pmd => { if(pmd.leader.isDefined) //遍歷topic元數據信息中的每一個分區元數據實例,若是存在leader副本的,添加到newBrokers中以備後面更新緩存使用 newBrokers+=(pmd.leader.get) }) }) lock synchronized { newBrokers.foreach(b => { //遍歷newBrokers中的每一個broker實例,若是在緩存中已經存在,直接關閉掉而後建立一個新的加入到緩存中;不然直接建立一個加入 if(syncProducers.contains(b.id)){ syncProducers(b.id).close() syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b)) } else syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b)) }) }
前面說了,若是隻發送一條消息的話,其實真正須要更新的分區leader副本所述broker對應的SyncProducer實例只有一個,但目前的代碼中會更新全部分區,不知道Java版本的producer是否也是這樣實現,這須要後面繼續調研! async
Topic | 分區 | Leader副本所在的broker ID |
test-topic | P0 | 0 |
test-topic | P1 | 1 |
test-topic | P2 | 3 |
若是基於這樣的配置,假定咱們使用producer API一次性發送4條消息,分別是M1,M2, M3和M4。如今就能夠開始分析代碼了,首先從消息分組及整理開始:
消息 | 要被髮送到的分區ID | 該分區leader副本所在broker ID |
M1 | P0 | 0 |
M2 | P0 | 0 |
M3 | P1 | 1 |
M4 | P2 | 3 |
val index = Utils.abs(Random.nextInt) % availablePartitions.size // 隨機肯定broker id val partitionId = availablePartitions(index).partitionId sendPartitionPerTopicCache.put(topic, partitionId) // 加入緩存中以便後續使用
def startup() { ... // 建立一個請求處理的線程池,在構造時就會開啓多個線程準備接收請求 requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads) ... } class KafkaRequestHandlerPool { ... for(i <- 0 until numThreads) { runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis) threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i)) threads(i).start() // 啓動每一個請求處理線程 } ... }
KafkaRequestHandler其實是一個Runnable,它的run核心方法中以while (true)的方式調用api.handle(request)不斷地接收請求處理,以下面的代碼所示:
class KafkaRequestHandler... extends Runnable { ... def run() { ... while (true) { ... apis.handle(request) // 調用apis.handle等待請求處理 } ... } ... }
在KafkaApis中handle的主要做用就是接收各類類型的請求。本文只關注ProducerRequest請求:
def handle(request: RequestChannel.Request) { ... request.requestId match { case RequestKeys.ProduceKey => handleProducerOrOffsetCommitRequest(request) // 若是接收到ProducerRequest交由handleProducerOrOffsetCommitRequest處理 case ... } ... }
如此看來,核心的方法就是handleProducerOrOffsetCommitRequest了。這個方法之因此叫這個名字,是由於它同時能夠處理ProducerRequest和OffsetCommitRequest兩種請求,後者其實也是一種特殊的ProducerRequest。從Kafka 0.8.2以後kafka使用一個特殊的topic來保存提交位移(commit offset)。這個topic名字是__consumer_offsets。本文中咱們關注的是真正的ProducerRequest。下面來看看這個方法的邏輯,以下圖所示:
總體邏輯看上去很是簡單,以下面的代碼所示:
def handleProducerOrOffsetCommitRequest(request: RequestChannel.Request) { ... val localProduceResults = appendToLocalLog(produceRequest, offsetCommitRequestOpt.nonEmpty) // 將消息追加寫入本地提交日誌 val numPartitionsInError = localProduceResults.count(_.error.isDefined) // 計算是否存在發送失敗的分區 if(produceRequest.requiredAcks == 0) { // request.required.acks = 0時的代碼路徑 if (numPartitionsInError != 0) { info(("Send the close connection response due to error handling produce request " + "[clientId = %s, correlationId = %s, topicAndPartition = %s] with Ack=0") .format(produceRequest.clientId, produceRequest.correlationId, produceRequest.topicPartitionMessageSizeMap.keySet.mkString(","))) requestChannel.closeConnection(request.processor, request) // 關閉底層Socket以告知客戶端程序有發送失敗的狀況 } else { ... } } else if (produceRequest.requiredAcks == 1 || // request.required.acks = 0時的代碼路徑,固然還有其餘兩個條件 produceRequest.numPartitions <= 0 || numPartitionsInError == produceRequest.numPartitions) { val response = offsetCommitRequestOpt.map(_.responseFor(firstErrorCode, config.offsetMetadataMaxSize)) .getOrElse(ProducerResponse(produceRequest.correlationId, statuses)) requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) // 發送response給客戶端 } else { // request.required.acks = -1時的代碼路徑 // create a list of (topic, partition) pairs to use as keys for this delayed request val producerRequestKeys = produceRequest.data.keys.toSeq val statuses = localProduceResults.map(r => r.key -> DelayedProduceResponseStatus(r.end + 1, ProducerResponseStatus(r.errorCode, r.start))).toMap val delayedRequest = new DelayedProduce(...) // 此時須要構造延時請求進行處理,此段邏輯比較複雜,須要理解Purgatory的概念,本文暫不考慮 ... }
由上面代碼可見,不管request.required.acks是何值,都須要首先將待發送的消息集合追加寫入本地的提交日誌中。此時如何按照默認值是是0的狀況,那麼這寫入日誌後須要判斷下全部消息是否都已經發送成功了。若是出現了發送錯誤,那麼就將關閉連入broker的Socket Server以通知客戶端程序錯誤的發生。如今的關鍵是追加寫是如何完成的?即方法appendToLocalLog如何實現的?該方法總體邏輯流程圖以下圖所示:
因爲邏輯很直觀,不對代碼作詳細分析,不過值得關注的是這個方法會捕獲不少異常:
異常名稱 | 具體含義 | 異常處理 |
KafakStorageException | 這多是不可恢復的IO錯誤 | 既然沒法恢復,則終止該broker上JVM進程 |
InvalidTopicException | 顯式給__consumer_offsets topic發送消息就會有這個異常拋出,不要這麼作,由於這是內部topic | 將InvalidTopicException封裝進ProduceResult返回 |
UnknownTopicOrPartitionException | topic或分區不在該broker上時拋出該異常 | 將UnknownTopicOrPartitionException封裝進ProduceResult返回 |
NotLeaderForPartitionException | 目標分區的leader副本不在該broker上 | 將NotLeaderForPartitionException封裝進ProduceResult返回 |
NotEnoughReplicasException | 只會出如今request.required.acks=-1且ISR中的副本數不知足min.insync.replicas指定的最少副本數時會拋出該異常 | 將NotEnoughReplicasException封裝進ProduceResult返回 |
其餘 | 處理ProducerRequest時發生的其餘異常 | 將對應異常封裝進ProduceResult返回 |
okay,貌似如今咱們就剩下最後一個主要的方法沒說了。分析完這個方法以後整個producer發送消息的流程應該就算是完整地走完了。最後的這個方法就是Partition的appendMessagesToLeader,其主要代碼以下:
def appendMessagesToLeader(messages: ByteBufferMessageSet, requiredAcks: Int=0) = { inReadLock(leaderIsrUpdateLock) { val leaderReplicaOpt = leaderReplicaIfLocal() // 判斷目標分區的leader副本是否在該broker上 leaderReplicaOpt match { case Some(leaderReplica) => // 若是leader副本在該broker上 val log = leaderReplica.log.get // 獲取本地提交日誌文件句柄 val minIsr = log.config.minInSyncReplicas val inSyncSize = inSyncReplicas.size // Avoid writing to leader if there are not enough insync replicas to make it safe if (inSyncSize < minIsr && requiredAcks == -1) { //只有request.required.acks等於-1時纔會判斷ISR數是否不足 throw new NotEnoughReplicasException("Number of insync replicas for partition [%s,%d] is [%d], below required minimum [%d]" .format(topic,partitionId,minIsr,inSyncSize)) } val info = log.append(messages, assignOffsets = true) // 真正的寫日誌操做,因爲涉及Kafka底層寫日誌的,之後有機會寫篇文章專門探討這部分功能 // probably unblock some follower fetch requests since log end offset has been updated replicaManager.unblockDelayedFetchRequests(new TopicAndPartition(this.topic, this.partitionId)) // we may need to increment high watermark since ISR could be down to 1 maybeIncrementLeaderHW(leaderReplica) info case None => // 若是不在,直接拋出異常代表leader不在該broker上 throw new NotLeaderForPartitionException("Leader not local for partition [%s,%d] on broker %d" .format(topic, partitionId, localBrokerId)) } }
至此,一個最簡單的scala版同步producer的代碼走讀就算正式完成了,能夠發現Kafka設計的思路就是在每一個broker上啓動一個server不斷地處理從客戶端發來的各類請求,完成對應的功能並按需返回對應的response。但願本文能對但願瞭解Kafka producer機制的人有所幫助。