Table of Contents
1. Preface
2. Broker default port number
3. Installing Kafka
4. Starting Kafka
5. Creating a Topic
6. Listing all Topics
7. Deleting a Topic
8. Describing a Topic
9. Increasing a topic's partition count
10. Producing messages
11. Consuming messages
12. Listing consumer groups
13. Viewing new-consumer details
14. Viewing group details
15. Deleting a group
16. Setting a consumer group's offset
17. RdKafka built-in examples
18. Balancing leaders
19. Built-in benchmarking tool
20. Viewing the max or min offset of a topic partition
21. Viewing __consumer_offsets
22. Getting offset information for a consumer group
23. Inspecting Kafka's data in ZooKeeper
24. How to increase the replication factor of __consumer_offsets?
25. Problems
Appendix 1: Process monitoring tool process_monitor.sh
Appendix 2: Batch operation tools
Appendix 2.1: Batch command execution tool: mooon_ssh
Appendix 2.2: Batch file upload tool: mooon_upload
Appendix 2.3: Usage examples
Appendix 3: Tool for batch-setting broker.id and listeners
Appendix 4: Tool for batch-setting hostname
Appendix 5: Kafka monitoring tool kafka-manager
Appendix 6: Installing Kafka
Appendix 7: __consumer_offsets
The content of this article comes from two main sources: material shared online, and my own working notes. Accumulated over time, it collects all kinds of Kafka commands and will be updated continuously.
Kafka 0.9.0.0 introduced several changes: the GroupCoordinator role was added on the server side, and, as a bigger change, topic offset information moved from ZooKeeper into a special topic (__consumer_offsets).
Kafka's bottleneck tends to be the network card rather than CPU, memory, or disk, so compressing the log should be considered.
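For instance, compression can be enabled from the producer side; a minimal sketch using the console producer's --compression-codec option (the topic name and codec are illustrative):
kafka-console-producer.sh --broker-list localhost:9092 --topic test --compression-codec lz4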
Related URLs:
1) Kafka official site: http://kafka.apache.org/
2) Downloads: http://kafka.apache.org/downloads
3) Client libraries: https://cwiki.apache.org/confluence/display/KAFKA/Clients
4) librdkafka library: https://github.com/edenhill/librdkafka
5) confluent-kafka-go: https://github.com/confluentinc/confluent-kafka-go
The default broker port is 9092. When installing, it is recommended to give Kafka its own root directory in ZooKeeper, e.g. "/kafka", rather than using "/" directly, so that multiple Kafka clusters can share the same ZooKeeper cluster.
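In server.properties this chroot is appended to the zookeeper.connect value; a minimal sketch with placeholder hosts:
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181/kafka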
Compared with Hadoop, HBase, Spark, and even Redis, getting Kafka installed and running is straightforward; the official quickstart is enough: http://kafka.apache.org/quickstart.
Kafka depends on ZooKeeper and ships with a bundled ZooKeeper, but installing a standalone ZooKeeper is recommended.
kafka-server-start.sh config/server.properties
To keep it resident in the background, add the "-daemon" flag, e.g.:
/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
The --topic parameter specifies the topic name, --partitions the number of partitions, and --replication-factor the number of replicas:
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Note: if server.properties specifies a Kafka directory in ZooKeeper, the parameter must include it too, otherwise you get a "no available brokers" error, e.g.:
kafka-topics.sh --create --zookeeper localhost:2181/kafka --replication-factor 1 --partitions 1 --topic test
kafka-topics.sh --list --zookeeper localhost:2181
Note: if server.properties specifies a Kafka directory in ZooKeeper, the parameter must include it too, otherwise you get a "no available brokers" error, e.g.:
kafka-topics.sh --list --zookeeper localhost:2181/kafka
Example output:
__consumer_offsets
my-replicated-topic
test
1) kafka-topics.sh --zookeeper localhost:2181 --topic test --delete
2) kafka-topics.sh --zookeeper localhost:2181/kafka --topic test --delete
3) kafka-run-class.sh kafka.admin.DeleteTopicCommand --zookeeper localhost:2181 --topic test
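Note that deletion only takes effect when delete.topic.enable=true is set in server.properties (before Kafka 1.0.0 the default is false); otherwise the topic is merely marked for deletion:
delete.topic.enable=true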
kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Note: if server.properties specifies a Kafka directory in ZooKeeper, the parameter must include it too, otherwise you get a "no available brokers" error, e.g.:
kafka-topics.sh --describe --zookeeper localhost:2181/kafka --topic test
Example output:
Topic:test  PartitionCount:3  ReplicationFactor:2  Configs:
  Topic: test  Partition: 0  Leader: 140  Replicas: 140,214  Isr: 140,214
  Topic: test  Partition: 1  Leader: 214  Replicas: 214,215  Isr: 214,215
  Topic: test  Partition: 2  Leader: 215  Replicas: 215,138  Isr: 215,138
kafka-topics.sh --zookeeper localhost:2181 --alter --topic test --partitions 5
kafka-console-producer.sh --broker-list localhost:9092 --topic test
1) From the beginning
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
2) From the tail (latest)
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --offset latest
3) From a specific partition
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --offset latest --partition 1
4) Fetching a fixed number of messages
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --offset latest --partition 1 --max-messages 1
5) New consumer (version >= 0.9)
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --new-consumer --from-beginning --consumer.config config/consumer.properties
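The config/consumer.properties passed above usually needs little more than a group ID; a minimal sketch (the group name is illustrative):
group.id=test-consumer-group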
1) Via ZooKeeper (old)
kafka-consumer-groups.sh --zookeeper 127.0.0.1:2181/kafka --list
2) Via API (new)
kafka-consumer-groups.sh --new-consumer --bootstrap-server 127.0.0.1:9092 --list
Example output:
test
console-consumer-37602
console-consumer-75637
console-consumer-59893
Only applies when offsets are stored in ZooKeeper:
kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect localhost:2181 --group test
kafka-consumer-groups.sh --new-consumer --bootstrap-server 127.0.0.1:9092 --group test --describe
Example output:
TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
test   1          87              87              0    -            -     -
The old ZooKeeper-based approach can delete a group; with the new version, groups are deleted automatically. When you run:
kafka-consumer-groups.sh --new-consumer --bootstrap-server 127.0.0.1:9092 --group test --delete
the following message is printed:
Option '[delete]' is only valid with '[zookeeper]'. Note that there's no need to delete group metadata for the new consumer as the group is deleted when the last committed offset for that group expires.
Run zkCli.sh to enter the ZooKeeper command-line interface. Suppose the offset of group testgroup on topic test needs to be set to 2018; then:
set /consumers/testgroup/offsets/test/0 2018
If Kafka's root directory in ZooKeeper is not "/" but "/kafka", then:
set /kafka/consumers/testgroup/offsets/test/0 2018
Alternatively, you can use the built-in tool kafka-run-class.sh kafka.tools.UpdateOffsetsInZK. Usage:
kafka.tools.UpdateOffsetsInZK$ [earliest | latest] consumer.properties topic
As the usage shows, it can only reset to earliest or latest, which is less flexible than editing ZooKeeper directly.
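For example, to rewind the group configured in config/consumer.properties to the earliest offsets of topic test, following the usage above:
kafka-run-class.sh kafka.tools.UpdateOffsetsInZK earliest config/consumer.properties test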
rdkafka_consumer_example -b 127.0.0.1:9092 -g test test
rdkafka_consumer_example -e -b 127.0.0.1:9092 -g test test
kafka-preferred-replica-election.sh --zookeeper localhost:2181/chroot
kafka-producer-perf-test.sh --topic test --num-records 100 --record-size 1 --throughput 100 --producer-props bootstrap.servers=localhost:9092
A time value of -1 means the maximum offset, -2 the minimum:
kafka-run-class.sh kafka.tools.GetOffsetShell --topic hive-mdatabase-hostsltable --time -1 --broker-list 127.0.0.1:9092 --partitions 0
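Accordingly, the minimum offset of the same partition can be read by changing --time to -2, mirroring the command above:
kafka-run-class.sh kafka.tools.GetOffsetShell --topic hive-mdatabase-hostsltable --time -2 --broker-list 127.0.0.1:9092 --partitions 0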
Requires exclude.internal.topics=false in consumer.properties:
1) Versions before 0.11.0.0
kafka-console-consumer.sh --topic __consumer_offsets --zookeeper localhost:2181 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning
2) Versions 0.11.0.0 and later
kafka-console-consumer.sh --topic __consumer_offsets --zookeeper localhost:2181 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning
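A minimal sketch of the config/consumer.properties used here (the group name is illustrative):
exclude.internal.topics=false
group.id=offsets-reader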
Requires exclude.internal.topics=false in consumer.properties:
1) Before 0.11.0.0:
kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 11 --broker-list localhost:9091,localhost:9092,localhost:9093 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"
2) 0.11.0.0 and later:
kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 11 --broker-list localhost:9091,localhost:9092,localhost:9093 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"
1) Viewing Kafka's root directory in ZooKeeper
[zk: localhost:2181(CONNECTED) 0] ls /kafka
[cluster, controller_epoch, controller, brokers, admin, isr_change_notification, consumers, config]
2) Viewing brokers
[zk: localhost:2181(CONNECTED) 1] ls /kafka/brokers
[ids, topics, seqid]
3) Listing brokers (214, 215, etc. are the broker.id values configured in server.properties):
[zk: localhost:2181(CONNECTED) 2] ls /kafka/brokers/ids
[214, 215, 138, 139]
4) Viewing broker 214; the data below shows that this broker has no JMX_PORT set:
[zk: localhost:2181(CONNECTED) 4] get /kafka/brokers/ids/214
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://test-204:9092"],"jmx_port":-1,"host":"test-204","timestamp":"1498467464861","port":9092,"version":4}
cZxid = 0x200002400
ctime = Mon Jun 26 16:57:44 CST 2017
mZxid = 0x200002400
mtime = Mon Jun 26 16:57:44 CST 2017
pZxid = 0x200002400
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x45b9d9e841f0136
dataLength = 190
numChildren = 0
5) Viewing the controller; the data below shows that broker 214 is the controller:
[zk: localhost:2181(CONNECTED) 9] get /kafka/controller
{"version":1,"brokerid":214,"timestamp":"1498467946988"}
cZxid = 0x200002438
ctime = Mon Jun 26 17:05:46 CST 2017
mZxid = 0x200002438
mtime = Mon Jun 26 17:05:46 CST 2017
pZxid = 0x200002438
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x45b9d9e841f0136
dataLength = 56
numChildren = 0
6) Viewing the Kafka cluster ID:
[zk: localhost:2181(CONNECTED) 13] get /kafka/cluster/id
{"version":"1","id":"OCAEJy4qSf29bhwOfO7kNQ"}
cZxid = 0x2000023e7
ctime = Mon Jun 26 16:57:28 CST 2017
mZxid = 0x2000023e7
mtime = Mon Jun 26 16:57:28 CST 2017
pZxid = 0x2000023e7
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 45
numChildren = 0
7) Listing topics:
[zk: localhost:2181(CONNECTED) 16] ls /kafka/brokers/topics
[test, my-replicated-topic, test1, test2, test3, test123, __consumer_offsets, info]
8) Listing the partitions of a topic:
[zk: localhost:2181(CONNECTED) 19] ls /kafka/brokers/topics/__consumer_offsets/partitions
[44, 45, 46, 47, 48, 49, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43]
9) Viewing the state of partition 0:
[zk: localhost:2181(CONNECTED) 22] get /kafka/brokers/topics/__consumer_offsets/partitions/0/state
{"controller_epoch":2,"leader":215,"version":1,"leader_epoch":1,"isr":[215,214]}
cZxid = 0x2000024c6
ctime = Mon Jun 26 18:02:07 CST 2017
mZxid = 0x200bc4fc3
mtime = Mon Aug 27 18:58:10 CST 2018
pZxid = 0x2000024c6
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 80
numChildren = 0
kafka-reassign-partitions.sh can be used to increase the replication factor of __consumer_offsets. Construct a JSON file reassign.json as follows:
{ "version":1, "partitions":[ {"topic":"__consumer_offsets","partition":0,"replicas":[1,2,3]}, {"topic":"__consumer_offsets","partition":1,"replicas":[2,3,1]}, {"topic":"__consumer_offsets","partition":2,"replicas":[3,1,2]}, {"topic":"__consumer_offsets","partition":3,"replicas":[1,2,3]}, ... {"topic":"__consumer_offsets","partition":100,"replicas":[2,3,1]} ] } |
Then run:
kafka-reassign-partitions.sh --zookeeper localhost:2181/kafka --reassignment-json-file reassign.json --execute
The numbers in "[1,2,3]" are broker.id values.
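Writing every partition entry by hand is tedious; the following is a minimal bash sketch that generates reassign.json, assuming broker ids 1, 2, 3 and the default 50 partitions (both are assumptions; adjust to your cluster):
#!/bin/bash
# Generate reassign.json for __consumer_offsets, rotating the replica
# list across the assumed brokers 1..3.
brokers=(1 2 3)
n=${#brokers[@]}
last=49  # assumes the default of 50 partitions
echo '{"version":1,"partitions":[' > reassign.json
for p in $(seq 0 $last); do
  a=${brokers[$((p % n))]}
  b=${brokers[$(((p + 1) % n))]}
  c=${brokers[$(((p + 2) % n))]}
  sep=','
  [ "$p" -eq "$last" ] && sep=''
  echo "{\"topic\":\"__consumer_offsets\",\"partition\":$p,\"replicas\":[$a,$b,$c]}$sep" >> reassign.json
done
echo ']}' >> reassign.json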
If execution fails with "Partitions reassignment failed due to Partition reassignment data file is empty", the reassign.json file format may be wrong, for instance the following (the middle entries do not end with commas):
{"topic":"__consumer_offsets","partition":29,"replicas":[5,3,2]},
{"topic":"__consumer_offsets","partition":30,"replicas":[1,4,3]}
{"topic":"__consumer_offsets","partition":31,"replicas":[2,5,4]}
{"topic":"__consumer_offsets","partition":32,"replicas":[3,2,5]}
{"topic":"__consumer_offsets","partition":33,"replicas":[4,3,1]},
If execution hits the following error:
Partitions reassignment failed due to Partition replica lists may not contain duplicate entries: __consumer_offsets-16 contains multiple entries for 2. __consumer_offsets-39 contains multiple entries for 2. __consumer_offsets-40 contains multiple entries for 3. __consumer_offsets-44 contains multiple entries for 3
the cause is that two replicas of one partition were assigned to the same broker. Taking partition 16 as an example, two replicas landed on broker 2:
{"topic":"__consumer_offsets","partition":16,"replicas":[2,5,2]},
Output after a successful run:
$ ../bin/kafka-reassign-partitions.sh --zookeeper 192.168.1.35.31:2181/kafka --reassignment-json-file __consumer_offsets.reassign --execute
Current partition replica assignment

{"version":1,"partitions":[{"topic":"__consumer_offsets","partition":22,"replicas":[2],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":30,"replicas":[5],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":8,"replicas":[3],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":21,"replicas":[1],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":4,"replicas":[4],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":27,"replicas":[2],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":7,"replicas":[2],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":9,"replicas":[4],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":46,"replicas":[1],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":25,"replicas":[5],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":35,"replicas":[5],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":41,"replicas":[1],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":33,"replicas":[3],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":23,"replicas":[3],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":49,"replicas":[4],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":47,"replicas":[2],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":16,"replicas":[1],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":28,"replicas":[3],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":31,"replicas":[1],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":36,"replicas":[1],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":42,"replicas":[2],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":3,"replicas":[3],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":18,"replicas":[3],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":37,"replicas":[2],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":15,"replicas":[5],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":24,"replicas":[4],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":38,"replicas":[3],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":17,"replicas":[2],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":48,"replicas":[3],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":19,"replicas":[4],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":11,"replicas":[1],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":13,"replicas":[3],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":2,"replicas":[2],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":43,"replicas":[3],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":6,"replicas":[1],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":14,"replicas":[4],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":20,"replicas":[5],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":0,"replicas":[5],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":44,"replicas":[4],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":39,"replicas":[4],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":12,"replicas":[2],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":45,"replicas":[5],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":1,"replicas":[1],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":5,"replicas":[5],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":26,"replicas":[1],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":29,"replicas":[4],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":34,"replicas":[4],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":10,"replicas":[5],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":32,"replicas":[2],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":40,"replicas":[5],"log_dirs":["any"]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.
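Progress of the reassignment can then be checked with the same tool's --verify mode, e.g. with the same JSON file:
kafka-reassign-partitions.sh --zookeeper localhost:2181/kafka --reassignment-json-file reassign.json --verify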
1) -190, Local: Unknown partition
For example, a single-node deployment has only one partition, but the produce call's partition parameter is 1 (only partition 0 exists), and so on.
2) RdKafka program log "delivery failed. errMsg:[Local: Message timed out]"
The same program hits this error on some machines while working fine on others. Related issue:
https://github.com/edenhill/librdkafka/issues/474
Testing showed the cause was that the machines running the Kafka application had no hosts entries for the Kafka broker machines.
Another fix is to configure listeners and advertised.listeners in server.properties, using IPs rather than hostnames as the values.
3) Name or service not known (after 9020595078ms in state INIT)
event_callback: type(0), severity(3), (-193)kafka-204:9092/214: Failed to resolve 'kafka-204:9092': Name or service not known (after 9020595078ms in state INIT)
The cause is that the machine running the Kafka application (not Kafka itself) cannot resolve the hostname kafka-204 (the Kafka broker machines can resolve it).
The fix is to configure listeners and advertised.listeners in server.properties, using IPs rather than hostnames as the values.
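A sketch of both fixes; the address 192.168.31.204 and its pairing with kafka-204 are placeholders:
# Fix 1: add a hosts entry for the broker on the client machine (/etc/hosts)
192.168.31.204  kafka-204
# Fix 2: advertise IPs instead of hostnames in server.properties
listeners=PLAINTEXT://192.168.31.204:9092
advertised.listeners=PLAINTEXT://192.168.31.204:9092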
process_monitor.sh is a shell script that includes detailed usage instructions and help. It is well suited to crontab: when it detects that the process is gone, it restarts it within about 3 seconds. It supports different users running the same program, and the same user running the same program with different arguments.
Download:
https://github.com/eyjian/libmooon/blob/master/shell/process_monitor.sh
Usage example:
* * * * * /usr/local/bin/process_monitor.sh "/usr/local/jdk/bin/java kafkaServer" "/data/kafka/bin/kafka-server-start.sh -daemon /data/kafka/config/server.properties"
Since all Java programs run in a JVM, the process name is always java; "kafkaServer" restricts monitoring to Kafka only. If the same user runs multiple Kafka instances, add the port number to distinguish them; the port must appear as a command-line argument and, together with "kafkaServer", forms the match pattern.
When the process is found to be missing, the restart command in the third column is executed: "/data/kafka/bin/kafka-server-start.sh -daemon /data/kafka/config/server.properties".
Usage example 2, monitoring ZooKeeper:
* * * * * /usr/local/bin/process_monitor.sh "/usr/local/jdk/bin/java -Dzookeeper" "/data/zookeeper/bin/zkServer.sh start"
Suited to batch-installing Kafka and to day-to-day operations.
Download:
https://github.com/eyjian/libmooon/releases
The tools come in two versions: C++ and Go. The C++ version is currently more mature and the Go version fairly minimal, but the C++ version depends on the C++ runtime library and must be compiled for each environment, whereas the Go version has no C/C++ runtime dependency and can be used across a wide range of Linux environments without compilation.
They are simple to use: just run the command and the usage is printed.
Parameter | Default | Description
-u | none | Username; the environment variable U can be used instead
-p | none | Password; the environment variable P can be used instead
-h | none | IP list; the environment variable H can be used instead
-P | 22 (the source can be modified to compile in a commonly used port) | SSH port; the environment variable PORT can be used instead
-c | none | Command to execute on the remote machines. Single quotes are recommended for the value, unless the command itself contains conflicting single quotes. With double quotes, mind the escaping, or the contents will be interpreted by the local shell
-v | 1 | Verbosity of the tool's output
Parameter | Default | Description
-u | none | Username; the environment variable U can be used instead
-p | none | Password; the environment variable P can be used instead
-h | none | IP list; the environment variable H can be used instead
-P | 22 (the source can be modified to compile in a commonly used port) | SSH port; the environment variable PORT can be used instead
-s | none | Comma-separated list of local files to upload, with relative or absolute paths
-d | none | Directory on the remote machines to upload to; must be a single directory
1) Example 1: upload /etc/hosts
mooon_upload -s=/etc/hosts -d=/etc
2) Example 2: check whether the /etc/hosts file is identical everywhere
mooon_ssh -c='md5sum /etc/hosts'
3) Example 3: view crontab in batch
mooon_ssh -c='crontab -l'
4) Example 4: clear crontab in batch
mooon_ssh -c='rm -f /tmp/crontab.empty;touch /tmp/crontab.empty'
mooon_ssh -c='crontab /tmp/crontab.empty'
5) Example 5: update crontab in batch
mooon_ssh -c='crontab /tmp/crontab.online'
6) Example 6: get the remote machines' IPs
Because awk uses single quotes, the value of "-c" cannot be single-quoted, so the content needs escaping, which makes this a bit more complex than the others:
mooon_ssh -c="netstat -ie | awk -F[\\ :]+ 'BEGIN{ok=0;}{if (match(\$0, \"eth1\")) ok=1; if ((1==ok) && match(\$0,\"inet\")) { ok=0; if (7==NF) printf(\"%s\\n\",\$3); else printf(\"%s\\n\",\$4);} }'"
The IP's position in "netstat -ie" output varies slightly across environments, hence the "7==NF" check in the awk script, but it still may not fit every environment. Characters that need escaping include double quotes, dollar signs, and backslashes.
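Where the remote machines have it, hostname -I is a simpler alternative technique, though it prints all configured addresses and its output may still need filtering:
mooon_ssh -c='hostname -I'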
7) Example 7: view Kafka processes in batch (environment-variable style)
$ export H=192.168.31.9,192.168.31.10,192.168.31.11,192.168.31.12,192.168.31.13
$ export U=kafka
$ export P='123456'
$ mooon_ssh -c='/usr/local/jdk/bin/jps -m'
[192.168.31.15]
50928 Kafka /data/kafka/config/server.properties
125735 Jps -m
[192.168.31.15] SUCCESS

[192.168.31.16]
147842 Jps -m
174902 Kafka /data/kafka/config/server.properties
[192.168.31.16] SUCCESS

[192.168.31.17]
51409 Kafka /data/kafka/config/server.properties
178771 Jps -m
[192.168.31.17] SUCCESS

[192.168.31.18]
73568 Jps -m
62314 Kafka /data/kafka/config/server.properties
[192.168.31.18] SUCCESS

[192.168.31.19]
123908 Jps -m
182845 Kafka /data/kafka/config/server.properties
[192.168.31.19] SUCCESS

================================
[192.168.31.15 SUCCESS] 0 seconds
[192.168.31.16 SUCCESS] 0 seconds
[192.168.31.17 SUCCESS] 0 seconds
[192.168.31.18 SUCCESS] 0 seconds
[192.168.31.19 SUCCESS] 0 seconds
SUCCESS: 5, FAILURE: 0
8) Example 8: stop Kafka processes in batch (parameter style)
$ mooon_ssh -c='/data/kafka/bin/kafka-server-stop.sh' -u=kafka -p='123456' -h=192.168.31.15,192.168.31.16,192.168.31.17,192.168.31.18,192.168.31.19
[192.168.31.15]
No kafka server to stop
command return 1

[192.168.31.16]
No kafka server to stop
command return 1

[192.168.31.17]
No kafka server to stop
command return 1

[192.168.31.18]
No kafka server to stop
command return 1

[192.168.31.19]
No kafka server to stop
command return 1

================================
[192.168.31.15 FAILURE] 0 seconds
[192.168.31.16 FAILURE] 0 seconds
[192.168.31.17 FAILURE] 0 seconds
[192.168.31.18 FAILURE] 0 seconds
[192.168.31.19 FAILURE] 0 seconds
SUCCESS: 0, FAILURE: 5
A shell script with detailed usage instructions and help built in; depends on mooon_ssh and mooon_upload:
https://github.com/eyjian/libmooon/blob/master/shell/set_kafka_id_and_ip.sh
A shell script with detailed usage instructions and help built in; depends on mooon_ssh and mooon_upload:
https://github.com/eyjian/libmooon/blob/master/shell/set_hostname.sh
Official site: https://github.com/yahoo/kafka-manager
kafka-manager's data comes mainly from two sources: Kafka's ZooKeeper data and Kafka's JMX data.
kafka-manager requires JDK 1.8 or later. Building kafka-manager from source is relatively involved, but once you have a binary package, you only need to change the "kafka-manager.zkhosts" value in application.conf and kafka-manager can be started. Note that "kafka-manager.zkhosts" is not Kafka's ZooKeeper setting but the ZooKeeper used by kafka-manager itself, so the two can be different ZooKeeper clusters; remember to wrap the value in double quotes.
crontab startup example:
JMX_PORT=9999
* * * * * /usr/local/bin/process_monitor.sh "/usr/local/jdk/bin/java kafkaServer" "/data/kafka/bin/kafka-server-start.sh -daemon /data/kafka/config/server.properties"
Setting JMX_PORT is not required, but it is recommended so that kafka-manager can view the brokers in more detail.
Example of starting kafka-manager from crontab (service port set to 8080; the default when unspecified is 9000):
* * * * * /usr/local/bin/process_monitor.sh "/usr/local/jdk/bin/java kafka-manager" "/data/kafka/kafka-manager/bin/kafka-manager -Dconfig.file=/data/kafka/kafka-manager/conf/application.conf -Dhttp.port=8080 > /dev/null 2>&1"
Download process_monitor.sh:
https://github.com/eyjian/libmooon/blob/master/shell/process_monitor.sh
Note that cron jobs run normally only while the crontab user's password remains valid.
The two most basic configuration items in server.properties are:
1) broker.id
2) zookeeper.connect
broker.id must be different on every node; for zookeeper.connect it is recommended to specify a directory rather than placing Kafka directly under the ZooKeeper root. It is also recommended to set listeners, otherwise clients need hostname-to-IP mappings configured.
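A minimal per-node sketch of these items (all values are placeholders):
broker.id=1
listeners=PLAINTEXT://192.168.31.15:9092
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181/kafka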
Because of broker.id and listeners, each node's server.properties differs. The set_kafka_id_and_ip.sh tool can perform the substitutions in batch, simplifying Kafka cluster deployment. Download: https://github.com/eyjian/libmooon/blob/master/shell/set_kafka_id_and_ip.sh.
Example of starting Kafka from crontab:
JMX_PORT=9999
* * * * * /usr/local/bin/process_monitor.sh "/usr/local/jdk/bin/java kafkaServer" "/data/kafka/bin/kafka-server-start.sh -daemon /data/kafka/config/server.properties"
JMX_PORT is set to make it easier for kafka-manager to manage Kafka.
__consumer_offsets is Kafka's built-in topic. Since Kafka 0.9.0.0, topic offset information, previously stored in ZooKeeper, is stored in the built-in __consumer_offsets topic instead.
The num.partitions and default.replication.factor settings in server.properties have no effect on __consumer_offsets; it is governed by offsets.topic.num.partitions and offsets.topic.replication.factor.
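A sketch of setting these explicitly in server.properties; the values shown are illustrative and take effect only when __consumer_offsets is first created, so they should be set before the cluster's first start:
offsets.topic.num.partitions=50
offsets.topic.replication.factor=3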