【原創】Kafka producer原理 (Scala版同步producer)

本文分析的Kafka代碼爲kafka-0.8.2.1。另外,因爲Kafka目前提供了兩套Producer代碼,一套是Scala版的舊版本;一套是Java版的新版本。雖然Kafka社區極力推薦你們使用Java版本的producer,但目前不少已有的程序仍是調用了Scala版的API。今天咱們就分析一下舊版producer的代碼。java

 producer還分爲同步和異步模式,由屬性producer.type指定,默認是sync,即同步發送模式。本文主要關注於同步發送的代碼走讀。下面以console-producer爲例——console producer是Kafka自帶的一個工具,它能夠很方便地以鍵盤輸入的方式接收消息併發送給指定的topic,很是適合做爲咱們學習的一個起點。
 
1、運行console-producer命令
咱們的第一步是要啓動一個console-producer實例。最簡單的方式就是使用下面的命令:
 
除了絕對必要的topic, borker-list屬性,咱們並無指定其餘的參數。這幾乎是最簡單的啓動方式了。
【刊誤】console-producer若是不指定--sync默認應該是異步發送消息而非同步的,筆者以前說錯了,因此命令應該調整爲:
  bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic --sync
 
2、構建Producer配置信息
producer的第一步就是要構造producer的配置信息,好比metadata.broker.list和request.required.acks等,完整的參數列表能夠查詢Kafka官網,這些參數部分能夠由啓動console-producer時候指定,部分是有默認值的。舉例來講,對於metadata.broker.list這樣必需要指定的參數,在調用console-producer時候就必須傳入broker-list的值給它賦值;而像request.required.acks這樣的參數,雖然從名稱上來看也是必要參數,但console-producer代碼中提供了默認值,所以咱們能夠選擇不顯式提供request-required-acks的值,以下面代碼所示:
 val requestRequiredAcksOpt = parser.accepts("request-required-acks", "The required acks of the producer requests")
      .withRequiredArg
      .describedAs("request required acks")
      .ofType(classOf[java.lang.Integer])
      .defaultsTo(0) // 此處默認設置爲0
3、構建JVM Shutdownhook
console-producer代碼此處添加了一個JVM關閉鉤子,用於確保producer的關閉。
 
4、發送消息
代碼此處循環從鍵盤中接收一行文本做爲消息發送。須要注意的時,默認狀況下構造的消息是沒有key的。因爲是同步發送,每條消息都會在Producer的send方法中調用DefaultEventHandler的send方法進行發送,如下代碼是ConsoleProducer.scala中消息發送部分代碼:
do {
          message = reader.readMessage()    // 從LineMessageReader類中讀取消息。該類接收鍵盤輸入的一行文本做爲消息
          if(message != null)
            producer.send(message.topic, message.key, message.message) // key默認是空,若是想要指定,需傳入參數parse.key=true,默認key和消息文本之間的分隔符是'\t'
} while(message != null) // 循環接收消息,除非Ctrl+C或其餘其餘引起IOException操做跳出循環

下面代碼是Producer.scala中的發送方法:  api

def send(messages: KeyedMessage[K,V]*) {
    lock synchronized {
      if (hasShutdown.get) //若是producer已經關閉了拋出異常退出
        throw new ProducerClosedException
      recordStats(messages //更新producer統計信息
      sync match {
        case true => eventHandler.handle(messages) //若是是同步發送,直接使用DefaultEventHandler的handle方法發送
        case false => asyncSend(messages) // 不然,使用ayncSend方法異步發送消息——本文不考慮這種狀況
      }
    }
  }

由上面的分析能夠看出,真正的發送邏輯實際上是由DefaultEventHandler類的handle方法來完成的。下面咱們重點分析一下這個類的代碼結構。數組

 

5、DefaultEventHandler與消息發送緩存

這個類的handler方法能夠同時支持同步和異步的消息發送。咱們這裏只考慮同步的代碼路徑。下面是消息發送的完整流程圖:併發

  

如下代碼是發送消息的核心邏輯:app

while (remainingRetries > 0 && outstandingProduceRequests.size > 0) {  // 屬性message.send.max.retries指定了消息發送的重試次數,而outstandingProducerRequests就是序列化以後待發送的消息集合
      topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic) //將待發送消息所屬topic加入到待刷新元數據的topic集合
      if (topicMetadataRefreshInterval >= 0 &&    
          SystemTime.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) { //查看是否已過刷新元數據時間間隔
        Utils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement)) // 更新topic元數據信息
        sendPartitionPerTopicCache.clear() //若是消息key是空,代碼隨機選擇一個分區並記住該分區,之後該topic的消息都會往這個分區裏面發送。sendPartitionPerTopicCache就是這個緩存
        topicMetadataToRefresh.clear //清空待刷新topic集合
        lastTopicMetadataRefreshTime = SystemTime.milliseconds
      }
      outstandingProduceRequests = dispatchSerializedData(outstandingProduceRequests) // 真正的消息發送方法
      if (outstandingProduceRequests.size > 0) { // 若是還有未發送成功的消息
        info("Back off for %d ms before retrying send. Remaining retries = %d".format(config.retryBackoffMs, remainingRetries-1))
        // back off and update the topic metadata cache before attempting another send operation
        Thread.sleep(config.retryBackoffMs) // 等待一段時間並重試
        // get topics of the outstanding produce requests and refresh metadata for those
        Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet, correlationId.getAndIncrement))
        sendPartitionPerTopicCache.clear()
        remainingRetries -= 1 // 更新剩餘重試次數
        producerStats.resendRate.mark()
      }
    }

下面具體說說各個子模塊的代碼邏輯:  dom

5.1 serialize方法
該方法雖然是叫序列化,但其實主要的做用就是將字節數組格式的消息體轉成KeyedMessage格式。因爲默認狀況下咱們沒有指定key,所以在構造KeyedMessage時就只須要指定消息體就行了,以下面的代碼所示:
serializedMessages += 
    new KeyedMessage[K,Message](
    topic = e.topic, 
    key = e.key, 
    partKey = e.partKey, 
    message = new Message(bytes = encoder.toBytes(e.message))) // new Message時沒有指定key

構建完KeyedMessage以後返回對應的消息集合便可。異步

5.2 更新topic元數據信息
Kafka是如何刷新某些topic的元數據信息的呢?它會向任意一個broker發送TopicMetadataRequest請求(TopicMetadataRequest是惟一一個能發給任意broker的請求API),使用獲取的響應來更新連入broker的緩存。TopicMetadataRequest的響應信息包括對應topic的Leader、AR、ISR信息。
具體到代碼而言,BrokerPartitionInfo的updateInfo方法就是作這件事情的,這個方法代碼很少,咱們逐行分析下:
def updateInfo(topics: Set[String], correlationId: Int) {
    var topicsMetadata: Seq[TopicMetadata] = Nil // TopicMetadata = topic信息+ 一組PartitionMetadata (partitionId + leader + AR + ISR)
    val topicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, brokers, producerConfig, correlationId) //構造TopicMetadataRequest並隨機排列全部broker,而後從第一個broker開始嘗試發送請求。一旦成功就終止後面的請求發送嘗試。
    topicsMetadata = topicMetadataResponse.topicsMetadata //從response中取出zookeeper中保存的對應topic元數據信息
    // throw partition specific exception
    topicsMetadata.foreach(tmd =>{
      trace("Metadata for topic %s is %s".format(tmd.topic, tmd))
      if(tmd.errorCode == ErrorMapping.NoError) {
        topicPartitionInfo.put(tmd.topic, tmd) //更新到broker的topic元數據緩存中
      } else
        warn("Error while fetching metadata [%s] for topic [%s]: %s ".format(tmd, tmd.topic, ErrorMapping.exceptionFor(tmd.errorCode).getClass))
      tmd.partitionsMetadata.foreach(pmd =>{
        if (pmd.errorCode != ErrorMapping.NoError && pmd.errorCode == ErrorMapping.LeaderNotAvailableCode) {
          warn("Error while fetching metadata %s for topic partition [%s,%d]: [%s]".format(pmd, tmd.topic, pmd.partitionId,
            ErrorMapping.exceptionFor(pmd.errorCode).getClass))
        } // any other error code (e.g. ReplicaNotAvailable) can be ignored since the producer does not need to access the replica and isr metadata
      })
    })
    producerPool.updateProducer(topicsMetadata)
  }

關於上面代碼中的最後一行, 咱們須要着重說一下。每一個producer應用程序都會保存一個producer池對象來緩存每一個broker上對應的同步producer實例。具體格式爲brokerId -> SyncProducer。SyncProducer表示一個同步producer,其主要的方法是send,支持兩種請求的發送:ProducerRequest和TopicMetadataRequest。前者是發送消息的請求,後者是更新topic元數據信息的請求。爲何須要這份緩存呢?咱們知道,每一個topic分區都應該有一個leader副本在某個broker上,而只有leader副本才能接收客戶端發來的讀寫消息請求。對producer而言,即只有這個leader副本所在的broker才能接收ProducerRequest請求。在發送消息時候,咱們會首先找出這個消息要發給哪一個topic,而後發送更新topic元數據請求給任意broker去獲取最新的元數據信息——這部分信息中比較重要的就是要獲取topic各個分區的leader副本都在哪些broker上,這樣咱們稍後會建立鏈接那些broker的阻塞通道(blocking channel)去實現真正的消息發送。Kafka目前的作法就是重建全部topic分區的leader副本所屬broker上對應的SyncProducer實例——雖然我以爲這樣實現有線沒有必要,只更新消息所屬分區的緩存信息應該就夠了(固然,這只是個人觀點,若是有不一樣意見歡迎拍磚)。如下是更新producer緩存的一些關鍵代碼:socket

val newBrokers = new collection.mutable.HashSet[Broker]
    topicMetadata.foreach(tmd => {    
      tmd.partitionsMetadata.foreach(pmd => {    
        if(pmd.leader.isDefined) //遍歷topic元數據信息中的每一個分區元數據實例,若是存在leader副本的,添加到newBrokers中以備後面更新緩存使用
          newBrokers+=(pmd.leader.get)
      })
    })
    lock synchronized {
      newBrokers.foreach(b => { //遍歷newBrokers中的每一個broker實例,若是在緩存中已經存在,直接關閉掉而後建立一個新的加入到緩存中;不然直接建立一個加入
        if(syncProducers.contains(b.id)){ 
          syncProducers(b.id).close()
          syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b))
        } else
          syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b))
      })
    }

前面說了,若是隻發送一條消息的話,其實真正須要更新的分區leader副本所述broker對應的SyncProducer實例只有一個,但目前的代碼中會更新全部分區,不知道Java版本的producer是否也是這樣實現,這須要後面繼續調研!  async

5.3 發送消息
更新完topic元數據信息以後就該真正地發送消息了,這是由dispatchSerializedData方法來實現的。該方法接收一組KeyedMessage消息集合並返回發送失敗的消息集合。若是返回None天然表示發送成功。該方法主要的邏輯以下圖所示:

  

爲了更加直觀地說明上圖是如何完成消息發送的,咱們先對Kafka環境作一些基本的假設。假設咱們的Kafka環境有5個broker,ID分別爲0, 1, 2, 3, 4。咱們還定義了一個topic,名字是test-topic(其實名字不重要)。該topic有3個分區,分區ID分別是0, 1, 2,並假設每一個分區的leader replica都是存在的。如今假設leader與broker的對應關係假定以下:
Topic 分區 Leader副本所在的broker ID
test-topic P0 0
test-topic P1 1
test-topic P2 3

 

 

 

若是基於這樣的配置,假定咱們使用producer API一次性發送4條消息,分別是M1,M2, M3和M4。如今就能夠開始分析代碼了,首先從消息分組及整理開始:

5.3.1 partitionAndCollate方法
瞭解一個方法最簡單的方式就是學習它的輸入,分析它的輸出。該方法接收一組待發送的消息集合——用Scala表示的話就是Seq[KeyedMessage[K, Message]],在咱們的例子中很顯然這個集合中有4條消息。這個方法的輸出比較複雜,完整的寫法是:
Option[Map[Int, Map[TopicAndPartition, Seq[KeyedMessage[K, Message]]]]]
熟悉Scala語法的朋友可能會知道,這個返回值類型表示該方法可能會返回None——這表示producer代碼無法對你要發送的消息按照broker進行分組或在分組過程當中遇到了嚴重的錯誤,只能返回None由上層代碼來處理這種狀況。若是確實返回了值,這個值長的是什麼樣子呢?拿咱們的例子來講,假定每條消息去被髮送到的分區以下:(這裏的對應關係是假設的,其實在partitionAndCollate方法中會爲每條消息都分配它要去的分區!)
消息 要被髮送到的分區ID 該分區leader副本所在broker ID
M1 P0 0
M2 P0 0
M3 P1 1
M4 P2 3

 


 
  那麼這個方法返回的結果就是:
0 - > {test-topic + P0 -> {M1, M2}},
1 -> {test-topic + P1 -> {M3}},
2 -> {test-topic + P2 -> {M4}} 
}

 

該方法的效果就是將全部待發送的消息首先按照broker進行分組,而後再按照分區進行整理。

 

固然了,上面咱們假定了每條消息要去的分區,其實這也是在partitionAndCollate方法中被計算出來的。主要的邏輯是:

 

1. 首先判斷每條消息的分區key是否指定,若是指定了調用默認的分區類Partitioner的partition計算目標分區就是了。

 

2. 若是沒有指定key,就像默認使用console-producer的狀況,代碼會首先從緩存中判斷之前是否保存該topic的信息——即該topic下全部沒有key的消息默認會被髮送到同一個分區下。若是存在直接找出來就行了;不然隨機挑選一個返回並把它加入到緩存中,以下面代碼所示:
val index = Utils.abs(Random.nextInt) % availablePartitions.size // 隨機肯定broker id
val partitionId = availablePartitions(index).partitionId
sendPartitionPerTopicCache.put(topic, partitionId) // 加入緩存中以便後續使用
5.3.2 groupMessagesToSet方法
經過上一步中將待發送消息集合按照broker和topic分區進行分組,Kafka對要發送的消息進行了分區。該操做完成以後代碼就須要遍歷整理過的消息數據,獲取消息數據中每一個broker對應的分區消息映射,也就是相似於{test-topic + P0 -> {M1, M2}}這樣的數據。而後將每一個映射轉換爲這樣的格式:
{(topic + 分區,  ByteBufferMessageSet(message),  (topic + 分區, ByteBufferMessage(message) }。仍是以咱們的例子而言,通過groupMessageToSet以後,每一個broker對應的數據變爲:
{
(topic + P0, ByteBufferMessageSet(M1, M2)),
(topic + P1, ByteBufferMessageSet(M3)),
(topic + P2, ByteBufferMessageSet(M4)),
}
這個方法還考慮壓縮的狀況,即producer的屬性compression.codec中指定的壓縮策略。若是啓用了壓縮,追加寫當前日誌段的時候會先解壓縮消息再寫入(詳見Log.scala的append方法)。
 
5.3.3 send發送消息
這個方法基於上一步中構造的(topic+分區, ByteBufferMessageSet)元組構造ProducerRequest發送給對應的broker,並返回發送失敗的topic分區集合。具體的邏輯以下:
1. 判斷要發送到的broker id是否合法,若是小於0的話(一般是-1),說明消息要發送到的分區沒有leader。這種狀況下直接記錄一個警告信息並直接返回未發送的消息集合
2. 若是broker id是合法的,那麼還須要再判斷一下要發送的消息是否爲空,若是爲空天然也不須要作什麼,直接返回空集合就行了
3. 若是上一步中的確有要發送的消息,那麼就根據request.required.acks以及超時時間等配置構造一個ProducerRequest將消息封裝進這個請求中。
4. 獲取這個broker上的syncProducer——這個也是從producer池緩存中拿到的,若是池緩存中沒有的話也只是記錄爲一個警告,下次重試的時候刷新一下topic元數據信息就可以建立出來了。
5. 一旦拿到目標broker上的syncProducer,就可使用它來發送請求了,即調用syncProducer.send(producerRequest)
6. 請求被Kafka server處理以後(如何處理的下面會有詳細介紹)會發送一個對應的響應(response)給eventHandler。
7. 拿到response以後須要判斷一下response是否爲空。這其實還要看下request.required.acks的設置。當該值是默認值0時表示producer不須要等待broker的應答(acknowledgement),這能夠帶來最低的延遲但持久性也最差,由於若是一個broker宕機了有可能會丟失數據。若是該值是0, 那麼Kafka處理完ProducerRequest以後並不發送任何response。所以若發現response是空,那麼天然表示全部數據已經被髮送了,返回空集合表示沒有發送失敗的分區消息
8. 但假若request.required.acks是1(其實還有兩種狀況,好比分區數是0等——這裏不作討論),那麼就表示producer在leader副本得到數據後須要等待broker的應答。這個值的設置有更好的持久化效果。假設request.required.acks是1的話,那麼Kafka處理完請求後悔發送response,所以代碼還要繼續解析response中的數據以肯定到底有無失敗消息
9. 在開始解析response代碼以前,先來講說ProduceResponse的格式,以下圖所示:

  

 

response中比較重要的信息是topic下面多個分區對應的錯誤碼和消息待追加的第一條消息的位移。

 

所以,在拿到response以後,須要先判斷一下response中總的分區數是否和請求中的分區數同樣,若是不一樣的話說明在返回的response不完整,Kafka代碼會拋出異常。不然,就從response中找出那些有錯誤的分區(即錯誤碼不是NoError的)並返回。

 

至此,客戶端的producer程序就已經執行完畢了。可能有些人會感到奇怪?貌似消息只是以請求的方式被髮送到Kafka server上,但消息不是還要被寫入到日誌中嗎?這部分功能又是在哪裏作的呢? 下面咱們來看看Kafka server是如何處理ProducerRequest的?
 
6、 KafkaServer處理請求

 

Kafka server在啓動的時候會開啓N個線程來處理請求。其中N是由num.io.threads屬性指定,默認是8。Kafka推薦你設置該值至少是機器上磁盤數。在KafkaServer的startup方法中,如代碼所示:
def startup() {
    ...
    // 建立一個請求處理的線程池,在構造時就會開啓多個線程準備接收請求
    requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads) 
    ...
}

class KafkaRequestHandlerPool {
    ...
    for(i <- 0 until numThreads) {
        runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis)
        threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i))
        threads(i).start() // 啓動每一個請求處理線程
    }
    ...
}

KafkaRequestHandler其實是一個Runnable,它的run核心方法中以while (true)的方式調用api.handle(request)不斷地接收請求處理,以下面的代碼所示:  

 

class KafkaRequestHandler... extends Runnable {
    ...
    def run() {
        ...
        while (true) {
            ...
            apis.handle(request) // 調用apis.handle等待請求處理
        }
        ...
    }
    ...    
}

在KafkaApis中handle的主要做用就是接收各類類型的請求。本文只關注ProducerRequest請求:  

 
def handle(request: RequestChannel.Request) {
    ...
    request.requestId match {
        case RequestKeys.ProduceKey => handleProducerOrOffsetCommitRequest(request) // 若是接收到ProducerRequest交由handleProducerOrOffsetCommitRequest處理
        case ...
    }
    ...
}

如此看來,核心的方法就是handleProducerOrOffsetCommitRequest了。這個方法之因此叫這個名字,是由於它同時能夠處理ProducerRequest和OffsetCommitRequest兩種請求,後者其實也是一種特殊的ProducerRequest。從Kafka 0.8.2以後kafka使用一個特殊的topic來保存提交位移(commit offset)。這個topic名字是__consumer_offsets。本文中咱們關注的是真正的ProducerRequest。下面來看看這個方法的邏輯,以下圖所示:

總體邏輯看上去很是簡單,以下面的代碼所示:

def handleProducerOrOffsetCommitRequest(request: RequestChannel.Request) {
    ...
    val localProduceResults = appendToLocalLog(produceRequest, offsetCommitRequestOpt.nonEmpty) // 將消息追加寫入本地提交日誌
    val numPartitionsInError = localProduceResults.count(_.error.isDefined) // 計算是否存在發送失敗的分區
    if(produceRequest.requiredAcks == 0) { // request.required.acks = 0時的代碼路徑
      if (numPartitionsInError != 0) {
        info(("Send the close connection response due to error handling produce request " +
          "[clientId = %s, correlationId = %s, topicAndPartition = %s] with Ack=0")
          .format(produceRequest.clientId, produceRequest.correlationId, produceRequest.topicPartitionMessageSizeMap.keySet.mkString(",")))
        requestChannel.closeConnection(request.processor, request) // 關閉底層Socket以告知客戶端程序有發送失敗的狀況
      } else {
        ...
      }
    } else if (produceRequest.requiredAcks == 1 || // request.required.acks = 0時的代碼路徑,固然還有其餘兩個條件
        produceRequest.numPartitions <= 0 ||
        numPartitionsInError == produceRequest.numPartitions) {
      val response = offsetCommitRequestOpt.map(_.responseFor(firstErrorCode, config.offsetMetadataMaxSize))
                                           .getOrElse(ProducerResponse(produceRequest.correlationId, statuses))
      requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) // 發送response給客戶端
    } else { //  request.required.acks = -1時的代碼路徑
      // create a list of (topic, partition) pairs to use as keys for this delayed request
      val producerRequestKeys = produceRequest.data.keys.toSeq
      val statuses = localProduceResults.map(r =>
        r.key -> DelayedProduceResponseStatus(r.end + 1, ProducerResponseStatus(r.errorCode, r.start))).toMap
      val delayedRequest =  new DelayedProduce(...) // 此時須要構造延時請求進行處理,此段邏輯比較複雜,須要理解Purgatory的概念,本文暫不考慮
        ...
}

由上面代碼可見,不管request.required.acks是何值,都須要首先將待發送的消息集合追加寫入本地的提交日誌中。此時如何按照默認值是是0的狀況,那麼這寫入日誌後須要判斷下全部消息是否都已經發送成功了。若是出現了發送錯誤,那麼就將關閉連入broker的Socket Server以通知客戶端程序錯誤的發生。如今的關鍵是追加寫是如何完成的?即方法appendToLocalLog如何實現的?該方法總體邏輯流程圖以下圖所示:

  

因爲邏輯很直觀,不對代碼作詳細分析,不過值得關注的是這個方法會捕獲不少異常:

異常名稱 具體含義 異常處理
KafakStorageException 這多是不可恢復的IO錯誤 既然沒法恢復,則終止該broker上JVM進程
InvalidTopicException 顯式給__consumer_offsets topic發送消息就會有這個異常拋出,不要這麼作,由於這是內部topic 將InvalidTopicException封裝進ProduceResult返回
UnknownTopicOrPartitionException topic或分區不在該broker上時拋出該異常 將UnknownTopicOrPartitionException封裝進ProduceResult返回
NotLeaderForPartitionException 目標分區的leader副本不在該broker上 將NotLeaderForPartitionException封裝進ProduceResult返回
NotEnoughReplicasException 只會出如今request.required.acks=-1且ISR中的副本數不知足min.insync.replicas指定的最少副本數時會拋出該異常 將NotEnoughReplicasException封裝進ProduceResult返回
其餘 處理ProducerRequest時發生的其餘異常 將對應異常封裝進ProduceResult返回

okay,貌似如今咱們就剩下最後一個主要的方法沒說了。分析完這個方法以後整個producer發送消息的流程應該就算是完整地走完了。最後的這個方法就是Partition的appendMessagesToLeader,其主要代碼以下:

def appendMessagesToLeader(messages: ByteBufferMessageSet, requiredAcks: Int=0) = {
    inReadLock(leaderIsrUpdateLock) {
      val leaderReplicaOpt = leaderReplicaIfLocal() // 判斷目標分區的leader副本是否在該broker上
      leaderReplicaOpt match {
        case Some(leaderReplica) => // 若是leader副本在該broker上
          val log = leaderReplica.log.get // 獲取本地提交日誌文件句柄
          val minIsr = log.config.minInSyncReplicas
          val inSyncSize = inSyncReplicas.size

          // Avoid writing to leader if there are not enough insync replicas to make it safe
          if (inSyncSize < minIsr && requiredAcks == -1) { //只有request.required.acks等於-1時纔會判斷ISR數是否不足
            throw new NotEnoughReplicasException("Number of insync replicas for partition [%s,%d] is [%d], below required minimum [%d]"
              .format(topic,partitionId,minIsr,inSyncSize))
          }
          val info = log.append(messages, assignOffsets = true) // 真正的寫日誌操做,因爲涉及Kafka底層寫日誌的,之後有機會寫篇文章專門探討這部分功能
          // probably unblock some follower fetch requests since log end offset has been updated
          replicaManager.unblockDelayedFetchRequests(new TopicAndPartition(this.topic, this.partitionId))
          // we may need to increment high watermark since ISR could be down to 1
          maybeIncrementLeaderHW(leaderReplica) 
          info
        case None => // 若是不在,直接拋出異常代表leader不在該broker上
          throw new NotLeaderForPartitionException("Leader not local for partition [%s,%d] on broker %d"
            .format(topic, partitionId, localBrokerId))
      }
    }

至此,一個最簡單的scala版同步producer的代碼走讀就算正式完成了,能夠發現Kafka設計的思路就是在每一個broker上啓動一個server不斷地處理從客戶端發來的各類請求,完成對應的功能並按需返回對應的response。但願本文能對但願瞭解Kafka producer機制的人有所幫助。

相關文章
相關標籤/搜索