ubuntu14.0.4安裝kafka 重置kafka的offset

1. 下載 zookeeper-3.4.12html

zookeeper downloadgit

2 配置Zookeepergithub

進入 zookeeper 的 conf 目錄下,找到 zoo_sample.cfg 文件。首先將 zoo_sample.cfg 文件備份,並重命名爲 zoo.cfgshell

blockchain@Dao:~/zookeeper-3.4.13/conf$ cp zoo_sample.cfg zoo.cfgapache

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/home/blockchain/tmp/zookeeper
#dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1bootstrap

其中,
tickTime:zookeeper 服務器之間或客戶端與服務器之間心跳的時間間隔。
dataDir:zookeeper 保存數據的目錄,默認狀況下,Zookeeper 將寫數據的日誌文件也保存在這個目錄裏。
clientPort:zookeeper 服務器監聽端口,用來接受客戶端的訪問請求。
maxClientCnxns:zookeeper可以接收的最大客戶端鏈接數。ubuntu

dataDir 默認是 /tmp/zookeeper,因爲 /tmp 是 Ubuntu 的 臨時目錄,這個路徑下的數據不能長久保存,所以須要指定到別的目錄。
3. 啓動 Zookeeperruby

blockchain@Dao:~/zookeeper-3.4.13$ 
blockchain@Dao:~/zookeeper-3.4.13$ ./bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /home/blockchain/zookeeper-3.4.13/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
blockchain@Dao:~/zookeeper-3.4.13$ 
blockchain@Dao:~/zookeeper-3.4.13$ jps
31392 Jps
25248 QuorumPeerMain
blockchain@Dao:~/zookeeper-3.4.13$ 

使用 status 參數來查看 zookeeper 的狀態bash

blockchain@Dao:~/zookeeper-3.4.13$ ./bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /home/blockchain/zookeeper-3.4.13/bin/../conf/zoo.cfg
Mode: standalone
blockchain@Dao:~/zookeeper-3.4.13$ 

 

Zookeeper 客戶端

./zkCli.sh -server ip:port,默認端口爲2181服務器

bin/zkCli.sh -server localhost:2181

 

測試是否安裝成功

telnet localhost 2181

而後輸入srvr

 

下載kafka kafka_2.11-2.1.1.tgz

啓動kafka
bin/kafka-server-start.sh ~/kafka/config/server.properties > ~/kafka/kafka.log 2>&1 &

中止kafka

bin/kafka-server-stop.sh config/server.properties

3.建立一個主題

首先建立一個名爲test的topic,只使用單個分區和一個複本

1
bin /kafka-topics .sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic  test

 如今能夠運行list topic命令看到咱們的主題

1
bin /kafka-topics .sh --list --zookeeper localhost:2181

4.發送消息

1
2
3
bin /kafka-console-producer .sh --broker-list localhost:9092 --topic  test
This is a message
This is another message

若是要批量導入文件數據到kafka,參考:2.1 本地環境下kafka批量導入數據

1
bin /kafka-console-producer .sh --broker-list localhost:9092 --topic test_topic < file_pat

5.啓動一個消費者,消費者會接收到消息

舊版消費者

1
bin /kafka-console-consumer .sh --zookeeper localhost:2181 --topic  test  --from-beginning 2> /dev/null

新版消費者

1
bin /kafka-console-consumer .sh --new-consumer --bootstrap-server localhost:9092 --topic input --from-beginning 2> /dev/null

6.查看指定的topic的offset信息

對於結尾是ZK的消費者,其消費者的信息是存儲在Zookeeper中的

對於結尾是KF的消費者,其消費者的信息是存在在Kafka的broker中的

均可以使用下面的命令進行查看

1
bin /kafka-consumer-offset-checker .sh --zookeeper localhost:2181 --group xxx --topic xxx

結果

1
2
3
4
bin /kafka-consumer-offset-checker .sh --zookeeper localhost:2181 --group  test -consumer-group --topic xxx
[2018-09-03 20:34:57,595] WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped  in  releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$)
Group           Topic                          Pid Offset          logSize         Lag             Owner
test -consumer-group xxx              0   509             0               -509            none

或者

1
. /bin/kafka-run-class .sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group xxxx --topic xxxx

結果

1
2
3
4
bin /kafka-run-class .sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group  test -consumer-group
[2018-09-03 20:45:02,967] WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped  in  releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$)
Group           Topic                          Pid Offset          logSize         Lag             Owner
test -consumer-group xxx              0   509             509             0               none

lag是負數的緣由是 topic中的消息數量過時(超過kafka默認的7天后被刪除了),變成了0,因此Lag=logSize減去Offset,因此就變成了負數

7.刪除一個topic

須要在 conf/server.properties 文件中設置

1
2
# For delete topic
delete.topic. enable = true

不然在執行了如下刪除命令後,再 list 查看全部的topic,仍是會看到該topic

1
bin /kafka-topics .sh --zookeeper localhost:2181 --delete --topic topicB

再到 配置文件 中的kafka數據存儲地址去刪除物理數據了,個人地址爲

1
/tmp/kafka-logs

最後須要到zk裏刪除kafka的元數據

1
2
3
. /bin/zkCli .sh  #進入zk shell
ls  /brokers/topics
rmr  /brokers/topics/topicA

 參考:kafka 手動刪除topic的數據

 

8.查看某個group的信息

新版

1
bin /kafka-consumer-groups .sh --new-consumer --bootstrap-server localhost:9092 --describe --group xxx

結果

1
2
3
bin /kafka-consumer-groups .sh --new-consumer --bootstrap-server localhost:9092 --describe --group group_id
GROUP          TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET   LAG                 OWNER
group_id      xxx              0          509             509             0               consumer-1_ /127 .0.0.1

若是這時候消費者進程關閉了以後,使用上面的命令和下面的-list命令將不會查出這個group_id,可是當消費者進程從新開啓後,這個group_id又能從新查到,且消費的offset不會丟失

舊版

1
bin /kafka-consumer-groups .sh --zookeeper 127.0.0.1:2181 --group xxx --describe

9.查看consumer group的列表

ZK的消費者可使用下面命令查看,好比上面的例子中的 test-consumer-group

1
bin /kafka-consumer-groups .sh --zookeeper 127.0.0.1:2181 --list

KF的消費者可使用下面命令查看,好比上面的例子中的 console-consumer-xxx ,可是隻會查看到相似於 KMOffsetCache-lintong-B250M-DS3H 的結果,這是因爲這種消費者的信息是存放在 __consumer_offsets 中

對於如何查看存儲於 __consumer_offsets 中的新版消費者的信息,能夠參考huxihx的博文: Kafka 如何讀取offset topic內容 (__consumer_offsets)

1
bin /kafka-consumer-groups .sh --new-consumer --bootstrap-server localhost:9092 --list

10.在zk中刪除一個consumer group

1
rmr  /consumers/test-consumer-group

11.查看topic的offset的最小值

參考:重置kafka的offset

1
2
bin /kafka-run-class .sh kafka.tools.GetOffsetShell --broker-list localhost:9092 -topic xxxx -- time  -2
xxxx:0:0

12.查看topic的offset的最大值

1
bin /kafka-run-class .sh kafka.tools.GetOffsetShell --broker-list localhost:9092 -topic xxxx -- time  -1

13.重置topic的某個消費者的offset爲0,須要高版本的kafka纔有該命令,在高版本的kafka client對低版本的kafka集羣執行該命令是會生效的

1
kafka-consumer- groups  --bootstrap-server localhost:9092 --group xxx --topic xxx --reset-offsets --to-earliest --execute

 


安裝Kafka客戶端
librdkafka

安裝下載https://github.com/edenhill/librdkafka 

預備環境:

The GNU toolchain
GNU make
pthreads
zlib (optional, for gzip compression support)
libssl-dev (optional, for SSL and SASL SCRAM support)
libsasl2-dev (optional, for SASL GSSAPI support)

編譯和安裝:

./configure
make
sudo make install



ubuntu14安裝kafka
相關文章
相關標籤/搜索