Spring集成Kafka-註解,xml配置2種方式實現

準備工作:

1.安裝kafka+zookeeper環境

2.利用命令創建好topic

參考官網 http://kafka.apache.org/documentation/

一XML配置文件方式實現

1. Pom文件,引入spring-kafka jar包,這裏需要注意2個地方

  1) kafka-clients 包版本與服務器端kafka-clients版本保持一致(查看服務器kafka版本方法:在kafka安裝目錄下libs 中查找kafka-clients開頭的jar文件)

  2) 引入的spring-kafka 版本在2.0或者2.X 時,Spring版本需在5.0以上才能支持

...
<!-- Kafka client pinned explicitly; keep this version in step with the kafka-clients jar on the broker. -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.0.1</version>
</dependency>
<!-- spring-kafka with its transitive kafka-clients excluded, so only the version declared above ends up on the classpath. -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.3.5.RELEASE</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </exclusion>
    </exclusions>
</dependency>
...

 2. 配置文件結構如下圖

 

3. kafka.properties文件內容如下

 1 # brokers集羣
 2 kafka.producer.bootstrap.servers = ip1:9092,ip2:9092,ip3:9092
 3 
 4 kafka.producer.acks = all
 5 
 6 #發送失敗重試次數
 7 kafka.producer.retries = 3
 8 
 9 kafka.producer.linger.ms =  10
10 
11 # 33554432 即32MB的批處理緩衝區
12 kafka.producer.buffer.memory = 40960
13 
14 #批處理條數:當多個記錄被髮送到同一個分區時,生產者會嘗試將記錄合併到更少的請求中。這有助於客戶端和服務器的性能
15 kafka.producer.batch.size = 4096
16 
17 kafka.producer.defaultTopic = nwbs-eval-task
18 
19 kafka.producer.key.serializer = org.apache.kafka.common.serialization.StringSerializer
20 
21 kafka.producer.value.serializer = org.apache.kafka.common.serialization.StringSerializer
22 
23 
24 ################# kafka consumer ################## ,
25 kafka.consumer.bootstrap.servers = ip1:9092,ip2,ip3:9092
26 
27 # 若是爲true,消費者的偏移量將在後臺按期提交
28 kafka.consumer.enable.auto.commit = true
29 
30 #如何設置爲自動提交(enable.auto.commit=true),這裏設置自動提交週期
31 kafka.consumer.auto.commit.interval.ms=1000 
32 
33 #order-beta 消費者羣組ID,發佈-訂閱模式,即若是一個生產者,多個消費者都要消費,那麼須要定義本身的羣組,同一羣組內的消費者只有一個能消費到消息
34 kafka.consumer.group.id = sccl-nwbs
35 
36 #在使用Kafka的組管理時,用於檢測消費者故障的超時
37 kafka.consumer.session.timeout.ms = 30000
38 
39 kafka.consumer.key.deserializer = org.apache.kafka.common.serialization.StringDeserializer
40 kafka.consumer.value.deserializer = org.apache.kafka.common.serialization.StringDeserializer

4.consumer-kafka.xml 配置如下

  需要注意 cn.**.kafka.KafkaConsumerSerivceImpl 此類需要實現 MessageListener 接口

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <beans xmlns="http://www.springframework.org/schema/beans"
 3        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 4        xsi:schemaLocation="http://www.springframework.org/schema/beans
 5        http://www.springframework.org/schema/beans/spring-beans.xsd">
 6         <!-- 1.定義consumer的參數 -->
 7         <!--<context:property-placeholder location="classpath*:kafka/kafka.properties" />-->
 8         <bean id="consumerProperties" class="java.util.HashMap">
 9             <constructor-arg>
10                 <map>
11                     <entry key="bootstrap.servers" value="${kafka.consumer.bootstrap.servers}" />
12                     <entry key="group.id" value="${kafka.consumer.group.id}" />
13                     <entry key="enable.auto.commit" value="${kafka.consumer.enable.auto.commit}" />
14                     <entry key="session.timeout.ms" value="${kafka.consumer.session.timeout.ms}" />
15                     <entry key="auto.commit.interval.ms" value="${kafka.consumer.auto.commit.interval.ms}" />
16                     <entry key="retry.backoff.ms" value="100" />
17                     <entry key="key.deserializer"
18                            value="${kafka.consumer.key.deserializer}" />
19                     <entry key="value.deserializer"
20                            value="${kafka.consumer.value.deserializer}" />
21                 </map>
22             </constructor-arg>
23         </bean>
24 
25         <!-- 2.建立consumerFactory bean -->
26         <bean id="consumerFactory"
27               class="org.springframework.kafka.core.DefaultKafkaConsumerFactory" >
28             <constructor-arg>
29                 <ref bean="consumerProperties" />
30             </constructor-arg>
31         </bean>
32 
33         <!--&lt;!&ndash; 3.定義消費實現類 &ndash;&gt;-->
34         <bean id="kafkaConsumerService" class="cn.**.kafka.KafkaConsumerSerivceImpl" />
35 
36         <!-- 4.消費者容器配置信息 -->
37         <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
38             <!-- topic -->
39             <constructor-arg name="topics">
40                 <list>
41                     <value>${kafka.task.eval.topic}</value>
42                     <value>${kafka.task.optimizeNetwork.topic}</value>
43                     <value>${kafka.task.business.topic}</value>
44                 </list>
45             </constructor-arg>
46             <property name="messageListener" ref="kafkaConsumerService" />
47         </bean>
48         <!-- 5.消費者併發消息監聽容器,執行doStart()方法 -->
49         <bean id="messageListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer" init-method="doStart" >
50             <constructor-arg ref="consumerFactory" />
51             <constructor-arg ref="containerProperties" />
52             <property name="concurrency" value="${kafka.consumer.concurrency}" />
53         </bean>
54 </beans>

 

5. producer-kafka.xml 配置如下

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <beans xmlns="http://www.springframework.org/schema/beans"
 3        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 4        xsi:schemaLocation="http://www.springframework.org/schema/beans
 5        http://www.springframework.org/schema/beans/spring-beans.xsd">
 6 
 7     <!--<context:property-placeholder location="classpath:kafka/kafka.properties" />-->
 8     <!-- 定義producer的參數 -->
 9     <bean id="producerProperties" class="java.util.HashMap">
10         <constructor-arg>
11             <map>
12                 <entry key="bootstrap.servers" value="${kafka.producer.bootstrap.servers}" />
13                 <!--<entry key="group.id" value="${group.id}" />-->
14                 <entry key="retries" value="${kafka.producer.retries}" />
15                 <entry key="batch.size" value="${kafka.producer.batch.size}" />
16                 <entry key="linger.ms" value="${kafka.producer.linger.ms}" />
17                 <entry key="buffer.memory" value="${kafka.producer.buffer.memory}" />
18                 <entry key="acks" value="${kafka.producer.acks}" />
19                 <entry key="key.serializer"
20                        value="${kafka.producer.key.serializer}" />
21                 <entry key="value.serializer"
22                        value="${kafka.producer.value.serializer}"/>
23             </map>
24         </constructor-arg>
25     </bean>
26 
27     <!-- 建立kafkatemplate須要使用的producerfactory bean -->
28     <bean id="producerFactory"
29           class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
30         <constructor-arg>
31             <ref bean="producerProperties" />
32         </constructor-arg>
33     </bean>
34 
35     <!-- 建立kafkatemplate bean,使用的時候,只須要注入這個bean,便可使用template的send消息方法 -->
36     <bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
37         <constructor-arg ref="producerFactory" />
38         <constructor-arg name="autoFlush" value="true" />
39         <property name="defaultTopic" value="${kafka.producer.defaultTopic}" />
40     </bean>
41 </beans>

 

6. 調用Controller -這裏 向kafka 中的 3個topic 發送了消息

/**
 * REST endpoint that publishes the posted JSON payload to three Kafka topics:
 * the optimize-network topic, the business topic and the template's default topic.
 *
 * @author: hsc
 * @date: 2018/6/19 14:44
 */
@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {

    /**
     * Template used for all sends. Only values are sent (never keys), so the key type is left
     * as a wildcard; this lets the field be injected regardless of the key type the
     * template bean was declared with.
     */
    @Autowired
    KafkaTemplate<?, String> kafkaTemplate;

    /** Topic name for optimize-network tasks (a literal value, not a ${...} placeholder). */
    @Value("nwbs-optimizeNetwork-task")
    private String optimizeTopic;

    /** Topic name for business tasks (a literal value, not a ${...} placeholder). */
    @Value("nwbs-business-task")
    private String businessTopic;

    /**
     * Sends the request body, serialized as JSON, to the three topics. The first two
     * messages carry the topic label appended to the JSON text; the third is the plain JSON.
     * Sends are asynchronous: the outcome of each one is reported by a callback.
     *
     * @param params JSON body of the POST request
     */
    @RequestMapping(value = "/producer" , method = RequestMethod.POST)
    public void producer(@RequestBody JSONObject params){
        // Serialize once and reuse for all three messages.
        final String payload = params.toJSONString();
        reportOutcome(kafkaTemplate.send(optimizeTopic, payload + "optimizeTopic"), optimizeTopic);
        reportOutcome(kafkaTemplate.send(businessTopic, payload + "businessTopic"), businessTopic);
        reportOutcome(kafkaTemplate.sendDefault(payload), "default topic");
    }

    /**
     * Registers success/failure callbacks on a send future so that no send result is
     * silently dropped.
     *
     * @param future future returned by the template for one send
     * @param target human-readable name of the destination, used in the output
     */
    private static <K> void reportOutcome(ListenableFuture<SendResult<K, String>> future, final String target) {
        future.addCallback(new SuccessCallback<SendResult<K, String>>() {
            @Override
            public void onSuccess(SendResult<K, String> result) {
                // Success-path business logic goes here.
                System.out.println("onSuccess: " + target);
            }
        }, new FailureCallback() {
            @Override
            public void onFailure(Throwable ex) {
                // Failure-path business logic goes here; keep the cause visible.
                System.out.println("onFailure: " + target + ", cause: " + ex);
            }
        });
    }
}

 二 註解方式實現

參考spring-kafka官方文檔 https://docs.spring.io/spring-kafka/reference/htmlsingle/

1. 文件整體結構如圖

2. KafkaConsumerConfig.java代碼

 1 /**
 2  * @author: hsc
 3  * @date: 2018/6/21 15:58
 4  * @description kafka 消費者配置
 5  */
 6 @Configuration
 7 @EnableKafka
 8 public class KafkaConsumerConfig {
 9     public KafkaConsumerConfig(){
10         System.out.println("kafka消費者配置加載...");
11     }
12     @Bean
13     KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
14     kafkaListenerContainerFactory() {
15         ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
16                 new ConcurrentKafkaListenerContainerFactory();
17         factory.setConsumerFactory(consumerFactory());
18         factory.setConcurrency(3);
19         factory.getContainerProperties().setPollTimeout(3000);
20         return factory;
21     }
22 
23     @Bean
24     public ConsumerFactory<Integer, String> consumerFactory() {
25         return new DefaultKafkaConsumerFactory(consumerProperties());
26     }
27 
28     @Bean
29     public Map<String, Object> consumerProperties() {
30         Map<String, Object> props= new HashMap<String, Object>();
31         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, PropertiesUtil.getInstance().getString("kafka.consumer.bootstrap.servers"));
32         props.put(ConsumerConfig.GROUP_ID_CONFIG,  PropertiesUtil.getInstance().getString("kafka.consumer.group.id"));
33         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,  PropertiesUtil.getInstance().getString("kafka.consumer.enable.auto.commit"));
34         props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, PropertiesUtil.getInstance().getString("kafka.consumer.auto.commit.interval.ms"));
35         props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,  PropertiesUtil.getInstance().getString("kafka.consumer.session.timeout.ms"));
36         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,  PropertiesUtil.getInstance().getString("kafka.consumer.key.deserializer"));
37         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,  PropertiesUtil.getInstance().getString("kafka.consumer.value.deserializer"));
38         return props;
39     }
40 
41     @Bean
42     public KafkaConsumerListener kafkaConsumerListener(){
43         return new KafkaConsumerListener();
44     }
45 
46 }

3.KafkaProducerConfig.java

 1 /**
 2  * @author: hsc
 3  * @date: 2018/6/21 21:30
 4  * @description kafka 生產者配置
 5  */
 6 @Configuration
 7 @EnableKafka
 8 public class KafkaProducerConfig {
 9     public KafkaProducerConfig(){
10         System.out.println("kafka生產者配置");
11     }
12     @Bean
13     public ProducerFactory<Integer, String> producerFactory() {
14         return new DefaultKafkaProducerFactory(producerProperties());
15     }
16 
17     @Bean
18     public Map<String, Object> producerProperties() {
19         Map<String, Object> props = new HashMap<String, Object>();
20         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, PropertiesUtil.getInstance().getString("kafka.producer.bootstrap.servers"));
21         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, PropertiesUtil.getInstance().getString("kafka.producer.key.serializer"));
22         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,PropertiesUtil.getInstance().getString("kafka.producer.value.serializer"));
23         props.put(ProducerConfig.RETRIES_CONFIG,PropertiesUtil.getInstance().getInt("kafka.producer.retries"));
24         props.put(ProducerConfig.BATCH_SIZE_CONFIG,PropertiesUtil.getInstance().getInt("kafka.producer.batch.size",1048576));
25         props.put(ProducerConfig.LINGER_MS_CONFIG,PropertiesUtil.getInstance().getInt("kafka.producer.linger.ms"));
26         props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,PropertiesUtil.getInstance().getLong("kafka.producer.buffer.memory",33554432L));
27         props.put(ProducerConfig.ACKS_CONFIG,PropertiesUtil.getInstance().getString("kafka.producer.acks","all"));
28         return props;
29     }
30 
31     @Bean
32     public KafkaTemplate<Integer, String> kafkaTemplate() {
33         KafkaTemplate kafkaTemplate = new KafkaTemplate<Integer, String>(producerFactory(),true);
34         kafkaTemplate.setDefaultTopic(PropertiesUtil.getInstance().getString("kafka.producer.defaultTopic","default"));
35         return kafkaTemplate;
36     }
37 
38 }

4.KafkaConsumerListener

/**
 * Annotated Kafka listener that prints every record it receives.
 *
 * @author: hsc
 * @date: 2018/6/21 16:33
 */
public class KafkaConsumerListener {
    /**
     * Prints a header line (consuming thread, topic, partition, record timestamp)
     * followed by the record value on its own line.
     *
     * @param data the record delivered by the listener container
     */
    @KafkaListener(groupId="xxx" ,topics = "xxx")
    void listener(ConsumerRecord<String, String> data){
        StringBuilder header = new StringBuilder();
        header.append("消費者線程:").append(Thread.currentThread().getName());
        header.append("[ 消息 來自kafkatopic:").append(data.topic());
        header.append(",分區:").append(data.partition());
        header.append(" ,委託時間:").append(data.timestamp());
        header.append("]消息內容以下:");
        System.out.println(header.toString());
        System.out.println(data.value());
    }
}

 

參考文章(結合官方文檔一起看)

http://www.cnblogs.com/dennyzhangdd/p/7759875.html

相關文章
相關標籤/搜索