tar -zxvf .tgz -C 目標目錄
mv kafka_2.11-2.1.1/ kafka_2.11
mkdir logs
cd config/
vi server.properties
#broker的全局惟一編號,不能重複
broker.id=0
#是否容許刪除topic
delete.topic.enable=true
#處理網絡請求的線程數量
num.network.threads=3
#用來處理磁盤IO的線程數量
num.io.threads=8
#發送套接字的緩衝區大小
socket.send.buffer.bytes=102400
#接收套接字的緩衝區大小
socket.receive.buffer.bytes=102400
#請求套接字的最大緩衝區大小
socket.request.max.bytes=104857600
#kafka運行日誌存放的路徑
log.dirs=/opt/module/kafka/logs
#topic在當前broker上的分區個數
num.partitions=1
#用來恢復和清理data下數據的線程數量
num.recovery.threads.per.data.dir=1
#segment文件保留的最長時間,超時將被刪除
log.retention.hours=168
#配置鏈接Zookeeper集羣地址
zookeeper.connect=XXXX:2181,XXXX:2181,XXXX:2181
複製代碼
vi /etc/profile
export KAFKA_HOME=kafka安裝路徑
export PATH=$PATH:$KAFKA_HOME/bin
source /etc/profile
scp -r kafka_2.11/ bigdata02:$PWD
scp -r kafka_2.11/ bigdata03:$PWD
broker.id
(注:broker.id不得重複)&
是在後臺啓動)
kafka-server-start.sh config/server.properties &
kafka-server-stop.sh stop
kafka-topics.sh --zookeeper XXXX:2181 --list
kafka-topics.sh --zookeeper XXXX:2181 --create --replication-factor 3 --partitions 1 --topic 名稱
--topic
定義topic名--replication-factor
定義副本數--partitions
定義分區數kafka-topics.sh --zookeeper XXXX:2181 --delete --topic first
server.properties
中設置delete.topic.enable=true
不然只是標記刪除或者直接重啓。kafka-console-producer.sh --broker-list XXXX:9092 --topic first
kafka-console-consumer.sh --bootstrap-server node3:9092 --from-beginning --topic first
--from-beginning
:會把first主題中以往全部的數據都讀取出來。根據業務場景選擇是否增長該配置。kafka-topics.sh --zookeeper XXXX:2181 --describe --topic first
<1>. 寫入方式node
<2>. 分區(Partition)apache
Kafka集羣有多個消息代理服務器(broker-server)組成,發佈到Kafka集羣的每條消息都有一個類別,用主題(topic)來表示。一般,不一樣應用產生不一樣類型的數據,能夠設置不一樣的主題。一個主題通常會有多個消息的訂閱者,當生產者發佈消息到某個主題時,訂閱了這個主題的消費者均可以接收到生成者寫入的新消息。bootstrap
Kafka集羣爲每一個主題維護了分佈式的分區(partition)日誌文件,物理意義上能夠把主題(topic)看做進行了分區的日誌文件(partition log)。主題的每一個分區都是一個有序的、不可變的記錄序列,新的消息會不斷追加到日誌中。分區中的每條消息都會按照時間順序分配到一個單調遞增的順序編號,叫作偏移量(offset),這個偏移量可以惟一地定位當前分區中的每一條消息。緩存
消息發送時都被髮送到一個topic,其本質就是一個目錄,而topic是由一些Partition Logs(分區日誌)組成,其組織結構以下圖所示:安全
下圖中的topic有3個分區,每一個分區的偏移量都從0開始,不一樣分區之間的偏移量都是獨立的,不會相互影響。 bash
能夠看到,每一個Partition中的消息都是有序的,生產的消息被不斷追加到Partition log上,其中的每個消息都被賦予了一個惟一的offset值。服務器
發佈到Kafka主題的每條消息包括鍵值和時間戳。消息到達服務器端的指定分區後,都會分配到一個自增的偏移量。**原始的消息內容和分配的偏移量以及其餘一些元數據信息最後都會存儲到分區日誌文件中。**消息的鍵也能夠不用設置,這種狀況下消息會均衡地分佈到不一樣的分區。網絡
分區的緣由:架構
分區的原則:併發
<3>. 副本(Replication) 同一個partition可能會有多個replication(對應 server.properties 配置中的 default.replication.factor=N)。沒有replication的狀況下,一旦broker 宕機,其上全部 patition 的數據都不可被消費,同時producer也不能再將數據存於其上的patition。引入replication以後,同一個partition可能會有多個replication,而這時須要在這些replication之間選出一個leader,producer和consumer只與這個leader交互,其它replication做爲follower從leader 中複製數據。
<4>. 寫入流程
消息由生產者發佈到Kafka集羣后,會被消費者消費。消息的消費模型有兩種:推送模型(push)和拉取模型(pull)。
基於推送模型(push)的消息系統,由消息代理記錄消費者的消費狀態。消息代理在將消息推送到消費者後,標記這條消息爲已消費,但這種方式沒法很好地保證消息被處理。好比,消息代理把消息發送出去後,當消費進程掛掉或者因爲網絡緣由沒有收到這條消息時,就有可能形成消息丟失(由於消息代理已經把這條消息標記爲已消費了,但實際上這條消息並無被實際處理)。若是要保證消息被處理,消息代理髮送完消息後,要設置狀態爲「已發送」,只有收到消費者的確認請求後才更新爲「已消費」,這就須要消息代理中記錄全部的消費狀態,這種作法顯然是不可取的。
Kafka採用拉取模型,**由消費者本身記錄消費狀態,每一個消費者互相獨立地順序讀取每一個分區的消息。**以下圖所示,有兩個消費者(不一樣消費者組)拉取同一個主題的消息,消費者A的消費進度是3,消費者B的消費進度是6。消費者拉取的最大上限經過最高水位(watermark)控制,生產者最新寫入的消息若是尚未達到備份數量,對消費者是不可見的。這種由消費者控制偏移量的優勢是:**消費者能夠按照任意的順序消費消息。**好比,消費者能夠重置到舊的偏移量,從新處理以前已經消費過的消息;或者直接跳到最近的位置,從當前的時刻開始消費。
在一些消息系統中,消息代理會在消息被消費以後當即刪除消息。若是有不一樣類型的消費者訂閱同一個主題,消息代理可能須要冗餘地存儲同一消息;或者等全部消費者都消費完才刪除,這就須要消息代理跟蹤每一個消費者的消費狀態,這種設計很大程度上限制了消息系統的總體吞吐量和處理延遲。Kafka的作法是生產者發佈的全部消息會一致保存在Kafka集羣中,無論消息有沒有被消費。用戶能夠經過設置保留時間來清理過時的數據,好比,設置保留策略爲兩天。那麼,在消息發佈以後,它能夠被不一樣的消費者消費,在兩天以後,過時的消息就會自動清理掉。