Kafka是一個分佈式的流數據平臺,可發佈、訂閱消息流,使用zookeeper進行集羣管理。也可做爲一個消息隊列中間件,相似於RabbitMQ,ActiveMQ,ZeroMQ等。由LinkdIn開源,用Scala語言實現。
Kafka有以下特色:java
// 從官網下載最新版本,這裏爲:kafka_2.11-1.0.0.tgznode
// 解壓git
$ tar -xzf kafka_2.11-1.0.0.tgz $ cd kafka_2.11-1.0.0
// Kafka用到了zookeeper,因此須要啓動zookeeper(新版本內置了zookeeper,若是讀者已有其餘zookeeper啓動了,這步能夠略過)github
$ bin/zookeeper-server-start.sh config/zookeeper.properties
// 修改配置文件,並啓動kafka server:
config/server.properties中的zookeeper.connect默認爲localhost:2181,能夠修改成其餘的zookeeper地址。多個地址間,經過逗號分隔,如:"127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"。
默認爲9092端口,經過修改「listeners=PLAINTEXT://:9092」 來指定其餘端口或IP。
配置好後,啓動kafka server:apache
$ bin/kafka-server-start.sh config/server.properties
配置文件目錄下還有consumer.properties和producer.properties,按默認便可。bootstrap
// 建立一個topic,topic名稱爲test瀏覽器
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
能夠經過命令:bin/kafka-topics.sh --list --zookeeper localhost:2181查看當前全部的topic.緩存
// 經過producer發送消息網絡
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test This is a message This is another message
往test中發送數據。session
// 啓動一個consumer,拉取消息
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning This is a message This is another message
--from-beginning參數表示從頭開始讀取數據,若是不設置,則只讀取最新的數據。
// 能夠再啓動一個Server
$ bin/kafka-server-start.sh config/server-1.properties &
這裏的server-1.properties是拷貝的server.properties,主要修改以下幾個參數:
# broker id,整數,和其餘broker不能重複 broker.id=2 # 指定端口爲9094,由於在同一臺機器上,須要避免端口衝突。這裏沒有配置IP,默認爲本機 listeners=PLAINTEXT://:9094 # 日誌文件路徑,即topic數據的存儲位置。不一樣的broker,指定不一樣的路徑。 log.dir=/tmp/kafka-logs-2
Producer1和Producer2往Topic A中發送消息,Consumer1/2/3/4/5 從Topic中接收消息。
Kafka Cluster包含兩個Server,分別爲Server1,Server2。
Topic A包含4個Partition,爲:P0, P1, P3, P4,平均分配到Server1和Server2上。
一個Broker就是一個server。多個Broker構成一個kafka集羣,同時對外提供服務,若是某個節點down掉,則從新分配。
注意:集羣和主從熱備不一樣,對於主從熱備,同時只有一個節點提供服務,其餘節點待命狀態。
消息發佈者,Push方式,負責發佈消息到Kafka broker。
消費者,Pull方式,消費消息。每一個consumer屬於一個特定的consuer group。
經過對消息指定主題能夠將消息分類,Consumer能夠只關注特定Topic中的消息。
查看總共有多少個Topic:
$ bin/kafka-topics.sh --list --zookeeper localhost:2181
查看某個topic的狀況(分區、副本數等),這裏查看topic爲test的信息:
$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
每一個Consumer都會歸到一個Group中。某個Partition中的消息,能夠被多個Group消費,但只能被Group中的一個Consumer消費。因此,若是要對多個Consumer進行消息廣播,則這些Consumer須要放在不一樣的Group中。
當一個Consumer進程或線程掛掉,它所訂閱的Partition會被從新分配到該Group內的其餘Consumer上。若是Consumer A訂閱了多個Partition,那麼當該Group內新增Consumer B時,會從Consumer A中分配出一個Partition給Consumer B。
爲了維持Consumer 與 Consumer Group的關係,須要Consumer週期性的發送heartbeat到coordinator(協調者,在早期版本,以zookeeper做爲協調者。後期版本則以某個broker做爲協調者)。當Consumer因爲某種緣由不能發Heartbeat到coordinator時,而且時間超過session.timeout.ms時,就會認爲該consumer已退出,它所訂閱的partition會分配到同一group 內的其它的consumer上。而這個過程,被稱爲rebalance。
Offset是針對Partition的,它用來記錄消費到Partition中的哪條消息了。
Consumer並不維護Offset,而是由Consumer所在的Group維護。所以,Group中的一個Consumer消費了某個Partition中的消息,那麼該組的其餘Consumer就不能重複消費該條消息了,由於Offset已經+1了。
上圖中,Consumer A和Consumer B屬於不一樣的Group。Consumer A所在的Group,在該Partition的Offset=9,表示下次該Group獲取消息時是從9開始獲取;同理,Consumer B所在的Group在該Partition的Offset=11,下次該Group的Consumer獲取消息時,從11開始獲取。
Partition是物理上的概念,每一個Partition對應一個文件夾(默認在/tmp/kafka-logs下,經過server.properties中log.dirs配置)。一個topic能夠對應多個partition,consumer訂閱的其實就是partition。
上圖表示一個Topic,指定了3個分區。在向該Topic寫數據時,會根據均衡策略,往相應的分區中寫。這3個分區中的數據是不同的,它們的數據總和,構成該Topic的數據。
每一個分區中的數據,保證嚴格的寫入順序。
分區會自動根據均衡策略分配到多個broker上。好比有2個broker(或者叫Server):broker1, broker2,建立一個包含4個partition且replication-factor(副本)爲1的topic,那麼對於該topic,每一個broker會被分配2個partition。以下圖:
有兩個Group:Group A和Group B,其中Group A包含C一、C2兩個Consumer;Group B包含C3,C4,C5,C6四個Consumer。
若是向該Topic寫入4條信息:M1, M2, M3, M4。那麼各個Consumer收到的消息是(一種狀況):
C1:M1, M3 C2:M2, M4 C3:M1 C4:M3 C5:M2 C6:M4
C1,C2各接收到2條消息,它們的和爲:M1,M2,M3,M4。
C3,C4,C5,C6各接收到1條消息,它們的和爲:M1,M2,M3,M4。
代表Topic消息,被同一Group內的Consumer均分了。由於每次向Topic中寫入消息時,會被均分至各個Partition,而後各Consumer收到本身所訂閱Partition的消息。同時,這裏也說明了同一個partition內的消息只能被同一個組中的一個consumer消費。
注:若是replication-factor爲3,那麼每一個broker會有6(即2x3)個partition。
另外,建立topic時,在當前的全部broker間進行均分,分好後就不會變了。假設把上述broker1停掉,它的分區不會轉到broker2上。producer在寫消息時,不會再寫入broker2中的分區。
那麼,原先訂閱broker2分區的consumer,不能接收消息了。提示:
WARN [Consumer clientId=consumer-1, groupId=g4] Connection to node 1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
對於一個Topic的partition數,增長Broker(即服務節點)並不會增長partition的數量。
驗證:
查看topic信息
$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
再啓動一個新的Broker:
$ bin/kafka-server-start.sh ../config/server-1.properties
啓動後,再用上一步的命令看topic信息,partition數量並未改變。
而且,若是group g1上有兩個consumer,始終只會有一個consumer能收到該topic的消息,另外一個一直處於空閒狀態(光佔着資源不作事)。因此,Topic的Partition數,要大於等於Consumer數量。
默認組的疑問
可能讀者會有疑問,經過命令:
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
執行屢次,建立了多個consumer,這些consumer屬於默認的一個組,可是卻能同時收到一個topic的信息。和上述所說的「一個Topic中的消息,只能被group中的一個consumer消費」有衝突。
其實,不指定group名稱,的確會分配默認的group,但每次分配的名稱是不同的,即這裏建立的consumer是屬於不一樣的group的。能夠經過命令查看全部group:
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list Note: This will not show information about old Zookeeper-based consumers. console-consumer-94070 console-consumer-27823 console-consumer-4826 console-consumer-47050
能夠看出,這裏的group名稱是不同的。
consumer數量和group數量
對於一個topic,若是group中consumer數量比partition數量多,那麼多餘的consumer會空閒。這是由於,group中的某個consumer一旦訂閱了某個partition,則會一直佔用並消費該partition中的信息。除非該consumer退出,不然該partition不會被該組的其餘consumer佔用。因此會致使多餘的consumer空閒,一直收不到消息。
能夠經過命令,查看consumer和partition的對應關係:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group g4
一個topic能夠對應多個partition,但一個partition只能對應一個topic。
Kafka解決查詢效率的手段之一是將數據文件分段,好比有100條Message,它們的offset是從0到99。假設將數據文件分紅5段,第一段爲0-19,第二段爲20-39,以此類推,每段放在一個單獨的數據文件裏面,數據文件以該段中最小的offset命名。這樣在查找指定offset的Message的時候,用二分查找就能夠定位到該Message在哪一個段中。
上圖展現文件傳輸到Socket的常規方式,步驟:
上圖展現零拷貝方式傳輸文件到Socket,少了文件緩存到用戶空間,再由用戶空間到內核空間的操做。
Kafka採用零拷貝的方式。
經過Replication Factor指定副本的數量,這樣,若是一個Partition出現了問題,那麼能夠從副本中恢復了。
若是不喜歡經過命令行操做,也能夠經過圖形化管理界面,好比yahoo開源的Kafka Manager。
地址:https://github.com/yahoo/kafk...
這裏以CentOS7爲例,進行編譯、運行說明。
注:Kafka Manager的編譯須要javac,須要安裝jdk環境。最新版的須要jdk8版本。
CentOS7默認安裝了OpenJDK,將其卸載,從Oracle官網下載jdk8文件,而後安裝。
// github上下載kafka manager源碼
$ git clone https://github.com/yahoo/kafk...
$ cd kafka-manager
// 修改配置文件中zookeeper地址
配置文件:conf/application.conf
kafka-manager.zkhosts="127.0.0.1:2181"
若是有多個zookeeper,經過逗號分隔,如:
kafka-manager.zkhosts="my.zookeeper.host.com:2181,other.zookeeper.host.com:2181"
// 將源碼編譯打包成zip包
$ ./sbt clean dist
這一步用到了javac,運行完後,會在當前目錄下生成target文件夾。生成zip包地址:
target/universal/kafka-manager-1.3.3.16.zip
// 進入zip所在目錄,解壓zip包,啓動服務(默認9000端口)
$ cd target/universal
$ unzip kafka-manager-1.3.3.16
$ ./kafka-manager-1.3.3.16/bin/kafka-manager
// 打開Kafka Manager頁面
瀏覽器輸入地址:http://192.168.0.12:9000/ (這裏的IP須要替換成讀者本身的IP)
很簡潔的一個頁面,第一次打開,什麼都沒有。
// 添加一個Cluster
Cluster Name: 名稱隨意,好比MyFirstCluster
Cluster Zookeeper Hosts: zookeeper的地址,好比:192.168.0.12:2181
Kafka Version: 筆者選的0.11
勾選「Enable JMX Polling」。注意:勾選了該項,啓動kafka server前,須要設置JMX_PORT變量,如:
$ JMX_PORT=9999 $ bin/zookeeper-server-start.sh config/zookeeper.properties
保存後,就能夠經過MyFirstCluster,查看Broker, Topic, Partition, Consumer等信息了。
注:若是查看不了Consumer信息,提示「Please enable consumer polling here.」,須要勾選一個配置。如:
提示信息:
修改Cluster:
勾選中「Poll consumer information」
保存。具體的管理功能,能夠經過操做頁面進一步挖掘。