Kafka及Spring Cloud Stream

安裝

下載kafka http://mirrors.hust.edu.cn/apache/kafka/2.0.0/kafka_2.11-2.0.0.tgz java

kafka最爲重要三個配置依次爲:broker.id、log.dir、zookeeper.connectgit

在kafka server端 config/server.properties中設置web

 

必需要配置:spring

advertised.listeners=PLAINTEXT://192.168.3.201:9092    # 公佈訪問地址和端口 apache

 

啓動kafka bootstrap

 bin/kafka-server-start.sh ../config/server.properties api

檢測是否啓動 app

netstat -tunlp | egrep " (2181|9092)"tcp

或 lsof -i:9092post

測試發送信息和消費消息

建立主題

./kafka-topics.sh --create --zookeeper localhost:2182 --replication-factor 1 --partitions 1 - topic test

生產者 

 ./kafka-console-producer.sh --broker-list localhost:9092 --topic test 

消費者 

./kafkaconsole-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

若是想在外部使用kafka必須 9092 端口加入到防火牆列表

firewall-cmd --list-ports 查詢全部放行端口
firewall-cmd --add-port=9092/tcp # 臨時端口放行
firewall-cmd --add-port=9092/tcp --permanent # 永久放行
firewall-cmd --reload # 從新載入放行列表

 

簡單API的應用 

引入依賴

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

編寫生成者

package com.example.springkafka.api;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * @Date: 2018/11/6 20:25
 * @Description: 生產者
 */
public class KafkaProducerDemo {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers","192.168.3.221:9092");
        properties.setProperty("key.serializer", StringSerializer.class.getName());
        properties.setProperty("value.serializer", StringSerializer.class.getName());
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        String topic = "message"; // 主題
        Integer partition = 0; // 指定分區
        long timeMillis = System.currentTimeMillis(); // 毫秒值 15分鐘
        String key = "key-message"; // key
        String value = "value-message"; // value
        // 建立ProducerRecord
        ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, partition, timeMillis, key, value);
        // 生產消息
        kafkaProducer.send(producerRecord);
        kafkaProducer.close();
    }
}

 

編寫消費者

package com.example.springkafka.api;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.Properties;

/**
 * @Date: 2018/11/6 20:25
 * @Description: 消費者
 */
public class KafkaConsumerDemo {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.3.221:9092");
        properties.setProperty("group.id", "group-1");
        properties.setProperty("key.deserializer", StringDeserializer.class.getName());
        properties.setProperty("value.deserializer", StringDeserializer.class.getName());
        // 建立kafka的消費者對象
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
        // 訂閱kafka主題
        kafkaConsumer.subscribe(Arrays.asList("message"));
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("========offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
        }
    }
}

 spring kafka

依賴

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>

生成者與消費者配置

# 生成者配置
spring:
  kafka:
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    bootstrap-servers: 192.168.3.221:9092
    consumer: # 消費者
      group-id: gerry-1
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
kafka:
  topic: gerry

生成者代碼

package com.example.springcloudkafka.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * @Date: 2018/11/6 21:03
 * @Description:
 */
@RestController
public class KafkaProducerController {
    public final KafkaTemplate<String, String> kafkaTemplate;
    private final String topic;

    public KafkaProducerController(KafkaTemplate<String, String> kafkaTemplate,
                                   @Value("${kafka.topic}") String topic) {
        this.kafkaTemplate = kafkaTemplate;
        this.topic = topic;
    }

    @PostMapping("message/send") // 這種方式只支持post
    public boolean sendMessage(@RequestParam String message) {
        kafkaTemplate.send(topic,message);

        return true;
    }
}

消費者代碼

package com.example.springcloudkafka.listener;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * @Date: 2018/11/6 21:20
 * @Description:
 */
@Component
public class KafkaConsumerListener {

    @KafkaListener(topics={"${kafka.topic}"})
    public void getMessage(String message) {
        System.out.println("kafka 消費者監聽,接收到消息:" + message);
    }
}

Spring Cloud Stream 

官方定義三個接口
Source=> 發送者 Producer、Publisher
Sink=> 接收器 Consumer、 Subscriber Processor: 上流而言Sink、下流而言Souce

Spring Cloud Stream Binder: Kafka 

引入依賴:

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>

配置:

# 生成者配置
spring:
  kafka:
    bootstrap-servers: 192.168.3.221:9092
  cloud:
    stream:
      bindings:
        output:
          destination: ${kafka.topic}
        input:
          destination: ${kafka.topic}
kafka:
  topic: cloud-stream

 

生產者:

package com.example.springcloudstreamkafkademo.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Component
@EnableBinding(Source.class)
public class MessageProducerBean {
    @Autowired
    @Qualifier(Source.OUTPUT)
    private MessageChannel messageChannel;

    @Autowired
    private Source source;

    /**
     * 發送信息
     * @param message
     */
    public void send(String message) {
        // 經過消息管道發送消息
        // messageChannel.send(MessageBuilder.withPayload(message).build());
        source.output().send(MessageBuilder.withPayload(message).build());
    }
}

消費者

package com.example.springcloudstreamkafkademo.consumer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Component
@EnableBinding(value={Sink.class})
public class MessageConsumerBean {
    @Autowired
    @Qualifier(Sink.INPUT)
    private SubscribableChannel subscribableChannel;

    //一、 當subscribableChannel注入完成後完成回調
    @PostConstruct
    public void init() {
        subscribableChannel.subscribe(message->{
            System.out.println(message.getPayload());
        });
    }
    // 二、@ServiceActivator
    @ServiceActivator(inputChannel=Sink.INPUT)
    public void message(String message) {
        System.out.println("@ServiceActivator:"+message);
    }
    //三、@StreamListener
    @StreamListener(Sink.INPUT)
    public void onMessage(String message) {
        System.out.println("@StreamListener:"+message);
    }
}

 

Spring Cloud Stream Binder: RabbitMQ 

 引入依賴

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
        </dependency>

配置

spring:
  cloud:
    stream:
      bindings:
        output:
          destination: ${rabbit.queue}
        input:
          destination: ${rabbit.queue}
  rabbitmq:
    host: 192.168.3.221
    port: 5672
    username: rabbit
    password: rabbit
rabbit:
  queue: cloud-stream-queue

代碼同kafka

 

完整代碼詳見:https://gitee.com/lm970585581/cloud-config/tree/master/Spring%20Cloud%20Stream%20

相關文章
相關標籤/搜索