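The answer to this question is: DefaultEventHandler takes a batch of messages sent to it (actually of type Seq[KeyedMessage[K,V]]), splits it apart, and determines which broker each message should be sent to. The messages destined for each broker are then grouped by topic and partition. In short: unpack => regroup according to the metadata.
This is implemented by the partitionAndCollate method: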
def partitionAndCollate(messages: Seq[KeyedMessage[K,Message]]): Option[Map[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]]
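It returns an Option whose element is a Map: the key is a brokerId and the value holds the messages destined for that broker. For each message, the method first determines which partition of which topic it should go to, then determines the leader broker of that partition. It then looks up that broker in the Map[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]] and appends the message to the Seq[KeyedMessage[K,Message]] for the matching topic+partition. The final result describes which messages go to each broker and in what structure. When the messages are actually sent, the packed messages are dispatched to the different brokers by brokerId.
First, look at how the Kafka protocol describes the structure of a Produce Request: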
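The grouping described here can be sketched in a few lines. This is only an illustration under simplifying assumptions, not Kafka's implementation: Msg, the local TopicAndPartition and the leaders map are hypothetical stand-ins, each message already carries its partition, and a missing leader is mapped to brokerId -1.

```scala
object CollateSketch {
  // Hypothetical, simplified stand-ins for Kafka's types (illustration only).
  final case class Msg(topic: String, partition: Int, payload: String)
  final case class TopicAndPartition(topic: String, partition: Int)

  // leaders: the broker that leads each topic+partition.
  // A topic+partition without a known leader is assigned to brokerId -1.
  def collate(messages: Seq[Msg],
              leaders: Map[TopicAndPartition, Int]): Map[Int, Map[TopicAndPartition, Seq[Msg]]] =
    messages
      // first level: group by the leader broker of the message's partition
      .groupBy(m => leaders.getOrElse(TopicAndPartition(m.topic, m.partition), -1))
      // second level: within one broker, group by topic+partition
      .map { case (brokerId, msgs) =>
        brokerId -> msgs.groupBy(m => TopicAndPartition(m.topic, m.partition))
      }
}
```

Each entry of the outer map corresponds to what would be sent to one broker in a single request.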
ProduceRequest => RequiredAcks Timeout [TopicName [Partition MessageSetSize MessageSet]]
RequiredAcks => int16
Timeout => int32
Partition => int32
MessageSetSize => int32
The messages sent to one broker have exactly this structure.
The Kafka wiki also says the following about the Produce API:
The produce API is used to send message sets to the server. For efficiency it allows sending message sets intended for many topic partitions in a single request.
That is, a single produce request can carry messages for several topic+partition combinations at once. A single produce request is, of course, sent to a single broker.
The handler calls
send(brokerid, messageSetPerBroker)
to send the message sets to the broker with the corresponding brokerid.
First, look at the definition of the KeyedMessage class:
case class KeyedMessage[K, V](val topic: String, val key: K, val partKey: Any, val message: V) {
  if(topic == null)
    throw new IllegalArgumentException("Topic cannot be null.")

  def this(topic: String, message: V) = this(topic, null.asInstanceOf[K], null, message)

  def this(topic: String, key: K, message: V) = this(topic, key, key, message)

  def partitionKey = {
    if(partKey != null)
      partKey
    else if(hasKey)
      key
    else
      null
  }

  def hasKey = key != null
}
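When the three-argument constructor is used, partKey equals key. partKey is used for partitioning, but it is not stored as part of the message.
As mentioned earlier, before deciding which broker a message should go to, the handler must first decide which partition it goes to, so that it can use the partitionId to find the broker hosting that partition's leader.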
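To make the fallback rules of partitionKey concrete, here is a small sketch. KeyedMsg is a local copy of the class shown above, renamed to make clear it is not Kafka's own class; describe is a hypothetical helper added only for illustration.

```scala
object KeyedMessageSketch {
  // Local copy of the class shown above (renamed; not Kafka's own class).
  case class KeyedMsg[K, V](topic: String, key: K, partKey: Any, message: V) {
    if (topic == null) throw new IllegalArgumentException("Topic cannot be null.")

    // two arguments: neither key nor partKey is set
    def this(topic: String, message: V) = this(topic, null.asInstanceOf[K], null, message)

    // three arguments: partKey is set to key
    def this(topic: String, key: K, message: V) = this(topic, key, key, message)

    def hasKey: Boolean = key != null

    // partKey wins; otherwise fall back to key; otherwise null
    def partitionKey: Any = if (partKey != null) partKey else if (hasKey) key else null
  }

  // Hypothetical helper that just renders the two keys.
  def describe(m: KeyedMsg[String, String]): String =
    s"key=${m.key}, partitionKey=${m.partitionKey}"
}
```

With the two-argument constructor partitionKey is null, with the three-argument constructor it equals key, and with all four arguments an explicit partKey takes precedence over key.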
val topicPartitionsList = getPartitionListForTopic(message) // get the partition info of the topic this message is sent to
val partitionIndex = getPartition(message.topic, message.partitionKey, topicPartitionsList) // decide which partition this message goes to
Note that the value passed to getPartition is message.partitionKey, that is, partKey with a fallback to key. The getPartition method is:
private def getPartition(topic: String, key: Any, topicPartitionList: Seq[PartitionAndLeader]): Int = {
  val numPartitions = topicPartitionList.size
  if(numPartitions <= 0)
    throw new UnknownTopicOrPartitionException("Topic " + topic + " doesn't exist")
  val partition =
    if(key == null) {
      // If the key is null, we don't really need a partitioner
      // So we look up in the send partition cache for the topic to decide the target partition
      val id = sendPartitionPerTopicCache.get(topic)
      id match {
        case Some(partitionId) =>
          // directly return the partitionId without checking availability of the leader,
          // since we want to postpone the failure until the send operation anyways
          partitionId
        case None =>
          val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)
          if (availablePartitions.isEmpty)
            throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
          val index = Utils.abs(Random.nextInt) % availablePartitions.size
          val partitionId = availablePartitions(index).partitionId
          sendPartitionPerTopicCache.put(topic, partitionId)
          partitionId
      }
    } else
      partitioner.partition(key, numPartitions)
  // ... (the rest of the method is not shown in this excerpt)
}
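When the partition key is null (neither partKey nor key is set), the method first looks in sendPartitionPerTopicCache, which is a Map, for a partitionId cached for this topic. If one was cached earlier via sendPartitionPerTopicCache.put(topic, partitionId), it is returned directly. Otherwise a partitionId is picked at random from the partitions that currently have a leader, and it is cached in sendPartitionPerTopicCache. As a result, while the cache holds a partitionId for a topic, every null-key message for that topic is sent to that same partition. So if all messages have a null partition key, only one partition receives messages for a period of time.
It is "a period of time" rather than forever because the handler periodically re-fetches the metadata of the topics it has sent messages to; the interval is set by topic.metadata.refresh.interval.ms. After re-fetching the metadata it clears several caches, including sendPartitionPerTopicCache, so the next null-key message causes another randomly chosen partitionId to be cached.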
if (topicMetadataRefreshInterval >= 0 &&
    SystemTime.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) {
  // it is time to refresh the topic metadata, so do the refresh
  Utils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement))
  sendPartitionPerTopicCache.clear()
  topicMetadataToRefresh.clear
  lastTopicMetadataRefreshTime = SystemTime.milliseconds
}
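The "sticky until the next refresh" behaviour for null-key messages can be modelled with a tiny sketch. Chooser, partitionFor and onMetadataRefresh are hypothetical names, and the sketch assumes a non-empty list of partitions that have a leader; it imitates the caching idea only, not the handler's real code.

```scala
import scala.collection.mutable
import scala.util.Random

object StickyPartitionSketch {
  // Hypothetical, simplified model of the null-key path of getPartition.
  // availablePartitions must be non-empty.
  final class Chooser(availablePartitions: IndexedSeq[Int], rng: Random) {
    private val cache = mutable.HashMap.empty[String, Int]

    // Return the cached partition for the topic, or pick one at random and cache it.
    def partitionFor(topic: String): Int =
      cache.getOrElseUpdate(topic, availablePartitions(rng.nextInt(availablePartitions.size)))

    // Stand-in for what happens after a metadata refresh: the cache is emptied,
    // so the next call picks a (possibly different) random partition.
    def onMetadataRefresh(): Unit = cache.clear()
  }
}
```

Between two refreshes every call for the same topic returns the same partition, which is why null-key traffic concentrates on one partition at a time.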
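When the partition key is not null, the partition method of the partitioner passed to the handler decides the target partition from the key and numPartitions. Note that numPartitions comes from topicPartitionList.size, so the chosen partition may have no available leader. That problem is deferred to send time. In practice, when this happens, partitionAndCollate assigns the message to a broker whose brokerId is -1, and the send method checks brokerId before sending: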
if(brokerId < 0) {
  warn("Failed to send data since partitions %s don't have a leader".format(messagesPerTopic.map(_._1).mkString(",")))
  messagesPerTopic.keys.toSeq
} // ... (the branch that performs the actual send is not shown in this excerpt)
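When brokerId < 0, send returns a non-empty Seq containing every topic+partition combination that has no leader. If the messages still cannot be sent after the configured number of retries, the handle method eventually throws a FailedToSendMessageException.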
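The retry behaviour described here can be sketched roughly as follows. This is a simplified model under stated assumptions: handle, sendOnce and FailedToSend are hypothetical names, and the backoff and metadata refresh that a real producer would perform between attempts are left out.

```scala
object RetrySketch {
  final case class TopicAndPartition(topic: String, partition: Int)

  // Hypothetical stand-in for FailedToSendMessageException.
  final class FailedToSend(msg: String) extends RuntimeException(msg)

  // sendOnce is a hypothetical function returning the topic+partitions that
  // could NOT be sent (for example because they currently have no leader).
  // Returns the number of attempts that were needed.
  def handle(outstanding: Seq[TopicAndPartition], maxRetries: Int)(
      sendOnce: Seq[TopicAndPartition] => Seq[TopicAndPartition]): Int = {
    var remaining = outstanding
    var attempts = 0
    // one initial attempt plus up to maxRetries retries
    while (remaining.nonEmpty && attempts <= maxRetries) {
      remaining = sendOnce(remaining)
      attempts += 1
    }
    if (remaining.nonEmpty)
      throw new FailedToSend(s"Failed to send after $attempts attempts: ${remaining.mkString(",")}")
    attempts
  }
}
```

Only the topic+partitions that failed in one round are retried in the next; the exception is raised once the retries are exhausted and something is still outstanding.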
Compression of the message sets is handled in
private def groupMessagesToSet(messagesPerTopicAndPartition: collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]])
Its comment reads:
/** enforce the compressed.topics config here.
* If the compression codec is anything other than NoCompressionCodec,
* Enable compression only for specified topics if any
* If the list of compressed topics is empty, then enable the specified compression codec for all topics
* If the compression codec is NoCompressionCodec, compression is disabled for all topics
*/
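That is, if no compression codec is configured, no topic's message set is compressed. If a codec is configured and no individual topics are listed for compression, the message sets of all topics are compressed; otherwise only the listed topics are compressed.
groupMessagesToSet itself contains no actual compression logic. Instead it returns ByteBufferMessageSet objects, whose comment reads: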
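The three rules can be written down as a small decision function. The sketch below is an illustration only: Codec and its case objects are hypothetical stand-ins for Kafka's codec types, and codecFor is a made-up name.

```scala
object CompressionChoiceSketch {
  // Hypothetical stand-ins for Kafka's compression codec objects.
  sealed trait Codec
  case object NoCompression extends Codec
  case object Gzip extends Codec
  case object Snappy extends Codec

  // configured: the codec set on the producer
  // compressedTopics: the topics listed for compression (may be empty)
  def codecFor(topic: String, configured: Codec, compressedTopics: Set[String]): Codec =
    configured match {
      // no codec configured: nothing is compressed
      case NoCompression => NoCompression
      // codec configured and either no topic list, or the topic is on the list
      case codec if compressedTopics.isEmpty || compressedTopics.contains(topic) => codec
      // codec configured, but this topic is not on the list
      case _ => NoCompression
    }
}
```

For example, with Gzip configured and the list containing only "a", topic "a" is compressed and topic "b" is not.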
/**
* A sequence of messages stored in a byte buffer
*
* There are two ways to create a ByteBufferMessageSet
*
* Option 1: From a ByteBuffer which already contains the serialized message set. Consumers will use this method.
*
* Option 2: Give it a list of messages along with instructions relating to serialization format. Producers will use this method.
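It appears to be the utility that serializes and deserializes message sets.
Its implementation uses the CompressionFactory object, from which we can see that the Kafka version discussed here supports only two compression methods, GZIP and Snappy.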
compressionCodec match {
  case DefaultCompressionCodec => new GZIPOutputStream(stream)
  case GZIPCompressionCodec => new GZIPOutputStream(stream)
  case SnappyCompressionCodec =>
    import org.xerial.snappy.SnappyOutputStream
    new SnappyOutputStream(stream)
  case _ =>
    throw new kafka.common.UnknownCodecException("Unknown Codec: " + compressionCodec)
}
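To show what wrapping a stream in GZIPOutputStream does in practice, here is a self-contained round trip using only the JDK's java.util.zip classes. GzipSketch, compress and decompress are hypothetical names; this is not how ByteBufferMessageSet lays out its bytes, only an illustration of the stream wrapping seen in the match above.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

object GzipSketch {
  // Compress a byte array by writing it through a GZIPOutputStream.
  def compress(data: Array[Byte]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val gzip = new GZIPOutputStream(bytes)
    // close() finishes the GZIP stream and flushes the trailer
    try gzip.write(data) finally gzip.close()
    bytes.toByteArray
  }

  // Decompress by reading through a GZIPInputStream until end of stream.
  def decompress(data: Array[Byte]): Array[Byte] = {
    val in = new GZIPInputStream(new ByteArrayInputStream(data))
    val out = new ByteArrayOutputStream()
    val buf = new Array[Byte](4096)
    try {
      var n = in.read(buf)
      while (n != -1) {
        out.write(buf, 0, n)
        n = in.read(buf)
      }
    } finally in.close()
    out.toByteArray
  }
}
```

Decompressing the output of compress yields the original bytes again.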