一般,生產環境應該將Kafka集羣部署在Linux操做系統上,緣由以下:
(1)Kafka客戶端底層使用了Java的selector,selector在Linux上的實現機制是epoll,而在Windows平臺上的實現機制是select,所以Kafka部署在Linux上可以得到更高效的I/O性能。
(2)網絡傳輸效率的差異。Kafka須要在磁盤和網絡間進行大量數據傳輸,在Linux部署Kafka可以享受到零拷貝(Zero Copy)技術所帶來的快速數據傳輸特性。
(3)社區的支持度。Apache Kafka社區目前對Windows平臺上發現的Kafka Bug不作任何承諾。java
(1)Kafka實現了冗餘機制來提供高可靠性,並經過分區機制在軟件層面實現負載均衡,所以Kafka的磁盤存儲能夠不使用磁盤陣列(RAID),使用普通磁盤組成存儲空間便可。
(2)使用機械磁盤可以勝任Kafka線上環境,但SSD顯然性能更好。git
規劃磁盤容量時須要考慮:新增消息數、消息留存時間、平均消息大小、備份數、是否啓用壓縮等因素。
假設公司業務天天須要向Kafka集羣發送100000000條消息,每條消息保存兩份以防止數據丟失,消息默認保存7天時間,消息的平均大小是1KB,Kafka的數據壓縮比是0.75。
天天100000000條1KB大小的消息,保存兩份,壓縮比0.75,佔用空間大小就等於150GB(100000000*1KB*2/1000/1000*0.75
),考慮到Kafka集羣的索引數據等,須要預留出10%的磁盤空間,所以天天總存儲容量是165GB。數據留存7天,所以規劃磁盤容量爲1155GB(165GB*7
)。github
假設公司的機房環境是千兆網絡,即1Gbps,業務須要在1小時內處理1TB的業務數據。假設Kafka Broker會用到70%的帶寬資源,超過70%的閾值可能網絡丟包,單臺Kafka Broker最多能使用大約700Mb的帶寬資源,但一般須要再額外爲其它服務預留出2/3的資源,即Kafka Broker能夠爲Kafka服務分配帶寬240Mbps(700Mb/3)。1小時處理1TB數據,則每秒須要處理2336Mb(1024*1024*8/3600
)數據,除以240,約等於10臺服務器。若是還須要額外複製兩份,那麼服務器臺數還要乘以3,即30臺。web
Broker端參數也被稱爲靜態參數(Static Configs),靜態參數只能在Kafka的配置文件server.properties中進行設置,必須重啓Broker進程才能生效。
log.dirs:指定Broker須要使用的若干個文件目錄路徑,沒有默認值,必須指定。在生產環境中必定要爲log.dirs配置多個路徑,若是條件容許,須要保證目錄被掛載到不一樣的物理磁盤上。優點在於,提高讀寫性能,多塊物理磁盤同時讀寫數據具備更高的吞吐量;可以實現故障轉移(Failover),Kafka 1.1版本引入Failover功能,壞掉磁盤上的數據會自動地轉移到其它正常的磁盤上,並且Broker還能正常工做,基於Failover機制,Kafka能夠捨棄RAID方案。
zookeeper.connect:CS格式參數,能夠指定值爲zk1:2181,zk2:2181,zk3:2181,不一樣Kafka集羣能夠指定:zk1:2181,zk2:2181,zk3:2181/kafka1,chroot只須要寫一次。
listeners:設置內網訪問Kafka服務的監聽器。
advertised.listeners:設置外網訪問Kafka服務的監聽器。
auto.create.topics.enable:是否容許自動建立Topic。
unclean.leader.election.enable:是否容許Unclean Leader 選舉。
auto.leader.rebalance.enable:是否容許按期進行Leader選舉,生產環境中建議設置成false。
log.retention.{hours|minutes|ms}:控制一條消息數據被保存多長時間。優先級:ms設置最高、minutes次之、hours最低。
log.retention.bytes:指定Broker爲消息保存的總磁盤容量大小。message.max.bytes:控制Broker可以接收的最大消息大小。算法
若是同時設置了Topic級別參數和全局Broker參數,Topic級別參數會覆蓋全局Broker參數,而每一個Topic都能設置本身的參數值。
生產環境中,應當容許不一樣部門的Topic根據自身業務須要,設置本身的留存時間。若是隻能設置全局Broker參數,那麼勢必要提取全部業務留存時間的最大值做爲全局參數值,此時設置Topic級別參數對Broker參數進行覆蓋就是一個不錯的選擇。
retention.ms:指定Topic消息被保存的時長,默認是7天,只保存最近7天的消息,會覆蓋掉Broker端的全局參數值。
retention.bytes:指定爲Topic預留多大的磁盤空間。一般在多租戶的Kafka集羣中使用,默認值是 -1,表示能夠無限使用磁盤空間。
max.message.bytes:指定Kafka Broker可以正常接收Topic 的最大消息大小。
Topic級別參數能夠在建立Topic時進行設置,也能夠在修改Topic 時設置,推薦在修改Topic時進行設置,Apache Kafka社區將來可能統一使用kafka-configs腳原本設置Topic級別參數。docker
Kafka 2.0.0版本已經正式摒棄對Java 7的支持。
Kafka Broker在與客戶端進行交互時會在JVM堆上建立大量的Byte Buffer實例,所以JVM端設置的Heap Size不能過小,建議設置6GB。export KAFKA_HEAP_OPTS=--Xms6g --Xmx6g
JVM端配置的一個重要參數是垃圾回收器的設置。對於Java 7,若是Broker所在機器的CPU資源很是充裕,建議使用CMS收集器。啓用方法是指定-XX:+UseCurrentMarkSweepGC。不然,使用吞吐量收集器,開啓方法是指定-XX:+UseParallelGC。對於Java 9,用默認的G1收集器,在沒有任何調優的狀況下,G1表現得要比CMS出色,主要體如今更少的Full GC,須要調整的參數更少等,因此使用G1就好。export KAFKA_JVM_PERFORMANCE_OPTS= -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true
bootstrap
件描述符限制:ulimit -n。建議設置成一個超大的值,如ulimit -n 1000000。
文件系統類型:文件系統類型的選擇。根據官網的測試報告,XFS 的性能要強於ext4。
Swappiness:推薦設置爲一個較小值,如1。若是將swap設置爲0,將會徹底禁止Kafka Broker進程使用swap空間;當物理內存耗盡時,操做系統會觸發OOM killer組件,隨機挑選一個進程kill掉,不給用戶任何預警。若是設置一個比較小值,當開始使用swap空間時,Broker性能會出現急劇降低,從而給進一步調優和診斷問題的時間。
提交時間:提交時間(Flush落盤時間)。向Kafka發送數據並非真要等數據被寫入磁盤纔會認爲成功,而是隻要數據被寫入到操做系統的頁緩存(Page Cache)上就認爲寫入成功,隨後操做系統根據LRU算法會按期將頁緩存上的髒數據落盤到物理磁盤上。頁緩存數據寫入磁盤的週期由提交時間來肯定,默認是5秒,能夠適當地增長提交間隔來下降物理磁盤的寫操做。若是在頁緩存中的數據在寫入到磁盤前機器宕機,數據會丟失,但鑑於Kafka在軟件層面已經提供了多副本的冗餘機制,拉大提交間隔換取性能是一個合理的作法。緩存
安裝Docker:sudo yum install docker
啓動Docker:sudo systemctl start docker
docker版本檢查:docker version
安全
docker-compose下載:sudo curl -L https://github.com/docker/compose/releases/download/1.23.0-rc3/docker-compose-
uname -s-
uname -m-o /usr/local/bin/docker-compose
docker-compose安裝:sudo chmod +x /usr/local/bin/docker-compose
docker-compose版本檢查:docker-compose version
bash
zookeeper鏡像選擇:docker search zookeeper
選擇star最多的鏡像:docker.io/zookeeper
Kafka鏡像選擇:docker search kafka
選擇star最多的鏡像:docker.io/wurstmeister/kafka
kafka-manager鏡像選擇:docker search kafka-manager
選擇鏡像:kafkamanager/kafka-manager
# 單機 zookeeper + kafka + kafka-manager集羣 version: '2' services: # 定義zookeeper服務 zookeeper-test: image: zookeeper # zookeeper鏡像 restart: always hostname: zookeeper-test ports: - "12181:2181" # 宿主機端口:docker內部端口 container_name: zookeeper-test # 容器名稱 # 定義kafka服務 kafka-test: image: wurstmeister/kafka # kafka鏡像 restart: always hostname: kafka-test ports: - "9092:9092" # 對外暴露端口號 - "9999:9999" # 對外暴露JMX_PORT environment: KAFKA_ADVERTISED_HOST_NAME: 192.168.0.105 # KAFKA_ADVERTISED_PORT: 9092 # KAFKA_ZOOKEEPER_CONNECT: zookeeper-test:2181 # zookeeper服務 KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 30000 # zookeeper鏈接超時 KAFKA_LOG_CLEANUP_POLICY: "delete" KAFKA_LOG_RETENTION_HOURS: 120 # 設置消息數據保存的最長時間爲120小時 KAFKA_MESSAGE_MAX_BYTES: 10000000 # 消息體的最大字節數 KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000 # KAFKA_GROUP_MAX_SESSION_TIMEOUT_MS: 60000 # KAFKA_NUM_PARTITIONS: 1 # 分區數量 KAFKA_DELETE_RETENTION_MS: 10000 # KAFKA_BROKER_ID: 1 # kafka的ID KAFKA_COMPRESSION_TYPE: lz4 KAFKA_JMX_OPTS: "-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=192.168.0.105 -Dcom.sun.management.jmxremote.rmi.port=9999" # 導入KAFKA_JMX_OPTS環境變量 JMX_PORT: 9999 # 導入JMX_PORT環境變量 depends_on: - zookeeper-test # 依賴 container_name: kafka-test # 定義kafka-manager服務 kafka-manager-test: image: kafkamanager/kafka-manager # kafka-manager鏡像 restart: always container_name: kafka-manager-test hostname: kafka-manager-test ports: - "9000:9000" # 對外暴露端口,提供web訪問 depends_on: - kafka-test # 依賴 environment: ZK_HOSTS: zookeeper-test:2181 # 宿主機IP KAFKA_BROKERS: kafka-test:9090 # kafka KAFKA_MANAGER_AUTH_ENABLED: "true" # 開啓安全認證 KAFKA_MANAGER_USERNAME: kafka-manager # Kafka Manager登陸用戶 KAFKA_MANAGER_PASSWORD: 123456 # Kafka Manager登陸密碼
須要確認相應端口是否被佔用。
建立kafka目錄,將docker-compose.yml文件放入kafka目錄,在kafka目錄執行命令。
啓動:docker-compose up -d
關閉:docker-compose down
進入docker容器:docker exec -it kafka /bin/bash
建立Topic:kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 3 --topic test
查看Topic:kafka-topics.sh --list --zookeeper zookeeper:2181
生產消息:kafka-console-producer.sh --broker-list kafka:9092 --topic test
消費消息:kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic test --from-beginning
打開兩個Terminal,一個執行生產消息的命令,一個執行消費消息的命令,每生產一條消息時消費消息Terminal就會顯示一條消息,實現消息隊列。
wurstmeister/kafka鏡像中,kafka安裝在/opt目錄下,進入/opt目錄,kafka_2.12-2.4.0目錄即爲kafka安裝目錄。
Scala版本:2.12
Kafka版本:2.4
Web方式訪問:http://127.0.0.1:9000
docker rm -f $(docker ps -a --filter status=dead -q |head -n 1)
報錯信息:ERROR: for f78856fb92e9_zoo1 Driver overlay2 failed to remove root filesystem f78856fb92e97f75ff4c255077de544b39351a4a2a3319737ada2a54df568032: remove /var/lib/docker/overlay2/2c257b8071b6a3d79e216838522f76ba7263d466a470dc92cdbef25c4dd04dc3/merged: device or resource busy
grep docker /proc/*/mountinfo|grep containerid | awk -F ":" '{print $1}' | awk -F "/" '{print $3}'
sudo kill -9 3119
報錯信息:Error response from daemon: Container 9b3f9af8a1196f2ad3cf74fe2b1eeb7ccbd231fe2a93ec09f594d3a0fbb5783c is restarting, wait until the container is running
錯誤緣由:
docker-compose.yml文件對kafka服務配置restart: always,若是kafka服務啓動失敗會一直重啓,能夠經過docker logs kafka查看kafka服務啓動的日誌信息,查找錯誤緣由。
############################# System ###################### # 惟一標識在集羣中的ID,要求是正數。 broker.id = 0 # 服務端口,默認9092 port = 9092 # 監聽地址,不設爲全部地址 host.name = debugo01 # 處理網絡請求的最大線程數 num.network.threads = 2 # 處理磁盤I/O的線程數 num.io.threads = 8 # 一些後臺線程數 background.threads = 4 # 等待IO線程處理的請求隊列最大數 queued.max.requests = 500 # socket的發送緩衝區(SO_SNDBUF) socket.send.buffer.bytes = 1048576 # socket的接收緩衝區 (SO_RCVBUF) socket.receive.buffer.bytes = 1048576 # socket請求的最大字節數。爲了防止內存溢出,message.max.bytes必然要小於 socket.request.max.bytes = 104857600 ############################# Topic ######################## # 每一個topic的分區個數,更多的partition會產生更多的segment file num.partitions = 2 # 是否容許自動建立topic ,如果false,就須要經過命令建立topic auto.create.topics.enable = true # 一個topic ,默認分區的replication個數 ,不能大於集羣中broker的個數。 default.replication.factor = 1 # 消息體的最大大小,單位是字節 message.max.bytes = 1000000 ############################# ZooKeeper #################### # Zookeeper quorum設置。若是有多個使用逗號分割 zookeeper.connect = debugo01:2181, debugo02, debugo03 # 鏈接zk的超時時間 zookeeper.connection.timeout.ms = 1000000 # ZooKeeper集羣中leader和follower之間的同步實際 zookeeper.sync.time.ms = 2000 ############################# Log ######################### # 日誌存放目錄,多個目錄使用逗號分割 log.dirs = / var / log / kafka # 當達到下面的消息數量時,會將數據flush到日誌文件中。默認10000 # log.flush.interval.messages=10000 # 當達到下面的時間(ms)時,執行一次強制的flush操做。interval.ms和interval.messages不管哪一個達到,都會flush。默認3000ms # log.flush.interval.ms=1000 # 檢查是否須要將日誌flush的時間間隔 log.flush.scheduler.interval.ms = 3000 # 日誌清理策略(delete|compact) log.cleanup.policy = delete # 日誌保存時間 (hours|minutes),默認爲7天(168小時)。超過這個時間會根據policy處理數據。bytes和minutes不管哪一個先達到都會觸發。 log.retention.hours = 168 # 日誌數據存儲的最大字節數。超過這個時間會根據policy處理數據。 # log.retention.bytes=1073741824 # 控制日誌segment文件的大小,超出該大小則追加到一個新的日誌segment文件中(-1表示沒有限制) log.segment.bytes = 536870912 # 當達到下面時間,會強制新建一個segment log.roll.hours = 24 * 7 # 日誌片斷文件的檢查週期,查看它們是否達到了刪除策略的設置(log.retention.hours或log.retention.bytes) log.retention.check.interval.ms = 60000 # 是否開啓壓縮 log.cleaner.enable = false # 對於壓縮的日誌保留的最長時間 log.cleaner.delete.retention.ms = 1 day # 對於segment日誌的索引文件大小限制 log.index.size.max.bytes = 10 * 1024 * 1024 # y索引計算的一個緩衝區,通常不須要設置。 log.index.interval.bytes = 4096 ############################# replica ####################### # partition management controller 與replicas之間通信的超時時間 controller.socket.timeout.ms = 30000 # controller-to-broker-channels消息隊列的尺寸大小 controller.message.queue.size = 10 # replicas響應leader的最長等待時間,如果超過這個時間,就將replicas排除在管理以外 replica.lag.time.max.ms = 10000 # 是否容許控制器關閉broker ,如果設置爲true,會關閉全部在這個broker上的leader,並轉移到其餘broker controlled.shutdown.enable = false # 控制器關閉的嘗試次數 controlled.shutdown.max.retries = 3 # 每次關閉嘗試的時間間隔 controlled.shutdown.retry.backoff.ms = 5000 # 若是relicas落後太多,將會認爲此partition relicas已經失效。而通常狀況下,由於網絡延遲等緣由,總會致使replicas中消息同步滯後。若是消息嚴重滯後,leader將認爲此relicas網絡延遲較大或者消息吞吐能力有限。在broker數量較少,或者網絡不足的環境中,建議提升此值. replica.lag.max.messages = 4000 # leader與relicas的socket超時時間 replica.socket.timeout.ms = 30 * 1000 # leader複製的socket緩存大小 replica.socket.receive.buffer.bytes = 64 * 1024 # replicas每次獲取數據的最大字節數 replica.fetch.max.bytes = 1024 * 1024 # replicas同leader之間通訊的最大等待時間,失敗了會重試 replica.fetch.wait.max.ms = 500 # 每個fetch操做的最小數據尺寸,若是leader中還沒有同步的數據不足此值,將會等待直到數據達到這個大小 replica.fetch.min.bytes = 1 # leader中進行復制的線程數,增大這個數值會增長relipca的IO num.replica.fetchers = 1 # 每一個replica將最高水位進行flush的時間間隔 replica.high.watermark.checkpoint.interval.ms = 5000 # 是否自動平衡broker之間的分配策略 auto.leader.rebalance.enable = false # leader的不平衡比例,如果超過這個數值,會對分區進行從新的平衡 leader.imbalance.per.broker.percentage = 10 # 檢查leader是否不平衡的時間間隔 leader.imbalance.check.interval.seconds = 300 # 客戶端保留offset信息的最大空間大小 offset.metadata.max.bytes = 1024 #############################Consumer ##################### # Consumer端核心的配置是group.id、zookeeper.connect # 決定該Consumer歸屬的惟一組ID,By setting the same group id multiple processes indicate that they are all part of the same consumer group. group.id # 消費者的ID,如果沒有設置的話,會自增 consumer.id # 一個用於跟蹤調查的ID ,最好同group.id相同 client.id = < group_id > # 對於zookeeper集羣的指定,必須和broker使用一樣的zk配置 zookeeper.connect = debugo01:2182, debugo02: 2182, debugo03: 2182 # zookeeper的心跳超時時間,查過這個時間就認爲是無效的消費者 zookeeper.session.timeout.ms = 6000 # zookeeper的等待鏈接時間 zookeeper.connection.timeout.ms = 6000 # zookeeper的follower同leader的同步時間 zookeeper.sync.time.ms = 2000 # 當zookeeper中沒有初始的offset時,或者超出offset上限時的處理方式 。 # smallest :重置爲最小值 # largest:重置爲最大值 # anything else:拋出異常給consumer auto.offset.reset = largest # socket的超時時間,實際的超時時間爲max.fetch.wait + socket.timeout.ms. socket.timeout.ms = 30 * 1000 # socket的接收緩存空間大小 socket.receive.buffer.bytes = 64 * 1024 # 從每一個分區fetch的消息大小限制 fetch.message.max.bytes = 1024 * 1024 # true時,Consumer會在消費消息後將offset同步到zookeeper,這樣當Consumer失敗後,新的consumer就能從zookeeper獲取最新的offset auto.commit.enable = true # 自動提交的時間間隔 auto.commit.interval.ms = 60 * 1000 # 用於消費的最大數量的消息塊緩衝大小,每一個塊能夠等同於fetch.message.max.bytes中數值 queued.max.message.chunks = 10 # 當有新的consumer加入到group時,將嘗試reblance,將partitions的消費端遷移到新的consumer中, 該設置是嘗試的次數 rebalance.max.retries = 4 # 每次reblance的時間間隔 rebalance.backoff.ms = 2000 # 每次從新選舉leader的時間 refresh.leader.backoff.ms # server發送到消費端的最小數據,如果不知足這個數值則會等待直到知足指定大小。默認爲1表示當即接收。 fetch.min.bytes = 1 # 如果不知足fetch.min.bytes時,等待消費端請求的最長等待時間 fetch.wait.max.ms = 100 # 若是指定時間內沒有新消息可用於消費,就拋出異常,默認-1表示不受限 consumer.timeout.ms = -1 #############################Producer###################### # 核心的配置包括: # metadata.broker.list # request.required.acks # producer.type # serializer.class # 消費者獲取消息元信息(topics, partitions and replicas)的地址,配置格式是:host1:port1,host2:port2,也能夠在外面設置一個vip metadata.broker.list # 消息的確認模式 # 0:不保證消息的到達確認,只管發送,低延遲可是會出現消息的丟失,在某個server失敗的狀況下,有點像TCP # 1:發送消息,並會等待leader 收到確認後,必定的可靠性 # -1:發送消息,等待leader收到確認,並進行復制操做後,才返回,最高的可靠性 request.required.acks = 0 # 消息發送的最長等待時間 request.timeout.ms = 10000 # socket的緩存大小 send.buffer.bytes = 100 * 1024 # key的序列化方式,如果沒有設置,同serializer.class key.serializer.class # 分區的策略,默認是取模 partitioner.class =kafka.producer.DefaultPartitioner # 消息的壓縮模式,默認是none,能夠有gzip和snappy compression.codec = none # 能夠針對默寫特定的topic進行壓縮 compressed.topics = null # 消息發送失敗後的重試次數 message.send.max.retries = 3 # 每次失敗後的間隔時間 retry.backoff.ms = 100 # 生產者定時更新topic元信息的時間間隔 ,如果設置爲0,那麼會在每一個消息發送後都去更新數據 topic.metadata.refresh.interval.ms = 600 * 1000 # 用戶隨意指定,可是不能重複,主要用於跟蹤記錄消息 client.id = "" # 異步模式下緩衝數據的最大時間。例如設置爲100則會集合100ms內的消息後發送,這樣會提升吞吐量,可是會增長消息發送的延時 queue.buffering.max.ms = 5000 # 異步模式下緩衝的最大消息數,同上 queue.buffering.max.messages = 10000 # 異步模式下,消息進入隊列的等待時間。如果設置爲0,則消息不等待,若是進入不了隊列,則直接被拋棄 queue.enqueue.timeout.ms = -1 # 異步模式下,每次發送的消息數,當queue.buffering.max.messages或queue.buffering.max.ms知足條件之一時producer會觸發發送。 batch.num.messages = 200