1、java
一、原理圖shell
二、原理描述markdown
一個topic 能夠配置幾個partition,produce發送的消息分發到不一樣的partition中,consumer接受數據的時候是按照group來接受,kafka確保每一個partition只能同一個group中的同一個consumer消費,若是想要重複消費,那麼須要其餘的組來消費。Zookeerper中保存這每一個topic下的每一個partition在每一個group中消費的offset
新版kafka把這個offsert保存到了一個__consumer_offsert的topic下
這個__consumer_offsert 有50個分區,經過將group的id哈希值%50的值來肯定要保存到那一個分區. 這樣也是爲了考慮到zookeeper不擅長大量讀寫的緣由。
因此,若是要一個group用幾個consumer來同時讀取的話,須要多線程來讀取,一個線程至關於一個consumer實例。當consumer的數量大於分區的數量的時候,有的consumer線程會讀取不到數據。
假設一個topic test 被groupA消費了,如今啓動另一個新的groupB來消費test,默認test-groupB的offset不是0,而是沒有新創建,除非當test有數據的時候,groupB會收到該數據,該條數據也是第一條數據,groupB的offset也是剛初始化的ofsert, 除非用顯式的用–from-beginnging 來獲取從0開始數據 多線程
三、查看topic-group的offsert 負載均衡
位置:zookeeper
路徑:[zk: localhost:2181(CONNECTED) 3] ls /brokers/topics/__consumer_offsets/partitions
在zookeeper的topic中有一個特殊的topic __consumer_offserts
計算方法:(放入哪一個partitions)ide
int hashCode = Math.abs("ttt".hashCode());工具
int partition = hashCode % 50;oop
先計算group的hashCode,再除以分區數(50),能夠獲得partition的值 post
使用命令查看: kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 11 --broker-list localhost:9092,localhost:9093,localhost:9094 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"性能
4.參數
auto.offset.reset:默認值爲largest,表明最新的消息,smallest表明從最先的消息開始讀取,當consumer剛開始建立的時候沒有offset這種狀況,若是設置了largest,則爲當收到最新的一條消息的時候開始記錄offsert,若設置爲smalert,那麼會從頭開始讀partition
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
import
kafka.producer.Partitioner;
import
kafka.utils.VerifiableProperties;
public
class
JasonPartitioner<T>
implements
Partitioner {
public
JasonPartitioner(VerifiableProperties verifiableProperties) {}
@Override
public
int
partition(Object key,
int
numPartitions) {
try
{
int
partitionNum = Integer.parseInt((String) key);
return
Math.abs(Integer.parseInt((String) key) % numPartitions);
}
catch
(Exception e) {
return
Math.abs(key.hashCode() % numPartitions);
}
}
}
|
若是將上例中的類做爲partition.class,並經過以下代碼發送20條消息(key分別爲0,1,2,3)至topic3(包含4個Partition)。
1
2
3
4
5
6
7
8
9
10
|
public
void
sendMessage()
throws
InterruptedException{
for
(
int
i =
1
; i <=
5
; i++){
List messageList =
new
ArrayList<KeyedMessage<String, String>>();
for
(
int
j =
0
; j <
4
; j++){
messageList.add(
new
KeyedMessage<String, String>(
"topic2"
, j+
""
,
"The "
+ i +
" message for key "
+ j));
}
producer.send(messageList);
}
producer.close();
}
|
則key相同的消息會被髮送並存儲到同一個partition裏,並且key的序號正好和Partition序號相同。(Partition序號從0開始,本例中的key也從0開始)。下圖所示是經過Java程序調用Consumer後打印出的消息列表。
四、consumer group (本節全部描述都是基於Consumer hight level API而非low level API)。
使用Consumer high level API時,同一Topic的一條消息只能被同一個Consumer Group內的一個Consumer消費,但多個Consumer Group可同時消費這一消息。
這是Kafka用來實現一個Topic消息的廣播(發給全部的Consumer)和單播(發給某一個Consumer)的手段。一個Topic能夠對應多個Consumer Group。若是須要實現廣播,只要每一個Consumer有一個獨立的Group就能夠了。要實現單播只要全部的Consumer在同一個Group裏。用Consumer Group還能夠將Consumer進行自由的分組而不須要屢次發送消息到不一樣的Topic。
實際上,Kafka的設計理念之一就是同時提供離線處理和實時處理。根據這一特性,可使用Storm這種實時流處理系統對消息進行實時在線處理,同時使用Hadoop這種批處理系統進行離線處理,還能夠同時將數據實時備份到另外一個數據中心,只須要保證這三個操做所使用的Consumer屬於不一樣的Consumer Group便可。
下面這個例子更清晰地展現了Kafka Consumer Group的特性。首先建立一個Topic (名爲topic1,包含3個Partition),而後建立一個屬於group1的Consumer實例,並建立三個屬於group2的Consumer實例,最後經過Producer向topic1發送key分別爲1,2,3的消息。結果發現屬於group1的Consumer收到了全部的這三條消息,同時group2中的3個Consumer分別收到了key爲1,2,3的消息。