Kafka 環境部署與配置詳解

什麼是Kafka

Kafka是一種高吞吐量 的分佈式發佈訂閱消息系統,有以下特性:
    1>.經過O(1)的磁盤數據結構提供消息的持久化,這種結構對於即便數以TB的消息存儲也可以保持長時間的穩定性能。
    2>.高吞吐量:即便是很是普通的硬件Kafka也能夠支持每秒數百萬的消息。
    3>.支持經過Kafka服務器和消費機集羣來分區消息。
    4>.支持Hadoop並行數據加載。

Kafka經常使用術語

Broker

Kafka集羣包含一個或多個服務器,這種服務器被稱爲broker

Topic

每條發佈到Kafka集羣的消息都有一個類別,這個類別被稱爲Topic。(物理上不一樣Topic的消息分開存儲,邏輯上一個Topic的消息
雖然保存於一個或多個broker上但用戶只需指定消息的Topic便可生產或消費數據而沒必要關心數據存於何處)

Partition

Partition是物理上的概念,每一個Topic包含一個或多個Partition.爲了更好的實現負載均衡和消息的順序性,kafka的producer在
分發消息時能夠經過分發策略發送給指定的partition。實現分發的程序是須要制定消息的key值,而kafka經過key進行策略分發。

Producer

負責發佈消息到Kafka broker

Consumer

消息消費者,向Kafka broker讀取消息的客戶端。

Consumer Group

每一個Consumer屬於一個特定的ConsumerGroup(可爲每一個Consumer指定group name,若不指定group name則屬於默認的group)。

Kafka官網在哪?

http://kafka.apache.org/

Kafka安裝與配置

JDK安裝

下載JDK  (注意:直接複製到下載工具進行下載,版本請使用JDK7,JDK8有可能不兼容kafka_2.11-0.8.2.1)
cd /usr/local/src
 wget http://download.oracle.com/otn-pub/java/jdk/7u79-b15/jdk-7u79-linux-x64.rpm
 chmod +x jdk-7u79-linux-x64.rpm # 添加執行權限
 rpm -ivh jdk-7u79-linux-x64.rpm #安裝
 cd /usr/java/ #安裝完成以後,能夠cd /usr/java/ 到安裝目錄查看到安裝目錄查看
 添加JDK到系統環境變量
 vim /etc/profile  #編輯,在最後添加如下代碼
 export JAVA_HOME=/usr/java/jdk1.7.0_79
 export PATH=$PATH:$JAVA_HOME/bin:/usr/bin:/usr/sbin:/bin:/sbin:/usr/X11R6/bin
 export CLASSPATH=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar
 wq! #保存退出
 source /etc/profile  #使配置文件當即生效
 java -version #出現下述信息 則安裝成功 可進行下一步
   java version "1.7.0_79"
   Java(TM) SE Runtime Environment (build 1.7.0_79-b15)
   Java HotSpot(TM) 64-Bit Server VM (build 24.79-b02, mixed mode)
   You have new mail in /var/spool/mail/root

kafka服務配置

cd /usr/local/src
wget http://archive.apache.org/dist/kafka/0.8.2.1/kafka_2.11-0.8.2.1.tgz 
#(kafka_2.11-0.8.2.1.tgz版本是已經編譯好的版本,解壓就能使用)
tar -xzvf kafka_2.11-0.8.2.1.tgz #解壓
mv kafka_2.11-0.8.2.1 /usr/local/kafka #移動到安裝目錄

mkdir /usr/local/kafka/log/kafka #建立kafka日誌目錄
mkdir /usr/local/kafka/zookeeper #建立zookeeper目錄
mkdir /usr/local/kafka/log/zookeeper #建立zookeeper日誌目錄
cd /usr/local/kafka/config #進入配置目錄
vim server.properties #編輯修改相應的參數
 ---------------------------------------------server.properties-------------------------------------
 broker.id=0
 port=9092                                              #端口號
 host.name=192.168.0.11                                 #服務器IP地址,修改成本身的服務器IP
 log.dirs=/usr/local/kafka/log/kafka                    #日誌存放路徑,上面建立的目錄
 zookeeper.connect=localhost:2181                       #zookeeper地址和端口,單機配置部署,localhost:2181
 log.cleaner.enable=false                               #topic是否可刪除
 zookeeper.connection.timeout.ms=6000                   #超時設置
 -----------------------------------------------------------------------------------------------------
 vim zookeeper.properties #編輯修改相應的參數
 ---------------------------------------------zookeeper.properties------------------------------------
 dataDir=/usr/local/kafka/zookeeper #zookeeper數據目錄
 dataLogDir=/usr/local/kafka/log/zookeeper #zookeeper日誌目錄
 clientPort=2181
 maxClientCnxns=100
 tickTime=2000
 initLimit=10
 syncLimit=5
 ------------------------------------------------------------------------------------------------------
 cd /usr/local/kafka
 vim kafkastart.sh #建立啓動腳本
 ---------------------------------------------kafkastart.sh--------------------------------------------
 #!/bin/sh
 #啓動zookeeper
 /usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties &
 sleep 3 #等3秒後執行
 #啓動kafka
 /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &
 -------------------------------------------------------------------------------------------------------
 vim kafkastop.sh #建立中止腳本
 ----------------------------------------------kafkastop.sh---------------------------------------------
 #!/bin/sh
 #啓動zookeeper
 /usr/local/kafka/bin/zookeeper-server-stop.sh /usr/local/kafka/config/zookeeper.properties &
 sleep 3 #等3秒後執行
 #啓動kafka
 /usr/local/kafka/bin/kafka-server-stop.sh /usr/local/kafka/config/server.properties &
 -------------------------------------------------------------------------------------------------------
 chmod +x kafkastart.sh
 chmod +x kafkastop.sh

server.properties 配置說明

name	默認值	描述
broker.id  none  每個boker都有一個惟一的id做爲它們的名字.這就容許boker切換到別的主機/端口上,consumer依然知道
enable.zookeeper	true	容許註冊到zookeeper
log.flush.interval.messages	Long.MaxValue	在數據被寫入到硬盤和消費者可用前最大累積的消息的數量
log.flush.interval.ms	Long.MaxValue	在數據被寫入到硬盤前的最大時間
log.flush.scheduler.interval.ms	Long.MaxValue	檢查數據是否要寫入到硬盤的時間間隔。
log.retention.hours	168	控制一個log保留多長個小時
log.retention.bytes	-1	控制log文件最大尺寸
log.cleaner.enable	false	是否log cleaning
log.cleanup.policy  delete  delete仍是compat.其它控制參數還包括
log.cleaner.threads,log.cleaner.io.max.bytes.per.second,
log.cleaner.dedupe.buffer.size,log.cleaner.io.buffer.size,log.cleaner.io.buffer.load.factor,
log.cleaner.backoff.ms,log.cleaner.min.cleanable.ratio,log.cleaner.delete.retention.ms
log.dir	/tmp/kafka-logs	指定log文件的根目錄
log.segment.bytes	110241024*1024	單一的log segment文件大小
log.roll.hours	24 * 7	開始一個新的log文件片斷的最大時間
message.max.bytes	1000000 + MessageSet.LogOverhead	一個socket 請求的最大字節數
num.network.threads	3	處理網絡請求的線程數
num.io.threads	8	處理IO的線程數
background.threads	10	後臺線程序
num.partitions	1	默認分區數
socket.send.buffer.bytes	102400	socket SO_SNDBUFF參數
socket.receive.buffer.bytes	102400	socket SO_RCVBUFF參數
zookeeper.connect	localhost:2182/kafka	指定zookeeper鏈接字符串, 格式如hostname:port/chroot。
chroot是一個namespace 
zookeeper.connection.timeout.ms	6000	指定客戶端鏈接zookeeper的最大超時時間
zookeeper.session.timeout.ms	6000	鏈接zk的session超時時間
zookeeper.sync.time.ms	2000	zk follower落後於zk leader的最長時間

producer.properties 配置說明

#指定kafka節點列表,用於獲取metadata,沒必要所有指定
    metadata.broker.list=192.168.2.105:9092,192.168.2.106:9092

    # 指定分區處理類。默認kafka.producer.DefaultPartitioner,表經過key哈希到對應分區
    #partitioner.class=com.meituan.mafka.client.producer.CustomizePartitioner

    # 是否壓縮,默認0表示不壓縮,1表示用gzip壓縮,2表示用snappy壓縮。壓縮後消息中會有頭來指明消息壓縮類型,
    # 故在消費者端消息解壓是透明的無需指定。
    compression.codec=none
      
    # 指定序列化處理類(mafka client API調用說明-->3.序列化約定wiki),默認爲kafka.serializer.DefaultEncoder,
    # 即byte[]
    serializer.class=com.meituan.mafka.client.codec.MafkaMessageEncoder
    # serializer.class=kafka.serializer.DefaultEncoder
    # serializer.class=kafka.serializer.StringEncoder

    # 若是要壓縮消息,這裏指定哪些topic要壓縮消息,默認empty,表示不壓縮。
    #compressed.topics=

    ########### request ack ###############
    #消息的確認模式 默認爲0
    # 0:不保證消息的到達確認,只管發送,低延遲可是會出現消息的丟失,在某個server失敗的狀況下,有點像TCP
    # 1:發送消息,並會等待leader 收到確認後,必定的可靠性
    # -1:發送消息,等待leader收到確認,並進行復制操做後,才返回,最高的可靠性
    request.required.acks=0

    # 在向producer發送ack以前,broker容許等待的最大時間
    # 若是超時,broker將會向producer發送一個error ACK.意味着上一次消息由於某種
    # 緣由未能成功(好比follower未能同步成功)
    request.timeout.ms=10000
    ########## end #####################

    # 同步仍是異步發送消息,默認「sync」表同步,"async"表異步。異步能夠提升發送吞吐量,
    # 也意味着消息將會在本地buffer中,並適時批量發送,可是也可能致使丟失未發送過去的消息
    producer.type=sync

    ############## 異步發送 (如下四個異步參數可選) ####################
    # 在async模式下,當message被緩存的時間超過此值後,將會批量發送給broker,默認爲5000ms
    # 此值和batch.num.messages協同工做.
    queue.buffering.max.ms = 5000

    # 在async模式下,producer端容許buffer的最大消息量
    # 不管如何,producer都沒法儘快的將消息發送給broker,從而致使消息在producer端大量沉積
    # 此時,若是消息的條數達到閥值,將會致使producer端阻塞或者消息被拋棄,默認爲10000
    queue.buffering.max.messages=20000

    # 若是是異步,指定每次批量發送數據量,默認爲200
    batch.num.messages=500

    # 當消息在producer端沉積的條數達到"queue.buffering.max.meesages"後
    # 阻塞必定時間後,隊列仍然沒有enqueue(producer仍然沒有發送出任何消息)
    # 此時producer能夠繼續阻塞或者將消息拋棄,此timeout值用於控制"阻塞"的時間
    # -1: 無阻塞超時限制,消息不會被拋棄
    # 0:當即清空隊列,消息被拋棄
    queue.enqueue.timeout.ms=-1
    ################ end ###############

    # 當producer接收到error ACK,或者沒有接收到ACK時,容許消息重發的次數
    # 由於broker並無完整的機制來避免消息重複,因此當網絡異常時(好比ACK丟失)
    # 有可能致使broker接收到重複的消息,默認值爲3.
    message.send.max.retries=3

    # producer刷新topic metada的時間間隔,producer須要知道partition leader的位置,以及當前topic的狀況
    # 所以producer須要一個機制來獲取最新的metadata,當producer遇到特定錯誤時,將會當即刷新
    # (好比topic失效,partition丟失,leader失效等),此外也能夠經過此參數來配置額外的刷新機制,默認值600000
    topic.metadata.refresh.interval.ms=60000

consumer.properties 配置說明

# zookeeper鏈接服務器地址,此處爲線下測試環境配置(kafka消息服務-->kafka broker集羣線上部署環境wiki)
    # 配置例子:"127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
    zookeeper.connect=192.168.2.225:2181,192.168.2.225:2182,192.168.2.225:2183/config/mobile/mq/mafka

    # zookeeper的session過時時間,默認5000ms,用於檢測消費者是否掛掉,當消費者掛掉,其餘消費者要等該指定時間
    # 才能檢查到而且觸發從新負載均衡
    zookeeper.session.timeout.ms=5000
    zookeeper.connection.timeout.ms=10000

    # 指定多久消費者更新offset到zookeeper中。注意offset更新時基於time而不是每次得到的消息。一旦在更新zookeeper
    # 發生異常並重啓,將可能拿到已拿到過的消息
    zookeeper.sync.time.ms=2000

    #指定消費組
    group.id=xxx

    # 當consumer消費必定量的消息以後,將會自動向zookeeper提交offset信息
    # 注意offset信息並非每消費一次消息就向zk提交一次,而是如今本地保存(內存),並按期提交,默認爲true
    auto.commit.enable=true

    # 自動更新時間。默認60 * 1000
    auto.commit.interval.ms=1000

    # 當前consumer的標識,能夠設定,也能夠有系統生成,主要用來跟蹤消息消費狀況,便於觀察
    conusmer.id=xxx

    # 消費者客戶端編號,用於區分不一樣客戶端,默認客戶端程序自動產生
    client.id=xxxx

    # 最大取多少塊緩存到消費者(默認10)
    queued.max.message.chunks=50

    # 當有新的consumer加入到group時,將會reblance,此後將會有partitions的消費端遷移到新
    # 的consumer上,若是一個consumer得到了某個partition的消費權限,那麼它將會向zk註冊
    # "Partition Owner registry"節點信息,可是有可能此時舊的consumer尚沒有釋放此節點,
    # 此值用於控制,註冊節點的重試次數.
    rebalance.max.retries=5

    # 獲取消息的最大尺寸,broker不會像consumer輸出大於此值的消息chunk
    # 每次feth將獲得多條消息,此值爲總大小,提高此值,將會消耗更多的consumer端內存
    fetch.min.bytes=6553600

    # 當消息的尺寸不足時,server阻塞的時間,若是超時,消息將當即發送給consumer
    fetch.wait.max.ms=5000
    socket.receive.buffer.bytes=655360

    # 若是zookeeper沒有offset值或offset值超出範圍。那麼就給個初始的offset。有smallest、largest、
    # anything可選,分別表示給當前最小的offset、當前最大的offset、拋異常。默認largest
    auto.offset.reset=smallest

    # 指定序列化處理類(mafka client API調用說明-->3.序列化約定wiki),默認爲kafka.serializer.DefaultDecoder,
    # 即byte[]
    derializer.class=com.meituan.mafka.client.codec.MafkaMessageDecoder
參數參考可見:http://debugo.com/kafka-params/

kafka 服務啓動

sh /usr/local/kafka/kafkastart.sh        #kafka啓動
sh /usr/local/kafka/kafkastop.sh         #kafka中止

kafka經常使用命令

建立一個名爲test的topic
 bin/kafka-topics.sh --create --zookeeper 192.168.1.3:2181 --replication-factor 1 --partitions 5 --topic b
 查看當前建立的topic
 bin/kafka-topics.sh --zookeeper 192.168.1.3::2181 --list
 驗證生產消息成功 
 bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.1.3::9092 --topic boyaa --time -1 
 #(參數解釋:--time -1 表示從最新的時間的offset中獲得數據條數) 
 #輸出結果每一個字段分別表示topic、partition、untilOffset

 開啓消息生產者
 bin/kafka-console-producer.sh --broker-list 192.168.1.3:9092 --sync --topic b
 開啓消費者
 bin/kafka-console-consumer.sh --zookeeper 192.168.201.73:2181 --topic boyaa --from-beginning 
 #(--from-beginning 若是consumer以前沒有創建offset,則從producer最開始的數據讀取。)
相關文章
相關標籤/搜索