kafka producer 中partition 使用方式

     爲了更好的實現負載均衡和消息的順序性,kafka的producer在分發消息時能夠經過分發策略發送給指定的partition。實現分發的程序是須要制定消息的key值,而kafka經過key進行策略分發。 java

     爲了更好的弄清楚相關原理,咱們從kafka自己提供的分發函數分析:
     源代碼以下:     負載均衡

private[kafka] class DefaultPartitioner[T] extends Partitioner[T] {
  private val random = new java.util.Random
 
  def partition(key: T, numPartitions: Int): Int = {
    if(key == null)
    {
        println("key is null")
        random.nextInt(numPartitions)
    }
    else
    {
        println("key is "+ key + " hashcode is "+key.hashCode)
        math.abs(key.hashCode) % numPartitions
    }
  }
}

       上述類對key進行了模版封裝,所以key 能夠提供Int,String等類型。
       其中numPartitions是來自ZKBrokerPartitionInfo生成的數據,具體代碼是: dom

val numPartitions = brokerList.map(bid => ZkUtils.readData(zkClient, brokerTopicPath + "/" + bid).toInt)
      (Tips:從上面能夠看到,咱們能夠更多的擴展分區信息,多多利用zookeeper提供的信息,好比sortedBrokerPartitions等等)


        不少時候咱們可能要本身實現一個分區函數,具體的使用方式就是:
       
函數

private Properties props = new Properties();

...

 props.put("partitioner.class","***/***/TestPartition");//必定要寫對路徑和partitioner.class

      具體的實現代碼就是改裝自DefaultPartitioner的java實現方式,一併貼上: spa

public class TestPartition implements Partitioner<String>{

	public int partition(String key,int numPartitions)
	{
		//System.out.println("Fuck!!!!");
		System.out.print("partitions number is "+numPartitions+"   ");
		if (key == null) {
			Random random = new Random();
			System.out.println("key is null ");
			return random.nextInt(numPartitions);
		}
		else {
			int result = Math.abs(key.hashCode())%numPartitions; //很奇怪,
                     //hashCode 會生成負數,奇葩,因此加絕對值
			System.out.println("key is "+ key+ " partitions is "+ result);
			return result;
		}
	}
} 
    而發送消息使用方式:

List<String> messages = new java.util.ArrayList<String>();
      String messageString = "test-message"+Integer.toString(messageNo);
      messages.add(messageString);
      //producer.send(new ProducerData<String, String>(topic,"test_key", messageStr));
      ProducerData<String, String> data = new ProducerData<String, String>(topic, messageString, messages);
      producer.send(data);
    kafka官方文檔中直接使用
ProducerData<String, String> data = new ProducerData<String, String>(topic, 「xxx」, "XXX");
      producer.send(data);
    可是我沒有實現,第三個參數用String會報錯。
相關文章
相關標籤/搜索