Kafka Installation and Practice with Spring Boot

Environment: Tencent Cloud, CentOS 7

1. Download

http://mirror.bit.edu.cn/apache/kafka/2.3.0/kafka_2.11-2.3.0.tgz

2. Extract

tar -xvf kafka_2.11-2.3.0.tgz
mv kafka_2.11-2.3.0 /usr/java/kafka2.11
cd /usr/java/kafka2.11

3. Start and Test

(a) Start ZooKeeper
        bin/zookeeper-server-start.sh config/zookeeper.properties
(b) Start the Kafka broker
        bin/kafka-server-start.sh config/server.properties
(c) List topics
        bin/kafka-topics.sh --zookeeper localhost:2181 --list
(d) Create a topic
        bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic Demo1
(e) Describe a topic
        bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic Demo1
(f) Produce messages to a topic
        bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Demo1
(g) Consume messages from a topic
        (Deprecated form, for old versions only; on newer versions it fails with "zookeeper is not a recognized option")
        bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic Demo1

        bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic Demo1 --from-beginning
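
Note: since Kafka 2.2, kafka-topics.sh also accepts --bootstrap-server, so the topic commands above can talk to the broker directly instead of going through ZooKeeper, for example:

        bin/kafka-topics.sh --bootstrap-server localhost:9092 --list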

4. Install a Kafka Web UI (KafkaOffsetMonitor)

  a) Download: https://github.com/quantifind/KafkaOffsetMonitor/releases/download/v0.2.1/KafkaOffsetMonitor-assembly-0.2.1.jar

  b) Run

  mkdir /mydata/kafkamonitorlogs

    java -Xms512M -Xmx512M -Xss1024K -XX:PermSize=256m -XX:MaxPermSize=512m -cp KafkaOffsetMonitor-assembly-0.2.1.jar \
     com.quantifind.kafka.offsetapp.OffsetGetterWeb \
     --zk 132.232.44.82:2181 \
     --port 8787 \
     --refresh 10.seconds \
     --retain 7.days 1>/mydata/kafkamonitorlogs/stdout.log 2>/mydata/kafkamonitorlogs/stderr.log &
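
Note: the -XX:PermSize/-XX:MaxPermSize options only apply to Java 7 and earlier; on Java 8+ the permanent generation was replaced by Metaspace and the JVM simply warns that the options are ignored, so they can be dropped there.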

  c) Access via the web

http://ip:8787

My VM has too little memory, so I could not see the message list, but the web UI itself does work!

Done!

 ######## Spring Boot Integration Practice ########

1. Add the dependency to pom.xml

   <dependency>
       <groupId>org.springframework.kafka</groupId>
       <artifactId>spring-kafka</artifactId>
   </dependency>
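
If the project inherits from spring-boot-starter-parent (or imports the Spring Boot dependency BOM), no <version> element is needed here because Spring Boot manages the spring-kafka version; otherwise add an explicit <version> that matches your Spring Boot release.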

2. Add configuration to the YAML file

spring:
  profiles:
    active: @activatedProperties@
  kafka:
    bootstrap-servers: 132.232.44.82:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: test
      enable-auto-commit: true
      auto-commit-interval: 1000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
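
Optional: with the defaults, a brand-new consumer group starts from the latest offset, so messages produced before the application first connects are not delivered to the listener. If you also want those earlier messages, the standard Spring Boot property below can be added under the consumer block (a minimal sketch; adjust to your needs):

      auto-offset-reset: earliest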

3. Add the following to Kafka's config/server.properties

advertised.listeners=PLAINTEXT://132.232.44.89:9092
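
advertised.listeners is the address the broker registers in its metadata and hands back to clients, so for producers and consumers running outside the server it must be an address they can actually reach (the server's public IP here). A minimal sketch, assuming the broker binds to all interfaces and clients connect over the public IP; restart the broker after editing:

    listeners=PLAINTEXT://0.0.0.0:9092
    advertised.listeners=PLAINTEXT://<public-ip>:9092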

4. KafkaConsumer.java

package com.cn.commodity.controller;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * Kafka consumer test
 */
@Component
public class KafkaConsumer {
    @KafkaListener(topics = "test_topic1")
    public void listen(ConsumerRecord<?, ?> record) {
        System.out.printf("topic = %s, offset = %d, value = %s \n", record.topic(), record.offset(), record.value());
    }
}

5. KafkaProducer.java

package com.cn.commodity.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Kafka producer test
 */
@RestController
@RequestMapping("kafka")
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @RequestMapping("send")
    public String send(String msg){
        kafkaTemplate.send("test_topic1", msg);
        return "success";
    }

}
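
A quick way to verify the round trip, assuming the application runs on Spring Boot's default port 8080 (adjust if server.port is set): call the producer endpoint and watch the application console, where the listener prints each record (the offset will vary):

    curl "http://localhost:8080/kafka/send?msg=hello"

    topic = test_topic1, offset = 0, value = hello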

Start the application and you're done!
