Spring Boot2.0 整合 Kafka

Kafka 概述html

Apache Kafka 是一個分佈式流處理平臺,用於構建實時的數據管道和流式的應用.它可讓你發佈和訂閱流式的記錄,能夠儲存流式的記錄,而且有較好的容錯性,能夠在流式記錄產生時就進行處理。java

Apache Kafka是分佈式發佈-訂閱消息系統,在 kafka官網上對 Kafka 的定義:一個分佈式發佈-訂閱消息傳遞系統。 git

Kafka 特性github

  1. 高吞吐量、低延遲:kafka每秒能夠處理幾十萬條消息,它的延遲最低只有幾毫秒,每一個topic能夠分多個partition, consumer group 對partition進行consume操做;
  2. 可擴展性:kafka集羣支持熱擴展;
  3. 持久性、可靠性:消息被持久化到本地磁盤,而且支持數據備份防止數據丟失;
  4. 容錯性:容許集羣中節點失敗(若副本數量爲n,則容許n-1個節點失敗);
  5. 高併發:支持數千個客戶端同時讀寫;
  6. 支持實時在線處理和離線處理:可使用Storm這種實時流處理系統對消息進行實時進行處理,同時還可使用Hadoop這種批處理系統進行離線處理;

Kafka 使用場景web

  1. 日誌收集:一個公司能夠用Kafka能夠收集各類服務的log,經過kafka以統一接口服務的方式開放給各類consumer,例如Hadoop、Hbase、Solr等;
  2. 消息系統:解耦和生產者和消費者、緩存消息等;
  3. 用戶活動跟蹤:Kafka常常被用來記錄web用戶或者app用戶的各類活動,如瀏覽網頁、搜索、點擊等活動,這些活動信息被各個服務器發佈到kafka的topic中,而後訂閱者經過訂閱這些topic來作實時的監控分析,或者裝載到Hadoop、數據倉庫中作離線分析和挖掘;
  4. 運營指標:Kafka也常常用來記錄運營監控數據。包括收集各類分佈式應用的數據,生產各類操做的集中反饋,好比報警和報告;
  5. 流式處理:好比spark streaming和storm;
  6. 事件源;

Spring Boot2.0 + Kafkaspring

1,安裝配置Kafka ,Zookeeperapache

安裝和配置過程很簡單,就不詳細說了,參考官網:http://kafka.apache.org/quick...json

使用命令啓動Kafka: bin/kafka-server-start.sh config/server.properties bootstrap

下面給出個人環境:緩存

Centos 7.5,  Kafka 2.11, Zookeeper-3.4.13,  JDK1.8+

2,建立 Spring Boot 項目

注意版本:該項目使用Spring Boot 2.0 +,低版本不保證正確

  1. pom.xml引用

    <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.47</version>
            </dependency>
  2. 定義消息生產者
    直接使用 KafkaTemplate 發送消息 ,Spring Boot自動裝配,不須要本身定義一個Kafka配置類,吐槽一下網站的文章,全都是互相抄,全都寫一個 ProduceConfig Consumerconfig 類, Kafka 的參數配置 硬編碼在代碼中,簡直沒法直視。。
    定義一個泛型類 KafkaSender<T> T 就是你須要發送的消息 對象,序列化使用阿里的 fastjson
/**
        * 消息生產者
        *
        * @author Jarvis
        * @date 2018/8/3
        */
       @Component
       public class KafkaSender<T> {
       
           private Logger logger = LoggerFactory.getLogger(KafkaSender.class);
       
           @Autowired
           private KafkaTemplate<String, Object> kafkaTemplate;
       
           /**
            * kafka 發送消息
            *
            * @param obj 消息對象
            */
           public void send(T obj) {
               String jsonObj = JSON.toJSONString(obj);
               logger.info("------------ message = {}", jsonObj);
       
               //發送消息
               ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("kafka.tut", jsonObj);
               future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
                   @Override
                   public void onFailure(Throwable throwable) {
                       logger.info("Produce: The message failed to be sent:" + throwable.getMessage());
                   }
       
                   @Override
                   public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
                       //TODO 業務處理
                       logger.info("Produce: The message was sent successfully:");
                       logger.info("Produce: _+_+_+_+_+_+_+ result: " + stringObjectSendResult.toString());
                   }
               });
           }
       }
  1. 定義消息消費者

使用 @KafkaListener 註解監聽 topics 消息,此處的 topics 必須和 send函數中的 一致
@Header(KafkaHeaders.RECEIVED_TOPI 直接獲取 topic

/**
        * 消息消費者
        *
        * @author Jarvis
        * @date 2018/8/3
        */
       @Component
       public class KafkaConsumer {
       
           private Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
       
           /**
            * 監聽kafka.tut 的topic,不作其餘業務
            *
            * @param record
            * @param topic  topic
            */
           @KafkaListener(id = "tut", topics = "kafka.tut")
           public void listen(ConsumerRecord<?, ?> record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
               Optional<?> kafkaMessage = Optional.ofNullable(record.value());
       
               if (kafkaMessage.isPresent()) {
                   Object message = kafkaMessage.get();
       
                   logger.info("Receive: +++++++++++++++ Topic:" + topic);
                   logger.info("Receive: +++++++++++++++ Record:" + record);
                   logger.info("Receive: +++++++++++++++ Message:" + message);
               }
           }
       
       }
  1. 配置文件 application.yml
spring:
         application:
           name: kafka-tutorial
         kafka:
           # 指定kafka 代理地址,能夠多個
           bootstrap-servers: 192.168.10.100:9092
           producer:
             retries: 0
             # 每次批量發送消息的數量
             batch-size: 16384
             # 緩存容量
             buffer-memory: 33554432
             # 指定消息key和消息體的編解碼方式
             key-serializer: org.apache.kafka.common.serialization.StringSerializer
             value-serializer: org.apache.kafka.common.serialization.StringSerializer
           consumer:
             # 指定默認消費者group id
             group-id: consumer-tutorial
             auto-commit-interval: 100
             auto-offset-reset: earliest
             enable-auto-commit: true
             # 指定消息key和消息體的編解碼方式
             key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
             value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
           # 指定listener 容器中的線程數,用於提升併發量
           listener:
             concurrency: 3
  1. 直接使用 @Autowired 對類 KafkaSender<T> 自動裝配,而後調用 send 方法發送消息便可,下面給出代碼:

    @Autowired
        private KafkaSender<User> kafkaSender;
    
        @Test
        public void kafkaSend() throws InterruptedException {
            //模擬發消息
            for (int i = 0; i < 5; i++) {
    
                User user = new User();
                user.setId(System.currentTimeMillis());
                user.setMsg(UUID.randomUUID().toString());
                user.setSendTime(new Date());
    
                kafkaSender.send(message);
                Thread.sleep(3000);
    
            }
        }

    控制檯能夠看到執行成功:

圖片描述

在服務器執行 bin/kafka-topics.sh --list --zookeeper localhost:2181 能夠看到topic
圖片描述

Kafka如何保證數據的不丟失

1.生產者數據的不丟失

  • 新版本的producer採用異步發送機制。KafkaProducer.send(ProducerRecord)方法僅僅是把這條消息放入一個緩存中(即RecordAccumulator,本質上使用了隊列來緩存記錄),同時後臺的IO線程會不斷掃描該緩存區,將知足條件的消息封裝到某個batch中而後發送出去。顯然,這個過程當中就有一個數據丟失的窗口:若IO線程發送以前client端掛掉了,累積在accumulator中的數據的確有可能會丟失。 kafka的ack機制:在kafka發送數據的時候,每次發送消息都會有一個確認反饋機制,確保消息正常的可以被收到。
  • 若是是同步模式:ack機制可以保證數據的不丟失,若是ack設置爲0,風險很大,通常不建議設置爲0

    producer.type=sync 
        request.required.acks=1
  • 若是是異步模式:經過buffer來進行控制數據的發送,有兩個值來進行控制,時間閾值與消息的數量閾值,若是buffer滿了數據尚未發送出去,若是設置的是當即清理模式,風險很大,必定要設置爲阻塞模式

    producer.type=async 
        request.required.acks=1 
        queue.buffering.max.ms=5000 
        queue.buffering.max.messages=10000 
        queue.enqueue.timeout.ms = -1 
        batch.num.messages=200
  • 結論:producer有丟數據的可能,可是能夠經過配置保證消息的不丟失

2.消費者數據的不丟失

  • 若是在消息處理完成前就提交了offset,那麼就有可能形成數據的丟失。因爲Kafka consumer默認是自動提交位移的,因此在後臺提交位移前必定要保證消息被正常處理了,所以不建議採用很重的處理邏輯,若是處理耗時很長,則建議把邏輯放到另外一個線程中去作。爲了不數據丟失,現給出兩點建議:

    enable.auto.commit=false  關閉自動提交位移
        在消息被完整處理以後再手動提交位移
  • 若是使用了storm,要開啓storm的ackfail機制;
  • 若是沒有使用storm,確認數據被完成處理以後,再更新offset值。低級API中須要手動控制offset值。經過offset commit 來保證數據的不丟失,kafka本身記錄了每次消費的offset數值,下次繼續消費的時候,接着上次的offset進行消費便可。

源碼github:https://github.com/jarvisqi/java-tutorial/tree/master/kafka-tutorial

參考:

  1. http://kafka.apache.org/quick...
  2. https://docs.spring.io/spring...
  3. https://blog.csdn.net/tzs_104...

加粗文字

相關文章
相關標籤/搜索