spring kafka使用總結

1 pom文件配置

<dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.9.0.1</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>1.0.0.RC1</version>
        </dependency>

2 consumer配置

2.1 xml配置文件

創建kafka-consumer.xml文件配置java

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans-3.0.xsd"
       default-autowire="byName">


    <!-- 定義consumer的參數 -->
    <bean id="consumerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="${kafka.bootstrap.servers}"/>
                <entry key="group.id" value="data_center_group"/>
                <!--消費端口收不到消息-->
                <!--spring-kafka-consumer.xml的auto-startup設置爲true-->
                <entry key="enable.auto.commit" value="true"/>
                <entry key="auto.commit.interval.ms" value="5000"/>
                <entry key="session.timeout.ms" value="30000"/>
                <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
                <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
            </map>
        </constructor-arg>
    </bean>

    <!-- 建立consumerFactory bean -->
    <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
        <constructor-arg>
            <ref bean="consumerProperties"/>
        </constructor-arg>
    </bean>

    <!-- 實際執行消息消費的類 -->
    <bean id="messageListenerConsumerService" class="com.hisense.fusion.kafka.consumer.KafkaConsumerServer"/>

    <!-- 聚好看消費者容器配置信息 -->
    <bean id="mediaContainerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
        <constructor-arg value="${kafka.topic.media}"/>
        <property name="messageListener" ref="messageListenerConsumerService"/>
    </bean>

    <!-- mediaMessageListenerContainer bean,使用的時候,只須要注入這個bean -->
    <bean id="mediaMessageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer"
          init-method="doStart">
        <constructor-arg ref="consumerFactory"/>
        <constructor-arg ref="mediaContainerProperties"/>
    </bean>

</beans>

2.2  監聽器配置

創建監聽文件,實現messageListener接口spring

package com.hisense.fusion.kafka.consumer;

import com.hisense.fusion.kafka.parse.IncDataParse;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.listener.MessageListener;

/**
 * kafka監聽器啓動
 * 自動監聽是否有消息須要消費
 * @author bill
 *
 */
public class KafkaConsumerServer implements MessageListener<String, String> {

    protected final Logger LOG = LoggerFactory.getLogger("kafkaConsumer");
    /**
     * 監聽器自動執行該方法
     * 	消費消息
     * 	自動提交offset
     * 	執行業務代碼
     * 	(high level api 不提供offset管理,不能指定offset進行消費)
     */
    public void onMessage(ConsumerRecord<String, String> record) {
        String topic = record.topic();
        String value = record.value();
        long offset = record.offset();
        int partition = record.partition();
        LOG.info("topic:{}-partition:{}-offset:{}",topic,partition,offset);
    }

}

3 producer配置

3.1 kafka配置

建立kafka-producer.xml配置文件apache

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	   xmlns:context="http://www.springframework.org/schema/context"
	   xsi:schemaLocation="http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"
	   default-autowire="byName">

	<!-- 定義producer的參數 -->
	<bean id="producerProperties" class="java.util.HashMap">
		<constructor-arg>
			<map>
				<entry key="bootstrap.servers" value="${kafka.bootstrap.servers}"/>
				<entry key="retries" value="3" />
				<entry key="retry.backoff.ms" value="10000" />
				<entry key="request.timeout.ms" value="10000" />
                <entry key="acks" value="1"/>
				<!--<entry key="batch.size" value="5242880" />-->
                <!--<entry key="compression.type" value="gzip"/>-->
				<entry key="linger.ms" value="1" />
				<entry key="buffer.memory" value="33554432" />
				<entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />
				<entry key="value.serializer" value="org.springframework.kafka.support.serializer.JsonSerializer" />
			</map>
		</constructor-arg>
	</bean>

	<!-- 建立kafkatemplate須要使用的producerfactory bean -->
	<bean id="producerFactory"
		class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
		<constructor-arg>
			<ref bean="producerProperties" />
		</constructor-arg>
	</bean>

	<!-- 建立kafkatemplate bean,使用的時候,只須要注入這個bean,便可使用template的send消息方法 -->
	<!--<bean id="producerListener" class="com.hisense.fusion.kafka.producer.KafkaProducerListener" />-->

	<bean id="KafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
		<constructor-arg ref="producerFactory" />
		<constructor-arg name="autoFlush" value="true" />
		<property name="defaultTopic" value="defaultTopic" />
	</bean>

</beans>

3.2  消息發送

package com.hisense.fusion.kafka.producer;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import com.hisense.fusion.kafka.constants.KafkaMesConstant;
import com.hisense.fusion.kafka.utils.DateUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;

import java.io.UnsupportedEncodingException;
import java.util.*;
import java.util.concurrent.ExecutionException;

/**
 * kafkaProducer模板
 * 使用此模板發送消息
 *
 * @author bill
 */
@Service
public class KafkaProducerServer {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * The logger.
     */
    private static final Logger logger = LoggerFactory.getLogger(KafkaProducerServer.class);

    /**
     * kafka發送消息模板
     *
     * @param topic  主題
     * @param values messageValue
     */
    public Boolean sendMessageList(String topic, String schema, List<JSONObject> values, String key, String ifPartition, Integer partitionNum) {
        boolean rs = true;
        Long startTime = DateUtil.getMillisTimeStamp();
        ListenableFuture<SendResult<String, Object>> result = null;
        int partitionIndex;
        String value;
        boolean send;
        int size = 0;
        String kafkaKey = null;
        JSONObject data = new JSONObject();
        if (values != null && values.size() > 0) {
            size = values.size();
            for (JSONObject object : values) {
                if (object.get(key) instanceof Integer) {
                    kafkaKey = ((Integer) object.get(key)).toString();
                }
                if (object.get(key) instanceof Long) {
                    kafkaKey = ((Long) object.get(key)).toString();
                }
                if (object.get(key) instanceof String) {
                    kafkaKey = ((String) object.get(key)).toString();
                }
                if(kafkaKey==null){
                    logger.warn("primary key is null,topic:{},key:{},data:{}",topic,key,JSON.toJSONString(object));
                    continue;
                }
                try {
                    data.put("schema", generateSchema(topic,object));
                    data.put("payload", object);
                    if (ifPartition.equals("0")) {
                        //表示使用分區
                        partitionIndex = getPartitionIndex(kafkaKey, partitionNum);
//                        result = kafkaTemplate.send(topic, partitionIndex, kafkaKey, object);
                        result = kafkaTemplate.send(topic, partitionIndex, kafkaKey, data);
                    } else {
                        result = kafkaTemplate.send(topic, kafkaKey, data);
                    }
                    send = checkResult(result);
                }catch (JSONException e){
                    logger.error("{} schema define is error:{}",topic,e.getMessage());
                    send = false;
                }catch (Exception e){
                    logger.error("{} send kafka failed :{}",topic,e.getMessage());
                    send = false;
                }
                if (!send) {
                    rs = false;
                    break;
                }
            }
        }
        Long endTime = DateUtil.getMillisTimeStamp();
        if (rs) {
            logger.info("topic:{},Send Result: true,count : {},takes :{} ms", topic, size, endTime - startTime);
        } else {
            logger.error("topic:{},Send Result: false,count : {}", topic, size);
        }
        return rs;
    }


    /**
     * 檢查發送返回結果record
     *
     * @param res
     * @return
     */
    private boolean checkResult(ListenableFuture<SendResult<String, Object>> res) {
        if (res != null) {
            try {
                SendResult r = res.get();//檢查result結果集
                /*檢查recordMetadata的offset數據,不檢查producerRecord*/
                Long offsetIndex = r.getRecordMetadata().offset();
                if (offsetIndex != null && offsetIndex >= 0) {
                    return true;
                } else {
                    return false;
                }
            } catch (InterruptedException e) {
                logger.error("kafka exception :{}", e.getMessage());
                return false;
            } catch (ExecutionException e) {
                logger.error("kafka exception :{}", e.getMessage());
                return false;
            }
        } else {
            return false;
        }
    }

    /**
     * 根據key值獲取分區索引
     *
     * @param key
     * @param partitionNum
     * @return
     */
    private int getPartitionIndex(String key, int partitionNum) {
        if (key == null) {
            Random random = new Random();
            return random.nextInt(partitionNum);
        } else {
            int result = key.hashCode();
            if (result != Integer.MIN_VALUE) {
                result = Math.abs(key.hashCode()) % partitionNum;
            } else {
                result = key.hashCode() % partitionNum;
            }
            return result;
        }
    }

}

3.3  消息發送監聽

package com.hisense.fusion.kafka.producer;

import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.support.ProducerListener;

/**
 * kafkaProducer監聽器,在producer配置文件中開啓
 * @author bill
 *
 */
@SuppressWarnings("rawtypes")
public class KafkaProducerListener implements ProducerListener {
    protected final Logger LOG = LoggerFactory.getLogger("kafkaProducer");
    /**
     * 發送消息成功後調用
     */
    public void onSuccess(String topic, Integer partition, Object key,
                          Object value, RecordMetadata recordMetadata) {
        LOG.info("==========kafka發送數據成功(日誌開始)==========");
        LOG.info("----------topic:"+topic);
        LOG.info("----------partition:"+partition);
        LOG.info("----------key:"+key);
//        LOG.info("----------value:"+value);
        LOG.info("~~~~~~~~~~kafka發送數據成功(日誌結束)~~~~~~~~~~");
    }

    /**
     * 發送消息錯誤後調用
     */
    public void onError(String topic, Integer partition, Object key,
                        Object value, Exception exception) {
        LOG.info("==========kafka發送數據錯誤(日誌開始)==========");
        LOG.info("----------topic:"+topic);
        LOG.info("----------partition:"+partition);
        LOG.info("----------key:"+key);
        LOG.info("----------value:"+value);
        LOG.info("----------Exception:"+exception);
        LOG.info("~~~~~~~~~~kafka發送數據錯誤(日誌結束)~~~~~~~~~~");
        exception.printStackTrace();
    }

    /**
     * 方法返回值表明是否啓動kafkaProducer監聽器
     */
    public boolean isInterestedInSuccess() {
//        LOG.info("///kafkaProducer監聽器啓動///");
        return true;
    }

}
相關文章
相關標籤/搜索