環境描述apache
服務器名稱 | 系統 | 配置 |
Srv01.contoso.com | CentOS 七、Kafka_2.11-1.1.0 | IP:172.16.48.163 |
Srv02.contoso.com | CentOS 七、Kafka_2.11-1.1.0 | IP:172.16.48.149 |
Srv03.contoso.com | CentOS 七、Kafka_2.11-1.1.0 | IP:172.16.48.154 |
Zookeeper集羣安裝不作贅述,請看Zookeeper詳解(二):Zookeeper安裝和運行
bootstrap
安裝kafka集羣緩存
下載Kafka安裝包:http://kafka.apache.org/downloads bash
下載以後解壓便可。(因爲Kafka是用Scala語言開發,運行在JVM上,因此要先安裝JDK)服務器
編輯Kafka服務的配置文件server.properties。這裏還有一個zookeeper.properties配置文件,該文件是若是你使用kafka自帶zookeeper那麼你就須要配置它。若是你有另外的zookeeper集羣那就忽略就行了。數據結構
配置要鏈接的zookeeper,我這裏就使用一臺。另外這個 /kafka的意思是給它指定一個節點。這個節點你須要提早創建。讓kafka把他須要的數據結構都創建在這個節點下,不然它會使用 「/」節點。
socket
最少運行起來至少要配置四項,也就是上面的內容。ide
listeners # kafka服務器監聽IP和端口,這裏也能夠寫域名,只要能解析就行 broker.id # 三臺服務器的ID不能相同,第一臺是0,第二臺是1,第三臺是2 log.dirs # 日誌路徑 zookeeper.connect # Zookeeper鏈接參數 # 另外我還配置了 message.max.bytes =5000000 default.replication.factor =2
配置好後把配置文件複製到其餘2臺服務器上,並修改broker.id和listeners字段。下面啓動Kafka,進入bin目錄運行下面的命令,有兩種方式性能
# 第一種方式(推薦) kafka-server-start.sh -daemon ../config/server.properties # 第二種方式 nohup kafka-server-start.sh config/server.properties& # 中止 kafka-server-stop.sh
查看進程和端口測試
按照一樣的方法啓動剩餘2臺。查看日誌
日誌名稱 | 說明 |
server.log | 是kafka系統日誌,很經常使用 |
state-change.log | leader切換日誌,就是broker宕機副本切換 |
controller.log | kafka集羣中有一臺機器是控制器,那麼控制器角色的日誌就記錄在這裏 |
咱們看看zookeeper裏面的三個節點註冊信息
測試生產者與消費者之間的消息發送,這是利用它自帶的腳本程序來建立生產者和消費者
kafka-console-producer.sh --broker-list 172.16.48.163:9092 --topic test1
kafka-console-consumer.sh --bootstrap-server 172.16.48.163:9092 --topic test1 --from-beginning
而後在生產者的>提示符後面輸入內容,那麼消費者就會收到
Kafka命令使用
說明:全部命令都要在bin目錄下執行,除非你配置了環境變量。 --zookeeper 172.16.48.163:2181/kafka 之因此後面加一個/kafka這個是由於我在kafka配置文件中指明使用這個名稱空間。
# 建立主題 ./kafka-topics.sh --create --zookeeper 172.16.48.163:2181/kafka --replication-factor 2 --partitions 1 --topic test9999
--replication-factor | 該主題每一個分區的副本數,不能大於broker數量。這個副本用於備份,假設每一個分區有2個副本,那麼只有一個是leader負責讀寫,follower只是負責同步內容,對外提供讀寫的也是由leader提供。Leader宕機follower接管成爲新的leader。這裏的數量是包括Leader+Follwer的 |
--partitions | 分區數量,控制將主題切分紅多少個LOG。消費者數量應該和分區數量相等,若是超過則毫無心義。 |
--topic |
主題名稱 |
# 刪除主題 kafka-topics.sh --delete --zookeeper 172.16.48.163:2181/kafka --topic test9999
# 查看全部主題 ./kafka-topics.sh --list --zookeeper 172.16.48.163:2181/kafka
# 查看指定topic信息,從下圖能夠看出當時創建這個topic的時候咱們設置3個分區,每一個分區的副本是3. ./kafka-topics.sh --describe --zookeeper 172.16.48.163:2181/kafka --topic BBB
PartiticonCount | 顯示分區數量一共有多少 |
ReplicationFactor | 副本因子是多少 |
Partition | 分區編號 |
Leader | 顯示Leader副本在哪一個Broker上,這裏是不一樣分區會有不一樣,表示Leader在broker.id=0的服務器上。三個分區每一個分區有三個副本,分區編號從0開始,因此這個Leader是說後面Replicas副本里面哪一個是Leader。Leader副本提供讀寫,非Leader副本只作數據備份。 從下圖能夠看出分區0和1對外提供讀寫的副本都在broker 2上。固然這不是一個好現象,意味着這個服務器將處理一個主題的2個分區讀寫,咱們要平均分開。 |
Replicas | 顯示該partitions全部副本存儲在哪些節點上 broker.id 這個是配置文件中設置的,它包括leader和follower節點 |
Isr | 顯示副本都已經同步的節點集合,這個集合的全部節點都是存活的,而且跟LEADER節點同步 |
# 平衡讀寫 kafka-preferred-replica-election.sh --zookeeper 172.16.48.163:2181/kafka
再次查看BBB主題發現Leader均衡了,3個分區的Leader副本由3個broker各自承擔一個。不像以前0和1分區的Leader副本都在broker 2上
# 線上集羣全部主題狀況 ./kafka-topics.sh --describe --zookeeper 172.16.48.163:2181/kafka
kafka服務器配置文件說明
# ----------------------系統相關---------------------- # broker的全局惟一編號,不能重複,和zookeeper的myid是一個意思 broker.id=0 # broker監聽IP和端口也能夠是域名 listeners=PLAINTEXT://172.16.48.163:9092 # 用於接收請求的線程數量 num.network.threads=3 # 用於處理請求的線程數量,包括磁盤IO請求,這個數量和log.dirs配置的目錄數量有關,這裏的數量不能小於log.dirs的數量, # 雖然log.dirs是配置日誌存放路徑,可是它能夠配置多個目錄後面用逗號分隔 num.io.threads=8 # 發送緩衝區大小,也就是說發送消息先發送到緩衝區,當緩衝區滿了以後一塊兒發送出去 socket.send.buffer.bytes=102400 # 接收緩衝區大小,同理接收到緩衝區,當到達這個數量時就同步到磁盤 socket.receive.buffer.bytes=102400 # 向kafka套接字請求最大字節數量,防止服務器OOM,也就是OutOfMemery,這個數量不要超過JAVA的堆棧大小, socket.request.max.bytes=104857600 # 日誌路徑也就是分區日誌存放的地方,你所創建的topic的分區就在這裏面,可是它能夠配置多個目錄後面用逗號分隔 log.dirs=/tmp/kafka-logs # 消息體(也就是往Kafka發送的單條消息)最大大小,單位是字節,必須小於socket.request.max.bytes值 message.max.bytes =5000000 # 自動平衡因爲某個broker故障會致使Leader副本遷移到別的broker,當以前的broker恢復後也不會遷移回來,有時候咱們須要 # 手動進行平衡避免同一個主題不一樣分區的Leader副本在同一臺broker上,下面這個參數就是開啓自動平衡功能 auto.leader.rebalance.enable=true # 設置了上面的自動平衡,當故障轉移後,隔300秒(默認)觸發一個定時任務進行平衡操做,而只有代理的不均衡率爲10%以上纔會執行 leader.imbalance.check.interval.seconds=300 # 設置代理的不均衡率,默認是10% leader.imbalance.per.broker.percentage=10 # ---------------分區相關------------------------- # 默認分區數量,當創建Topic時不指定分區數量,默認就1 num.partitions=1 # 是否容許自動建立topic ,如果false,就須要經過命令建立topic auto.create.topics.enable =true # 一個topic ,默認分區的replication個數 ,不得大於集羣中broker的個數 default.replication.factor =2 # ---------------日誌相關------------------------- # segment文件默認會被保留7天的時間,超時的話就會被清理,那麼清理這件事情就須要有一些線程來作。 # 這裏就是用來設置恢復和清理data下數據的線程數量 num.recovery.threads.per.data.dir=1 # 日誌文件中每一個segment的大小,默認爲1G。topic的分區是以一堆segment文件存儲的,這個控制每一個segment的大小,當超過這個大小會創建一個新日誌文件 # 這個參數會被topic建立時的指定參數覆蓋,若是你建立Topic的時候指定了這個參數,那麼你以你指定的爲準。 log.segment.bytes=1073741824 # 數據存儲的最大時間 超過這個時間 會根據log.cleanup.policy設置的策略處理數據,也就是消費端可以多久去消費數據 # log.retention.bytes和log.retention.minutes|hours任意一個達到要求,都會執行刪除 # 若是你建立Topic的時候指定了這個參數,那麼你以你指定的爲準 log.retention.hours|minutes=168 # 這個參數會在日誌segment沒有達到log.segment.bytes設置的大小默認1G的時候,也會強制新建一個segment會被 # topic建立時的指定參數覆蓋 log.roll.hours=168 # 上面的參數設置了每個segment文件的大小是1G,那麼就須要有一個東西去按期檢查segment文件有沒有達到1G,多長時間去檢查一次, # 就須要設置一個週期性檢查文件大小的時間(單位是毫秒)。 log.retention.check.interval.ms=300000 # 日誌清理策略 選擇有:delete和compact 主要針對過時數據的處理,或是日誌文件達到限制的額度, # 若是你建立Topic的時候指定了這個參數,那麼你以你指定的爲準 log.cleanup.policy = delete # 是否啓用日誌清理功能,默認是啓用的且清理策略爲compact,也就是壓縮。 log.cleaner.enable=false # 日誌清理時所使用的緩存空間大小 log.cleaner.dedupe.buffer.size=134217728 # log文件"sync"到磁盤以前累積的消息條數,由於磁盤IO操做是一個慢操做,但又是一個"數據可靠性"的必要手段 # 因此此參數的設置,須要在"數據可靠性"與"性能"之間作必要的權衡. # 若是此值過大,將會致使每次"fsync"的時間較長(IO阻塞) # 若是此值太小,將會致使"fsync"的次數較多,這也意味着總體的client請求有必定的延遲. # 物理server故障,將會致使沒有fsync的消息丟失. log.flush.interval.messages=9223372036854775807 # 檢查是否須要固化到硬盤的時間間隔 log.flush.scheduler.interval.ms =3000 # 僅僅經過interval來控制消息的磁盤寫入時機,是不足的. # 此參數用於控制"fsync"的時間間隔,若是消息量始終沒有達到閥值,可是離上一次磁盤同步的時間間隔 # 達到閥值,也將觸發. log.flush.interval.ms = None # --------------------------複製(Leader、replicas) 相關------------------- # partition leader與replicas之間通信時,socket的超時時間 controller.socket.timeout.ms =30000 # replicas響應partition leader的最長等待時間,如果超過這個時間,就將replicas列入ISR(in-sync replicas), # 並認爲它是死的,不會再加入管理中 replica.lag.time.max.ms =10000 # follower與leader之間的socket超時時間 replica.socket.timeout.ms=300000 # leader複製時候的socket緩存大小 replica.socket.receive.buffer.bytes=65536 # replicas每次獲取數據的最大大小 replica.fetch.max.bytes =1048576 # replicas同leader之間通訊的最大等待時間,失敗了會重試 replica.fetch.wait.max.ms =500 # fetch的最小數據尺寸,若是leader中還沒有同步的數據不足此值,將會阻塞,直到知足條件 replica.fetch.min.bytes =1 # leader 進行復制的線程數,增大這個數值會增長follower的IO num.replica.fetchers=1 # 最小副本數量 min.insync.replicas = 2