kafka分佈式消息平臺的初探

介紹

Kafka是一個分佈式的流數據平臺,可發佈、訂閱消息流,使用zookeeper進行集羣管理。也可做爲一個消息隊列中間件,相似於RabbitMQ,ActiveMQ,ZeroMQ等。由LinkdIn開源,用Scala語言實現。
Kafka有以下特色:java

  • kafka利用線性存儲來進行硬盤讀寫,速度快;
  • 以時間複雜度爲O(1)的方式提供消息持久化能力,即便對TB級以上數據也能保證常數時間的訪問性能。所以不清除存儲的數據並不會影響性能;
  • zero-copy Gzip和Snappy壓縮
  • 已消費的消息不會自動刪除
  • 考慮到高效性,對事務的支持較弱。

應用場景

圖片描述

安裝使用

// 從官網下載最新版本,這裏爲:kafka_2.11-1.0.0.tgznode

// 解壓git

$ tar -xzf kafka_2.11-1.0.0.tgz
$ cd kafka_2.11-1.0.0

// Kafka用到了zookeeper,因此須要啓動zookeeper(新版本內置了zookeeper,若是讀者已有其餘zookeeper啓動了,這步能夠略過)github

$ bin/zookeeper-server-start.sh config/zookeeper.properties

// 修改配置文件,並啓動kafka server:
config/server.properties中的zookeeper.connect默認爲localhost:2181,能夠修改成其餘的zookeeper地址。多個地址間,經過逗號分隔,如:"127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"。
默認爲9092端口,經過修改「listeners=PLAINTEXT://:9092」 來指定其餘端口或IP。
配置好後,啓動kafka server:apache

$ bin/kafka-server-start.sh config/server.properties

配置文件目錄下還有consumer.properties和producer.properties,按默認便可。bootstrap

// 建立一個topic,topic名稱爲test瀏覽器

$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

能夠經過命令:bin/kafka-topics.sh --list --zookeeper localhost:2181查看當前全部的topic.緩存

// 經過producer發送消息網絡

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
This is a message
This is another message

往test中發送數據。session

// 啓動一個consumer,拉取消息

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message

--from-beginning參數表示從頭開始讀取數據,若是不設置,則只讀取最新的數據。

// 能夠再啓動一個Server

$ bin/kafka-server-start.sh config/server-1.properties &

這裏的server-1.properties是拷貝的server.properties,主要修改以下幾個參數:

# broker id,整數,和其餘broker不能重複
broker.id=2
# 指定端口爲9094,由於在同一臺機器上,須要避免端口衝突。這裏沒有配置IP,默認爲本機
listeners=PLAINTEXT://:9094
# 日誌文件路徑,即topic數據的存儲位置。不一樣的broker,指定不一樣的路徑。
log.dir=/tmp/kafka-logs-2

示意圖

圖片描述

Producer1和Producer2往Topic A中發送消息,Consumer1/2/3/4/5 從Topic中接收消息。
Kafka Cluster包含兩個Server,分別爲Server1,Server2。
Topic A包含4個Partition,爲:P0, P1, P3, P4,平均分配到Server1和Server2上。

Broker

一個Broker就是一個server。多個Broker構成一個kafka集羣,同時對外提供服務,若是某個節點down掉,則從新分配。
注意:集羣和主從熱備不一樣,對於主從熱備,同時只有一個節點提供服務,其餘節點待命狀態。

Producer

消息發佈者,Push方式,負責發佈消息到Kafka broker。

Consumer

消費者,Pull方式,消費消息。每一個consumer屬於一個特定的consuer group。

主題(Topic)

經過對消息指定主題能夠將消息分類,Consumer能夠只關注特定Topic中的消息。
查看總共有多少個Topic:

$ bin/kafka-topics.sh --list --zookeeper localhost:2181

查看某個topic的狀況(分區、副本數等),這裏查看topic爲test的信息:

$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

消費者組(Consumer Group)

每一個Consumer都會歸到一個Group中。某個Partition中的消息,能夠被多個Group消費,但只能被Group中的一個Consumer消費。因此,若是要對多個Consumer進行消息廣播,則這些Consumer須要放在不一樣的Group中。
當一個Consumer進程或線程掛掉,它所訂閱的Partition會被從新分配到該Group內的其餘Consumer上。若是Consumer A訂閱了多個Partition,那麼當該Group內新增Consumer B時,會從Consumer A中分配出一個Partition給Consumer B。

爲了維持Consumer 與 Consumer Group的關係,須要Consumer週期性的發送heartbeat到coordinator(協調者,在早期版本,以zookeeper做爲協調者。後期版本則以某個broker做爲協調者)。當Consumer因爲某種緣由不能發Heartbeat到coordinator時,而且時間超過session.timeout.ms時,就會認爲該consumer已退出,它所訂閱的partition會分配到同一group 內的其它的consumer上。而這個過程,被稱爲rebalance。

位移(Offset)

Offset是針對Partition的,它用來記錄消費到Partition中的哪條消息了。
Consumer並不維護Offset,而是由Consumer所在的Group維護。所以,Group中的一個Consumer消費了某個Partition中的消息,那麼該組的其餘Consumer就不能重複消費該條消息了,由於Offset已經+1了。
圖片描述

上圖中,Consumer A和Consumer B屬於不一樣的Group。Consumer A所在的Group,在該Partition的Offset=9,表示下次該Group獲取消息時是從9開始獲取;同理,Consumer B所在的Group在該Partition的Offset=11,下次該Group的Consumer獲取消息時,從11開始獲取。

分區(Partition)

Partition是物理上的概念,每一個Partition對應一個文件夾(默認在/tmp/kafka-logs下,經過server.properties中log.dirs配置)。一個topic能夠對應多個partition,consumer訂閱的其實就是partition。
圖片描述

上圖表示一個Topic,指定了3個分區。在向該Topic寫數據時,會根據均衡策略,往相應的分區中寫。這3個分區中的數據是不同的,它們的數據總和,構成該Topic的數據。
每一個分區中的數據,保證嚴格的寫入順序。

分區會自動根據均衡策略分配到多個broker上。好比有2個broker(或者叫Server):broker1, broker2,建立一個包含4個partition且replication-factor(副本)爲1的topic,那麼對於該topic,每一個broker會被分配2個partition。以下圖:
圖片描述

有兩個Group:Group A和Group B,其中Group A包含C一、C2兩個Consumer;Group B包含C3,C4,C5,C6四個Consumer。
若是向該Topic寫入4條信息:M1, M2, M3, M4。那麼各個Consumer收到的消息是(一種狀況):

C1:M1, M3
C2:M2, M4

C3:M1
C4:M3
C5:M2
C6:M4

C1,C2各接收到2條消息,它們的和爲:M1,M2,M3,M4。
C3,C4,C5,C6各接收到1條消息,它們的和爲:M1,M2,M3,M4。
代表Topic消息,被同一Group內的Consumer均分了。由於每次向Topic中寫入消息時,會被均分至各個Partition,而後各Consumer收到本身所訂閱Partition的消息。同時,這裏也說明了同一個partition內的消息只能被同一個組中的一個consumer消費
注:若是replication-factor爲3,那麼每一個broker會有6(即2x3)個partition。

另外,建立topic時,在當前的全部broker間進行均分,分好後就不會變了。假設把上述broker1停掉,它的分區不會轉到broker2上。producer在寫消息時,不會再寫入broker2中的分區。
那麼,原先訂閱broker2分區的consumer,不能接收消息了。提示:

WARN [Consumer clientId=consumer-1, groupId=g4] Connection to node 1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

對於一個Topic的partition數,增長Broker(即服務節點)並不會增長partition的數量。
驗證:
查看topic信息

$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

再啓動一個新的Broker:

$ bin/kafka-server-start.sh ../config/server-1.properties

啓動後,再用上一步的命令看topic信息,partition數量並未改變。
而且,若是group g1上有兩個consumer,始終只會有一個consumer能收到該topic的消息,另外一個一直處於空閒狀態(光佔着資源不作事)。因此,Topic的Partition數,要大於等於Consumer數量。

默認組的疑問
可能讀者會有疑問,經過命令:

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test

執行屢次,建立了多個consumer,這些consumer屬於默認的一個組,可是卻能同時收到一個topic的信息。和上述所說的「一個Topic中的消息,只能被group中的一個consumer消費」有衝突。
其實,不指定group名稱,的確會分配默認的group,但每次分配的名稱是不同的,即這裏建立的consumer是屬於不一樣的group的。能夠經過命令查看全部group:

$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
Note: This will not show information about old Zookeeper-based consumers.

console-consumer-94070
console-consumer-27823
console-consumer-4826
console-consumer-47050

能夠看出,這裏的group名稱是不同的。

consumer數量和group數量
對於一個topic,若是group中consumer數量比partition數量多,那麼多餘的consumer會空閒。這是由於,group中的某個consumer一旦訂閱了某個partition,則會一直佔用並消費該partition中的信息。除非該consumer退出,不然該partition不會被該組的其餘consumer佔用。因此會致使多餘的consumer空閒,一直收不到消息。
能夠經過命令,查看consumer和partition的對應關係:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group g4

一個topic能夠對應多個partition,但一個partition只能對應一個topic。

數據文件分段

Kafka解決查詢效率的手段之一是將數據文件分段,好比有100條Message,它們的offset是從0到99。假設將數據文件分紅5段,第一段爲0-19,第二段爲20-39,以此類推,每段放在一個單獨的數據文件裏面,數據文件以該段中最小的offset命名。這樣在查找指定offset的Message的時候,用二分查找就能夠定位到該Message在哪一個段中。
圖片描述

零拷貝(圖來自網絡)

圖片描述

上圖展現文件傳輸到Socket的常規方式,步驟:

  1. 操做系統將文件數據從磁盤讀入到內核空間的頁緩存;
  2. 應用程序將數據從內核空間讀入到用戶空間緩存中;
  3. 應用程序將數據寫回到內核空間到socket緩存中;
  4. 操做系統將數據從socket緩衝區複製到網卡緩衝區,以便將數據經網絡發出。

圖片描述

上圖展現零拷貝方式傳輸文件到Socket,少了文件緩存到用戶空間,再由用戶空間到內核空間的操做。
Kafka採用零拷貝的方式。

Partition備份

經過Replication Factor指定副本的數量,這樣,若是一個Partition出現了問題,那麼能夠從副本中恢復了。

Kafka Manager安裝和使用

若是不喜歡經過命令行操做,也能夠經過圖形化管理界面,好比yahoo開源的Kafka Manager。
地址:https://github.com/yahoo/kafk...
這裏以CentOS7爲例,進行編譯、運行說明。
注:Kafka Manager的編譯須要javac,須要安裝jdk環境。最新版的須要jdk8版本。
CentOS7默認安裝了OpenJDK,將其卸載,從Oracle官網下載jdk8文件,而後安裝。

// github上下載kafka manager源碼
$ git clone https://github.com/yahoo/kafk...
$ cd kafka-manager

// 修改配置文件中zookeeper地址
配置文件:conf/application.conf

kafka-manager.zkhosts="127.0.0.1:2181"

若是有多個zookeeper,經過逗號分隔,如:

kafka-manager.zkhosts="my.zookeeper.host.com:2181,other.zookeeper.host.com:2181"

// 將源碼編譯打包成zip包
$ ./sbt clean dist
這一步用到了javac,運行完後,會在當前目錄下生成target文件夾。生成zip包地址:
target/universal/kafka-manager-1.3.3.16.zip

// 進入zip所在目錄,解壓zip包,啓動服務(默認9000端口)
$ cd target/universal
$ unzip kafka-manager-1.3.3.16
$ ./kafka-manager-1.3.3.16/bin/kafka-manager

// 打開Kafka Manager頁面
瀏覽器輸入地址:http://192.168.0.12:9000/ (這裏的IP須要替換成讀者本身的IP)
圖片描述

很簡潔的一個頁面,第一次打開,什麼都沒有。

// 添加一個Cluster
圖片描述

Cluster Name: 名稱隨意,好比MyFirstCluster
Cluster Zookeeper Hosts: zookeeper的地址,好比:192.168.0.12:2181
Kafka Version: 筆者選的0.11
勾選「Enable JMX Polling」。注意:勾選了該項,啓動kafka server前,須要設置JMX_PORT變量,如:
$ JMX_PORT=9999
$ bin/zookeeper-server-start.sh config/zookeeper.properties

保存後,就能夠經過MyFirstCluster,查看Broker, Topic, Partition, Consumer等信息了。
注:若是查看不了Consumer信息,提示「Please enable consumer polling here.」,須要勾選一個配置。如:
提示信息:
圖片描述

修改Cluster:
圖片描述

勾選中「Poll consumer information」
圖片描述

保存。具體的管理功能,能夠經過操做頁面進一步挖掘。

相關文章
相關標籤/搜索