【Kafka】《Kafka權威指南》——從Kafka讀取數據

應用程序使用 KafkaConsumer向 Kafka 訂閱主題,並從訂閱的主題上接收消息 。 從 Kafka 讀取數據不一樣於從其餘悄息系統讀取數據,它涉及一些獨特的概念和想法。若是不先理解 這些概念,就難以理解如何使用消費者 API。因此咱們接下來先解釋這些重要的概念,然 後再舉幾個例子,橫示如何使用消費者 API 實現不一樣的應用程序。java

消費者和消費者羣組

假設咱們有一個應用程序須要從-個 Kafka主題讀取消息井驗證這些消息,而後再把它們 保存起來。應用程序須要建立一個消費者對象,訂閱主題並開始接收消息,而後驗證消息 井保存結果。過了 一陣子,生產者往主題寫入消息的速度超過了應用程序驗證數據的速 度,這個時候該怎麼辦?若是隻使用單個消費者處理消息,應用程序會遠跟不上消息生成 的速度。顯然,此時頗有必要對消費者進行橫向伸縮。就像多個生產者能夠向相同的 主題 寫入消息同樣,咱們也可使用多個消費者從同一個主題讀取消息,對消息進行分流。正則表達式

Kafka 消費者從屬於消費者羣組。一個羣組裏的消費者訂閱的是同一個主題,每一個消費者 接收主題一部分分區的消息。數據庫

假設主題 T1 有 4 個分區,咱們建立了消費者 C1 ,它是羣組 G1 裏惟 一 的消費者,咱們用 它訂閱主題 T1。消費者 Cl1將收到主題 T1所有 4個分區的消息,如圖 4-1 所示。apache

若是在羣組 G1 裏新增一個消費者 C2,那麼每一個消費者將分別從兩個分區接收消息。我 假設消費者 C1接收分區 0 和分區 2 的消息,消費者 C2 接收分區 1 和分區 3 的消息,如圖 4-2 所示。

若是羣組 G1 有 4 個消費者,那麼每一個消費者能夠分配到 一個分區,如圖 4-3 所示。bootstrap

若是咱們往羣組裏添加更多的消費者,超過主題的分區數量,那麼多出的消費者就會被閒置,不會接收到任何消息。

往羣組裏增長消費者是橫向伸縮消費能力的主要方式。 Kafka 消費者常常會作一些高延遲的操做,好比把數據寫到數據庫或 HDFS,或者使用數據進行比較耗時的計算。在這些狀況下,單個消費者沒法跟上數據生成的速度,因此能夠增長更多的消費者,讓它們分擔負載,每一個消費者只處理部分分區的消息,這就是橫向伸縮的主要手段。咱們有必要爲主題建立大量的分區,在負載增加時能夠加入更多的消費者。不過要性意,不要讓消費者的數量超過主題分區的數量,多餘的消費者只會被閒置。數組

除了經過增長消費者來橫向伸縮單個應用程序外,還常常出現多個應用程序從同一個主題讀取數據的狀況。實際上, Kafka 設計的主要目標之一 ,就是要讓 Kafka 主題裏的數據可以知足企業各類應用場景的需求。在這些場景裏,每一個應用程序能夠獲取到全部的消息, 而不僅是其中的 一部分。只要保證每一個應用程序有本身的消費者羣組,就可讓它們獲取到主題全部的消息。不一樣於傳統的消息系統,橫向伸縮 Kafka消費者和消費者羣組並不會對性能形成負面影響。緩存

在上面的例子裏,若是新增一個只包含一個消費者的羣組 G2,那麼這個消費者將從主題 T1 上接收全部的消息,與羣組 G1 之間互不影響。羣組 G2 能夠增長更多的消費者,每一個消費者能夠消費若干個分區,就像羣組 G1 那樣,如圖 4-5 所示。總的來講,羣組 G2 仍是會接收到全部消息,無論有沒有其餘羣組存在。安全

簡而言之,爲每個須要獲取一個或多個主題所有消息的應用程序建立一個消費者羣組, 而後往羣組裏添加消費者來伸縮讀取能力和處理能力,羣組裏的每一個消費者只處理一部分消息。服務器

消費者羣組和分區再均衡

咱們已經從上一個小節瞭解到,羣組裏的消費者共同讀取主題的分區。一個新的消費者加 入羣組時,它讀取的是本來由其餘消費者讀取的消息。當一個消費者被關閉或發生崩潰時,它就離開羣組,本來由它讀取的分區將由羣組裏的其餘消費者來讀取。在主題發生變化時 , 好比管理員添加了新的分區,會發生分區重分配。網絡

分區的全部權從一個消費者轉移到另外一個消費者,這樣的行爲被稱爲再均衡。再均衡很是重要, 它爲消費者羣組帶來了高可用性和伸縮性(咱們能夠放心地添加或移除消費者), 不過在正常狀況下,咱們並不但願發生這樣的行爲。在再均衡期間,消費者沒法讀取消息,形成整個羣組一小段時間的不可用。另外,當分區被從新分配給另 一個消費者時,消費者當前的讀取狀態會丟失,它有可能還須要去刷新緩存 ,在它從新恢復狀態以前會拖慢應用程序。咱們將在本章討論如何進行安全的再均衡,以及如何避免沒必要要的再均衡。

消費者經過向被指派爲 羣組協調器的 broker (不一樣的羣組能夠有不一樣的協調器)發送 心跳 來維持它們和羣組的從屬關係以及它們對分區的全部權關係。只要消費者以正常的時間間隔發送心跳,就被認爲是活躍的,說明它還在讀取分區裏的消息。消費者會在輪詢消息 (爲了獲取消息)或提交偏移量時發送心跳。若是消費者中止發送心跳的時間足夠長,會話就會過時,羣組協調器認爲它已經死亡,就會觸發一次再均衡。

若是一個消費者發生崩潰,井中止讀取消息,羣組協調器(broker)會等待幾秒鐘,確認它死亡了纔會觸發再均衡。在這幾秒鐘時間裏,死掉的消費者不會讀取分區裏的消息。在清理消費者時,消費者會通知協調器它將要離開羣組,協調器會當即觸發一次再均衡,儘可能下降處理停頓。在本章的後續部分,咱們將討論一些用於控制發送心跳頻率和會話過時時間的配置參數,以及如何根據實際須要來配置這些參數 。

分配分區是怎樣的一個過程

當消費者要加入羣組時,它會向羣組協調器發送 一 個 JoinGroup 請求。第 一 個加入羣組的消費者將成爲「羣主」。羣主從協調器那裏得到羣組的成員列 表(列表中包含了全部最近發送過心跳的消費者,它們被認爲是活躍的), 並負責給每個消費者分配分區。它使用 一個實現了 PartitionAssignor接口的類來決定哪些分 區應該被分配給哪一個消費者 。

Kafka 內置了兩種分配策略,在後面的配置參數小節咱們將深刻討論。分配完畢以後,羣主把分配狀況列表發送給羣組協調器,協調器再把這些信息發送給全部消費者。每一個消費者只能看到本身的分配信息,只有羣 主知道羣組 裏全部消費者的分配信息。這個過程會在每次再均衡時重複發生。

建立 Kafka消費者

在讀取消息以前,須要先建立 一個 KafkaConsumer對象 。 建立 KafkaConsumer 對象與建立 KafkaProducer對象很是類似——把想要傳給消費者的屬性放在 Properties 對象裏。本章 後續部分會深刻討論全部的屬性。在這裏,咱們只須要使用 3個必要的屬性: bootstrap.servers、 key.deserializer、 value.deserializer。

下面代碼演示瞭如何建立一個KafkaConsumer對象:

Properties props = new Properties();
 
props.put("bootstrap.servers", "broker1:9092, broker2:9092");
 
props.put("group.id", "CountryCounter");
 
props.put("key.deserializer", "org.apache.kafka.common.serializaiton.StrignDeserializer");
 
props.put("value.deserializer", "org.apache.kafka.common.serializaiton.StrignDeserializer");
 
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
複製代碼

deserializer使用指定的類(反序列化器)把字節數組轉成 Java對象。

group.id指定了KafkaConsumer 屬於哪個消費者羣組。 group.id不是必需的,不過咱們如今姑且認爲它是必需的。它指定了 KafkaConsumer 屬於哪個消費者羣組。建立不屬於任何一個羣組的消費者也是能夠的,只是這樣作不太常見。

訂閱主題

建立好消費者以後,下一步能夠開始訂閱主題了。subscribe()方法接受一個主題列表做爲參數

consumer.subscribe(Collections.singletonList("customerCountries"));
複製代碼

在這裏咱們建立了一個包含單個元素的列表,主題的名字叫做「customerCountries」,咱們也能夠在調用subscribe()方法時傳入一個正則表達式,正則表達式能夠匹配多個主題若是有人建立了新的主題,而且主題名與正則表達式匹配,那麼會當即觸發一次再均衡,消費者就能夠讀取新添加的主題。若是應用程序須要讀取多個主題,而且能夠處理不一樣類型的數據,那麼這種訂閱方式就很管用。在Kafka和其餘系統之間複製數據時,使用正則表達式的方式訂閱多個主題時很常見的作法。

要訂閱全部test相關的主題,能夠這樣作:consumer.subscribe("test.*");

輪詢

消息輪詢是消費者 API 的核心,經過一個簡單的輪詢向服務器請求數據。一旦消費者訂閱了主題 ,輪詢就會處理全部的細節,包括羣組協調、分區再均衡、發送心跳和獲取數據, 開發者只須要使用一組簡單的 API 來處理從分區返回的數據。消費者代碼的主要部分以下所示 :

輪詢不僅是獲取數據那麼簡單。在第一次調用新消費者的 poll() 方法時,它會負責查找 GroupCoordinator, 而後加入羣組,接受分配的分區。 若是發生了再均衡,整個過程也是在輪詢期間進行的。固然 ,心跳也是從輪詢裏發迭出去的。因此,咱們要確保在輪詢期間所作的任何處理工做都應該儘快完成。

線程安全

在同一個羣組中,咱們沒法讓一個線程運行多個消費者,也沒法讓多個線程安全地共享一個消費者。按照規則,一個消費者使用一個線程。若是要在同一個消費者羣組裏運行多個消費者,須要讓每一個消費者運行在本身的線程裏。最好是把消費者的邏輯封裝在本身的對象裏,而後使用Java的ExecutorService啓動多個線程,使每一個消費者運行在本身的線程上。

消費者的配置

到目前爲止,咱們學習瞭如何使用消費者 API,不過只介紹了幾個配置屬’性一一如bootstrap.servers、 key.deserializer、 value.deserializer、group.id。 Kafka的文檔列出了全部與消費者相關的配置說明。大部分參數都有合理的默認值,通常不須要修改它們,不過有一些參數與消費 者的性能和可用性有很大關係。接下來介紹這些重要的屬性。

1. fetch.min.bytes

該屬性指定了消費者從服務器獲取記錄的最小字節數。 broker 在收到消費者的數據請求時, 若是可用的數據量小於 fetch.min.bytes指定的大小,那麼它會等到有足夠的可用數據時才把它返回給消費者。這樣能夠下降消費者和 broker 的工做負載,由於它們在主題不是很活躍的時候(或者一天裏的低谷時段)就不須要來來回回地處理消息。若是沒有不少可用數據,但消費者的 CPU 使用率卻很高,那麼就須要把該屬性的值設得比默認值大。若是消費者的數量比較多,把該屬性的值設置得大一點能夠下降 broker 的工做負載。

2. fetch.max.wait.ms

咱們經過 fetch.min.bytes 告訴 Kafka,等到有足夠的數據時才把它返回給消費者。而 fetch.max.wait.ms則用於指定 broker的等待時間,默認是 500ms。若是沒有足夠的數據流入 Kafka,消費者獲取最小數據量的要求就得不到知足,最終致使500ms的延遲。 若是要下降潛在的延遲(爲了知足 SLA),能夠把該參數值設置得小一些。若是 fetch.max.wait.ms被設 爲 100ms,而且 fetch.min.bytes 被設爲 1MB,那麼 Kafka在收到消費者的請求後,要麼返 回 1MB 數據,要麼在 100ms 後返回全部可用的數據 , 就看哪一個條件先獲得知足。

3. max.parition.fetch.bytes

該屬性指定了服務器從每一個分區裏返回給消費者的最大字節數。它的默認值是 1MB,也 就是說, KafkaConsumer.poll() 方法從每一個分區裏返回的記錄最多不超過 max.parition.fetch.bytes 指定的字節。若是一個主題有 20個分區和 5 個消費者,那麼每一個消費者須要至少 4MB 的可用內存來接收記錄。在爲消費者分配內存時,能夠給它們多分配一些,因 爲若是羣組裏有消費者發生崩潰,剩下的消費者須要處理更多的分區。 max.parition.fetch.bytes 的值必須比 broker可以接收的最大消息的字節數(經過 max.message.size屬 性配置 )大, 不然消費者可能沒法讀取這些消息,致使消費者一直掛起重試。在設置該屬性時,另外一個須要考慮的因素是消費者處理數據的時間。 消費者須要頻繁調用 poll() 方法來避免會話過時和發生分區再均衡,若是單次調用 poll() 返回的數據太多,消費者須要更多的時間來處理,可能沒法及時進行下一個輪詢來避免會話過時。若是出現這種狀況, 能夠把 max.parition.fetch.bytes 值改小 ,或者延長會話過時時間。

4. session.timeout.ms

該屬性指定了消費者在被認爲死亡以前能夠與服務器斷開鏈接的時間,默認是 3s。若是消費者沒有在 session.timeout.ms 指定的時間內發送心跳給羣組協調器,就被認爲已經死亡,協調器就會觸發再均衡,把它的分區分配給羣組裏的其餘消費者 。該屬性與 heartbeat.interval.ms緊密相關。heartbeat.interval.ms 指定了poll()方法向協調器 發送心跳的頻 率, session.timeout.ms 則指定了消費者能夠多久不發送心跳。因此, 通常須要同時修改這兩個屬性, heartbeat.interval.ms 必須比 session.timeout.ms 小, 通常是 session.timeout.ms 的三分之一。若是 session.timeout.ms 是 3s,那麼 heartbeat.interval.ms 應該是 ls。 把 session.timeout.ms 值設 得比默認值小,能夠更快地檢測和恢 復崩潰的節點,不過長時間的輪詢或垃圾收集可能致使非預期的再均衡。把該屬性的值設置得大一些,能夠減小意外的再均衡 ,不過檢測節點崩潰須要更長的時間。

5. auto.offset.reset

該屬性指定了消費者在讀取一個沒有偏移量的分區或者偏移量無效的狀況下(因消費者長時間失效,包含偏移量的記錄已通過時井被刪除)該做何處理。它的默認值是latest, 意 思是說,在偏移量無效的狀況下,消費者將從最新的記錄開始讀取數據(在消費者 啓動之 後生成的記錄)。另外一個值是 earliest,意思是說,在偏移量無效的狀況下,消費者將從 起始位置讀取分區的記錄。

6. enable.auto.commit

咱們稍後將介紹 幾種 不一樣的提交偏移量的方式。該屬性指定了消費者是否自動提交偏移量,默認值是 true。爲了儘可能避免出現重複數據和數據丟失,能夠把它設爲 false,由本身控制什麼時候提交偏移量。若是把它設爲 true,還能夠經過配置 auto.commit.interval.mls 屬性來控制提交的頻率。

7. partition.assignment.strategy

咱們知道,分區會被分配給羣組裏的消費者。 PartitionAssignor 根據給定的消費者和主題,決定哪些分區應該被分配給哪一個消費者。 Kafka 有兩個默認的分配策略 。

- Range

該策略會把主題的若干個連續的分區分配給消費者。假設悄費者 C1 和消費者 C2 同時 訂閱了主題 T1 和主題 T2,井且每一個主題有 3 個分區。那麼消費者 C1 有可能分配到這 兩個主題的分區 0 和 分區 1,而消費者 C2 分配到這兩個主題 的分區 2。由於每一個主題 擁有奇數個分區,而分配是在主題內獨立完成的,第一個消費者最後分配到比第二個消費者更多的分區。只要使用了 Range策略,並且分區數量沒法被消費者數量整除,就會出現這種狀況。

- RoundRobin

該策略把主題的全部分區逐個分配給消費者。若是使用 RoundRobin 策略來給消費者 C1 和消費者 C2分配分區,那麼消費者 C1 將分到主題 T1 的分區 0和分區 2以及主題 T2 的分區 1,消費者 C2 將分配到主題 T1 的分區 l 以及主題T2 的分區 0和分區 2。通常 來講,若是全部消費者都訂閱相同的主題(這種狀況很常見), RoundRobin策略會給所 有消費者分配相同數量 的分區(或最多就差一個分區)。

能夠經過設置 partition.assignment.strategy 來選擇分區策略。默認使用的是 org. apache.kafka.clients.consumer.RangeAssignor, 這個類實現了 Range策略,不過也能夠 把它改爲 org.apache.kafka.clients.consumer.RoundRobinAssignor。咱們還可使用自定 義策略,在這種狀況下 , partition.assignment.strategy 屬性的值就是自定義類的名字。

8. client.id

該屬性能夠是任意字符串 , broker用它來標識從客戶端發送過來的消息,一般被用在日誌、度量指標和配額裏。

9. max.poll.records

該屬性用於控制單次調用 call() 方法可以返回的記錄數量,能夠幫你控制在輪詢裏須要處理的數據量。

10. receive.buffer.bytes 和 send.buffer.bytes

socket 在讀寫數據時用到的 TCP 緩衝區也能夠設置大小。若是它們被設爲-1,就使用操做系統的默認值。若是生產者或消費者與 broker處於不一樣的數據中心內,能夠適當增大這些值,由於跨數據中心的網絡通常都有 比較高的延遲和比較低的帶寬 。

做者注:歡迎關注筆者公號,按期分享IT互聯網、金融等工做經驗心得、人生感悟,歡迎交流,目前就任阿里-移動事業部,須要大廠內推的也可到公衆號砸簡歷,或查看我我的資料獲取。(公號ID:weknow619)。

相關文章
相關標籤/搜索