Kafka消費者APi

Kafka客戶端從集羣中消費消息,並透明地處理kafka集羣中出現故障服務器,透明地調節適應集羣中變化的數據分區。也和服務器交互,平衡均衡消費者。html

public class KafkaConsumer<K,V> extends Object implements Consumer<K,V> 

消費者TCP長鏈接到broker來拉取消息。故障致使的消費者關閉失敗,將會泄露這些鏈接,消費者不是線程安全的,能夠查看更多關於Multi-threaded(多線程)處理的細節。java

跨版本兼容性

該客戶端能夠與0.10.0或更新版本的broker集羣進行通訊。較早的版本可能不支持某些功能。例如,0.10.0broker不支持offsetsForTimes,由於此功能是在版本0.10.1中添加的。 若是你調用broker版本不可用的API時,將報 UnsupportedVersionException 異常。正則表達式

偏移量和消費者的位置

kafka爲分區中的每條消息保存一個偏移量(offset),這個偏移量是該分區中一條消息的惟一標示符。也表示消費者在分區的位置。例如,一個位置是5的消費者(說明已經消費了0到4的消息),下一個接收消息的偏移量爲5的消息。實際上有兩個與消費者相關的「位置」概念:數據庫

消費者的位置給出了下一條記錄的偏移量。它比消費者在該分區中看到的最大偏移量要大一個。 它在每次消費者在調用poll(long)中接收消息時自動增加。apache

「已提交」的位置是已安全保存的最後偏移量,若是進程失敗或從新啓動時,消費者將恢復到這個偏移量。消費者能夠選擇按期自動提交偏移量,也能夠選擇經過調用commit API來手動的控制(如:commitSync 和 commitAsync)。bootstrap

這個區別是消費者來控制一條消息何時才被認爲是已被消費的,控制權在消費者,下面咱們進一步更詳細地討論。緩存

消費者組和主題訂閱

Kafka的消費者組概念,經過進程池瓜分消息並處理消息。這些進程能夠在同一臺機器運行,也可分佈到多臺機器上,以增長可擴展性和容錯性,相同group.id的消費者將視爲同一個消費者組安全

分組中的每一個消費者都經過subscribe API動態的訂閱一個topic列表。kafka將已訂閱topic的消息發送到每一個消費者組中。並經過平衡分區在消費者分組中全部成員之間來達到平均。所以每一個分區剛好地分配1個消費者(一個消費者組中)。全部若是一個topic有4個分區,而且一個消費者分組有隻有2個消費者。那麼每一個消費者將消費2個分區。服務器

消費者組的成員是動態維護的:若是一個消費者故障。分配給它的分區將從新分配給同一個分組中其餘的消費者。一樣的,若是一個新的消費者加入到分組,將從現有消費者中移一個給它。這被稱爲從新平衡分組,並在下面更詳細地討論。當新分區添加到訂閱的topic時,或者當建立與訂閱的正則表達式匹配的新topic時,也將從新平衡。將經過定時刷新自動發現新的分區,並將其分配給分組的成員。網絡

從概念上講,你能夠將消費者分組看做是由多個進程組成的單一邏輯訂閱者。做爲一個多訂閱系統,Kafka支持對於給定topic任何數量的消費者組,而不重複。

這是在消息系統中常見的功能的略微歸納。全部進程都將是單個消費者分組的一部分(相似傳統消息傳遞系統中的隊列的語義),所以消息傳遞就像隊列同樣,在組中平衡。與傳統的消息系統不一樣的是,雖然,你能夠有多個這樣的組。但每一個進程都有本身的消費者組(相似於傳統消息系統中pub-sub的語義),所以每一個進程都會訂閱到該主題的全部消息。

此外,當分組從新分配自動發生時,能夠經過ConsumerRebalanceListener通知消費者,這容許他們完成必要的應用程序級邏輯,例如狀態清除,手動偏移提交等。有關更多詳細信息,請參閱Kafka存儲的偏移

它也容許消費者經過使用assign(Collection)手動分配指定分區,若是使用手動指定分配分區,那麼動態分區分配和協調消費者組將失效。

發現消費者故障

訂閱一組topic後,當調用poll(long)時,消費者將自動加入到組中。只要持續的調用poll,消費者將一直保持可用,並繼續從分配的分區中接收消息。此外,消費者向服務器定時發送心跳。 若是消費者崩潰或沒法在session.timeout.ms配置的時間內發送心跳,則消費者將被視爲死亡,而且其分區將被從新分配。

還有一種可能,消費可能遇到「活鎖」的狀況,它持續的發送心跳,可是沒有處理。爲了預防消費者在這種狀況下一直持有分區,咱們使用max.poll.interval.ms活躍檢測機制。 在此基礎上,若是你調用的poll的頻率大於最大間隔,則客戶端將主動地離開組,以便其餘消費者接管該分區。 發生這種狀況時,你會看到offset提交失敗(調用commitSync()引起的CommitFailedException)。這是一種安全機制,保障只有活動成員可以提交offset。因此要留在組中,你必須持續調用poll。

消費者提供兩個配置設置來控制poll循環:

  1. max.poll.interval.ms:增大poll的間隔,能夠爲消費者提供更多的時間去處理返回的消息(調用poll(long)返回的消息,一般返回的消息都是一批)。缺點是此值越大將會延遲組從新平衡。

  2. max.poll.records:此設置限制每次調用poll返回的消息數,這樣能夠更容易的預測每次poll間隔要處理的最大值。經過調整此值,能夠減小poll間隔,減小從新平衡分組的

對於消息處理時間不可預測地的狀況,這些選項是不夠的。 處理這種狀況的推薦方法是將消息處理移到另外一個線程中,讓消費者繼續調用poll。 可是必須注意確保已提交的offset不超過實際位置。另外,你必須禁用自動提交,並只有在線程完成處理後才爲記錄手動提交偏移量(取決於你)。 還要注意,你須要pause暫停分區,不會從poll接收到新消息,讓線程處理完以前返回的消息(若是你的處理能力比拉取消息的慢,那建立新線程將致使你機器內存溢出)。

示例

這個消費者API提供了靈活性,以涵蓋各類消費場景,下面是一些例子來演示如何使用它們。

自動提交偏移量

這是個【自動提交偏移量】的簡單的kafka消費者API。

Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("foo", "bar")); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } 

設置enable.auto.commit,偏移量由auto.commit.interval.ms控制自動提交的頻率。

集羣是經過配置bootstrap.servers指定一個或多個broker。不用指定所有的broker,它將自動發現集羣中的其他的borker(最好指定多個,萬一有服務器故障)。

在這個例子中,客戶端訂閱了主題foobar。消費者組叫test

broker經過心跳機器自動檢測test組中失敗的進程,消費者會自動ping集羣,告訴進羣它還活着。只要消費者可以作到這一點,它就被認爲是活着的,並保留分配給它分區的權利,若是它中止心跳的時間超過session.timeout.ms,那麼就會認爲是故障的,它的分區將被分配到別的進程。

這個deserializer設置如何把byte轉成object類型,例子中,經過指定string解析器,咱們告訴獲取到的消息的key和value只是簡單個string類型。

手動控制偏移量

不須要定時的提交offset,能夠本身控制offset,當消息認爲已消費過了,這個時候再去提交它們的偏移量。這個頗有用的,當消費的消息結合了一些處理邏輯,這個消息就不該該認爲是已經消費的,直到它完成了整個處理。

Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "false"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("foo", "bar")); final int minBatchSize = 200; List<ConsumerRecord<String, String>> buffer = new ArrayList<>(); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { buffer.add(record); } if (buffer.size() >= minBatchSize) { insertIntoDb(buffer); consumer.commitSync(); buffer.clear(); } } 

在這個例子中,咱們將消費一批消息並將它們存儲在內存中。當咱們積累足夠多的消息後,咱們再將它們批量插入到數據庫中。若是咱們設置offset自動提交(以前說的例子),消費將被認爲是已消費的。這樣會出現問題,咱們的進程可能在批處理記錄以後,但在它們被插入到數據庫以前失敗了。

爲了不這種狀況,咱們將在相應的記錄插入數據庫以後再手動提交偏移量。這樣咱們能夠準確控制消息是成功消費的。提出一個相反的可能性:在插入數據庫以後,可是在提交以前,這個過程可能會失敗(即便這可能只是幾毫秒,這是一種可能性)。在這種狀況下,進程將獲取到已提交的偏移量,並會重複插入的最後一批數據。這種方式就是所謂的「至少一次」保證,在故障狀況下,能夠重複。

若是您沒法執行這些操做,可能會使已提交的偏移超過消耗的位置,從而致使缺乏記錄。 使用手動偏移控制的優勢是,您能夠直接控制記錄什麼時候被視爲「已消耗」。

注意:使用自動提交也能夠「至少一次」。可是要求你必須下次調用poll(long)以前或關閉消費者以前,處理完全部返回的數據。若是操做失敗,這將會致使已提交的offset超過消費的位置,從而致使丟失消息。使用手動控制offset的有點是,你能夠直接控制消息什麼時候提交。、

上面的例子使用commitSync表示全部收到的消息爲」已提交",在某些狀況下,你能夠但願更精細的控制,經過指定一個明確消息的偏移量爲「已提交」。在下面,咱們的例子中,咱們處理完每一個分區中的消息後,提交偏移量。

try { while(running) { ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE); for (TopicPartition partition : records.partitions()) { List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); for (ConsumerRecord<String, String> record : partitionRecords) { System.out.println(record.offset() + ": " + record.value()); } long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1))); } } } finally { consumer.close(); } 

注意:已提交的offset應始終是你的程序將讀取的下一條消息的offset。所以,調用commitSync(offsets)時,你應該加1個到最後處理的消息的offset。

訂閱指定的分區

在前面的例子中,咱們訂閱咱們感興趣的topic,讓kafka提供給咱們平分後的topic分區。可是,在有些狀況下,你可能須要本身來控制分配指定分區,例如:

  • 若是這個消費者進程與該分區保存了某種本地狀態(如本地磁盤的鍵值存儲),則它應該只能獲取這個分區的消息。

  • 若是消費者進程自己具備高可用性,而且若是它失敗,會自動從新啓動(可能使用集羣管理框架如YARN,Mesos,或者AWS設施,或做爲一個流處理框架的一部分)。 在這種狀況下,不須要Kafka檢測故障,從新分配分區,由於消費者進程將在另外一臺機器上從新啓動。

要使用此模式,,你只需調用assign(Collection)消費指定的分區便可:

String topic = "foo"; TopicPartition partition0 = new TopicPartition(topic, 0); TopicPartition partition1 = new TopicPartition(topic, 1); consumer.assign(Arrays.asList(partition0, partition1)); 

一旦手動分配分區,你能夠在循環中調用poll(跟前面的例子同樣)。消費者分組仍須要提交offset,只是如今分區的設置只能經過調用assign修改,由於手動分配不會進行分組協調,所以消費者故障不會引起分區從新平衡。每個消費者是獨立工做的(即便和其餘的消費者共享GroupId)。爲了不offset提交衝突,一般你須要確認每個consumer實例的gorupId都是惟一的。

注意,手動分配分區(即,assgin)和動態分區分配的訂閱topic模式(即,subcribe)不能混合使用。

offset存儲在其餘地方

消費者能夠不使用kafka內置的offset倉庫。能夠選擇本身來存儲offset。要注意的是,將消費的offset和結果存儲在同一個的系統中,用原子的方式存儲結果和offset,但這不能保證原子,要想消費是徹底原子的,並提供的「正好一次」的消費保證比kafka默認的「至少一次」的語義要更高。你須要使用kafka的offset提交功能。

這有結合的例子。

  • 若是消費的結果存儲在關係數據庫中,存儲在數據庫的offset,讓提交結果和offset在單個事務中。這樣,事物成功,則offset存儲和更新。若是offset沒有存儲,那麼偏移量也不會被更新。

  • 若是offset和消費結果存儲在本地倉庫。例如,能夠經過訂閱一個指定的分區並將offset和索引數據一塊兒存儲來構建一個搜索索引。若是這是以原子的方式作的,常見的多是,即便崩潰引發未同步的數據丟失。索引程序從它確保沒有更新丟失的地方恢復,而僅僅丟失最近更新的消息。

每一個消息都有本身的offset,因此要管理本身的偏移,你只須要作到如下幾點:

  • 配置 enable.auto.commit=false

  • 使用提供的 ConsumerRecord 來保存你的位置。

  • 在重啓時用 seek(TopicPartition, long) 恢復消費者的位置。

當分區分配也是手動完成的(像上文搜索索引的狀況),這種類型的使用是最簡單的。 若是分區分配是自動完成的,須要特別當心處理分區分配變動的狀況。能夠經過調用subscribe(Collection,ConsumerRebalanceListener)subscribe(Pattern,ConsumerRebalanceListener)中提供的ConsumerRebalanceListener實例來完成的。例如,當分區向消費者獲取時,消費者將經過實現ConsumerRebalanceListener.onPartitionsRevoked(Collection)來給這些分區提交它們offset。當分區分配給消費者時,消費者經過ConsumerRebalanceListener.onPartitionsAssigned(Collection)爲新的分區正確地將消費者初始化到該位置。

ConsumerRebalanceListener的另外一個常見用法是清除應用已移動到其餘位置的分區的緩存。

控制消費的位置

大多數狀況下,消費者只是簡單的從頭至尾的消費消息,週期性的提交位置(自動或手動)。kafka也支持消費者去手動的控制消費的位置,能夠消費以前的消息也能夠跳過最近的消息。

有幾種狀況,手動控制消費者的位置多是有用的。

一種場景是對於時間敏感的消費者處理程序,對足夠落後的消費者,直接跳過,從最近的消費開始消費。

另外一個使用場景是本地狀態存儲系統(上一節說的)。在這樣的系統中,消費者將要在啓動時初始化它的位置(不管本地存儲是否包含)。一樣,若是本地狀態已被破壞(假設由於磁盤丟失),則能夠經過從新消費全部數據並從新建立狀態(假設kafka保留了足夠的歷史)在新的機器上從新建立。

kafka使用seek(TopicPartition, long)指定新的消費位置。用於查找服務器保留的最先和最新的offset的特殊的方法也可用(seekToBeginning(Collection) 和 seekToEnd(Collection))。

消費者流量控制

若是消費者分配了多個分區,並同時消費全部的分區,這些分區具備相同的優先級。在一些狀況下,消費者須要首先消費一些指定的分區,當指定的分區有少許或者已經沒有可消費的數據時,則開始消費其餘分區。

例如流處理,當處理器從2個topic獲取消息並把這兩個topic的消息合併,當其中一個topic長時間落後另外一個,則暫停消費,以便落後的遇上來。

kafka支持動態控制消費流量,分別在future的poll(long)中使用pause(Collection)resume(Collection) 來暫停消費指定分配的分區,從新開始消費指定暫停的分區。

多線程處理

Kafka消費者不是線程安全的。全部網絡I/O都發生在進行調用應用程序的線程中。用戶的責任是確保多線程訪問正確同步的。非同步訪問將致使ConcurrentModificationException。

此規則惟一的例外是wakeup(),它能夠安全地從外部線程來中斷活動操做。在這種狀況下,將從操做的線程阻塞並拋出一個WakeupException。這可用於從其餘線程來關閉消費者。 如下代碼段顯示了典型模式:

public class KafkaConsumerRunner implements Runnable { private final AtomicBoolean closed = new AtomicBoolean(false); private final KafkaConsumer consumer; public void run() { try { consumer.subscribe(Arrays.asList("topic")); while (!closed.get()) { ConsumerRecords records = consumer.poll(10000); // Handle new records } } catch (WakeupException e) { // Ignore exception if closing if (!closed.get()) throw e; } finally { consumer.close(); } } // Shutdown hook which can be called from a separate thread public void shutdown() { closed.set(true); consumer.wakeup(); } } 

在單獨的線程中,能夠經過設置關閉標誌和喚醒消費者來關閉消費者。

closed.set(true); consumer.wakeup(); 

咱們沒有多線程模型的例子。但留下幾個操做可用來實現多線程處理消息。

  1. 每一個線程一個消費者

    每一個線程本身的消費者實例。這裏是這種方法的優勢和缺點:

    • PRO: 這是最容易實現的
    • PRO: 由於它不須要在線程之間協調,因此一般它是最快的。
    • PRO: 它按順序處理每一個分區(每一個線程只處理它接受的消息)。
    • CON: 更多的消費者意味着更多的TCP鏈接到集羣(每一個線程一個)。通常kafka處理鏈接很是的快,因此這是一個小成本。
    • CON: 更多的消費者意味着更多的請求被髮送到服務器,但稍微較少的數據批次可能致使I/O吞吐量的一些降低。
    • CON: 全部進程中的線程總數受到分區總數的限制。
  2. 解耦消費和處理

    另外一個替代方式是一個或多個消費者線程,它來消費全部數據,其消費全部數據並將ConsumerRecords實例切換到由實際處理記錄處理的處理器線程池來消費的阻塞隊列。這個選項一樣有利弊:

    • PRO: 可擴展消費者和處理進程的數量。這樣單個消費者的數據可分給多個處理器線程來執行,避免對分區的任何限制。
    • CON: 跨多個處理器的順序保證須要特別注意,由於線程是獨立的執行,後來的消息可能比遭到的消息先處理,這僅僅是由於線程執行的運氣。若是對排序沒有問題,這就不是個問題。
    • CON: 手動提交變得更困難,由於它須要協調全部的線程以確保處理對該分區的處理完成。

這種方法有多種玩法,例如,每一個處理線程能夠有本身的隊列,消費者線程可使用TopicPartitionhash到這些隊列中,以確保按順序消費,而且提交也將簡化。

做者:半獸人 連接:http://orchome.com/451 來源:OrcHome 著做權歸做者全部。商業轉載請聯繫做者得到受權,非商業轉載請註明出處。
相關文章
相關標籤/搜索