消息丟失與重複

RQ&KA

若是說你這個是用 MQ 來傳遞很是核心的消息,好比說計費、扣費的一些消息,那必須確保這個 MQ 傳遞過程當中絕對不會把計費消息給弄丟git

RabbitMQ

生產者弄丟了數據

生產者將數據發送到 RabbitMQ 的時候,可能數據就在半路給搞丟了,由於網絡問題啥的,都有可能。github

此時能夠選擇用 RabbitMQ 提供的事務功能,就是生產者發送數據以前開啓 RabbitMQ 事務channel.txSelect,而後發送消息,若是消息沒有成功被 RabbitMQ 接收到,那麼生產者會收到異常報錯,此時就能夠回滾事務channel.txRollback,而後重試發送消息;若是收到了消息,那麼能夠提交事務channel.txCommitapi

// 開啓事務
channel.txSelect
try {
    // 這裏發送消息
} catch (Exception e) {
    channel.txRollback

    // 這裏再次重發這條消息
}

// 提交事務
channel.txCommit

可是問題是,RabbitMQ 事務機制(同步)一搞,基本上吞吐量會下來,由於太耗性能服務器

因此通常來講,若是你要確保說寫 RabbitMQ 的消息別丟,能夠開啓 confirm 模式,在生產者那裏設置開啓 confirm 模式以後,你每次寫的消息都會分配一個惟一的 id,而後若是寫入了 RabbitMQ 中,RabbitMQ 會給你回傳一個 ack 消息,告訴你說這個消息 ok 了。若是 RabbitMQ 沒能處理這個消息,會回調你的一個 nack 接口,告訴你這個消息接收失敗,你能夠重試。並且你能夠結合這個機制本身在內存裏維護每一個消息 id 的狀態,若是超過必定時間還沒接收到這個消息的回調,那麼你能夠重發。網絡

事務機制和 cnofirm 機制最大的不一樣在於,事務機制是同步的,你提交一個事務以後會阻塞在那兒,可是 confirm 機制是異步的,你發送個消息以後就能夠發送下一個消息,而後那個消息 RabbitMQ 接收了以後會異步回調你的一個接口通知你這個消息接收到了。架構

因此通常在生產者這塊避免數據丟失,都是用 confirm 機制的。併發

RabbitMQ 弄丟了數據

就是 RabbitMQ 本身弄丟了數據,這個你必須開啓 RabbitMQ 的持久化,就是消息寫入以後會持久化到磁盤,哪怕是 RabbitMQ 本身掛了,恢復以後會自動讀取以前存儲的數據,通常數據不會丟。除非極其罕見的是,RabbitMQ 還沒持久化,本身就掛了,可能致使少許數據丟失,可是這個機率較小。app

設置持久化有兩個步驟異步

  • 建立 queue 的時候將其設置爲持久化

    這樣就能夠保證 RabbitMQ 持久化 queue 的元數據,可是它是不會持久化 queue 裏的數據的。
  • 第二個是發送消息的時候將消息的 deliveryMode 設置爲 2

    就是將消息設置爲持久化的,此時 RabbitMQ 就會將消息持久化到磁盤上去。

必需要同時設置這兩個持久化才行,RabbitMQ 哪怕是掛了,再次重啓,也會從磁盤上重啓恢復 queue,恢復這個 queue 裏的數據。async

注意,哪怕是你給 RabbitMQ 開啓了持久化機制,也有一種可能,就是這個消息寫到了 RabbitMQ 中,可是還沒來得及持久化到磁盤上,結果不巧,此時 RabbitMQ 掛了,就會致使內存裏的一點點數據丟失。

因此,持久化能夠跟生產者那邊的 confirm 機制配合起來,只有消息被持久化到磁盤以後,纔會通知生產者 ack 了,因此哪怕是在持久化到磁盤以前,RabbitMQ 掛了,數據丟了,生產者收不到 ack,你也是能夠本身重發的。

消費端弄丟了數據

RabbitMQ 若是丟失了數據,主要是由於你消費的時候,剛消費到,還沒處理,結果進程掛了,好比重啓了,那麼就尷尬了,RabbitMQ 認爲你都消費了,這數據就丟了。

這個時候得用 RabbitMQ 提供的 ack 機制,簡單來講,就是你必須關閉 RabbitMQ 的自動 ack,能夠經過一個 api 來調用就行,而後每次你本身代碼裏確保處理完的時候,再在程序裏 ack 一把。這樣的話,若是你還沒處理完,不就沒有 ack 了?那 RabbitMQ 就認爲你還沒處理完,這個時候 RabbitMQ 會把這個消費分配給別的 consumer 去處理,消息是不會丟的。




Kafka

消費端弄丟了數據

惟一可能致使消費者弄丟數據的狀況,就是說,你消費到了這個消息,而後消費者那邊自動提交了 offset,讓 Kafka 覺得你已經消費好了這個消息,但其實你纔剛準備處理這個消息,你還沒處理,你本身就掛了,此時這條消息就丟咯。

這不是跟 RabbitMQ 差很少嗎,你們都知道 Kafka 會自動提交 offset,那麼只要關閉自動提交 offset,在處理完以後本身手動提交 offset,就能夠保證數據不會丟。可是此時確實仍是可能會有重複消費,好比你剛處理完,還沒提交 offset,結果本身掛了,此時確定會重複消費一次,本身保證冪等性就行了。

生產環境碰到的一個問題,就是說咱們的 Kafka 消費者消費到了數據以後是寫到一個內存的 queue 裏先緩衝一下,結果有的時候,你剛把消息寫入內存 queue,而後消費者會自動提交 offset。而後此時咱們重啓了系統,就會致使內存 queue 裏還沒來得及處理的數據就丟失了。

Kafka 弄丟了數據

這塊比較常見的一個場景,就是 Kafka 某個 broker 宕機,而後從新選舉 partition 的 leader。你們想一想,要是此時其餘的 follower 恰好還有些數據沒有同步,結果此時 leader 掛了,而後選舉某個 follower 成 leader 以後,不就少了一些數據?這就丟了一些數據啊。

生產環境也遇到過,咱們也是,以前 Kafka 的 leader 機器宕機了,將 follower 切換爲 leader 以後,就會發現說這個數據就丟了。

因此此時通常是要求起碼設置以下 4 個參數:

  • 給 topic 設置 replication.factor 參數:這個值必須大於 1,要求每一個 partition 必須有至少 2 個副本。
  • 在 Kafka 服務端設置 min.insync.replicas 參數:這個值必須大於 1,這個是要求一個 leader 至少感知到有至少一個 follower 還跟本身保持聯繫,沒掉隊,這樣才能確保 leader 掛了還有一個 follower 吧。
  • 在 producer 端設置 acks=all:這個是要求每條數據,必須是寫入全部 replica 以後,才能認爲是寫成功了
  • 在 producer 端設置 retries=MAX(很大很大很大的一個值,無限次重試的意思):這個是要求一旦寫入失敗,就無限重試,卡在這裏了。

咱們生產環境就是按照上述要求配置的,這樣配置以後,至少在 Kafka broker 端就能夠保證在 leader 所在 broker 發生故障,進行 leader 切換時,數據不會丟失。

生產者會不會弄丟數據?

若是按照上述的思路設置了 acks=all,必定不會丟,要求是,你的 leader 接收到消息,全部的 follower 都同步到了消息以後,才認爲本次寫成功了。若是沒知足這個條件,生產者會自動不斷的重試,重試無限次。

本文原創地址:https://jsbintask.cn/2019/01/28/interview/interview-middleware-reliable/,轉載請註明出處。


Kafka中的消息是否會丟失和重複消費


原文:https://blog.csdn.net/u012050154/article/details/78592854

在以前的基礎上,基本搞清楚了Kafka的機制及如何運用。這裏思考一下:Kafka中的消息會不會丟失或重複消費呢?爲何呢?

        要肯定Kafka的消息是否丟失或重複,從兩個方面分析入手:消息發送和消息消費

一、消息發送

         Kafka消息發送有兩種方式:同步(sync)和異步(async),默認是同步方式,可經過producer.type屬性進行配置。Kafka經過配置request.required.acks屬性來確認消息的生產:
 

    0---表示不進行消息接收是否成功的確認;

    1---表示當Leader接收成功時確認;

    -1---表示Leader和Follower都接收成功時確認;

綜上所述,有6種消息生產的狀況,下面分狀況來分析消息丟失的場景:

 

(1)acks=0,不和Kafka集羣進行消息接收確認,則當網絡異常、緩衝區滿了等狀況時,消息可能丟失;

(2)acks=一、同步模式下,只有Leader確認接收成功後但掛掉了,副本沒有同步,數據可能丟失;

二、消息消費

        Kafka消息消費有兩個consumer接口,Low-level API和High-level API:

 

Low-level API:消費者本身維護offset等值,能夠實現對Kafka的徹底控制;

High-level API:封裝了對parition和offset的管理,使用簡單;

 

若是使用高級接口High-level API,可能存在一個問題就是當消息消費者從集羣中把消息取出來、並提交了新的消息offset值後,還沒來得及消費就掛掉了,那麼下次再消費時以前沒消費成功的消息就「詭異」的消失了;    

解決辦法:

        針對消息丟失:同步模式下,確認機制設置爲-1,即讓消息寫入Leader和Follower以後再確認消息發送成功;異步模式下,爲防止緩衝區滿,能夠在配置文件設置不限制阻塞超時時間,當緩衝區滿時讓生產者一直處於阻塞狀態;

        針對消息重複:將消息的惟一標識保存到外部介質中,每次消費時判斷是否處理過便可。

Kafka的Leader選舉機制

        Kafka將每一個Topic進行分區Patition,以提升消息的並行處理,同時爲保證高可用性,每一個分區都有必定數量的副本 Replica,這樣當部分服務器不可用時副本所在服務器就能夠接替上來,保證系統可用性。在Leader上負責讀寫,Follower負責數據的同步。當一個Leader發生故障如何從Follower中選擇新Leader呢?

        Kafka在Zookeeper上針對每一個Topic都維護了一個ISR(in-sync replica---已同步的副本)的集合,集合的增減Kafka都會更新該記錄。若是某分區的Leader不可用,Kafka就從ISR集合中選擇一個副本做爲新的Leader。這樣就能夠容忍的失敗數比較高,假如某Topic有N+1個副本,則能夠容忍N個服務器不可用。

        若是ISR中副本都不可用,有兩種處理方法:

    (1)等待ISR集合中副本復活後選擇一個可用的副本;

    (2)選擇集羣中其餘可用副本;

解決RabbitMQ消息丟失與重複消費問題

1. 背景

最近用戶反饋提交的SQL查詢一直處於長時間等待狀態,通過排查觀察,發現部分查詢請求丟失,致使用戶提交的查詢未被正常接收,繼而長時間無響應。

現象:集市SQL控制檯提交10個簡單SQL查詢 -> 消息發送方:發送10條消息至消息隊列 -> 消息消費方:只消費了7條消息

2. 現狀

2.1. 當前SQL查詢的總體流程

    生產者:PHP:
        將用戶的SQL查詢記錄在DB表,標識查詢任務狀態(f_status)爲運行中;
        將DB表中的任務id、提交人等信息發送到RabbitMQ;
    消息隊列:RabbitMQ:
        PHP消息提交到了交換機;
        交換機再把消息分發給指定的消息隊列;
    消費者:Python:
        主進程監聽消息隊列,一旦有消息就不停拉取;
        拉取一條消息,就從進程池調起一個空閒進程來處理消息;
        隨後反饋ACK給消息隊列,將消息從消息隊列中移除;

2.2. 消息發送方:Web端

結論:消息發送正常
排查步驟:查看log

2.3. 消息隊列

結論:消息數量正常
診斷步驟:
執行機安裝rabbitmq-dump-queue插件,用於dump隊列的消息;
1. 執行機:中止服務;
2. 用戶:提交10個SQL查詢:
3. 發送方:查看Web服務端的輸出日誌,肯定10個消息已經往消息隊列寫;
4. 執行機:經過rabbitmq-dump-queue查看隊列的消息,確認是正常10個消息寫入;

watch -n 1 '$GOPATH/src/rabbitmq-dump-queue/rabbitmq-dump-queue -uri="amqp://guest:guest@xxxxx:5672" -queue ph_open_task'

5. 執行機:啓動服務,消息隊列中的消息所有被接收;

2.4. 消息接收方

代碼邏輯:

try:
    pool = Pool(processes=40)

    def callback(ch, method, properties, body):
        try:
            doSomething...
            pool.apply_async(process)
        except Exception as e:
            print traceback.format_exc()
            logger_msg.info(traceback.format_exc())
        finally:
            // 這裏會有問題,即便消息未被處理也會反饋ACK給RabbitMQ
            ch.basic_ack(delivery_tag=method.delivery_tag)

    while True:
        try:
            connection = pika.BlockingConnection(
                pika.ConnectionParameters(host='xxxxxxxx'))
            channel = connection.channel()
            channel.queue_declare(queue=queue_name, durable=True)
            channel.basic_qos(prefetch_count=1)
            channel.basic_consume(callback, queue=queue_name, no_ack=False)
            channel.start_consuming()
        except pika.exceptions.ConnectionClosed as e:
            continue
except Exception as e:
    logger_msg.info(traceback.format_exc())
finally:
    channel.basic_ack(delivery_tag=method.delivery_tag)

    pool.close()
    pool.join()

結論:本例中消費者主進程將持續監聽MQ,一旦MQ有消息將會拉取,隨後從進程池中啓動子進程來處理消息,可是從進程池啓動子進程的過程並不必定成功(若當前進程池沒有空閒子進程),而主進程無論任何狀況下都給MQ發送ACK狀態碼,從而MQ將未處理的消息移除掉,致使消息丟失

3. 方案

問題是在消費者環節產生,所以對消費者作改動,須要調整消費者的架構:
* 原來邏輯:使用進程池技術,主進程負責監聽、接收MQ的消息,子進程負責執行MQ的消息,缺點是單一的主進程沒法簡單處理ACK狀態碼,不易維護;
* 現有邏輯:使用RabbitMQ自身特性(work_queue),消費者再也不維護進程池,是單進程,負責監聽、接收、處理MQ的消息,處理完了之後再反饋ACK狀態碼,進程與進程之間互不干擾,易維護,併發量大時可隨時增長消費者進程;

目前方案的問題以及解決方案:

    • 問題1:消息重複消費
      描述:用戶在頁面中止查詢時,會致使消費者進程被殺死,所以ACK狀態碼未反饋至MQ,從而消息一直存留在MQ中,當新的消費者啓動時會從新消費;
      解決方案:消費者每次執行查詢前,首先在DB上查詢任務的執行狀態,若處於「取消/失敗/成功」則表示已經由其它消費者消費過,那麼直接返回ACK狀態碼給MQ,將消息從MQ中移除;

    • 問題2:進程池如何維護? 描述:用戶在頁面中止查詢時,會致使消費者進程被殺死,致使消費者數量減小; 解決方案:維護一個監控腳本,每分鐘輪詢消費者進程數,若少於40個進程,則新啓動一個消費者,直到數量足夠;

相關文章
相關標籤/搜索