consumer是底層採用的是一個阻塞隊列,只要一有producer生產數據,那consumer就會將數據消費。固然這裏會產生一個很嚴重的問題,若是你重啓一消費者程序,那你連一條數據都抓不到,可是log文件中明明能夠看到全部數據都好好的存在。換句話說,一旦你消費過這些數據,那你就沒法再次用同一個groupid消費同一組數據了。html
緣由:消費者消費了數據並不從隊列中移除,只是記錄了offset偏移量。同一個consumergroup的全部consumer合起來消費一個topic,而且他們每次消費的時候都會保存一個offset參數在zookeeper的root上。若是此時某個consumer掛了或者新增一個consumer進程,將會觸發kafka的負載均衡,暫時性的重啓全部consumer,從新分配哪一個consumer去消費哪一個partition,而後再繼續經過保存在zookeeper上的offset參數繼續讀取數據。注意:offset保存的是consumer 組消費的消息偏移。 算法
要消費同一組數據,你能夠api
1 採用不一樣的group。併發
2 經過一些配置,就能夠將線上產生的數據同步到鏡像中去,而後再由特定的集羣區處理大批量的數據。負載均衡
(2) 如何自定義去消費已經消費過的數據socket
Conosumer.properties配置文件中有兩個重要參數fetch
auto.commit.enable:若是爲true,則consumer的消費偏移offset會被記錄到zookeeper。下次consumer啓動時會今後位置繼續消費。spa
auto.offset.reset 該參數只接受兩個常量largest和Smallest,分別表示將當前offset指到日誌文件的最開始位置和最近的位置。 .net
若是進一步想控制時間,則須要調用SimpleConsumer,本身去設置相關參數。比較重要的參數是 kafka.api.OffsetRequest.EarliestTime()和kafka.api.OffsetRequest.LatestTime()分別表示從日誌(數據)的開始位置讀取和只讀取最新日誌。線程
如何使用SimpleConsumer
首先,你必須知道讀哪一個topic的哪一個partition
而後,找到負責該partition的broker leader,從而找到存有該partition副本的那個broker
再者,本身去寫request並fetch數據
最終,還要注意須要識別和處理brokerleader的改變
1. 若是consumer比partition多,是浪費,由於kafka的設計是在一個partition上是不容許併發的,因此consumer數不要大於partition數 。
2. 若是consumer比partition少,一個consumer會對應於多個partitions,這裏主要合理分配consumer數和partition數,不然會致使partition裏面的數據被取的不均勻 。最好partiton數目是consumer數目的整數倍,因此partition數目很重要,好比取24,就很容易設定consumer數目 。
3. 若是consumer從多個partition讀到數據,不保證數據間的順序性,kafka只保證在一個partition上數據是有序的,但多個partition,根據你讀的順序會有不一樣
4. 增減consumer,broker,partition會致使rebalance,因此rebalance後consumer對應的partition會發生變化
Kafka儘可能將全部的Partition均勻分配到整個集羣上。一個典型的部署方式是一個Topic的Partition數量大於Broker的數量。
(1) 如何分配副本:
Producer在發佈消息到某個Partition時,先經過ZooKeeper找到該Partition的Leader,而後不管該Topic的Replication Factor爲多少(也即該Partition有多少個Replica),Producer只將該消息發送到該Partition的Leader。Leader會將該消息寫入其本地Log。每一個Follower都從Leader pull數據。這種方式上,Follower存儲的數據順序與Leader保持一致。
(2) Kafka分配Replica的算法以下:
(1)將全部Broker(假設共n個Broker)和待分配的Partition排序
(2)將第i個Partition分配到第(imod n)個Broker上
(3)將第i個Partition的第j個Replica分配到第((i + j) mode n)個Broker上
日誌文件的刪除策略很是簡單:啓動一個後臺線程按期掃描log file列表,把保存時間超過閥值的文件直接刪除(根據文件的建立時間).清理參數在server.properties文件中:
(1) Producer端使用zookeeper用來"發現"broker列表,以及和Topic下每一個partition leader創建socket鏈接併發送消息.
(2) Broker端使用zookeeper用來註冊broker信息,以及監測partition leader存活性.
(3) Consumer端使用zookeeper用來註冊consumer信息,其中包括consumer消費的partition列表等,同時也用來發現broker列表,並和partition leader創建socket鏈接,並獲取消息.
Server.properties配置文件中的一個參數
##是否自動建立topic
##若是broker中沒有topic的信息,當producer/consumer操做topic時,是否自動建立.
##若是爲false,則只能經過API或者command建立topic
auto.create.topics.enable=true
原文地址:https://blog.csdn.net/zgc625238677/article/details/52162202