Kafka async producer (2): kafka.producer.async.DefaultEventHandler

Questions left over from last time:

  1. If messages are destined for many different topics, how does the async producer distinguish topics while sending them in batches?
  2. How does it use the key to do partitioning?
  3. How is batch compression of messages implemented?

  • How does the async producer distinguish topics while sending in batches?

  The answer: DefaultEventHandler takes the batch of messages handed to it (actually of type Seq[KeyedMessage[K,V]]), splits it apart, and determines which broker each message should go to. The messages destined for each broker are then grouped by topic and partition. That is: unpack => reassemble according to the metadata.

This is implemented by the partitionAndCollate method:

def partitionAndCollate(messages: Seq[KeyedMessage[K,Message]]): Option[Map[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]]

  It returns an Option whose element is a Map: the key is a brokerId, and the value is the messages to be sent to that broker. For each message, the handler first determines which partition of which topic it goes to, then finds that partition's leader broker, looks that broker up in the Map[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]], and appends the message to the Seq[KeyedMessage[K,Message]] for the corresponding topic+partition. The final result describes which messages, in what structure, should go to each broker; at send time, the packaged messages are dispatched to the different brokers by brokerId.
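A minimal sketch of this two-level grouping, using simplified stand-in types; partitionFor and leaderFor are hypothetical stand-ins for the partition choice and the leader lookup against cached metadata:

import scala.collection.mutable

// Simplified stand-ins; the real KeyedMessage class is shown further below.
case class KeyedMessage[K, V](topic: String, key: K, message: V)
case class TopicAndPartition(topic: String, partition: Int)

def collate[K, V](messages: Seq[KeyedMessage[K, V]],
                  partitionFor: KeyedMessage[K, V] => Int,  // hypothetical: picks the partition
                  leaderFor: TopicAndPartition => Int)      // hypothetical: finds the leader's brokerId
    : mutable.Map[Int, mutable.Map[TopicAndPartition, Seq[KeyedMessage[K, V]]]] = {
  val perBroker = mutable.Map.empty[Int, mutable.Map[TopicAndPartition, Seq[KeyedMessage[K, V]]]]
  for (m <- messages) {
    val tp = TopicAndPartition(m.topic, partitionFor(m))   // step 1: which partition?
    val brokerId = leaderFor(tp)                           // step 2: which leader broker?
    val perTp = perBroker.getOrElseUpdate(brokerId, mutable.Map.empty)
    perTp(tp) = perTp.getOrElse(tp, Seq.empty) :+ m        // step 3: group under topic+partition
  }
  perBroker
}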

First, look at the Kafka protocol's description of the ProduceRequest structure:

ProduceRequest => RequiredAcks Timeout [TopicName [Partition MessageSetSize MessageSet]]
   RequiredAcks => int16
   Timeout => int32
   TopicName => string
   Partition => int32
   MessageSetSize => int32

A message set sent to a single broker has exactly this structure.

The Kafka wiki also says the following about the Produce API:

The produce API is used to send message sets to the server. For efficiency it allows sending message sets intended for many topic partitions in a single request.

That is, a single produce request can carry messages for multiple topic+partition combinations at once; of course, one produce request still goes to a single broker.
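For illustration, the nesting of that wire format could be mirrored with case classes like these (field encodings omitted; the names follow the grammar above):

// One request targets one broker but may carry message sets
// for many topic+partition combinations.
case class PartitionData(partition: Int, messageSetSize: Int, messageSet: Array[Byte])
case class TopicData(topicName: String, partitions: Seq[PartitionData])
case class ProduceRequest(requiredAcks: Short, timeout: Int, topics: Seq[TopicData])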

The handler then calls

send(brokerid, messageSetPerBroker)

  to send each packaged message set to its corresponding brokerId.

  • How does it use the key to do partitioning?

First, look at the definition of the KeyedMessage class:

case class KeyedMessage[K, V](val topic: String, val key: K, val partKey: Any, val message: V) {
  if(topic == null)
    throw new IllegalArgumentException("Topic cannot be null.") 
  def this(topic: String, message: V) = this(topic, null.asInstanceOf[K], null, message)
  def this(topic: String, key: K, message: V) = this(topic, key, key, message)
  def partitionKey = {
    if(partKey != null)
      partKey
    else if(hasKey)
      key
    else
      null  
  }
  def hasKey = key != null
}

  When the three-argument constructor is used, partKey equals key. partKey is what is used for partitioning, but it is not stored as part of the message.
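For example, given the class above (illustrative values):

val noKey  = new KeyedMessage[String, String]("logs", "payload")            // key == null, so partitionKey == null
val keyed  = new KeyedMessage[String, String]("logs", "user42", "payload")  // partKey defaults to key
val routed = KeyedMessage("logs", "user42", "shard-7", "payload")           // partKey routes; only key is stored

assert(noKey.partitionKey == null)
assert(keyed.partitionKey == "user42")
assert(routed.partitionKey == "shard-7") // "shard-7" decides the partition but is not part of the stored message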

As mentioned earlier, before deciding which broker a message goes to, the handler must first decide which partition it goes to, so that the partitionId can be used to find the broker hosting that partition's leader.

val topicPartitionsList = getPartitionListForTopic(message) // fetch the partition info for this message's topic
val partitionIndex = getPartition(message.topic, message.partitionKey, topicPartitionsList) // decide which partition this message goes to

  Note that what is passed into getPartition is message.partitionKey, i.e. the partKey. The getPartition method is:

  private def getPartition(topic: String, key: Any, topicPartitionList: Seq[PartitionAndLeader]): Int = {
    val numPartitions = topicPartitionList.size
    if(numPartitions <= 0)
      throw new UnknownTopicOrPartitionException("Topic " + topic + " doesn't exist")
    val partition =
      if(key == null) {
        // If the key is null, we don't really need a partitioner
        // So we look up in the send partition cache for the topic to decide the target partition
        val id = sendPartitionPerTopicCache.get(topic)
        id match {
          case Some(partitionId) =>
            // directly return the partitionId without checking availability of the leader,
            // since we want to postpone the failure until the send operation anyways
            partitionId
          case None =>
            val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)
            if (availablePartitions.isEmpty)
              throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
            val index = Utils.abs(Random.nextInt) % availablePartitions.size
            val partitionId = availablePartitions(index).partitionId
            sendPartitionPerTopicCache.put(topic, partitionId)
            partitionId
        }
      } else
        partitioner.partition(key, numPartitions)
    // (elided here: the original method also validates the chosen partition id before returning it)
    partition
  }

  When partKey is null, the handler first looks in sendPartitionPerTopicCache (a Map) for a partitionId cached for this topic. If one was previously cached via sendPartitionPerTopicCache.put(topic, partitionId), it is returned directly; otherwise a random id is picked from the available partitions and cached in sendPartitionPerTopicCache. The effect is that while the cache holds a partitionId for a topic, many messages are all sent to that same partition; so if every message has a null partKey, only one partition receives messages during any given stretch of time. It is only "a stretch of time" rather than forever because the handler periodically re-fetches the metadata of the topics it has sent to, at an interval set by topic.metadata.refresh.interval.ms. When it refreshes the metadata, it clears some caches, including sendPartitionPerTopicCache, so a different random partitionId will then be chosen and cached.
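That sticky behavior in a self-contained sketch (getOrElseUpdate stands in for the get/put dance in the real code):

import scala.collection.mutable
import scala.util.Random

val sendPartitionPerTopicCache = mutable.Map.empty[String, Int]

// The first null-key send picks a random available partition and caches it;
// every later send reuses it until the cache is cleared.
def pickPartition(topic: String, availablePartitions: Seq[Int]): Int =
  sendPartitionPerTopicCache.getOrElseUpdate(
    topic, availablePartitions(Random.nextInt(availablePartitions.size)))

val first = pickPartition("t", Seq(0, 1, 2))
assert((1 to 100).forall(_ => pickPartition("t", Seq(0, 1, 2)) == first))
sendPartitionPerTopicCache.clear() // what the metadata refresh does; the next pick is random again

The refresh that clears this cache looks like this in the handler: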

  if (topicMetadataRefreshInterval >= 0 && 
          SystemTime.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) {  // time to refresh the topic metadata, so do the refresh
        Utils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement))
        sendPartitionPerTopicCache.clear()
        topicMetadataToRefresh.clear
        lastTopicMetadataRefreshTime = SystemTime.milliseconds
      }

  When partKey is not null, the partitioner handed to the handler is used: its partition method decides, from partKey and numPartitions, which partition the message goes to. Note that numPartitions here comes from topicPartitionList.size, so the chosen partition may have no available leader at the moment; that problem is deferred to send time. When it happens, partitionAndCollate assigns the message to a pseudo broker with brokerId -1, and the send method checks the brokerId before sending:

    if(brokerId < 0) {
      warn("Failed to send data since partitions %s don't have a leader".format(messagesPerTopic.map(_._1).mkString(",")))
      messagesPerTopic.keys.toSeq // reported back as the topic+partitions that failed
    }

  When brokerId < 0, send returns a non-empty Seq containing all the topic+partition combinations that have no leader. If the messages still cannot be sent after the configured number of retries, the handle method finally throws a FailedToSendMessageException.
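One loose end on the non-null partKey path: unless a custom partitioner.class is configured, the default partitioner is used. A minimal sketch of its behavior, assuming the 0.8 DefaultPartitioner (hash of the key, forced non-negative, mod the partition count):

// The sign bit is masked off, as Kafka's Utils.abs does, because
// math.abs(Int.MinValue) is still negative.
def defaultPartition(key: Any, numPartitions: Int): Int =
  (key.hashCode & 0x7fffffff) % numPartitions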

  • How is batch compression of messages implemented?

This is handled in:

private def groupMessagesToSet(messagesPerTopicAndPartition: collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]])


Its doc comment reads:

/** enforce the compressed.topics config here.
* If the compression codec is anything other than NoCompressionCodec,
* Enable compression only for specified topics if any
* If the list of compressed topics is empty, then enable the specified compression codec for all topics
* If the compression codec is NoCompressionCodec, compression is disabled for all topics
*/

In other words: if no compression is configured, no topic's message set is compressed. If compression is configured and no per-topic list is given, compression is applied to every topic; otherwise only the topics listed in compressed.topics are compressed.
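That rule as a small sketch; compressionCodec and compressedTopics stand in for the "compression.codec" and "compressed.topics" producer config values, and the codec objects are simplified stand-ins for those in kafka.message:

sealed trait CompressionCodec
case object NoCompressionCodec     extends CompressionCodec
case object GZIPCompressionCodec   extends CompressionCodec
case object SnappyCompressionCodec extends CompressionCodec

// Decide which codec applies to a topic's message set.
def codecFor(topic: String,
             compressionCodec: CompressionCodec,
             compressedTopics: Seq[String]): CompressionCodec =
  if (compressionCodec == NoCompressionCodec) NoCompressionCodec // compression disabled globally
  else if (compressedTopics.isEmpty || compressedTopics.contains(topic)) compressionCodec // all topics, or topic listed
  else NoCompressionCodec // a codec is set, but this topic is not in the list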

groupMessagesToSet itself contains no actual compression logic; it just returns a ByteBufferMessageSet object, whose comment reads:

/**
* A sequence of messages stored in a byte buffer
*
* There are two ways to create a ByteBufferMessageSet
*
* Option 1: From a ByteBuffer which already contains the serialized message set. Consumers will use this method.
*
* Option 2: Give it a list of messages along with instructions relating to serialization format. Producers will use this method.
*/

  So it is the utility for serializing and deserializing message sets.
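"Option 2" as the producer would use it, assuming the 0.8 constructors (new ByteBufferMessageSet(codec, messages*) and new Message(bytes)):

import kafka.message.{ByteBufferMessageSet, GZIPCompressionCodec, Message}

// The whole batch is serialized -- and here GZIP-compressed --
// into a single backing ByteBuffer.
val batch = new ByteBufferMessageSet(
  GZIPCompressionCodec,
  new Message("hello".getBytes("UTF-8")),
  new Message("world".getBytes("UTF-8")))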

Its implementation uses the CompressionFactory object, and from that implementation it is clear that Kafka (at this version) supports only two compression codecs, GZIP and Snappy:

compressionCodec match {
      case DefaultCompressionCodec => new GZIPOutputStream(stream)
      case GZIPCompressionCodec => new GZIPOutputStream(stream)
      case SnappyCompressionCodec => 
        import org.xerial.snappy.SnappyOutputStream
        new SnappyOutputStream(stream)
      case _ =>
        throw new kafka.common.UnknownCodecException("Unknown Codec: " + compressionCodec)
    }