架構設計:系統間通訊(28)——Kafka及場景應用(中1)

(接上文《架構設計:系統間通訊(27)——其餘消息中間件及場景應用(上)》)css

在本月初的寫做計劃中,我原本只打算粗略介紹一下Kafka(一樣是由於進度緣由)。可是,最近有不少朋友要求我詳細講講Kafka的設計和使用,另外兩年前我在研究Kafka準備將其應用到生產環境時,因爲沒有仔細理解Kafka的設計結構所致使的問題最後也尚未進行交代。因此我決定即便耽誤一些時間,也要將Kafka的原理和使用場景給讀者詳細討論討論。這樣,也算是對兩年來本身學習和使用Kafka的一個總結。html

四、Kafka及特性

Apache Kafka最初由LinkedIn貢獻,目前它是Apache下的一個頂級開源項目。Apache Kafka設計的首要目標是解決LinkedIn網站中海量的用戶操做行爲記錄、頁面瀏覽記錄,後繼的Apache Kafka版本也都是將「知足高數據吞吐量」做爲版本優化的首要目標。爲了達到這個目標,Apache Kafka甚至在其餘功能方面上作了必定的犧牲,例如:消息的事務性。若是您的系統須要進行單位時間內大量的數據採集工做,那麼能夠考慮在您的系統設計方案中加入Apache Kafka。node

4-一、Kafka集羣安裝

4-1-一、安裝環境介紹

Apache Kafka的安裝過程很是簡單。爲了節約篇幅我不許備像介紹Apache ActiveMQ那樣,專門花費筆墨來介紹它的單機(單服務節點)安裝過程和最簡單的生產者、消費者的編碼過程。而是換一種思路:web

直接介紹Apache Kafka多節點集羣的安裝過程,而且在這個Apache Kafka集羣中爲新的Topic劃分多個分區,演示Apache Kafka的消息負載均衡原理。可能在這個過程當中,我會使用一些您還不太瞭解的詞語(或者某些操做您暫時不會理解其中的緣由),可是沒有關係,您只須要按照我給出的步驟一步一步的作——這些詞語和操做會在後文被逐一解釋。sql

首先咱們列出將要安裝的Kafka集羣中須要的服務節點,以及每一個服務節點在其中的做用:apache

節點位置 節點做用
192.168.61.139 Apache Kafka Brocker 1
192.168.61.138 Apache Kafka Brocker 2
192.168.61.140 zookeeper server

在這個Apache Kafka集羣安裝的演示實例中,咱們準備了兩個Apache Kafka的Brocker服務節點,而且使用其中一個節點充當zookeeper的運行節點。瀏覽器

Apache Kafka集羣須要使用Zookeeper服務進行協調工做,因此安裝Apache Kafka前須要首先安裝和運行Zookeeper服務。因爲這邊文章主要介紹的是Apache Kafka的工做原理,因此怎樣安裝和使用Zookeeper的內容就再也不進行贅述了,不清楚的讀者能夠參考我另外一篇文章:《hadoop系列:zookeeper(1)——zookeeper單點和集羣安裝》。這裏咱們運行zookeeper只是使用了zookeeper服務的單節點工做模式,若是您須要在實際生產環境運行Apache Kafka集羣,那麼zookeeper clusters的服務節點數量至少應該是3個(且使用不一樣的物理機)。bash

4-1-二、Kafka集羣安裝過程

  • 首先咱們在192.168.61.140的服務器上安裝Zookeeper之後,直接啓動zookeeper便可:
zkServer.sh start

您能夠直接使用wget命令,也能夠經過瀏覽器(或者第三方軟件)下載:服務器

wget https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.1.1/kafka_2.10-0.8.1.1.tgz
  • 下載後,運行命令進行壓縮文件的解壓操做:
tar -xvf ./kafka_2.10-0.8.1.1.tgz

筆者習慣將可運行軟件放置在/usr目錄下,您能夠按照您本身的操做習慣或者您所在團隊的規範要求放置解壓後的目錄(正式環境下不建議使用root帳號運行Kafka):架構

mv /root/kafka_2.10-0.8.1.1 /usr/kafka_2.10-0.8.1.1/
  • Apache Kafka全部的管理命令都存放在安裝路徑下的./bin目錄中。因此,若是您但願後續管理方便就能夠設置一下環境變量:
export PATH=/usr/kafka_2.10-0.8.1.1/bin:$PATH
#記得在/etc/profile文件的末尾加入相同的設置
  • Apache Kafka的配置文件存放在安裝路徑下的./config目錄下。以下所示:
-rw-rw-r--. 1 root root 1202 422 2014 consumer.properties
-rw-rw-r--. 1 root root 3828 422 2014 log4j.properties
-rw-rw-r--. 1 root root 2217 422 2014 producer.properties
-rw-rw-r--. 1 root root 5322 428 23:32 server.properties
-rw-rw-r--. 1 root root 3326 422 2014 test-log4j.properties
-rw-rw-r--. 1 root root  995 422 2014 tools-log4j.properties
-rw-rw-r--. 1 root root 1023 422 2014 zookeeper.properties

若是您進行的是Apache Kafka集羣安裝,那麼您只須要關心「server.properties」這個配置文件(其餘配置文件的做用,咱們後續會討論到)。

其中目錄下有一個zookeeper.properties不建議使用。之因此有這個配置文件,是由於Kafka中帶有一個zookeeper運行環境,若是您使用Kafka中的「zookeeper-server-start.sh」命令啓動這個自帶zookeeper環境,纔會用到這個配置文件。

  • 開始編輯server.properties配置文件。這個配置文件中默認的配置項就有不少,可是您沒必要所有進行更改。下面咱們列舉了更改後的配置文件狀況,其中您須要主要關心的屬性使用中文進行了說明(固然原有的註釋也會進行保留):
# The id of the broker. This must be set to a unique integer for each broker.
# 很是重要的一個屬性,在Kafka集羣中每個brocker的id必定要不同,不然啓動時會報錯
broker.id=2

# The port the socket server listens on
port=9092

# Hostname the broker will bind to. If not set, the server will bind to all interfaces
#host.name=localhost

# The number of threads handling network requests
num.network.threads=2

# The number of threads doing disk I/O
# 故名思議,就是有多少個線程同時進行磁盤IO操做。
# 這個值實際上並非設置得越大性能越好。
# 在我後續的「存儲」專題會講到,若是您提供給Kafka使用的文件系統物理層只有一個磁頭在工做
# 那麼這個值就變得沒有任何意義了
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=1048576

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=1048576

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

# A comma seperated list of directories under which to store log files
# 不少開發人員在使用Kafka時,不重視這個屬性。
# 實際上Kafka的工做性能絕大部分就取決於您提供什麼樣的文件系統
log.dirs=/tmp/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across the brokers.
num.partitions=2

# The number of messages to accept before forcing a flush of data to disk
# 從Page Cache中將消息正式寫入磁盤上的閥值:以待轉儲消息數量爲依據
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
# 從Page Cache中將消息正式寫入磁盤上的閥值:以轉儲間隔時間爲依據
#log.flush.interval.ms=1000

# The minimum age of a log file to be eligible for deletion
# log消息信息保存時長,默認爲168個小時
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
# 默認爲1GB,在此以前log文件不會執行刪除策略
# 實際環境中,因爲磁盤空間根本不是問題,而且內存空間足夠大。因此筆者會將這個值設置的較大,例如100GB。
#log.retention.bytes=1073741824

# The maximum size of a log segment file. 
# When this size is reached a new log segment will be created.
# 默認爲512MB,當達到這個大小,Kafka將爲這個Partition建立一個新的分段文件
log.segment.bytes=536870912

# The interval at which log segments are checked to see if they can be deleted according 
# to the retention policies
# 文件刪除的保留策略,多久被檢查一次(單位毫秒)
# 實際生產環境中,6-12小時檢查一次就夠了
log.retention.check.interval.ms=60000

# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
log.cleaner.enable=false

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# root directory for all kafka znodes.
# 到zookeeper的鏈接信息,若是有多個zookeeper服務節點,則使用「,」進行分割
# 例如:127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002
zookeeper.connect=192.168.61.140:2181

# Timeout in ms for connecting to zookeeper
# zookeeper鏈接超時時間
zookeeper.connection.timeout.ms=1000000

固然以上系統自帶的Brocker服務節點的配置項還不是最完整的,在官網(http://kafka.apache.org/documentation.html#brokerconfigs)上完整的「server.properties」文件的配置屬性和說明信息。

再次強調一下,以上配置屬性中必須按照您本身的環境更改的屬性有:「broker.id」、「log.dirs」以及「zookeeper.connect」。其中每個Kafka服務節點的「broker.id」屬性都必須不同

  • 這樣咱們就完成了其中一個Broker節點的安裝和配置。接下來您須要按照以上描述的步驟進行Kafka集羣中另外一個Broker節點的安裝和配置。必定注意每個Kafka服務節點的「broker.id」屬性都必須不同,在本演示實例中,我設置的broker.id分別爲1和2。

  • 接下來咱們啓動Apache Kafka集羣中已經完成安裝和配置的兩個Broker節點。若是以上全部步驟您都正確完成了,那麼您將會看到相似以下的啓動日誌輸出:

#分別在兩個節點上執行這條命令,以便完成節點啓動:
kafka-server-start.sh /usr/kafka_2.10-0.8.1.1/config/server.properties #若是啓動成功,您將看到相似以下的日誌提示:
...... [2016-04-30 02:53:17,787] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor) [2016-04-30 02:53:17,799] INFO [Socket Server on Broker 2], Started (kafka.network.SocketServer) ......
  • 啓動成功後,咱們能夠在某一個Kafka Broker 節點上運行如下命令來建立一個topic。爲了後續進行講解,咱們建立的topic有4個分區和兩個複製因子:
kafka-topics.sh --create --zookeeper 192.168.61.139:2181 --replication-factor 2 --partitions 4 --topic my_topic2

4-1-三、Kafka中的經常使用命令

在安裝Kafka集羣的時候,咱們使用到了Kafka提供的腳本命令進行集羣啓動、topic建立等相關操做。實際上Kafka提供了至關豐富的腳本命令,以便於開發者進行集羣管理、集羣狀態監控、消費者/生產者測試等工做,這裏爲你們列舉一些經常使用的命令:

4-1-3-1 集羣啓動:

kafka-server-start.sh config/server.properties

這個命令帶有一個參數——指定啓動服務所須要的配置文件。默認的配置文件上文已經提到過,存在於Kafka安裝路徑的./config文件夾下,文件名爲server.properties。

4-1-3-2 建立Topic:

kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

帶有 –create參數的kafka-topics命令腳本用於在Kafka集羣上建立一個新的topic。後續的四個參數爲:

  • zookeeper 該參數用來指定Kafka集羣所使用的zookeeper的地址,這是由於當topic被建立時,zookeeper下的/config/topics目錄中會記錄新的topic的配置信息。

  • replication-factor 複製因子數量。副本是Kafka V0.8.X版本中加入的保證消息可靠性的功能,複製由於是指某一條消息進行復制的副本數量,該功能以集羣中Broker服務節點的數量爲單位。也就是說當Broker服務節點的數量爲X時,複製因子的數量最多爲X。不然在執行topic建立時會報告相似以下的錯誤:

Error while executing topic command replication factor: 3 larger than available brokers: 2

Kafka的複製過程將在本文的後續章節進行介紹。固然,這個參數能夠不進行設置,若是不進行設置該參數的默認值則爲1。

  • partitions 分區數量(默認分區爲1)。一個topic能夠有若干分區,這些分區分佈在Kafka集羣的一個或者多個Broker上。後文咱們將討論到,partition分區是Kafka集羣實現消息負載均衡功能的重要基礎,且topic中partition分區一旦建立就不容許進行動態更改。因此一旦您準備在正式生產環境建立topic,就必定要慎重考慮它的分區數量。

  • topic 新建立的topic的名稱。該參數在建立topic時指定,且在Kafka集羣中topic的名稱必須是惟一的。

4-1-3-3 以生產者身份登陸測試

kafka-console-producer.sh --broker-list localhost:9093 --topic test

# 或者
kafka-console-producer.sh --producer.config client-ssl.properties

使用命令腳本(而不是Kafka提供的各類語言的API),模擬一個消息生產者登陸集羣,主要是爲了測試指定的topic的工做狀況是否正常。能夠有兩種方式做爲消息生產者登陸Kafka集羣:

第一種方式指定broker-list參數和topic參數,broker-list攜帶須要鏈接的一個或者多個broker服務節點;topic爲指定的該消息生產者所使用的topic的名稱。

第二種方式是指定producer生產者配置文件和客戶端ssl加密信息配置文件(後一個文件也可不進行指定,若是您沒有在Kafka集羣中配置ssl加密規則的話)。默認的producer生產者配置文件存放在kafka安裝路徑的./config目錄下,文件名爲producer.properties。

4-1-3-4 以消費者身份登陸測試

kafka-console-consumer.sh --zookeeper localhost:2181 --topic test

一樣您可使用命令腳本的方式,以消息消費者的身份登陸Kafka集羣,目的相同:爲了測試Kafka集羣下您建立的topic是否可以正常工做。該命令有兩個參數:

  • zookeeper 指定的Kafka集羣所使用的zookeeper地址,若是有多個zookeeper節點就是用「,」進行分割。該參數必須進行指定。

  • topic 該參數用於指定使用的topic名稱信息。若是您的topic在kafka集羣下工做正常的話,那麼在成功使用消費者身份登陸後,就能夠收到topic中有生產者發送的消息信息了。

4-1-3-5 查看Topic狀態

kafka-topics.sh --describe --zookeeper 192.168.61.139:2181 --topic my_topic2

以上命令能夠用來查詢指定的topic(my_topic2)的關鍵屬性,包括topic的名稱、分區狀況、每一個分期的主控節點、複製因子、複製序列已經賦值序列的同步狀態等信息。命令可能的結果以下所示:

Topic:my_topic2 PartitionCount:4        ReplicationFactor:2     Configs:
        Topic: my_topic2        Partition: 0    Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: my_topic2        Partition: 1    Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: my_topic2        Partition: 2    Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: my_topic2        Partition: 3    Leader: 1       Replicas: 1,2   Isr: 1,2

請注意這個查詢命令,由於這個查詢命令所反映的結果透露出了Apache Kafka V0.8.X版本的主要設計原理,咱們本節下半部分的內容將從這裏展開。

4-二、Kafka原理:設計結構

一個完整的Apache Kafka解決方案的組成包括四個要素:Producer(消息生產者)、Server Broker(服務代理器)、Zookeeper(協調者)、Consumer(消息消費者)。 Apache Kafka在設計之初就被認爲是集羣化工做的,因此要說清楚Apache Kafa的設計結構除了要講述每個Kafka Broker是如何工做的之外,還要講述清楚整個Apache Kafka集羣是如何工做的。

4-2-一、Kafka Broker工做結構

這裏寫圖片描述

在Apache Kafka的Server Broker設計中,一個獨立進行消息獲取、消息記錄和消息分送操做的隊列稱之爲Topic(和ActiveMQ中Queue或者Topic的概念同屬一個級別)。如下咱們討論的內容都是針對一個Topic而言,後續內容就再也不進行說明了。

  • 上圖描述了一個獨立的Topic構造結構:Apache Kafka將Topic拆分紅多個分區(Partition),這些分區(Partition)可能存在於同一個Broker上也可能存在於不一樣的Broker上。若是您觀察Kafka的文件存儲結構就會發現Kafka會爲Topic中每個分區建立一個獨立的文件加,相似以下所示(如下的Topic——my_topic2一共建立了4個分區):
[root@kafka1 my_topic2-0]# ls
drwxr-xr-x. 2 root root 4096 4月  29 18:32 my_topic2-0
drwxr-xr-x. 2 root root 4096 4月  29 18:32 my_topic2-1
drwxr-xr-x. 2 root root 4096 4月  29 18:32 my_topic2-2
drwxr-xr-x. 2 root root 4096 4月  29 18:32 my_topic2-3
  • 由Producer發送的消息會被分配到各個分區(Partition)中進行存儲,至於它們是按照什麼樣的規則被分配的在後文會進行講述。一條消息記錄只會被分配到一個分區進行存儲,而且這些消息以分區爲單位保持順序排列。這些分區是Apache Kafka性能的第一種保證方式:單位數量相同的消息將分發到存在於多個Broker服務節點上的多個Partition中,並利用每一個Broker服務節點的計算資源進行獨立處理。

  • 每個分區都中會有一個或者多個段(segment)結構。如上圖所示,一個段(segment)結構包含兩種類型的文件:.index後綴的索引文件和.log後綴的數據文件。前一個index文件記錄了消息在整個topic中的序號以及消息在log文件中的偏移位置(offset),經過這兩個信息,Kafka能夠在後一個log文件中找到這條消息的真實內容。

  • 咱們在以前的文章中已經介紹過(在我後續的專題中還會繼續討論這個問題),在磁盤上進行的文件操做只有採用順序讀和順序寫才能作到高效的磁盤I/O性能。這是Kafka保證性能的又一種方式——對索引index文件始終保證順序讀寫:當在磁盤上記錄一條消息時,始終在文件的末尾進行操做;當在磁盤上讀取一條消息時,經過index順序查找到消息的offset位置,再進行消息讀取。後一種消息讀取操做下,若是index文件過大,Kafka的磁盤操做就會耗費掉至關的時間。因此Kafak須要對index文件和log文件進行分段。

  • 實際上Kafka之因此「快」,並不僅是由於它的I/O操做是順序讀寫和多個分區的概念;畢竟相似於AcitveMQ也有多節點集羣的概念,而且後者經過使用LevelDB或者KahaDB這樣的存儲方案也能夠實現磁盤的順序I/O操做。要知道若是消息消費者真正須要到磁盤上尋找數據了,那麼整個Kafka集羣的性能也不會好到哪兒去:目前SATA3串口通信的理論速度也只有6Gpbs,使用SATA3串口通信的固態硬盤,真實的順序讀取最快速度也不過550M/S。

  • Kafka對Linux操做系統下Page Cache技術的應用,纔是其高性能的最大保證。文件內容的組織結構只是其保證消息可靠性的一種方式,真實的業務環境下Kafka通常不須要在磁盤上爲消費者尋找消息記錄(只要您的內存空間夠大)。關於Linux操做系統下的Page Cache技術又是另一個技術話題,我會在隨後推出的「存儲」專題中爲各位讀者進行詳細介紹(LevelDB也應用到了Linux Page Cache技術)。

4-2-二、Kafka Cluster結構

說清楚了單個Kafka Broker結構,咱們再來看看整個Kafka集羣是怎樣工做的。如下視圖描述了某個Topic下的一條消息是如何在Kafka 集羣結構中流動的(實線有向箭頭):

這裏寫圖片描述

  • 整個Kafka集羣中,能夠有多個消息生產者。這些消息生產者可能在同一個物理節點上,也可能在不一樣的物理節點。它們都必須知道哪些Kafka Broker List是將要發送的目標:消息生產者會決定發送的消息將會送入Topic的哪個分區(Partition)

  • 消費者都是按照「組」的單位進行消息隔離:在同一個Topic下,Apache Kafka會爲不一樣的消費者組建立獨立的index索引定位。也就是說當消息生產者發送一條消息後,同一個Topic下不一樣組的消費者都會收到這條信息。

  • 同一組下的消息消費者能夠消費Topic下一個分區或者多個分區中的消息,可是一個分區中的消息只能被同一組下的某一個消息消費者所處理。也就是說,若是某個Topic下只有一個分區,就不能實現消息的負載均衡。另外Topic下的分區數量也只能是固定的,不能夠在使用Topic時動態改變,這些分區在Topic被建立時使用命令行指定或者參考Broker Server中配置的默認值

  • 因爲存在以上的操做規則,因此Kafka集羣中Consumer(消費者)須要和Kafka集羣中的Server Broker進行協調工做:這個協調工做者交給了Zookeeper集羣。zookeeper集羣須要記錄/協調的工做包括:當前整個Kafka集羣中有哪些Broker節點以及每個節點處於什麼狀態(活動/離線/狀態)、當前集羣中全部已建立的Topic以及分區狀況、當前集羣中全部活動的消費者組/消費者、每個消費者組針對每一個topic的索引位置等。

  • 當一個消費者上線,而且在消費消息以前。首先會經過zookeeper協調集羣獲取當前消費組中其餘消費者的鏈接狀態,並獲得當前Topic下可用於消費的分區和該消費者組中其餘消費者的對應關係。若是當前消費者發現Topic下全部的分區都已經有一一對應的消費者了,就將本身置於掛起狀態(和broker、zookeeper的鏈接仍是會創建,可是不會到分區Pull消息),以便在其餘消費者失效後進行接替。

  • 若是當前消費者鏈接時,發現整個Kafka集羣中存在一個消費者(記爲消費者A)關聯Topic下多個分區的狀況,且消費者A處於繁忙沒法處理這些分區下新的消息(即消費者A的上一批Pull的消息尚未處理完成)。這時新的消費者將接替原消費者A所關聯的一個(或者多個)分區,而且一直保持和這個分區的關聯。

  • 因爲Kafka集羣中只保證同一個分區(Partition)下消息隊列中消息的順序。因此當一個或者多個消費者分別Pull一個Topic下的多個消息分區時,您在消費者端觀察的現象可能就是消息順序是混亂的。這裏咱們一直在說消費者端的Pull行爲,是指的Topic下分區中的消息並非由Broker主動推送到(Push)到消費者端,而是由消費者端主動拉取(Pull)。

=========================== (接下文)

相關文章
相關標籤/搜索