Integrating Kafka with Spring: Implementing the AcknowledgingMessageListener Interface

Preface

For a work requirement, the system needed to consume messages from a message queue through a listener interface. Kafka was the first choice because of its very high throughput.

Basic Concepts

  • 1 Producer: the producer of messages
  • 2 Consumer: the consumer of messages
  • 3 Consumer Group: a group of consumers that can consume the partitions of a topic in parallel
  • 4 Broker: a caching proxy; one or more servers in a Kafka cluster are collectively called brokers
  • 5 Topic: a category for the feeds of messages that Kafka handles
  • 6 Partition: a physical grouping of a topic; a topic can be split into multiple partitions, each of which is an ordered queue. Every message in a partition is assigned an ordered id (the offset)
  • 7 Message: the basic unit of communication; each producer can publish messages to a topic
  • 8 Sparse index: Kafka locates messages using a sparse index together with binary search
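The sparse-index idea from the last bullet can be illustrated with a toy sketch (this is not Kafka's actual implementation; the class and method names here are invented for illustration). A sparse index stores the byte position of only every N-th message; to find an arbitrary offset, binary-search the index for the greatest indexed offset not exceeding the target, then scan forward from that position:

```java
import java.util.Map;
import java.util.TreeMap;

// Toy sketch of a sparse index: offset -> byte position in the log segment.
// TreeMap.floorEntry performs a binary search over the sorted keys.
public class SparseIndexDemo {
    private final TreeMap<Long, Long> index = new TreeMap<>();

    public void addEntry(long offset, long filePosition) {
        index.put(offset, filePosition);
    }

    // Returns the byte position to start scanning from for the target offset
    public long startPositionFor(long targetOffset) {
        Map.Entry<Long, Long> e = index.floorEntry(targetOffset);
        return e == null ? 0L : e.getValue();
    }

    public static void main(String[] args) {
        SparseIndexDemo demo = new SparseIndexDemo();
        demo.addEntry(0L, 0L);       // every 100th message indexed
        demo.addEntry(100L, 4096L);
        demo.addEntry(200L, 8192L);
        System.out.println(demo.startPositionFor(150L)); // prints 4096
    }
}
```

Indexing only every N-th message keeps the index small enough to keep in memory, at the cost of a short sequential scan after the binary search.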

Integrating with Spring

  • Add the Maven dependency
    The project is managed with Maven, so to pull in the Spring-Kafka jars we add the following dependency (Kafka 0.10.2 is used here):
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>1.2.2.RELEASE</version>
</dependency>
  • 1 Version compatibility
    After adding the Maven dependency, we still need to confirm that our Spring version is fully compatible with Kafka 0.10.2, since spring-kafka depends on Spring. Consulting the Spring for Apache Kafka documentation:
      Compatibility

    • Apache Kafka 0.10.2.0
    • Tested with Spring Framework version dependency is 4.3.7 but it is expected that the framework will work with earlier versions of Spring.
    • Annotation-based listeners require Spring Framework 4.1 or higher, however.
    • Minimum Java version: 7. 
      Kafka 0.10.2 is tested against Spring Framework 4.3.7, but the framework is expected to work with earlier Spring versions as well. In practice, the producer-side API is affected by the Spring Framework version while the consumer side is not, so the project's existing Spring Framework version can be kept unchanged.
  • 2 Excluding duplicate packages
    The spring-kafka dependency transitively pulls in the Spring Framework jars, which duplicate the ones already in the project and need to be excluded. Here the project's own Spring modules are pinned to 4.3.9.RELEASE:

    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-beans</artifactId>
      <version>4.3.9.RELEASE</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-context</artifactId>
      <version>4.3.9.RELEASE</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-core</artifactId>
      <version>4.3.9.RELEASE</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-oxm</artifactId>
      <version>4.3.9.RELEASE</version>
      <scope>compile</scope>
      <optional>true</optional>
    </dependency>
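Alternatively, instead of re-declaring each Spring module, the transitive Spring jars can be excluded directly on the spring-kafka dependency. The sketch below assumes spring-kafka's transitive artifacts; confirm the exact list in your build with `mvn dependency:tree`:

```xml
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>1.2.2.RELEASE</version>
  <exclusions>
    <!-- Exclude transitively-pulled Spring jars so the project's own versions win -->
    <exclusion>
      <groupId>org.springframework</groupId>
      <artifactId>spring-context</artifactId>
    </exclusion>
    <exclusion>
      <groupId>org.springframework</groupId>
      <artifactId>spring-messaging</artifactId>
    </exclusion>
  </exclusions>
</dependency>
```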
  • 3 Interface differences
    A Kafka consumer can be implemented in two ways: with the client API or with a listener interface. For business reasons, the listener approach is used here. Spring provides four listener interfaces, shown below:
public interface MessageListener<K, V> {

    void onMessage(ConsumerRecord<K, V> data);

}

public interface AcknowledgingMessageListener<K, V> {

    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);

}

public interface BatchMessageListener<K, V> {

    void onMessage(List<ConsumerRecord<K, V>> data);

}

public interface BatchAcknowledgingMessageListener<K, V> {

    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);

}

The corresponding explanations are:
1. Use this for processing individual ConsumerRecords received from the kafka consumer poll() operation when using auto-commit, or one of the container-managed commit methods.
With MessageListener, the offset is committed automatically after the consumer pulls and processes a message; this interface suits enable.auto.commit=true.
2. Use this for processing individual ConsumerRecords received from the kafka consumer poll() operation when using one of the manual commit methods.
With AcknowledgingMessageListener, the offset is not committed automatically after a message is consumed; a manual ack is required, so this interface suits enable.auto.commit=false.
3. Use this for processing all ConsumerRecords received from the kafka consumer poll() operation when using auto-commit, or one of the container-managed commit methods. AckMode.RECORD is not supported when using this interface since the listener is given the complete batch.

4. Use this for processing all ConsumerRecords received from the kafka consumer poll() operation when using one of the manual commit methods.

BatchMessageListener and BatchAcknowledgingMessageListener work much like the two interfaces above, except that they consume messages in batches, with the same auto/manual offset-commit distinction.

Because the business logic is heavy, and with auto-committed offsets a consumer can lose messages when consumption fails or throws an exception, offsets must be committed manually. The AcknowledgingMessageListener interface is therefore implemented here.

Spring Configuration File

Configuration approach:
1. Determine the beans that need to be defined:

  • 1 consumerProperties: the consumer's basic properties, including bootstrap.servers, group.id, etc.
  • 2 consumerFactory: the consumer factory; once consumerProperties is configured, it is passed as a constructor argument into consumerFactory
  • 3 containProperties: the container-properties bean; it references the custom listener bean and sets the ackMode (which commit style to use when committing manually)
  • 4 messageListenerContainer: the consumer container that runs the listener; it takes the previously defined consumerFactory and containProperties, and declares init-method="doStart" so the listener starts automatically when Spring starts. This bean also specifies the topic
  • 5 kafkaMessageListener: the listener implementation, written by ourselves and wired into containProperties
    The complete consumer configuration file is shown below:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
        <!-- 1. Consumer properties (a HashMap) -->
    <bean id="consumerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="${kafka.bootstrap.servers}"/>
                <entry key="group.id" value="${kafka.group.id}"/>
                <entry key="enable.auto.commit" value="false"/>
                <entry key="session.timeout.ms" value="15000"/>
                <!--<entry key="auto.offset.reset" value="earliest"/>-->
                <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
                <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
                <entry key="value.deserializer.encoding" value="UTF8"/>
            </map>
        </constructor-arg>
    </bean>
     <!-- 2. Kafka consumer factory (DefaultKafkaConsumerFactory) -->
    <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
        <constructor-arg>
            <ref bean="consumerProperties"/>
        </constructor-arg>
    </bean>
     <!-- 3. Listener implementation (AcknowledgingMessageListener) -->
    <bean id="kafkaMessageListener" class="com.lianjia.bigdata.dataarch.auth.kafka.KafkaMessageListener">
        <property name="threadPool" ref="kafkaWorkerThreadPool"/>
    </bean>
    <bean id="kafkaWorkerThreadPool" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <property name="corePoolSize" value="20"/>
        <property name="maxPoolSize" value="200"/>
        <property name="queueCapacity" value="500"/>
        <property name="keepAliveSeconds" value="1800"/>
        <property name="rejectedExecutionHandler">
            <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"/>
        </property>
    </bean>
     <!-- 4. Container properties for the Kafka consumer -->
    <bean id="containProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
        <constructor-arg value="${kafka.topic}"/>
        <property name="ackMode" value="MANUAL_IMMEDIATE"/>
        <property name="messageListener" ref="kafkaMessageListener"/>
    </bean>
    <!-- 5. Kafka consumer container -->
    <bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" init-method="doStart" >
        <constructor-arg ref="consumerFactory"/>
        <constructor-arg ref="containProperties"/>
    </bean>
</beans>
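The worker pool defined in bean 3 above maps onto a plain-JDK executor as sketched below (Spring's ThreadPoolTaskExecutor wraps the same ThreadPoolExecutor internally). CallerRunsPolicy means that when both the queue and the pool are saturated, the submitting thread runs the task itself, which gives back-pressure instead of rejecting work:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class KafkaWorkerPool {
    // Mirrors the XML: core 20, max 200, queue capacity 500, keep-alive 1800 s
    public static ThreadPoolExecutor create() {
        return new ThreadPoolExecutor(
                20, 200,
                1800, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(500),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = create();
        pool.execute(() -> System.out.println("task ran on " + Thread.currentThread().getName()));
        pool.shutdown();
    }
}
```

For a listener that hands each record to this pool, CallerRunsPolicy is a reasonable choice: a saturated pool slows the poll loop down rather than silently dropping messages.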

Sample Code

A simple test case.
Producer:
Sends one message per second to the brokers.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;

public class SimpleKafkaProducer implements Runnable {
    protected static final Logger LOGGER = LoggerFactory.getLogger(SimpleKafkaProducer.class);

    @Override
    public void run() {
        Map<String, Object> sendProps = senderProps();
        Producer<Integer, String> producer = new KafkaProducer<>(sendProps);
        Integer currentNum = 0;
        try {
            LOGGER.info("start produce message");
            while (true){
                ProducerRecord<Integer, String> producerRecord = new ProducerRecord<>("testTopic", currentNum, String.valueOf(currentNum));
                producer.send(producerRecord);
                LOGGER.info("send message with key " + currentNum + " and value " + producerRecord.value());
                currentNum++;
                Thread.sleep(1000);
            }
        }catch (Exception e){
            LOGGER.error("send message fail", e);
        }finally {
            producer.close();
        }
    }
    public static void main(String[] args) {
        SimpleKafkaProducer simpleKafkaProducer = new SimpleKafkaProducer();
        new Thread(simpleKafkaProducer).start();
    }


    private Map<String, Object> senderProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.33.106.94:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }
}

Consumer:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.support.Acknowledgment;

public class KafkaMessageListener implements AcknowledgingMessageListener<Integer, String> {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMessageListener.class);

    @Override
    public void onMessage(final ConsumerRecord<Integer, String> message, final Acknowledgment acknowledgment) {
        // TODO: implement the business logic here
        // Finally, call acknowledge() to manually commit the offset
        acknowledgment.acknowledge();
    }
}

Consumer usage example: following the Spring reference documentation, here is a simple test that wires up the consumer listener.

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.support.Acknowledgment;

import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

public class SimpleKafkaConsumer extends SpringUnitTest {
    protected static final Logger LOGGER = LoggerFactory.getLogger(SimpleKafkaConsumer.class);
    @Resource(name = "kafkaMessageListener")
    private  KafkaMessageListener kafkaMessageListener;

    @Test
    public void testListener() throws Exception {
        ContainerProperties containerProps = new ContainerProperties("testTopic");
        containerProps.setMessageListener(kafkaMessageListener);
        // Match the XML config: manual-immediate acks for AcknowledgingMessageListener
        containerProps.setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
        KafkaMessageListenerContainer<Integer, String> container = createContainer(containerProps);
        container.setBeanName("messageListenerContainer");
        container.start();
        // Keep the test alive so the container threads can poll and consume
        new CountDownLatch(1).await(60, java.util.concurrent.TimeUnit.SECONDS);
    }
    private static KafkaMessageListenerContainer<Integer, String> createContainer(
            ContainerProperties containerProps) {
        Map<String, Object> props = consumerProps();
        DefaultKafkaConsumerFactory<Integer, String> cf =
                new DefaultKafkaConsumerFactory<>(props);
        KafkaMessageListenerContainer<Integer, String> container =
                new KafkaMessageListenerContainer<>(cf, containerProps);
        return container;
    }

    private static Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.33.106.94:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test3");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
//        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

}

Before implementing the AcknowledgingMessageListener interface, I searched the existing write-ups online, but the results were unsatisfying, so I worked through the Spring reference documentation myself and gradually arrived at a listener that commits offsets manually. Of course, Kafka has far more to it than covered here, and I will keep studying it.
