key爲null時Kafka會將消息發送給哪一個分區?

當你編寫kafka Producer時, 會生成KeyedMessage對象。程序員

KeyedMessage<K, V> keyedMessage = new KeyedMessage<>(topicName, key, message)

這裏的key值能夠爲空,在這種狀況下, kafka會將這個消息發送到哪一個分區上呢?依據Kafka官方的文檔, 默認的分區類會隨機挑選一個分區:緩存

The third property  "partitioner.class" defines what class to use to determine which Partition in the Topic the message is to be sent to. This is optional, but for any non-trivial implementation you are going to want to implement a partitioning scheme. More about the implementation of this class later. If you include a value for the key but haven't defined a partitioner.class Kafka will use the default partitioner. If the key is null, then the Producer will assign the message to a random Partition.服務器

可是這句話至關的誤導人。

從字面上來說,這句話沒有問題, 可是這裏的隨機是指在參數"topic.metadata.refresh.ms"刷新後隨機選擇一個, 這個時間段內老是使用惟一的分區。 默認狀況下每十分鐘纔可能從新選擇一個新的分區。 可是相信大部分的程序員和我同樣, 都理解成每一個消息都會隨機選擇一個分區。
能夠查看相關的代碼:dom

private def getPartition(topic: String, key: Any, topicPartitionList: Seq[PartitionAndLeader]): Int = {
    val numPartitions = topicPartitionList.size    if(numPartitions <= 0)
      throw new UnknownTopicOrPartitionException("Topic " + topic + " doesn't exist")
    val partition =
      if(key == null) {
        // If the key is null, we don't really need a partitioner
        // So we look up in the send partition cache for the topic to decide the target partition
        val id = sendPartitionPerTopicCache.get(topic)
        id match {
          case Some(partitionId) =>
            // directly return the partitionId without checking availability of the leader,
            // since we want to postpone the failure until the send operation anyways
            partitionId          case None => 
            val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)
            if (availablePartitions.isEmpty)
              throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
            val index = Utils.abs(Random.nextInt) % availablePartitions.size
            val partitionId = availablePartitions(index).partitionId
            sendPartitionPerTopicCache.put(topic, partitionId)
            partitionId        }
      } else
        partitioner.partition(key, numPartitions)
    if(partition < 0 || partition >= numPartitions)
      throw new UnknownTopicOrPartitionException("Invalid partition id: " + partition + " for topic " + topic +
        "; Valid values are in the inclusive range of [0, " + (numPartitions-1) + "]")
    trace("Assigning message of topic %s and key %s to a selected partition %d".format(topic, if (key == null) "[none]" else key.toString, partition))
    partition  }

若是key爲null, 它會從sendPartitionPerTopicCache查選緩存的分區, 若是沒有,隨機選擇一個分區,不然就用緩存的分區。socket

LinkedIn工程師Guozhang Wang在郵件列表中解釋了這一問題,
最初kafka是按照大部分用戶理解的那樣每次都隨機選擇一個分區, 後來改爲了按期選擇一個分區, 這是爲了減小服務器段socket的數量。不過這的確很誤導用戶,據稱0.8.2版本後又改回了每次隨機選取。可是我查看0.8.2的代碼還沒看到改動。ide

因此,若是有可能,仍是爲KeyedMessage設置一個key值吧。post

當你編寫kafka Producer時, 會生成KeyedMessage對象。this

KeyedMessage<K, V> keyedMessage = new KeyedMessage<>(topicName, key, message)

這裏的key值能夠爲空,在這種狀況下, kafka會將這個消息發送到哪一個分區上呢?依據Kafka官方的文檔, 默認的分區類會隨機挑選一個分區:code

The third property  "partitioner.class" defines what class to use to determine which Partition in the Topic the message is to be sent to. This is optional, but for any non-trivial implementation you are going to want to implement a partitioning scheme. More about the implementation of this class later. If you include a value for the key but haven't defined a partitioner.class Kafka will use the default partitioner. If the key is null, then the Producer will assign the message to a random Partition.orm

可是這句話至關的誤導人。

相關文章
相關標籤/搜索