Kafka 轉載

轉載自:https://fangyeqing.github.io/2016/10/28/kafka---%E4%BB%8B%E7%BB%8D/

kafka---介紹


Kafka是一種分佈式的消息系統。本文基於0.9.0版本,新版kafka加入了流處理組件kafka stream,最新的官方文檔又自稱分佈式流處理平臺。前端

概念

  • Broker
    Kafka的節點。kafka集羣包含一個或多個broker
  • Producer
    消息的生產者。負責發佈消息到Kafka broker
  • Consumer
    消息的消費者。每一個consumer屬於一個特定的consumer group(若不指定group id則屬於默認的group)。使用consumer high level API時,同一topic的一條消息只能被同一個consumer group內的一個consumer消費,但多個consumer group可同時消費這一消息。
  • Topic
    消息主題。例如pv日誌、click日誌、轉化日誌均可以做爲topic。
  • Partition
    topic物理上的分組。每一個topic包含一個或多個partition,建立topic時可指定parition數量。每一個partition是一個有序的隊列,對應於一個文件夾,該文件夾下存儲該partition的數據和索引文件。在發送一條消息時,生產者能夠指定這條消息的key和分區機制來發送到不一樣的分區。
  • offset
    每一個partition中的每條消息被分配的有序id,是消息的惟一標識。每一個partition都由一系列有序的、不可變的消息組成,這些消息被連續的追加到partition中。

    架構

    image
    producers(生產者)經過網絡將不一樣topic的messages(消息)發送到Kafka 集羣,consumers(消費者)在集羣訂閱本身想要消費的topic。

一個典型的kafka集羣中包含若干producer(能夠是web前端產生的page view,或者是服務器日誌,系統CPU、memory等),若干broker(Kafka支持水平擴展,通常broker數量越多,集羣吞吐率越高),若干consumer group,以及一個Zookeeper集羣。Kafka經過Zookeeper管理集羣配置,選舉leader,以及在consumer group發生變化時進行rebalance。producer使用push模式將消息發佈到broker,consumer使用pull模式從broker訂閱並消費消息。  node

topic&&partitions

對於每一個Topic,Kafka會爲其維護一個以下圖所示的分區的日誌文件
image
每一個partition(分區)是一個有序的、不可修改的消息組成的隊列;這些消息是被不斷的appended到這個commit log(提交日誌文件)上的。在這些patitions之中的每一個消息都會被賦予一個叫作offset的順序id編號,用來在partition之中惟一性的標示這個消息。git

Kafka集羣會保存一個時間段內全部被髮布出來的信息,不管這個消息是否已經被消費過。github

partition內有序

Kafka僅僅提供提供partition以內的消息的全局有序,在不一樣的partition之間不能擔保。partition的消息有序性加上能夠按照指定的key劃分消息的partition,這基本上知足了大部分應用的需求。若是你必需要實現一個全局有序的消息隊列,那麼能夠採用Topic只劃分1個partition來實現。可是這就意味着你的每一個消費組只能有惟一的一個消費者進程。web

Consumer Group

每個consumer實例都屬於一個consumer group,每一條消息都會被全部訂閱了該topic的consumer group消費。經過group id指定consumer group。至關於同一個consumer group的消費者會瓜分全部的分區,每一個consumer會消費一個或多個分區。apache

使用high-level consumer時,同一個consumer group裏只有一個consumer能消費到該消息。服務器

由於high level不用client關心offset, 會自動的讀zookeeper中該Consumer group的last offset,至關於全部consumer都公用這個offset。當其中一個consumer消費一條消息時,offset就移動到下一條。網絡

image

不一樣形式的消息播發

訂閱模式:每一個Consumer都採用不一樣的group,每一條消息都會發送給全部消費者
消息隊列模式:全部的Consumer在同一個Group裏,消費者之間負載均衡session

Producer&&Consumer&&partitions

新建topic時,經過–partitions 能夠設置分區數。能夠指定partitions數爲broker的整數倍,這樣,每一個broker會對應相同個數的partitions。架構

生產者在生產數據的時候,能夠爲每條消息指定Key,這樣消息被髮送到broker時,會根據分區規則選擇被存儲到哪個partitions中。若是分區規則設置的合理,那麼全部的消息將會被均勻的分佈到不一樣的partitions中,這樣就實現了負載均衡和水平擴展。

Kafka保證同一consumer group中只有一個consumer會消費某條消息,實際上,Kafka保證的是穩定狀態下每個consumer實例只會消費某一個或多個特定partition的數據,而某個partition的數據只會被某一個特定的consumer實例所消費。其中consumer和partition數量關係以下表所示:

consumer和partition數量關係 消費狀況
小於 至少有一個consumer會消費多個partition的數據
相等 一個consumer消費一個partition的數據
大於 部分consumer沒法消費該topic下任何一條消息,浪費

增減consumer,broker,partition會致使rebalance,rebalance後consumer對應的partition會發生變化,在後面的實例中也能夠看到。

測試

利用kafka中自帶的生產者和消費者例子來作個簡單的測試。具體步驟在另外一篇kakfa部署中。

kafka集羣有3個broker節點。新建一個partitions數量爲3的topic。啓動一個A終端爲生產者,啓動B、C、D、E終端爲消費者。C、D、E終端爲一個consumer group,B爲單獨的一個consumer group。

在producer終端輸入消息從1-20。能夠看到B終端會輸出1-20所有消息,圖中B所示。而C、D、E終端因爲屬於同一個Consumer Group,partitions數量等於consumer,每一個consumer消費了一個partition裏的消息。圖中爲C、D、E。

將C終端斷開,剩下B、D、E去消費消息。B終端仍是會輸出1-20所有消息,圖中爲B1所示。而D、E屬於同一個Consumer Group,且consumer數量少於partition數,能夠看到D消費了兩個partition中的數據,見圖中D1所示。
image

Replication&&broker節點故障處理

Replication

replication策略是基於partition。kafka經過建立topic時能夠經過–replication-factor配置partition副本數。配置副本以後,每一個partition都有一個惟一的leader,有0個或多個follower。

全部的讀寫操做都在leader上完成,leader批量從leader上pull數據。followers從leader消費消息來複制message,就跟普通的consumer消費消息同樣。

通常狀況下partition的數量大於等於broker的數量,而且全部partition的leader均勻分佈在broker上。

follower故障處理

broker是否alive包含兩個條件:

  • 一是它必須維護與Zookeeper的session(這個經過Zookeeper的heartbeat機制來實現)。
  • 二是follower必須可以及時將leader的writing複製過來,不能「落後太多」。

leader會track「in sync」的node list。若是一個follower宕機,或者落後太多,leader將把它從」in sync」 list中移除。

一條消息只有被「in sync」 list裏的全部follower都從leader複製過去纔會被認爲已提交。這樣就避免了部分數據被寫進了leader,還沒來得及被任何follower複製就宕機了,而形成數據丟失(consumer沒法消費這些數據)

而對於producer而言,它能夠選擇是否等待消息commit,這能夠經過producer的「acks」來設置。默認爲acks=all ,這意味着leader將等待全部follower複製完消息。

leader故障處理

leader掛掉後,怎樣在follower中選舉出新的leader?

Kafka在Zookeeper中動態維護了一個ISR(in-sync replicas) set,這個set裏的全部replica都跟上了leader,只有ISR裏的成員纔有被選爲leader的可能。

若是某一個partition的全部replica都掛了,就沒法保證數據不丟失了。這種狀況下有兩種可行的方案:

  • 一致性高:等待ISR中的任一個replica「活」過來,而且選它做爲leader。可能會等待比較長的時間
  • 可用性高:選擇第一個「活」過來的replica(不必定是ISR中的)做爲leader。有可能會丟失數據

kafka採用第二種方案,能夠經過配置unclean.leader.election.enable來關閉這種方案。

測試

kafka集羣有3個broker節點。具體部署在另外一篇kafka部署中。

作個簡單的測試,建立一個3分區的topic,不指定副本數,能夠看到默認一個副本,Partition均勻分佈在各broker。

1
2
3
4
5
6
[fangyeqing@xxxx kafka_2.11-0.9.0.0]$bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 3 --topic test3partitions
[fangyeqing@xxxx kafka_2.11-0.9.0.0]$bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test3partitions
Topic:test3partitions PartitionCount:3 ReplicationFactor:1 Configs:
Topic: test3partitions Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: test3partitions Partition: 1 Leader: 2 Replicas: 2 Isr: 2
Topic: test3partitions Partition: 2 Leader: 0 Replicas: 0 Isr: 0

 

建立一個3分區2副本的topic,能夠看到Replicas和Isr中有1個follower。例如Partitions:0的Leader爲broker:1,follower爲broker:2,而且2在Isr中,理論上當Leader掛掉以後,2會頂上。

1
2
3
4
5
6
[fangyeqing@xxxx kafka_2.11-0.9.0.0]$bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 3 --topic test3partition2replication --replication-factor 2
[fangyeqing@xxxx kafka_2.11-0.9.0.0]$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test3partition2replication
Topic:test3partition2replication PartitionCount:3 ReplicationFactor:2 Configs:
Topic: test3partition2replication Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: test3partition2replication Partition: 1 Leader: 2 Replicas: 2,0 Isr: 2,0
Topic: test3partition2replication Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0,1

 

建立一個3分區2副本的topic,能夠看到Replicas和Isr中有2個follower

1
2
3
4
5
6
[fangyeqing@xxxx kafka_2.11-0.9.0.0]$bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 3 --topic test3partition3replication --replication-factor 3
[fangyeqing@xxxx kafka_2.11-0.9.0.0]$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test3partition3replication
Topic:test3partition3replication PartitionCount:3 ReplicationFactor:3 Configs:
Topic: test3partition3replication Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
Topic: test3partition3replication Partition: 1 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
Topic: test3partition3replication Partition: 2 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1

 

Message delivery guarantees

目前有下面幾種消息確保機制:

  • At most once 消息可能會丟,但毫不會重複傳輸
  • At least one 消息毫不會丟,但可能會重複傳輸
  • Exactly once 每條消息確定會被傳輸一次且僅傳輸一次,不少時候這是用戶所想要的。

Kafka默認保證At least once,而且容許經過設置producer異步提交來實現At most once。下面分階段分析:

Producer向broker發送消息

當Producer向broker發送消息,由上述Replication的分析可知,一旦這條消息已經被commit,若是這個topic有多個replication(副本),某個broker掛掉也不會丟失消息。

Producer發送數據給broker的過程當中,若是遇到網絡問題而形成通訊中斷:

  • At least once:Producer就沒法判斷該條消息是否已經commit,再重複提交時就會是At least once。
  • Exactly once:在之後的版本中producer能夠生成一種相似於primary key的東西,發生故障時冪等性的retry屢次。
  • At most once:Producer異步提交來實現At most once

Consumer從broker消費消息

當Consumer從broker消費消息時,consumer若是在消費消息時crash:

  • At least once:讀完消息先處理再commit消費狀態(保存offset)
  • At most once:讀完消息先commit消費狀態(保存offset)再處理消息
  • Exactly once:須要協調offset和實際操做的輸出,目前比較麻煩。離線數據能夠作到去重,利用Camus或者Gobbin將kafka topic落地到HDFS,而後作去重便可。其中Camus能夠參考個人另外一篇博客Camus介紹
相關文章
相關標籤/搜索