Springboot集成Kafka

 Kafka是一種高吞吐量的分佈式發佈訂閱消息系統,有以下特性: 經過O(1)的磁盤數據結構提供消息的持久化,這種結構對於即便數以TB的消息存儲也可以保持長時間的穩定性能。高吞吐量:即便是很是普通的硬件Kafka也能夠支持每秒數百萬的消息。支持經過Kafka服務器和消費機集羣來分區消息。支持Hadoop並行數據加載。java

Springboot的基本搭建和配置我在以前的文章已經給出代碼示例了,若是還不瞭解的話能夠先按照 SpringMVC配置太多?試試SpringBoot 進行學習哦。 那麼現在很火的Springboot與kafka怎麼完美的結合呢?多說無宜,放碼過來 (talk is cheap,show me your code)!git

安裝Kafka

由於安裝kafka須要zookeeper的支持,因此Windows安裝時須要將zookeeper先安裝上,而後將kafka安裝好就能夠了。 下面我給出Mac安裝的步驟以及須要注意的點吧,windows的配置除了所在位置不太同樣其餘幾乎沒什麼不一樣。github

 brew install kafka web

對,就是這麼簡單,mac上一個命令就能夠搞定了,這個安裝過程可能須要等一下子,應該是和網絡情況有關係。安裝提示信息可能有錯誤消息,如"Error: Could not link: /usr/local/share/doc/homebrew" 這個不要緊,自動忽略掉了。 最終咱們看到下面的樣子就成功咯。spring

 ==> Summary 🍺/usr/local/Cellar/kafka/1.1.0: 157 files, 47.8MB apache

安裝的配置文件位置以下,根據本身的須要修改端口號什麼的就能夠了。bootstrap

 安裝的zoopeeper和kafka的位置 /usr/local/Cellar/ windows

配置文件 /usr/local/etc/kafka/server.properties /usr/local/etc/kafka/zookeeper.properties 瀏覽器

啓動zookeeper緩存

  ./bin/zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & 

啓動kafka 

 ./bin/kafka-server-start /usr/local/etc/kafka/server.properties & 

爲kafka建立Topic,topic 名爲test,能夠配置成本身想要的名字,回頭再代碼中配置正確就能夠了。

 ./bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test 

代碼示例

pom.xml

<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.0.2.RELEASE</version>
		<relativePath/>
	</parent>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>

		<dependency>
			<groupId>com.google.code.gson</groupId>
			<artifactId>gson</artifactId>
			<version>2.8.2</version>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
			<version>RELEASE</version>
		</dependency>

	</dependencies>

application.yml

server:
  servlet:
    context-path: /
  port: 8080
spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    #生產者的配置,大部分咱們可使用默認的,這裏列出幾個比較重要的屬性
    producer:
      #每批次發送消息的數量
      batch-size: 16
      #設置大於0的值將使客戶端從新發送任何數據,一旦這些數據發送失敗。注意,這些重試與客戶端接收到發送錯誤時的重試沒有什麼不一樣。容許重試將潛在的改變數據的順序,若是這兩個消息記錄都是發送到同一個partition,則第一個消息失敗第二個發送成功,則第二條消息會比第一條消息出現要早。
      retries: 0
      #producer能夠用來緩存數據的內存大小。若是數據產生速度大於向broker發送的速度,producer會阻塞或者拋出異常,以「block.on.buffer.full」來代表。這項設置將和producer可以使用的總內存相關,但並非一個硬性的限制,由於不是producer使用的全部內存都是用於緩存。一些額外的內存會用於壓縮(若是引入壓縮機制),一樣還有一些用於維護請求。
      buffer-memory: 33554432
      #key序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    #消費者的配置
    consumer:
      #Kafka中沒有初始偏移或若是當前偏移在服務器上再也不存在時,默認區最新 ,有三個選項 【latest, earliest, none】
      auto-offset-reset: latest
      #是否開啓自動提交
      enable-auto-commit: true
      #自動提交的時間間隔
      auto-commit-interval: 100
      #key的解碼方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      #value的解碼方式
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      #在/usr/local/etc/kafka/consumer.properties中有配置
      group-id: test-consumer-group

Producer 消息生產者

@Component
public class Producer {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    private static Gson gson = new GsonBuilder().create();

    //發送消息方法
    public void send() {
        Message message = new Message();
        message.setId("KFK_"+System.currentTimeMillis());
        message.setMsg(UUID.randomUUID().toString());
        message.setSendTime(new Date());
        kafkaTemplate.send("test", gson.toJson(message));
    }

}
public class Message {

    private String id;

    private String msg;

    private Date sendTime;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }

    public Date getSendTime() {
        return sendTime;
    }

    public void setSendTime(Date sendTime) {
        this.sendTime = sendTime;
    }
}

Consumer 消息消費者

public class Consumer {

    @KafkaListener(topics = {"test"})
    public void listen(ConsumerRecord<?, ?> record){

        Optional<?> kafkaMessage = Optional.ofNullable(record.value());

        if (kafkaMessage.isPresent()) {

            Object message = kafkaMessage.get();
            System.out.println("---->"+record);
            System.out.println("---->"+message);

        }

    }
}

測試接口用例

這裏咱們用一個接口來測試咱們的消息發送會不會被消費者接收。

@RestController
@RequestMapping("/kafka")
public class SendController {

    @Autowired
    private Producer producer;

    @RequestMapping(value = "/send")
    public String send() {
        producer.send();
        return "{\"code\":0}";
    }
}

在Springboot啓動類啓動後在瀏覽器訪問http://127.0.0.1:8080/kafka/send,咱們能夠再IDE控制檯中看到輸出的結果,這時候咱們的整合基本上就完成啦。 具體代碼能夠在SpringBootKafkaDemo@github獲取哦。

相關文章
相關標籤/搜索