Kafka原理總結

Kafka

Kafka是最初由Linkedin公司開發,是一個分佈式、支持分區的(partition)、多副本的(replica),基於zookeeper協調的分佈式消息系統,它的最大的特性就是能夠實時的處理大量數據以知足各類需求場景:好比基於hadoop的批處理系統、低延遲的實時系統、storm/Spark流式處理引擎,web/nginx日誌、訪問日誌,消息服務等等,用scala語言編寫,Linkedin於2010年貢獻給了Apache基金會併成爲頂級開源 項目。
 
1.前言
消息隊列的性能好壞,其文件存儲機制設計是衡量一個消息隊列服務技術水平和最關鍵指標之一。下面將從Kafka文件存儲機制和物理結構角度,分析Kafka是如何實現高效文件存儲,及實際應用效果。
 
 1.1  Kafka的特性:
- 高吞吐量、低延遲:kafka每秒能夠處理幾十萬條消息,它的延遲最低只有幾毫秒,每一個topic能夠分多個partition, consumer group 對partition進行consume操做。
- 可擴展性:kafka集羣支持熱擴展
- 持久性、可靠性:消息被持久化到本地磁盤,而且支持數據備份防止數據丟失
- 容錯性:容許集羣中節點失敗(若副本數量爲n,則容許n-1個節點失敗)
- 高併發:支持數千個客戶端同時讀寫
 
1.2   Kafka的使用場景:
- 日誌收集:一個公司能夠用Kafka能夠收集各類服務的log,經過kafka以統一接口服務的方式開放給各類consumer,例如hadoop、Hbase、Solr等。
- 消息系統:解耦和生產者和消費者、緩存消息等。
- 用戶活動跟蹤:Kafka常常被用來記錄web用戶或者app用戶的各類活動,如瀏覽網頁、搜索、點擊等活動,這些活動信息被各個服務器發佈到kafka的topic中,而後訂閱者經過訂閱這些topic來作實時的監控分析,或者裝載到hadoop、數據倉庫中作離線分析和挖掘。
- 運營指標:Kafka也常常用來記錄運營監控數據。包括收集各類分佈式應用的數據,生產各類操做的集中反饋,好比報警和報告。
- 流式處理:好比spark streaming和storm
- 事件源
 
1.3  Kakfa的設計思想
- Kakfa Broker Leader的選舉:Kakfa Broker集羣受Zookeeper管理。全部的Kafka Broker節點一塊兒去Zookeeper上註冊一個臨時節點,由於只有一個Kafka Broker會註冊成功,其餘的都會失敗,因此這個成功在Zookeeper上註冊臨時節點的這個Kafka Broker會成爲Kafka Broker Controller,其餘的Kafka broker叫Kafka Broker follower。(這個過程叫Controller在ZooKeeper註冊Watch)。這個Controller會監聽其餘的Kafka Broker的全部信息,若是這個kafka broker controller宕機了,在zookeeper上面的那個臨時節點就會消失,此時全部的kafka broker又會一塊兒去Zookeeper上註冊一個臨時節點,由於只有一個Kafka Broker會註冊成功,其餘的都會失敗,因此這個成功在Zookeeper上註冊臨時節點的這個Kafka Broker會成爲Kafka Broker Controller,其餘的Kafka broker叫Kafka Broker follower。例如:一旦有一個broker宕機了,這個kafka broker controller會讀取該宕機broker上全部的partition在zookeeper上的狀態,並選取ISR列表中的一個replica做爲partition leader(若是ISR列表中的replica全掛,選一個倖存的replica做爲leader; 若是該partition的全部的replica都宕機了,則將新的leader設置爲-1,等待恢復,等待ISR中的任一個Replica「活」過來,而且選它做爲Leader;或選擇第一個「活」過來的Replica(不必定是ISR中的)做爲Leader),這個broker宕機的事情,kafka controller也會通知zookeeper,zookeeper就會通知其餘的kafka broker。
 
這裏曾經發生過一個bug,TalkingData使用Kafka0.8.1的時候,kafka controller在Zookeeper上註冊成功後,它和Zookeeper通訊的timeout時間是6s,也就是若是kafka controller若是有6s中沒有和Zookeeper作心跳,那麼Zookeeper就認爲這個kafka controller已經死了,就會在Zookeeper上把這個臨時節點刪掉,那麼其餘Kafka就會認爲controller已經沒了,就會再次搶着註冊臨時節點,註冊成功的那個kafka broker成爲controller,而後,以前的那個kafka controller就須要各類shut down去關閉各類節點和事件的監聽。可是當kafka的讀寫流量都很是巨大的時候,TalkingData的一個bug是,因爲網絡等緣由,kafka controller和Zookeeper有6s中沒有通訊,因而從新選舉出了一個新的kafka controller,可是原來的controller在shut down的時候老是不成功,這個時候producer進來的message因爲Kafka集羣中存在兩個kafka controller而沒法落地。致使數據淤積。
 
這裏曾經還有一個bug,TalkingData使用Kafka0.8.1的時候,當ack=0的時候,表示producer發送出去message,只要對應的kafka broker topic partition leader接收到的這條message,producer就返回成功,無論partition leader 是否真的成功把message真正存到kafka。當ack=1的時候,表示producer發送出去message,同步的把message存到對應topic的partition的leader上,而後producer就返回成功,partition leader異步的把message同步到其餘partition replica上。當ack=all或-1,表示producer發送出去message,同步的把message存到對應topic的partition的leader和對應的replica上以後,才返回成功。可是若是某個kafka controller 切換的時候,會致使partition leader的切換(老的 kafka controller上面的partition leader會選舉到其餘的kafka broker上),可是這樣就會致使丟數據。
 
-  Consumergroup:各個consumer(consumer 線程)能夠組成一個組(Consumer group ),partition中的每一個message只能被組(Consumer group )中的一個consumer(consumer 線程)消費,若是一個message能夠被多個consumer(consumer 線程)消費的話,那麼這些consumer必須在不一樣的組。Kafka不支持一個partition中的message由兩個或兩個以上的同一個consumer group下的consumer thread來處理,除非再啓動一個新的consumer group。因此若是想同時對一個topic作消費的話,啓動多個consumer group就能夠了,可是要注意的是,這裏的多個consumer的消費都必須是順序讀取partition裏面的message,新啓動的consumer默認從partition隊列最頭端最新的地方開始阻塞的讀message。它不能像AMQ那樣能夠多個BET做爲consumer去互斥的(for update悲觀鎖)併發處理message,這是由於多個BET去消費一個Queue中的數據的時候,因爲要保證不能多個線程拿同一條message,因此就須要行級別悲觀所(for update),這就致使了consume的性能降低,吞吐量不夠。而kafka爲了保證吞吐量,只容許同一個consumer group下的一個consumer線程去訪問一個partition。若是以爲效率不高的時候,能夠加partition的數量來橫向擴展,那麼再加新的consumer thread去消費。若是想多個不一樣的業務都須要這個topic的數據,起多個consumer group就行了,你們都是順序的讀取message,offsite的值互不影響。這樣沒有鎖競爭,充分發揮了橫向的擴展性,吞吐量極高。這也就造成了分佈式消費的概念。
    當啓動一個consumer group去消費一個topic的時候,不管topic裏面有多個少個partition,不管咱們consumer group裏面配置了多少個consumer thread,這個consumer group下面的全部consumer thread必定會消費所有的partition;即使這個consumer group下只有一個consumer thread,那麼這個consumer thread也會去消費全部的partition。所以,最優的設計就是,consumer group下的consumer thread的數量等於partition數量,這樣效率是最高的。
    同一partition的一條message只能被同一個Consumer Group內的一個Consumer消費。不可以一個consumer group的多個consumer同時消費一個partition。
    一個consumer group下,不管有多少個consumer,這個consumer group必定回去把這個topic下全部的partition都消費了。當consumer group裏面的consumer數量小於這個topic下的partition數量的時候,以下圖groupA,groupB,就會出現一個conusmer thread消費多個partition的狀況,總之是這個topic下的partition都會被消費。若是consumer group裏面的consumer數量等於這個topic下的partition數量的時候,以下圖groupC,此時效率是最高的,每一個partition都有一個consumer thread去消費。當consumer group裏面的consumer數量大於這個topic下的partition數量的時候,以下圖GroupD,就會有一個consumer thread空閒。所以,咱們在設定consumer group的時候,只須要指明裏面有幾個consumer數量便可,無需指定對應的消費partition序號,consumer會自動進行rebalance。
    多個Consumer Group下的consumer能夠消費同一條message,可是這種消費也是以o(1)的方式順序的讀取message去消費,,因此必定會重複消費這批message的,不能向AMQ那樣多個BET做爲consumer消費(對message加鎖,消費的時候不能重複消費message)
- Consumer Rebalance的觸發條件:(1)Consumer增長或刪除會觸發 Consumer Group的Rebalance(2)Broker的增長或者減小都會觸發 Consumer Rebalance
- Consumer: Consumer處理partition裏面的message的時候是o(1)順序讀取的。因此必須維護着上一次讀到哪裏的offsite信息。high level API,offset存於Zookeeper中,low level API的offset由本身維護。通常來講都是使用high level api的。Consumer的delivery gurarantee,默認是讀完message先commmit再處理message,autocommit默認是true,這時候先commit就會更新offsite+1,一旦處理失敗,offsite已經+1,這個時候就會丟message;也能夠配置成讀完消息處理再commit,這種狀況下consumer端的響應就會比較慢的,須要等處理完才行。
通常狀況下,必定是一個consumer group處理一個topic的message。Best Practice是這個consumer group裏面consumer的數量等於topic裏面partition的數量,這樣效率是最高的,一個consumer thread處理一個partition。若是這個consumer group裏面consumer的數量小於topic裏面partition的數量,就會有consumer thread同時處理多個partition(這個是kafka自動的機制,咱們不用指定),可是總之這個topic裏面的全部partition都會被處理到的。。若是這個consumer group裏面consumer的數量大於topic裏面partition的數量,多出的consumer thread就會閒着啥也不幹,剩下的是一個consumer thread處理一個partition,這就形成了資源的浪費,由於一個partition不可能被兩個consumer thread去處理。因此咱們線上的分佈式多個service服務,每一個service裏面的kafka consumer數量都小於對應的topic的partition數量,可是全部服務的consumer數量只和等於partition的數量,這是由於分佈式service服務的全部consumer都來自一個consumer group,若是來自不一樣的consumer group就會處理重複的message了(同一個consumer group下的consumer不能處理同一個partition,不一樣的consumer group能夠處理同一個topic,那麼都是順序處理message,必定會處理重複的。通常這種狀況都是兩個不一樣的業務邏輯,纔會啓動兩個consumer group來處理一個topic)。
 
若是producer的流量增大,當前的topic的parition數量=consumer數量,這時候的應對方式就是很想擴展:增長topic下的partition,同時增長這個consumer group下的consumer。
                 
- Delivery Mode : Kafka producer 發送message不用維護message的offsite信息,由於這個時候,offsite就至關於一個自增id,producer就儘管發送message就行了。並且Kafka與AMQ不一樣,AMQ大都用在處理業務邏輯上,而Kafka大都是日誌,因此Kafka的producer通常都是大批量的batch發送message,向這個topic一次性發送一大批message,load balance到一個partition上,一塊兒插進去,offsite做爲自增id本身增長就好。可是Consumer端是須要維護這個partition當前消費到哪一個message的offsite信息的,這個offsite信息,high level api是維護在Zookeeper上,low level api是本身的程序維護。(Kafka管理界面上只能顯示high level api的consumer部分,由於low level api的partition offsite信息是程序本身維護,kafka是不知道的,沒法在管理界面上展現 )當使用high level api的時候,先拿message處理,再定時自動commit offsite+1(也能夠改爲手動), 而且kakfa處理message是沒有鎖操做的。所以若是處理message失敗,此時尚未commit offsite+1,當consumer thread重啓後會重複消費這個message。可是做爲高吞吐量高併發的實時處理系統,at least once的狀況下,至少一次會被處理到,是能夠容忍的。若是沒法容忍,就得使用low level api來本身程序維護這個offsite信息,那麼想何時commit offsite+1就本身搞定了。
 
- Topic & Partition:Topic至關於傳統消息系統MQ中的一個隊列queue,producer端發送的message必須指定是發送到哪一個topic,可是不須要指定topic下的哪一個partition,由於kafka會把收到的message進行load balance,均勻的分佈在這個topic下的不一樣的partition上( hash(message) % [broker數量]  )。物理上存儲上,這個topic會分紅一個或多個partition,每一個partiton至關因而一個子queue。在物理結構上,每一個partition對應一個物理的目錄(文件夾),文件夾命名是[topicname]_[partition]_[序號],一個topic能夠有無數多的partition,根據業務需求和數據量來設置。在kafka配置文件中可隨時更高num.partitions參數來配置更改topic的partition數量,在建立Topic時經過參數指定parittion數量。Topic建立以後經過Kafka提供的工具也能夠修改partiton數量。
   通常來講,(1)一個Topic的Partition數量大於等於Broker的數量,能夠提升吞吐率。(2)同一個Partition的Replica儘可能分散到不一樣的機器,高可用。
  當add a new partition的時候,partition裏面的message不會從新進行分配,原來的partition裏面的message數據不會變,新加的這個partition剛開始是空的,隨後進入這個topic的message就會從新參與全部partition的load balance
- Partition Replica:每一個partition能夠在其餘的kafka broker節點上存副本,以便某個kafka broker節點宕機不會影響這個kafka集羣。存replica副本的方式是按照kafka broker的順序存。例若有5個kafka broker節點,某個topic有3個partition,每一個partition存2個副本,那麼partition1存broker1,broker2,partition2存broker2,broker3。。。以此類推(replica副本數目不能大於kafka broker節點的數目,不然報錯。這裏的replica數其實就是partition的副本總數,其中包括一個leader,其餘的就是copy副本)。這樣若是某個broker宕機,其實整個kafka內數據依然是完整的。可是,replica副本數越高,系統雖然越穩定,可是回來帶資源和性能上的降低;replica副本少的話,也會形成系統丟數據的風險。
  (1)怎樣傳送消息:producer先把message發送到partition leader,再由leader發送給其餘partition follower。(若是讓producer發送給每一個replica那就太慢了)
  (2)在向Producer發送ACK前須要保證有多少個Replica已經收到該消息:根據ack配的個數而定
  (3)怎樣處理某個Replica不工做的狀況:若是這個部工做的partition replica不在ack列表中,就是producer在發送消息到partition leader上,partition leader向partition follower發送message沒有響應而已,這個不會影響整個系統,也不會有什麼問題。若是這個不工做的partition replica在ack列表中的話,producer發送的message的時候會等待這個不工做的partition replca寫message成功,可是會等到time out,而後返回失敗由於某個ack列表中的partition replica沒有響應,此時kafka會自動的把這個部工做的partition replica從ack列表中移除,之後的producer發送message的時候就不會有這個ack列表下的這個部工做的partition replica了。 
  (4)怎樣處理Failed Replica恢復回來的狀況:若是這個partition replica以前不在ack列表中,那麼啓動後從新受Zookeeper管理便可,以後producer發送message的時候,partition leader會繼續發送message到這個partition follower上。若是這個partition replica以前在ack列表中,此時重啓後,須要把這個partition replica再手動加到ack列表中。(ack列表是手動添加的,出現某個部工做的partition replica的時候自動從ack列表中移除的)
- Partition leader與follower:partition也有leader和follower之分。leader是主partition,producer寫kafka的時候先寫partition leader,再由partition leader push給其餘的partition follower。partition leader與follower的信息受Zookeeper控制,一旦partition leader所在的broker節點宕機,zookeeper會衝其餘的broker的partition follower上選擇follower變爲parition leader。
- Topic分配partition和partition replica的算法:(1)將Broker(size=n)和待分配的Partition排序。(2)將第i個Partition分配到第(i%n)個Broker上。(3)將第i個Partition的第j個Replica分配到第((i + j) % n)個Broker上
 
- 消息投遞可靠性
一個消息如何算投遞成功,Kafka提供了三種模式:
- 第一種是啥都無論,發送出去就看成成功,這種狀況固然不能保證消息成功投遞到broker;
- 第二種是Master-Slave模型,只有當Master和全部Slave都接收到消息時,纔算投遞成功,這種模型提供了最高的投遞可靠性,可是損傷了性能;
- 第三種模型,即只要Master確認收到消息就算投遞成功;實際使用時,根據應用特性選擇,絕大多數狀況下都會中和可靠性和性能選擇第三種模型
  消息在broker上的可靠性,由於消息會持久化到磁盤上,因此若是正常stop一個broker,其上的數據不會丟失;可是若是不正常stop,可能會使存在頁面緩存來不及寫入磁盤的消息丟失,這能夠經過配置flush頁面緩存的週期、閾值緩解,可是一樣會頻繁的寫磁盤會影響性能,又是一個選擇題,根據實際狀況配置。
  消息消費的可靠性,Kafka提供的是「At least once」模型,由於消息的讀取進度由offset提供,offset能夠由消費者本身維護也能夠維護在zookeeper裏,可是當消息消費後consumer掛掉,offset沒有即時寫回,就有可能發生重複讀的狀況,這種狀況一樣能夠經過調整commit offset週期、閾值緩解,甚至消費者本身把消費和commit offset作成一個事務解決,可是若是你的應用不在意重複消費,那就乾脆不要解決,以換取最大的性能。
 
- Partition ack:當ack=1,表示producer寫partition leader成功後,broker就返回成功,不管其餘的partition follower是否寫成功。當ack=2,表示producer寫partition leader和其餘一個follower成功的時候,broker就返回成功,不管其餘的partition follower是否寫成功。當ack=-1[parition的數量]的時候,表示只有producer所有寫成功的時候,纔算成功,kafka broker才返回成功信息。這裏須要注意的是,若是ack=1的時候,一旦有個broker宕機致使partition的follower和leader切換,會致使丟數據。
  
- message狀態:在Kafka中,消息的狀態被保存在consumer中,broker不會關心哪一個消息被消費了被誰消費了,只記錄一個offset值(指向partition中下一個要被消費的消息位置),這就意味着若是consumer處理很差的話,broker上的一個消息可能會被消費屢次。
- message持久化:Kafka中會把消息持久化到本地文件系統中,而且保持o(1)極高的效率。咱們衆所周知IO讀取是很是耗資源的性能也是最慢的,這就是爲了數據庫的瓶頸常常在IO上,須要換SSD硬盤的緣由。可是Kafka做爲吞吐量極高的MQ,卻能夠很是高效的message持久化到文件。這是由於Kafka是順序寫入o(1)的時間複雜度,速度很是快。也是高吞吐量的緣由。因爲message的寫入持久化是順序寫入的,所以message在被消費的時候也是按順序被消費的,保證partition的message是順序消費的。通常的機器,單機每秒100k條數據。
- message有效期:Kafka會長久保留其中的消息,以便consumer能夠屢次消費,固然其中不少細節是可配置的。
- Produer : Producer向Topic發送message,不須要指定partition,直接發送就行了。kafka經過partition ack來控制是否發送成功並把信息返回給producer,producer能夠有任意多的thread,這些kafka服務器端是不care的。Producer端的delivery guarantee默認是At least once的。也能夠設置Producer異步發送實現At most once。Producer能夠用主鍵冪等性實現Exactly once
- Kafka高吞吐量: Kafka的高吞吐量體如今讀寫上,分佈式併發的讀和寫都很是快,寫的性能體如今以o(1)的時間複雜度進行順序寫入。讀的性能體如今以o(1)的時間複雜度進行順序讀取, 對topic進行partition分區,consume group中的consume線程能夠以很高能性能進行順序讀。
- Kafka delivery guarantee(message傳送保證):(1)At most once消息可能會丟,絕對不會重複傳輸;(2)At least once 消息絕對不會丟,可是可能會重複傳輸;(3)Exactly once每條信息確定會被傳輸一次且僅傳輸一次,這是用戶想要的。
- 批量發送:Kafka支持以消息集合爲單位進行批量發送,以提升push效率。
- push-and-pull : Kafka中的Producer和consumer採用的是push-and-pull模式,即Producer只管向broker push消息,consumer只管從broker pull消息,二者對消息的生產和消費是異步的。
- Kafka集羣中broker之間的關係:不是主從關係,各個broker在集羣中地位同樣,咱們能夠隨意的增長或刪除任何一個broker節點。
- 負載均衡方面: Kafka提供了一個 metadata API來管理broker之間的負載(對Kafka0.8.x而言,對於0.7.x主要靠zookeeper來實現負載均衡)。
- 同步異步:Producer採用異步push方式,極大提升Kafka系統的吞吐率(能夠經過參數控制是採用同步仍是異步方式)。
- 分區機制partition:Kafka的broker端支持消息分區partition,Producer能夠決定把消息發到哪一個partition,在一個partition 中message的順序就是Producer發送消息的順序,一個topic中能夠有多個partition,具體partition的數量是可配置的。partition的概念使得kafka做爲MQ能夠橫向擴展,吞吐量巨大。partition能夠設置replica副本,replica副本存在不一樣的kafka broker節點上,第一個partition是leader,其餘的是follower,message先寫到partition leader上,再由partition leader push到parition follower上。因此說kafka能夠水平擴展,也就是擴展partition。
- 離線數據裝載:Kafka因爲對可拓展的數據持久化的支持,它也很是適合向Hadoop或者數據倉庫中進行數據裝載。
- 實時數據與離線數據:kafka既支持離線數據也支持實時數據,由於kafka的message持久化到文件,並能夠設置有效期,所以能夠把kafka做爲一個高效的存儲來使用,能夠做爲離線數據供後面的分析。固然做爲分佈式實時消息系統,大多數狀況下仍是用於實時的數據處理的,可是當cosumer消費能力降低的時候能夠經過message的持久化在淤積數據在kafka。
- 插件支持:如今很多活躍的社區已經開發出很多插件來拓展Kafka的功能,如用來配合Storm、Hadoop、flume相關的插件。
- 解耦:  至關於一個MQ,使得Producer和Consumer之間異步的操做,系統之間解耦
- 冗餘:  replica有多個副本,保證一個broker node宕機後不會影響整個服務
- 擴展性:  broker節點能夠水平擴展,partition也能夠水平增長,partition replica也能夠水平增長
- 峯值:  在訪問量劇增的狀況下,kafka水平擴展, 應用仍然須要繼續發揮做用
- 可恢復性:  系統的一部分組件失效時,因爲有partition的replica副本,不會影響到整個系統。
- 順序保證性:因爲kafka的producer的寫message與consumer去讀message都是順序的讀寫,保證了高效的性能。
- 緩衝:因爲producer那面可能業務很簡單,然後端consumer業務會很複雜並有數據庫的操做,所以確定是producer會比consumer處理速度快,若是沒有kafka,producer直接調用consumer,那麼就會形成整個系統的處理速度慢,加一層kafka做爲MQ,能夠起到緩衝的做用。
- 異步通訊:做爲MQ,Producer與Consumer異步通訊

2.Kafka文件存儲機制

2.1 Kafka部分名詞解釋以下:
 
     Kafka中發佈訂閱的對象是topic。咱們能夠爲每類數據建立一個topic,把向topic發佈消息的客戶端稱做producer,從topic訂閱消息的客戶端稱做consumer。Producers和consumers能夠同時從多個topic讀寫數據。一個kafka集羣由一個或多個broker服務器組成,它負責持久化和備份具體的kafka消息。
  • Broker:Kafka節點,一個Kafka節點就是一個broker,多個broker能夠組成一個Kafka集羣。
  • Topic:一類消息,消息存放的目錄即主題,例如page view日誌、click日誌等均可以以topic的形式存在,Kafka集羣可以同時負責多個topic的分發。
  • Partition:topic物理上的分組,一個topic能夠分爲多個partition,每一個partition是一個有序的隊列
  • Segment:partition物理上由多個segment組成,每一個Segment存着message信息
  • Producer : 生產message發送到topic
  • Consumer : 訂閱topic消費message, consumer做爲一個線程來消費
  • Consumer Group:一個Consumer Group包含多個consumer, 這個是預先在配置文件中配置好的。各個consumer(consumer 線程)能夠組成一個組(Consumer group ),partition中的每一個message只能被組(Consumer group ) 中的一個consumer(consumer 線程 )消費,若是一個message能夠被多個consumer(consumer 線程 ) 消費的話,那麼這些consumer必須在不一樣的組。Kafka不支持一個partition中的message由兩個或兩個以上的consumer thread來處理,即使是來自不一樣的consumer group的也不行。它不能像AMQ那樣能夠多個BET做爲consumer去處理message,這是由於多個BET去消費一個Queue中的數據的時候,因爲要保證不能多個線程拿同一條message,因此就須要行級別悲觀所(for update),這就致使了consume的性能降低,吞吐量不夠。而kafka爲了保證吞吐量,只容許一個consumer線程去訪問一個partition。若是以爲效率不高的時候,能夠加partition的數量來橫向擴展,那麼再加新的consumer thread去消費。這樣沒有鎖競爭,充分發揮了橫向的擴展性,吞吐量極高。這也就造成了分佈式消費的概念。

  • 2.2 kafka一些原理概念
1.持久化
kafka使用文件存儲消息(append only log),這就直接決定kafka在性能上嚴重依賴文件系統的自己特性.且不管任何OS下,對文件系統自己的優化是很是艱難的.文件緩存/直接內存映射等是經常使用的手段.由於kafka是對日誌文件進行append操做,所以磁盤檢索的開支是較小的;同時爲了減小磁盤寫入的次數,broker會將消息暫時buffer起來,當消息的個數(或尺寸)達到必定閥值時,再flush到磁盤,這樣減小了磁盤IO調用的次數.對於kafka而言,較高性能的磁盤,將會帶來更加直接的性能提高.
 
2.性能
除磁盤IO以外,咱們還須要考慮網絡IO,這直接關係到kafka的吞吐量問題.kafka並無提供太多高超的技巧;對於producer端,能夠將消息buffer起來,當消息的條數達到必定閥值時,批量發送給broker;對於consumer端也是同樣,批量fetch多條消息.不過消息量的大小能夠經過配置文件來指定.對於kafka broker端,彷佛有個sendfile系統調用能夠潛在的提高網絡IO的性能:將文件的數據映射到系統內存中,socket直接讀取相應的內存區域便可,而無需進程再次copy和交換(這裏涉及到"磁盤IO數據"/"內核內存"/"進程內存"/"網絡緩衝區",多者之間的數據copy).
其實對於producer/consumer/broker三者而言,CPU的開支應該都不大,所以啓用消息壓縮機制是一個良好的策略;壓縮須要消耗少許的CPU資源,不過對於kafka而言,網絡IO更應該須要考慮.能夠將任何在網絡上傳輸的消息都通過壓縮.kafka支持gzip/snappy等多種壓縮方式.
 
3.負載均衡
kafka集羣中的任何一個broker,均可以向producer提供metadata信息,這些metadata中包含"集羣中存活的servers列表"/"partitions leader列表"等信息(請參看zookeeper中的節點信息). 當producer獲取到metadata信息以後, producer將會和Topic下全部partition leader保持socket鏈接;消息由producer直接經過socket發送到broker,中間不會通過任何"路由層".
異步發送,將多條消息暫且在客戶端buffer起來,並將他們批量發送到broker;小數據IO太多,會拖慢總體的網絡延遲,批量延遲發送事實上提高了網絡效率;不過這也有必定的隱患,好比當producer失效時,那些還沒有發送的消息將會丟失。
 
4.Topic模型
其餘JMS實現,消息消費的位置是有prodiver保留,以便避免重複發送消息或者將沒有消費成功的消息重發等,同時還要控制消息的狀態.這就要求JMS broker須要太多額外的工做.在kafka中,partition中的消息只有一個consumer在消費,且不存在消息狀態的控制,也沒有複雜的消息確認機制,可見kafka broker端是至關輕量級的.當消息被consumer接收以後,consumer能夠在本地保存最後消息的offset,並間歇性的向zookeeper註冊offset.因而可知,consumer客戶端也很輕量級。
kafka中consumer負責維護消息的消費記錄,而broker則不關心這些,這種設計不只提升了consumer端的靈活性,也適度的減輕了broker端設計的複雜度;這是和衆多JMS prodiver的區別.此外,kafka中消息ACK的設計也和JMS有很大不一樣,kafka中的消息是批量(一般以消息的條數或者chunk的尺寸爲單位)發送給consumer,當消息消費成功後,向zookeeper提交消息的offset,而不會向broker交付ACK.或許你已經意識到,這種"寬鬆"的設計,將會有"丟失"消息/"消息重發"的危險.
 
5.消息傳輸一致
Kafka提供3種消息傳輸一致性語義:最多1次,最少1次,剛好1次。
最少1次:可能會重傳數據,有可能出現數據被重複處理的狀況;
最多1次:可能會出現數據丟失狀況;
剛好1次:並非指真正只傳輸1次,只不過有一個機制。確保不會出現「數據被重複處理」和「數據丟失」的狀況。
 
at most once: 消費者fetch消息,而後保存offset,而後處理消息;當client保存offset以後,可是在消息處理過程當中consumer進程失效(crash),致使部分消息未能繼續處理.那麼此後可能其餘consumer會接管,可是由於offset已經提早保存,那麼新的consumer將不能fetch到offset以前的消息(儘管它們尚沒有被處理),這就是"at most once".
at least once: 消費者fetch消息,而後處理消息,而後保存offset.若是消息處理成功以後,可是在保存offset階段zookeeper異常或者consumer失效,致使保存offset操做未能執行成功,這就致使接下來再次fetch時可能得到上次已經處理過的消息,這就是"at least once".
"Kafka Cluster"到消費者的場景中能夠採起如下方案來獲得「剛好1次」的一致性語義:
最少1次+消費者的輸出中額外增長已處理消息最大編號:因爲已處理消息最大編號的存在,不會出現重複處理消息的狀況。
 
6.副本
kafka中,replication策略是基於partition,而不是topic;kafka將每一個partition數據複製到多個server上,任何一個partition有一個leader和多個follower(能夠沒有);備份的個數能夠經過broker配置文件來設定。leader處理全部的read-write請求,follower須要和leader保持同步.Follower就像一個"consumer",消費消息並保存在本地日誌中;leader負責跟蹤全部的follower狀態,若是follower"落後"太多或者失效,leader將會把它從replicas同步列表中刪除.當全部的follower都將一條消息保存成功,此消息才被認爲是"committed",那麼此時consumer才能消費它,這種同步策略,就要求follower和leader之間必須具備良好的網絡環境.即便只有一個replicas實例存活,仍然能夠保證消息的正常發送和接收,只要zookeeper集羣存活便可.
選擇follower時須要兼顧一個問題,就是新leader server上所已經承載的partition leader的個數,若是一個server上有過多的partition leader,意味着此server將承受着更多的IO壓力.在選舉新leader,須要考慮到"負載均衡",partition leader較少的broker將會更有可能成爲新的leader.
 
7.log
每一個log entry格式爲"4個字節的數字N表示消息的長度" + "N個字節的消息內容";每一個日誌都有一個offset來惟一的標記一條消息,offset的值爲8個字節的數字,表示此消息在此partition中所處的起始位置..每一個partition在物理存儲層面,有多個log file組成(稱爲segment).segment file的命名爲"最小offset".kafka.例如"00000000000.kafka";其中"最小offset"表示此segment中起始消息的offset.
獲取消息時,須要指定offset和最大chunk尺寸,offset用來表示消息的起始位置,chunk size用來表示最大獲取消息的總長度(間接的表示消息的條數).根據offset,能夠找到此消息所在segment文件,而後根據segment的最小offset取差值,獲得它在file中的相對位置,直接讀取輸出便可.
 
8.分佈式
kafka使用zookeeper來存儲一些meta信息,並使用了zookeeper watch機制來發現meta信息的變動並做出相應的動做(好比consumer失效,觸發負載均衡等)
Broker node registry: 當一個kafka broker啓動後,首先會向zookeeper註冊本身的節點信息(臨時znode),同時當broker和zookeeper斷開鏈接時,此znode也會被刪除.
Broker Topic Registry: 當一個broker啓動時,會向zookeeper註冊本身持有的topic和partitions信息,仍然是一個臨時znode.
Consumer and Consumer group: 每一個consumer客戶端被建立時,會向zookeeper註冊本身的信息;此做用主要是爲了"負載均衡".一個group中的多個consumer能夠交錯的消費一個topic的全部partitions;簡而言之,保證此topic的全部partitions都能被此group所消費,且消費時爲了性能考慮,讓partition相對均衡的分散到每一個consumer上.
Consumer id Registry: 每一個consumer都有一個惟一的ID(host:uuid,能夠經過配置文件指定,也能夠由系統生成),此id用來標記消費者信息.
Consumer offset Tracking: 用來跟蹤每一個consumer目前所消費的partition中最大的offset.此znode爲持久節點,能夠看出offset跟group_id有關,以代表當group中一個消費者失效,其餘consumer能夠繼續消費.
Partition Owner registry: 用來標記partition正在被哪一個consumer消費.臨時znode。此節點表達了"一個partition"只能被group下一個consumer消費,同時當group下某個consumer失效,那麼將會觸發負載均衡(即:讓partitions在多個consumer間均衡消費,接管那些"遊離"的partitions)
當consumer啓動時,所觸發的操做:
A) 首先進行"Consumer id Registry";
B) 而後在"Consumer id Registry"節點下注冊一個watch用來監聽當前group中其餘consumer的"leave"和"join";只要此znode path下節點列表變動,都會觸發此group下consumer的負載均衡.(好比一個consumer失效,那麼其餘consumer接管partitions).
C) 在"Broker id registry"節點下,註冊一個watch用來監聽broker的存活狀況;若是broker列表變動,將會觸發全部的groups下的consumer從新balance.
 
總結:
1) Producer端使用zookeeper用來"發現"broker列表,以及和Topic下每一個partition leader創建socket鏈接併發送消息.
2) Broker端使用zookeeper用來註冊broker信息,已經監測partition leader存活性.
3) Consumer端使用zookeeper用來註冊consumer信息,其中包括consumer消費的partition列表等,同時也用來發現broker列表,並和partition leader創建socket鏈接,並獲取消息。
 
9.Leader的選擇
Kafka的核心是日誌文件,日誌文件在集羣中的同步是分佈式數據系統最基礎的要素。
若是leaders永遠不會down的話咱們就不須要followers了!一旦leader down掉了,須要在followers中選擇一個新的leader.可是followers自己有可能延時過久或者crash,因此必須選擇高質量的follower做爲leader.必須保證,一旦一個消息被提交了,可是leader down掉了,新選出的leader必須能夠提供這條消息。大部分的分佈式系統採用了多數投票法則選擇新的leader,對於多數投票法則,就是根據全部副本節點的情況動態的選擇最適合的做爲leader.Kafka並非使用這種方法。
Kafka動態維護了一個同步狀態的副本的集合(a set of in-sync replicas),簡稱ISR,在這個集合中的節點都是和leader保持高度一致的,任何一條消息必須被這個集合中的每一個節點讀取並追加到日誌中了,纔回通知外部這個消息已經被提交了。所以這個集合中的任何一個節點隨時均可以被選爲leader.ISR在ZooKeeper中維護。ISR中有f+1個節點,就能夠容許在f個節點down掉的狀況下不會丟失消息並正常提供服。ISR的成員是動態的,若是一個節點被淘汰了,當它從新達到「同步中」的狀態時,他能夠從新加入ISR.這種leader的選擇方式是很是快速的,適合kafka的應用場景。
一個邪惡的想法:若是全部節點都down掉了怎麼辦?Kafka對於數據不會丟失的保證,是基於至少一個節點是存活的,一旦全部節點都down了,這個就不能保證了。
實際應用中,當全部的副本都down掉時,必須及時做出反應。能夠有如下兩種選擇:
1. 等待ISR中的任何一個節點恢復並擔任leader。
2. 選擇全部節點中(不僅是ISR)第一個恢復的節點做爲leader.
這是一個在可用性和連續性之間的權衡。若是等待ISR中的節點恢復,一旦ISR中的節點起不起來或者數據都是了,那集羣就永遠恢復不了了。若是等待ISR意外的節點恢復,這個節點的數據就會被做爲線上數據,有可能和真實的數據有所出入,由於有些數據它可能還沒同步到。Kafka目前選擇了第二種策略,在將來的版本中將使這個策略的選擇可配置,能夠根據場景靈活的選擇。
這種窘境不僅Kafka會遇到,幾乎全部的分佈式數據系統都會遇到。
 
10.副本管理
以上僅僅以一個topic一個分區爲例子進行了討論,但實際上一個Kafka將會管理成千上萬的topic分區.Kafka儘可能的使全部分區均勻的分佈到集羣全部的節點上而不是集中在某些節點上,另外主從關係也儘可能均衡這樣每一個幾點都會擔任必定比例的分區的leader.
優化leader的選擇過程也是很重要的,它決定了系統發生故障時的空窗期有多久。Kafka選擇一個節點做爲「controller」,當發現有節點down掉的時候它負責在游泳分區的全部節點中選擇新的leader,這使得Kafka能夠批量的高效的管理全部分區節點的主從關係。若是controller down掉了,活着的節點中的一個會備切換爲新的controller.
 
11.Leader與副本同步
對於某個分區來講,保存正分區的"broker"爲該分區的"leader",保存備份分區的"broker"爲該分區的"follower"。備份分區會徹底複製正分區的消息,包括消息的編號等附加屬性值。爲了保持正分區和備份分區的內容一致,Kafka採起的方案是在保存備份分區的"broker"上開啓一個消費者進程進行消費,從而使得正分區的內容與備份分區的內容保持一致。通常狀況下,一個分區有一個「正分區」和零到多個「備份分區」。能夠配置「正分區+備份分區」的總數量,關於這個配置,不一樣主題能夠有不一樣的配置值。注意,生產者,消費者只與保存正分區的"leader"進行通訊。
 
Kafka容許topic的分區擁有若干副本,這個數量是能夠配置的,你能夠爲每一個topic配置副本的數量。Kafka會自動在每一個副本上備份數據,因此當一個節點down掉時數據依然是可用的。
Kafka的副本功能不是必須的,你能夠配置只有一個副本,這樣其實就至關於只有一份數據。
建立副本的單位是topic的分區,每一個分區都有一個leader和零或多個followers.全部的讀寫操做都由leader處理,通常分區的數量都比broker的數量多的多,各分區的leader均勻的分佈在brokers中。全部的followers都複製leader的日誌,日誌中的消息和順序都和leader中的一致。followers向普通的consumer那樣從leader那裏拉取消息並保存在本身的日誌文件中。
許多分佈式的消息系統自動的處理失敗的請求,它們對一個節點是否着(alive)」有着清晰的定義。Kafka判斷一個節點是否活着有兩個條件:
1. 節點必須能夠維護和ZooKeeper的鏈接,Zookeeper經過心跳機制檢查每一個節點的鏈接。
2. 若是節點是個follower,他必須能及時的同步leader的寫操做,延時不能過久。
符合以上條件的節點準確的說應該是「同步中的(in sync)」,而不是模糊的說是「活着的」或是「失敗的」。Leader會追蹤全部「同步中」的節點,一旦一個down掉了,或是卡住了,或是延時過久,leader就會把它移除。至於延時多久算是「過久」,是由參數replica.lag.max.messages決定的,怎樣算是卡住了,怎是由參數replica.lag.time.max.ms決定的。
只有當消息被全部的副本加入到日誌中時,纔算是「committed」,只有committed的消息纔會發送給consumer,這樣就不用擔憂一旦leader down掉了消息會丟失。Producer也能夠選擇是否等待消息被提交的通知,這個是由參數acks決定的。
Kafka保證只要有一個「同步中」的節點,「committed」的消息就不會丟失。
 
 
  • 2.3  kafka拓撲結構

       一個典型的Kafka集羣中包含若干Producer(能夠是web前端FET,或者是服務器日誌等),若干broker(Kafka支持水平擴展,通常broker數量越多,集羣吞吐率越高),若干ConsumerGroup,以及一個Zookeeper集羣。Kafka經過Zookeeper管理Kafka集羣配置:選舉Kafka broker的leader,以及在Consumer Group發生變化時進行rebalance,由於consumer消費kafka topic的partition的offsite信息是存在Zookeeper的。Producer使用push模式將消息發佈到broker,Consumer使用pull模式從broker訂閱並消費消息。
 

 

  • topic中partition存儲分佈
  • partiton中文件存儲方式 (partition在linux服務器上就是一個目錄(文件夾))
  • partiton中segment文件存儲結構
  • 在partition中如何經過offset查找message

 

 
2.3 topic中partition存儲分佈
 
  |--report_push-0
  |--report_push-1
  |--report_push-2
  |--report_push-3
  |--launch_info-0
  |--launch_info-1
  |--launch_info-2
  |--launch_info-3
 
在Kafka文件存儲中,同一個topic下有多個不一樣partition,每一個partition爲一個目錄,partiton命名規則爲topic名稱+有序序號,第一個partiton序號從0開始,序號最大值爲partitions數量減1。
消息發送時都被髮送到一個topic,其本質就是一個目錄,而topic由是由一些Partition組成,其組織結構以下圖所示:
 
咱們能夠看到,Partition是一個Queue的結構,每一個Partition中的消息都是有序的,生產的消息被不斷追加到Partition上,其中的每個消息都被賦予了一個惟一的offset值。
 
Kafka集羣會保存全部的消息,無論消息有沒有被消費;咱們能夠設定消息的過時時間,只有過時的數據纔會被自動清除以釋放磁盤空間。好比咱們設置消息過時時間爲2天,那麼這2天內的全部消息都會被保存到集羣中,數據只有超過了兩天才會被清除。
 
Kafka只維護在Partition中的offset值,由於這個offsite標識着這個partition的message消費到哪條了。Consumer每消費一個消息,offset就會加1。其實消息的狀態徹底是由Consumer控制的,Consumer能夠跟蹤和重設這個offset值,這樣的話Consumer就能夠讀取任意位置的消息。
 
把消息日誌以Partition的形式存放有多重考慮,第一,方便在集羣中擴展,每一個Partition能夠經過調整以適應它所在的機器,而一個topic又能夠有多個Partition組成,所以整個集羣就能夠適應任意大小的數據了;第二就是能夠提升併發,由於能夠以Partition爲單位讀寫了。
 
經過上面介紹的咱們能夠知道,kafka中的數據是持久化的而且可以容錯的。Kafka容許用戶爲每一個topic設置副本數量,副本數量決定了有幾個broker來存放寫入的數據。若是你的副本數量設置爲3,那麼一份數據就會被存放在3臺不一樣的機器上,那麼就容許有2個機器失敗。通常推薦副本數量至少爲2,這樣就能夠保證增減、重啓機器時不會影響到數據消費。若是對數據持久化有更高的要求,能夠把副本數量設置爲3或者更多。
 
Kafka中的topic是以partition的形式存放的,每個topic均可以設置它的partition數量,Partition的數量決定了組成topic的message的數量。Producer在生產數據時,會按照必定規則(這個規則是能夠自定義的)把消息發佈到topic的各個partition中。上面將的副本都是以partition爲單位的,不過只有一個partition的副本會被選舉成leader做爲讀寫用。
 
關於如何設置partition值須要考慮的因素。一個partition只能被一個消費者消費(一個消費者能夠同時消費多個partition),所以,若是設置的partition的數量小於consumer的數量,就會有消費者消費不到數據。因此,推薦partition的數量必定要大於同時運行的consumer的數量。另一方面,建議partition的數量大於集羣broker的數量,這樣leader partition就能夠均勻的分佈在各個broker中,最終使得集羣負載均衡。在Cloudera,每一個topic都有上百個partition。須要注意的是,kafka須要爲每一個partition分配一些內存來緩存消息數據,若是partition數量越大,就要爲kafka分配更大的heap space。
2.4 partiton中文件存儲方式
 
  • 每一個partion(目錄)至關於一個巨型文件被平均分配到多個大小相等segment(段)數據文件中。但每一個段segment file消息數量不必定相等,這種特性方便old segment file快速被刪除。
  • 每一個partiton只須要支持順序讀寫就好了,segment文件生命週期由服務端配置參數決定。

 

2.5 partiton中segment文件存儲結構
producer發message到某個topic,message會被均勻的分佈到多個partition上(隨機或根據用戶指定的回調函數進行分佈),kafka broker收到message往對應partition的最後一個segment上添加該消息,當某個segment上的消息條數達到配置值或消息發佈時間超過閾值時,segment上的消息會被flush到磁盤,只有flush到磁盤上的消息consumer才能消費,segment達到必定的大小後將不會再往該segment寫數據,broker會建立新的segment。
 
每一個part在內存中對應一個index,記錄每一個segment中的第一條消息偏移。
  • segment file組成:由2大部分組成,分別爲index file和data file,此2個文件一一對應,成對出現,後綴".index"和「.log」分別表示爲segment索引文件、數據文件.
  • segment文件命名規則:partion全局的第一個segment從0開始,後續每一個segment文件名爲上一個全局partion的最大offset(偏移message數)。數值最大爲64位long大小,19位數字字符長度,沒有數字用0填充。
 
每一個segment中存儲不少條消息,消息id由其邏輯位置決定,即從消息id可直接定位到消息的存儲位置,避免id到位置的額外映射。
 
 

 

 

參數說明:

   
   
   
   
   
   
   
   
   
 
2.6 在partition中如何經過offset查找message

 

segment index file採起稀疏索引存儲方式,它減小索引文件大小,經過mmap能夠直接內存操做,稀疏索引爲數據文件的每一個對應message設置一個元數據指針,它 比稠密索引節省了更多的存儲空間,但查找起來須要消耗更多的時間。
 
kafka會記錄offset到zk中。可是,zk client api對zk的頻繁寫入是一個低效的操做。0.8.2 kafka引入了native offset storage,將offset管理從zk移出,而且能夠作到水平擴展。其原理就是利用了kafka的compacted topic,offset以consumer group,topic與partion的組合做爲key直接提交到compacted topic中。同時Kafka又在內存中維護了的三元組來維護最新的offset信息,consumer來取最新offset信息的時候直接內存裏拿便可。固然,kafka容許你快速的checkpoint最新的offset信息到磁盤上。
 
3.Partition Replication原則

 

  • Kafka把topic中一個parition大文件分紅多個小文件段,經過多個小文件段,就容易按期清除或刪除已經消費完文件,減小磁盤佔用。
  • 經過索引信息能夠快速定位message和肯定response的最大大小。
  • 經過index元數據所有映射到memory,能夠避免segment file的IO磁盤操做。
  • 經過索引文件稀疏存儲,能夠大幅下降index文件元數據佔用空間大小。
 
 

 

 

 

 

 

 

 

 

  • 在Kafka集羣中,每一個Broker都有均等分配Partition的Leader機會。
  • 上述圖Broker Partition中,箭頭指向爲副本,以Partition-0爲例:broker1中parition-0爲Leader,Broker2中Partition-0爲副本。
  • 上述圖種每一個Broker(按照BrokerId有序)依次分配主Partition,下一個Broker爲副本,如此循環迭代分配,多副本都遵循此規則。
 
副本分配算法以下:
  • 將全部N Broker和待分配的i個Partition排序.
  • 將第i個Partition分配到第(i mod n)個Broker上.
  • 將第i個Partition的第j個副本分配到第((i + j) mod n)個Broker上.
 
4.Kafka Broker一些特性
4.1 無狀態的Kafka Broker :
1. Broker沒有副本機制,一旦broker宕機,該broker的消息將都不可用。
2. Broker不保存訂閱者的狀態,由訂閱者本身保存。
3. 無狀態致使消息的刪除成爲難題(可能刪除的消息正在被訂閱),kafka採用基於時間的SLA(服務水平保證),消息保存必定時間(一般爲7天)後會被刪除。
4. 消息訂閱者能夠rewind back到任意位置從新進行消費,當訂閱者故障時,能夠選擇最小的offset進行從新讀取消費消息。
 
4.2 message的交付與生命週期 :
1. 不是嚴格的JMS, 所以kafka對消息的重複、丟失、錯誤以及順序型沒有嚴格的要求。(這是與AMQ最大的區別)
2. kafka提供at-least-once delivery,即當consumer宕機後,有些消息可能會被重複delivery。
3. 因每一個partition只會被consumer group內的一個consumer消費,故kafka保證每一個partition內的消息會被順序的訂閱。
4. Kafka爲每條消息爲每條消息計算CRC校驗,用於錯誤檢測,crc校驗不經過的消息會直接被丟棄掉。
 
4.3 壓縮
 
Kafka支持以集合(batch)爲單位發送消息,在此基礎上,Kafka還支持對消息集合進行壓縮,Producer端能夠經過GZIP或Snappy格式對消息集合進行壓縮。Producer端進行壓縮以後,在Consumer端需進行解壓。壓縮的好處就是減小傳輸的數據量,減輕對網絡傳輸的壓力,在對大數據處理上,瓶頸每每體如今網絡上而不是CPU。
 
那麼如何區分消息是壓縮的仍是未壓縮的呢,Kafka在消息頭部添加了一個描述壓縮屬性字節,這個字節的後兩位表示消息的壓縮採用的編碼,若是後兩位爲0,則表示消息未被壓縮。
 
4.4 消息可靠性
 
在消息系統中,保證消息在生產和消費過程當中的可靠性是十分重要的,在實際消息傳遞過程當中,可能會出現以下三中狀況:
 
- 一個消息發送失敗
 
- 一個消息被髮送屢次
 
- 最理想的狀況:exactly-once ,一個消息發送成功且僅發送了一次
 
有許多系統聲稱它們實現了exactly-once,可是它們其實忽略了生產者或消費者在生產和消費過程當中有可能失敗的狀況。好比雖然一個Producer成功發送一個消息,可是消息在發送途中丟失,或者成功發送到broker,也被consumer成功取走,可是這個consumer在處理取過來的消息時失敗了。
 
從Producer端看:Kafka是這麼處理的,當一個消息被髮送後,Producer會等待broker成功接收到消息的反饋(可經過參數控制等待時間),若是消息在途中丟失或是其中一個broker掛掉,Producer會從新發送(咱們知道Kafka有備份機制,能夠經過參數控制是否等待全部備份節點都收到消息)。
 
從Consumer端看:前面講到過partition,broker端記錄了partition中的一個offset值,這個值指向Consumer下一個即將消費message。當Consumer收到了消息,但卻在處理過程當中掛掉,此時Consumer能夠經過這個offset值從新找到上一個消息再進行處理。Consumer還有權限控制這個offset值,對持久化到broker端的消息作任意處理。
 
4.5 備份機制
 
備份機制是Kafka0.8版本的新特性,備份機制的出現大大提升了Kafka集羣的可靠性、穩定性。有了備份機制後,Kafka容許集羣中的節點掛掉後而不影響整個集羣工做。一個備份數量爲n的集羣容許n-1個節點失敗。在全部備份節點中,有一個節點做爲lead節點,這個節點保存了其它備份節點列表,並維持各個備份間的狀體同步。下面這幅圖解釋了Kafka的備份機制:
 
 
4.6 Kafka高效性相關設計
 
4.6.1 消息的持久化
Kafka高度依賴文件系統來存儲和緩存消息(AMQ的nessage是持久化到mysql數據庫中的),由於通常的人認爲磁盤是緩慢的,這致使人們對持久化結構具備競爭性持懷疑態度。其實,磁盤的快或者慢,這決定於咱們如何使用磁盤。由於磁盤線性寫的速度遠遠大於隨機寫。線性讀寫在大多數應用場景下是能夠預測的。
4.6.2 常數時間性能保證
每一個Topic的Partition的是一個大文件夾,裏面有無數個小文件夾segment,但partition是一個隊列,隊列中的元素是segment,消費的時候先從第0個segment開始消費,新來message存在最後一個消息隊列中。對於segment也是對隊列,隊列元素是message,有對應的offsite標識是哪一個message。消費的時候先從這個segment的第一個message開始消費,新來的message存在segment的最後。
 
消息系統的持久化隊列能夠構建在對一個文件的讀和追加上,就像通常狀況下的日誌解決方案。它有一個優勢,全部的操做都是常數時間,而且讀寫之間不會相互阻塞。這種設計具備極大的性能優點:最終系統性能和數據大小徹底無關,服務器能夠充分利用廉價的硬盤來提供高效的消息服務。
 
事實上還有一點,磁盤空間的無限增大而不影響性能這點,意味着咱們能夠提供通常消息系統沒法提供的特性。好比說,消息被消費後不是立馬被刪除,咱們能夠將這些消息保留一段相對比較長的時間(好比一個星期)。
 
5.Kafka 生產者-消費者
     消息系統一般都會由生產者,消費者,Broker三大部分組成,生產者會將消息寫入到Broker,消費者會從Broker中讀取出消息,不一樣的MQ實現的Broker實現會有所不一樣,不過Broker的本質都是要負責將消息落地到服務端的存儲系統中。具體步驟以下:
  1.  

    1. 客戶端鏈接對象將消息包裝到請求中發送到服務端
    2. 服務端的入口也有一個鏈接對象負責接收請求,並將消息以文件的形式存儲起來
    3. 服務端返回響應結果給生產者客戶端
  2.  

    1. 客戶端鏈接對象將消費信息也包裝到請求中發送給服務端
    2. 服務端從文件存儲系統中取出消息
    3. 服務端返回響應結果給消費者客戶端
    4. 客戶端將響應結果還原成消息並開始處理消息
 
                                                                              圖4-1 客戶端和服務端交互
 
5.1  Producers
 
Producers直接發送消息到broker上的leader partition,不須要通過任何中介或其餘路由轉發。爲了實現這個特性,kafka集羣中的每一個broker均可以響應producer的請求,並返回topic的一些元信息,這些元信息包括哪些機器是存活的,topic的leader partition都在哪,現階段哪些leader partition是能夠直接被訪問的。
 
Producer客戶端本身控制着消息被推送到哪些partition。實現的方式能夠是隨機分配、實現一類隨機負載均衡算法,或者指定一些分區算法。Kafka提供了接口供用戶實現自定義的partition,用戶能夠爲每一個消息指定一個partitionKey,經過這個key來實現一些hash分區算法。好比,把userid做爲partitionkey的話,相同userid的消息將會被推送到同一個partition。
 
以Batch的方式推送數據能夠極大的提升處理效率,kafka Producer 能夠將消息在內存中累計到必定數量後做爲一個batch發送請求。Batch的數量大小能夠經過Producer的參數控制,參數值能夠設置爲累計的消息的數量(如500條)、累計的時間間隔(如100ms)或者累計的數據大小(64KB)。經過增長batch的大小,能夠減小網絡請求和磁盤IO的次數,固然具體參數設置須要在效率和時效性方面作一個權衡。
 
Producers能夠異步的並行的向kafka發送消息,可是一般producer在發送完消息以後會獲得一個future響應,返回的是offset值或者發送過程當中遇到的錯誤。這其中有個很是重要的參數「acks」,這個參數決定了producer要求leader partition 收到確認的副本個數,若是acks設置數量爲0,表示producer不會等待broker的響應,因此,producer沒法知道消息是否發送成功,這樣有可能會致使數據丟失,但同時,acks值爲0會獲得最大的系統吞吐量。
 
若acks設置爲1,表示producer會在leader partition收到消息時獲得broker的一個確認,這樣會有更好的可靠性,由於客戶端會等待直到broker確認收到消息。若設置爲-1,producer會在全部備份的partition收到消息時獲得broker的確認,這個設置能夠獲得最高的可靠性保證。
 
Kafka 消息有一個定長的header和變長的字節數組組成。由於kafka消息支持字節數組,也就使得kafka能夠支持任何用戶自定義的序列號格式或者其它已有的格式如Apache Avro、protobuf等。Kafka沒有限定單個消息的大小,但咱們推薦消息大小不要超過1MB,一般通常消息大小都在1~10kB以前。
 
發佈消息時,kafka client先構造一條消息,將消息加入到消息集set中(kafka支持批量發佈,能夠往消息集合中添加多條消息,一次行發佈),send消息時,producer client需指定消息所屬的topic。
 
5.2  Consumers
Kafka提供了兩套consumer api,分爲high-level api和sample-api。Sample-api 是一個底層的API,它維持了一個和單一broker的鏈接,而且這個API是徹底無狀態的,每次請求都須要指定offset值,所以,這套API也是最靈活的。
 
在kafka中,當前讀到哪條消息的offset值是由consumer來維護的,所以,consumer能夠本身決定如何讀取kafka中的數據。好比,consumer能夠經過重設offset值來從新消費已消費過的數據。無論有沒有被消費,kafka會保存數據一段時間,這個時間週期是可配置的,只有到了過時時間,kafka纔會刪除這些數據。(這一點與AMQ不同,AMQ的message通常來講都是持久化到mysql中的,消費完的message會被delete掉)
 
High-level API封裝了對集羣中一系列broker的訪問,能夠透明的消費一個topic。它本身維持了已消費消息的狀態,即每次消費的都是下一個消息。
 
High-level API還支持以組的形式消費topic,若是consumers有同一個組名,那麼kafka就至關於一個隊列消息服務,而各個consumer均衡的消費相應partition中的數據。若consumers有不一樣的組名,那麼此時kafka就至關與一個廣播服務,會把topic中的全部消息廣播到每一個consumer。
 
High level api和Low level api是針對consumer而言的,和producer無關。
 
High level api是consumer讀的partition的offsite是存在zookeeper上。High level api 會啓動另一個線程去每隔一段時間,offsite自動同步到zookeeper上。換句話說,若是使用了High level api, 每一個message只能被讀一次,一旦讀了這條message以後,不管我consumer的處理是否ok。High level api的另一個線程會自動的把offiste+1同步到zookeeper上。若是consumer讀取數據出了問題,offsite也會在zookeeper上同步。所以,若是consumer處理失敗了,會繼續執行下一條。這每每是不對的行爲。所以,Best Practice是一旦consumer處理失敗,直接讓整個conusmer group拋Exception終止,可是最後讀的這一條數據是丟失了,由於在zookeeper裏面的offsite已經+1了。等再次啓動conusmer group的時候,已經從下一條開始讀取處理了。
 
Low level api是consumer讀的partition的offsite在consumer本身的程序中維護。不會同步到zookeeper上。可是爲了kafka manager可以方便的監控,通常也會手動的同步到zookeeper上。這樣的好處是一旦讀取某個message的consumer失敗了,這條message的offsite咱們本身維護,咱們不會+1。下次再啓動的時候,還會從這個offsite開始讀。這樣能夠作到exactly once對於數據的準確性有保證。
 
 
對於Consumer group:
1. 容許consumer group(包含多個consumer,如一個集羣同時消費)對一個topic進行消費,不一樣的consumer group之間獨立消費。
2. 爲了對減少一個consumer group中不一樣consumer之間的分佈式協調開銷,指定partition爲最小的並行消費單位,即一個group內的consumer只能消費不一樣的partition。
 
 
Consumer與Partition的關係:
- 若是consumer比partition多,是浪費,由於kafka的設計是在一個partition上是不容許併發的,因此consumer數不要大於partition數
- 若是consumer比partition少,一個consumer會對應於多個partitions,這裏主要合理分配consumer數和partition數,不然會致使partition裏面的數據被取的不均勻
- 若是consumer從多個partition讀到數據,不保證數據間的順序性,kafka只保證在一個partition上數據是有序的,但多個partition,根據你讀的順序會有不一樣
- 增減consumer,broker,partition會致使rebalance,因此rebalance後consumer對應的partition會發生變化
- High-level接口中獲取不到數據的時候是會block的
 
負載低的狀況下能夠每一個線程消費多個partition。但負載高的狀況下,Consumer 線程數最好和Partition數量保持一致。若是仍是消費不過來,應該再開 Consumer 進程,進程內線程數一樣和分區數一致。
 
消費消息時,kafka client需指定topic以及partition number(每一個partition對應一個邏輯日誌流,如topic表明某個產品線,partition表明產品線的日誌按天切分的結果),consumer client訂閱後,就可迭代讀取消息,若是沒有消息,consumer client會阻塞直到有新的消息發佈。consumer能夠累積確認接收到的消息,當其確認了某個offset的消息,意味着以前的消息也都已成功接收到,此時broker會更新zookeeper上地offset registry。
 
5.3  高效的數據傳輸
1.  發佈者每次可發佈多條消息(將消息加到一個消息集合中發佈), consumer每次迭代消費一條消息。

 

3.  使用sendfile優化網絡傳輸,減小一次內存拷貝。
 
6.Kafka 與 Zookeeper
 
6.1 Zookeeper 協調控制
1. 管理broker與consumer的動態加入與離開。(Producer不須要管理,隨便一臺計算機均可以做爲Producer向Kakfa Broker發消息)
2. 觸發負載均衡,當broker或consumer加入或離開時會觸發負載均衡算法,使得一
   個 consumer group內的多個consumer的消費負載平衡。(由於一個comsumer消費一個或多個partition,一個partition只能被一個consumer消費)
相關文章
相關標籤/搜索