Kafka
是一個分佈式流式平臺,它有三個關鍵能力html
消息隊列
或 企業消息傳遞系統
Kafka 能夠創建流數據管道,可靠性的在系統或應用之間獲取數據。java
創建流式應用傳輸和響應數據。shell
Kafka 做爲消息系統,它有三個基本組件數據庫
在大型系統中,會須要和不少子系統作交互,也須要消息傳遞,在諸如此類系統中,你會找到源系統(消息發送方)和 目的系統(消息接收方)。爲了在這樣的消息系統中傳輸數據,你須要有合適的數據管道apache
這種數據的交互看起來就很混亂,若是咱們使用消息傳遞系統,那麼系統就會變得更加簡單和整潔bootstrap
topics
Kafka 有四個核心API,它們分別是windows
Kafka 做爲一個高度可擴展可容錯的消息系統,它有不少基本概念,下面就來認識一下這些 Kafka 專屬的概念服務器
Topic 被稱爲主題,在 kafka 中,使用一個類別屬性來劃分消息的所屬類,劃分消息的這個類稱爲 topic。topic 至關於消息的分配標籤,是一個邏輯概念。主題比如是數據庫的表,或者文件系統中的文件夾。網絡
partition 譯爲分區,topic 中的消息被分割爲一個或多個的 partition,它是一個物理概念,對應到系統上的就是一個或若干個目錄,一個分區就是一個 提交日誌
。消息以追加的形式寫入分區,前後以順序的方式讀取。socket
注意:因爲一個主題包含無數個分區,所以沒法保證在整個 topic 中有序,可是單個 Partition 分區能夠保證有序。消息被迫加寫入每一個分區的尾部。Kafka 經過分區來實現數據冗餘和伸縮性
分區能夠分佈在不一樣的服務器上,也就是說,一個主題能夠跨越多個服務器,以此來提供比單個服務器更強大的性能。
Segment 被譯爲段,將 Partition 進一步細分爲若干個 segment,每一個 segment 文件的大小相等。
Kafka 集羣包含一個或多個服務器,每一個 Kafka 中服務器被稱爲 broker。broker 接收來自生產者的消息,爲消息設置偏移量,並提交消息到磁盤保存。broker 爲消費者提供服務,對讀取分區的請求做出響應,返回已經提交到磁盤上的消息。
broker 是集羣的組成部分,每一個集羣中都會有一個 broker 同時充當了 集羣控制器(Leader)
的角色,它是由集羣中的活躍成員選舉出來的。每一個集羣中的成員都有可能充當 Leader,Leader 負責管理工做,包括將分區分配給 broker 和監控 broker。集羣中,一個分區從屬於一個 Leader,可是一個分區能夠分配給多個 broker(非Leader),這時候會發生分區複製。這種複製的機制爲分區提供了消息冗餘,若是一個 broker 失效,那麼其餘活躍用戶會從新選舉一個 Leader 接管。
生產者,即消息的發佈者,其會將某 topic 的消息發佈到相應的 partition 中。生產者在默認狀況下把消息均衡地分佈到主題的全部分區上,而並不關心特定消息會被寫到哪一個分區。不過,在某些狀況下,生產者會把消息直接寫到指定的分區。
消費者,即消息的使用者,一個消費者能夠消費多個 topic 的消息,對於某一個 topic 的消息,其只會消費同一個 partition 中的消息
在瞭解完 Kafka 的基本概念以後,咱們經過搭建 Kafka 集羣來進一步深入認識一下 Kafka。
在安裝 Kafka 以前,先確保Linux 環境上是否有 Java 環境,使用 java -version
命令查看 Java 版本,推薦使用Jdk 1.8 ,若是沒有安裝 Java 環境的話,能夠按照這篇文章進行安裝(https://www.cnblogs.com/zs-no...)
Kafka 的底層使用 Zookeeper 儲存元數據,確保一致性,因此安裝 Kafka 前須要先安裝 Zookeeper,Kafka 的發行版自帶了 Zookeeper ,能夠直接使用腳原本啓動,不過安裝一個 Zookeeper 也不費勁
Zookeeper 單機搭建比較簡單,直接從 https://www.apache.org/dyn/cl... 官網下載一個穩定版本的 Zookeeper ,這裏我使用的是 3.4.10
,下載完成後,在 Linux 系統中的 /usr/local
目錄下建立 zookeeper 文件夾,使用xftp
工具(xftp 和 xshell 工具均可以在官網 https://www.netsarang.com/zh/... 申請免費的家庭版)把下載好的 zookeeper 壓縮包放到 /usr/local/zookeeper 目錄下。
若是下載的是一個 tar.gz 包的話,直接使用 tar -zxvf zookeeper-3.4.10.tar.gz
解壓便可
若是下載的是 zip 包的話,還要檢查一下 Linux 中是否有 unzip 工具,若是沒有的話,使用 yum install unzip
安裝 zip 解壓工具,完成後使用 unzip zookeeper-3.4.10.zip
解壓便可。
解壓完成後,cd 到 /usr/local/zookeeper/zookeeper-3.4.10
,建立一個 data 文件夾,而後進入到 conf 文件夾下,使用 mv zoo_sample.cfg zoo.cfg
進行重命名操做
而後使用 vi 打開 zoo.cfg ,更改一下dataDir = /usr/local/zookeeper/zookeeper-3.4.10/data
,保存。
進入bin目錄,啓動服務輸入命令 ./zkServer.sh start
輸出下面內容表示搭建成功
關閉服務輸入命令,./zkServer.sh stop
使用 ./zkServer.sh status
能夠查看狀態信息。
準備條件:須要三個服務器,這裏我使用了CentOS7 並安裝了三個虛擬機,併爲各自的虛擬機分配了1GB
的內存,在每一個 /usr/local/
下面新建 zookeeper 文件夾,把 zookeeper 的壓縮包挪過來,解壓,完成後會有 zookeeper-3.4.10 文件夾,進入到文件夾,新建兩個文件夾,分別是 data
和 log
文件夾
注:上一節單機搭建中已經建立了一個data 文件夾,就不須要從新建立了,直接新建一個 log 文件夾,對另外兩個新增的服務須要新建這兩個文件夾。
新建完成後,須要編輯 conf/zoo.cfg 文件,三個文件的內容以下
tickTime=2000 initLimit=10 syncLimit=5 dataDir=/usr/local/zookeeper/zookeeper-3.4.10/data dataLogDir=/usr/local/zookeeper/zookeeper-3.4.10/log clientPort=12181 server.1=192.168.1.7:12888:13888 server.2=192.168.1.8:12888:13888 server.3=192.168.1.9:12888:13888
server.1 中的這個 1 表示的是服務器的標識也能夠是其餘數字,表示這是第幾號服務器,這個標識要和下面咱們配置的 myid
的標識一致能夠。
192.168.1.7:12888:13888
爲集羣中的 ip 地址,第一個端口表示的是 master 與 slave 之間的通訊接口,默認是 2888,第二個端口是leader選舉的端口,集羣剛啓動的時候選舉或者leader掛掉以後進行新的選舉的端口,默認是 3888
如今對上面的配置文件進行解釋
tickTime
: 這個時間是做爲 Zookeeper 服務器之間或客戶端與服務器之間維持心跳
的時間間隔,也就是每一個 tickTime 時間就會發送一個心跳。
initLimit
:這個配置項是用來配置 Zookeeper 接受客戶端(這裏所說的客戶端不是用戶鏈接 Zookeeper 服務器的客戶端,而是 Zookeeper 服務器集羣中鏈接到 Leader 的 Follower 服務器)初始化鏈接時最長能忍受多少個心跳時間間隔數。當已經超過 5個心跳的時間(也就是 tickTime)長度後 Zookeeper 服務器尚未收到客戶端的返回信息,那麼代表這個客戶端鏈接失敗。總的時間長度就是 5*2000=10 秒
syncLimit
: 這個配置項標識 Leader 與Follower 之間發送消息,請求和應答時間長度,最長不能超過多少個 tickTime 的時間長度,總的時間長度就是5*2000=10秒
dataDir
: 快照日誌的存儲路徑
dataLogDir
: 事務日誌的存儲路徑,若是不配置這個那麼事務日誌會默認存儲到dataDir指定的目錄,這樣會嚴重影響zk的性能,當zk吞吐量較大的時候,產生的事務日誌、快照日誌太多
clientPort
: 這個端口就是客戶端鏈接 Zookeeper 服務器的端口,Zookeeper 會監聽這個端口,接受客戶端的訪問請求。
在瞭解完其配置文件後,如今來建立每一個集羣節點的 myid ,咱們上面說過,這個 myid 就是 server.1
的這個 1 ,相似的,須要爲集羣中的每一個服務都指定標識,使用 echo
命令進行建立
# server.1 echo "1" > /usr/local/zookeeper/zookeeper-3.4.10/data/myid # server.2 echo "2" > /usr/local/zookeeper/zookeeper-3.4.10/data/myid # server.3 echo "3" > /usr/local/zookeeper/zookeeper-3.4.10/data/myid
配置完成,爲每一個 zk 服務啓動並測試,我在 windows 電腦的測試結果以下
啓動服務(每臺都須要執行)
cd /usr/local/zookeeper/zookeeper-3.4.10/bin ./zkServer.sh start
檢查服務狀態
使用 ./zkServer.sh status
命令檢查服務狀態
192.168.1.7 --- follower
192.168.1.8 --- leader
192.168.1.9 --- follower
zk集羣通常只有一個leader,多個follower,主通常是相應客戶端的讀寫請求,而從主同步數據,當主掛掉以後就會從follower裏投票選舉一個leader出來。
在 /usr/local
下新建 kafka
文件夾,而後把下載完成的 tar.gz 包移到 /usr/local/kafka 目錄下,使用 tar -zxvf 壓縮包
進行解壓,解壓完成後,進入到 kafka_2.12-2.3.0 目錄下,新建 log 文件夾,進入到 config 目錄下
咱們能夠看到有不少 properties 配置文件,這裏主要關注 server.properties
這個文件便可。
kafka 啓動方式有兩種,一種是使用 kafka 自帶的 zookeeper 配置文件來啓動(能夠按照官網來進行啓動,並使用單個服務多個節點來模擬集羣http://kafka.apache.org/quick...),一種是經過使用獨立的zk集羣來啓動,這裏推薦使用第二種方式,使用 zk 集羣來啓動
須要爲每一個服務
都修改一下配置項,也就是server.properties
, 須要更新和添加的內容有
broker.id=0 //初始是0,每一個 server 的broker.id 都應該設置爲不同的,就和 myid 同樣 個人三個服務分別設置的是 1,2,3 log.dirs=/usr/local/kafka/kafka_2.12-2.3.0/log #在log.retention.hours=168 下面新增下面三項 message.max.byte=5242880 default.replication.factor=2 replica.fetch.max.bytes=5242880 #設置zookeeper的鏈接端口 zookeeper.connect=192.168.1.7:2181,192.168.1.8:2181,192.168.1.9:2181
配置項的含義
broker.id=0 #當前機器在集羣中的惟一標識,和zookeeper的myid性質同樣 port=9092 #當前kafka對外提供服務的端口默認是9092 host.name=192.168.1.7 #這個參數默認是關閉的,在0.8.1有個bug,DNS解析問題,失敗率的問題。 num.network.threads=3 #這個是borker進行網絡處理的線程數 num.io.threads=8 #這個是borker進行I/O處理的線程數 log.dirs=/usr/local/kafka/kafka_2.12-2.3.0/log #消息存放的目錄,這個目錄能夠配置爲「,」逗號分割的表達式,上面的num.io.threads要大於這個目錄的個數這個目錄,若是配置多個目錄,新建立的topic他把消息持久化的地方是,當前以逗號分割的目錄中,那個分區數最少就放那一個 socket.send.buffer.bytes=102400 #發送緩衝區buffer大小,數據不是一會兒就發送的,先回存儲到緩衝區了到達必定的大小後在發送,能提升性能 socket.receive.buffer.bytes=102400 #kafka接收緩衝區大小,當數據到達必定大小後在序列化到磁盤 socket.request.max.bytes=104857600 #這個參數是向kafka請求消息或者向kafka發送消息的請請求的最大數,這個值不能超過java的堆棧大小 num.partitions=1 #默認的分區數,一個topic默認1個分區數 log.retention.hours=168 #默認消息的最大持久化時間,168小時,7天 message.max.byte=5242880 #消息保存的最大值5M default.replication.factor=2 #kafka保存消息的副本數,若是一個副本失效了,另外一個還能夠繼續提供服務 replica.fetch.max.bytes=5242880 #取消息的最大直接數 log.segment.bytes=1073741824 #這個參數是:由於kafka的消息是以追加的形式落地到文件,當超過這個值的時候,kafka會新起一個文件 log.retention.check.interval.ms=300000 #每隔300000毫秒去檢查上面配置的log失效時間(log.retention.hours=168 ),到目錄查看是否有過時的消息若是有,刪除 log.cleaner.enable=false #是否啓用log壓縮,通常不用啓用,啓用的話能夠提升性能 zookeeper.connect=192.168.1.7:2181,192.168.1.8:2181,192.168.1.9:2181 #設置zookeeper的鏈接端口
/usr/local/kafka/kafka_2.12-2.3.0/bin
目錄下# 啓動後臺進程 ./kafka-server-start.sh -daemon ../config/server.properties
# 執行命令 jps 6201 QuorumPeerMain 7035 Jps 6972 Kafka
# cd .. 往回退一層 到 /usr/local/kafka/kafka_2.12-2.3.0 目錄下 bin/kafka-topics.sh --create --zookeeper 192.168.1.7:2181 --replication-factor 2 --partitions 1 --topic cxuan
對上面的解釋
--replication-factor 2 複製兩份--partitions 1 建立1個分區
--topic 建立主題
查看咱們的主題是否出建立成功
bin/kafka-topics.sh --list --zookeeper 192.168.1.7:2181
啓動一個服務就能把集羣啓動起來
在一臺機器上建立一個發佈者
# 建立一個broker,發佈者 ./kafka-console-producer.sh --broker-list 192.168.1.7:9092 --topic cxuantopic
在一臺服務器上建立一個訂閱者
# 建立一個consumer, 消費者 bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.7:9092 --topic cxuantopic --from-beginning
注意:這裏使用 --zookeeper 的話可能出現zookeeper is not a recognized option
的錯誤,這是由於 kafka 版本過高,須要使用--bootstrap-server
指令
測試結果
發佈
消費
顯示 topic
bin/kafka-topics.sh --list --zookeeper 192.168.1.7:2181 # 顯示 cxuantopic
查看 topic 狀態
bin/kafka-topics.sh --describe --zookeeper 192.168.1.7:2181 --topic cxuantopic # 下面是顯示的詳細信息 Topic:cxuantopic PartitionCount:1 ReplicationFactor:2 Configs: Topic: cxuantopic Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2 # 分區爲爲1 複製因子爲2 主題 cxuantopic 的分區爲0 # Replicas: 0,1 複製的爲1,2
Leader
負責給定分區的全部讀取和寫入的節點,每一個節點都會經過隨機選擇成爲 leader。
Replicas
是爲該分區複製日誌的節點列表,不管它們是 Leader 仍是當前處於活動狀態。
Isr
是同步副本的集合。它是副本列表的子集,當前仍處於活動狀態並追隨Leader。
至此,kafka 集羣搭建完畢。
剛剛咱們都使用的是 相同的ip 服務,下面使用其餘集羣中的節點,驗證是否可以接受到服務
在另外兩個節點上使用
bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.7:9092 --topic cxuantopic --from-beginning
而後再使用 broker 進行消息發送,經測試三個節點均可以接受到消息。
在搭建 Kafka 的時候咱們簡單介紹了一下 server.properties
中配置的含義,如今咱們來詳細介紹一下參數的配置和概念
這些參數是 kafka 中最基本的配置
每一個 broker 都須要有一個標識符,使用 broker.id 來表示。它的默認值是 0,它能夠被設置成其餘任意整數,在集羣中須要保證每一個節點的 broker.id 都是惟一的。
若是使用配置樣原本啓動 kafka ,它會監聽 9092 端口,修改 port 配置參數能夠把它設置成其餘任意可用的端口。
用於保存 broker 元數據的地址是經過 zookeeper.connect 來指定。 localhost:2181 表示運行在本地 2181 端口。該配置參數是用逗號分隔的一組 hostname:port/path 列表,每一部分含義以下:
hostname 是 zookeeper 服務器的服務名或 IP 地址
port 是 zookeeper 鏈接的端口
/path 是可選的 zookeeper 路徑,做爲 Kafka 集羣的 chroot 環境。若是不指定,默認使用跟路徑
Kafka 把消息都保存在磁盤上,存放這些日誌片斷的目錄都是經過 log.dirs
來指定的。它是一組用逗號分隔的本地文件系統路徑。若是指定了多個路徑,那麼 broker 會根據 "最少使用" 原則,把同一分區的日誌片斷保存到同一路徑下。要注意,broker 會向擁有最少數目分區的路徑新增分區,而不是向擁有最小磁盤空間的路徑新增分區。
對於以下 3 種狀況,Kafka 會使用可配置的線程池來處理日誌片斷
服務器正常啓動,用於打開每一個分區的日誌片斷;
服務器崩潰後啓動,用於檢查和截斷每一個分區的日誌片斷;
服務器正常關閉,用於關閉日誌片斷
默認狀況下,每一個日誌目錄只使用一個線程。由於這些線程只是在服務器啓動和關閉時會用到,因此徹底能夠設置大量的線程來達到井行操做的目的。特別是對於包含大量分區的服務器來講,一旦發生崩憤,在進行恢復時使用井行操做可能會省下數小時的時間。設置此參數時須要注意,所配置的數字對應的是 log.dirs 指定的單個日誌目錄。也就是說,若是 num.recovery.threads.per.data.dir 被設爲 8,而且 log.dir 指定了 3 個路徑,那麼總共須要 24 個線程。
默認狀況下,Kafka 會在以下 3 種狀況下建立主題
當一個生產者開始往主題寫入消息時
當一個消費者開始從主題讀取消息時
當任意一個客戶向主題發送元數據請求時
若是你想要刪除一個主題,你可使用主題管理工具。默認狀況下,是不容許刪除主題的,delete.topic.enable 的默認值是 false 所以你不能隨意刪除主題。這是對生產環境的合理性保護,可是在開發環境和測試環境,是能夠容許你刪除主題的,因此,若是你想要刪除主題,須要把 delete.topic.enable 設爲 true。
Kafka 爲新建立的主題提供了不少默認配置參數,下面就來一塊兒認識一下這些參數
num.partitions 參數指定了新建立的主題須要包含多少個分區。若是啓用了主題自動建立功能(該功能是默認啓用的),主題分區的個數就是該參數指定的值。該參數的默認值是 1。要注意,咱們能夠增長主題分區的個數,但不能減小分區的個數。
這個參數比較簡單,它表示 kafka保存消息的副本數,若是一個副本失效了,另外一個還能夠繼續提供服務default.replication.factor 的默認值爲1,這個參數在你啓用了主題自動建立功能後有效。
Kafka 一般根據時間來決定數據能夠保留多久。默認使用 log.retention.hours 參數來配置時間,默認是 168 個小時,也就是一週。除此以外,還有兩個參數 log.retention.minutes 和 log.retentiion.ms 。這三個參數做用是同樣的,都是決定消息多久之後被刪除,推薦使用 log.retention.ms。
另外一種保留消息的方式是判斷消息是否過時。它的值經過參數 log.retention.bytes
來指定,做用在每個分區上。也就是說,若是有一個包含 8 個分區的主題,而且 log.retention.bytes 被設置爲 1GB,那麼這個主題最多能夠保留 8GB 數據。因此,當主題的分區個數增長時,整個主題能夠保留的數據也隨之增長。
上述的日誌都是做用在日誌片斷上,而不是做用在單個消息上。當消息到達 broker 時,它們被追加到分區的當前日誌片斷上,當日志片斷大小到達 log.segment.bytes 指定上限(默認爲 1GB)時,當前日誌片斷就會被關閉,一個新的日誌片斷被打開。若是一個日誌片斷被關閉,就開始等待過時。這個參數的值越小,就越會頻繁的關閉和分配新文件,從而下降磁盤寫入的總體效率。
上面提到日誌片斷經關閉後需等待過時,那麼 log.segment.ms
這個參數就是指定日誌多長時間被關閉的參數和,log.segment.ms 和 log.retention.bytes 也不存在互斥問題。日誌片斷會在大小或時間到達上限時被關閉,就看哪一個條件先獲得知足。
broker 經過設置 message.max.bytes
參數來限制單個消息的大小,默認是 1000 000, 也就是 1MB,若是生產者嘗試發送的消息超過這個大小,不只消息不會被接收,還會收到 broker 返回的錯誤消息。跟其餘與字節相關的配置參數同樣,該參數指的是壓縮後的消息大小,也就是說,只要壓縮後的消息小於 mesage.max.bytes,那麼消息的實際大小能夠大於這個值
這個值對性能有顯著的影響。值越大,那麼負責處理網絡鏈接和請求的線程就須要花越多的時間來處理這些請求。它還會增長磁盤寫入塊的大小,從而影響 IO 吞吐量。
文章參考:
kafka消費者kafka-console-consumer接收不到數據
《Kafka權威指南》