哪些場景適合使用Kafka
線上系統會實時產生數以萬計的日誌信息,服務器運行狀態,用戶行爲記錄,業務消息 等信息,這些信息須要用於多個不一樣的目的,好比審計、安全、數據挖掘等,所以須要以分類的方式將這些信息發送到某個地方,以方便後臺處理service實時的去獲取數據。MQ用於解決數據生成速率與數據消費速率不一致的場景,業務接口解耦,數據緩存冗餘,海量數據處理彈性,異步通訊。node
Kafka是LinkedIn開源出來的分佈式消息發佈-訂閱系統,主要特色是基於Pull模式來處理消息,O(1)常數時間級別的消息持久化和讀取時間複雜度,基於at least once的處理原則,追求高吞吐量(單臺機器吞吐量可達10w/s),主要用於非敏感信息如日誌的收集和傳輸,不支持事務,對消息的重複、丟失和錯誤沒有嚴格要求 。算法
簡單的Kafka消息生產/消費模型
最簡單的Kafka拓撲網絡裏只有一個broker,producer建立不一樣的topics,並將msg經過push的方式發送到broker,broker使用append log的方式順序性持久化msg,而後不一樣的consumers根據本身的消費速率按需從broker處pull本身須要的topics對應的msg;對於一個broker上的一個topic的msg而言,kafka總會保證其被consumer消費的前後順序;採用pull的消息處理模式可讓consumer按需處理。api
Broker從本地文件系統裏添加消息和獲取消息都是按照隊列的入列和出列模式操做,所以時間複雜度都是O(1);Kafka不會主動刪除被消費過的消息,而是經過server.properties中的配置按照過時時間或者文件總大小來進行文件刪除。緩存
構建Kafka Cluster,並使用zookeeper cluster做爲協調服務
爲了提供更高的msg吞吐量以及HA,Kafka支持以cluster的方式建立多個broker對外提供服務,因爲集羣環境引入的不肯定性,kafka使用Zookeeper做爲協調性服務(0.7.+引入),功能以下:
#1監聽broker的活躍狀態及其存儲的topic和partition狀態,協調broker leader和partition leader的選舉;
#2 爲producer提供broker的訪問地址,並記錄每一個topic下對應的partition leader分佈地址,以幫助實現負載均衡;
#3 爲consumer提供broker的訪問地址,並記錄每一個partition正在被哪些CG內的哪個consumer消費,CG中成員的變化,以幫助實現負載均衡,同時記錄每一個CG的offset。安全
Zookeeper上典型的存儲kafka信息的格式以下,Kafka Cluster中每一個broker均可以獲取關於cluster的metadata,包含active broker list,topic’s partition leaders等,所以對於producer而言每一個broker都是對等的;服務器
1 leo-chen-zookeeper 2 -> broker 3 -> ids 4 -> [broker_id] ## temp znode, value is host:port 5 -> topics 6 -> [topic_id] 7 -> [partition_id] ## temp znode, value is partition refer 8 -> consumers 9 -> [group_id] 10 -> ids 11 -> [consumer_id] ## temp znode, value is partition_id list 12 -> offsets 13 -> [topic_id] 14 -> [broker_id-partition_id] ## persist znode, value is offset 15 -> owners 16 -> [topic_id] 17 -> [broker_id-partition_id] ## temp znode, value is consumer_id
Zookeeper實現的功能以下:
#1 broker node註冊:新上線的broker會在zookeeper上建立一個temp znode以維護本身的活躍狀態,znode value爲broker的訪問地址;broker下線或者session失效都會致使temp znode被刪除。網絡
#2 broker topic註冊:新上線的broker還會根據自身存儲的topic-partition建立對應的temp znode,znode value爲partition的索引,用於失效轉移的時候進行狀態對比。session
#3 consumer node註冊:新上線的consumer會在zookeeper上建立一個temp znode以維護本身的活躍狀態,znode value爲該consumer正在訪問的topic-partition列表;consumer的上線下線都會觸發kafka的rebalance動做。併發
#4 consumer-partition offset註冊:一個CG中新上線的consumer會根據本身正在訪問的partition對應的offset在zookeeper上建立一個persist znode,znode value爲offset的值;當consumer下線以後,同一個CG內的其餘consumer會繼續消費這個offset對應的partition msg;app
#5 partition owner註冊:新上線的consumer會根據本身正在消費的topic-partition在zookeeper上建立一個temp znode,表示當前CG內全部正在被消費的partition都有哪些consumer在消費。
一個新的consumer上線以後會觸發以下操做:
#1 進行consumer node註冊;
#2 在consumers/[group_id]/ids路徑下注冊一個watcher用於監聽當前group中其餘consumer臨時節點的變更,若是有變更則觸發負載均衡,通知當前consumer node從新計算可消費的topic-partition;
#3 在broker/ids路徑下注冊一個watcher用於監聽全部broker臨時節點的變更,若是有變更則觸發負載均衡,通知當前consumer node從新計算可消費的topic-partition;
維護topic下partition的數量,同步和過時策略
kafka將每個topics拆分紅多個partition(0.8.+引入)以便於負載均衡到多個broker上,因爲一個topics的msg被分拆到了多個partition,則 kafka只能保證按一個partition中的msg按順序讓consumer進行消費(除partition所在的broker下線的狀況),並不保證一個topic內多個partition間的msg的消費順序。一個topics的msg劃分到哪一個partition的策略有兩種,一是採用Key Hash算法,一是採用Round Robin算法。
Kafka經過partition log文件在文件系統上存儲msg,msg的寫入和讀取均可以是批量線性的,同時基於read-ahead,write-behind,線性讀寫,系統頁緩存的操做方式使得kafka對partition log文件的操做很是快,而且優於JVM的內存操做效率;傳統的RPC文件讀取流程會經歷四個步驟:磁盤到內核頁緩存,內核頁緩存到用戶空間緩存,用戶空間緩存到內核socket緩存,內核socket緩存到網卡緩存,最終發送給用戶;而利用sendfile和zero-copy技術能夠將內核頁緩存的數據直接複製到網卡緩存,從而可讓kafka實現近似緩存級別的數據操做速度。
Broker上典型的存儲msg的文件格式以下,~/leo-chen/kafka-msg表示log.dirs指向的根目錄,而後是按照topic以及partition劃分的子目錄,[topic-name] - [partition-num],數字表示partition的編號,同一個topic下的partition儘可能不要分佈在一個broker下;
1 leo-chen-broker 2 -> kafka-msg 3 -> topic_report-0 4 -> 34477849968.index 5 -> 34477849968.log 6 -> 35551592052.index 7 -> 35551592052.log 8 -> topic_report-3 9 -> topic_launch_info-0 10 -> topic_api_call-0 11 -> topic_api_call-1
producer向指定的topic發送msg也就是尋找對應的topic目錄,並將一條msg entry添加到文件末尾的過程,一個日誌文件由*.index和*.log組成,前者爲msg的位置索引,後者是msg自己,這樣的存儲設計有以下優點:
#1 segment file的分段存儲方式方便獨立加載,檢索和刪除數據;
#2 獨立存儲索引信息*.index的方式能夠避免冗餘IO操做,快速定位數據;
下圖是一個topic下一個partition log的邏輯抽象圖,全部的partition log files都以topic name做爲根目錄,該broker上存儲的關於該topic的日誌文件都位於此目錄下;每一條日誌由三個部分組成,8個字節的offset用於惟一標記該msg,4個字節的num表示該msg的總長度,n個字節的content表示消費內容,其餘就是一些版本和校驗字段;每個segment file由一個offset區間段的msg組成,文件名是該區間最小的offset,所以獲取消息時指定一個起始offset和maximum chunk size就能夠定位目標msg;將msg分文件存儲的一個好處就是若是指定了過時時間,則刪除過時的msg就只須要簡單刪除獨立的文件。
另外kafka容許給每一個partition設置一個或多個replication,但只有一個partition會做爲leader對外同時提供讀寫服務,其餘的replication僅做爲備份,他們之間的關係由zookeeper進行維護;噹噹前的partition leader下線後,zookeeper維護的臨時節點會由於session失效而自動刪除,所以其餘的follower能夠競爭成爲新的leader,實現故障轉移;kafka會爲每一個partition維護一個a set of in-sync replicas的列表(ISR,總數爲n的集羣裏只要有1個節點存活就能正常工做,不一樣於zookeeper的majority vote策略,須要總數爲2n+1的集羣裏至少有n+1個節點存活才能正常工做,優點在於系統的latency取決於吞吐量最快的node),存儲全部replication中與以前的partition leader同步狀態保持一致的節點,而新的leader也將從這個列表裏誕生,而不須要通過投票過程產生新的leader。
Replication與partition leader同步的過程相似於consumer消費msg的過程,也是順序批量的將partition leader上的消息pull到本地;而kafka會經過兩個狀態值斷定一個replication的健康狀態,一個是replication與zookeeper的heartbeat,另外一個是partition與replication上最大offset的差值,只有知足兩個條件的replication纔會被加入ISR;一個msg只有在全部的replication上都進行同步以後,msg的狀態才設置爲committed,表示這條msg可被consumer消費。
producer使用batch或者async的方式向broker發送消息
producer須要將msg發送到broker以前,會先爲msg指定一個partitionKey,並經過可自定義的hash算法獲取一個partition refer,而後向zookeeper獲取對應partition的host:port;經過這樣的方式保證在一個topic下,全部標記爲partitionKey的msg都會被髮送到同一個partition上;爲了提高性能,producer能夠當msg累積到必定量以後統一將一批消息發送到broker,能夠是累積消息數量,時間間隔,或者累積msg數據大小;Kafka支持gzip,snappy等多種數據壓縮方式。
另外因爲kafka會爲每一個leader partition提供一個或者多個replication以保證容錯性,所以leader partition在收到msg以後會將數據同步到其餘的replication上, producer能夠經過設置acks參數(0|1|-1)要求同步\異步等待成功被同步了msg的replication的數量。一個msg只有在對應的全部replication上都sync以後纔會在partition leader上被標註爲committed狀態,表示能夠被producer消費。
consumer基於consumer group和offset從broker取出msg
kafka定義一個 topic能夠被多個CG(Consumer Group)監聽消費,CG內的consumer能夠消費多個不一樣的partitions,但對於一個topic內的partitions,一個CG下的consumers只能消費不一樣的partitions,也就是對於一個partition而言禁止同一個CG內的consumer進行併發訪問,這樣能夠最小的代價保證一個CG內的msg消費順序。
所以若是想實現topic內的消息廣播(一個msg被全部consumer消費)則爲每個consumer都建立一個CG,若是想實現topic內的消息單播(一個msg只被一個consumer消費)則全部的consumer都放到一個CG裏。通常狀況下必定是一個CG處理一個topic下全部的partition的全部msg,爲了達到最優效率,一個topic下全部partition的數量須要大於等於一個CG內全部consumer的數量(儘可能讓全部consumer都處理工做狀態),同時也要大於全部broker的數量以便於均衡分配到不一樣的broker上。
一個msg是否被消費是經過partition上msg隊列中的位置(offset)決定,所以consumer能夠經過修改offset的值從而讀取partition上任意位置的msg,因爲這個offset是交由consumers進行維護,若是是多個CG消費同一個partition的msg的話,須要各自維護本身的offset,所以不存在鎖的競爭,而且經過簡單增長broker的數量就能夠提高訪問併發量,經過配置producer.property能夠將offset存儲於zookeeper,或者producer本身維護 。
kafka提供三種級別的msg消費一致性語義
#1 at most once:fetch msg, update offset, consume msg,因爲更新offset是在處理msg以前,因此可能出現msg丟失的場景。
#2 at least once:fetch msg, consume msg, update offset,因爲更新offset是在處理msg以後,因此可能出現msg被重複消費的場景,kafka推薦配置。
#3 exactly once:在at least once的基礎上,在處理msg以前添加一個接口冪等性斷定,或者基於2階段提交。
Kafka如何保證消息機制的可靠性
消息系統中因爲參與方較多以及網絡延遲等問題,須要保證幾個點,
#1 保證msg發送成功:producer會同步等待broker的返回,並確認replication同步的結果,以保證消息成功被多個broker保存;但若是設置爲等待全部replication都同步才返回的話會極大下降producer的吞吐量。
#2 保證msg的消費順序與發送順序一致:kafka能夠保證一個broker下一個partition接受到的msg能夠依次發送到consumer,但須要處理幾個常見的問題:
一個問題是因爲網絡緣由致使先發送的msg晚於後發送的msg到達broker/consumer,這樣的問題能夠經過producer在msg上添加version,並在consumer方按照version的前後順序進行消費。
另一個問題就是當一個broker下線以後,即便對應的partition在其餘broker上有replication能夠支持故障轉移,但因爲partition leader被新的replication替代,CG針對原來partition鎖記錄的offset再也不可用,也就是再也不能保證當前msg的消費順序。
#3 保證msg被成功消費後再也不重複消費:在at most once/at least once/exactly once中,kafka使用的是at least once,所以msg有可能被重複消費,而exactly once能夠保證一條消息有且只有一次消費過程,能夠在at least once的基礎上在producer端添加冪等性斷定,因爲不一樣msgId可能表示同一個業務消息,所以還須要從業務層面定製一個全局惟一性的標識 。