Kafka(二)Kafka集羣搭建


環境描述apache

服務器名稱 系統 配置
Srv01.contoso.com CentOS 七、Kafka_2.11-1.1.0 IP:172.16.48.163
Srv02.contoso.com CentOS 七、Kafka_2.11-1.1.0 IP:172.16.48.149
Srv03.contoso.com CentOS 七、Kafka_2.11-1.1.0 IP:172.16.48.154

Zookeeper集羣安裝不作贅述,請看Zookeeper詳解(二):Zookeeper安裝和運行
bootstrap


安裝kafka集羣緩存

下載Kafka安裝包:http://kafka.apache.org/downloads bash

Snip20180701_35.png

下載以後解壓便可。(因爲Kafka是用Scala語言開發,運行在JVM上,因此要先安裝JDK)服務器

Snip20180701_36.png

編輯Kafka服務的配置文件server.properties。這裏還有一個zookeeper.properties配置文件,該文件是若是你使用kafka自帶zookeeper那麼你就須要配置它。若是你有另外的zookeeper集羣那就忽略就行了。數據結構

Snip20180701_38.png

Snip20180701_39.png

Snip20180701_42.png

配置要鏈接的zookeeper,我這裏就使用一臺。另外這個 /kafka的意思是給它指定一個節點。這個節點你須要提早創建。讓kafka把他須要的數據結構都創建在這個節點下,不然它會使用 「/」節點。
socket

Snip20180701_43.png

最少運行起來至少要配置四項,也就是上面的內容。ide

listeners   # kafka服務器監聽IP和端口,這裏也能夠寫域名,只要能解析就行
broker.id   # 三臺服務器的ID不能相同,第一臺是0,第二臺是1,第三臺是2
log.dirs    # 日誌路徑
zookeeper.connect  # Zookeeper鏈接參數

# 另外我還配置了
message.max.bytes =5000000
default.replication.factor =2

配置好後把配置文件複製到其餘2臺服務器上,並修改broker.id和listeners字段。下面啓動Kafka,進入bin目錄運行下面的命令,有兩種方式性能

# 第一種方式(推薦)
kafka-server-start.sh -daemon ../config/server.properties
# 第二種方式
nohup kafka-server-start.sh config/server.properties&

# 中止
kafka-server-stop.sh

Snip20180701_44.png

查看進程和端口測試

Snip20180701_46.png

按照一樣的方法啓動剩餘2臺。查看日誌

日誌名稱 說明
server.log 是kafka系統日誌,很經常使用
state-change.log leader切換日誌,就是broker宕機副本切換
controller.log kafka集羣中有一臺機器是控制器,那麼控制器角色的日誌就記錄在這裏

Snip20180701_47.png

咱們看看zookeeper裏面的三個節點註冊信息

Snip20180701_49.png

測試生產者與消費者之間的消息發送,這是利用它自帶的腳本程序來建立生產者和消費者

kafka-console-producer.sh --broker-list 172.16.48.163:9092 --topic test1

Snip20180701_50.png

kafka-console-consumer.sh --bootstrap-server 172.16.48.163:9092 --topic test1 --from-beginning

Snip20180701_51.png

而後在生產者的>提示符後面輸入內容,那麼消費者就會收到

Snip20180701_52.png


Kafka命令使用

說明:全部命令都要在bin目錄下執行,除非你配置了環境變量。  --zookeeper 172.16.48.163:2181/kafka 之因此後面加一個/kafka這個是由於我在kafka配置文件中指明使用這個名稱空間。

# 建立主題
./kafka-topics.sh --create --zookeeper 172.16.48.163:2181/kafka --replication-factor 2 --partitions 1 --topic test9999
--replication-factor 該主題每一個分區的副本數,不能大於broker數量。這個副本用於備份,假設每一個分區有2個副本,那麼只有一個是leader負責讀寫,follower只是負責同步內容,對外提供讀寫的也是由leader提供。Leader宕機follower接管成爲新的leader。這裏的數量是包括Leader+Follwer
--partitions 分區數量,控制將主題切分紅多少個LOG。消費者數量應該和分區數量相等,若是超過則毫無心義。
--topic
主題名稱

Snip20180701_54.png

# 刪除主題
kafka-topics.sh --delete --zookeeper 172.16.48.163:2181/kafka --topic test9999

Snip20180701_56.png

# 查看全部主題
./kafka-topics.sh --list --zookeeper 172.16.48.163:2181/kafka

Snip20180701_55.png

# 查看指定topic信息,從下圖能夠看出當時創建這個topic的時候咱們設置3個分區,每一個分區的副本是3.
./kafka-topics.sh --describe --zookeeper 172.16.48.163:2181/kafka --topic BBB
PartiticonCount 顯示分區數量一共有多少
ReplicationFactor 副本因子是多少
Partition 分區編號
Leader

顯示Leader副本在哪一個Broker上,這裏是不一樣分區會有不一樣,表示Leader在broker.id=0的服務器上。三個分區每一個分區有三個副本,分區編號從0開始,因此這個Leader是說後面Replicas副本里面哪一個是Leader。Leader副本提供讀寫,非Leader副本只作數據備份

從下圖能夠看出分區0和1對外提供讀寫的副本都在broker 2上。固然這不是一個好現象,意味着這個服務器將處理一個主題的2個分區讀寫,咱們要平均分開。

Replicas 顯示該partitions全部副本存儲在哪些節點上 broker.id 這個是配置文件中設置的,它包括leaderfollower節點
Isr 顯示副本都已經同步的節點集合,這個集合的全部節點都是存活的,而且跟LEADER節點同步

Snip20180701_58.png

# 平衡讀寫
kafka-preferred-replica-election.sh --zookeeper 172.16.48.163:2181/kafka

Snip20180701_59.png

再次查看BBB主題發現Leader均衡了,3個分區的Leader副本由3個broker各自承擔一個。不像以前0和1分區的Leader副本都在broker 2上

Snip20180701_60.png

# 線上集羣全部主題狀況
./kafka-topics.sh --describe --zookeeper 172.16.48.163:2181/kafka

Snip20180701_57.png



kafka服務器配置文件說明

# ----------------------系統相關----------------------
# broker的全局惟一編號,不能重複,和zookeeper的myid是一個意思
broker.id=0

# broker監聽IP和端口也能夠是域名
listeners=PLAINTEXT://172.16.48.163:9092

# 用於接收請求的線程數量
num.network.threads=3

# 用於處理請求的線程數量,包括磁盤IO請求,這個數量和log.dirs配置的目錄數量有關,這裏的數量不能小於log.dirs的數量,
# 雖然log.dirs是配置日誌存放路徑,可是它能夠配置多個目錄後面用逗號分隔
num.io.threads=8

# 發送緩衝區大小,也就是說發送消息先發送到緩衝區,當緩衝區滿了以後一塊兒發送出去
socket.send.buffer.bytes=102400

# 接收緩衝區大小,同理接收到緩衝區,當到達這個數量時就同步到磁盤
socket.receive.buffer.bytes=102400

# 向kafka套接字請求最大字節數量,防止服務器OOM,也就是OutOfMemery,這個數量不要超過JAVA的堆棧大小,
socket.request.max.bytes=104857600

# 日誌路徑也就是分區日誌存放的地方,你所創建的topic的分區就在這裏面,可是它能夠配置多個目錄後面用逗號分隔
log.dirs=/tmp/kafka-logs

# 消息體(也就是往Kafka發送的單條消息)最大大小,單位是字節,必須小於socket.request.max.bytes值
message.max.bytes =5000000

# 自動平衡因爲某個broker故障會致使Leader副本遷移到別的broker,當以前的broker恢復後也不會遷移回來,有時候咱們須要
# 手動進行平衡避免同一個主題不一樣分區的Leader副本在同一臺broker上,下面這個參數就是開啓自動平衡功能
auto.leader.rebalance.enable=true

# 設置了上面的自動平衡,當故障轉移後,隔300秒(默認)觸發一個定時任務進行平衡操做,而只有代理的不均衡率爲10%以上纔會執行
leader.imbalance.check.interval.seconds=300

# 設置代理的不均衡率,默認是10%
leader.imbalance.per.broker.percentage=10

# ---------------分區相關-------------------------

# 默認分區數量,當創建Topic時不指定分區數量,默認就1
num.partitions=1

# 是否容許自動建立topic ,如果false,就須要經過命令建立topic
auto.create.topics.enable =true
 
# 一個topic ,默認分區的replication個數 ,不得大於集羣中broker的個數
default.replication.factor =2

# ---------------日誌相關-------------------------

# segment文件默認會被保留7天的時間,超時的話就會被清理,那麼清理這件事情就須要有一些線程來作。
# 這裏就是用來設置恢復和清理data下數據的線程數量
num.recovery.threads.per.data.dir=1

# 日誌文件中每一個segment的大小,默認爲1G。topic的分區是以一堆segment文件存儲的,這個控制每一個segment的大小,當超過這個大小會創建一個新日誌文件
# 這個參數會被topic建立時的指定參數覆蓋,若是你建立Topic的時候指定了這個參數,那麼你以你指定的爲準。
log.segment.bytes=1073741824

# 數據存儲的最大時間 超過這個時間 會根據log.cleanup.policy設置的策略處理數據,也就是消費端可以多久去消費數據
# log.retention.bytes和log.retention.minutes|hours任意一個達到要求,都會執行刪除
# 若是你建立Topic的時候指定了這個參數,那麼你以你指定的爲準
log.retention.hours|minutes=168

# 這個參數會在日誌segment沒有達到log.segment.bytes設置的大小默認1G的時候,也會強制新建一個segment會被
# topic建立時的指定參數覆蓋
log.roll.hours=168 

# 上面的參數設置了每個segment文件的大小是1G,那麼就須要有一個東西去按期檢查segment文件有沒有達到1G,多長時間去檢查一次,
# 就須要設置一個週期性檢查文件大小的時間(單位是毫秒)。
log.retention.check.interval.ms=300000

# 日誌清理策略 選擇有:delete和compact 主要針對過時數據的處理,或是日誌文件達到限制的額度,
# 若是你建立Topic的時候指定了這個參數,那麼你以你指定的爲準
log.cleanup.policy = delete

# 是否啓用日誌清理功能,默認是啓用的且清理策略爲compact,也就是壓縮。
log.cleaner.enable=false

# 日誌清理時所使用的緩存空間大小
log.cleaner.dedupe.buffer.size=134217728

# log文件"sync"到磁盤以前累積的消息條數,由於磁盤IO操做是一個慢操做,但又是一個"數據可靠性"的必要手段
# 因此此參數的設置,須要在"數據可靠性"與"性能"之間作必要的權衡.
# 若是此值過大,將會致使每次"fsync"的時間較長(IO阻塞)
# 若是此值太小,將會致使"fsync"的次數較多,這也意味着總體的client請求有必定的延遲.
# 物理server故障,將會致使沒有fsync的消息丟失.
log.flush.interval.messages=9223372036854775807

# 檢查是否須要固化到硬盤的時間間隔
log.flush.scheduler.interval.ms =3000
 
# 僅僅經過interval來控制消息的磁盤寫入時機,是不足的.
# 此參數用於控制"fsync"的時間間隔,若是消息量始終沒有達到閥值,可是離上一次磁盤同步的時間間隔
# 達到閥值,也將觸發.
log.flush.interval.ms = None

# --------------------------複製(Leader、replicas) 相關-------------------
# partition leader與replicas之間通信時,socket的超時時間
controller.socket.timeout.ms =30000

# replicas響應partition leader的最長等待時間,如果超過這個時間,就將replicas列入ISR(in-sync replicas),
# 並認爲它是死的,不會再加入管理中
replica.lag.time.max.ms =10000

# follower與leader之間的socket超時時間
replica.socket.timeout.ms=300000

# leader複製時候的socket緩存大小
replica.socket.receive.buffer.bytes=65536

# replicas每次獲取數據的最大大小
replica.fetch.max.bytes =1048576

# replicas同leader之間通訊的最大等待時間,失敗了會重試
replica.fetch.wait.max.ms =500

# fetch的最小數據尺寸,若是leader中還沒有同步的數據不足此值,將會阻塞,直到知足條件
replica.fetch.min.bytes =1

# leader 進行復制的線程數,增大這個數值會增長follower的IO
num.replica.fetchers=1

# 最小副本數量
min.insync.replicas = 2
相關文章
相關標籤/搜索