Kafka是一個分佈式的、可分區的、可複製的消息系統。它提供了普通消息系統的功能,但具備本身獨特的設計。這個獨特的設計是什麼樣的呢?java
首先讓咱們看幾個基本的消息系統術語:node
Kafka將消息以topic爲單位進行概括。linux
將向Kafka topic發佈消息的程序成爲producers.apache
將預訂topics並消費消息的程序成爲consumer.服務器
Kafka以集羣的方式運行,能夠由一個或多個服務組成,每一個服務叫作一個broker.網絡
producers經過網絡將消息發送到Kafka集羣,集羣向消費者提供消息,以下圖所示:架構
客戶端和服務端經過TCP協議通訊。Kafka提供了Java客戶端,而且對多種語言都提供了支持。併發
相關閱讀:負載均衡
分佈式發佈訂閱消息系統 Kafka 架構設計 http://www.linuxidc.com/Linux/2013-11/92751.htm異步
Apache Kafka 代碼實例 http://www.linuxidc.com/Linux/2013-11/92754.htm
Apache Kafka 教程筆記 http://www.linuxidc.com/Linux/2014-01/94682.htm
先來看一下Kafka提供的一個抽象概念:topic.
一個topic是對一組消息的概括。對每一個topic,Kafka 對它的日誌進行了分區,以下圖所示:
每一個分區都由一系列有序的、不可變的消息組成,這些消息被連續的追加到分區中。分區中的每一個消息都有一個連續的序列號叫作offset,用來在分區中惟一的標識這個消息。
在一個可配置的時間段內,Kafka集羣保留全部發布的消息,無論這些消息有沒有被消費。好比,若是消息的保存策略被設置爲2天,那麼在一個消息被髮布的兩天時間內,它都是能夠被消費的。以後它將被丟棄以釋放空間。Kafka的性能是和數據量無關的常量級的,因此保留太多的數據並非問題。
實際上每一個consumer惟一須要維護的數據是消息在日誌中的位置,也就是offset.這個offset有consumer來維護:通常狀況下隨着consumer不斷的讀取消息,這offset的值不斷增長,但其實consumer能夠以任意的順序讀取消息,好比它能夠將offset設置成爲一箇舊的值來重讀以前的消息。
以上特色的結合,使Kafka consumers很是的輕量級:它們能夠在不對集羣和其餘consumer形成影響的狀況下讀取消息。你可使用命令行來"tail"消息而不會對其餘正在消費消息的consumer形成影響。
將日誌分區能夠達到如下目的:首先這使得每一個日誌的數量不會太大,能夠在單個服務上保存。另外每一個分區能夠單獨發佈和消費,爲併發操做topic提供了一種可能。
每一個分區在Kafka集羣的若干服務中都有副本,這樣這些持有副本的服務能夠共同處理數據和請求,副本數量是能夠配置的。副本使Kafka具有了容錯能力。
每一個分區都由一個服務器做爲「leader」,零或若干服務器做爲「followers」,leader負責處理消息的讀和寫,followers則去複製leader.若是leader down了,followers中的一臺則會自動成爲leader。集羣中的每一個服務都會同時扮演兩個角色:做爲它所持有的一部分分區的leader,同時做爲其餘分區的followers,這樣集羣就會據有較好的負載均衡。
更常見的是,每一個topic都有若干數量的consumer組,每一個組都是一個邏輯上的「訂閱者」,爲了容錯和更好的穩定性,每一個組由若干consumer組成。這其實就是一個發佈-訂閱模式,只不過訂閱者是個組而不是單個consumer。
由兩個機器組成的集羣擁有4個分區 (P0-P3) 2個consumer組. A組有兩個consumerB組有4個
相比傳統的消息系統,Kafka能夠很好的保證有序性。
傳統的隊列在服務器上保存有序的消息,若是多個consumers同時從這個服務器消費消息,服務器就會以消息存儲的順序向consumer分發消息。雖然服務器按順序發佈消息,可是消息是被異步的分發到各consumer上,因此當消息到達時可能已經失去了原來的順序,這意味着併發消費將致使順序錯亂。爲了不故障,這樣的消息系統一般使用「專用consumer」的概念,其實就是隻容許一個消費者消費消息,固然這就意味着失去了併發性。
在這方面Kafka作的更好,經過分區的概念,Kafka能夠在多個consumer組併發的狀況下提供較好的有序性和負載均衡。將每一個分區分只分發給一個consumer組,這樣一個分區就只被這個組的一個consumer消費,就能夠順序的消費這個分區的消息。由於有多個分區,依然能夠在多個consumer組之間進行負載均衡。注意consumer組的數量不能多於分區的數量,也就是有多少分區就容許多少併發消費。
Kafka只能保證一個分區以內消息的有序性,在不一樣的分區之間是不能夠的,這已經能夠知足大部分應用的需求。若是須要topic中全部消息的有序性,那就只能讓這個topic只有一個分區,固然也就只有一個consumer組消費它。
Kafka用到了Zookeeper,全部首先啓動Zookper,下面簡單的啓用一個單實例的Zookkeeper服務。能夠在命令的結尾加個&符號,這樣就能夠啓動後離開控制檯。
> bin/zookeeper-server-start.sh config/zookeeper.properties &
...
如今啓動Kafka:
> bin/kafka-server-start.sh config/server.properties &
Step 3: 建立 topic
建立一個叫作「test」的topic,它只有一個分區,一個副本。
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
能夠經過list命令查看建立的topic:
> bin/kafka-topics.sh --list --zookeeper localhost:2181
test
除了手動建立topic,還能夠配置broker讓它自動建立topic.
Step 4:發送消息.
Kafka 使用一個簡單的命令行producer,從文件中或者從標準輸入中讀取消息併發送到服務端。默認的每條命令將發送一條消息。
運行producer並在控制檯中輸一些消息,這些消息將被髮送到服務端:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a messageThis is another message
ctrl+c能夠退出發送。
Step 5: 啓動consumer
Kafka also has a command line consumer that will dump out messages to standard output.
Kafka也有一個命令行consumer能夠讀取消息並輸出到標準輸出:
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
This is a message
This is another message
你在一個終端中運行consumer命令行,另外一個終端中運行producer命令行,就能夠在一個終端輸入消息,另外一個終端讀取消息。
這兩個命令都有本身的可選參數,能夠在運行的時候不加任何參數能夠看到幫助信息。
Step 6: 搭建一個多個broker的集羣
剛纔只是啓動了單個broker,如今啓動有3個broker組成的集羣,這些broker節點也都是在本機上的:
首先爲每一個節點編寫配置文件:
> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties
在拷貝出的新文件中修改和添加如下參數:
config/server-1.properties:
broker.id=1(修改)
log.dir=/tmp/kafka-logs-1(修改)
port=9093(添加)
config/server-2.properties:
broker.id=2(修改)
log.dir=/tmp/kafka-logs-2 (修改)
port=9094 (添加)
broker.id在集羣中惟一的標註一個節點,由於在同一個機器上,因此必須制定不一樣的端口和日誌文件,避免數據被覆蓋。
We already have Zookeeper and our single node started, so we just need to start the two new nodes:
剛纔已經啓動可Zookeeper和一個節點,如今啓動另外兩個節點:
> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &
...
建立一個擁有3個副本的topic:
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
如今咱們搭建了一個集羣,怎麼知道每一個節點的信息呢?運行「"describe topics」命令就能夠了:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 0 Replicas: 1,2,0 Isr: 1,2,0
下面解釋一下這些輸出。第一行是對全部分區的一個描述,而後每一個分區都會對應一行,由於咱們只有一個分區因此下面就只加了一行。
leader:負責處理消息的讀和寫,leader是從全部節點中隨機選擇的,這裏的--replication-factor是副本數,它不能大於broker數
replicas:列出了全部的副本節點,無論節點是否在服務中.
isr:是正在服務中的節點.
在咱們的例子中,節點0是做爲leader運行。
向topic發送消息:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
...
my test message 1my test message 2^C
消費這些消息:
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C
測試一下容錯能力.Broker 1做爲leader運行,如今咱們kill掉它:
> ps | grep server-1.properties7564 ttys002 0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.6/Home/bin/java...
> kill -9 7564
另一個節點被選作了leader,node 1 再也不出如今 in-sync 副本列表中:
> bin/kafka-topics.sh --describe --zookeeper localhost:218192 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,0
雖然最初負責續寫消息的leader down掉了,但以前的消息仍是能夠消費的:
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C
看來Kafka的容錯機制仍是不錯的。