[Kafka] [All about it]

Overview

  • 設計目標:
    • 以O(1) 常數級時間複雜度的訪問性能,提供消息持久化能力。
    • 高吞吐率
    • 支持 kafka server 間的消息分區,及分佈式消費,同時保證每一個partition內部的消息順序傳輸
    • scale out:支持在線水平擴展。
  • 爲什麼使用消息系統:
    • 解耦
    • 冗餘(持久化)
    • 擴展性
    • 順序保證
    • 緩衝
    • 異步通訊
  • 經常使用message queue對比
    • RabbitMQ: 重量級
    • redis:基於 k-v 對的NoSQL數據庫,但自己支持MQ功能,能夠做爲一個輕量級的隊列服務使用。

發佈訂閱

  • 拓撲:推拉結合。

Zookeeper

 

 

常數級訪問性能

  •  ref2

 kafka文件存儲機制

  • 一個partition爲一個文件夾
  • segment
    • segment的意義:把topic中一個大文件分紅多個小文件段,就容易按期清除已消費完的文件。
    • partition內部segment爲一個個文件,segment命令方式爲在文件後加上上一個segment的最後offset值。
    • 物理結構上,一個segment file由兩大部分組成,分別爲index file和data file,這兩個文件一一對應。
      • index索引文件: 存儲 "offset --> 物理偏移"。index文件採用的是稀疏存儲的方式,每隔必定字節的數據創建一條索引,從而避免索引文件佔用過多的空間,從而能夠將索引保留在內存中。不過沒有索引的數據間隔中還須要一次順序掃描,不過範圍很小。
      • log數據文件
  • 而每條消息長這樣
  • 如何在partition中經過offset查找message
    1. 查找segment file:根據offset二分查找,可快速定位到具體文件
    2. 經過segment file查找message

 

高吞吐率

  • 每一條消息都是被append到partiton中,屬於順序寫磁盤,所以效率很是高。
  • 基於partition,producer會根據partition機制選擇將其存儲到哪個partition,不一樣的消息能夠並行寫入不一樣partition。

 

Delivery Guarantee

  • consumer端:consumer在從broker讀取消息後,能夠選擇commit,該操做會在zk中保存該partition中讀取消息的offset。能夠設置爲auto commit。
    • 能夠看到,這一過程當中,數據處理與commit的順序會決定消息從broker到consumer的delivery guarantee semantic。
      1. 讀完消息先commit再處理:若是commit以後還未處理時consumer crash,就屬於At most once。
      2. 處理完消息再commit: 若是處理完以後crash,就屬於At least once。
  • kafka's guarantees are stronger in 3 ways:
    • Idempotent producer
    • Transactions
    • Exactly-once stream processing

Idempotent producer

  • 因爲producer重發數據形成的duplicates:在kafka 0.11.0 後基於producer 冪等性解決。解決方案是:Producer ID + <topic, partition> 做爲一個相似主鍵的東西解決。
  • 實現方式:相似於TCP,發送到kafka的每批消息將包含一個序列號,該序列號用於重複數據的刪除。與TCP不一樣的是,TCP只能在transient in-memory中提供保證。而序列號將被持久化存儲到topic中,所以即便leader replica失敗,接管的任何其餘broker也能感知到消息是否重複。並且這種機制開銷至關低,只需在每批消息中添加幾個額外的字段。
  • 優勢:
    • works transparently -- only one config change
    • sequence numbers and producer ids are in the log
    • resilient to broker failuers, producer retries, etc.
    • 開銷低,只需在每批消息中添加幾個額外的字段。

Transactions

  • Atomic writes across partitions
  • 在一個事務內的消息要麼所有可見,要麼全都不可見。
  • consumers必須被配置成,可跳過未提交的消息。
  • transaction api
    producer.initTransactions();
    try {
      producer.beginTransaction();
      producer.send(record1);
      producer.send(record2);
      producer.commitTransaction();
    } catch(ProducerFencedException e) {
      producer.close();
    } catch(KafkaException e) {
      producer.abortTransaction();
    }
  • producers須要使用新的producer API for transactions。
  • consumers須要可以過濾掉 uncommited or aborted transactional messages。

Exactly-once stream processing

  • Exactly-once stream processing across read-process-write tasks.
  • 基於producer冪等性 和 事務原子性,經過Streams API實現 exactly-once 流處理成爲可能。

+ Spark Streaming

  • kafka + spark streaming 的應用場景很是常見。
  • 整個系統對exactly once的保證,歷來都不是靠系統中某一部分來實現就能搞定的,須要整個流式系統共同努力。
  • spark streaming部分的exactly once的實現:使用WAL實現(注意不是checkpoint和replication,這二者是failover機制,不是專門解決exactly once的)。
  • 輸出操做對exactly once的實現:須要輸出結果保證冪等性 or 提供事務支持。參見官方文檔:
    • In order to achieve exactly-once semantics for output of your results, your output operation that saves the data to an external data store must be either idempotent, or an atomic transaction that saves results and offsets(See Semantics of output operations in the main programming guide for further information).
    • Exactly-once with Idempotent writes:
      • 若是屢次寫產生的數據同樣,那麼這個輸出操做就是冪等的。好比saveAsTextFile就是一個典型的冪等的更新。好比messages with unique keys can be written to database without duplication.
      • 實現:
        • set enable.auto.commit = false。缺省狀況下,kafka DStream將會在收到數據後立刻commit the consumer offsets。咱們但願推遲這個操做直到the batch被徹底處理掉。(這樣能夠實現at least once)
        • 打開spark streaming的checkpointing來存儲kafka offsets。可是若是應用程序代碼改變了,checkpointed data是不可重用的,所以有second option以下:
        • commit kafka offsets after outputs。kafka提供一個 commitAsync API,以及 HasOffsetRanges 類也能夠被用來從initialRDD中提取offsets。
          messages.foreachRDD { rdd =>
            val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
            rdd.foreachPartition { iter =>
              // output to database
            }
            messages.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
          }
    • Exactly-once with transactional writes:
      • transactional updates 須要一個unique identifier,咱們可使用batch time,partition id or kafka offsets來當作identifier,而後把結果 along with identifier一塊兒在同一個事務中寫入external storage。
      • 這個原子操做能夠提供exactly-once語義。若是offsets更新失敗,或者經過 offset != $fromOffset 檢測到duplicate offset,那麼整個事務就會rollback,這也就包含智能了exactly-once語義。
  • SS提供了三種storing offsets的方法,以下三種方法按順序reliability是遞增的,但同時code complexity也是遞增的。
    • checkpoints
      • 缺點:
        • output operation必須是冪等的,否則會有repeated outputs。
        • 若是你的程序代碼更改過,那麼就沒法從checkpoint處recover。 [這裏的緣由是:checkpointing刷到外部存儲的是類Checkpoint對象序列化後的數據。那麼在Spark Streaming Application從新編譯後,再去反序列化checkpointing的數據就會失敗。這個時候就必須新建SparkContext。]
    • kafka itself:kafka有一個offset commit API 用來存儲offsets。缺省狀況下,the new consumers會週期性地auto-commit offsets。可是很明顯,你pull過來的數據不必定來得及被你消費,就會resulting in undefined semantics。
      • 所以,咱們可使用 commitAsync API,來確保處理完數據以後再commit。
      • 缺點:kafka is not transactional,所以你的輸出仍然須要冪等。
    • Your own data store
      • 對於支持事務的data store,咱們能夠在同一個事務中保存offsets和results,從而保證二者in sync,即便是在failure的狀況下。
      • 若是你仔細地檢查repeated or skipped offset ranges,那麼就能夠經過回滾事務來防止重複or丟失數據。這就保證了exactly-once
  • Spark streaming failover的實現,主要三種方式:
    • checkpoint:在driver實現,用於在driver崩潰後,恢復driver的現場。
    • replication:在receiver中用於解決單臺executor掛掉後,未保存的數據丟失的問題。
    • WAL:在driver和receiver中實現,用於解決:
      • driver掛掉,全部executor都會掛掉,那麼全部未保存的數據都會丟失,replication就無論用了
      • driver掛掉後,哪些block在掛掉前註冊到了driver中,以及掛掉前哪些block分配給了當前正在運行的batch job,這些信息就都丟失了。因此須要WAL對這些信息作持久化
  • Conlusion:exactly-once在stream processing中是一個很強的語義,它會不可避免地給你的程序帶來一些開銷,影響吞吐量。並且不適用於windowed operations。

Kafka HA

  • 多replica
    • 數據一致性
      • 只有leader直接與producer & consumer交互,其餘replica做爲follower從leader複製數據。(這也減小了保證數據一致性的工做,不然須要保證replica之間有N*N條數據通路進行數據同步)。
      • in-sync replica(ISR): leader所追蹤的與其保持同步的replica列表。若是一個follower宕機(經過與zk之間的session來斷定)or落後太多(可配置的條數),leader將其從ISR中移除。
      • 複製機制:kafka的複製機制既不是徹底的同步複製,也不是單純的異步複製。
        • 同步複製:要求全部能工做的follower都複製完,該消息纔會commit,極大地影響了吞吐率。
        • 異步複製:只要被leader寫入log就認爲已被commit,若是leader宕機就會丟失數據。
      • 這也引出了問題:
        • 如何propagate消息
          1. producer首先經過zk找到該partition的leader
          2. producer將消息發送給該partition的leader
          3. leader將消息寫入本地log
          4. 每一個follower都從leader pull數據
          5. follower收到消息(並寫入log後),向leader發ack。(爲了提升性能,每一個follower在接收到數據後便ack,所以對已經commit的消息,只能保證已被存於多個replica的內存中。顯然,這是爲了性能作了必定犧牲的。)
          6. 一旦leader收到全部replica的ack,該消息就被認爲已經commit了,leader將增長HW並向producer發送ack。(這裏有一個問題是leader須要收到多少個follower的ack就向producer返回ack,以下)
        • 在向producer發送ACK以前須要保證多少個replica已經收到該消息
        • 怎樣處理某個replica不工做的狀況
        • 怎樣處理failed replica恢復回來的狀況
    • leader selection
      這部分的難點就在於,由於follower可能落後許多或者crash了,因此必須確保選擇"最新"的follower做爲新的leader。基本的原則就是新的leader必須擁有原來的leader commit過的全部消息
      • 一個經常使用的leader selection方式是"Majority Vote"
        • 若是有 2f + 1 個replica,那麼在commit以前必須保證有f + 1 個replica複製完消息。爲了保證正確的選出新的leader,fail的replica不能超過f個。
        • 劣勢:所容忍的fail的follower個數比較少。(e.g: 若是要容忍1個follower掛掉,必需要有5個以上的replica)。
      • 其餘經常使用的leader selection算法包括:zk的ZAB,Raft和Viewstamped Replication
      • kafka採用的算法:在zk中動態維護了一個ISR,該ISR裏的全部replica都跟上了leader,只有其內的成員纔有被選爲leader的可能。
    • 如何分配replica:
      • 要求:
        • load balance
        • 容錯能力:replica不能都在一個機器上
      • 分配replica算法
        1. 將broker(共n個)和待分配的partition排序
        2. 將第i個partition分配到第 (i mod n) 個broker上
        3. 將第i個partition的第j個replica分配到第 (i + j) mod n 個broker上
  • broker failover
    • controller在zk的 /brokers/ids 節點上註冊watch。 (一旦broker宕機,zk對於的znode會自動被刪除:ephemeral node,zk會fire controller註冊的watch,controller便可獲取最新的倖存的Broker列表)
    • controller決定set_p,該集合包含了宕機的全部broker上的全部partition。
    • 對set_p中的每個partition:
      • 從 /brokers/topics/[topic]/partitions/[partition]/state 讀取該Partition當前的ISR
      • 決定該partition的新leader。若當前ISR中至少有一個replica還倖存,則選擇其中一個做爲新的leader,新的ISR則包含當前ISR中全部倖存的replica。不然選擇該partition中任意一個倖存的replica做爲新leader以及ISR(顯然,該場景下可能會有潛在的數據丟失)。
      • 將新的leader,ISR和新的 leader_epoch 及 controller_epoch 寫入 /brokers/topics/[topic]/partitions/[partition]/state。
    •  直接經過RPC向set_p相關的Broker發送 LeaderAndISRRequest 命令。controller能夠在一個RPC操做中發送多個命令從而提升效率。
  •  因爲上述controller的引入,所以也須要 Controller Failover:
    • 每一個broker都會在controller path (/controller) 上註冊一個watch。當前controller失敗時,watch被fire,全部alive的broker都會去競選成爲新的controller,由zk保證只有一個競選成功。

 

Consumer Design

High level consumer

  • 適用於client只但願從kafka讀取數據,不太關心消息offset的處理。
  • High level consumer將從某個partition讀取的最後一條消息的offset存於zk中(kafka從0.8.2版本開始同時支持將offset存於zk中,與將offset存於專用的kafka topic中)。
  • 這個offset基於client提供的 consumer group 來保存。
  • 注意,consumer group是整個kafka集羣共享的,而非某個topic的。
  • 在consumer group內部,某個partition的數據只會被某一個特定的consumer實例所消費。每一個consumer實例能夠消費一個或多個特定partition的數據。
    • 劣勢:沒法保證同一個consumer group裏的consumer均勻消費數據。
    • 優點:1. 每一個consumer不用跟大量的broker通訊,減小通訊開銷,同時下降了分配難度,實現也更簡單。 2. 另外,每一個partition內部數據是有序的,這種設計能夠保持有序消費。
  • consumer rebalance
    • 算法:
      • 將目標topic下的全部partition排序,存於Pt
      • 對某個consumer group下全部的consumer排序,存於Cg,第i個consumer記爲Ci
      • N = size(Pt) / size(Cg),向上取整
      • 解除Ci對原來分配的partition的消費權
      • 將第 i * N 到 i * (i + 1) * N - 1 個partition分配給Ci
    • 實現:是由每個consumer經過在zk上註冊watch完成的。每一個consumer被建立時會觸發consumer group的rebalance。

Low Level Consumer

  • 適合於用戶但願更好的控制數據消費的場景,好比:
    • 同一條消息讀屢次
    • 只讀取某個topic的部分partition
    • 管理事務,從而確保每條消息被處理且僅被處理一次。
  •  low level consumer所須要的額外工做:
    • 必須在應用程序中追蹤offset,從而肯定下一條應該消費哪條數據
    • 應用程序須要經過程序獲知每一個partition的leader是誰
    • 必須處理leader的變化
  • 通常的使用流程:
    • 查找一個alive broker,而且找出每一個partition的leader
    • 找出每一個partition的follower
    • 定義好請求,該請求應該能描述應用程序須要哪些數據
    • fetch數據
    • 識別leader變化,並對之作出必要的響應

 

高性能

宏觀架構層面

  • 利用partition實現並行處理
    • 組織架構:topic只是一個邏輯概念,每一個topic都包含一個或多個partiiton,不一樣partition可位於不一樣節點。同時partition在物理上對應一個本地文件夾,每一個partition包含一個or多個segment,每一個segment包含一個數據文件和一個與之對應的索引文件。在邏輯上,能夠把一個partition當作一個很是長的數組,可經過這個數組的索引(offset)去訪問其數據
    • 關於並行:
      • 一方面,因爲不一樣的partition可位於不一樣的機器,所以能夠充分利用集羣優點,實現機器間的並行處理。
      • 另外一方面,因爲partition在物理上對應一個文件夾,即便多個partition位於同一個節點,也能夠經過配置讓同一個節點的不一樣partition置於不一樣的disk driver上,從而實現磁盤間的並行處理,充分發揮多磁盤的優點。
    • partition是最小併發粒度
  • ISR實現可用性與數據一致性的動態平衡:考慮CAP理論。
    • 常見數據複製及一致性方案
      • Master-slave:
        • RDBMS的讀寫分離即爲典型的master-slave方案
        • 同步複製可保證強一致性但會影響可用性
        • 異步複製可提供高可用但會下降一致性
      • WNR
        • 主要用於去中心化的分佈式系統中。
        • N表明副本總數,W表明每次寫操做要保證的最少寫成功副本數,R表明每次讀至少要讀取的副本數。
        • 當W + R > N 時,可保證每次讀取的數據至少有一個副本擁有最新的數據
        • 多個寫操做的順序難以保證,可能致使多副本間的寫操做順序不一致。
      • Paxos及其變種
        • Google的Chubby,zk的Zab,Raft等
      • 基於ISR的數據複製方案 (kafka)
        • 既非徹底的同步複製,也不是徹底的異步複製,而是基於ISR的動態複製方案
        • ISR是由Leader動態維護的。若是follower不能緊跟上leader,它將被leader從ISR中移除,直到從新跟上後再次被加入ISR。
        • ISR會在每次改變時持久化到zk中。
        • 如何判斷是否跟上?
          • 對0.8.*版本,若是follower在 replica.lag.time.max.ms 時間內未向leader發送fetch請求,則會被移除。而即便某follower持續向leader發送fetch請求,follower與leader的數據差距在replcia.lag.max.messages以上,也會被移除。 
          • 從0.9.0.0開始,replcia.lag.max.messages被移除。同時leader會考慮follower是否在時間內與之保持一致。
        • 使用ISR的緣由
          • 與同步複製相比,可避免最慢的follower拖慢總體速度,提升了系統可用性
          • ISR中全部的follower都包含了全部commit過的消息,而只有Commit過的消息纔會被consumer消費,故從consume扔的角度而言,ISR中的全部replica都始終處於同步狀態,從而與異步複製方案相比提升了數據一致性。
          • ISR可動態調整,極限狀況下,能夠只包含leader,極大提升了可容忍的宕機的follower個數。

具體實現層面

  • 高效使用磁盤:順序寫磁盤
    • 設計上,partition至關於一個很是長的數組,consumer經過offset順序消費這些數據,而且不刪除已經消費的數據。
    • 刪除過程,並不是使用"讀-寫"模式去修改文件,而是將partition分爲多個segment,每一個segment對應一個物理文件,經過刪除整個文件的方式去刪除partition內的數據。
  • 充分使用Page Cache
    • 好處以下
      • I/O Scheduler 會將連續的小塊寫組裝成大塊的物理寫從而提升性能
      • I/O Scheduler 會嘗試將一些寫操做從新按順序排好,從而減小磁盤頭的移動時間
      • 充分利用全部空閒內存(非JVM內存)。若是使用應用層Cache(即JVM堆內存),會增長GC負擔
      • 讀操做可直接在Page Cache內進行。若是消費和生產速度至關,甚至不須要經過物理磁盤(直接經過Page Cache)交換數據
      • 若是進程重啓,JVM內的Cache會失效,但Page Cache仍然可用
  • 支持多disk drive
    • broker的 log.dirs 配置項,容許配置多個文件夾。若是機器上有多個disk drive,可將不一樣的disk掛載到不一樣目錄。
  • 零拷貝
    • kafka中存在大量的網絡數據持久化到磁盤 和 磁盤文件經過網絡發送的過程。這一過程的性能直接影響kafka的總體吞吐量。
  • 減小網絡開銷

 

Reference

  1. 系列文章
  2. 美團-kafka文件存儲那些事
  3. kafka exactly-once
  4. kafka+SparkStreaming
  5. kafka-streaming integration
相關文章
相關標籤/搜索