Kafka若是丟了消息,怎麼處理的?

來源:https://blog.dogchao.cn/?p=305html

Kafka存在丟消息的問題,消息丟失會發生在Broker,Producer和Consumer三種。java

Broker

Broker丟失消息是因爲Kafka自己的緣由形成的,kafka爲了獲得更高的性能和吞吐量,將數據異步批量的存儲在磁盤中。消息的刷盤過程,爲了提升性能,減小刷盤次數,kafka採用了批量刷盤的作法。即,按照必定的消息量,和時間間隔進行刷盤。這種機制也是因爲linux操做系統決定的。將數據存儲到linux操做系統種,會先存儲到頁緩存(Page cache)中,按照時間或者其餘條件進行刷盤(從page cache到file),或者經過fsync命令強制刷盤。數據在page cache中時,若是系統掛掉,數據會丟失。linux

Broker在linux服務器上高速讀寫以及同步到Replicaweb

上圖簡述了broker寫數據以及同步的一個過程。broker寫數據只寫到PageCache中,而pageCache位於內存。這部分數據在斷電後是會丟失的。pageCache的數據經過linux的flusher程序進行刷盤。刷盤觸發條件有三:ajax

  • 主動調用sync或fsync函數
  • 可用內存低於閥值
  • dirty data時間達到閥值。dirty是pagecache的一個標識位,當有數據寫入到pageCache時,pagecache被標註爲dirty,數據刷盤之後,dirty標誌清除。

Broker配置刷盤機制,是經過調用fsync函數接管了刷盤動做。從單個Broker來看,pageCache的數據會丟失。數據庫

Kafka沒有提供同步刷盤的方式。同步刷盤在RocketMQ中有實現,實現原理是將異步刷盤的流程進行阻塞,等待響應,相似ajax的callback或者是java的future。下面是一段rocketmq的源碼。apache

GroupCommitRequest request = new  GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);
boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); // 刷盤

也就是說,理論上,要徹底讓kafka保證單個broker不丟失消息是作不到的,只能經過調整刷盤機制的參數緩解該狀況。好比,減小刷盤間隔,減小刷盤數據量大小。時間越短,性能越差,可靠性越好(儘量可靠)。這是一個選擇題。bootstrap

爲了解決該問題,kafka經過producer和broker協同處理單個broker丟失參數的狀況。一旦producer發現broker消息丟失,便可自動進行retry。除非retry次數超過閥值(可配置),消息纔會丟失。此時須要生產者客戶端手動處理該狀況。那麼producer是如何檢測到數據丟失的呢?是經過ack機制,相似於http的三次握手的方式。緩存

The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are allowed: acks=0 If set to zero then the producer will not wait for any acknowledgment from the server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, and the retries configuration will not take effect (as the client won’t generally know of any failures). The offset given back for each record will always be set to -1. acks=1 This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after acknowledging the record but before the followers have replicated it then the record will be lost. acks=allThis means the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee. This is equivalent to the acks=-1 setting.服務器

http://kafka.apache.org/20/documentation.html

以上的引用是kafka官方對於參數acks的解釋(在老版本中,該參數是request.required.acks)。

  • acks=0,producer不等待broker的響應,效率最高,可是消息極可能會丟。

  • acks=1,leader broker收到消息後,不等待其餘follower的響應,即返回ack。也能夠理解爲ack數爲1。此時,若是follower尚未收到leader同步的消息leader就掛了,那麼消息會丟失。按照上圖中的例子,若是leader收到消息,成功寫入PageCache後,會返回ack,此時producer認爲消息發送成功。但此時,按照上圖,數據尚未被同步到follower。若是此時leader斷電,數據會丟失。

  • acks=-1,leader broker收到消息後,掛起,等待全部ISR列表中的follower返回結果後,再返回ack。-1等效與all。這種配置下,只有leader寫入數據到pagecache是不會返回ack的,還須要全部的ISR返回「成功」纔會觸發ack。若是此時斷電,producer能夠知道消息沒有被髮送成功,將會從新發送。若是在follower收到數據之後,成功返回ack,leader斷電,數據將存在於原來的follower中。在從新選舉之後,新的leader會持有該部分數據。數據從leader同步到follower,須要2步:

    1. 數據從pageCache被刷盤到disk。由於只有disk中的數據才能被同步到replica。
    2. 數據同步到replica,而且replica成功將數據寫入PageCache。在producer獲得ack後,哪怕是全部機器都停電,數據也至少會存在於leader的磁盤內。

上面第三點提到了ISR的列表的follower,須要配合另外一個參數才能更好的保證ack的有效性。ISR是Broker維護的一個「可靠的follower列表」,in-sync Replica列表,broker的配置包含一個參數:min.insync.replicas。該參數表示ISR中最少的副本數。若是不設置該值,ISR中的follower列表可能爲空。此時至關於acks=1。

如上圖中:

  • acks=0,總耗時f(t) = f(1)。
  • acks=1,總耗時f(t) = f(1) + f(2)。
  • acks=-1,總耗時f(t) = f(1) + max( f(A) , f(B) ) + f(2)。

性能依次遞減,可靠性依次升高。

Producer

Producer丟失消息,發生在生產者客戶端。

爲了提高效率,減小IO,producer在發送數據時能夠將多個請求進行合併後發送。被合併的請求咋發送一線緩存在本地buffer中。緩存的方式和前文提到的刷盤相似,producer能夠將請求打包成「塊」或者按照時間間隔,將buffer中的數據發出。經過buffer咱們能夠將生產者改造爲異步的方式,而這能夠提高咱們的發送效率。

可是,buffer中的數據就是危險的。在正常狀況下,客戶端的異步調用能夠經過callback來處理消息發送失敗或者超時的狀況,可是,一旦producer被非法的中止了,那麼buffer中的數據將丟失,broker將沒法收到該部分數據。又或者,當Producer客戶端內存不夠時,若是採起的策略是丟棄消息(另外一種策略是block阻塞),消息也會被丟失。抑或,消息產生(異步產生)過快,致使掛起線程過多,內存不足,致使程序崩潰,消息丟失。

producer

根據上圖,能夠想到幾個解決的思路:

  • 異步發送消息改成同步發送消。或者service產生消息時,使用阻塞的線程池,而且線程數有必定上限。總體思路是控制消息產生速度。
  • 擴大Buffer的容量配置。這種方式能夠緩解該狀況的出現,但不能杜絕。
  • service不直接將消息發送到buffer(內存),而是將消息寫到本地的磁盤中(數據庫或者文件),由另外一個(或少許)生產線程進行消息發送。至關因而在buffer和service之間又加了一層空間更加富裕的緩衝層。

Consumer

Consumer消費消息有下面幾個步驟:

  1. 接收消息
  2. 處理消息
  3. 反饋「處理完畢」(commited)

Consumer的消費方式主要分爲兩種:

  • 自動提交offset,Automatic Offset Committing
  • 手動提交offset,Manual Offset Control

Consumer自動提交的機制是根據必定的時間間隔,將收到的消息進行commit。commit過程和消費消息的過程是異步的。也就是說,可能存在消費過程未成功(好比拋出異常),commit消息已經提交了。此時消息就丟失了。

Properties props = new Properties();
props.put("bootstrap.servers""localhost:9092");
props.put("group.id""test");
// 自動提交開關
props.put("enable.auto.commit""true");
// 自動提交的時間間隔,此處是1s
props.put("auto.commit.interval.ms""1000");
props.put("key.deserializer""org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer""org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo""bar"));
while (true) {
        // 調用poll後,1000ms後,消息狀態會被改成 committed
 ConsumerRecords<String, String> records = consumer.poll(100);
 for (ConsumerRecord<String, String> record : records)
  insertIntoDB(record); // 將消息入庫,時間可能會超過1000ms
}

上面的示例是自動提交的例子。若是此時,insertIntoDB(record)發生異常,消息將會出現丟失。接下來是手動提交的例子:

Properties props = new Properties();
props.put("bootstrap.servers""localhost:9092");
props.put("group.id""test");
// 關閉自動提交,改成手動提交
props.put("enable.auto.commit""false");
props.put("key.deserializer""org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer""org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo""bar"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
        // 調用poll後,不會進行auto commit
 ConsumerRecords<String, String> records = consumer.poll(100);
 for (ConsumerRecord<String, String> record : records) {
  buffer.add(record);
 }
 if (buffer.size() >= minBatchSize) {
  insertIntoDb(buffer);
                // 全部消息消費完畢之後,才進行commit操做
  consumer.commitSync();
  buffer.clear();
 }
}

將提交類型改成手動之後,能夠保證消息「至少被消費一次」(at least once)。但此時可能出現重複消費的狀況,重複消費不屬於本篇討論範圍。

上面兩個例子,是直接使用Consumer的High level API,客戶端對於offset等控制是透明的。也能夠採用Low level API的方式,手動控制offset,也能夠保證消息不丟,不過會更加複雜。

 try {
     while(running) {
         ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
         for (TopicPartition partition : records.partitions()) {
             List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
             for (ConsumerRecord<String, String> record : partitionRecords) {
                 System.out.println(record.offset() + ": " + record.value());
             }
             long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
             // 精確控制offset
             consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
         }
     }
 } finally {
   consumer.close();
 }




  點擊加入【技術交流羣


本文分享自微信公衆號 - 肥朝(feichao_java)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索