大數據框架exactly-once底層實現原理,看這篇文章就夠了

1、大數據框架三種語義‍‍‍‍

在分佈式系統中,如kafka、spark、flink等構成系統的任何節點都是被定義爲能夠彼此獨立失敗的。好比在 Kafka 中,broker 可能會 crash,在 producer 推送數據至 topic 的過程當中也可能會遇到網絡問題。根據 producer 處理此類故障所採起的提交策略類型,有以下三種(以kafka爲例):算法

at-least-once:若是 producer 收到來自 Kafka broker 的確認(ack)或者 acks = all,則表示該消息已經寫入到 Kafka。但若是 producer ack 超時或收到錯誤,則可能會重試發送消息,客戶端會認爲該消息未寫入 Kafka。若是 broker 在發送 Ack 以前失敗,但在消息成功寫入 Kafka 以後,此重試將致使該消息被寫入兩次,所以消息會被不止一次地傳遞給最終 consumer,這種策略可能致使重複的工做和不正確的結果。數據庫

at-most-once:若是在 ack 超時或返回錯誤時 producer 不重試,則該消息可能最終不會寫入 Kafka,所以不會傳遞給 consumer。在大多數狀況下,這樣作是爲了不重複的可能性,業務上必須接收數據傳遞可能的丟失。安全

exactly-once:即便 producer 重試發送消息,消息也會保證最多一次地傳遞給最終consumer。該語義是最理想的,但也難以實現,由於它須要消息系統自己與生產和消費消息的應用程序進行協做。網絡

2、大數據框架故障階段(kafka爲例)

理想情況,網絡良好,代碼沒有錯誤,則 Kafka 能夠保證 exactly-once,但生產環境錯綜複雜,故障幾乎沒法避免,主要有:app

1 框架自身故障(Broker):Kafka 做爲一個高可用、持久化系統,保證每條消息被持久化而且冗餘多份(假設是 n 份),因此 Kafka 能夠容忍 n-1 個 broker 故障,意味着一個分區只要至少有一個 broker 可用,分區就可用。Kafka 的副本協議保證了只要消息被成功寫入了主副本,它就會被複制到其餘全部的可用副本(ISR)。框架

2 客戶端發送框架失敗(Producer 到 Broker 的 RPC):Kafka 的持久性依賴於生產者接收broker 的 ack 。沒有接收成功 ack 不表明生產請求自己失敗了。broker 可能在寫入消息後,發送 ack 給生產者的時候掛了,甚至 broker 也可能在寫入消息前就掛了。因爲生產者沒有辦法知道錯誤是什麼形成的,因此它就只能認爲消息沒寫入成功,而且會重試發送。在一些狀況下,這會形成一樣的消息在 Kafka 分區日誌中重複,進而形成消費端屢次收到這條消息。分佈式

3 客戶端也失敗(Producer):Exactly-once delivery 也必須考慮客戶端失敗的狀況。可是如何去區分客戶端是真的掛了(永久性宕機)仍是說只是暫時丟失心跳?追求正確性的話,broker 應該丟棄由 zombie producer 發送的消息。 consumer 也是如此,一旦新的客戶端實例已經啓動,它必須可以從失敗實例的任何狀態中恢復,並從安全點( safe checkpoint )開始處理,這意味着消費的偏移量必須始終與生成的輸出保持同步。ide

4 框架發送消費端失敗(Broker到 Consumer 的 RPC)性能

3、Exactly-once底層實現原理

3.一、依賴業務控制大數據

  • 對生產者:

每一個分區只有一個生產者寫入消息,當出現異常或超時,生產者查詢此分區最後一個消息,用於決定後續操做時重傳仍是繼續發送。

爲每一個消息增長惟一主鍵,生產者不作處理,由消費者根據主鍵去重。

  • 對消費者:

關閉自動提交 offset 的功能,不使用 Offsets Topic 這個內部 Topic 記錄其 offset,而是由消費者自動保存 offset。將 offset 和消息處理放在一個事務裏面,事務執行成功認爲消息被消費,不然事務回滾須要從新處理。當出現消費者重啓或者 Rebalance 操做,能夠從數據庫找到對應的 offset,而後調用 KafkaConsumer.seek() 設置消費者位置,今後 offset 開始消費。

3.二、依賴 Kafka

3.2.一、冪等性:每一個分區中精確一次且有序(Idempotence: Exactly-once in order semantics per partition)

Kafka 在0.11.0.0以前的版本中只支持 At Least Once 和 At Most Once 語義,尚不支持 Exactly Once 語義。

Kafka 0.11.0.0版本引入了冪等語義。 一個冪等性的操做就是一種被執行屢次形成的影響和只執行一次形成的影響同樣的操做。

若是出現致使生產者重試的錯誤,一樣的消息,仍由一樣的生產者發送屢次,將只被寫到 Kafka broker 的日誌中一次。

對於單個分區,冪等生產者不會由於生產者或 broker 故障而產生多條重複消息。

想要開啓這個特性,得到每一個分區內的精確一次語義,也就是說沒有重複,沒有丟失,而且有序的語義,只須要 producer 配置 enable.idempotence=true。

這個特性是怎麼實現的呢?每一個新的 Producer 在初始化的時候會被分配一個惟一的 PID,該PID對用戶徹底透明而不會暴露給用戶。在底層,它和 TCP 的工做原理有點像,每一批發送到 Kafka 的消息都將包含 PID 和一個從 0 開始單調遞增序列號。

Broker 將使用這個序列號來刪除重複的發送。和只能在瞬態內存中的鏈接中保證不重複的 TCP 不一樣,這個序列號被持久化到副本日誌,因此,即便分區的 leader 掛了,其餘的 broker 接管了leader,新 leader 仍能夠判斷從新發送的是否重複了。這種機制的開銷很是低:每批消息只有幾個額外的字段。這種特性比非冪等的生產者只增長了可忽略的性能開銷。

若是消息序號比 Broker 維護的序號大 1 以上,說明中間有數據還沒有寫入,也即亂序,此時 Broker 拒絕該消息。

若是消息序號小於等於 Broker 維護的序號,說明該消息已被保存,即爲重複消息,Broker直接丟棄該消息。

總結來講,producer 端發送消息時,生成全局惟一自增pid,和broker中數據的pid進行對比,多則刪除,少則通知producer端從新發送。

3.2.二、事務:跨分區原子寫入

上述冪等設計只能保證單個 Producer 對於同一個 <Topic, Partition> 的 Exactly Once 語義。

Kafka 如今經過新的事務 API 支持跨分區原子寫入。這將容許一個生產者發送一批到不一樣分區的消息,這些消息要麼所有對任何一個消費者可見,要麼對任何一個消費者都不可見。這個特性也容許在一個事務中處理消費數據和提交消費偏移量,從而實現端到端的精確一次語義。

爲了實現這種效果,應用程序必須提供一個穩定的(重啓後不變)惟一的 ID,也即Transaction ID 。 Transactin ID 與 PID 可能一一對應。區別在於 Transaction ID 由用戶提供,將生產者的 transactional.id 配置項設置爲某個惟一ID。而 PID 是內部的實現對用戶透明。

另外,爲了保證新的 Producer 啓動後,舊的具備相同 Transaction ID 的 Producer 失效,每次 Producer 經過 Transaction ID 拿到 PID 的同時,還會獲取一個單調遞增的 epoch。因爲舊的 Producer 的 epoch 比新 Producer 的 epoch 小,Kafka 能夠很容易識別出該 Producer 是老的 Producer 並拒絕其請求。

有了Transaction ID後,Kafka可保證:

  • 跨Session的數據冪等發送。當具備相同Transaction ID的新的Producer實例被建立且工做時,舊的且擁有相同Transaction ID的Producer將再也不工做。

  • 跨Session的事務恢復。若是某個應用實例宕機,新的實例能夠保證任何未完成的舊的事務要麼Commit要麼Abort,使得新實例從一個正常狀態開始工做。

須要注意的是,上述的事務保證是從Producer的角度去考慮的。從Consumer的角度來看,該保證會相對弱一些。尤爲是不能保證全部被某事務Commit過的全部消息都被一塊兒消費,由於:

  • 對於壓縮的Topic而言,同一事務的某些消息可能被其它版本覆蓋

  • 事務包含的消息可能分佈在多個Segment中(即便在同一個Partition內),當老的Segment被刪除時,該事務的部分數據可能會丟失

  • Consumer在一個事務內可能經過seek方法訪問任意Offset的消息,從而可能丟失部分消息

  • Consumer可能並不須要消費某一事務內的全部Partition,所以它將永遠不會讀取組成該事務的全部消息

4、事務中Offset的提交

許多基於Kafka的應用,尤爲是Kafka Stream應用中同時包含Consumer和Producer,前者負責從Kafka中獲取消息,後者負責將處理完的數據寫回Kafka的其它Topic中。

爲了實現該場景下的事務的原子性,Kafka須要保證對Consumer Offset的Commit與Producer對發送消息的Commit包含在同一個事務中。不然,若是在兩者Commit中間發生異常,根據兩者Commit的順序可能會形成數據丟失和數據重複:

  • 若是先Commit Producer發送數據的事務再Commit Consumer的Offset,即At Least Once語義,可能形成數據重複。

  • 若是先Commit Consumer的Offset,再Commit Producer數據發送事務,即At Most Once語義,可能形成數據丟失。

5、分佈式事務經見實現機制

5.1 兩階段提交

Kafka的事務機制與《分佈式事務(一)兩階段提交及JTA》一文中所介紹的兩階段提交機制看似類似,都分PREPARE階段和最終COMMIT階段,但又有很大不一樣。

  • Kafka事務機制中,PREPARE時即要指明是PREPARE_COMMIT仍是PREPARE_ABORT而且只須在Transaction Log中標記便可,無須其它組件參與。而兩階段提交的PREPARE須要發送給全部的分佈式事務參與方,而且事務參與方須要儘量準備好,並根據準備狀況返回Prepared或Non-Prepared狀態給事務管理器。

  • ​Kafka事務中,一但發起PREPARE_COMMIT或PREPARE_ABORT則肯定該事務最終的結果應該是被COMMIT或ABORT。而分佈式事務中,PREPARE後由各事務參與方返回狀態,只有全部參與方均返回Prepared狀態纔會真正執行COMMIT,不然執行ROLLBACK

  • Kafka事務機制中,某幾個Partition在COMMIT或ABORT過程當中變爲不可用,隻影響該Partition不影響其它Partition。兩階段提交中,若惟一收到COMMIT命令參與者Crash,其它事務參與方沒法判斷事務狀態從而使得整個事務阻塞

  • Kafka事務機制引入事務超時機制,有效避免了掛起的事務影響其它事務的問題

  • Kafka事務機制中存在多個Transaction Coordinator實例,而分佈式事務中只有一個事務管理器

兩階段提交原理

二階段提交的算法思路能夠歸納爲:協調者詢問參與者是否準備好了提交,並根據全部參與者的反饋狀況決定向全部參與者發送commit或者rollback指令(協調者向全部參與者發送相同的指令)。

所謂的兩個階段是指

  • 準備階段

     又稱投票階段。在這一階段,協調者詢問全部參與者是否準備好提交,參與者若是已經準備好提交則回覆Prepared,不然回覆Non-Prepared。

  • 提交階段

    又稱執行階段。協調者若是在上一階段收到全部參與者回覆的Prepared,則在此階段向全部參與者發送commit指令,全部參與者當即執行commit操做;不然協調者向全部參與者發送rollback指令,參與者當即執行rollback操做。

5.2 Zookeeper

Zookeeper的原子廣播協議與兩階段提交以及Kafka事務機制有類似之處,但又有各自的特色

  • Kafka事務可COMMIT也可ABORT。而Zookeeper原子廣播協議只有COMMIT沒有ABORT。固然,Zookeeper不COMMIT某消息也即等效於ABORT該消息的更新。

  • Kafka存在多個Transaction Coordinator實例,擴展性較好。而Zookeeper寫操做只能在Leader節點進行,因此其寫性能遠低於讀性能。

  • Kafka事務是COMMIT仍是ABORT徹底取決於Producer即客戶端。而Zookeeper原子廣播協議中某條消息是否被COMMIT取決因而否有一大半FOLLOWER ACK該消息。

編輯搜圖

 

請點擊輸入圖片描述(最多18字)

相關文章
相關標籤/搜索