真的,搞定kafka看這一篇就夠了,linux系統入門視頻

啓動服務(每臺都須要執行)java

cd /usr/local/zookeeper/zookeeper-3.4.10/bin
./zkServer.sh start

檢查服務狀態linux

使用 ./zkServer.sh status 命令檢查服務狀態程序員

192.168.1.7 --- follower面試

真的,搞定kafka看這一篇就夠了,linux系統入門視頻

192.168.1.8 --- leaderapache

真的,搞定kafka看這一篇就夠了,linux系統入門視頻

192.168.1.9 --- followerbootstrap

真的,搞定kafka看這一篇就夠了,linux系統入門視頻

zk集羣通常只有一個leader,多個follower,主通常是相應客戶端的讀寫請求,而從主同步數據,當主掛掉以後就會從follower裏投票選舉一個leader出來。服務器

Kafka 集羣搭建

準備條件

  • 搭建好的 Zookeeper 集羣
  • Kafka 壓縮包

/usr/local 下新建 kafka 文件夾,而後把下載完成的 tar.gz 包移到 /usr/local/kafka 目錄下,使用 tar -zxvf 壓縮包 進行解壓,解壓完成後,進入到 kafka_2.12-2.3.0 目錄下,新建 log 文件夾,進入到 config 目錄下markdown

咱們能夠看到有不少 properties 配置文件,這裏主要關注 server.properties 這個文件便可。
真的,搞定kafka看這一篇就夠了,linux系統入門視頻網絡

kafka 啓動方式有兩種,一種是使用 kafka 自帶的 zookeeper 配置文件來啓動(能夠按照官網來進行啓動,並使用單個服務多個節點來模擬集羣http://kafka.apache.org/quickstart#quickstart_multibroker),一種是經過使用獨立的zk集羣來啓動,這裏推薦使用第二種方式,使用 zk 集羣來啓動socket

修改配置項

須要爲每一個服務都修改一下配置項,也就是server.properties, 須要更新和添加的內容有

broker.id=0 //初始是0,每一個 server 的broker.id 都應該設置爲不同的,就和 myid 同樣 個人三個服務分別設置的是 1,2,3
log.dirs=/usr/local/kafka/kafka_2.12-2.3.0/log

#在log.retention.hours=168 下面新增下面三項
message.max.byte=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880

#設置zookeeper的鏈接端口
zookeeper.connect=192.168.1.7:2181,192.168.1.8:2181,192.168.1.9:2181

配置項的含義

broker.id=0  #當前機器在集羣中的惟一標識,和zookeeper的myid性質同樣
port=9092 #當前kafka對外提供服務的端口默認是9092
host.name=192.168.1.7 #這個參數默認是關閉的,在0.8.1有個bug,DNS解析問題,失敗率的問題。
num.network.threads=3 #這個是borker進行網絡處理的線程數
num.io.threads=8 #這個是borker進行I/O處理的線程數
log.dirs=/usr/local/kafka/kafka_2.12-2.3.0/log #消息存放的目錄,這個目錄能夠配置爲「,」逗號分割的表達式,上面的num.io.threads要大於這個目錄的個數這個目錄,若是配置多個目錄,新建立的topic他把消息持久化的地方是,當前以逗號分割的目錄中,那個分區數最少就放那一個
socket.send.buffer.bytes=102400 #發送緩衝區buffer大小,數據不是一會兒就發送的,先回存儲到緩衝區了到達必定的大小後在發送,能提升性能
socket.receive.buffer.bytes=102400 #kafka接收緩衝區大小,當數據到達必定大小後在序列化到磁盤
socket.request.max.bytes=104857600 #這個參數是向kafka請求消息或者向kafka發送消息的請請求的最大數,這個值不能超過java的堆棧大小
num.partitions=1 #默認的分區數,一個topic默認1個分區數
log.retention.hours=168 #默認消息的最大持久化時間,168小時,7天
message.max.byte=5242880  #消息保存的最大值5M
default.replication.factor=2  #kafka保存消息的副本數,若是一個副本失效了,另外一個還能夠繼續提供服務
replica.fetch.max.bytes=5242880  #取消息的最大直接數
log.segment.bytes=1073741824 #這個參數是:由於kafka的消息是以追加的形式落地到文件,當超過這個值的時候,kafka會新起一個文件
log.retention.check.interval.ms=300000 #每隔300000毫秒去檢查上面配置的log失效時間(log.retention.hours=168 ),到目錄查看是否有過時的消息若是有,刪除
log.cleaner.enable=false #是否啓用log壓縮,通常不用啓用,啓用的話能夠提升性能
zookeeper.connect=192.168.1.7:2181,192.168.1.8:2181,192.168.1.9:2181 #設置zookeeper的鏈接端口

啓動 Kafka 集羣並測試

  • 啓動服務,進入到 /usr/local/kafka/kafka_2.12-2.3.0/bin 目錄下
# 啓動後臺進程
./kafka-server-start.sh -daemon ../config/server.properties
  • 檢查服務是否啓動
# 執行命令 jps
6201 QuorumPeerMain
7035 Jps
6972 Kafka
  • kafka 已經啓動
  • 建立 Topic 來驗證是否建立成功
# cd .. 往回退一層 到 /usr/local/kafka/kafka_2.12-2.3.0 目錄下
bin/kafka-topics.sh --create --zookeeper 192.168.1.7:2181 --replication-factor 2 --partitions 1 --topic cxuan

對上面的解釋

--replication-factor 2 複製兩份

--partitions 1 建立1個分區

--topic 建立主題

查看咱們的主題是否出建立成功

bin/kafka-topics.sh --list --zookeeper 192.168.1.7:2181

真的,搞定kafka看這一篇就夠了,linux系統入門視頻

啓動一個服務就能把集羣啓動起來

在一臺機器上建立一個發佈者

# 建立一個broker,發佈者
./kafka-console-producer.sh --broker-list 192.168.1.7:9092 --topic cxuantopic

在一臺服務器上建立一個訂閱者

# 建立一個consumer, 消費者
bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.7:9092 --topic cxuantopic --from-beginning

注意:這裏使用 --zookeeper 的話可能出現 zookeeper is not a recognized option 的錯誤,這是由於 kafka 版本過高,須要使用 --bootstrap-server 指令

測試結果

發佈

真的,搞定kafka看這一篇就夠了,linux系統入門視頻

消費

真的,搞定kafka看這一篇就夠了,linux系統入門視頻

其餘命令

顯示 topic

bin/kafka-topics.sh --list --zookeeper 192.168.1.7:2181

# 顯示
cxuantopic

查看 topic 狀態

bin/kafka-topics.sh --describe --zookeeper 192.168.1.7:2181 --topic cxuantopic

# 下面是顯示的詳細信息
Topic:cxuantopic PartitionCount:1 ReplicationFactor:2 Configs:
Topic: cxuantopic Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2

# 分區爲爲1  複製因子爲2   主題 cxuantopic 的分區爲0 
# Replicas: 0,1   複製的爲1,2

Leader 負責給定分區的全部讀取和寫入的節點,每一個節點都會經過隨機選擇成爲 leader。

Replicas 是爲該分區複製日誌的節點列表,不管它們是 Leader 仍是當前處於活動狀態。

Isr 是同步副本的集合。它是副本列表的子集,當前仍處於活動狀態並追隨Leader。

至此,kafka 集羣搭建完畢。

驗證多節點接收數據

剛剛咱們都使用的是 相同的ip 服務,下面使用其餘集羣中的節點,驗證是否可以接受到服務

在另外兩個節點上使用

bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.7:9092 --topic cxuantopic --from-beginning

而後再使用 broker 進行消息發送,經測試三個節點均可以接受到消息。

配置詳解

在搭建 Kafka 的時候咱們簡單介紹了一下 server.properties 中配置的含義,如今咱們來詳細介紹一下參數的配置和概念

常規配置

這些參數是 kafka 中最基本的配置

  • broker.id

每一個 broker 都須要有一個標識符,使用 broker.id 來表示。它的默認值是 0,它能夠被設置成其餘任意整數,在集羣中須要保證每一個節點的 broker.id 都是惟一的。

  • port

若是使用配置樣原本啓動 kafka ,它會監聽 9092 端口,修改 port 配置參數能夠把它設置成其餘任意可用的端口。

  • zookeeper.connect

用於保存 broker 元數據的地址是經過 zookeeper.connect 來指定。 localhost:2181 表示運行在本地 2181 端口。該配置參數是用逗號分隔的一組 hostname:port/path 列表,每一部分含義以下:

hostname 是 zookeeper 服務器的服務名或 IP 地址

port 是 zookeeper 鏈接的端口

/path 是可選的 zookeeper 路徑,做爲 Kafka 集羣的 chroot 環境。若是不指定,默認使用跟路徑

  • log.dirs

Kafka 把消息都保存在磁盤上,存放這些日誌片斷的目錄都是經過 log.dirs 來指定的。它是一組用逗號分隔的本地文件系統路徑。若是指定了多個路徑,那麼 broker 會根據 "最少使用" 原則,把同一分區的日誌片斷保存到同一路徑下。要注意,broker 會向擁有最少數目分區的路徑新增分區,而不是向擁有最小磁盤空間的路徑新增分區。

  • num.recovery.threads.per.data.dir

對於以下 3 種狀況,Kafka 會使用可配置的線程池來處理日誌片斷

服務器正常啓動,用於打開每一個分區的日誌片斷;

服務器崩潰後啓動,用於檢查和截斷每一個分區的日誌片斷;

服務器正常關閉,用於關閉日誌片斷

默認狀況下,每一個日誌目錄只使用一個線程。由於這些線程只是在服務器啓動和關閉時會用到,因此徹底能夠設置大量的線程來達到井行操做的目的。特別是對於包含大量分區的服務器來講,一旦發生崩憤,在進行恢復時使用井行操做可能會省下數小時的時間。設置此參數時須要注意,所配置的數字對應的是 log.dirs 指定的單個日誌目錄。也就是說,若是 num.recovery.threads.per.data.dir 被設爲 8,而且 log.dir 指定了 3 個路徑,那麼總共須要 24 個線程。

  • auto.create.topics.enable

默認狀況下,Kafka 會在以下 3 種狀況下建立主題

當一個生產者開始往主題寫入消息時

當一個消費者開始從主題讀取消息時

當任意一個客戶向主題發送元數據請求時

  • delete.topic.enable

若是你想要刪除一個主題,你可使用主題管理工具。默認狀況下,是不容許刪除主題的,delete.topic.enable 的默認值是 false 所以你不能隨意刪除主題。這是對生產環境的合理性保護,可是在開發環境和測試環境,是能夠容許你刪除主題的,因此,若是你想要刪除主題,須要把 delete.topic.enable 設爲 true。

主題默認配置

Kafka 爲新建立的主題提供了不少默認配置參數,下面就來一塊兒認識一下這些參數

  • num.partitions

num.partitions 參數指定了新建立的主題須要包含多少個分區。若是啓用了主題自動建立功能(該功能是默認啓用的),主題分區的個數就是該參數指定的值。該參數的默認值是 1。要注意,咱們能夠增長主題分區的個數,但不能減小分區的個數。

  • default.replication.factor

這個參數比較簡單,它表示 kafka保存消息的副本數,若是一個副本失效了,另外一個還能夠繼續提供服務default.replication.factor 的默認值爲1,這個參數在你啓用了主題自動建立功能後有效。

  • log.retention.ms

Kafka 一般根據時間來決定數據能夠保留多久。默認使用 log.retention.hours 參數來配置時間,默認是 168 個小時,也就是一週。除此以外,還有兩個參數 log.retention.minutes 和 log.retentiion.ms 。這三個參數做用是同樣的,都是決定消息多久之後被刪除,推薦使用 log.retention.ms。

  • log.retention.bytes

另外一種保留消息的方式是判斷消息是否過時。它的值經過參數 log.retention.bytes 來指定,做用在每個分區上。也就是說,若是有一個包含 8 個分區的主題,而且 log.retention.bytes 被設置爲 1GB,那麼這個主題最多能夠保留 8GB 數據。因此,當主題的分區個數增長時,整個主題能夠保留的數據也隨之增長。

  • log.segment.bytes

上述的日誌都是做用在日誌片斷上,而不是做用在單個消息上。當消息到達 broker 時,它們被追加到分區的當前日誌片斷上,當日志片斷大小到達 log.segment.bytes 指定上限(默認爲 1GB)時,當前日誌片斷就會被關閉,一個新的日誌片斷被打開。若是一個日誌片斷被關閉,就開始等待過時。這個參數的值越小,就越會頻繁的關閉和分配新文件,從而下降磁盤寫入的總體效率。

  • log.segment.ms

上面提到日誌片斷經關閉後需等待過時,那麼 log.segment.ms 這個參數就是指定日誌多長時間被關閉的參數和,log.segment.ms 和 log.retention.bytes 也不存在互斥問題。日誌片斷會在大小或時間到達上限時被關閉,就看哪一個條件先獲得知足。

  • message.max.bytes

broker 經過設置 message.max.bytes 參數來限制單個消息的大小,默認是 1000 000, 也就是 1MB,若是生產者嘗試發送的消息超過這個大小,不只消息不會被接收,還會收到 broker 返回的錯誤消息。跟其餘與字節相關的配置參數同樣,該參數指的是壓縮後的消息大小,也就是說,只要壓縮後的消息小於 mesage.max.bytes,那麼消息的實際大小能夠大於這個值

寫在最後

以上就是個人面試過程,爲了此次面試,也收集了不少的面試題,反正我已經面過了,那就免費分享出來吧!

須要的朋友:關注一下,而後點擊這裏便可免費領取

如下是部分面試題截圖

Java程序員秋招三面螞蟻金服,我總結了全部面試題,也不過如此真的,搞定kafka看這一篇就夠了,linux系統入門視頻

相關文章
相關標籤/搜索