Linux下搭建Kafka集羣

摘要

Kafka 是一個分佈式的基於push-subscribe的消息系統,它具有快速、可擴展、可持久化的特色。由 LinkedIn 開源,用做 LinkedIn 的活動流(Activity Stream)和運營數據處理管道(Pipeline)的基礎。如今是Apache旗下的一個開源系統,做爲Hadoop生態系統的一部分,被各類商業公司普遍應用。它的最大的特性就是能夠實時的處理大量數據以知足各類需求場景:好比基於Hadoop的批處理系統、低延遲的實時系統、Flink/Storm/Spark流式處理引擎。能夠說是現代分佈式系統的基石,學習kafka的使用、原理變得十分必要。html

本文基於Ubuntu 16.04 LTS,介紹如何搭建1主2備的kafka集羣。java

環境依賴

  • 服務器數量 >= 2
  • Java 8+
  • 搭建好ZooKeeper集羣
  • 防火牆開放9092端口:
ufw allow 9092
ufw reload

1、下載與解壓kafka

一、下載

$ wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.7.0/kafka_2.13-2.7.0.tgz

二、解壓到指定位置

$ tar -xzvf kafka_2.13-2.7.0.tgz -C /usr/local
$ cd /usr/local/kafka_2.13-2.7.0

2、配置文件

一、建立kafka消息存放目錄

# 建立kafka消息存放目錄,配置文件server.properties中的log.dir須要用到
$ mkdir /usr/local/kafka_2.13-2.7.0/kafka_logs

二、配置server.properties

$ vim /usr/local/kafka_2.13-2.7.0/config/server.properties

# 修改以下內容,下面以服務器1:個人主機名是:ubuntu-master爲例

#每一個broker在集羣中的惟一標識,不能重複,和zookeeper的myid性質同樣
broker.id=0
#端口
port=9092
#broker主機地址
host.name=ubuntu-master.com

#消息存放的目錄,這個目錄能夠配置爲「,」逗號分割的表達式,上面的num.io.threads要大於這個目錄的個數
#若是配置多個目錄,新建立的topic把消息持久化的地方是:當前以逗號分割的目錄中,哪一個分區數最少就放哪個
log.dirs=/usr/local/kafka_2.13-2.7.0/kafka_logs

#在log.retention.hours=168 下面新增下面三項:
#消息保存的最大值5M
message.max.byte=5242880
#kafka保存消息的副本數,若是一個副本失效了,另外一個還能夠繼續提供服務
default.replication.factor=2
#取消息的最大直接數
replica.fetch.max.bytes=5242880

#zookeeper集羣地址
zookeeper.connect=ubuntu-master.com:2181,ubuntu-slave1.com:2181,ubuntu-slave2.com:2181

三、[選看]server.properties相關參數解析

broker.id=0  #當前機器在集羣中的惟一標識,和zookeeper的myid性質同樣
port=9092 #當前kafka對外提供服務的端口默認是9092
host.name=192.168.1.113 #這個參數默認是關閉的,在0.8.1有個bug,DNS解析問題,失敗率的問題。
num.network.threads=3 #這個是borker進行網絡處理的線程數
num.io.threads=8 #這個是borker進行I/O處理的線程數
log.dirs=/opt/kafka/kafkalogs/ #消息存放的目錄,這個目錄能夠配置爲「,」逗號分割的表達式,上面的num.io.threads要大於這個目錄的個數。若是配置多個目錄,新建立的topic他把消息持久化的地方是:當前以逗號分割的目錄中,那個分區數最少就放那一個
socket.send.buffer.bytes=102400 #發送緩衝區buffer大小,數據不是一會兒就發送的,先回存儲到緩衝區了到達必定的大小後在發送,能提升性能
socket.receive.buffer.bytes=102400 #kafka接收緩衝區大小,當數據到達必定大小後在序列化到磁盤
socket.request.max.bytes=104857600 #這個參數是向kafka請求消息或者向kafka發送消息的請請求的最大數,這個值不能超過java的堆棧大小
num.partitions=1 #默認的分區數,一個topic默認1個分區數
log.retention.hours=168 #默認消息的最大持久化時間,168小時,7天
message.max.byte=5242880  #消息保存的最大值5M
default.replication.factor=2  #kafka保存消息的副本數,若是一個副本失效了,另外一個還能夠繼續提供服務
replica.fetch.max.bytes=5242880  #取消息的最大直接數
log.segment.bytes=1073741824 #這個參數是:由於kafka的消息是以追加的形式落地到文件,當超過這個值的時候,kafka會新起一個文件
log.retention.check.interval.ms=300000 #每隔300000毫秒去檢查上面配置的log失效時間(log.retention.hours=168 ),到目錄查看是否有過時的消息若是有,刪除
log.cleaner.enable=false #是否啓用log壓縮,通常不用啓用,啓用的話能夠提升性能
zookeeper.connect=192.168.1.113:2181,192.168.1.114:2181,192.168.1.115:218 #設置zookeeper的鏈接端口

3、將Kafka分發到各個節點

$ cd /usr/local/
$ scp -r kafka_2.13-2.7.0 root@192.168.1.114:/usr/local/
$ scp -r kafka_2.13-2.7.0 root@192.168.1.115:/usr/local/

4、修改其餘節點配置文件

一、修改服務器2的server.properties

$ vim /usr/local/kafka_2.13-2.7.0/config/server.properties

# 需修改內容以下,個人主機名是:ubuntu-slave1

#每一個broker在集羣中的惟一標識,不能重複,和zookeeper的myid性質同樣
broker.id=1
#端口
port=9092
#broker主機地址
host.name=ubuntu-slave1.com

二、修改服務器3的server.properties

$ vim /usr/local/kafka_2.13-2.7.0/config/server.properties

# 需修改內容以下,個人主機名是:ubuntu-slave2

#每一個broker在集羣中的惟一標識,不能重複,和zookeeper的myid性質同樣
broker.id=2
#端口
port=9092
#broker主機地址
host.name=ubuntu-slave2.com

5、配置環境變量

須要在各個服務器上進行配置:shell

vim /etc/profile

#添加以下內容
export KAFKA_HOME=/usr/local/kafka_2.13-2.7.0
export PATH=$KAFKA_HOME/bin:$PATH

#激活環境變量
source /etc/profile

6、啓動Kafka集羣並測試

一、啓動服務

#從後臺啓動Kafka集羣(3臺都須要啓動)
$ cd /usr/local/kafka_2.13-2.7.0/bin #進入到kafka的bin目錄 
$ ./kafka-server-start.sh -daemon ../config/server.properties

中止服務:apache

$ cd /usr/local/kafka_2.13-2.7.0/bin #進入到kafka的bin目錄 
$ ./kafka-server-stop.sh

二、檢查服務是否啓動

#執行命令jps
3749 Jps
1767 QuorumPeerMain
3673 Kafka

三、建立Topic來驗證是否建立成功

更多請看官方文檔:http://kafka.apache.org/documentation.htmlbootstrap

$ cd /usr/local/kafka_2.13-2.7.0/bin #進入到kafka的bin目錄 

#建立Topic
$ ./kafka-topics.sh --create --zookeeper 192.168.1.113:2181,192.168.1.114:2181,192.168.1.115:2181 --replication-factor 2 --partitions 1 --topic foo
#解釋
# --create  表示建立
# --zookeeper 192.168.1.113:2181  後面的參數是zk的集羣節點
# --replication-factor 2  表示複本數
# --partitions 1  表示分區數
# --topic foo  表示主題名稱爲foo

#查看topic 列表:
$ ./kafka-topics.sh --list --zookeeper 192.168.1.113:2181,192.168.1.114:2181,192.168.1.115:2181
    
#查看指定topic:
$ ./kafka-topics.sh --describe --zookeeper 192.168.1.113:2181,192.168.1.114:2181,192.168.1.115:2181 --topic foo

#刪除topic
$ ./kafka-topics.sh --delete --zookeeper 192.168.1.113:2181,192.168.1.114:2181,192.168.1.115:2181 --topic foo

四、建立生產者

'''在一臺服務器上建立一個生產者'''
#建立一個broker,生產者
$ ./kafka-console-producer.sh --broker-list 192.168.1.113:9092,192.168.1.114:9092,192.168.1.115:9092 --topic foo

五、建立消費者

'''在另外一臺服務器上建立一個消費者'''
$ ./kafka-console-consumer.sh --bootstrap-server 192.168.1.113:9092,192.168.1.114:9092,192.168.1.115:9092 --topic foo --from-beginning

注:Kafka 從 2.2 版本開始將kafka-topic.sh腳本中的 −−zookeeper參數標註爲 「過期」,推薦使用 −−bootstrap-server參數。ubuntu

端口也由以前的zookeeper通訊端口2181,改成了kafka通訊端口9092vim

參考

[1]Kafka【第一篇】Kafka集羣搭建[http://www.javashuo.com/article/p-azoogwkc-ee.html]服務器

[2]kafka集羣環境搭建[https://www.cnblogs.com/luzhanshi/p/13369834.html]微信

更多關於大數據、分佈式、存儲、區塊鏈、Linux相關文章請關注個人微信公衆號:asympTech漸進線實驗室網絡

相關文章
相關標籤/搜索