一、消費消息shell
kafka 的消費者以pull的方式獲取消息,同時kafka採用消費組的模式,每一個消費組都屬於某一個消費組(在建立消費者時若不指定消費組,則屬於默認消費組),消費組是一個全局概念,所以設置group.id時,要確保該值在kafka集羣中惟一。bootstrap
對於同一條消息,只能被同一個組下某一個消費者消費,但不一樣消費組的消費者能消費同一條消息。工具
1):舊版高級消費者性能
啓動消費者 kafka-console-consumer.sh 腳本時,若指定zookeeper參數,則調用舊版高級消費者。測試
kafka-console-consumer.sh --zookeepe localhost:2191 --topic kafka-action --consumer-property group.id=old-consumer-test --consumer-property consumer.id=old-consumer-id --from-beginning --delete-consumer-offsetsfetch
zookeeper 參數用於指定鏈接kafka的zookeeper地址設置。線程
topic 參數用於指定消費者消費的主題。代理
consumer-property 參數後面以鍵值對的形式指定消費者級別的配置。orm
form-beginning 參數設置從消息起始位置開始消費,默認是從最新的位置(latest)開始消費。舊版本不支持--offset參數,因此不能指定從任意偏移量開始消費。server
delete-consumer-offset 參數用於刪除在 zookeeper 中記錄的已消費的偏移量。若在一個已存在消費者的消費組中新增一個消費者,這個消費者若指定了 from-beginning 參數,則必須指定該參數,以刪除其餘消費者在 zookeeper 中記錄的已被消費的最大偏移量。
舊版本的消費者默認將消費偏移量保存在zookeeper中,若設置 offset.storage=zookeeper則將偏移量保存到zookeeper中;若這是offset.storage=kafka則將偏移量保存到kafka內部主題中,可是dual.commit.enabled=true時同時將偏移量保存在zookeeper中。
每一個消費者被建立時都會向 zookeeper 中註冊相應的元數據信息,若該消費者所屬的消費組在zookeeper中不存在,則首先在/consumer 目錄下建立一個名爲${group.id}的節點,即消費組節點,並建立名爲ids、owners、offsets的3個子節點。
ids:記錄該消費組下正在運行的消費組列表,一個新的消費者被建立時會在 zookeeper 中與之對應的消費組節點的ids子節點下注冊一個臨時節點,該臨時節點的名稱爲${group.id}_${consumer.id}。
owners:記錄該消費組消費的主題列表,owners節點的子節點中記錄該消費組所消費的主題列表以及每一個主題的每一個分區對應的消費者線程。
offsets:記錄該消費組下每一個消費者所消費主題的各個分區的偏移量,若在啓動消費者時指定了offset.storage=kafka,則偏移量會保存到kafka內部主題中,就不會有該節點。
2):舊版低級消費者
kafka-simple-consumer-shell.sh 腳本用於調用kafka的低級消費者。
低級消費者須要本身管理消費偏移量,同時只能消費某個主題的某個分區的消息;
kafka-simple-consumer-shell.sh --broker-list localhost:9092 --clientId simple-consumer-test --offset -1 --partition 0 --topic kafka-action
以上該命令啓動了一個低級消費者,從主題爲 kafka-action 的編號爲0的分區消費消息,--offset指定消費的起始位置,該參數支持任意非負整數和-一、-2兩個負整數;
3):新版高級消費者
啓動消費者 kafka-console-consumer.sh 腳本時,若不指定zookeeper參數而指定bootstrap-server參數,則調用新版高級消費者。
新版本消費者已消費消息的偏移量提交後會保存到名爲「 _consumer_offsets」的內部主題中。
啓動一個新版消費者:kafka-console-consumer.sh --bootstrap-server localhost:9092 --new-consumer --consumer-property group.id=new-consumer-test --consumer-property client.id=new-consumer-id --topic kafka-action
new-consumer 參數直接指定調用新版的消費者,若以bootstrap-server方式啓動,則默認調用的是新版消費者。
查看消費組名信息命令:kafka-consumer-group.sh --bootstrap-server localhost:9092 --list --new-consumer
消費多主題:kafka-console-consumer.sh --bootstrap-server localhost:9092 --new-consumer --consumer-property group.id=new-consumer-test --consumer-property client.id=new-consumer-id --whitelist "kafka-action|producer-per-test"
二、單播與多播
單播:一條消息只能被某一個消費者消費的模式,實現消息單播模式,只須要讓這些消費者屬於同一個消費組便可;
多播:一條消息能夠被多個消費者消費的模式,實現多播模式,只須要讓每一個消費者均屬於不一樣的消費組。
三、查看消費偏移量
kafka提供了 kafka-consumer-group.sh 腳本查看消費偏移量;
該腳本支持 --zookeeper 和 --bootstrap-server 兩種運行方式,支持 list、describe 和 delete 三種操做方式;
list:返回與啓動方式對應的全部消費組,即如果以參數zookeeper啓動,則返回的是老版本的消費者對應的消費組信息,不然返回新版本的消費者隸屬的消費組信息;
describe:查看某個消費組當前的消費狀況;
delete:刪除消費組;支持刪除不包含任何消費者的消費組;只能刪除消費組爲老版本消費者對應的消費組;也能夠指定刪除某個主題的消費組;
四、消費者性能測試工具
kafka 提供了 kafka-consumer-perf-test.sh 腳本對新、舊兩個版本的消費者性能進行測試。
測試腳本提供瞭如下參數:
--broker-list 指定kafka代理
--threads 指定消費者的線程數
--messages 指定總控消費多少條消息
--message-size 指定每條消息的大小
--num-fetch-threads 默認值爲1
--group 指定消費組
--topic 指定消費的主題
--new-consumer 指定對新版本的消費者進行性能測試
--consumer.config 指定消費者級別的配置,以上全部的配置均可以配置在 consumer.config 指定的配置文件中