原創聲明:做者:Arnold.zhao 博客園地址:https://www.cnblogs.com/zh94html
上一篇文章記錄了kafka的副本機制和容錯功能的說明,本篇則主要在上一篇文章的基礎上,驗證多分區Topic的消費者的功能驗證;java
目錄:redis
新建1副本,2分區的Topic作測試驗證shell
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic arnold_consumer_test
查看對應的Topic分區狀況bootstrap
[root@dev bin]# ./kafka-topics.sh --describe --zookeeper 10.0.3.17:2181 --topic arnold_consumer_test Topic:arnold_consumer_test PartitionCount:2 ReplicationFactor:1 Configs: Topic: arnold_consumer_test Partition: 0 Leader: 1 Replicas: 1 Isr: 1 Topic: arnold_consumer_test Partition: 1 Leader: 2 Replicas: 2 Isr: 2
建立Topic每一個分區只設置了一個副本及主副本,因此如上可看到,各分區所在的broker節點的狀況。api
配置消費者組group.id信息爲:test-consumer-group-arnold-1緩存
修改 kafka下 config目錄下的consumer.properties,修改內容爲: bootstrap.servers=10.0.3.17:9092,10.0.6.39:9092,10.0.3.18:9092 group.id=test-consumer-group-arnold-1
分別在兩臺kafka服務器上的 kafka 主目錄下啓動兩個消費者,並指定對應的消費者配置爲 consumer.properties文件,消費的Topic 爲arnold_consumer_test topic服務器
10.0.6.39啓動消費者 [gangtise@yinhua-ca000003 kafka_2.11-2.1.0]$ bin/kafka-console-consumer.sh --bootstrap-server 10.0.3.17:9092,10.0.6.39:9092,10.0.3.18:9092 --from-beginning --topic arnold_consumer_test --consumer.config config/consumer.properties 10.0.3.17 啓動消費者 [root@dev kafka_2.11-2.1.0]# bin/kafka-console-consumer.sh --bootstrap-server 10.0.3.17:9092,10.0.6.39:9092,10.0.3.18:9092 --from-beginning --topic arnold_consumer_test --consumer.config config/consumer.properties 兩個消費者都是使用的相同的consumer.properties文件,即都是在一個消費組裏面(爲何要在兩臺服務器上啓動兩個消費者?不能在一個服務器上啓動兩個消費者嗎?答:均可以,我之因此用兩個不一樣的服務器啓動消費者是由於我當前39服務器啓動消費者後,當前的shell進程就已經被佔用了,處於等待狀態,除非我再開一個39服務器的會話,從新開一個消費者。)
OK,消費者啓動之後,觀察下消費者和Topic分區的對應狀況app
查看當前全部的消費組的列表信息分佈式
[root@dev bin]# ./kafka-consumer-groups.sh --bootstrap-server 10.0.3.17:9092 --list test-consumer-group-arnold-1 test-consumer-group-arnold test-consumer-group
如上,能夠知道當前kafka服務器上已有的消費組分別是有三個,而咱們如今已經啓動了的消費者組是test-consumer-group-arnold-1,因此,詳細查看下消費組test-consumer-group-arnold-1的詳細信息
[root@dev bin]# ./kafka-consumer-groups.sh --bootstrap-server 10.0.3.17:9092 --describe --group test-consumer-group-arnold-1 TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID arnold_consumer_test 0 19 19 0 consumer-1-2da16674-5b57-4e6c-9660-ab45de6ed5ae /10.0.6.39 consumer-1 arnold_consumer_test 1 19 19 0 consumer-1-917ecb37-3027-45de-b293-fe5125867432 /10.0.3.17 consumer-1
CURRENT-OFFSET: 當前消費組消費到的偏移量
LOG-END-OFFSET: 日誌最後的偏移量
CURRENT-OFFSET = LOG-END-OFFSET 說明當前消費組已經所有消費了;
LAG:表示落後未消費的數據量
能夠看到當前topic arnold_consumer_test 的Partition 0分區對應的消費者id是
consumer-1-2da16674-5b57-4e6c-9660-ab45de6ed5ae,該消費者對應的host是
10.0.6.39;經過上述內容就能夠很清晰的知道,當前所啓動的消費組下的兩個消費者分別對應消費的是topic的那個分區,OK進行下測試
啓動生產者生產數據
[root@dev bin]# ./kafka-console-producer.sh --broker-list 10.0.3.17:9092 --topic arnold_consumer_test >message1 >message2 按照kafka的消息路由策略,此時插入message1和message2兩條消息,將會採用輪訓的策略分別插入到兩個分區中;(不清楚的話能夠看下上篇的內容,這塊都有作過說明) 此時partition0分區中將會接收到 message1的消息,partition2分區中將會接受到message2的消息,而後又分別由partition0分區所對應的 10.0.6.39的消費者消費到對應的數據,partition1同理 此時查看消費者的情況以下: 10.0.6.39 [root@dev kafka_2.11-2.1.0]# bin/kafka-console-consumer.sh --bootstrap-server 10.0.3.17:9092,10.0.6.39:9092,10.0.3.18:9092 --from-beginning --topic arnold_consumer_test --consumer.config config/consumer.properties message1 10.0.3.17 [gangtise@yinhua-ca000003 kafka_2.11-2.1.0]$ bin/kafka-console-consumer.sh --bootstrap-server 10.0.3.17:9092,10.0.6.39:9092,10.0.3.18:9092 --from-beginning --topic arnold_consumer_test --consumer.config config/consumer.properties message2
驗證完畢,內容很簡單,可是想要表達記錄下來還真的是着實有些麻煩;因此,後續其餘的一些規則,此處就直接放總結了,再也不列出來實驗過程;
上述的內容,則也是均可以經過使用kafka-consumer-groups.sh命令,查看消費組下的消費者所對應的分區的狀況便可得知對應的結果;
此時若是一個消費組已經在消費的狀況下,此時又來了新的消費組進行消費,那就按照新的消費組規則來消費便可, 不會影響到其餘消費組;舉例,此時一個消費組三個消費者,在進行數據的消費;此時新來了一個消費組,只有一個消費者,那麼此時這個消費者會消費全部的消費分區,不會和其餘的消費組有任何的重疊,原理是,kafka的消費組其實在kafka中也是一個消費者topic分區的概念,分區中記錄各個消費組的消費的offset位移信息,以此保證全部的消費者所消費的內容的offset位移互不影響,關於這個概念後續會詳細說明一下,其實挺重要的。
另外,上述只作了部分的測試驗證,便直接給出了最終的總結內容,對於部分測試內容並無再在本篇列出來(由於不少步驟其實都是重複的);可是,無心中發現了一個老哥的博客,已經對這方面也作了詳細的測試,詳情還須要看剩下的測試方式的,能夠點擊這個連接查看;
原創聲明:做者:Arnold.zhao 博客園地址:https://www.cnblogs.com/zh94
原本這篇文章在上述的消費者和Partition的關係介紹完之後也就結束了,可是在寫完之後,翻了下博客園的首頁發現有一個推薦的kafka的帖子就順手點進去想get點技能,而後結果有點傷心,文章中對於一些kafka數據重複的問題一筆帶過。。甚至沒有說明爲何kakfa會出現數據重複的問題,只是說這是kafka的一種自我保護的機制產生的。。。這,就很傷心,因而本篇內容再對kafka數據重複的問題作一下說明,這些問題遲早也都要記錄的。
對於kafka的使用上,其實Java代碼的實現是相對簡單的,網上的內容也有不少,可是若是對於kafka的一些基本概念就不熟悉的話,在使用過程當中便會出現不少懵逼的事情,因此這篇文章包括前兩篇的文章,則都是重點在說kafka的一些機制的問題,固然後續對於kafka java的一些配置和實現,也會作一些記錄說明。
回到問題自己,爲何kafka有時候會出現消費者的數據重複問題?首先,消費者的數據自己是來自於生產者生產的數據,因此瞭解生產者所生產數據的可靠性機制,便和當前的問題有這直接的關聯了。
生產者的數據可靠性,在配置上是根據kafka 生產者的 Request.required.acks 來配置生產者消息可靠性;
Request.required.acks=-1 (ISR全量同步確認,強可靠性保證) Request.required.acks = 1(leader 確認收到,無需保證其它副本也確認收到, 默認) Request.required.acks = 0 (不確認,可是吞吐量大)
在分佈式的系統中,有一個對應的ACP理論,分別是:
可用性(Availability):在集羣中一部分節點故障後,集羣總體是否還能響應客戶端的讀寫請求。(對數據更新具有高可用性)
一致性(Consistency):在分佈式系統中的全部數據備份,在同一時刻是否一樣的值。(等同於全部節點訪問同一份最新的數據副本)
分區容忍性(Partition tolerance):以實際效果而言,分區至關於對通訊的時限要求。系統若是不能在時限內達成數據一致性,就意味着發生了分區的狀況,必須就當前操做在C和A之間作出選擇。
在分佈式系統的設計中,沒有一種設計能夠同時知足一致性,可用性,分區容錯性 3個特性;因此kafka也不例外;
若是想實現 kafka 配置爲 CP(Consistency & Partition tolerance) 系統, 配置須要以下:
request.required.acks=-1 min.insync.replicas = ${N/2 + 1} unclean.leader.election.enable = false
如圖所示,在 acks=-1 的狀況下,新消息只有被 ISR 中的全部 follower(f1 和 f2, f3) 都從 leader 複製過去纔會回 ack, ack 後,不管那種機器故障狀況(所有或部分), 寫入的 msg4,都不會丟失, 消息狀態知足一致性 C 要求。
正常狀況下,全部 follower 複製完成後,leader 回 producer ack。
異常狀況下,若是當數據發送到 leader 後部分副本(f1 和 f2 同步), leader 掛了?此時任何 follower 都有可能變成新的 leader, producer 端會獲得返回異常,此時producer端會從新發送數據,此時數據重複
解決消費重複的方式有不少啊,第一個就是你的業務場景無需在乎數據重複的問題,這個天然也就業務上解決了;第二個則是消費者本身作一層緩存過濾便可,由於生產數據重複畢竟是節點down機纔會出現的問題,在down機的這一剎那沒有被同步到follower的數據並不會不少,因此,以數據量爲緩存,或者以時間爲緩存均可以解決這個問題,好比加一個緩存區,只要判斷數據重複了則再也不重複消費便可,而後當緩存的數據超過了1M,則清除一次緩存區;或者直接緩存到redis中,使用redis api來去重,定時清理一下redis中的數據也能夠;
除了生產者數據重複外,還有一種問題是,生產者數據沒有重複,可是消費者消費的數據重複了,這種問題,則是因爲消費者offset自動提交的問題,如,消費者offset是1s提交一次,此時0.5s消費了5條數據,可是消費者尚未到1s自動提交的時候,消費者掛掉,此時已經消費的5條數據的偏移量因爲沒有提交到kafka,因此kafka中是沒有記錄到當前已經消費到的偏移量的,此時消費者重啓,則會從5條數據前從新消費,這個問題通常比較好解決,由於大多數狀況下若是使用消費者手動提交的模式,通常不會出現這種問題(手動提交的狀況下若是出現異常,沒有執行提交代碼,那麼代碼中作好數據消費的回滾操做便可,更加可控);
除了數據重複的狀況,另一種問題,則是kafka數據丟失的問題
首先按照上述的kafka的cp系統的配置方式,是絕對不會出現數據丟失的狀況的,由於要麼各節點不工做,要麼各節點數據同步完成後,纔會返回ack,此時消息不會丟失且消息狀態一致;
除了配置kakfa爲cp系統外,還能夠配置kafka爲AP(Availability & Partition tolerance)系統
request.required.acks=1 min.insync.replicas = 1 unclean.leader.election.enable = false
AP系統下生產者的吞吐量相對更高,可是因爲request.required.acks 配置爲1,即leader主副本收到消息便直接返回ack,此時若是leader接收到生產者消息後,返回了ack的標識,可是此時副本節點還都沒有進行同步,此時leader節點掛掉,從新進行leader選舉,新的follower選爲leader後,則此時消息丟失;
因此根據合適的業務場景,使用合適的kafka模式則是最佳的選擇。
原創聲明:做者:Arnold.zhao 博客園地址:https://www.cnblogs.com/zh94
新建Topic ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic arnold_consumer_test 查看Topic詳細信息 [root@dev bin]# ./kafka-topics.sh --describe --zookeeper 10.0.3.17:2181 --topic arnold_consumer_test Topic:arnold_consumer_test PartitionCount:2 ReplicationFactor:1 Configs: Topic: arnold_consumer_test Partition: 0 Leader: 1 Replicas: 1 Isr: 1 Topic: arnold_consumer_test Partition: 1 Leader: 2 Replicas: 2 Isr: 2 啓動消費者 bin/kafka-console-consumer.sh --bootstrap-server 10.0.3.17:9092,10.0.6.39:9092,10.0.3.18:9092 --from-beginning --topic arnold_consumer_test --consumer.config config/consumer.properties 啓動生產者 ./kafka-console-producer.sh --broker-list 10.0.3.17:9092 --topic arnold_consumer_test 查看當前全部的消費組的列表信息 ./kafka-consumer-groups.sh --bootstrap-server 10.0.3.17:9092 --list test-consumer-group-arnold-1 查看消費者組的詳細信息 [root@dev bin]# ./kafka-consumer-groups.sh --bootstrap-server 10.0.3.17:9092 --describe --group test-consumer-group-arnold-1 TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID arnold_consumer_test 0 19 19 0 consumer-1-2da16674-5b57-4e6c-9660-ab45de6ed5ae /10.0.6.39 consumer-1 arnold_consumer_test 1 19 19 0 consumer-1-917ecb37-3027-45de-b293-fe5125867432 /10.0.3.17 consumer-1