Kafka是最初由Linkedin公司開發,是一個分佈式、支持分區的(partition)、多副本的(replica),基於zookeeper協調的分佈式消息系統,它的最大的特性就是能夠實時的處理大量數據以知足各類需求場景:好比基於hadoop的批處理系統、低延遲的實時系統、storm/Spark流式處理引擎,web/nginx日誌、訪問日誌,消息服務等等,用scala語言編寫,Linkedin於2010年貢獻給了Apache基金會併成爲頂級開源項目。前端
Redis |
|
RabbitMQ |
|
ZeroMQ |
|
ActiveMQ |
|
Kafka/Jafka |
|
Kafka主要用途是數據集成,或者說是流數據集成,以Pub/Sub形式的消息總線形式提供。可是,Kafka不只僅是一套傳統的消息總線,本質上Kafka是分佈式的流數據平臺,由於如下特性而著名:node
經常使用場景:mysql
Message(消息):傳遞的數據對象,主要由四部分構成:offset(偏移量)、key、value、timestamp(插入時間); 其中offset和timestamp在kafka集羣中產生,key/value在producer發送數據的時候產生Broker(代理者):Kafka集羣中的機器/服務被成爲broker, 是一個物理概念。linux
Topic(主題):維護Kafka上的消息類型被稱爲Topic,是一個邏輯概念。nginx
Partition(分區):具體維護Kafka上的消息數據的最小單位,一個Topic能夠包含多個分區;Partition特性:web
ordered & immutable。(在數據的產生和消費過程當中,不須要關注數據具體存儲的Partition在那個Broker上,只須要指定Topic便可,由Kafka負責將數據和對應的Partition關聯上)算法
Producer(生產者):負責將數據發送到Kafka對應Topic的進程sql
Consumergroup:各個consumer(consumer 線程)能夠組成一個組(Consumer group ),partition中的每一個message只能被組(Consumer group )中的一個consumer(consumer 線程)消費,若是一個message能夠被多個consumer(consumer 線程)消費的話,那麼這些consumer必須在不一樣的組。Kafka不支持一個partition中的message由兩個或兩個以上的同一個consumer group下的consumer thread來處理,除非再啓動一個新的consumer group。因此若是想同時對一個topic作消費的話,啓動多個consumer group就能夠了,可是要注意的是,這裏的多個consumer的消費都必須是順序讀取partition裏面的message,新啓動的consumer默認從partition隊列最頭端最新的地方開始阻塞的讀message。它不能像AMQ那樣能夠多個BET做爲consumer去互斥的(for update悲觀鎖)併發處理message,這是由於多個BET去消費一個Queue中的數據的時候,因爲要保證不能多個線程拿同一條message,因此就須要行級別悲觀所(for update),這就致使了consume的性能降低,吞吐量不夠。而kafka爲了保證吞吐量,只容許同一個consumer group下的一個consumer線程去訪問一個partition。若是以爲效率不高的時候,能夠加partition的數量來橫向擴展,那麼再加新的consumer thread去消費。若是想多個不一樣的業務都須要這個topic的數據,起多個consumer group就行了,你們都是順序的讀取message,offsite的值互不影響。這樣沒有鎖競爭,充分發揮了橫向的擴展性,吞吐量極高。這也就造成了分佈式消費的概念。數據庫
當啓動一個consumer group去消費一個topic的時候,不管topic裏面有多個少個partition,不管咱們consumer group裏面配置了多少個consumer thread,這個consumer group下面的全部consumer thread必定會消費所有的partition;即使這個consumer group下只有一個consumer thread,那麼這個consumer thread也會去消費全部的partition。所以,最優的設計就是,consumer group下的consumer thread的數量等於partition數量,這樣效率是最高的。api
同一partition的一條message只能被同一個Consumer Group內的一個Consumer消費。不可以一個consumer group的多個consumer同時消費一個partition。
一個consumer group下,不管有多少個consumer,這個consumer group必定回去把這個topic下全部的partition都消費了。當consumer group裏面的consumer數量小於這個topic下的partition數量的時候,以下圖groupA,groupB,就會出現一個conusmer thread消費多個partition的狀況,總之是這個topic下的partition都會被消費。若是consumer group裏面的consumer數量等於這個topic下的partition數量的時候,以下圖groupC,此時效率是最高的,每一個partition都有一個consumer thread去消費。當consumer group裏面的consumer數量大於這個topic下的partition數量的時候,以下圖GroupD,就會有一個consumer thread空閒。所以,咱們在設定consumer group的時候,只須要指明裏面有幾個consumer數量便可,無需指定對應的消費partition序號,consumer會自動進行rebalance。
多個Consumer Group下的consumer能夠消費同一條message,可是這種消費也是以o(1)的方式順序的讀取message去消費,,因此必定會重複消費這批message的,不能向AMQ那樣多個BET做爲consumer消費(對message加鎖,消費的時候不能重複消費message)
Consumer: Consumer處理partition裏面的message的時候是o(1)順序讀取的。因此必須維護着上一次讀到哪裏的offsite信息。high level API,offset存於Zookeeper中,low level API的offset由本身維護。通常來講都是使用high level api的。Consumer的delivery gurarantee,默認是讀完message先commmit再處理message,autocommit默認是true,這時候先commit就會更新offsite+1,一旦處理失敗,offsite已經+1,這個時候就會丟message;也能夠配置成讀完消息處理再commit,這種狀況下consumer端的響應就會比較慢的,須要等處理完才行。
通常狀況下,必定是一個consumer group處理一個topic的message。Best Practice是這個consumer group裏面consumer的數量等於topic裏面partition的數量,這樣效率是最高的,一個consumer thread處理一個partition。若是這個consumer group裏面consumer的數量小於topic裏面partition的數量,就會有consumer thread同時處理多個partition(這個是kafka自動的機制,咱們不用指定),可是總之這個topic裏面的全部partition都會被處理到的。。若是這個consumer group裏面consumer的數量大於topic裏面partition的數量,多出的consumer thread就會閒着啥也不幹,剩下的是一個consumer thread處理一個partition,這就形成了資源的浪費,由於一個partition不可能被兩個consumer thread去處理。因此咱們線上的分佈式多個service服務,每一個service裏面的kafka consumer數量都小於對應的topic的partition數量,可是全部服務的consumer數量只和等於partition的數量,這是由於分佈式service服務的全部consumer都來自一個consumer group,若是來自不一樣的consumer group就會處理重複的message了(同一個consumer group下的consumer不能處理同一個partition,不一樣的consumer group能夠處理同一個topic,那麼都是順序處理message,必定會處理重複的。通常這種狀況都是兩個不一樣的業務邏輯,纔會啓動兩個consumer group來處理一個topic)。
官網的圖解能夠直觀看出消費概覽
須要注意以下幾點:
1)一組(類)消息一般由某個topic來歸類,咱們能夠把這組消息「分發」給若干個分區(partition),每一個分區的消息各不相同;
2)每一個分區都維護着他本身的偏移量(Offset),記錄着該分區的消息此時被消費的位置;
3)一個消費線程能夠對應若干個分區,但一個分區只能被具體某一個消費線程消費;
4)group.id用於標記某一個消費組,每個消費組都會被記錄他在某一個分區的Offset,即不一樣consumer group針對同一個分區,都有「各自」的偏移量。
一個消息如何算投遞成功,Kafka提供了三種模式:
- 第一種是啥都無論,發送出去就看成成功,這種狀況固然不能保證消息成功投遞到broker;
- 第二種是Master-Slave模型,只有當Master和全部Slave都接收到消息時,纔算投遞成功,這種模型提供了最高的投遞可靠性,可是損傷了性能;
- 第三種模型,即只要Master確認收到消息就算投遞成功;實際使用時,根據應用特性選擇,絕大多數狀況下都會中和可靠性和性能選擇第三種模型
消息在broker上的可靠性,由於消息會持久化到磁盤上,因此若是正常stop一個broker,其上的數據不會丟失;可是若是不正常stop,可能會使存在頁面緩存來不及寫入磁盤的消息丟失,這能夠經過配置flush頁面緩存的週期、閾值緩解,可是一樣會頻繁的寫磁盤會影響性能,又是一個選擇題,根據實際狀況配置。
消息消費的可靠性,Kafka提供的是「At least once」模型,由於消息的讀取進度由offset提供,offset能夠由消費者本身維護也能夠維護在zookeeper裏,可是當消息消費後consumer掛掉,offset沒有即時寫回,就有可能發生重複讀的狀況,這種狀況一樣能夠經過調整commit offset週期、閾值緩解,甚至消費者本身把消費和commit offset作成一個事務解決,可是若是你的應用不在意重複消費,那就乾脆不要解決,以換取最大的性能。
- Partition ack:當ack=1,表示producer寫partition leader成功後,broker就返回成功,不管其餘的partition follower是否寫成功。當ack=2,表示producer寫partition leader和其餘一個follower成功的時候,broker就返回成功,不管其餘的partition follower是否寫成功。當ack=-1[parition的數量]的時候,表示只有producer所有寫成功的時候,纔算成功,kafka broker才返回成功信息。這裏須要注意的是,若是ack=1的時候,一旦有個broker宕機致使partition的follower和leader切換,會致使丟數據。
分析過程分爲如下4個步驟:
經過上述4過程詳細分析,咱們就能夠清楚認識到kafka文件存儲機制的奧祕。
kafka leader
Kakfa Broker集羣受Zookeeper管理。全部的Kafka Broker節點一塊兒去Zookeeper上註冊一個臨時節點,由於只有一個Kafka Broker會註冊成功,其餘的都會失敗,因此這個成功在Zookeeper上註冊臨時節點的這個Kafka Broker會成爲Kafka Broker Controller,其餘的Kafka broker叫Kafka Broker follower。(這個過程叫Controller在ZooKeeper註冊Watch)。這個Controller會監聽其餘的Kafka Broker的全部信息,若是這個kafka broker controller宕機了,在zookeeper上面的那個臨時節點就會消失,此時全部的kafka broker又會一塊兒去Zookeeper上註冊一個臨時節點,由於只有一個Kafka Broker會註冊成功,其餘的都會失敗,因此這個成功在Zookeeper上註冊臨時節點的這個Kafka Broker會成爲Kafka Broker Controller,其餘的Kafka broker叫Kafka Broker follower。例如:一旦有一個broker宕機了,這個kafka broker controller會讀取該宕機broker上全部的partition在zookeeper上的狀態,並選取ISR列表中的一個replica做爲partition leader(若是ISR列表中的replica全掛,選一個倖存的replica做爲leader; 若是該partition的全部的replica都宕機了,則將新的leader設置爲-1,等待恢復,等待ISR中的任一個Replica「活」過來,而且選它做爲Leader;或選擇第一個「活」過來的Replica(不必定是ISR中的)做爲Leader),這個broker宕機的事情,kafka controller也會通知zookeeper,zookeeper就會通知其餘的kafka broker。
Kafka的核心是日誌文件,日誌文件在集羣中的同步是分佈式數據系統最基礎的要素。
若是leaders永遠不會down的話咱們就不須要followers了!一旦leader down掉了,須要在followers中選擇一個新的leader.可是followers自己有可能延時過久或者crash,因此必須選擇高質量的follower做爲leader.必須保證,一旦一個消息被提交了,可是leader down掉了,新選出的leader必須能夠提供這條消息。大部分的分佈式系統採用了多數投票法則選擇新的leader,對於多數投票法則,就是根據全部副本節點的情況動態的選擇最適合的做爲leader.Kafka並非使用這種方法。
Kafka動態維護了一個同步狀態的副本的集合(a set of in-sync replicas),簡稱ISR,在這個集合中的節點都是和leader保持高度一致的,任何一條消息必須被這個集合中的每一個節點讀取並追加到日誌中了,纔回通知外部這個消息已經被提交了。所以這個集合中的任何一個節點隨時均可以被選爲leader.ISR在ZooKeeper中維護。ISR中有f+1個節點,就能夠容許在f個節點down掉的狀況下不會丟失消息並正常提供服。ISR的成員是動態的,若是一個節點被淘汰了,當它從新達到「同步中」的狀態時,他能夠從新加入ISR.這種leader的選擇方式是很是快速的,適合kafka的應用場景。
一個邪惡的想法:若是全部節點都down掉了怎麼辦?Kafka對於數據不會丟失的保證,是基於至少一個節點是存活的,一旦全部節點都down了,這個就不能保證了。
實際應用中,當全部的副本都down掉時,必須及時做出反應。能夠有如下兩種選擇:
1. 等待ISR中的任何一個節點恢復並擔任leader。
2. 選擇全部節點中(不僅是ISR)第一個恢復的節點做爲leader.
這是一個在可用性和連續性之間的權衡。若是等待ISR中的節點恢復,一旦ISR中的節點起不起來或者數據都是了,那集羣就永遠恢復不了了。若是等待ISR意外的節點恢復,這個節點的數據就會被做爲線上數據,有可能和真實的數據有所出入,由於有些數據它可能還沒同步到。Kafka目前選擇了第二種策略,在將來的版本中將使這個策略的選擇可配置,能夠根據場景靈活的選擇。
這種窘境不僅Kafka會遇到,幾乎全部的分佈式數據系統都會遇到。
分佈式
kafka使用zookeeper來存儲一些meta信息,並使用了zookeeper watch機制來發現meta信息的變動並做出相應的動做(好比consumer失效,觸發負載均衡等)
Broker node registry: 當一個kafka broker啓動後,首先會向zookeeper註冊本身的節點信息(臨時znode),同時當broker和zookeeper斷開鏈接時,此znode也會被刪除.
Broker Topic Registry: 當一個broker啓動時,會向zookeeper註冊本身持有的topic和partitions信息,仍然是一個臨時znode.
Consumer and Consumer group: 每一個consumer客戶端被建立時,會向zookeeper註冊本身的信息;此做用主要是爲了"負載均衡".一個group中的多個consumer能夠交錯的消費一個topic的全部partitions;簡而言之,保證此topic的全部partitions都能被此group所消費,且消費時爲了性能考慮,讓partition相對均衡的分散到每一個consumer上.
Consumer id Registry: 每一個consumer都有一個惟一的ID(host:uuid,能夠經過配置文件指定,也能夠由系統生成),此id用來標記消費者信息.
Consumer offset Tracking: 用來跟蹤每一個consumer目前所消費的partition中最大的offset.此znode爲持久節點,能夠看出offset跟group_id有關,以代表當group中一個消費者失效,其餘consumer能夠繼續消費.
Partition Owner registry: 用來標記partition正在被哪一個consumer消費.臨時znode。此節點表達了"一個partition"只能被group下一個consumer消費,同時當group下某個consumer失效,那麼將會觸發負載均衡(即:讓partitions在多個consumer間均衡消費,接管那些"遊離"的partitions)
當consumer啓動時,所觸發的操做:
A) 首先進行"Consumer id Registry";
B) 而後在"Consumer id Registry"節點下注冊一個watch用來監聽當前group中其餘consumer的"leave"和"join";只要此znode path下節點列表變動,都會觸發此group下consumer的負載均衡.(好比一個consumer失效,那麼其餘consumer接管partitions).
C) 在"Broker id registry"節點下,註冊一個watch用來監聽broker的存活狀況;若是broker列表變動,將會觸發全部的groups下的consumer從新balance.
總結:
1) Producer端使用zookeeper用來"發現"broker列表,以及和Topic下每一個partition leader創建socket鏈接併發送消息.
2) Broker端使用zookeeper用來註冊broker信息,已經監測partition leader存活性.
3) Consumer端使用zookeeper用來註冊consumer信息,其中包括consumer消費的partition列表等,同時也用來發現broker列表,並和partition leader創建socket鏈接,並獲取消息。
協調機制
1. 管理broker與consumer的動態加入與離開。(Producer不須要管理,隨便一臺計算機均可以做爲Producer向Kakfa Broker發消息)
3. 維護消費關係及每一個partition的消費信息。
Producers
Producers直接發送消息到broker上的leader partition,不須要通過任何中介或其餘路由轉發。爲了實現這個特性,kafka集羣中的每一個broker均可以響應producer的請求,並返回topic的一些元信息,這些元信息包括哪些機器是存活的,topic的leader partition都在哪,現階段哪些leader partition是能夠直接被訪問的。
Producer客戶端本身控制着消息被推送到哪些partition。實現的方式能夠是隨機分配、實現一類隨機負載均衡算法,或者指定一些分區算法。Kafka提供了接口供用戶實現自定義的partition,用戶能夠爲每一個消息指定一個partitionKey,經過這個key來實現一些hash分區算法。好比,把userid做爲partitionkey的話,相同userid的消息將會被推送到同一個partition。
以Batch的方式推送數據能夠極大的提升處理效率,kafka Producer 能夠將消息在內存中累計到必定數量後做爲一個batch發送請求。Batch的數量大小能夠經過Producer的參數控制,參數值能夠設置爲累計的消息的數量(如500條)、累計的時間間隔(如100ms)或者累計的數據大小(64KB)。經過增長batch的大小,能夠減小網絡請求和磁盤IO的次數,固然具體參數設置須要在效率和時效性方面作一個權衡。
Producers能夠異步的並行的向kafka發送消息,可是一般producer在發送完消息以後會獲得一個future響應,返回的是offset值或者發送過程當中遇到的錯誤。這其中有個很是重要的參數「acks」,這個參數決定了producer要求leader partition 收到確認的副本個數,若是acks設置數量爲0,表示producer不會等待broker的響應,因此,producer沒法知道消息是否發送成功,這樣有可能會致使數據丟失,但同時,acks值爲0會獲得最大的系統吞吐量。
若acks設置爲1,表示producer會在leader partition收到消息時獲得broker的一個確認,這樣會有更好的可靠性,由於客戶端會等待直到broker確認收到消息。若設置爲-1,producer會在全部備份的partition收到消息時獲得broker的確認,這個設置能夠獲得最高的可靠性保證。
Kafka 消息有一個定長的header和變長的字節數組組成。由於kafka消息支持字節數組,也就使得kafka能夠支持任何用戶自定義的序列號格式或者其它已有的格式如Apache Avro、protobuf等。Kafka沒有限定單個消息的大小,但咱們推薦消息大小不要超過1MB,一般通常消息大小都在1~10kB以前。
發佈消息時,kafka client先構造一條消息,將消息加入到消息集set中(kafka支持批量發佈,能夠往消息集合中添加多條消息,一次行發佈),send消息時,producer client需指定消息所屬的topic。
Consumers
Kafka提供了兩套consumer api,分爲high-level api和sample-api。Sample-api 是一個底層的API,它維持了一個和單一broker的鏈接,而且這個API是徹底無狀態的,每次請求都須要指定offset值,所以,這套API也是最靈活的。
在kafka中,當前讀到哪條消息的offset值是由consumer來維護的,所以,consumer能夠本身決定如何讀取kafka中的數據。好比,consumer能夠經過重設offset值來從新消費已消費過的數據。無論有沒有被消費,kafka會保存數據一段時間,這個時間週期是可配置的,只有到了過時時間,kafka纔會刪除這些數據。(這一點與AMQ不同,AMQ的message通常來講都是持久化到mysql中的,消費完的message會被delete掉)
High-level API封裝了對集羣中一系列broker的訪問,能夠透明的消費一個topic。它本身維持了已消費消息的狀態,即每次消費的都是下一個消息。
High-level API還支持以組的形式消費topic,若是consumers有同一個組名,那麼kafka就至關於一個隊列消息服務,而各個consumer均衡的消費相應partition中的數據。若consumers有不一樣的組名,那麼此時kafka就至關與一個廣播服務,會把topic中的全部消息廣播到每一個consumer。
High level api和Low level api是針對consumer而言的,和producer無關。
High level api是consumer讀的partition的offsite是存在zookeeper上。High level api 會啓動另一個線程去每隔一段時間,offsite自動同步到zookeeper上。換句話說,若是使用了High level api, 每一個message只能被讀一次,一旦讀了這條message以後,不管我consumer的處理是否ok。High level api的另一個線程會自動的把offiste+1同步到zookeeper上。若是consumer讀取數據出了問題,offsite也會在zookeeper上同步。所以,若是consumer處理失敗了,會繼續執行下一條。這每每是不對的行爲。所以,Best Practice是一旦consumer處理失敗,直接讓整個conusmer group拋Exception終止,可是最後讀的這一條數據是丟失了,由於在zookeeper裏面的offsite已經+1了。等再次啓動conusmer group的時候,已經從下一條開始讀取處理了。
Low level api是consumer讀的partition的offsite在consumer本身的程序中維護。不會同步到zookeeper上。可是爲了kafka manager可以方便的監控,通常也會手動的同步到zookeeper上。這樣的好處是一旦讀取某個message的consumer失敗了,這條message的offsite咱們本身維護,咱們不會+1。下次再啓動的時候,還會從這個offsite開始讀。這樣能夠作到exactly once對於數據的準確性有保證。
借鑑:http://blog.csdn.net/ychenfeng/article/details/74980531