一、什麼是Kafka?linux
Kafka是一個使用Scala編寫的消息系統,本來開發自LinkedIn,用做LinkedIn的活動流(Activity Stream)和運營數據處理管道(Pipeline)的基礎。如今它已被多家不一樣類型的公司做爲多種類型的數據管道和消息系統使用。算法
Kafka是一種分佈式的,基於發佈/訂閱的消息系統。服務器
Kafka使用zookeeper做爲其分佈式協調框架,很好的將消息生產、消息存儲、消息消費的過程結合在一塊兒。同時藉助zookeeper,kafka可以生產者、消費者和broker在內的因此組件在無狀態的狀況下,創建起生產者和消費者的訂閱關係,並實現生產者與消費者的負載均衡。網絡
二、kafka的特性架構
(1)以時間複雜度爲O(1)的方式提供消息持久化能力,即便對TB級以上數據也能保證常數時間複雜度的訪問性能。併發
(2)高吞吐率。即便在很是廉價的商用機器上也能作到單機支持每秒100K條以上消息的傳輸。負載均衡
(3)支持Kafka Server間的消息分區,及分佈式消費,同時保證每一個Partition內的消息順序存儲和傳輸。框架
(4)同時支持離線數據處理(Offline)和實時數據處理(Online)。dom
(5)Scale out:支持在線水平擴展。無需停機便可擴展機器。異步
(6)支持按期刪除數據機制。能夠按照時間段來刪除,也能夠按照文檔大小來刪除。
(7)Consumer採用pull的方式消費數據,消費狀態由Consumer控制,減輕Broker負擔。
三、Kafka架構
(1)Broker:和RabbitMQ中的Broker概念相似。一個kafka服務器就是一個Broker,而一個kafka集羣包含一個或多個Broker。Broker會持久化數據到相應的Partition中,不會有cache壓力。
(2)Topic:主題。每條消息都有一個類別,這個類別就叫作Topic。Kafka的Topic能夠理解爲RabbitMQ的Queue消息隊列,相同類別的消息被髮送到同一個Topic中,而後再被此Topic的Consumer消費。Topic是邏輯上的概念,而物理上的實現就是Partition。
(3)Partition:分區。分區是物理上的概念,每一個Topic包含一個或者多個Partition,每一個Partition都是一個有序隊列。發送給Topic的消息通過分區算法(能夠自定義),決定消息存儲在哪個Partition當中。每一條數據都會被分配一個有序id:Offset。注意:kafka只保證按一個partition中的順序將消息發給Consumer,不保證一個Topic的總體(多個partition間)的順序。
(4)Replication:備份。Replication是基於Partition而不是Topic的。每一個Partition都有本身的備份,且分佈在不一樣的Broker上。
(5)Offset:偏移量。kafka的存儲文件都是用offset來命名,用offset作名字的好處是方便查找。例如你想找位於2049的位置,只要找到2048.log的文件便可。固然the first offset就是00000000000.log。注意:每一個Partition中的Offset都是各不影響的從0開始的有序數列。
(6)Producer:消息生產者。
(7)Consumer:消息消費者。Consumer採用pull的方式從Broker獲取消息,而且Consumer要維護消費狀態,所以Kafaka系統中,業務重心通常都在Consumer身上,而不像RabbitMQ那樣Broker作了大部分的事情。
(8)Consumer Group:消費組。每一個Consumer屬於一個特定的Consumer Group(可爲每一個Consumer指定group name,若不指定group name則屬於默認的group)。每一個Topic能夠被多個Group訂閱,每一個Group中能夠有多個Consumer。發送到Topic的一條消息會被每一個Group中的一個Consumer消費,多個Consumer之間將交錯消費整個Topic的消息,實現負載均衡。
(9)Record:消息。每個消息由一個Key、一個Value和一個時間戳構成。
Kafka內部結構圖(圖片源於網絡)
Kafka拓撲結構圖(圖片源於網絡)
Topic在邏輯上能夠被認爲是一個queue,每條消費都必須指定它的Topic,能夠簡單理解爲必須指明把這條消息放進哪一個queue裏。爲了使得Kafka的吞吐率能夠線性提升,物理上把Topic分紅一個或多個Partition,每一個Partition在物理上對應一個文件夾,該文件夾下存儲這個Partition的全部消息和索引文件。若建立topic1和topic2兩個topic,且分別有13個和19個分區,則整個集羣上會相應會生成共32個文件夾。partiton命名規則爲topic名稱+有序序號,第一個partiton序號從0開始,序號最大值爲partitions數量減1。
(1)每一個partition目錄至關於一個巨型文件被平均分配到多個大小相等segment數據文件中。但每一個segment file消息數量不必定相等,這種特性方便old segment file快速被刪除。
(2)每一個partiton只須要支持順序讀寫就好了,segment文件生命週期由服務端配置參數決定。
(3)segment file組成:由2大部分組成,分別爲index file(後綴「.index」)和data file(後綴「.log」),此2個文件一一對應,成對出現。
(4)segment文件命名規則:partition全局的第一個segment從0開始,後續每一個segment文件名爲上一個segment文件最後一條消息的offset值。數值最大爲64位long大小,19位數字字符長度,沒有數字用0填充。
Segment file結構圖(圖片來源於網絡)
以上述圖2中一對segment file文件爲例,說明segment中index和log文件對應關係物理結構以下:
Index和log文件對應圖(圖片來源於網絡)
其中以索引文件中元數據3,497爲例,依次在數據文件中表示第3個message(在全局partition表示第368772個message)、以及該消息的物理偏移地址爲497。
例如讀取offset=368776的message,須要經過下面2個步驟查找。
(1)第一步查找segment file
上圖爲例,其中00000000000000000000.index表示最開始的文件,起始偏移量(offset)爲0.第二個文件00000000000000368769.index的消息量起始消息爲368770 = 368769 + 1.一樣,第三個文件00000000000000737337.index的起始消息爲737338=737337 + 1,只要根據offset 進行二分查找文件列表,就能夠快速定位到具體文件。當offset=368776時定位到00000000000000368769.index|log。
(2)第二步經過segment file查找message
經過第一步定位到segment file,當offset=368776時,依次定位到00000000000000368769.index的元數據物理位置和00000000000000368769.log的物理偏移地址,而後再經過00000000000000368769.log順序查找直到offset=368776爲止。
首先來看一條在Linux下建立topic的命令:
bin/kafka-topics.sh --create --zookeeper ip1:2181,ip2:2181,ip3:2181,ip4:2181 --replication-factor 2 --partitions 4 --topic test
此命令的意思是在四個Broker的kafka集羣上建立一個名爲test的Topic,而且有4個分區2個備份(此處比較容易搞混,2個Replication表示Leader和Follower一共加起來有2個)。此時在四臺機器上面就有8個Partition,如圖所示。
Kafka集羣Partition分佈圖1(圖片來源於網絡)
當集羣中新增2節點,Partition增長到6個時分佈狀況以下:
Kafka集羣Partition分佈圖2(圖片來源於網絡)
在Kafka集羣中,每一個Broker都有均等分配Leader Partition機會。
上述圖Broker Partition中,箭頭指向爲副本,以Partition-0爲例:broker1中parition-0爲Leader,Broker2中Partition-0爲副本。每一個Broker(按照BrokerId有序)依次分配主Partition,下一個Broker爲副本,如此循環迭代分配,多副本都遵循此規則。
副本分配算法:
(1)將全部n個Broker和待分配的i個Partition排序。
(2)將第i個Partition分配到第(i mod n)個Broker上。
(3)將第i個Partition的第j個副本分配到第((i + j) mod n)個Broker上
例如圖2中的第三個Partition:partition-2,將被分配到Broker3((3 mod 6)=3)上,partition-2的副本將被分配到Broker4上((3+1) mod 6=4)。
(1)Kafka把topic中一個parition大文件分紅多個小文件段,經過多個小文件段,就容易按期清除或刪除已經消費完文件,減小磁盤佔用。能夠設置segment文件大小按期刪除和消息過時時間按期刪除
(2)經過索引信息能夠快速定位message。
(3)經過index元數據所有映射到memory,能夠避免segment file的IO磁盤操做。
(4)經過索引文件稀疏存儲,能夠大幅下降index文件元數據佔用空間大小。
對於多個Partition,多個Consumer
(1)若是consumer比partition多,是浪費,由於kafka的設計是在一個partition上是不容許併發的,因此consumer數不要大於partition數。
(2)若是consumer比partition少,一個consumer會對應於多個partition,這裏要合理分配consumer數和partition數,不然會致使partition裏面的數據被取的不均勻。最好partiton數目是consumer數目的整數倍,因此partition數目很重要,好比取24,就很容易設定consumer數目。
(3)若是consumer從多個partition讀到數據,不保證數據間的順序性,kafka只保證在一個partition上數據是有序的,但多個partition,根據你讀的順序會有不一樣
(4)增減consumer,broker,partition會致使rebalance,因此rebalance後consumer對應的partition會發生變化
(5)High-level接口中獲取不到數據的時候是會block的。
關於zookeeper中Offset初始值的問題:
Zookeeper中Offset的初始值默認是非法的,所以經過設置Consumer的參數auto.offset.reset來告訴Consumer讀取到Offset非法時該怎麼作。
auto.offset.reset有三個值:
(1)smallest : 自動把zookeeper中的offset設爲Partition中最小的offset;
(2)largest : 自動把zookeeper中offset設爲Partition中最大的offset;
(3)anything else: 拋出異常;
auto.offset.reset默認值是largest,此種狀況下若是producer先發送了10條數據到某個Partition,而後Consumer啓功後修改zookeeper中非法Offset值爲Partition中的最大值9(Offset從0開始),這樣Consumer就忽略了這10條消息。就算如今再次設置成smallest也讀取不到以前的10條數據了,由於此時Offset是合法的了。
因此,想要讀取以前的數據,就須要在一開始指定auto.offset.reset=smallest。
Replication是基於Partition而不是Topic的。每一個Partition都有本身的備份,且分佈在不一樣的Broker上。這些Partition當中有一個是Leader,其餘都是Follower。Leader Partition負責讀寫操做,Follower Partition只負責從Leader處複製數據,使本身與Leader保持一致。Zookeeper負責二者間的故障切換(fail over,能夠理解爲Leader選舉)。
消息複製延遲受最慢的Follower限制,Leader負責跟蹤全部Follower的狀態,若是Follower「落後」太多或者失效,Leader就將此Follower從Replication同步列表中移除,但此時Follower是活着的,而且一直從Leader拉取數據,直到差距小於replica.lag.max.messages值,而後從新加入同步列表。當一條消息被全部的Follower保存成功,此消息才被認爲是「committed」,Consumer才能消費這條消息。這種同步方式就要求Leader和Follower之間要有良好的網絡環境。
一個partition的follower落後於leader足夠多時,會被認爲不在同步副本列表或處於滯後狀態。在Kafka-0.8.2.x中,副本滯後判斷依據是副本落後於leader最大消息數量(replica.lag.max.messages)或replication響應Leader partition的最長等待時間(replica.lag.time.max.ms)。前者是用來檢測緩慢的副本,然後者是用來檢測失效或死亡的副本。假設replica.lag.max.messages設置爲4,代表只要follower落後leader的消息數小於4,就不會從同步副本列表中移除。replica.lag.time.max設置爲500 ms,代表只要follower向leader發送拉取數據請求時間間隔超過500 ms,就會被標記爲死亡,而且會從同步副本列表中移除。
當Leader處於流量高峯時,好比一瞬間就收到了4條數據,此時全部Follower將被認爲是「out-of-sync」而且從同步副本列表中移除,而後Follower拉取數據遇上Leader事後又從新加入同步列表,就這樣Follower頻繁在副本同步列表移除和從新加入之間來回切換。
即便只有一個replicas實例存活,仍然能夠保證消息的正常發送和接收,只要zookeeper集羣存活便可(注意:不一樣於其餘分佈式存儲,好比hbase須要"多數派"存活才行)。
當leader失效時,需在followers中選取出新的leader,可能此時follower落後於leader,所以須要選擇一個"up-to-date"的follower。kafka中leader選舉並無採用"投票多數派"的算法,由於這種算法對於"網絡穩定性"/"投票參與者數量"等條件有較高的要求,並且kafka集羣的設計,還須要容忍N-1個replicas失效。對於kafka而言,每一個partition中全部的replicas信息均可以在zookeeper中得到,那麼選舉leader將是一件很是簡單的事情。選擇follower時須要兼顧一個問題,就是新leader 所在的server服務器上已經承載的partition leader的個數,若是一個server上有過多的partition leader,意味着此server將承受着更多的IO壓力。在選舉新leader,須要考慮到"負載均衡",partition leader較少的broker將會更有可能成爲新的leader。在整個集羣中,只要有一個replicas存活,那麼此partition均可以繼續接受讀寫操做。
當一個Group中,有Consumer加入或者離開時,會觸發Partitions均衡。均衡的最終目的,是提高Topic的併發消費能力。
(1)假如topic1,具備以下partitions: P0,P1,P2,P3
(2)加入group中,有以下consumer: C0,C1
(3)首先根據partition索引號對partitions排序: P0,P1,P2,P3
(4)根據consumer.id排序: C0,C1
(5)計算倍數: M = [P0,P1,P2,P3].size / [C0,C1].size,本例值M=2(向上取整)
(6)而後依次分配partitions: C0 = [P0,P1],C1=[P2,P3],即Ci = [P(i * M),P((i + 1) * M -1)]
經過此算法,就能知道具體Consumer消費的是哪一個分區中的數據。
在kafka-Client-0.11.0.0.jar中,提供的有默認的KafkaProducer和DefaultPartitioner實現。其中DefaultPartitioner主要提供了Producer發送消息到分區的路由算法,若是給定Key值,就經過Key的哈希值和分區個數取餘來計算;若是沒有給定Key,就經過ThreadLocalRandom.current().nextInt()產生的隨機數與分區數取餘(其中涉及複雜步奏參考以下代碼)。具體代碼以下:
public class DefaultPartitioner implements Partitioner { private final ConcurrentMap<string, atomicinteger=""> topicCounterMap = new ConcurrentHashMap<>(); public void configure(Map<string,> configs) {} /** * 計算給定記錄的分區 * @param topic The topic name * @param key The key to partition on (or null if no key) * @param keyBytes serialized key to partition on (or null if no key) * @param value The value to partition on or null * @param valueBytes serialized value to partition on or null * @param cluster The current cluster metadata */ public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { List<partitioninfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { int nextValue = nextValue(topic); List<partitioninfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; } } else { // hash the keyBytes to choose a partition return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } } private int nextValue(String topic) { AtomicInteger counter = topicCounterMap.get(topic); if (null == counter) { counter = new AtomicInteger(ThreadLocalRandom.current().nextInt()); AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter); if (currentCounter != null) { counter = currentCounter; } } return counter.getAndIncrement(); } public void close() {} }
咱們也能夠設置本身的Partition路由規則,須要繼承Partitioner類實現
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
Kafka的消息delivery保證主要有三種:
(1)At most once 最多一次。消息可能會丟失,但毫不會重複傳輸。
(2)At least once 最少一次。消息毫不會丟失,但可能會重複傳輸。
(3)Exactly once 正好一次。每條消息正好被傳輸一次和消費一次。
Producer的delivery保證能夠經過參數request.required.acks設置來保證:
(1)request.required.acks=0。
至關於消息異步發送。消息一發送完畢立刻發送下一條。因爲不須要ack,可能會形成數據丟失,至關於實現了At most once。
(2)request.required.acks=1。
消息發送給Leader Partition,在Leader Partition確認消息並ack 生產者事後才發下一條。
(3)request.required.acks=-1。
消息發送給Leader,在Leader收到全部Follower確認保存消息的ack後對producer進行ack才發送下一條。
因此一條消息從Producer到Broker至少是確保了At least once的,由於有Replication的存在,只要消息到達Broker就不會丟失。若是ack出現問題,好比網絡中斷,有可能會致使producer收不到ack而重複發送消息。Exactly once這種方式,沒有查到相關的實現。
第(3)種方式的具體步奏以下:
a. producer 先從 zookeeper 的 "/brokers/.../state" 節點找到該 partition 的 leader
b. producer 將消息發送給該 leader
c. leader 將消息寫入本地 log
d. followers 從 leader pull 消息,寫入本地 log 後向 leader 發送 ACK
e. leader 收到全部 ISR 中的 replica 的 ACK 後,增長 HW(high watermark,最後 commit 的 offset) 並向 producer 發送 ACK
Consumer從Broker拉取數據事後,能夠選擇commit,此操做會在zookeeper中存下此Consumer讀取對應Partition消息的Offset,以便下一次拉取數據時會從Partition的下一個Offset消費,避免重複消費。
一樣,Consumer能夠經過設置參數enable.auto.commit=true來自動確認消息,即Consumer一收到消息馬上自動commit。若是隻看消息的讀取過程,kafka是確保了Exactly once的,可是實際狀況中Consumer不可能讀取到數據就結束了,每每還須要處理讀取到的數據。所以Consumer處理消息和commit消息的順序就決定了delivery保證的類別。
(1)先處理後commit
這種方式實現了At least once。Consumer收到消息先處理後提交,若是在處理完成後機器崩潰了,致使Offset沒有更新,Consumer下次啓動時又會從新讀取上一次消費的數據,實際上此消息已經處理過了。
(2)先commit後處理
這種方式實現了At most once。Consumer收到消息事後馬上commit,更新zookeeper上的Offset,而後再處理消息。若是處理還未結束Consumer崩潰了,等Consumer再次啓動的時候會讀取Offset更新事後的下一條數據,這就致使了數據丟失。
Kafka提供了兩種Consumer API,選用哪一種API須要視具體狀況而定。
High Level Consumer API圍繞着Consumer Group這個邏輯概念展開,它屏蔽了每一個Topic的每一個Partition的Offset管理(自動讀取zookeeper中該Partition的last offset )、Broker失敗轉移以及增減Partition、Consumer時的負載均衡(當Partition和Consumer增減時,Kafka自動進行Rebalance)。
Low Level Consumer API,做爲底層的Consumer API,提供了消費Kafka Message更大的控制,用戶能夠實現重複讀取、跳讀等功能。
使用Low Level Consumer API,是沒有對Broker、Consumer、Partition增減進行處理,若是出現這些的增減時,須要本身處理負載均衡。
Low Level Consumer API提供更大靈活控制是以增長複雜性爲代價的:
(1)Offset再也不透明
(2)Broker自動失敗轉移須要處理
(3)增長Consumer、Partition、Broker須要本身作負載均衡