啓動服務(每臺都須要執行)java
cd /usr/local/zookeeper/zookeeper-3.4.10/bin ./zkServer.sh start
檢查服務狀態linux
使用 ./zkServer.sh status
命令檢查服務狀態程序員
192.168.1.7 --- follower面試
192.168.1.8 --- leaderapache
192.168.1.9 --- followerbootstrap
zk集羣通常只有一個leader,多個follower,主通常是相應客戶端的讀寫請求,而從主同步數據,當主掛掉以後就會從follower裏投票選舉一個leader出來。服務器
在 /usr/local
下新建 kafka
文件夾,而後把下載完成的 tar.gz 包移到 /usr/local/kafka 目錄下,使用 tar -zxvf 壓縮包
進行解壓,解壓完成後,進入到 kafka_2.12-2.3.0 目錄下,新建 log 文件夾,進入到 config 目錄下markdown
咱們能夠看到有不少 properties 配置文件,這裏主要關注 server.properties
這個文件便可。網絡
kafka 啓動方式有兩種,一種是使用 kafka 自帶的 zookeeper 配置文件來啓動(能夠按照官網來進行啓動,並使用單個服務多個節點來模擬集羣http://kafka.apache.org/quickstart#quickstart_multibroker),一種是經過使用獨立的zk集羣來啓動,這裏推薦使用第二種方式,使用 zk 集羣來啓動socket
須要爲每一個服務
都修改一下配置項,也就是server.properties
, 須要更新和添加的內容有
broker.id=0 //初始是0,每一個 server 的broker.id 都應該設置爲不同的,就和 myid 同樣 個人三個服務分別設置的是 1,2,3 log.dirs=/usr/local/kafka/kafka_2.12-2.3.0/log #在log.retention.hours=168 下面新增下面三項 message.max.byte=5242880 default.replication.factor=2 replica.fetch.max.bytes=5242880 #設置zookeeper的鏈接端口 zookeeper.connect=192.168.1.7:2181,192.168.1.8:2181,192.168.1.9:2181
配置項的含義
broker.id=0 #當前機器在集羣中的惟一標識,和zookeeper的myid性質同樣 port=9092 #當前kafka對外提供服務的端口默認是9092 host.name=192.168.1.7 #這個參數默認是關閉的,在0.8.1有個bug,DNS解析問題,失敗率的問題。 num.network.threads=3 #這個是borker進行網絡處理的線程數 num.io.threads=8 #這個是borker進行I/O處理的線程數 log.dirs=/usr/local/kafka/kafka_2.12-2.3.0/log #消息存放的目錄,這個目錄能夠配置爲「,」逗號分割的表達式,上面的num.io.threads要大於這個目錄的個數這個目錄,若是配置多個目錄,新建立的topic他把消息持久化的地方是,當前以逗號分割的目錄中,那個分區數最少就放那一個 socket.send.buffer.bytes=102400 #發送緩衝區buffer大小,數據不是一會兒就發送的,先回存儲到緩衝區了到達必定的大小後在發送,能提升性能 socket.receive.buffer.bytes=102400 #kafka接收緩衝區大小,當數據到達必定大小後在序列化到磁盤 socket.request.max.bytes=104857600 #這個參數是向kafka請求消息或者向kafka發送消息的請請求的最大數,這個值不能超過java的堆棧大小 num.partitions=1 #默認的分區數,一個topic默認1個分區數 log.retention.hours=168 #默認消息的最大持久化時間,168小時,7天 message.max.byte=5242880 #消息保存的最大值5M default.replication.factor=2 #kafka保存消息的副本數,若是一個副本失效了,另外一個還能夠繼續提供服務 replica.fetch.max.bytes=5242880 #取消息的最大直接數 log.segment.bytes=1073741824 #這個參數是:由於kafka的消息是以追加的形式落地到文件,當超過這個值的時候,kafka會新起一個文件 log.retention.check.interval.ms=300000 #每隔300000毫秒去檢查上面配置的log失效時間(log.retention.hours=168 ),到目錄查看是否有過時的消息若是有,刪除 log.cleaner.enable=false #是否啓用log壓縮,通常不用啓用,啓用的話能夠提升性能 zookeeper.connect=192.168.1.7:2181,192.168.1.8:2181,192.168.1.9:2181 #設置zookeeper的鏈接端口
/usr/local/kafka/kafka_2.12-2.3.0/bin
目錄下# 啓動後臺進程 ./kafka-server-start.sh -daemon ../config/server.properties
# 執行命令 jps 6201 QuorumPeerMain 7035 Jps 6972 Kafka
# cd .. 往回退一層 到 /usr/local/kafka/kafka_2.12-2.3.0 目錄下 bin/kafka-topics.sh --create --zookeeper 192.168.1.7:2181 --replication-factor 2 --partitions 1 --topic cxuan
對上面的解釋
--replication-factor 2 複製兩份
--partitions 1 建立1個分區
--topic 建立主題
查看咱們的主題是否出建立成功
bin/kafka-topics.sh --list --zookeeper 192.168.1.7:2181
啓動一個服務就能把集羣啓動起來
在一臺機器上建立一個發佈者
# 建立一個broker,發佈者 ./kafka-console-producer.sh --broker-list 192.168.1.7:9092 --topic cxuantopic
在一臺服務器上建立一個訂閱者
# 建立一個consumer, 消費者 bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.7:9092 --topic cxuantopic --from-beginning
注意:這裏使用 --zookeeper 的話可能出現
zookeeper is not a recognized option
的錯誤,這是由於 kafka 版本過高,須要使用--bootstrap-server
指令
測試結果
發佈
消費
顯示 topic
bin/kafka-topics.sh --list --zookeeper 192.168.1.7:2181 # 顯示 cxuantopic
查看 topic 狀態
bin/kafka-topics.sh --describe --zookeeper 192.168.1.7:2181 --topic cxuantopic # 下面是顯示的詳細信息 Topic:cxuantopic PartitionCount:1 ReplicationFactor:2 Configs: Topic: cxuantopic Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2 # 分區爲爲1 複製因子爲2 主題 cxuantopic 的分區爲0 # Replicas: 0,1 複製的爲1,2
Leader
負責給定分區的全部讀取和寫入的節點,每一個節點都會經過隨機選擇成爲 leader。
Replicas
是爲該分區複製日誌的節點列表,不管它們是 Leader 仍是當前處於活動狀態。
Isr
是同步副本的集合。它是副本列表的子集,當前仍處於活動狀態並追隨Leader。
至此,kafka 集羣搭建完畢。
剛剛咱們都使用的是 相同的ip 服務,下面使用其餘集羣中的節點,驗證是否可以接受到服務
在另外兩個節點上使用
bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.7:9092 --topic cxuantopic --from-beginning
而後再使用 broker 進行消息發送,經測試三個節點均可以接受到消息。
在搭建 Kafka 的時候咱們簡單介紹了一下 server.properties
中配置的含義,如今咱們來詳細介紹一下參數的配置和概念
這些參數是 kafka 中最基本的配置
每一個 broker 都須要有一個標識符,使用 broker.id 來表示。它的默認值是 0,它能夠被設置成其餘任意整數,在集羣中須要保證每一個節點的 broker.id 都是惟一的。
若是使用配置樣原本啓動 kafka ,它會監聽 9092 端口,修改 port 配置參數能夠把它設置成其餘任意可用的端口。
用於保存 broker 元數據的地址是經過 zookeeper.connect 來指定。 localhost:2181 表示運行在本地 2181 端口。該配置參數是用逗號分隔的一組 hostname:port/path 列表,每一部分含義以下:
hostname 是 zookeeper 服務器的服務名或 IP 地址
port 是 zookeeper 鏈接的端口
/path 是可選的 zookeeper 路徑,做爲 Kafka 集羣的 chroot 環境。若是不指定,默認使用跟路徑
Kafka 把消息都保存在磁盤上,存放這些日誌片斷的目錄都是經過 log.dirs
來指定的。它是一組用逗號分隔的本地文件系統路徑。若是指定了多個路徑,那麼 broker 會根據 "最少使用" 原則,把同一分區的日誌片斷保存到同一路徑下。要注意,broker 會向擁有最少數目分區的路徑新增分區,而不是向擁有最小磁盤空間的路徑新增分區。
對於以下 3 種狀況,Kafka 會使用可配置的線程池來處理日誌片斷
服務器正常啓動,用於打開每一個分區的日誌片斷;
服務器崩潰後啓動,用於檢查和截斷每一個分區的日誌片斷;
服務器正常關閉,用於關閉日誌片斷
默認狀況下,每一個日誌目錄只使用一個線程。由於這些線程只是在服務器啓動和關閉時會用到,因此徹底能夠設置大量的線程來達到井行操做的目的。特別是對於包含大量分區的服務器來講,一旦發生崩憤,在進行恢復時使用井行操做可能會省下數小時的時間。設置此參數時須要注意,所配置的數字對應的是 log.dirs 指定的單個日誌目錄。也就是說,若是 num.recovery.threads.per.data.dir 被設爲 8,而且 log.dir 指定了 3 個路徑,那麼總共須要 24 個線程。
默認狀況下,Kafka 會在以下 3 種狀況下建立主題
當一個生產者開始往主題寫入消息時
當一個消費者開始從主題讀取消息時
當任意一個客戶向主題發送元數據請求時
若是你想要刪除一個主題,你可使用主題管理工具。默認狀況下,是不容許刪除主題的,delete.topic.enable 的默認值是 false 所以你不能隨意刪除主題。這是對生產環境的合理性保護,可是在開發環境和測試環境,是能夠容許你刪除主題的,因此,若是你想要刪除主題,須要把 delete.topic.enable 設爲 true。
Kafka 爲新建立的主題提供了不少默認配置參數,下面就來一塊兒認識一下這些參數
num.partitions 參數指定了新建立的主題須要包含多少個分區。若是啓用了主題自動建立功能(該功能是默認啓用的),主題分區的個數就是該參數指定的值。該參數的默認值是 1。要注意,咱們能夠增長主題分區的個數,但不能減小分區的個數。
這個參數比較簡單,它表示 kafka保存消息的副本數,若是一個副本失效了,另外一個還能夠繼續提供服務default.replication.factor 的默認值爲1,這個參數在你啓用了主題自動建立功能後有效。
Kafka 一般根據時間來決定數據能夠保留多久。默認使用 log.retention.hours 參數來配置時間,默認是 168 個小時,也就是一週。除此以外,還有兩個參數 log.retention.minutes 和 log.retentiion.ms 。這三個參數做用是同樣的,都是決定消息多久之後被刪除,推薦使用 log.retention.ms。
另外一種保留消息的方式是判斷消息是否過時。它的值經過參數 log.retention.bytes
來指定,做用在每個分區上。也就是說,若是有一個包含 8 個分區的主題,而且 log.retention.bytes 被設置爲 1GB,那麼這個主題最多能夠保留 8GB 數據。因此,當主題的分區個數增長時,整個主題能夠保留的數據也隨之增長。
上述的日誌都是做用在日誌片斷上,而不是做用在單個消息上。當消息到達 broker 時,它們被追加到分區的當前日誌片斷上,當日志片斷大小到達 log.segment.bytes 指定上限(默認爲 1GB)時,當前日誌片斷就會被關閉,一個新的日誌片斷被打開。若是一個日誌片斷被關閉,就開始等待過時。這個參數的值越小,就越會頻繁的關閉和分配新文件,從而下降磁盤寫入的總體效率。
上面提到日誌片斷經關閉後需等待過時,那麼 log.segment.ms
這個參數就是指定日誌多長時間被關閉的參數和,log.segment.ms 和 log.retention.bytes 也不存在互斥問題。日誌片斷會在大小或時間到達上限時被關閉,就看哪一個條件先獲得知足。
broker 經過設置 message.max.bytes
參數來限制單個消息的大小,默認是 1000 000, 也就是 1MB,若是生產者嘗試發送的消息超過這個大小,不只消息不會被接收,還會收到 broker 返回的錯誤消息。跟其餘與字節相關的配置參數同樣,該參數指的是壓縮後的消息大小,也就是說,只要壓縮後的消息小於 mesage.max.bytes,那麼消息的實際大小能夠大於這個值
以上就是個人面試過程,爲了此次面試,也收集了不少的面試題,反正我已經面過了,那就免費分享出來吧!
須要的朋友:關注一下,而後點擊這裏便可免費領取
如下是部分面試題截圖