kafka生產消費原理筆記

1、什麼是kafka

  Kafka是最初由Linkedin公司開發,是一個分佈式、支持分區的(partition)、多副本的(replica),基於zookeeper協調的分佈式消息系統,它的最大的特性就是能夠實時的處理大量數據以知足各類需求場景:好比基於hadoop的批處理系統、低延遲的實時系統、storm/Spark流式處理引擎,web/nginx日誌、訪問日誌,消息服務等等,用scala語言編寫,Linkedin於2010年貢獻給了Apache基金會併成爲頂級開源項目。前端

2、kafka與其餘消息中間件

Redis
  • 基於Key-Value對的NoSQL數據庫
  • 入隊時,當數據比較小時Redis的性能要高於RabbitMQ,而若是數據大小超過了10K,Redis則慢的沒法忍受;
  • 出隊時,不管數據大小,Redis都表現出很是好的性能,而RabbitMQ的出隊性能則遠低於Redis。
RabbitMQ
  • Erlang編寫
  • 支持不少的協議:AMQP,XMPP, SMTP, STOMP
  • 很是重量級,更適合於企業級的開發
  • 發送給客戶端時先在中心隊列排隊。對路由,負載均衡或者數據持久化都有很好的支持。
ZeroMQ
  • 號稱最快的消息隊列系統,尤爲針對大吞吐量的需求場景。
  • 高級/複雜的隊列,可是開發人員須要本身組合多種技術框架,技術上的複雜度是對這MQ可以應用成功的挑戰。
  • 具備一個獨特的非中間件的模式,不須要安裝和運行一個消息服務器或中間件
  • ZeroMQ僅提供非持久性的隊列,也就是說若是宕機,數據將會丟失。
ActiveMQ
  • 相似於ZeroMQ,它可以以代理人和點對點的技術實現隊列。
  • 相似於RabbitMQ,它少許代碼就能夠高效地實現高級應用場景。
Kafka/Jafka
  • 高性能跨語言分佈式發佈/訂閱消息隊列系統
  • 快速持久化,能夠在O(1)的系統開銷下進行消息持久化;
  • 高吞吐,在一臺普通的服務器上既能夠達到10W/s的吞吐速率;
  • 徹底的分佈式系統,Broker、Producer、Consumer都原生自動支持分佈式,自動實現負載均衡;
  • 支持Hadoop數據並行加載,對於像Hadoop的同樣的日誌數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka經過Hadoop的並行加載機制統一了在線和離線的消息處理。
  • 一個很是輕量級的消息系統,除了性能很是好以外,仍是一個工做良好的分佈式系統。

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

3、kafka解決了什麼問題

Kafka主要用途是數據集成,或者說是流數據集成,以Pub/Sub形式的消息總線形式提供。可是,Kafka不只僅是一套傳統的消息總線,本質上Kafka是分佈式的流數據平臺,由於如下特性而著名:node

  1. 提供Pub/Sub方式的海量消息處理。
  2. 以高容錯的方式存儲海量數據流。
  3. 保證數據流的順序。

經常使用場景:mysql

  - 日誌收集:一個公司能夠用Kafka能夠收集各類服務的log,經過kafka以統一接口服務的方式開放給各類consumer,例如hadoop、Hbase、Solr等。
  - 消息系統:解耦和生產者和消費者、緩存消息等。
  - 用戶活動跟蹤:Kafka常常被用來記錄web用戶或者app用戶的各類活動,如瀏覽網頁、搜索、點擊等活動,這些活動信息被各個服務器發佈到kafka的topic中,而後訂閱者經過訂閱這些topic
來作實時的監控分析,或者裝載到hadoop、數據倉庫中作離線分析和挖掘。
  - 運營指標:Kafka也常常用來記錄運營監控數據。包括收集各類分佈式應用的數據,生產各類操做的集中反饋,好比報警和報告。
  - 流式處理:好比spark streaming和storm
  - 事件源

4、kafka基本概念

Message(消息):傳遞的數據對象,主要由四部分構成:offset(偏移量)、key、value、timestamp(插入時間); 其中offset和timestamp在kafka集羣中產生,key/value在producer發送數據的時候產生Broker(代理者):Kafka集羣中的機器/服務被成爲broker, 是一個物理概念。linux

Topic(主題):維護Kafka上的消息類型被稱爲Topic,是一個邏輯概念。nginx

Partition(分區):具體維護Kafka上的消息數據的最小單位,一個Topic能夠包含多個分區;Partition特性:web

ordered & immutable。(在數據的產生和消費過程當中,不須要關注數據具體存儲的Partition在那個Broker上,只須要指定Topic便可,由Kafka負責將數據和對應的Partition關聯上)算法

Producer(生產者):負責將數據發送到Kafka對應Topic的進程sql

Consumergroup:各個consumer(consumer 線程)能夠組成一個組(Consumer group ),partition中的每一個message只能被組(Consumer group )中的一個consumer(consumer 線程)消費,若是一個message能夠被多個consumer(consumer 線程)消費的話,那麼這些consumer必須在不一樣的組。Kafka不支持一個partition中的message由兩個或兩個以上的同一個consumer group下的consumer thread來處理,除非再啓動一個新的consumer group。因此若是想同時對一個topic作消費的話,啓動多個consumer group就能夠了,可是要注意的是,這裏的多個consumer的消費都必須是順序讀取partition裏面的message,新啓動的consumer默認從partition隊列最頭端最新的地方開始阻塞的讀message。它不能像AMQ那樣能夠多個BET做爲consumer去互斥的(for update悲觀鎖)併發處理message,這是由於多個BET去消費一個Queue中的數據的時候,因爲要保證不能多個線程拿同一條message,因此就須要行級別悲觀所(for update),這就致使了consume的性能降低,吞吐量不夠。而kafka爲了保證吞吐量,只容許同一個consumer group下的一個consumer線程去訪問一個partition。若是以爲效率不高的時候,能夠加partition的數量來橫向擴展,那麼再加新的consumer thread去消費。若是想多個不一樣的業務都須要這個topic的數據,起多個consumer group就行了,你們都是順序的讀取message,offsite的值互不影響。這樣沒有鎖競爭,充分發揮了橫向的擴展性,吞吐量極高。這也就造成了分佈式消費的概念。數據庫

    當啓動一個consumer group去消費一個topic的時候,不管topic裏面有多個少個partition,不管咱們consumer group裏面配置了多少個consumer thread,這個consumer group下面的全部consumer thread必定會消費所有的partition;即使這個consumer group下只有一個consumer thread,那麼這個consumer thread也會去消費全部的partition。所以,最優的設計就是,consumer group下的consumer thread的數量等於partition數量,這樣效率是最高的。api

    同一partition的一條message只能被同一個Consumer Group內的一個Consumer消費。不可以一個consumer group的多個consumer同時消費一個partition。

    一個consumer group下,不管有多少個consumer,這個consumer group必定回去把這個topic下全部的partition都消費了。當consumer group裏面的consumer數量小於這個topic下的partition數量的時候,以下圖groupA,groupB,就會出現一個conusmer thread消費多個partition的狀況,總之是這個topic下的partition都會被消費。若是consumer group裏面的consumer數量等於這個topic下的partition數量的時候,以下圖groupC,此時效率是最高的,每一個partition都有一個consumer thread去消費。當consumer group裏面的consumer數量大於這個topic下的partition數量的時候,以下圖GroupD,就會有一個consumer thread空閒。所以,咱們在設定consumer group的時候,只須要指明裏面有幾個consumer數量便可,無需指定對應的消費partition序號,consumer會自動進行rebalance。

    多個Consumer Group下的consumer能夠消費同一條message,可是這種消費也是以o(1)的方式順序的讀取message去消費,,因此必定會重複消費這批message的,不能向AMQ那樣多個BET做爲consumer消費(對message加鎖,消費的時候不能重複消費message)

Consumer Rebalance的觸發條件:(1)Consumer增長或刪除會觸發 Consumer Group的Rebalance(2)Broker的增長或者減小都會觸發 Consumer Rebalance

Consumer: Consumer處理partition裏面的message的時候是o(1)順序讀取的。因此必須維護着上一次讀到哪裏的offsite信息。high level API,offset存於Zookeeper中,low level API的offset由本身維護。通常來講都是使用high level api的。Consumer的delivery gurarantee,默認是讀完message先commmit再處理message,autocommit默認是true,這時候先commit就會更新offsite+1,一旦處理失敗,offsite已經+1,這個時候就會丟message;也能夠配置成讀完消息處理再commit,這種狀況下consumer端的響應就會比較慢的,須要等處理完才行。

通常狀況下,必定是一個consumer group處理一個topic的message。Best Practice是這個consumer group裏面consumer的數量等於topic裏面partition的數量,這樣效率是最高的,一個consumer thread處理一個partition。若是這個consumer group裏面consumer的數量小於topic裏面partition的數量,就會有consumer thread同時處理多個partition(這個是kafka自動的機制,咱們不用指定),可是總之這個topic裏面的全部partition都會被處理到的。。若是這個consumer group裏面consumer的數量大於topic裏面partition的數量,多出的consumer thread就會閒着啥也不幹,剩下的是一個consumer thread處理一個partition,這就形成了資源的浪費,由於一個partition不可能被兩個consumer thread去處理。因此咱們線上的分佈式多個service服務,每一個service裏面的kafka consumer數量都小於對應的topic的partition數量,可是全部服務的consumer數量只和等於partition的數量,這是由於分佈式service服務的全部consumer都來自一個consumer group,若是來自不一樣的consumer group就會處理重複的message了(同一個consumer group下的consumer不能處理同一個partition,不一樣的consumer group能夠處理同一個topic,那麼都是順序處理message,必定會處理重複的。通常這種狀況都是兩個不一樣的業務邏輯,纔會啓動兩個consumer group來處理一個topic)。

 
若是producer的流量增大,當前的topic的parition數量=consumer數量,這時候的應對方式就是很想擴展:增長topic下的partition,同時增長這個consumer group下的consumer。

5、消息如何生產消費

官網的圖解能夠直觀看出消費概覽

須要注意以下幾點:

1)一組(類)消息一般由某個topic來歸類,咱們能夠把這組消息「分發」給若干個分區(partition),每一個分區的消息各不相同;

2)每一個分區都維護着他本身的偏移量(Offset),記錄着該分區的消息此時被消費的位置;

3)一個消費線程能夠對應若干個分區,但一個分區只能被具體某一個消費線程消費;

4)group.id用於標記某一個消費組,每個消費組都會被記錄他在某一個分區的Offset,即不一樣consumer group針對同一個分區,都有「各自」的偏移量。

6、消息投遞

一個消息如何算投遞成功,Kafka提供了三種模式:

- 第一種是啥都無論,發送出去就看成成功,這種狀況固然不能保證消息成功投遞到broker;

- 第二種是Master-Slave模型,只有當Master和全部Slave都接收到消息時,纔算投遞成功,這種模型提供了最高的投遞可靠性,可是損傷了性能;

- 第三種模型,即只要Master確認收到消息就算投遞成功;實際使用時,根據應用特性選擇,絕大多數狀況下都會中和可靠性和性能選擇第三種模型

  消息在broker上的可靠性,由於消息會持久化到磁盤上,因此若是正常stop一個broker,其上的數據不會丟失;可是若是不正常stop,可能會使存在頁面緩存來不及寫入磁盤的消息丟失,這能夠經過配置flush頁面緩存的週期、閾值緩解,可是一樣會頻繁的寫磁盤會影響性能,又是一個選擇題,根據實際狀況配置。

  消息消費的可靠性,Kafka提供的是「At least once」模型,由於消息的讀取進度由offset提供,offset能夠由消費者本身維護也能夠維護在zookeeper裏,可是當消息消費後consumer掛掉,offset沒有即時寫回,就有可能發生重複讀的狀況,這種狀況一樣能夠經過調整commit offset週期、閾值緩解,甚至消費者本身把消費和commit offset作成一個事務解決,可是若是你的應用不在意重複消費,那就乾脆不要解決,以換取最大的性能。

- Partition ack:當ack=1,表示producer寫partition leader成功後,broker就返回成功,不管其餘的partition follower是否寫成功。當ack=2,表示producer寫partition leader和其餘一個follower成功的時候,broker就返回成功,不管其餘的partition follower是否寫成功。當ack=-1[parition的數量]的時候,表示只有producer所有寫成功的時候,纔算成功,kafka broker才返回成功信息。這裏須要注意的是,若是ack=1的時候,一旦有個broker宕機致使partition的follower和leader切換,會致使丟數據。

1.持久化
kafka使用文件存儲消息(append only log),這就直接決定kafka在性能上嚴重依賴文件系統的自己特性.且不管任何OS下,對文件系統自己的優化是很是艱難的.文件緩存/直接內存映射等是經常使用的手段.由於kafka是對日誌文件進行append操做,所以磁盤檢索的開支是較小的;同時爲了減小磁盤寫入的次數,broker會將消息暫時buffer起來,當消息的個數(或尺寸)達到必定閥值時,再flush到磁盤,這樣減小了磁盤IO調用的次數.對於kafka而言,較高性能的磁盤,將會帶來更加直接的性能提高.
 
2.性能
除磁盤IO以外,咱們還須要考慮網絡IO,這直接關係到kafka的吞吐量問題.kafka並無提供太多高超的技巧;對於producer端,能夠將消息buffer起來,當消息的條數達到必定閥值時,批量發送給broker;對於consumer端也是同樣,批量fetch多條消息.不過消息量的大小能夠經過配置文件來指定.對於kafka broker端,彷佛有個sendfile系統調用能夠潛在的提高網絡IO的性能:將文件的數據映射到系統內存中,socket直接讀取相應的內存區域便可,而無需進程再次copy和交換(這裏涉及到"磁盤IO數據"/"內核內存"/"進程內存"/"網絡緩衝區",多者之間的數據copy).
其實對於producer/consumer/broker三者而言,CPU的開支應該都不大,所以啓用消息壓縮機制是一個良好的策略;壓縮須要消耗少許的CPU資源,不過對於kafka而言,網絡IO更應該須要考慮.能夠將任何在網絡上傳輸的消息都通過壓縮.kafka支持gzip/snappy等多種壓縮方式.
 
3.負載均衡
kafka集羣中的任何一個broker,均可以向producer提供metadata信息,這些metadata中包含"集羣中存活的servers列表"/"partitions leader列表"等信息(請參看zookeeper中的節點信息). 當producer獲取到metadata信息以後, producer將會和Topic下全部partition leader保持socket鏈接;消息由producer直接經過socket發送到broker,中間不會通過任何"路由層".
異步發送,將多條消息暫且在客戶端buffer起來,並將他們批量發送到broker;小數據IO太多,會拖慢總體的網絡延遲,批量延遲發送事實上提高了網絡效率;不過這也有必定的隱患,好比當producer失效時,那些還沒有發送的消息將會丟失。
 
4.Topic模型
其餘JMS實現,消息消費的位置是有prodiver保留,以便避免重複發送消息或者將沒有消費成功的消息重發等,同時還要控制消息的狀態.這就要求JMS broker須要太多額外的工做.在kafka中,partition中的消息只有一個consumer在消費,且不存在消息狀態的控制,也沒有複雜的消息確認機制,可見kafka broker端是至關輕量級的.當消息被consumer接收以後,consumer能夠在本地保存最後消息的offset,並間歇性的向zookeeper註冊offset.因而可知,consumer客戶端也很輕量級。
kafka中consumer負責維護消息的消費記錄,而broker則不關心這些,這種設計不只提升了consumer端的靈活性,也適度的減輕了broker端設計的複雜度;這是和衆多JMS prodiver的區別.此外,kafka中消息ACK的設計也和JMS有很大不一樣,kafka中的消息是批量(一般以消息的條數或者chunk的尺寸爲單位)發送給consumer,當消息消費成功後,向zookeeper提交消息的offset,而不會向broker交付ACK.或許你已經意識到,這種"寬鬆"的設計,將會有"丟失"消息/"消息重發"的危險.

7、副本 

kafka中,replication策略是基於partition,而不是topic;kafka將每一個partition數據複製到多個server上,任何一個partition有一個leader和多個follower(能夠沒有);備份的個數能夠經過broker配置文件來設定。leader處理全部的read-write請求,follower須要和leader保持同步.Follower就像一個"consumer",消費消息並保存在本地日誌中;leader負責跟蹤全部的follower狀態,若是follower"落後"太多或者失效,leader將會把它從replicas同步列表中刪除.當全部的follower都將一條消息保存成功,此消息才被認爲是"committed",那麼此時consumer才能消費它,這種同步策略,就要求follower和leader之間必須具備良好的網絡環境.即便只有一個replicas實例存活,仍然能夠保證消息的正常發送和接收,只要zookeeper集羣存活便可.
選擇follower時須要兼顧一個問題,就是新leader server上所已經承載的partition leader的個數,若是一個server上有過多的partition leader,意味着此server將承受着更多的IO壓力.在選舉新leader,須要考慮到"負載均衡",partition leader較少的broker將會更有可能成爲新的leader.
副本管理
以上僅僅以一個topic一個分區爲例子進行了討論,但實際上一個Kafka將會管理成千上萬的topic分區.Kafka儘可能的使全部分區均勻的分佈到集羣全部的節點上而不是集中在某些節點上,另外主從關係也儘可能均衡這樣每一個幾點都會擔任必定比例的分區的leader.
優化leader的選擇過程也是很重要的,它決定了系統發生故障時的空窗期有多久。Kafka選擇一個節點做爲「controller」,當發現有節點down掉的時候它負責在游泳分區的全部節點中選擇新的leader,這使得Kafka能夠批量的高效的管理全部分區節點的主從關係。若是controller down掉了,活着的節點中的一個會備切換爲新的controller.
 
Leader與副本同步
對於某個分區來講,保存正分區的"broker"爲該分區的"leader",保存備份分區的"broker"爲該分區的"follower"。備份分區會徹底複製正分區的消息,包括消息的編號等附加屬性值。爲了保持正分區和備份分區的內容一致,Kafka採起的方案是在保存備份分區的"broker"上開啓一個消費者進程進行消費,從而使得正分區的內容與備份分區的內容保持一致。通常狀況下,一個分區有一個「正分區」和零到多個「備份分區」。能夠配置「正分區+備份分區」的總數量,關於這個配置,不一樣主題能夠有不一樣的配置值。注意,生產者,消費者只與保存正分區的"leader"進行通訊。
 
Kafka容許topic的分區擁有若干副本,這個數量是能夠配置的,你能夠爲每一個topic配置副本的數量。Kafka會自動在每一個副本上備份數據,因此當一個節點down掉時數據依然是可用的。
Kafka的副本功能不是必須的,你能夠配置只有一個副本,這樣其實就至關於只有一份數據。
建立副本的單位是topic的分區,每一個分區都有一個leader和零或多個followers.全部的讀寫操做都由leader處理,通常分區的數量都比broker的數量多的多,各分區的leader均勻的分佈在brokers中。全部的followers都複製leader的日誌,日誌中的消息和順序都和leader中的一致。followers向普通的consumer那樣從leader那裏拉取消息並保存在本身的日誌文件中。
許多分佈式的消息系統自動的處理失敗的請求,它們對一個節點是否着(alive)」有着清晰的定義。Kafka判斷一個節點是否活着有兩個條件:
1. 節點必須能夠維護和ZooKeeper的鏈接,Zookeeper經過心跳機制檢查每一個節點的鏈接。
2. 若是節點是個follower,他必須能及時的同步leader的寫操做,延時不能過久。
符合以上條件的節點準確的說應該是「同步中的(in sync)」,而不是模糊的說是「活着的」或是「失敗的」。Leader會追蹤全部「同步中」的節點,一旦一個down掉了,或是卡住了,或是延時過久,leader就會把它移除。至於延時多久算是「過久」,是由參數replica.lag.max.messages決定的,怎樣算是卡住了,怎是由參數replica.lag.time.max.ms決定的。
只有當消息被全部的副本加入到日誌中時,纔算是「committed」,只有committed的消息纔會發送給consumer,這樣就不用擔憂一旦leader down掉了消息會丟失。Producer也能夠選擇是否等待消息被提交的通知,這個是由參數acks決定的。
Kafka保證只要有一個「同步中」的節點,「committed」的消息就不會丟失。


       一個典型的Kafka集羣中包含若干Producer(能夠是web前端FET,或者是服務器日誌等),若干broker(Kafka支持水平擴展,通常broker數量越多,集羣吞吐率越高),若干ConsumerGroup,以及一個Zookeeper集羣。Kafka經過Zookeeper管理Kafka集羣配置:選舉Kafka broker的leader,以及在Consumer Group發生變化時進行rebalance,由於consumer消費kafka topic的partition的offsite信息是存在Zookeeper的。Producer使用push模式將消息發佈到broker,Consumer使用pull模式從broker訂閱並消費消息。
 

分析過程分爲如下4個步驟:

  • topic中partition存儲分佈
  • partiton中文件存儲方式 (partition在linux服務器上就是一個目錄(文件夾))
  • partiton中segment文件存儲結構
  • 在partition中如何經過offset查找message

經過上述4過程詳細分析,咱們就能夠清楚認識到kafka文件存儲機制的奧祕。

8、zookeeper

kafka leader

Kakfa Broker集羣受Zookeeper管理。全部的Kafka Broker節點一塊兒去Zookeeper上註冊一個臨時節點,由於只有一個Kafka Broker會註冊成功,其餘的都會失敗,因此這個成功在Zookeeper上註冊臨時節點的這個Kafka Broker會成爲Kafka Broker Controller,其餘的Kafka broker叫Kafka Broker follower。(這個過程叫Controller在ZooKeeper註冊Watch)。這個Controller會監聽其餘的Kafka Broker的全部信息,若是這個kafka broker controller宕機了,在zookeeper上面的那個臨時節點就會消失,此時全部的kafka broker又會一塊兒去Zookeeper上註冊一個臨時節點,由於只有一個Kafka Broker會註冊成功,其餘的都會失敗,因此這個成功在Zookeeper上註冊臨時節點的這個Kafka Broker會成爲Kafka Broker Controller,其餘的Kafka broker叫Kafka Broker follower。例如:一旦有一個broker宕機了,這個kafka broker controller會讀取該宕機broker上全部的partition在zookeeper上的狀態,並選取ISR列表中的一個replica做爲partition leader(若是ISR列表中的replica全掛,選一個倖存的replica做爲leader; 若是該partition的全部的replica都宕機了,則將新的leader設置爲-1,等待恢復,等待ISR中的任一個Replica「活」過來,而且選它做爲Leader;或選擇第一個「活」過來的Replica(不必定是ISR中的)做爲Leader),這個broker宕機的事情,kafka controller也會通知zookeeper,zookeeper就會通知其餘的kafka broker。

Kafka的核心是日誌文件,日誌文件在集羣中的同步是分佈式數據系統最基礎的要素。

若是leaders永遠不會down的話咱們就不須要followers了!一旦leader down掉了,須要在followers中選擇一個新的leader.可是followers自己有可能延時過久或者crash,因此必須選擇高質量的follower做爲leader.必須保證,一旦一個消息被提交了,可是leader down掉了,新選出的leader必須能夠提供這條消息。大部分的分佈式系統採用了多數投票法則選擇新的leader,對於多數投票法則,就是根據全部副本節點的情況動態的選擇最適合的做爲leader.Kafka並非使用這種方法。

Kafka動態維護了一個同步狀態的副本的集合(a set of in-sync replicas),簡稱ISR,在這個集合中的節點都是和leader保持高度一致的,任何一條消息必須被這個集合中的每一個節點讀取並追加到日誌中了,纔回通知外部這個消息已經被提交了。所以這個集合中的任何一個節點隨時均可以被選爲leader.ISR在ZooKeeper中維護。ISR中有f+1個節點,就能夠容許在f個節點down掉的狀況下不會丟失消息並正常提供服。ISR的成員是動態的,若是一個節點被淘汰了,當它從新達到「同步中」的狀態時,他能夠從新加入ISR.這種leader的選擇方式是很是快速的,適合kafka的應用場景。

一個邪惡的想法:若是全部節點都down掉了怎麼辦?Kafka對於數據不會丟失的保證,是基於至少一個節點是存活的,一旦全部節點都down了,這個就不能保證了。

實際應用中,當全部的副本都down掉時,必須及時做出反應。能夠有如下兩種選擇:

1. 等待ISR中的任何一個節點恢復並擔任leader。

2. 選擇全部節點中(不僅是ISR)第一個恢復的節點做爲leader.

這是一個在可用性和連續性之間的權衡。若是等待ISR中的節點恢復,一旦ISR中的節點起不起來或者數據都是了,那集羣就永遠恢復不了了。若是等待ISR意外的節點恢復,這個節點的數據就會被做爲線上數據,有可能和真實的數據有所出入,由於有些數據它可能還沒同步到。Kafka目前選擇了第二種策略,在將來的版本中將使這個策略的選擇可配置,能夠根據場景靈活的選擇。

這種窘境不僅Kafka會遇到,幾乎全部的分佈式數據系統都會遇到。

 

分佈式

kafka使用zookeeper來存儲一些meta信息,並使用了zookeeper watch機制來發現meta信息的變動並做出相應的動做(好比consumer失效,觸發負載均衡等)

Broker node registry: 當一個kafka broker啓動後,首先會向zookeeper註冊本身的節點信息(臨時znode),同時當broker和zookeeper斷開鏈接時,此znode也會被刪除.

Broker Topic Registry: 當一個broker啓動時,會向zookeeper註冊本身持有的topic和partitions信息,仍然是一個臨時znode.

Consumer and Consumer group: 每一個consumer客戶端被建立時,會向zookeeper註冊本身的信息;此做用主要是爲了"負載均衡".一個group中的多個consumer能夠交錯的消費一個topic的全部partitions;簡而言之,保證此topic的全部partitions都能被此group所消費,且消費時爲了性能考慮,讓partition相對均衡的分散到每一個consumer上.

Consumer id Registry: 每一個consumer都有一個惟一的ID(host:uuid,能夠經過配置文件指定,也能夠由系統生成),此id用來標記消費者信息.

Consumer offset Tracking: 用來跟蹤每一個consumer目前所消費的partition中最大的offset.此znode爲持久節點,能夠看出offset跟group_id有關,以代表當group中一個消費者失效,其餘consumer能夠繼續消費.

Partition Owner registry: 用來標記partition正在被哪一個consumer消費.臨時znode。此節點表達了"一個partition"只能被group下一個consumer消費,同時當group下某個consumer失效,那麼將會觸發負載均衡(即:讓partitions在多個consumer間均衡消費,接管那些"遊離"的partitions)

當consumer啓動時,所觸發的操做:

A) 首先進行"Consumer id Registry";

B) 而後在"Consumer id Registry"節點下注冊一個watch用來監聽當前group中其餘consumer的"leave"和"join";只要此znode path下節點列表變動,都會觸發此group下consumer的負載均衡.(好比一個consumer失效,那麼其餘consumer接管partitions).

C) 在"Broker id registry"節點下,註冊一個watch用來監聽broker的存活狀況;若是broker列表變動,將會觸發全部的groups下的consumer從新balance.

 

總結:

1) Producer端使用zookeeper用來"發現"broker列表,以及和Topic下每一個partition leader創建socket鏈接併發送消息.

2) Broker端使用zookeeper用來註冊broker信息,已經監測partition leader存活性.

3) Consumer端使用zookeeper用來註冊consumer信息,其中包括consumer消費的partition列表等,同時也用來發現broker列表,並和partition leader創建socket鏈接,並獲取消息。

 

協調機制

1. 管理broker與consumer的動態加入與離開。(Producer不須要管理,隨便一臺計算機均可以做爲Producer向Kakfa Broker發消息)

2. 觸發負載均衡,當broker或consumer加入或離開時會觸發負載均衡算法,使得一consumer group內的多個consumer的消費負載平衡。(由於一個comsumer消費一個或多個partition,一個partition只能被一個consumer消費)

3.  維護消費關係及每一個partition的消費信息。

9、開發相關 

Producers

Producers直接發送消息到broker上的leader partition,不須要通過任何中介或其餘路由轉發。爲了實現這個特性,kafka集羣中的每一個broker均可以響應producer的請求,並返回topic的一些元信息,這些元信息包括哪些機器是存活的,topic的leader partition都在哪,現階段哪些leader partition是能夠直接被訪問的。

Producer客戶端本身控制着消息被推送到哪些partition。實現的方式能夠是隨機分配、實現一類隨機負載均衡算法,或者指定一些分區算法。Kafka提供了接口供用戶實現自定義的partition,用戶能夠爲每一個消息指定一個partitionKey,經過這個key來實現一些hash分區算法。好比,把userid做爲partitionkey的話,相同userid的消息將會被推送到同一個partition。

以Batch的方式推送數據能夠極大的提升處理效率,kafka Producer 能夠將消息在內存中累計到必定數量後做爲一個batch發送請求。Batch的數量大小能夠經過Producer的參數控制,參數值能夠設置爲累計的消息的數量(如500條)、累計的時間間隔(如100ms)或者累計的數據大小(64KB)。經過增長batch的大小,能夠減小網絡請求和磁盤IO的次數,固然具體參數設置須要在效率和時效性方面作一個權衡。

Producers能夠異步的並行的向kafka發送消息,可是一般producer在發送完消息以後會獲得一個future響應,返回的是offset值或者發送過程當中遇到的錯誤。這其中有個很是重要的參數「acks」,這個參數決定了producer要求leader partition 收到確認的副本個數,若是acks設置數量爲0,表示producer不會等待broker的響應,因此,producer沒法知道消息是否發送成功,這樣有可能會致使數據丟失,但同時,acks值爲0會獲得最大的系統吞吐量。

若acks設置爲1,表示producer會在leader partition收到消息時獲得broker的一個確認,這樣會有更好的可靠性,由於客戶端會等待直到broker確認收到消息。若設置爲-1,producer會在全部備份的partition收到消息時獲得broker的確認,這個設置能夠獲得最高的可靠性保證。

Kafka 消息有一個定長的header和變長的字節數組組成。由於kafka消息支持字節數組,也就使得kafka能夠支持任何用戶自定義的序列號格式或者其它已有的格式如Apache Avro、protobuf等。Kafka沒有限定單個消息的大小,但咱們推薦消息大小不要超過1MB,一般通常消息大小都在1~10kB以前。

發佈消息時,kafka client先構造一條消息,將消息加入到消息集set中(kafka支持批量發佈,能夠往消息集合中添加多條消息,一次行發佈),send消息時,producer client需指定消息所屬的topic。

 

Consumers

Kafka提供了兩套consumer api,分爲high-level api和sample-api。Sample-api 是一個底層的API,它維持了一個和單一broker的鏈接,而且這個API是徹底無狀態的,每次請求都須要指定offset值,所以,這套API也是最靈活的。 

在kafka中,當前讀到哪條消息的offset值是由consumer來維護的,所以,consumer能夠本身決定如何讀取kafka中的數據。好比,consumer能夠經過重設offset值來從新消費已消費過的數據。無論有沒有被消費,kafka會保存數據一段時間,這個時間週期是可配置的,只有到了過時時間,kafka纔會刪除這些數據。(這一點與AMQ不同,AMQ的message通常來講都是持久化到mysql中的,消費完的message會被delete掉)

High-level API封裝了對集羣中一系列broker的訪問,能夠透明的消費一個topic。它本身維持了已消費消息的狀態,即每次消費的都是下一個消息。 

High-level API還支持以組的形式消費topic,若是consumers有同一個組名,那麼kafka就至關於一個隊列消息服務,而各個consumer均衡的消費相應partition中的數據。若consumers有不一樣的組名,那麼此時kafka就至關與一個廣播服務,會把topic中的全部消息廣播到每一個consumer。 

High level api和Low level api是針對consumer而言的,和producer無關。

High level api是consumer讀的partition的offsite是存在zookeeper上。High level api 會啓動另一個線程去每隔一段時間,offsite自動同步到zookeeper上。換句話說,若是使用了High level api, 每一個message只能被讀一次,一旦讀了這條message以後,不管我consumer的處理是否ok。High level api的另一個線程會自動的把offiste+1同步到zookeeper上。若是consumer讀取數據出了問題,offsite也會在zookeeper上同步。所以,若是consumer處理失敗了,會繼續執行下一條。這每每是不對的行爲。所以,Best Practice是一旦consumer處理失敗,直接讓整個conusmer group拋Exception終止,可是最後讀的這一條數據是丟失了,由於在zookeeper裏面的offsite已經+1了。等再次啓動conusmer group的時候,已經從下一條開始讀取處理了。

Low level api是consumer讀的partition的offsite在consumer本身的程序中維護。不會同步到zookeeper上。可是爲了kafka manager可以方便的監控,通常也會手動的同步到zookeeper上。這樣的好處是一旦讀取某個message的consumer失敗了,這條message的offsite咱們本身維護,咱們不會+1。下次再啓動的時候,還會從這個offsite開始讀。這樣能夠作到exactly once對於數據的準確性有保證。

 

借鑑:http://blog.csdn.net/ychenfeng/article/details/74980531

相關文章
相關標籤/搜索