kafka 消費組功能驗證以及消費者數據重複數據丟失問題說明 3

原創聲明:做者:Arnold.zhao 博客園地址:https://www.cnblogs.com/zh94html

背景

上一篇文章記錄了kafka的副本機制和容錯功能的說明,本篇則主要在上一篇文章的基礎上,驗證多分區Topic的消費者的功能驗證;java

目錄:redis

消費組功能驗證

新建1副本,2分區的Topic作測試驗證shell

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic arnold_consumer_test

查看對應的Topic分區狀況bootstrap

[root@dev bin]# ./kafka-topics.sh --describe  --zookeeper 10.0.3.17:2181 --topic arnold_consumer_test
Topic:arnold_consumer_test      PartitionCount:2        ReplicationFactor:1     Configs:
Topic: arnold_consumer_test     Partition: 0    Leader: 1       Replicas: 1     Isr: 1
Topic: arnold_consumer_test     Partition: 1    Leader: 2       Replicas: 2     Isr: 2

建立Topic每一個分區只設置了一個副本及主副本,因此如上可看到,各分區所在的broker節點的狀況。api

配置消費者組group.id信息爲:test-consumer-group-arnold-1緩存

修改 kafka下 config目錄下的consumer.properties,修改內容爲:

bootstrap.servers=10.0.3.17:9092,10.0.6.39:9092,10.0.3.18:9092
group.id=test-consumer-group-arnold-1

分別在兩臺kafka服務器上的 kafka 主目錄下啓動兩個消費者,並指定對應的消費者配置爲 consumer.properties文件,消費的Topic 爲arnold_consumer_test topic服務器

10.0.6.39啓動消費者

[gangtise@yinhua-ca000003 kafka_2.11-2.1.0]$ bin/kafka-console-consumer.sh --bootstrap-server  10.0.3.17:9092,10.0.6.39:9092,10.0.3.18:9092  --from-beginning --topic arnold_consumer_test --consumer.config config/consumer.properties

10.0.3.17 啓動消費者

[root@dev kafka_2.11-2.1.0]# bin/kafka-console-consumer.sh --bootstrap-server  10.0.3.17:9092,10.0.6.39:9092,10.0.3.18:9092  --from-beginning --topic arnold_consumer_test --consumer.config config/consumer.properties

兩個消費者都是使用的相同的consumer.properties文件,即都是在一個消費組裏面(爲何要在兩臺服務器上啓動兩個消費者?不能在一個服務器上啓動兩個消費者嗎?答:均可以,我之因此用兩個不一樣的服務器啓動消費者是由於我當前39服務器啓動消費者後,當前的shell進程就已經被佔用了,處於等待狀態,除非我再開一個39服務器的會話,從新開一個消費者。)

OK,消費者啓動之後,觀察下消費者和Topic分區的對應狀況app

查看當前全部的消費組的列表信息分佈式

[root@dev bin]# ./kafka-consumer-groups.sh --bootstrap-server 10.0.3.17:9092 --list
test-consumer-group-arnold-1
test-consumer-group-arnold
test-consumer-group

如上,能夠知道當前kafka服務器上已有的消費組分別是有三個,而咱們如今已經啓動了的消費者組是test-consumer-group-arnold-1,因此,詳細查看下消費組test-consumer-group-arnold-1的詳細信息

[root@dev bin]# ./kafka-consumer-groups.sh --bootstrap-server 10.0.3.17:9092  --describe --group  test-consumer-group-arnold-1

TOPIC                        PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                 HOST            CLIENT-ID
arnold_consumer_test             0          19              19           0               consumer-1-2da16674-5b57-4e6c-9660-ab45de6ed5ae            /10.0.6.39      consumer-1

arnold_consumer_test             1          19              19           0               consumer-1-917ecb37-3027-45de-b293-fe5125867432            /10.0.3.17      consumer-1

CURRENT-OFFSET: 當前消費組消費到的偏移量
LOG-END-OFFSET: 日誌最後的偏移量
CURRENT-OFFSET = LOG-END-OFFSET 說明當前消費組已經所有消費了;
LAG:表示落後未消費的數據量

能夠看到當前topic arnold_consumer_test 的Partition 0分區對應的消費者id是
consumer-1-2da16674-5b57-4e6c-9660-ab45de6ed5ae,該消費者對應的host是
10.0.6.39;經過上述內容就能夠很清晰的知道,當前所啓動的消費組下的兩個消費者分別對應消費的是topic的那個分區,OK進行下測試

啓動生產者生產數據

[root@dev bin]# ./kafka-console-producer.sh --broker-list 10.0.3.17:9092 --topic  arnold_consumer_test
>message1
>message2

按照kafka的消息路由策略,此時插入message1和message2兩條消息,將會採用輪訓的策略分別插入到兩個分區中;(不清楚的話能夠看下上篇的內容,這塊都有作過說明)

此時partition0分區中將會接收到 message1的消息,partition2分區中將會接受到message2的消息,而後又分別由partition0分區所對應的 10.0.6.39的消費者消費到對應的數據,partition1同理

此時查看消費者的情況以下:

10.0.6.39
[root@dev kafka_2.11-2.1.0]# bin/kafka-console-consumer.sh --bootstrap-server  10.0.3.17:9092,10.0.6.39:9092,10.0.3.18:9092  --from-beginning --topic arnold_consumer_test --consumer.config config/consumer.properties
message1

10.0.3.17
[gangtise@yinhua-ca000003 kafka_2.11-2.1.0]$ bin/kafka-console-consumer.sh --bootstrap-server  10.0.3.17:9092,10.0.6.39:9092,10.0.3.18:9092  --from-beginning --topic arnold_consumer_test --consumer.config config/consumer.properties
message2

驗證完畢,內容很簡單,可是想要表達記錄下來還真的是着實有些麻煩;因此,後續其餘的一些規則,此處就直接放總結了,再也不列出來實驗過程;

消費者與分區的對應關係總結

  • topic 3個分區的狀況,啓動一個消費者組且只有一個消費者,則該消費者會消費topic的3個分區;
  • topic 3個分區的狀況,啓動一個消費者組且只有兩個消費者c1,c2,則將會有一個消費者負責消費兩個分區,另一個消費者負責消費一個分區;
  • topic 3個分區的狀況,啓動一個消費者組且有三個消費者c1,c2,c3,則正常對應分區消費,一個消費者對應一個partition分區;
  • topic 3個分區的狀況,啓動一個消費者組且有四個消費者c1,c2,c3,c4,則通常狀況下沒有人這樣作。。。太愚蠢了。。。因此我也就沒作這個測試,可是按照kafka的規則來看,會有第四個消費者消費不到對應的分區,也就是不會消費到任何數據。。

上述的內容,則也是均可以經過使用kafka-consumer-groups.sh命令,查看消費組下的消費者所對應的分區的狀況便可得知對應的結果;

此時若是一個消費組已經在消費的狀況下,此時又來了新的消費組進行消費,那就按照新的消費組規則來消費便可, 不會影響到其餘消費組;舉例,此時一個消費組三個消費者,在進行數據的消費;此時新來了一個消費組,只有一個消費者,那麼此時這個消費者會消費全部的消費分區,不會和其餘的消費組有任何的重疊,原理是,kafka的消費組其實在kafka中也是一個消費者topic分區的概念,分區中記錄各個消費組的消費的offset位移信息,以此保證全部的消費者所消費的內容的offset位移互不影響,關於這個概念後續會詳細說明一下,其實挺重要的。

另外,上述只作了部分的測試驗證,便直接給出了最終的總結內容,對於部分測試內容並無再在本篇列出來(由於不少步驟其實都是重複的);可是,無心中發現了一個老哥的博客,已經對這方面也作了詳細的測試,詳情還須要看剩下的測試方式的,能夠點擊這個連接查看;

原創聲明:做者:Arnold.zhao 博客園地址:https://www.cnblogs.com/zh94

消費者數據重複問題說明

原本這篇文章在上述的消費者和Partition的關係介紹完之後也就結束了,可是在寫完之後,翻了下博客園的首頁發現有一個推薦的kafka的帖子就順手點進去想get點技能,而後結果有點傷心,文章中對於一些kafka數據重複的問題一筆帶過。。甚至沒有說明爲何kakfa會出現數據重複的問題,只是說這是kafka的一種自我保護的機制產生的。。。這,就很傷心,因而本篇內容再對kafka數據重複的問題作一下說明,這些問題遲早也都要記錄的。

對於kafka的使用上,其實Java代碼的實現是相對簡單的,網上的內容也有不少,可是若是對於kafka的一些基本概念就不熟悉的話,在使用過程當中便會出現不少懵逼的事情,因此這篇文章包括前兩篇的文章,則都是重點在說kafka的一些機制的問題,固然後續對於kafka java的一些配置和實現,也會作一些記錄說明。

回到問題自己,爲何kafka有時候會出現消費者的數據重複問題?首先,消費者的數據自己是來自於生產者生產的數據,因此瞭解生產者所生產數據的可靠性機制,便和當前的問題有這直接的關聯了。

生產者的可靠性保證

生產者的數據可靠性,在配置上是根據kafka 生產者的 Request.required.acks 來配置生產者消息可靠性;

Request.required.acks=-1 (ISR全量同步確認,強可靠性保證)
Request.required.acks = 1(leader 確認收到,無需保證其它副本也確認收到, 默認)
Request.required.acks = 0 (不確認,可是吞吐量大)

在分佈式的系統中,有一個對應的ACP理論,分別是:

  • 可用性(Availability):在集羣中一部分節點故障後,集羣總體是否還能響應客戶端的讀寫請求。(對數據更新具有高可用性)

  • 一致性(Consistency):在分佈式系統中的全部數據備份,在同一時刻是否一樣的值。(等同於全部節點訪問同一份最新的數據副本)

  • 分區容忍性(Partition tolerance):以實際效果而言,分區至關於對通訊的時限要求。系統若是不能在時限內達成數據一致性,就意味着發生了分區的狀況,必須就當前操做在C和A之間作出選擇。

在分佈式系統的設計中,沒有一種設計能夠同時知足一致性,可用性,分區容錯性 3個特性;因此kafka也不例外;

Kafka 生產者CP系統

若是想實現 kafka 配置爲 CP(Consistency & Partition tolerance) 系統, 配置須要以下:

request.required.acks=-1
min.insync.replicas = ${N/2 + 1}
unclean.leader.election.enable = false

如圖所示,在 acks=-1 的狀況下,新消息只有被 ISR 中的全部 follower(f1 和 f2, f3) 都從 leader 複製過去纔會回 ack, ack 後,不管那種機器故障狀況(所有或部分), 寫入的 msg4,都不會丟失, 消息狀態知足一致性 C 要求。

正常狀況下,全部 follower 複製完成後,leader 回 producer ack。

異常狀況下,若是當數據發送到 leader 後部分副本(f1 和 f2 同步), leader 掛了?此時任何 follower 都有可能變成新的 leader, producer 端會獲得返回異常,此時producer端會從新發送數據,此時數據重複

解決消費重複的方式有不少啊,第一個就是你的業務場景無需在乎數據重複的問題,這個天然也就業務上解決了;第二個則是消費者本身作一層緩存過濾便可,由於生產數據重複畢竟是節點down機纔會出現的問題,在down機的這一剎那沒有被同步到follower的數據並不會不少,因此,以數據量爲緩存,或者以時間爲緩存均可以解決這個問題,好比加一個緩存區,只要判斷數據重複了則再也不重複消費便可,而後當緩存的數據超過了1M,則清除一次緩存區;或者直接緩存到redis中,使用redis api來去重,定時清理一下redis中的數據也能夠;


除了生產者數據重複外,還有一種問題是,生產者數據沒有重複,可是消費者消費的數據重複了,這種問題,則是因爲消費者offset自動提交的問題,如,消費者offset是1s提交一次,此時0.5s消費了5條數據,可是消費者尚未到1s自動提交的時候,消費者掛掉,此時已經消費的5條數據的偏移量因爲沒有提交到kafka,因此kafka中是沒有記錄到當前已經消費到的偏移量的,此時消費者重啓,則會從5條數據前從新消費,這個問題通常比較好解決,由於大多數狀況下若是使用消費者手動提交的模式,通常不會出現這種問題(手動提交的狀況下若是出現異常,沒有執行提交代碼,那麼代碼中作好數據消費的回滾操做便可,更加可控);

除了數據重複的狀況,另一種問題,則是kafka數據丟失的問題
首先按照上述的kafka的cp系統的配置方式,是絕對不會出現數據丟失的狀況的,由於要麼各節點不工做,要麼各節點數據同步完成後,纔會返回ack,此時消息不會丟失且消息狀態一致;

Kafka 生產者AP系統

除了配置kakfa爲cp系統外,還能夠配置kafka爲AP(Availability & Partition tolerance)系統

request.required.acks=1
min.insync.replicas = 1
unclean.leader.election.enable = false

AP系統下生產者的吞吐量相對更高,可是因爲request.required.acks 配置爲1,即leader主副本收到消息便直接返回ack,此時若是leader接收到生產者消息後,返回了ack的標識,可是此時副本節點還都沒有進行同步,此時leader節點掛掉,從新進行leader選舉,新的follower選爲leader後,則此時消息丟失;

因此根據合適的業務場景,使用合適的kafka模式則是最佳的選擇。

原創聲明:做者:Arnold.zhao 博客園地址:https://www.cnblogs.com/zh94

上述命令彙總

新建Topic
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic arnold_consumer_test

查看Topic詳細信息
[root@dev bin]# ./kafka-topics.sh --describe  --zookeeper 10.0.3.17:2181 --topic arnold_consumer_test
Topic:arnold_consumer_test      PartitionCount:2        ReplicationFactor:1     Configs:
Topic: arnold_consumer_test     Partition: 0    Leader: 1       Replicas: 1     Isr: 1
Topic: arnold_consumer_test     Partition: 1    Leader: 2       Replicas: 2     Isr: 2

啓動消費者
bin/kafka-console-consumer.sh --bootstrap-server  10.0.3.17:9092,10.0.6.39:9092,10.0.3.18:9092  --from-beginning --topic arnold_consumer_test --consumer.config config/consumer.properties

啓動生產者
./kafka-console-producer.sh --broker-list 10.0.3.17:9092 --topic  arnold_consumer_test

查看當前全部的消費組的列表信息
./kafka-consumer-groups.sh --bootstrap-server 10.0.3.17:9092 --list test-consumer-group-arnold-1

查看消費者組的詳細信息
[root@dev bin]# ./kafka-consumer-groups.sh --bootstrap-server 10.0.3.17:9092  --describe --group  test-consumer-group-arnold-1

TOPIC                        PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                 HOST            CLIENT-ID
arnold_consumer_test             0          19              19           0               consumer-1-2da16674-5b57-4e6c-9660-ab45de6ed5ae            /10.0.6.39      consumer-1

arnold_consumer_test             1          19              19           0               consumer-1-917ecb37-3027-45de-b293-fe5125867432            /10.0.3.17      consumer-1

文章來源於本人的印象筆記,如出現格式問題可訪問該連接查看原文

原創聲明:做者:Arnold.zhao 博客園地址:https://www.cnblogs.com/zh94

相關文章
相關標籤/搜索