應用程序使用KafkaConsumer向Kafka訂閱主題,並從訂閱的主題上接收消息。Kafka消費者從屬於消費者羣組,一個羣組裏的消費者訂閱的是同一個主題,每一個消費者接收主題的一部分分區的消息。正則表達式
一個分區不能被一個消費者羣組裏的多個消費者消費,所以若是消費者超過主題的分區數量,那麼就有一部分消費者被閒置。apache
分區的全部權從一個消費者轉移到另外一個消費者,這樣的行爲叫作在均衡,不過在均衡期間消費者沒法讀取消息,形成整個羣組一小段時間不可用。bootstrap
消費者經過被指派爲羣組協調器的broker發送心跳來維持它們和羣組的從屬關係以及它們對分區的全部權關係。安全
在讀取消息以前,首先建立一個KafkaConsumer對象,有三個必選屬性:bootstrap.servers,key.deserializer,value.deserializer,第四個屬性group.id不是必須的,它指定了消費者屬於哪一個消費者羣組。服務器
訂閱主題consumer.subscribe()方法,能夠指定特定主題,或使用正則表達式。消息輪詢是消費者API的核心,經過一個簡單的輪詢向服務器請求數據。一旦消費者訂閱了主題,輪詢就會處理全部細節,包括羣組協調,分區再均衡,發送心跳和獲取數據。網絡
返回的每條數據都包含記錄所屬主題信息,記錄所作分區信息,記錄在分區的偏移量,以及記錄鍵值對。session
在退出以前使用consume.close()關閉消費者,網絡鏈接和socket也會隨之關閉。socket
咱們沒法讓一個線程運行多個消費者,也沒法讓多個線程安全共享一個消費者。按照規則,一個消費者使用一個線程。tcp
消費者的配置fetch
1.fetch.min.bytes
該屬性指定了消費者從服務器獲取記錄的最小字節數。
2.fetch.max.wait.ms
指定broker的等待時間,默認是500ms,這個條件和上一個條件哪個先知足,都會觸發broker向消費者發送數據。
3.max.partition.fetch.bytes
該屬性指定了服務器從每一個分區返回給消費者的最大字節數,默認是1MB.這個數值必須比max.message.size大。
4.session.timeout.ms
該屬性指定了消費者在被認爲死亡以前能夠與服務器斷開鏈接的時間,默認是3秒,若是消費者沒有在這個指定時間內發送心跳給羣組協調器,就會被認爲已經死亡。協調器就會觸發在平衡,把它的分區分配給羣組的其餘消費者。這個屬性與heartbeat.interval.ms緊密相關,這個屬性指定了消費者能夠多久不發送心跳。通常同時修改這兩個屬性,heartbeat.interval.ms通常是session.timeout.ms的三分之一。
5.auto.offset.reset
該屬性指定了消費者在讀取一個沒有偏移量的分區,或偏移量無效狀況下該做何處理,默認值是latest,意思是偏移量無效狀況下,消費者從最新記錄開始讀取數據,另外一個值是earliest,意思是從起始位置讀取數據。
6.enable.auto.commit
該屬性指定了消費者是否自動提交偏移量,默認是true,爲了儘可能避免重複數據和數據丟失,能夠把它設爲false,由本身控制什麼時候提交偏移量。
7.partition.assignment.strategy
分區會被分配給羣組的消費者,partitionAssignor根據給定消費者和主題,決定哪些分區應該被分配給哪一個消費者,有兩個默認分配策略:
Range:若干連續分區分配
RoundRobin:逐個分配給消費者
默認是org.apache.kafka.clients.consumer.RangeAssignor,這個類實現了Range策略,也能夠改成org.apache.kafka.clients.consumer.RoundRobinAssignor
能夠是任意字符串,broker用它標記從客戶端發送過來的消息,一般被用在日誌,度量指標和配額裏。
9.max.poll.records
該屬性用於控制單次調用call方法可以返回的記錄數量
10.receive.buffer.bytes和send.buffer.bytes
socket讀寫數據時tcp緩衝區的大小,若是=-1,就使用操做系統默認值。