當前基於kafaka最新版 kafka_2.12-2.2.1.tgz 進行配置 。php
官網地址:http://kafka.apache.org/introhtml
kafka的一些基礎知識 參考:http://www.hechunbo.com/index.php/archives/140.htmljava
最新版 kafka_2.12-2.2.1.tgz 進行配置 。單機生產者消費者圖解配配置,多機模擬配置。以及文件讀寫配置,經驗掌握,集成zookeeper不用再安裝apache
配置java環境安裝jdkbootstrap
解壓kafakaapp
[root@localhost hcb]# tar -zxvf kafka_2.12-2.2.1.tgz -C /usr/local
啓動zookeeper .由於最新版 已經包含有zookeeper 因此不用另外安裝了測試
[root@localhost kafka_2.12-2.2.1]# bin/zookeeper-server-start.sh config/zookeeper.properties [2019-06-22 17:47:49,667] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
從新開一個鏈接 。輸入jps 發現多了一個進程code
[root@localhost ~]# jps 3136 Jps 2842 QuorumPeerMain
啓動kafkaserver
[root@localhost kafka_2.12-2.2.1]# ./bin/kafka-server-start.sh config/server.properties [2019-06-22 17:51:18,786] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$) [2019-06-22 17:51:20,624] INFO starting (kafka.server.KafkaServer)
再開一個鏈接 輸入jps查看當前運行的進程
發現多了一個kafka
[root@localhost ~]# jps 3504 Jps 2842 QuorumPeerMain 3147 Kafka
建立一個topic
[root@localhost kafka_2.12-2.2.1]# bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test [root@localhost kafka_2.12-2.2.1]#
查看topic消息
[root@localhost kafka_2.12-2.2.1]# bin/kafka-topics.sh --list --bootstrap-server localhost:9092 test
發送消息 到test
[root@localhost kafka_2.12-2.2.1]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test hi,>welcome to to kafka >hi ,how are you
消費者取消息
[root@localhost kafka_2.12-2.2.1]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning hi,welcome to to kafka hi ,how are you
生產者發送消息之後,消費者有通知 ,
進行多臺機子測試
由於咱們是單臺機子,因此把配置文件複製兩份,更改端口和id配置進行第二臺,第三臺的模擬
[root@localhost ~]# cd /usr/local/kafka_2.12-2.2.1/ [root@localhost kafka_2.12-2.2.1]# cp config/server.properties config/server-1.properties [root@localhost kafka_2.12-2.2.1]# cp config/server.properties config/server-2.properties
修改第二臺機子的配置
vi config/server-1.properties log.dirs=/tmp/kafka-logs-1 listeners=PLAINTEXT://:9093 broker.id=1
修改第三臺機子
vi config/server-2.properties log.dirs=/tmp/kafka-logs-2 listeners=PLAINTEXT://:9094 broker.id=2
啓動新模擬的兩臺服務器
[root@localhost kafka_2.12-2.2.1]# bin/kafka-server-start.sh config/server-1.properties [2019-06-22 18:23:56,237] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
新開鏈接 繼續啓動第三臺,順便查看下當前的進程 。發現有兩個kafka存在了
[root@localhost ~]# jps 4370 ConsoleProducer 2842 QuorumPeerMain 5642 Jps 3147 Kafka 4955 ConsoleConsumer 5278 Kafka [root@localhost ~]# cd /usr/local/kafka_2.12-2.2.1/ ^C[root@localhost kafka_2.12-2.2.1]# bin/kafka-server-start.sh config/server-2.properties [2019-06-22 18:27:31,947] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
新開一個鏈接 ,查看下當前進程 ,三個kafka正常啓動了
[root@localhost ~]# jps 4370 ConsoleProducer 6307 Jps 2842 QuorumPeerMain 3147 Kafka 4955 ConsoleConsumer 5948 Kafka 5278 Kafka
建立一個帶有備份的topic
[root@localhost kafka_2.12-2.2.1]# bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic my-replication-topic
查看哪一個borke【kafka服務器】在工做
[root@localhost kafka_2.12-2.2.1]# bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replication-topic Topic:my-replication-topic PartitionCount:1 ReplicationFactor:3 Configs:segment.bytes=1073741824 Topic: my-replication-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
leader:哪一個broker在讀寫
replicas:當前能夠正常工做的kafka集羣。當leader掛掉時會自動替補
isr:同步消息的列表集合
查看咱們以前建立的topic消息
當時咱們只有一個kafka服務器。能夠看只leader是0,替被和備份的都是0,
[root@localhost kafka_2.12-2.2.1]# bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test Topic:test PartitionCount:1 ReplicationFactor:1 Configs:segment.bytes=1073741824 Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
在新的topic中發佈新的消息
[root@localhost kafka_2.12-2.2.1]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replication-topic >message one >message two
消費者去獲取消息
[root@localhost kafka_2.12-2.2.1]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replication-topic message one message two
檢查當前的leader
[root@localhost kafka_2.12-2.2.1]# bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replication-topic Topic:my-replication-topic PartitionCount:1 ReplicationFactor:3 Configs:segment.bytes=1073741824 Topic: my-replication-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
模擬leader1掛掉之後的狀態
把leader1關掉
檢查leader1的進程
ps aux 顯示用戶當前的全部進程 。並根據grep後面的內容進行搜索
用kill殺死相關進程
[root@localhost kafka_2.12-2.2.1]# ps aux | grep server-1.properties root 5278 3.5 20.5 3232460 205560 pts/5 Sl+ 18:23 1:06 /usr/local/jdk1.8.0_211/bin/java -Xmx1G [root@localhost kafka_2.12-2.2.1]# kill -9 5278
再次檢查當前topic的消息
發現leader已經從1變成了2.
[root@localhost kafka_2.12-2.2.1]# bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replication-topic Topic:my-replication-topic PartitionCount:1 ReplicationFactor:3 Configs:segment.bytes=1073741824 Topic: my-replication-topic Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,0
使用kafka connect 導入導出數據
souce connector 從text.txt讀取文件 ,把內容發送到connect-test., sink connector 從conect-test讀寫消息
[root@localhost kafka_2.12-2.2.1]# bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties [2019-06-22 19:05:55,493] INFO Kafka Connect standalone worker initializing ... (org.apache.kafka.connect.cli.ConnectStandalone:67)
進行jps分發現多了一個ConnectStandalone的進程
[root@localhost ~]# jps 4370 ConsoleProducer 9478 Jps 9160 ConnectStandalone 2842 QuorumPeerMain 3147 Kafka 4955 ConsoleConsumer 5948 Kafka
顯示文件內容
more 命令相似 cat ,不過會以一頁一頁的形式顯示,更方便使用者逐頁閱讀,
[root@localhost kafka_2.12-2.2.1]# more test.sink.txt foo bar
使用消費者控制 臺顯示
[root@localhost kafka_2.12-2.2.1]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning {"schema":{"type":"string","optional":false},"payload":"foo"} {"schema":{"type":"string","optional":false},"payload":"bar"}
繼續測試
生產者進行消息追加
[root@localhost kafka_2.12-2.2.1]# echo -e "foo\nbarddddaaa\aaaaa\dddd\1\2\2\3" > test.txt [root@localhost kafka_2.12-2.2.1]# echo -e "foo\nbarddddaaa\aaaaa\dddd\1\2\2\3\new append" > test.txt
消費者進行實時顯示
[root@localhost kafka_2.12-2.2.1]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning {"schema":{"type":"string","optional":false},"payload":"foo"} {"schema":{"type":"string","optional":false},"payload":"bar"} {"schema":{"type":"string","optional":false},"payload":"dddd"} {"schema":{"type":"string","optional":false},"payload":"aaaaaaad"} {"schema":{"type":"string","optional":false},"payload":"dd"} ^[[A^[[A^[[B{"schema":{"type":"string","optional":false},"payload":"1\\2\\2\\3"} {"schema":{"type":"string","optional":false},"payload":"ew append"}