Add the kafka-clients and spring-kafka dependencies to the Maven pom:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.9.0.1</version>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.0.0.RC1</version>
</dependency>
Create the kafka-consumer.xml configuration file:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd" default-autowire="byName"> <!-- 定義consumer的參數 --> <bean id="consumerProperties" class="java.util.HashMap"> <constructor-arg> <map> <entry key="bootstrap.servers" value="${kafka.bootstrap.servers}"/> <entry key="group.id" value="data_center_group"/> <!--消費端口收不到消息--> <!--spring-kafka-consumer.xml的auto-startup設置爲true--> <entry key="enable.auto.commit" value="true"/> <entry key="auto.commit.interval.ms" value="5000"/> <entry key="session.timeout.ms" value="30000"/> <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/> <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/> </map> </constructor-arg> </bean> <!-- 建立consumerFactory bean --> <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory"> <constructor-arg> <ref bean="consumerProperties"/> </constructor-arg> </bean> <!-- 實際執行消息消費的類 --> <bean id="messageListenerConsumerService" class="com.hisense.fusion.kafka.consumer.KafkaConsumerServer"/> <!-- 聚好看消費者容器配置信息 --> <bean id="mediaContainerProperties" class="org.springframework.kafka.listener.config.ContainerProperties"> <constructor-arg value="${kafka.topic.media}"/> <property name="messageListener" ref="messageListenerConsumerService"/> </bean> <!-- mediaMessageListenerContainer bean,使用的時候,只須要注入這個bean --> <bean id="mediaMessageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" init-method="doStart"> <constructor-arg ref="consumerFactory"/> <constructor-arg ref="mediaContainerProperties"/> </bean> </beans>
Create the listener class, which implements the MessageListener interface:
package com.hisense.fusion.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.listener.MessageListener;

/**
 * Kafka listener.
 * Invoked automatically whenever there is a message to consume.
 * @author bill
 */
public class KafkaConsumerServer implements MessageListener<String, String> {

    protected final Logger LOG = LoggerFactory.getLogger("kafkaConsumer");

    /**
     * Called automatically by the listener container for each record.
     * Offsets are committed automatically (enable.auto.commit=true);
     * the high-level API does not expose offset management, so consumption
     * cannot start from an arbitrary offset.
     * Business logic goes here.
     */
    @Override
    public void onMessage(ConsumerRecord<String, String> record) {
        String topic = record.topic();
        String value = record.value();
        long offset = record.offset();
        int partition = record.partition();
        LOG.info("topic:{}-partition:{}-offset:{}", topic, partition, offset);
    }
}
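To see the consumer in action, the XML context just needs to be loaded; a minimal sketch, assuming kafka-consumer.xml is on the classpath and the placeholder configuration from the earlier sketch is in place (the bootstrap class itself is illustrative):

package com.hisense.fusion.kafka.consumer;

import org.springframework.context.support.ClassPathXmlApplicationContext;

// Hypothetical bootstrap: loading the context starts the listener container
// (init-method="doStart"), which then delivers records to KafkaConsumerServer.
public class ConsumerBootstrap {
    public static void main(String[] args) {
        ClassPathXmlApplicationContext ctx =
                new ClassPathXmlApplicationContext("kafka-consumer.xml");
        ctx.registerShutdownHook(); // stop the container cleanly on JVM exit
    }
}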
Create the kafka-producer.xml configuration file:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd" default-autowire="byName"> <!-- 定義producer的參數 --> <bean id="producerProperties" class="java.util.HashMap"> <constructor-arg> <map> <entry key="bootstrap.servers" value="${kafka.bootstrap.servers}"/> <entry key="retries" value="3" /> <entry key="retry.backoff.ms" value="10000" /> <entry key="request.timeout.ms" value="10000" /> <entry key="acks" value="1"/> <!--<entry key="batch.size" value="5242880" />--> <!--<entry key="compression.type" value="gzip"/>--> <entry key="linger.ms" value="1" /> <entry key="buffer.memory" value="33554432" /> <entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer" /> <entry key="value.serializer" value="org.springframework.kafka.support.serializer.JsonSerializer" /> </map> </constructor-arg> </bean> <!-- 建立kafkatemplate須要使用的producerfactory bean --> <bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory"> <constructor-arg> <ref bean="producerProperties" /> </constructor-arg> </bean> <!-- 建立kafkatemplate bean,使用的時候,只須要注入這個bean,便可使用template的send消息方法 --> <!--<bean id="producerListener" class="com.hisense.fusion.kafka.producer.KafkaProducerListener" />--> <bean id="KafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate"> <constructor-arg ref="producerFactory" /> <constructor-arg name="autoFlush" value="true" /> <property name="defaultTopic" value="defaultTopic" /> </bean> </beans>
Create the producer service class, which wraps the KafkaTemplate:

package com.hisense.fusion.kafka.producer;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import com.hisense.fusion.kafka.utils.DateUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;

import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutionException;

/**
 * KafkaProducer wrapper; use this service to send messages.
 *
 * @author bill
 */
@Service
public class KafkaProducerServer {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    /** The logger. */
    private static final Logger logger = LoggerFactory.getLogger(KafkaProducerServer.class);

    /**
     * Sends a list of messages to Kafka.
     *
     * @param topic        the topic
     * @param schema       the schema name
     * @param values       the message values
     * @param key          the field of each JSONObject used as the Kafka message key
     * @param ifPartition  "0" to route to an explicitly computed partition, anything else to let Kafka choose
     * @param partitionNum the number of partitions of the topic
     */
    public Boolean sendMessageList(String topic, String schema, List<JSONObject> values,
                                   String key, String ifPartition, Integer partitionNum) {
        boolean rs = true;
        Long startTime = DateUtil.getMillisTimeStamp();
        ListenableFuture<SendResult<String, Object>> result;
        int partitionIndex;
        boolean send;
        int size = 0;
        JSONObject data = new JSONObject();
        if (values != null && values.size() > 0) {
            size = values.size();
            for (JSONObject object : values) {
                // Reset per record so a missing key is not masked by the previous record's key
                String kafkaKey = null;
                if (object.get(key) instanceof Integer) {
                    kafkaKey = ((Integer) object.get(key)).toString();
                }
                if (object.get(key) instanceof Long) {
                    kafkaKey = ((Long) object.get(key)).toString();
                }
                if (object.get(key) instanceof String) {
                    kafkaKey = (String) object.get(key);
                }
                if (kafkaKey == null) {
                    logger.warn("primary key is null,topic:{},key:{},data:{}", topic, key, JSON.toJSONString(object));
                    continue;
                }
                try {
                    // generateSchema is defined elsewhere in this class (omitted here)
                    data.put("schema", generateSchema(topic, object));
                    data.put("payload", object);
                    if (ifPartition.equals("0")) {
                        // Send to an explicitly computed partition
                        partitionIndex = getPartitionIndex(kafkaKey, partitionNum);
                        result = kafkaTemplate.send(topic, partitionIndex, kafkaKey, data);
                    } else {
                        result = kafkaTemplate.send(topic, kafkaKey, data);
                    }
                    send = checkResult(result);
                } catch (JSONException e) {
                    logger.error("{} schema define is error:{}", topic, e.getMessage());
                    send = false;
                } catch (Exception e) {
                    logger.error("{} send kafka failed :{}", topic, e.getMessage());
                    send = false;
                }
                if (!send) {
                    rs = false;
                    break;
                }
            }
        }
        Long endTime = DateUtil.getMillisTimeStamp();
        if (rs) {
            logger.info("topic:{},Send Result: true,count : {},takes :{} ms", topic, size, endTime - startTime);
        } else {
            logger.error("topic:{},Send Result: false,count : {}", topic, size);
        }
        return rs;
    }

    /**
     * Checks the send result; blocks until the broker acknowledges the record.
     * Only the RecordMetadata offset is checked, not the ProducerRecord.
     */
    private boolean checkResult(ListenableFuture<SendResult<String, Object>> res) {
        if (res != null) {
            try {
                SendResult r = res.get(); // blocks until the send completes
                long offsetIndex = r.getRecordMetadata().offset();
                return offsetIndex >= 0;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                logger.error("kafka exception :{}", e.getMessage());
                return false;
            } catch (ExecutionException e) {
                logger.error("kafka exception :{}", e.getMessage());
                return false;
            }
        } else {
            return false;
        }
    }

    /**
     * Computes the partition index for a key.
     */
    private int getPartitionIndex(String key, int partitionNum) {
        if (key == null) {
            Random random = new Random();
            return random.nextInt(partitionNum);
        } else {
            int result = key.hashCode();
            if (result != Integer.MIN_VALUE) {
                result = Math.abs(result) % partitionNum;
            } else {
                // Math.abs(Integer.MIN_VALUE) overflows, so handle it separately
                // to avoid returning a negative partition index
                result = 0;
            }
            return result;
        }
    }
}
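A usage sketch for the service above, assuming it is wired into the same context (the topic name, schema name, key field, record contents, and partition count are illustrative):

import com.alibaba.fastjson.JSONObject;
import java.util.ArrayList;
import java.util.List;

// Hypothetical caller: builds a record keyed by "id" and sends it to an
// 8-partition topic, letting getPartitionIndex pick the partition.
public class ProducerUsageExample {
    public static void run(KafkaProducerServer producerServer) {
        List<JSONObject> values = new ArrayList<JSONObject>();
        JSONObject record = new JSONObject();
        record.put("id", 1001L);
        record.put("name", "demo");
        values.add(record);
        // ifPartition = "0" -> route by key hash; partitionNum = 8
        Boolean ok = producerServer.sendMessageList("media_topic", "media_schema", values, "id", "0", 8);
        System.out.println("send ok? " + ok);
    }
}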
Create the producer listener, which is enabled in the producer configuration file:

package com.hisense.fusion.kafka.producer;

import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.support.ProducerListener;

/**
 * KafkaProducer listener; enabled in the producer configuration file.
 * @author bill
 */
@SuppressWarnings("rawtypes")
public class KafkaProducerListener implements ProducerListener {

    protected final Logger LOG = LoggerFactory.getLogger("kafkaProducer");

    /**
     * Called after a message is sent successfully.
     */
    public void onSuccess(String topic, Integer partition, Object key,
                          Object value, RecordMetadata recordMetadata) {
        LOG.info("==========kafka send success (log start)==========");
        LOG.info("----------topic:" + topic);
        LOG.info("----------partition:" + partition);
        LOG.info("----------key:" + key);
        // LOG.info("----------value:" + value);
        LOG.info("~~~~~~~~~~kafka send success (log end)~~~~~~~~~~");
    }

    /**
     * Called when a send fails.
     */
    public void onError(String topic, Integer partition, Object key,
                        Object value, Exception exception) {
        LOG.error("==========kafka send failed (log start)==========");
        LOG.error("----------topic:" + topic);
        LOG.error("----------partition:" + partition);
        LOG.error("----------key:" + key);
        LOG.error("----------value:" + value);
        LOG.error("----------Exception:", exception);
        LOG.error("~~~~~~~~~~kafka send failed (log end)~~~~~~~~~~");
    }

    /**
     * The return value controls whether onSuccess is invoked for successful sends.
     */
    public boolean isInterestedInSuccess() {
        return true;
    }
}
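To activate the listener, uncomment the producerListener bean in kafka-producer.xml and wire it into the template through the template's producerListener property, roughly:

<bean id="producerListener" class="com.hisense.fusion.kafka.producer.KafkaProducerListener"/>

<bean id="KafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
    <constructor-arg ref="producerFactory"/>
    <constructor-arg name="autoFlush" value="true"/>
    <property name="defaultTopic" value="defaultTopic"/>
    <property name="producerListener" ref="producerListener"/>
</bean>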