1. 下載 zookeeper-3.4.12html
2 配置Zookeepergithub
進入 zookeeper 的 conf 目錄下,找到 zoo_sample.cfg 文件。首先將 zoo_sample.cfg 文件備份,並重命名爲 zoo.cfgshell
blockchain@Dao:~/zookeeper-3.4.13/conf$ cp zoo_sample.cfg zoo.cfgapache
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/home/blockchain/tmp/zookeeper
#dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1bootstrap
其中,
tickTime:zookeeper 服務器之間或客戶端與服務器之間心跳的時間間隔。
dataDir:zookeeper 保存數據的目錄,默認狀況下,Zookeeper 將寫數據的日誌文件也保存在這個目錄裏。
clientPort:zookeeper 服務器監聽端口,用來接受客戶端的訪問請求。
maxClientCnxns:zookeeper可以接收的最大客戶端鏈接數。ubuntu
dataDir 默認是 /tmp/zookeeper,因爲 /tmp 是 Ubuntu 的 臨時目錄,這個路徑下的數據不能長久保存,所以須要指定到別的目錄。
3. 啓動 Zookeeperruby
blockchain@Dao:~/zookeeper-3.4.13$ blockchain@Dao:~/zookeeper-3.4.13$ ./bin/zkServer.sh start ZooKeeper JMX enabled by default Using config: /home/blockchain/zookeeper-3.4.13/bin/../conf/zoo.cfg Starting zookeeper ... STARTED blockchain@Dao:~/zookeeper-3.4.13$ blockchain@Dao:~/zookeeper-3.4.13$ jps 31392 Jps 25248 QuorumPeerMain blockchain@Dao:~/zookeeper-3.4.13$
使用 status 參數來查看 zookeeper 的狀態bash
blockchain@Dao:~/zookeeper-3.4.13$ ./bin/zkServer.sh status ZooKeeper JMX enabled by default Using config: /home/blockchain/zookeeper-3.4.13/bin/../conf/zoo.cfg Mode: standalone blockchain@Dao:~/zookeeper-3.4.13$
./zkCli.sh -server ip:port,默認端口爲2181服務器
bin/zkCli.sh -server localhost:2181
測試是否安裝成功
telnet localhost 2181
而後輸入srvr
下載kafka kafka_2.11-2.1.1.tgz
啓動kafka
bin/kafka-server-start.sh ~/kafka/config/server.properties > ~/kafka/kafka.log 2>&1 &
中止kafka
bin/kafka-server-stop.sh config/server.properties
3.建立一個主題
首先建立一個名爲test
的topic,只使用單個分區和一個複本
1
|
bin
/kafka-topics
.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic
test
|
如今能夠運行list topic命令看到咱們的主題
1
|
bin
/kafka-topics
.sh --list --zookeeper localhost:2181
|
4.發送消息
1
2
3
|
bin
/kafka-console-producer
.sh --broker-list localhost:9092 --topic
test
This is a message
This is another message
|
若是要批量導入文件數據到kafka,參考:2.1 本地環境下kafka批量導入數據
1
|
bin
/kafka-console-producer
.sh --broker-list localhost:9092 --topic test_topic < file_pat
|
5.啓動一個消費者,消費者會接收到消息
舊版消費者
1
|
bin
/kafka-console-consumer
.sh --zookeeper localhost:2181 --topic
test
--from-beginning 2>
/dev/null
|
新版消費者
1
|
bin
/kafka-console-consumer
.sh --new-consumer --bootstrap-server localhost:9092 --topic input --from-beginning 2>
/dev/null
|
6.查看指定的topic的offset信息
對於結尾是ZK的消費者,其消費者的信息是存儲在Zookeeper中的
對於結尾是KF的消費者,其消費者的信息是存在在Kafka的broker中的
均可以使用下面的命令進行查看
1
|
bin
/kafka-consumer-offset-checker
.sh --zookeeper localhost:2181 --group xxx --topic xxx
|
結果
1
2
3
4
|
bin
/kafka-consumer-offset-checker
.sh --zookeeper localhost:2181 --group
test
-consumer-group --topic xxx
[2018-09-03 20:34:57,595] WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped
in
releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$)
Group Topic Pid Offset logSize Lag Owner
test
-consumer-group xxx 0 509 0 -509 none
|
或者
1
|
.
/bin/kafka-run-class
.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group xxxx --topic xxxx
|
結果
1
2
3
4
|
bin
/kafka-run-class
.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group
test
-consumer-group
[2018-09-03 20:45:02,967] WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped
in
releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$)
Group Topic Pid Offset logSize Lag Owner
test
-consumer-group xxx 0 509 509 0 none
|
lag是負數的緣由是 topic中的消息數量過時(超過kafka默認的7天后被刪除了),變成了0,因此Lag=logSize減去Offset,因此就變成了負數
7.刪除一個topic
須要在 conf/server.properties 文件中設置
1
2
|
# For delete topic
delete.topic.
enable
=
true
|
不然在執行了如下刪除命令後,再 list 查看全部的topic,仍是會看到該topic
1
|
bin
/kafka-topics
.sh --zookeeper localhost:2181 --delete --topic topicB
|
再到 配置文件 中的kafka數據存儲地址去刪除物理數據了,個人地址爲
1
|
/tmp/kafka-logs
|
最後須要到zk裏刪除kafka的元數據
1
2
3
|
.
/bin/zkCli
.sh
#進入zk shell
ls
/brokers/topics
rmr
/brokers/topics/topicA
|
8.查看某個group的信息
新版
1
|
bin
/kafka-consumer-groups
.sh --new-consumer --bootstrap-server localhost:9092 --describe --group xxx
|
結果
1
2
3
|
bin
/kafka-consumer-groups
.sh --new-consumer --bootstrap-server localhost:9092 --describe --group group_id
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER
group_id xxx 0 509 509 0 consumer-1_
/127
.0.0.1
|
若是這時候消費者進程關閉了以後,使用上面的命令和下面的-list命令將不會查出這個group_id,可是當消費者進程從新開啓後,這個group_id又能從新查到,且消費的offset不會丟失
舊版
1
|
bin
/kafka-consumer-groups
.sh --zookeeper 127.0.0.1:2181 --group xxx --describe
|
9.查看consumer group的列表
ZK的消費者可使用下面命令查看,好比上面的例子中的 test-consumer-group
1
|
bin
/kafka-consumer-groups
.sh --zookeeper 127.0.0.1:2181 --list
|
KF的消費者可使用下面命令查看,好比上面的例子中的 console-consumer-xxx ,可是隻會查看到相似於 KMOffsetCache-lintong-B250M-DS3H 的結果,這是因爲這種消費者的信息是存放在 __consumer_offsets 中
對於如何查看存儲於 __consumer_offsets 中的新版消費者的信息,能夠參考huxihx的博文: Kafka 如何讀取offset topic內容 (__consumer_offsets)
1
|
bin
/kafka-consumer-groups
.sh --new-consumer --bootstrap-server localhost:9092 --list
|
10.在zk中刪除一個consumer group
1
|
rmr
/consumers/test-consumer-group
|
11.查看topic的offset的最小值
1
2
|
bin
/kafka-run-class
.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 -topic xxxx --
time
-2
xxxx:0:0
|
12.查看topic的offset的最大值
1
|
bin
/kafka-run-class
.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 -topic xxxx --
time
-1
|
13.重置topic的某個消費者的offset爲0,須要高版本的kafka纔有該命令,在高版本的kafka client對低版本的kafka集羣執行該命令是會生效的
1
|
kafka-consumer-
groups
--bootstrap-server localhost:9092 --group xxx --topic xxx --reset-offsets --to-earliest --execute
|
安裝Kafka客戶端librdkafka
安裝下載https://github.com/edenhill/librdkafka
預備環境:
The GNU toolchain
GNU make
pthreads
zlib (optional, for gzip compression support)
libssl-dev (optional, for SSL and SASL SCRAM support)
libsasl2-dev (optional, for SASL GSSAPI support)
編譯和安裝:
./configure
make
sudo make install
ubuntu14安裝kafka