Kafka快速入門(六)——Kafka集羣部署

Kafka快速入門(六)——Kafka集羣部署

1、Kafka集羣部署方案規劃

一、操做系統選擇

一般,生產環境應該將Kafka集羣部署在Linux操做系統上,緣由以下:
(1)Kafka客戶端底層使用了Java的selector,selector在Linux上的實現機制是epoll,而在Windows平臺上的實現機制是select,所以Kafka部署在Linux上可以得到更高效的I/O性能。
(2)網絡傳輸效率的差異。Kafka須要在磁盤和網絡間進行大量數據傳輸,在Linux部署Kafka可以享受到零拷貝(Zero Copy)技術所帶來的快速數據傳輸特性。
(3)社區的支持度。Apache Kafka社區目前對Windows平臺上發現的Kafka Bug不作任何承諾。java

二、磁盤

(1)Kafka實現了冗餘機制來提供高可靠性,並經過分區機制在軟件層面實現負載均衡,所以Kafka的磁盤存儲能夠不使用磁盤陣列(RAID),使用普通磁盤組成存儲空間便可。
(2)使用機械磁盤可以勝任Kafka線上環境,但SSD顯然性能更好。git

三、磁盤容量

規劃磁盤容量時須要考慮:新增消息數、消息留存時間、平均消息大小、備份數、是否啓用壓縮等因素。
假設公司業務天天須要向Kafka集羣發送100000000條消息,每條消息保存兩份以防止數據丟失,消息默認保存7天時間,消息的平均大小是1KB,Kafka的數據壓縮比是0.75。
天天100000000條1KB大小的消息,保存兩份,壓縮比0.75,佔用空間大小就等於150GB(100000000*1KB*2/1000/1000*0.75),考慮到Kafka集羣的索引數據等,須要預留出10%的磁盤空間,所以天天總存儲容量是165GB。數據留存7天,所以規劃磁盤容量爲1155GB(165GB*7)。github

四、網絡帶寬

假設公司的機房環境是千兆網絡,即1Gbps,業務須要在1小時內處理1TB的業務數據。假設Kafka Broker會用到70%的帶寬資源,超過70%的閾值可能網絡丟包,單臺Kafka Broker最多能使用大約700Mb的帶寬資源,但一般須要再額外爲其它服務預留出2/3的資源,即Kafka Broker能夠爲Kafka服務分配帶寬240Mbps(700Mb/3)。1小時處理1TB數據,則每秒須要處理2336Mb(1024*1024*8/3600)數據,除以240,約等於10臺服務器。若是還須要額外複製兩份,那麼服務器臺數還要乘以3,即30臺。web

2、Kafka集羣參數配置

一、Broker端參數

Broker端參數也被稱爲靜態參數(Static Configs),靜態參數只能在Kafka的配置文件server.properties中進行設置,必須重啓Broker進程才能生效。
log.dirs:指定Broker須要使用的若干個文件目錄路徑,沒有默認值,必須指定。在生產環境中必定要爲log.dirs配置多個路徑,若是條件容許,須要保證目錄被掛載到不一樣的物理磁盤上。優點在於,提高讀寫性能,多塊物理磁盤同時讀寫數據具備更高的吞吐量;可以實現故障轉移(Failover),Kafka 1.1版本引入Failover功能,壞掉磁盤上的數據會自動地轉移到其它正常的磁盤上,並且Broker還能正常工做,基於Failover機制,Kafka能夠捨棄RAID方案。
zookeeper.connect:CS格式參數,能夠指定值爲zk1:2181,zk2:2181,zk3:2181,不一樣Kafka集羣能夠指定:zk1:2181,zk2:2181,zk3:2181/kafka1,chroot只須要寫一次。
listeners:設置內網訪問Kafka服務的監聽器。
advertised.listeners:設置外網訪問Kafka服務的監聽器。
auto.create.topics.enable:是否容許自動建立Topic。
unclean.leader.election.enable:是否容許Unclean Leader 選舉。
auto.leader.rebalance.enable:是否容許按期進行Leader選舉,生產環境中建議設置成false。
log.retention.{hours|minutes|ms}:控制一條消息數據被保存多長時間。優先級:ms設置最高、minutes次之、hours最低。
log.retention.bytes:指定Broker爲消息保存的總磁盤容量大小。message.max.bytes:控制Broker可以接收的最大消息大小。算法

二、Topic級別參數

若是同時設置了Topic級別參數和全局Broker參數,Topic級別參數會覆蓋全局Broker參數,而每一個Topic都能設置本身的參數值。
生產環境中,應當容許不一樣部門的Topic根據自身業務須要,設置本身的留存時間。若是隻能設置全局Broker參數,那麼勢必要提取全部業務留存時間的最大值做爲全局參數值,此時設置Topic級別參數對Broker參數進行覆蓋就是一個不錯的選擇。
retention.ms:指定Topic消息被保存的時長,默認是7天,只保存最近7天的消息,會覆蓋掉Broker端的全局參數值。
retention.bytes:指定爲Topic預留多大的磁盤空間。一般在多租戶的Kafka集羣中使用,默認值是 -1,表示能夠無限使用磁盤空間。
max.message.bytes:指定Kafka Broker可以正常接收Topic 的最大消息大小。
Topic級別參數能夠在建立Topic時進行設置,也能夠在修改Topic 時設置,推薦在修改Topic時進行設置,Apache Kafka社區將來可能統一使用kafka-configs腳原本設置Topic級別參數。docker

三、JVM參數

Kafka 2.0.0版本已經正式摒棄對Java 7的支持。
Kafka Broker在與客戶端進行交互時會在JVM堆上建立大量的Byte Buffer實例,所以JVM端設置的Heap Size不能過小,建議設置6GB。
export KAFKA_HEAP_OPTS=--Xms6g --Xmx6g
JVM端配置的一個重要參數是垃圾回收器的設置。對於Java 7,若是Broker所在機器的CPU資源很是充裕,建議使用CMS收集器。啓用方法是指定-XX:+UseCurrentMarkSweepGC。不然,使用吞吐量收集器,開啓方法是指定-XX:+UseParallelGC。對於Java 9,用默認的G1收集器,在沒有任何調優的狀況下,G1表現得要比CMS出色,主要體如今更少的Full GC,須要調整的參數更少等,因此使用G1就好。
export KAFKA_JVM_PERFORMANCE_OPTS= -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=truebootstrap

四、操做系統參數

件描述符限制:ulimit -n。建議設置成一個超大的值,如ulimit -n 1000000。
文件系統類型:文件系統類型的選擇。根據官網的測試報告,XFS 的性能要強於ext4。
Swappiness:推薦設置爲一個較小值,如1。若是將swap設置爲0,將會徹底禁止Kafka Broker進程使用swap空間;當物理內存耗盡時,操做系統會觸發OOM killer組件,隨機挑選一個進程kill掉,不給用戶任何預警。若是設置一個比較小值,當開始使用swap空間時,Broker性能會出現急劇降低,從而給進一步調優和診斷問題的時間。
提交時間:提交時間(Flush落盤時間)。向Kafka發送數據並非真要等數據被寫入磁盤纔會認爲成功,而是隻要數據被寫入到操做系統的頁緩存(Page Cache)上就認爲寫入成功,隨後操做系統根據LRU算法會按期將頁緩存上的髒數據落盤到物理磁盤上。頁緩存數據寫入磁盤的週期由提交時間來肯定,默認是5秒,能夠適當地增長提交間隔來下降物理磁盤的寫操做。若是在頁緩存中的數據在寫入到磁盤前機器宕機,數據會丟失,但鑑於Kafka在軟件層面已經提供了多副本的冗餘機制,拉大提交間隔換取性能是一個合理的作法。緩存

3、Docker鏡像選擇

一、安裝docker

安裝Docker:sudo yum install docker
啓動Docker:sudo systemctl start docker
docker版本檢查:docker version安全

二、docker-compose安裝

docker-compose下載:sudo curl -L https://github.com/docker/compose/releases/download/1.23.0-rc3/docker-compose-uname -s-uname -m-o /usr/local/bin/docker-compose
docker-compose安裝:sudo chmod +x /usr/local/bin/docker-compose
docker-compose版本檢查:docker-compose versionbash

三、docker鏡像選擇

zookeeper鏡像選擇:
docker search zookeeper
選擇star最多的鏡像:docker.io/zookeeper
Kafka鏡像選擇:
docker search kafka
選擇star最多的鏡像:docker.io/wurstmeister/kafka
kafka-manager鏡像選擇:
docker search kafka-manager
選擇鏡像:kafkamanager/kafka-manager

4、Kafka單機部署方案

一、編寫docker-compose.yml文件

# 單機 zookeeper + kafka + kafka-manager集羣
version: '2'

services:
  # 定義zookeeper服務
  zookeeper-test:
    image: zookeeper # zookeeper鏡像
    restart: always
    hostname: zookeeper-test
    ports:
      - "12181:2181" # 宿主機端口:docker內部端口
    container_name: zookeeper-test # 容器名稱

  # 定義kafka服務
  kafka-test:
    image: wurstmeister/kafka # kafka鏡像
    restart: always
    hostname: kafka-test
    ports:
      - "9092:9092" # 對外暴露端口號
      - "9999:9999" # 對外暴露JMX_PORT
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 192.168.0.105 #
      KAFKA_ADVERTISED_PORT: 9092 # 
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-test:2181 # zookeeper服務
      KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 30000 # zookeeper鏈接超時
      KAFKA_LOG_CLEANUP_POLICY: "delete"
      KAFKA_LOG_RETENTION_HOURS: 120 # 設置消息數據保存的最長時間爲120小時
      KAFKA_MESSAGE_MAX_BYTES: 10000000 # 消息體的最大字節數
      KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000 # 
      KAFKA_GROUP_MAX_SESSION_TIMEOUT_MS: 60000 # 
      KAFKA_NUM_PARTITIONS: 1 # 分區數量
      KAFKA_DELETE_RETENTION_MS: 10000 # 
      KAFKA_BROKER_ID: 1 # kafka的ID
      KAFKA_COMPRESSION_TYPE: lz4
      KAFKA_JMX_OPTS: "-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=192.168.0.105 -Dcom.sun.management.jmxremote.rmi.port=9999"  # 導入KAFKA_JMX_OPTS環境變量
      JMX_PORT: 9999  # 導入JMX_PORT環境變量
    depends_on:
      - zookeeper-test # 依賴
    container_name: kafka-test

  # 定義kafka-manager服務
  kafka-manager-test:
    image: kafkamanager/kafka-manager # kafka-manager鏡像
    restart: always
    container_name: kafka-manager-test
    hostname: kafka-manager-test
    ports:
      - "9000:9000"  # 對外暴露端口,提供web訪問
    depends_on:
      - kafka-test # 依賴
    environment:
      ZK_HOSTS: zookeeper-test:2181 # 宿主機IP
      KAFKA_BROKERS: kafka-test:9090 # kafka
      KAFKA_MANAGER_AUTH_ENABLED: "true"  # 開啓安全認證
      KAFKA_MANAGER_USERNAME: kafka-manager  # Kafka Manager登陸用戶
      KAFKA_MANAGER_PASSWORD: 123456  # Kafka Manager登陸密碼

須要確認相應端口是否被佔用。

二、啓動服務

建立kafka目錄,將docker-compose.yml文件放入kafka目錄,在kafka目錄執行命令。
啓動:
docker-compose up -d
關閉:
docker-compose down

三、kafka服務查看

進入docker容器:
docker exec -it kafka /bin/bash
建立Topic:
kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 3 --topic test
查看Topic:
kafka-topics.sh --list --zookeeper zookeeper:2181
生產消息:
kafka-console-producer.sh --broker-list kafka:9092 --topic test
消費消息:
kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic test --from-beginning
打開兩個Terminal,一個執行生產消息的命令,一個執行消費消息的命令,每生產一條消息時消費消息Terminal就會顯示一條消息,實現消息隊列。

四、Kafka版本查詢

wurstmeister/kafka鏡像中,kafka安裝在/opt目錄下,進入/opt目錄,kafka_2.12-2.4.0目錄即爲kafka安裝目錄。
Scala版本:2.12
Kafka版本:2.4

五、kafka-manager監控

Web方式訪問:http://127.0.0.1:9000

5、錯誤解決

一、容器刪除失敗

docker rm -f $(docker ps -a --filter status=dead -q |head -n 1)
報錯信息:
ERROR: for f78856fb92e9_zoo1 Driver overlay2 failed to remove root filesystem f78856fb92e97f75ff4c255077de544b39351a4a2a3319737ada2a54df568032: remove /var/lib/docker/overlay2/2c257b8071b6a3d79e216838522f76ba7263d466a470dc92cdbef25c4dd04dc3/merged: device or resource busy
grep docker /proc/*/mountinfo|grep containerid | awk -F ":" '{print $1}' | awk -F "/" '{print $3}'
sudo kill -9 3119

二、kafka服務一直重啓

報錯信息:
Error response from daemon: Container 9b3f9af8a1196f2ad3cf74fe2b1eeb7ccbd231fe2a93ec09f594d3a0fbb5783c is restarting, wait until the container is running
錯誤緣由:
docker-compose.yml文件對kafka服務配置restart: always,若是kafka服務啓動失敗會一直重啓,能夠經過docker logs kafka查看kafka服務啓動的日誌信息,查找錯誤緣由。

6、Kafka集羣參數配置

############################# System ######################
# 惟一標識在集羣中的ID,要求是正數。  
broker.id = 0
# 服務端口,默認9092  
port = 9092
# 監聽地址,不設爲全部地址  
host.name = debugo01

# 處理網絡請求的最大線程數  
num.network.threads = 2
# 處理磁盤I/O的線程數  
num.io.threads = 8
# 一些後臺線程數  
background.threads = 4
# 等待IO線程處理的請求隊列最大數  
queued.max.requests = 500

#  socket的發送緩衝區(SO_SNDBUF)  
socket.send.buffer.bytes = 1048576
# socket的接收緩衝區 (SO_RCVBUF)   
socket.receive.buffer.bytes = 1048576
# socket請求的最大字節數。爲了防止內存溢出,message.max.bytes必然要小於  
socket.request.max.bytes = 104857600

############################# Topic ########################
# 每一個topic的分區個數,更多的partition會產生更多的segment file  
num.partitions = 2
# 是否容許自動建立topic ,如果false,就須要經過命令建立topic  
auto.create.topics.enable = true
# 一個topic ,默認分區的replication個數 ,不能大於集羣中broker的個數。  
default.replication.factor = 1
# 消息體的最大大小,單位是字節  
message.max.bytes = 1000000

############################# ZooKeeper ####################
# Zookeeper quorum設置。若是有多個使用逗號分割  
zookeeper.connect = debugo01:2181, debugo02, debugo03
# 鏈接zk的超時時間  
zookeeper.connection.timeout.ms = 1000000
# ZooKeeper集羣中leader和follower之間的同步實際  
zookeeper.sync.time.ms = 2000

############################# Log #########################
# 日誌存放目錄,多個目錄使用逗號分割  
log.dirs = / var / log / kafka

# 當達到下面的消息數量時,會將數據flush到日誌文件中。默認10000  
# log.flush.interval.messages=10000  
# 當達到下面的時間(ms)時,執行一次強制的flush操做。interval.ms和interval.messages不管哪一個達到,都會flush。默認3000ms  
# log.flush.interval.ms=1000  
# 檢查是否須要將日誌flush的時間間隔  
log.flush.scheduler.interval.ms = 3000

# 日誌清理策略(delete|compact)  
log.cleanup.policy = delete
# 日誌保存時間 (hours|minutes),默認爲7天(168小時)。超過這個時間會根據policy處理數據。bytes和minutes不管哪一個先達到都會觸發。  
log.retention.hours = 168
# 日誌數據存儲的最大字節數。超過這個時間會根據policy處理數據。  
# log.retention.bytes=1073741824  

# 控制日誌segment文件的大小,超出該大小則追加到一個新的日誌segment文件中(-1表示沒有限制)  
log.segment.bytes = 536870912
# 當達到下面時間,會強制新建一個segment  
log.roll.hours = 24 * 7
# 日誌片斷文件的檢查週期,查看它們是否達到了刪除策略的設置(log.retention.hours或log.retention.bytes)  
log.retention.check.interval.ms = 60000

# 是否開啓壓縮  
log.cleaner.enable = false
# 對於壓縮的日誌保留的最長時間  
log.cleaner.delete.retention.ms = 1
day

# 對於segment日誌的索引文件大小限制  
log.index.size.max.bytes = 10 * 1024 * 1024
# y索引計算的一個緩衝區,通常不須要設置。  
log.index.interval.bytes = 4096

############################# replica #######################
# partition management controller 與replicas之間通信的超時時間  
controller.socket.timeout.ms = 30000
# controller-to-broker-channels消息隊列的尺寸大小  
controller.message.queue.size = 10
# replicas響應leader的最長等待時間,如果超過這個時間,就將replicas排除在管理以外  
replica.lag.time.max.ms = 10000
# 是否容許控制器關閉broker ,如果設置爲true,會關閉全部在這個broker上的leader,並轉移到其餘broker  
controlled.shutdown.enable = false
# 控制器關閉的嘗試次數  
controlled.shutdown.max.retries = 3
# 每次關閉嘗試的時間間隔  
controlled.shutdown.retry.backoff.ms = 5000

# 若是relicas落後太多,將會認爲此partition relicas已經失效。而通常狀況下,由於網絡延遲等緣由,總會致使replicas中消息同步滯後。若是消息嚴重滯後,leader將認爲此relicas網絡延遲較大或者消息吞吐能力有限。在broker數量較少,或者網絡不足的環境中,建議提升此值.  
replica.lag.max.messages = 4000
# leader與relicas的socket超時時間  
replica.socket.timeout.ms = 30 * 1000
# leader複製的socket緩存大小  
replica.socket.receive.buffer.bytes = 64 * 1024
# replicas每次獲取數據的最大字節數  
replica.fetch.max.bytes = 1024 * 1024
# replicas同leader之間通訊的最大等待時間,失敗了會重試  
replica.fetch.wait.max.ms = 500
# 每個fetch操做的最小數據尺寸,若是leader中還沒有同步的數據不足此值,將會等待直到數據達到這個大小  
replica.fetch.min.bytes = 1
# leader中進行復制的線程數,增大這個數值會增長relipca的IO  
num.replica.fetchers = 1
# 每一個replica將最高水位進行flush的時間間隔  
replica.high.watermark.checkpoint.interval.ms = 5000

# 是否自動平衡broker之間的分配策略  
auto.leader.rebalance.enable = false
# leader的不平衡比例,如果超過這個數值,會對分區進行從新的平衡  
leader.imbalance.per.broker.percentage = 10
# 檢查leader是否不平衡的時間間隔  
leader.imbalance.check.interval.seconds = 300
# 客戶端保留offset信息的最大空間大小  
offset.metadata.max.bytes = 1024

#############################Consumer #####################  
# Consumer端核心的配置是group.id、zookeeper.connect  
# 決定該Consumer歸屬的惟一組ID,By setting the same group id multiple processes indicate that they are all part of the same consumer group.  
group.id
# 消費者的ID,如果沒有設置的話,會自增  
consumer.id
# 一個用於跟蹤調查的ID ,最好同group.id相同  
client.id = < group_id >

# 對於zookeeper集羣的指定,必須和broker使用一樣的zk配置  
zookeeper.connect = debugo01:2182, debugo02: 2182, debugo03: 2182
# zookeeper的心跳超時時間,查過這個時間就認爲是無效的消費者  
zookeeper.session.timeout.ms = 6000
# zookeeper的等待鏈接時間  
zookeeper.connection.timeout.ms = 6000
# zookeeper的follower同leader的同步時間  
zookeeper.sync.time.ms = 2000
# 當zookeeper中沒有初始的offset時,或者超出offset上限時的處理方式 。  
# smallest :重置爲最小值   
# largest:重置爲最大值   
# anything else:拋出異常給consumer  
auto.offset.reset = largest

# socket的超時時間,實際的超時時間爲max.fetch.wait + socket.timeout.ms.  
socket.timeout.ms = 30 * 1000
# socket的接收緩存空間大小  
socket.receive.buffer.bytes = 64 * 1024
# 從每一個分區fetch的消息大小限制  
fetch.message.max.bytes = 1024 * 1024

# true時,Consumer會在消費消息後將offset同步到zookeeper,這樣當Consumer失敗後,新的consumer就能從zookeeper獲取最新的offset  
auto.commit.enable = true
# 自動提交的時間間隔  
auto.commit.interval.ms = 60 * 1000

# 用於消費的最大數量的消息塊緩衝大小,每一個塊能夠等同於fetch.message.max.bytes中數值  
queued.max.message.chunks = 10

# 當有新的consumer加入到group時,將嘗試reblance,將partitions的消費端遷移到新的consumer中, 該設置是嘗試的次數  
rebalance.max.retries = 4
# 每次reblance的時間間隔  
rebalance.backoff.ms = 2000
# 每次從新選舉leader的時間  
refresh.leader.backoff.ms

# server發送到消費端的最小數據,如果不知足這個數值則會等待直到知足指定大小。默認爲1表示當即接收。  
fetch.min.bytes = 1
# 如果不知足fetch.min.bytes時,等待消費端請求的最長等待時間  
fetch.wait.max.ms = 100
# 若是指定時間內沒有新消息可用於消費,就拋出異常,默認-1表示不受限  
consumer.timeout.ms = -1

#############################Producer######################  
# 核心的配置包括:  
# metadata.broker.list  
# request.required.acks  
# producer.type  
# serializer.class  

# 消費者獲取消息元信息(topics, partitions and replicas)的地址,配置格式是:host1:port1,host2:port2,也能夠在外面設置一個vip  
metadata.broker.list

# 消息的確認模式  
# 0:不保證消息的到達確認,只管發送,低延遲可是會出現消息的丟失,在某個server失敗的狀況下,有點像TCP  
# 1:發送消息,並會等待leader 收到確認後,必定的可靠性  
# -1:發送消息,等待leader收到確認,並進行復制操做後,才返回,最高的可靠性  
request.required.acks = 0

# 消息發送的最長等待時間  
request.timeout.ms = 10000
# socket的緩存大小  
send.buffer.bytes = 100 * 1024
# key的序列化方式,如果沒有設置,同serializer.class  
key.serializer.class
# 分區的策略,默認是取模  
partitioner.class =kafka.producer.DefaultPartitioner
# 消息的壓縮模式,默認是none,能夠有gzip和snappy  
compression.codec = none
# 能夠針對默寫特定的topic進行壓縮  
compressed.topics = null
# 消息發送失敗後的重試次數  
message.send.max.retries = 3
# 每次失敗後的間隔時間  
retry.backoff.ms = 100
# 生產者定時更新topic元信息的時間間隔 ,如果設置爲0,那麼會在每一個消息發送後都去更新數據  
topic.metadata.refresh.interval.ms = 600 * 1000
# 用戶隨意指定,可是不能重複,主要用於跟蹤記錄消息  
client.id = ""

# 異步模式下緩衝數據的最大時間。例如設置爲100則會集合100ms內的消息後發送,這樣會提升吞吐量,可是會增長消息發送的延時  
queue.buffering.max.ms = 5000
# 異步模式下緩衝的最大消息數,同上  
queue.buffering.max.messages = 10000
# 異步模式下,消息進入隊列的等待時間。如果設置爲0,則消息不等待,若是進入不了隊列,則直接被拋棄  
queue.enqueue.timeout.ms = -1
# 異步模式下,每次發送的消息數,當queue.buffering.max.messages或queue.buffering.max.ms知足條件之一時producer會觸發發送。  
batch.num.messages = 200
相關文章
相關標籤/搜索