Configuring the Latest Version of Kafka

This walkthrough is based on the latest Kafka release, kafka_2.12-2.2.1.tgz.

Official site: http://kafka.apache.org/intro

For Kafka basics, see: http://www.hechunbo.com/index.php/archives/140.html

It covers configuring kafka_2.12-2.2.1.tgz: a single-machine producer and consumer setup, a simulated multi-broker setup, and file import/export with Kafka Connect. ZooKeeper is bundled with the distribution, so it does not need to be installed separately.

  1. Configure the Java environment by installing the JDK

    See http://www.hechunbo.com/index.php/archives/132.html
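
    Once the JDK is in place, a quick sanity check helps before moving on (the exact output depends on the JDK build you installed; 1.8.0_211 is the one used in this walkthrough):

    [root@localhost ~]# java -version
    java version "1.8.0_211"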

  2. Extract Kafka

    [root@localhost hcb]# tar -zxvf kafka_2.12-2.2.1.tgz -C /usr/local
  3. Start ZooKeeper. The latest release already bundles ZooKeeper, so a separate installation is not needed.

    [root@localhost kafka_2.12-2.2.1]# bin/zookeeper-server-start.sh config/zookeeper.properties 
    [2019-06-22 17:47:49,667] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
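
    For reference, the bundled config/zookeeper.properties ships with roughly the following defaults (worth verifying against your copy): the snapshot directory and the client port that Kafka's zookeeper.connect will later point at.

    dataDir=/tmp/zookeeper
    clientPort=2181
    maxClientCnxns=0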
  4. Open a new connection and run jps; an extra process (QuorumPeerMain) has appeared

    [root@localhost ~]# jps
    3136 Jps
    2842 QuorumPeerMain
  5. Start the Kafka server

    [root@localhost kafka_2.12-2.2.1]# ./bin/kafka-server-start.sh config/server.properties 
    [2019-06-22 17:51:18,786] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
    [2019-06-22 17:51:20,624] INFO starting (kafka.server.KafkaServer)
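
    The broker reads config/server.properties. The values below are roughly the shipped defaults for the settings that matter here, and they are the ones we copy and change in step 11; note that the listeners line is commented out by default, in which case the broker listens on port 9092.

    broker.id=0
    #listeners=PLAINTEXT://:9092
    log.dirs=/tmp/kafka-logs
    zookeeper.connect=localhost:2181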
  6. Open another connection and run jps to check the running processes
    A Kafka process has now appeared as well

    [root@localhost ~]# jps
    3504 Jps
    2842 QuorumPeerMain
    3147 Kafka
  7. Create a topic

    [root@localhost kafka_2.12-2.2.1]# bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
    [root@localhost kafka_2.12-2.2.1]#
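
    The --bootstrap-server form of kafka-topics.sh was only added in Kafka 2.2; older tutorials use the ZooKeeper-based form, which this release still accepts, so the same topic could equally have been created with:

    [root@localhost kafka_2.12-2.2.1]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test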
  8. List the existing topics

    [root@localhost kafka_2.12-2.2.1]# bin/kafka-topics.sh --list --bootstrap-server localhost:9092
    test
  9. Send messages to the test topic

    [root@localhost kafka_2.12-2.2.1]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    >hi,welcome to to kafka
    >hi ,how are you
  10. Consume the messages

    [root@localhost kafka_2.12-2.2.1]#  bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
    hi,welcome to to kafka
    hi ,how are you

    Each time the producer sends a message, the consumer picks it up right away.
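
    The --from-beginning flag only affects the starting offset: it replays the topic from the start of the log. Without it (a minimal variation, not part of the original session), the consumer prints only messages produced after it attaches:

    [root@localhost kafka_2.12-2.2.1]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test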

  11. Test with multiple brokers
    Since we only have one machine, copy the configuration file twice and change the port and broker id in each copy to simulate a second and a third broker (see the quick check after the two listings below).

    1. [root@localhost ~]# cd /usr/local/kafka_2.12-2.2.1/
      [root@localhost kafka_2.12-2.2.1]# cp config/server.properties config/server-1.properties
      [root@localhost kafka_2.12-2.2.1]# cp config/server.properties config/server-2.properties

      Edit the configuration for the second broker:

      vi config/server-1.properties
      log.dirs=/tmp/kafka-logs-1
      listeners=PLAINTEXT://:9093
      broker.id=1

    Edit the configuration for the third broker:

    vi config/server-2.properties
    log.dirs=/tmp/kafka-logs-2
    listeners=PLAINTEXT://:9094
    broker.id=2
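
    In the stock server.properties the listeners line is commented out, so it has to be uncommented (or added) in each copy. A quick way to confirm the three per-broker settings before starting the brokers (a hypothetical check, not part of the original session):

    [root@localhost kafka_2.12-2.2.1]# grep -E "^(broker.id|listeners|log.dirs)" config/server-1.properties config/server-2.properties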

  12. Start the two newly simulated brokers

    [root@localhost kafka_2.12-2.2.1]# bin/kafka-server-start.sh config/server-1.properties 
    [2019-06-22 18:23:56,237] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)

    Open a new connection to start the third broker, checking the current processes along the way; two Kafka processes are already running

    [root@localhost ~]# jps
    4370 ConsoleProducer
    2842 QuorumPeerMain
    5642 Jps
    3147 Kafka
    4955 ConsoleConsumer
    5278 Kafka
    [root@localhost ~]# cd /usr/local/kafka_2.12-2.2.1/
    [root@localhost kafka_2.12-2.2.1]# bin/kafka-server-start.sh config/server-2.properties 
    [2019-06-22 18:27:31,947] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)

    Open another connection and check the processes again; all three Kafka brokers are now running

    [root@localhost ~]# jps
    4370 ConsoleProducer
    6307 Jps
    2842 QuorumPeerMain
    3147 Kafka
    4955 ConsoleConsumer
    5948 Kafka
    5278 Kafka
  13. Create a topic with replication

    [root@localhost kafka_2.12-2.2.1]# bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic my-replication-topic
  14. Check which broker (Kafka server) is doing the work

    [root@localhost kafka_2.12-2.2.1]# bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replication-topic
    Topic:my-replication-topic  PartitionCount:1    ReplicationFactor:3 Configs:segment.bytes=1073741824
        Topic: my-replication-topic Partition: 0    Leader: 1   Replicas: 1,2,0 Isr: 1,2,0

    Leader: the broker that handles all reads and writes for this partition.

    Replicas: the full list of brokers that hold a copy of this partition, whether or not they are alive or currently the leader.

    Isr: the in-sync replicas, i.e. the subset of replicas that are caught up with the leader; if the leader dies, a new leader is elected from this set.

  15. Describe the topic we created earlier

    At that point we had only one Kafka broker, so the leader is 0 and both Replicas and Isr contain nothing but broker 0.

    [root@localhost kafka_2.12-2.2.1]# bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test
    Topic:test  PartitionCount:1    ReplicationFactor:1 Configs:segment.bytes=1073741824
        Topic: test Partition: 0    Leader: 0   Replicas: 0 Isr: 0
  16. Publish messages to the new topic

    [root@localhost kafka_2.12-2.2.1]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replication-topic
    >message one
    >message two
  17. Consume the messages

    [root@localhost kafka_2.12-2.2.1]# bin/kafka-console-consumer.sh  --bootstrap-server  localhost:9092 --from-beginning --topic my-replication-topic
    message one
    message two
  18. Check the current leader

    [root@localhost kafka_2.12-2.2.1]# bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replication-topic
    Topic:my-replication-topic  PartitionCount:1    ReplicationFactor:3 Configs:segment.bytes=1073741824
        Topic: my-replication-topic Partition: 0    Leader: 1   Replicas: 1,2,0 Isr: 1,2,0
  19. Simulate what happens after leader 1 goes down

    Shut down broker 1, the current leader.

    First find broker 1's process.

    ps aux lists all of the current processes, and grep filters them by the pattern that follows.

    Then kill the matching process with kill.

    [root@localhost kafka_2.12-2.2.1]# ps aux | grep server-1.properties
    root       5278  3.5 20.5 3232460 205560 pts/5  Sl+  18:23   1:06 /usr/local/jdk1.8.0_211/bin/java -Xmx1G
    [root@localhost kafka_2.12-2.2.1]# kill -9 5278
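
    A side note: the distribution also ships bin/kafka-server-stop.sh, but it signals every broker process on the host, so with three simulated brokers sharing one machine, killing the specific PID is the way to take down just one of them.

    [root@localhost kafka_2.12-2.2.1]# bin/kafka-server-stop.sh   # would stop ALL brokers on this host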
  20. Describe the topic again

    The leader has changed from 1 to 2, and broker 1 has dropped out of the Isr.

    [root@localhost kafka_2.12-2.2.1]# bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replication-topic
    Topic:my-replication-topic  PartitionCount:1    ReplicationFactor:3 Configs:segment.bytes=1073741824
        Topic: my-replication-topic Partition: 0    Leader: 2   Replicas: 1,2,0 Isr: 2,0
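
    A follow-up check worth making (not captured in the session above): the messages written earlier should still be consumable even though the broker that originally accepted them is dead, because broker 2 has taken over as leader:

    [root@localhost kafka_2.12-2.2.1]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replication-topic
    message one
    message two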
  21. Import and export data with Kafka Connect

    The source connector reads lines from test.txt and sends them to the connect-test topic; the sink connector reads messages from connect-test and writes them to test.sink.txt.
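
    The file locations and the topic name come from the two connector property files passed on the command line; the shipped defaults look roughly like the snippets below (verify against your copy). The foo and bar lines shown later come from seeding test.txt first, e.g. with echo -e "foo\nbar" > test.txt.

    # config/connect-file-source.properties
    name=local-file-source
    connector.class=FileStreamSource
    tasks.max=1
    file=test.txt
    topic=connect-test

    # config/connect-file-sink.properties
    name=local-file-sink
    connector.class=FileStreamSink
    tasks.max=1
    file=test.sink.txt
    topics=connect-test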

    [root@localhost kafka_2.12-2.2.1]# bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties  config/connect-file-sink.properties 
    [2019-06-22 19:05:55,493] INFO Kafka Connect standalone worker initializing ... (org.apache.kafka.connect.cli.ConnectStandalone:67)

    Running jps again shows an additional ConnectStandalone process

    [root@localhost ~]# jps
    4370 ConsoleProducer
    9478 Jps
    9160 ConnectStandalone
    2842 QuorumPeerMain
    3147 Kafka
    4955 ConsoleConsumer
    5948 Kafka

    Display the contents of the sink file.

    The more command is similar to cat, but shows output one page at a time, which makes long files easier to read through.

    [root@localhost kafka_2.12-2.2.1]# more test.sink.txt 
    foo
    bar

    Display the same data with a console consumer:

    [root@localhost kafka_2.12-2.2.1]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
    {"schema":{"type":"string","optional":false},"payload":"foo"}
    {"schema":{"type":"string","optional":false},"payload":"bar"}

    Continue testing.

    Add more content on the producing side: the echo commands below rewrite test.txt with extra lines, and since the source connector tracks its position in the file, only the newly added tail is picked up.

    [root@localhost kafka_2.12-2.2.1]# echo -e "foo\nbarddddaaa\aaaaa\dddd\1\2\2\3" > test.txt
    [root@localhost kafka_2.12-2.2.1]# echo -e "foo\nbarddddaaa\aaaaa\dddd\1\2\2\3\new append" > test.txt

    The consumer displays the new records in real time:

    [root@localhost kafka_2.12-2.2.1]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
    {"schema":{"type":"string","optional":false},"payload":"foo"}
    {"schema":{"type":"string","optional":false},"payload":"bar"}
    {"schema":{"type":"string","optional":false},"payload":"dddd"}
    {"schema":{"type":"string","optional":false},"payload":"aaaaaaad"}
    {"schema":{"type":"string","optional":false},"payload":"dd"}
    {"schema":{"type":"string","optional":false},"payload":"1\\2\\2\\3"}
    {"schema":{"type":"string","optional":false},"payload":"ew append"}