kafka partition(分區)與 group

1、java

一、原理圖shell

二、原理描述markdown

一個topic 能夠配置幾個partition,produce發送的消息分發到不一樣的partition中,consumer接受數據的時候是按照group來接受,kafka確保每一個partition只能同一個group中的同一個consumer消費,若是想要重複消費,那麼須要其餘的組來消費。Zookeerper中保存這每一個topic下的每一個partition在每一個group中消費的offset 
新版kafka把這個offsert保存到了一個__consumer_offsert的topic下 
這個__consumer_offsert 有50個分區,經過將group的id哈希值%50的值來肯定要保存到那一個分區.  這樣也是爲了考慮到zookeeper不擅長大量讀寫的緣由。
因此,若是要一個group用幾個consumer來同時讀取的話,須要多線程來讀取,一個線程至關於一個consumer實例。當consumer的數量大於分區的數量的時候,有的consumer線程會讀取不到數據。 
假設一個topic test 被groupA消費了,如今啓動另一個新的groupB來消費test,默認test-groupB的offset不是0,而是沒有新創建,除非當test有數據的時候,groupB會收到該數據,該條數據也是第一條數據,groupB的offset也是剛初始化的ofsert, 除非用顯式的用–from-beginnging 來獲取從0開始數據 多線程

三、查看topic-group的offsert 負載均衡

位置:zookeeper 
路徑:[zk: localhost:2181(CONNECTED) 3] ls /brokers/topics/__consumer_offsets/partitions 
在zookeeper的topic中有一個特殊的topic __consumer_offserts 
計算方法:(放入哪一個partitions)ide

int hashCode = Math.abs("ttt".hashCode());工具

int partition = hashCode % 50;oop

先計算group的hashCode,再除以分區數(50),能夠獲得partition的值 post

使用命令查看: kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 11 --broker-list localhost:9092,localhost:9093,localhost:9094 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"性能

4.參數 
auto.offset.reset:默認值爲largest,表明最新的消息,smallest表明從最先的消息開始讀取,當consumer剛開始建立的時候沒有offset這種狀況,若是設置了largest,則爲當收到最新的一條消息的時候開始記錄offsert,若設置爲smalert,那麼會從頭開始讀partition

 
2、
一、Topic
     Topic在邏輯上能夠被認爲是一個queue,每條消費都必須指定它的Topic,能夠簡單理解爲必須指明把這條消息放進哪一個queue裏。爲了使得Kafka的吞吐率能夠線性提升,物理上把Topic分紅一個或多個Partition,每一個Partition在物理上對應一個文件夾,該文件夾下存儲這個Partition的全部消息和索引文件。若建立topic1和topic2兩個topic,且分別有13個和19個分區,則整個集羣上會相應會生成共32個文件夾(本文所用集羣共8個節點,此處topic1和topic2 replication-factor均爲1),以下圖所示。
二、對於傳統的message queue而言,通常會刪除已經被消費的消息,而Kafka集羣會保留全部的消息,不管其被消費與否。固然,由於磁盤限制,不可能永久保留全部數據(實際上也不必),
     所以Kafka提供兩種策略刪除舊數據。一是基於時間,二是基於Partition文件大小。
     例如能夠經過配置$KAFKA_HOME/config/server.properties,讓Kafka刪除一週前的數據,也可在Partition文件超過1GB時刪除舊數據,配置以下所示。
   這裏要注意,由於Kafka讀取特定消息的時間複雜度爲O(1),即與文件大小無關,因此這裏刪除過時文件與提升Kafka性能無關。選擇怎樣的刪除策略只與磁盤以及具體的需求有關。另外,Kafka會爲每個Consumer Group保留一些metadata信息——當前消費的消息的position,也即offset。這個offset由Consumer控制。正常狀況下Consumer會在消費完一條消息後遞增該offset。固然,Consumer也可將offset設成一個較小的值,從新消費一些消息。由於offet由Consumer控制,因此Kafka broker是無狀態的,它不須要標記哪些消息被哪些消費過,也不須要經過broker去保證同一個Consumer Group只有一個Consumer能消費某一條消息,所以也就不須要鎖機制,這也爲Kafka的高吞吐率提供了有力保障。
 
 三、producer
Producer發送消息到broker時,會根據Paritition機制選擇將其存儲到哪個Partition。若是Partition機制設置合理,全部消息能夠均勻分佈到不一樣的Partition裏,這樣就實現了負載均衡。若是一個Topic對應一個文件,那這個文件所在的機器I/O將會成爲這個Topic的性能瓶頸,而有了Partition後,不一樣的消息能夠並行寫入不一樣broker的不一樣Partition裏,極大的提升了吞吐率。能夠在$KAFKA_HOME/config/server.properties中經過配置項num.partitions來指定新建Topic的默認Partition數量,也可在建立Topic時經過參數指定,同時也能夠在Topic建立以後經過Kafka提供的工具修改。
 
在發送一條消息時,能夠指定這條消息的key,Producer根據這個key和Partition機制來判斷應該將這條消息發送到哪一個Parition。Paritition機制能夠經過指定Producer的paritition. class這一參數來指定,該class必須實現kafka.producer.Partitioner接口。本例中若是key能夠被解析爲整數則將對應的整數與Partition總數取餘,該消息會被髮送到該數對應的Partition。(每一個Parition都會有個序號,序號從0開始)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import  kafka.producer.Partitioner;
import  kafka.utils.VerifiableProperties;
 
public  class  JasonPartitioner<T>  implements  Partitioner {
 
     public  JasonPartitioner(VerifiableProperties verifiableProperties) {}
 
     @Override
     public  int  partition(Object key,  int  numPartitions) {
         try  {
             int  partitionNum = Integer.parseInt((String) key);
             return  Math.abs(Integer.parseInt((String) key) % numPartitions);
         catch  (Exception e) {
             return  Math.abs(key.hashCode() % numPartitions);
         }
     }
}

  若是將上例中的類做爲partition.class,並經過以下代碼發送20條消息(key分別爲0,1,2,3)至topic3(包含4個Partition)。

1
2
3
4
5
6
7
8
9
10
public  void  sendMessage()  throws  InterruptedException{
   for ( int  i =  1 ; i <=  5 ; i++){
        List messageList =  new  ArrayList<KeyedMessage<String, String>>();
         for ( int  j =  0 ; j <  4 ; j++){
            messageList.add( new  KeyedMessage<String, String>( "topic2" , j+ "" "The "  + i +  " message for key "  + j));
        }
        producer.send(messageList);
     }
  producer.close();
}

  則key相同的消息會被髮送並存儲到同一個partition裏,並且key的序號正好和Partition序號相同。(Partition序號從0開始,本例中的key也從0開始)。下圖所示是經過Java程序調用Consumer後打印出的消息列表。

四、consumer group   (本節全部描述都是基於Consumer hight level API而非low level API)。

     使用Consumer high level API時,同一Topic的一條消息只能被同一個Consumer Group內的一個Consumer消費,但多個Consumer Group可同時消費這一消息。

這是Kafka用來實現一個Topic消息的廣播(發給全部的Consumer)和單播(發給某一個Consumer)的手段。一個Topic能夠對應多個Consumer Group。若是須要實現廣播,只要每一個Consumer有一個獨立的Group就能夠了。要實現單播只要全部的Consumer在同一個Group裏。用Consumer Group還能夠將Consumer進行自由的分組而不須要屢次發送消息到不一樣的Topic。

實際上,Kafka的設計理念之一就是同時提供離線處理和實時處理。根據這一特性,可使用Storm這種實時流處理系統對消息進行實時在線處理,同時使用Hadoop這種批處理系統進行離線處理,還能夠同時將數據實時備份到另外一個數據中心,只須要保證這三個操做所使用的Consumer屬於不一樣的Consumer Group便可。

下面這個例子更清晰地展現了Kafka Consumer Group的特性。首先建立一個Topic (名爲topic1,包含3個Partition),而後建立一個屬於group1的Consumer實例,並建立三個屬於group2的Consumer實例,最後經過Producer向topic1發送key分別爲1,2,3的消息。結果發現屬於group1的Consumer收到了全部的這三條消息,同時group2中的3個Consumer分別收到了key爲1,2,3的消息。

相關文章
相關標籤/搜索