應用程序使用KafkaConsul'le 「向Kafka 訂閱主題,並從訂閱的主題上接收消息。Kafka的消息讀取不一樣於從其餘消息系統讀取數據,它涉及了一些獨特的概念和想法。java
單個的消費者就跟前面的消息系統的消費者同樣,建立一個消費者對象,而後訂閱一個主題並開始接受消息,而後作本身的業務邏輯,可是Kafka天生就是支持體量很大的數據消費,若是隻是使用單個的消費者消費消息,當生產者寫入消息的速度遠遠大於了消費者的速度,大量消息堆積在消費者上可能會致使性能反而下降或撐爆消費者,因此橫向伸縮是頗有必要的,就想多個生產者能夠向相同的主題寫消息同樣,咱們也可使用多個消費者從同一個主題讀取消息,對消息進行分流,這多個消費者就從屬於一個消費者羣組。一個羣組裏的消費者訂閱的是同一個主題,每一個消費者接收主題一部分分區的消息。node
假設主題T1有四個分區,咱們建立了消費者羣組G1,建立了一個消費者C1從屬於G1,它是G1裏的惟一的消費者,此時訂閱主題狀況爲,C1將會接收到主題中四個分區中的消息,如圖:正則表達式
此時咱們在消費者羣組中新增一個消費者C2,那麼每一個消費者將分別從兩個分區接受消息,如圖:apache
若是咱們有四個消費者時,將會每一個消費者都分到一個分區。bootstrap
若是羣組中的消費者超過了主題的分區數,那麼有一部分消費者就會被閒置,不會接收任何消息。如圖:緩存
往羣組裏增長消費者是橫向伸縮消費能力的主要方式。服務器
對於多個羣組來講,每一個羣組都會從Kafka中接收到全部的消息,而且各個羣組之間是互不干擾的。因此橫向伸縮Kafka消費者和消費者羣組並不會對性能形成負面影響。簡而言之就是,爲每個須要獲取一個或多個主題所有消息的應用程序建立一個消費者羣組,而後往羣組裏添加消費者來伸縮讀取能力和處理能力,羣組裏的每一個消費者只處理一部分消息。如圖:網絡
一個新的消費者加入羣組時,它讀取的是本來由其餘消費者讀取的消息。當一個消費者被關閉或發生奔潰時,它就離開羣組,本來由它讀取的分區將由羣組裏的其餘消費者來讀取。在主題發生變化時, 好比管理員添加了新的分區,會發生分區重分配。分區的全部權從一個消費者變成了裏另外一個消費者,這樣的行爲被稱爲再均衡。再均衡很是重要, 它爲消費者羣組帶來了高可用性和伸縮性(咱們能夠放心地添加或移除消費者),不過在正常狀況下,咱們並不但願發生這樣的行爲。在再均衡期間,消費者沒法讀取消息,形成整個羣組一小段時間的不可用。另外,當分區被從新分配給另外一個消費者時,消費者當前的讀取狀態會丟失,它有可能還須要去刷新緩存,在它從新恢復狀態以前會拖慢應用程序。session
消費者經過向被指派爲羣組協調器的broker (不一樣的羣組能夠有不一樣的協調器)發送心跳來維持它們和羣組的從屬關係以及它們對分區的全部權關係。只要消費者以正常的時間間隔發送心跳,就被認爲是活躍的,說明它還在讀取分區裏的消息。消費者會在輪詢消息(爲了獲取消息)或提交偏移量時發送心跳。若是消費者中止發送心跳的時間足夠長,會話就會過時,羣組協調器認爲它已經死亡,就會觸發一次再均衡。若是一個消費者發生崩潰,井中止讀取消息,羣組協調器會等待幾秒鐘,確認它死亡了纔會觸發再均衡。在這幾秒鐘時間裏,死掉的消費者不會讀取分區裏的消息。在清理消費者時,消費者會通知協調器它將要離開羣組,協調器會當即觸發一次再均衡,儘可能下降處理停頓。socket
在建立KafkaConsumer以前,須要將消費者想要的屬性存放到Properties中,而後再將properties傳給KafkaConsumer。
Consuer也有三個必須的屬性。bootstrap.servers,這裏跟Producer同樣,另外兩個key.deserializer和value.deserializer也與Producer相似,不過一個是序列化,一個是反序列化而已。
還有一個group.id不是必須的,可是咱們一般都會指定改消費者屬於哪一個羣組,因此也能夠認爲是必須的。
設置Properties的代碼片斷以下:
Properties kafkaPropertie = new Properties(); //配置broker地址,配置多個容錯 kafkaPropertie.put("bootstrap.servers", "node2:9092,node1:9092,node1:9093"); //配置key-value容許使用參數化類型,反序列化 kafkaPropertie.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); kafkaPropertie.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); //指定消費者所屬的羣組 kafkaPropertie.put("group.id","one");
接下來建立消費者,將Properties對象傳入到消費者,而後訂閱主題,以下:
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(kafkaPropertie); /*訂閱主題,這裏使用的是最簡單的訂閱testTopic主題,這裏也能夠出入正則表達式,來區分想要訂閱的多個指定的主題,如: *Pattern pattern = new Pattern.compile("testTopic"); * consumer.subscribe(pattern); */ consumer.subscribe(Collections.singletonList("testTopic"));
接下來輪詢消息,以下:
//輪詢消息 while (true) { //獲取ConsumerRecords,一秒鐘輪訓一次 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); //消費消息,遍歷records for (ConsumerRecord<String, String> r : records) { LOGGER.error("partition:", r.partition()); LOGGER.error("topic:", r.topic()); LOGGER.error("offset:", r.offset()); System.out.println(r.key() + ":" + r.value()); } Thread.sleep(1000); }
生產者發送消息,而後查看消費者打印狀況:
KafkaConsuerDemo - partition: KafkaConsuerDemo - topic: KafkaConsuerDemo - offset: key1:hello world0 KafkaConsuerDemo - partition: KafkaConsuerDemo - topic: KafkaConsuerDemo - offset: key1:hello world1 KafkaConsuerDemo - partition: KafkaConsuerDemo - topic: KafkaConsuerDemo - offset: key1:hello world2 KafkaConsuerDemo - partition: KafkaConsuerDemo - topic: KafkaConsuerDemo - offset: key1:hello world3 KafkaConsuerDemo - partition: KafkaConsuerDemo - topic: KafkaConsuerDemo - offset: key1:hello world4 KafkaConsuerDemo - partition: KafkaConsuerDemo - topic: KafkaConsuerDemo - offset: key1:hello world5 KafkaConsuerDemo - partition: KafkaConsuerDemo - topic: KafkaConsuerDemo - offset: key1:hello world6 KafkaConsuerDemo - partition: KafkaConsuerDemo - topic: KafkaConsuerDemo - offset: key1:hello world7 KafkaConsuerDemo - partition: KafkaConsuerDemo - topic: KafkaConsuerDemo - offset: key1:hello world8 KafkaConsuerDemo - partition: KafkaConsuerDemo - topic: KafkaConsuerDemo - offset: key1:hello world9
只存在一個組羣和一個消費者時:
當咱們啓動兩個消費者,同一個組羣,並在Topic上建立兩個Partition(分區),發送消息
final ProducerRecord<String, String> record = new ProducerRecord<String, String>("one",i % 2,"key3","hello world" + i);
將消息分發到0和1兩個partition
此時兩個消費者消費的消息總和等於發送的消息的總和,使用不一樣的羣組的不一樣的訂閱同一個topic,每一個消費者羣組都能收到全部的消息。
輪詢不僅是獲取數據那麼簡單。在第一次調用新消費者的poll ()方法時,它會負責查找GroupCoordinator , 而後加入羣組,接受分配的分區。若是發生了再均衡,整個過程也是在輪詢期間進行的。固然,心跳也是從輪詢裏發送出去的。因此,咱們要確保在輪詢期間所作的任何處理工做都應該儘快完成。
消費者完整代碼以下:
package com.wangx.kafka.client; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.Duration; import java.util.Collections; import java.util.Properties; public class KafkaConsuerDemo { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsuerDemo.class); public static void main(String[] args) throws InterruptedException { Properties kafkaPropertie = new Properties(); //配置broker地址,配置多個容錯 kafkaPropertie.put("bootstrap.servers", "node2:9092,node1:9092,node1:9093"); //配置key-value容許使用參數化類型,反序列化 kafkaPropertie.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); kafkaPropertie.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); //指定消費者所屬的羣組 kafkaPropertie.put("group.id","1"); //建立KafkaConsumer,將kafkaPropertie傳入。 KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(kafkaPropertie); /*訂閱主題,這裏使用的是最簡單的訂閱testTopic主題,這裏也能夠出入正則表達式,來區分想要訂閱的多個指定的主題,如: *Pattern pattern = new Pattern.compile("testTopic"); * consumer.subscribe(pattern); */ consumer.subscribe(Collections.singletonList("one")); //輪詢消息 while (true) { //獲取ConsumerRecords,一秒鐘輪訓一次 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); //消費消息,遍歷records for (ConsumerRecord<String, String> r : records) { LOGGER.error("partition:", r.partition()); LOGGER.error("topic:", r.topic()); LOGGER.error("offset:", r.offset()); System.out.println(r.key() + ":" + r.value()); } Thread.sleep(1000); } } }
1. fetch.min.bytes: 該屬性指定了消費者從服務器獲取記錄的最小字節數。
2. fetch.max.wait.ms:咱們經過 fetch.min.byte告訴Kafka ,等到有足夠的數據時才把它返回給消費者。
而 fetch.max.wait.ms則用於指定broker 的等待時間
3. max.partition.fetch.bytes:默認值是1MB,該屬性指定了服務器從每一個分區裏返回給消費者的最大字節數.
4. session.timeout.ms: 默認3s,該屬性指定了消費者在被認爲死亡以前能夠與服務器斷開鏈接的時
5. auto.offset.reset:該屬性指定了消費者在讀取一個沒有偏移量的分區或者偏移量無效的狀況下(因消費者長時間失效,包含偏移量的記錄已通過時井被刪除)該做何處
6. enable.auto.commit:該屬性指定了消費者是否自動提交偏移量,默認值是true。
7. partition.assignment.strategy: 分區分配給消費者羣組的分配策略,有以下兩種策略:
Range:該策略會把主題的若干個連續的分區分配給消費者.
RoundRobin:該策略把主題的全部分區逐個分配給消費.
8. client.id:該屬性能夠是任意字符串, broker 用它來標識從客戶端發送過來的消息,一般被用在日誌、度量指標和配額裏。
9. max.poll.records: 該屬性用於控制單次調用call () 方法可以返回的記錄數量,能夠幫你控制在輪詢裏須要處理的數據量。
10. receive.buffer.bytes 和send.buffer.bytes: socket 在讀寫數據時用到的TCP 緩衝區也能夠設置大小。若是它們被設爲-1,就使用操做系統的默認值。若是生產者或消費者與broker處於不一樣的數據中心內,能夠適當增大這些值,由於跨數據中心的網絡通常都有比較高的延遲和比較低的帶寬。