1.在清華鏡像站下載kafka_2.10-0.10.0.0.tgz 和 zookeeper-3.4.10.tar.gzhtml
分別解壓到/usr/local目錄下shell
2.進入zookeeper目錄,在conf目錄下將zoo_sample.cfg文件拷貝,並改名爲zoo.cfgapache
參考 https://my.oschina.net/phoebus789/blog/730787bootstrap
zoo.cfg文件的內容bash
# The number of ticks that the initial # synchronization phase can take initLimit=10 # The number of ticks that can pass between # sending a request and getting an acknowledgement syncLimit=5 # the directory where the snapshot is stored. # do not use /tmp for storage, /tmp here is just # example sakes. dataDir=/home/common/zookeeper/zookeeperdir/zookeeper-data dataLogDir=/home/common/zookeeper/zookeeperdir/logs # the port at which the clients will connect clientPort=2181 server.1=10.10.100.10:2888:3888 # the maximum number of client connections. # increase this if you need to handle more clients #maxClientCnxns=60 # # Be sure to read the maintenance section of the # administrator guide before turning on autopurge. # # http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance # # The number of snapshots to retain in dataDir #autopurge.snapRetainCount=3 # Purge task interval in hours # Set to "0" to disable auto purge feature #autopurge.purgeInterval=1
新建下面這兩個目錄服務器
/home/common/zookeeper/zookeeperdir/zookeeper-data /home/common/zookeeper/zookeeperdir/logs
在zookeeper-data目錄下新建一個myid文件,內容爲1,表明這個服務器的編號是1,具體參考上面網址中的內容ide
最後在/etc/profile中添加環境變量,並sourcepost
export ZOOKEEPER_HOME=/usr/local/zookeeper export PATH=${ZOOKEEPER_HOME}/bin:$PATH
如今zookeeper就安裝好了,如今啓動zookeeperui
bin/zkServer.sh start
查看狀態this
bin/zkServer.sh status
啓動客戶端腳本
bin/zkCli.sh -server localhost:2181
中止zookeeper
bin/zkServer.sh stop
1.如今安裝kafka,一樣是解壓以後就安裝好了
參考 http://www.jianshu.com/p/efc8b9dbd3bd
2.進入kafka目錄下
kafka須要使用Zookeeper,首先須要啓動Zookeeper服務,上面的操做就已經啓動了Zookeeper服務
若是沒有的話,可使用kafka自帶的腳本啓動一個簡單的單一節點Zookeeper實例
bin/zookeeper-server-start.sh config/zookeeper.properties
啓動 Kafka服務
bin/kafka-server-start.sh config/server.properties
中止 Kafka服務
bin/kafka-server-stop.sh config/server.properties
3.建立一個主題
首先建立一個名爲test
的topic,只使用單個分區和一個複本
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
如今能夠運行list topic命令看到咱們的主題
bin/kafka-topics.sh --list --zookeeper localhost:2181
4.發送消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test This is a message This is another message
若是要批量導入文件數據到kafka,參考:2.1 本地環境下kafka批量導入數據
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic < file_pat
若是要模擬實時數據到打入kafka的狀況,能夠寫一個shell腳本
#!/usr/bin/env bash cat XXXX.log | while read line do sleep 0.1 echo "${line}" echo "${line}" | /home/lintong/software/apache/kafka_2.11-0.10.0.0/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topicA done
5.啓動一個消費者,消費者會接收到消息
舊版消費者
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning 2>/dev/null
新版消費者
bin/kafka-console-consumer.sh --new-consumer --bootstrap-server localhost:9092 --topic input --from-beginning 2>/dev/null
6.查看指定的topic的offset信息
對於結尾是ZK的消費者,其消費者的信息是存儲在Zookeeper中的
對於結尾是KF的消費者,其消費者的信息是存在在Kafka的broker中的
均可以使用下面的命令進行查看
bin/kafka-consumer-offset-checker.sh --zookeeper localhost:2181 --group xxx --topic xxx
結果
bin/kafka-consumer-offset-checker.sh --zookeeper localhost:2181 --group test-consumer-group --topic xxx [2018-09-03 20:34:57,595] WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$) Group Topic Pid Offset logSize Lag Owner test-consumer-group xxx 0 509 0 -509 none
或者
./bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group xxxx --topic xxxx
結果
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test-consumer-group [2018-09-03 20:45:02,967] WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$) Group Topic Pid Offset logSize Lag Owner test-consumer-group xxx 0 509 509 0 none
lag是負數的緣由是 topic中的消息數量過時(超過kafka默認的7天后被刪除了),變成了0,因此Lag=logSize減去Offset,因此就變成了負數
7.刪除一個topic
須要在 conf/server.properties 文件中設置
# For delete topic delete.topic.enable=true
不然在執行了如下刪除命令後,再 list 查看全部的topic,仍是會看到該topic
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topicB
再到 配置文件 中的kafka數據存儲地址去刪除物理數據了,個人地址爲
/tmp/kafka-logs
最後須要到zk裏刪除kafka的元數據
./bin/zkCli.sh #進入zk shell ls /brokers/topics rmr /brokers/topics/topicA
8.查看某個group的信息
新版
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group xxx
結果
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group group_id GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER group_id xxx 0 509 509 0 consumer-1_/127.0.0.1
若是這時候消費者進程關閉了以後,使用上面的命令和下面的-list命令將不會查出這個group_id,可是當消費者進程從新開啓後,這個group_id又能從新查到,且消費的offset不會丟失
舊版
bin/kafka-consumer-groups.sh --zookeeper 127.0.0.1:2181 --group xxx --describe
9.查看consumer group的列表
ZK的消費者可使用下面命令查看,好比上面的例子中的 test-consumer-group
bin/kafka-consumer-groups.sh --zookeeper 127.0.0.1:2181 --list
KF的消費者可使用下面命令查看,好比上面的例子中的 console-consumer-xxx ,可是隻會查看到相似於 KMOffsetCache-lintong-B250M-DS3H 的結果,這是因爲這種消費者的信息是存放在 __consumer_offsets 中
對於如何查看存儲於 __consumer_offsets 中的新版消費者的信息,能夠參考huxihx的博文: Kafka 如何讀取offset topic內容 (__consumer_offsets)
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list
10.在zk中刪除一個consumer group
rmr /consumers/test-consumer-group
11.查看topic的offset的最小值
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 -topic xxxx --time -2 xxxx:0:0
12.查看topic的offset的最大值
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 -topic xxxx --time -1
13.重置topic的某個消費者的offset爲0,須要高版本的kafka纔有該命令,在高版本的kafka client對低版本的kafka集羣執行該命令是會生效的
kafka-consumer-groups --bootstrap-server localhost:9092 --group xxx --topic xxx --reset-offsets --to-earliest --execute