This walkthrough uses Spring Boot 1.5.19.RELEASE as the example.
PS: This article assumes Kafka is already installed on the server side. (Please look up how to install Kafka, create topics, etc. on your own.) First, add the Maven dependencies:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <exclusions>
        <exclusion>
            <artifactId>kafka-clients</artifactId>
            <groupId>org.apache.kafka</groupId>
        </exclusion>
    </exclusions>
</dependency>
#============== kafka ===================
# Kafka broker address; multiple addresses are allowed
kafka.bootstrap.servers=xxxxxxx:9093
#=============== producer =======================
kafka.producer.retries=2
# maximum size of each message batch, in bytes
kafka.producer.batch.size=16384
kafka.producer.buffer.memory=33554432
kafka.producer.linger=1
kafka.producer.acks=all
# serializers for the message key and value
kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#=============== consumer =======================
# default consumer group id
kafka.consumer.group.id=dev-consumer-group
# earliest: if the partition has a committed offset, consume from it; otherwise consume from the beginning.
# latest: if the partition has a committed offset, consume from it; otherwise consume only newly produced data.
# none: throw an exception if any partition of the topic has no committed offset.
kafka.consumer.auto.offset.reset=earliest
kafka.consumer.enable.auto.commit=true
kafka.consumer.session.timeout=60000
kafka.consumer.auto.commit.interval=1000
# number of threads in the listener container, to increase concurrency
kafka.consumer.concurrency=2
# deserializers for the message key and value
kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
Here kafka.bootstrap.servers is the address of the Kafka broker. If it is a remote server address, make sure the corresponding port is open to external access. Multiple addresses can be separated with commas.
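For example, a two-broker cluster would be configured like this (the host names are placeholders):

kafka.bootstrap.servers=broker1.example.com:9093,broker2.example.com:9093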
Next, the producer configuration:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
@EnableKafka
public class KafkaProducerConfig {

    @Value("${kafka.bootstrap.servers}")
    private String servers;
    @Value("${kafka.producer.retries}")
    private int retries;
    @Value("${kafka.producer.batch.size}")
    private int batchSize;
    @Value("${kafka.producer.linger}")
    private int linger;
    @Value("${kafka.producer.buffer.memory}")
    private int bufferMemory;
    @Value("${kafka.producer.acks}")
    private String acks;

    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.ACKS_CONFIG, acks);
        return props;
    }

    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
And the consumer configuration:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${kafka.bootstrap.servers}")
    private String servers;
    @Value("${kafka.consumer.enable.auto.commit}")
    private boolean enableAutoCommit;
    @Value("${kafka.consumer.session.timeout}")
    private String sessionTimeout;
    @Value("${kafka.consumer.auto.commit.interval}")
    private String autoCommitInterval;
    @Value("${kafka.consumer.group.id}")
    private String groupId;
    @Value("${kafka.consumer.auto.offset.reset}")
    private String autoOffsetReset;
    @Value("${kafka.consumer.concurrency}")
    private int concurrency;

    @Bean(name = "kafkaListenerContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(1500);
        return factory;
    }

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        // propsMap.put("zookeeper.connect", "master1.hdp.com:2181,master2.hdp.com:2181,slave1.hdp.com:2181");
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // append the local host address to the group id
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId + HostUtil.getLocalHost().getHostAddress().replace(".", ""));
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return propsMap;
    }
}
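HostUtil in the group id above is not a Spring or Kafka class; it is presumably the author's own helper. A minimal sketch of such a helper, assuming it simply wraps java.net.InetAddress (only the class name and the getLocalHost() call come from the usage above; everything else is an assumption):

import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical helper matching the HostUtil.getLocalHost() usage above.
public final class HostUtil {

    private HostUtil() {
    }

    // Resolve the local host, turning the checked exception into an unchecked one.
    public static InetAddress getLocalHost() {
        try {
            return InetAddress.getLocalHost();
        } catch (UnknownHostException e) {
            throw new IllegalStateException("Unable to resolve local host address", e);
        }
    }
}

Note the effect of this suffix: each machine ends up in its own consumer group, so every machine receives every message instead of sharing partitions within one group.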
With the configuration done, first send a few messages to the topic:
@Resource
private KafkaTemplate<String, String> kafkaTemplate;

private Gson gson = new GsonBuilder().create();

// send a message
public void send() {
    KafkaMessage message = new KafkaMessage();
    message.setId(System.currentTimeMillis());
    message.setMsg(UUID.randomUUID().toString());
    message.setSendTime(new Date());
    log.info("++ message = {}", gson.toJson(message));
    // send the message to the "test" topic
    ListenableFuture<SendResult<String, String>> listenableFuture = kafkaTemplate.send("test", gson.toJson(message));
    sendCallBack(listenableFuture);
}

private void sendCallBack(ListenableFuture<SendResult<String, String>> listenableFuture) {
    try {
        // block for up to 3 seconds waiting for the send result
        SendResult<String, String> sendResult = listenableFuture.get(3, TimeUnit.SECONDS);
        listenableFuture.addCallback(
                successCallBack -> log.info("kafka producer sent the message successfully! topic="
                        + sendResult.getRecordMetadata().topic()
                        + ", partition=" + sendResult.getRecordMetadata().partition()
                        + ", offset=" + sendResult.getRecordMetadata().offset()),
                failureCallBack -> log.error("kafka producer failed to send the message! sendResult="
                        + gson.toJson(sendResult.getProducerRecord())));
    } catch (Exception e) {
        log.error("failed to get the producer result", e);
    }
}
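The KafkaMessage payload class is not shown in the original; here is a minimal sketch inferred from the setters used above (the field types are assumptions based on the values passed in):

import java.util.Date;

// Hypothetical payload class matching the setId/setMsg/setSendTime calls above.
public class KafkaMessage {

    private Long id;       // populated with System.currentTimeMillis()
    private String msg;    // populated with a random UUID string
    private Date sendTime;

    public Long getId() { return id; }
    public void setId(Long id) { this.id = id; }
    public String getMsg() { return msg; }
    public void setMsg(String msg) { this.msg = msg; }
    public Date getSendTime() { return sendTime; }
    public void setSendTime(Date sendTime) { this.sendTime = sendTime; }
}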
Now that sending works, a consumer is needed to consume the messages:
@KafkaListener(topics = {"test"}, containerFactory = "kafkaListenerContainerFactory")
public void listen(ConsumerRecord<?, ?> record) {
    Optional<?> kafkaMessage = Optional.ofNullable(record.value());
    if (kafkaMessage.isPresent()) {
        Object message = kafkaMessage.get();
        log.info("----------------- record = " + record);
        log.info("------------------ message = " + message);
    }
}
With this, messages from the test topic can be received.
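Since the producer writes KafkaMessage as a JSON string, the listener can also map the value back to an object. A minimal sketch, assuming the same Gson instance as in the producer and the hypothetical KafkaMessage class sketched earlier (listenAsObject is a made-up name; if run in the same group as the listener above, the two would split the partitions between them):

@KafkaListener(topics = {"test"}, containerFactory = "kafkaListenerContainerFactory")
public void listenAsObject(ConsumerRecord<String, String> record) {
    // map the JSON value back to the payload object
    KafkaMessage message = gson.fromJson(record.value(), KafkaMessage.class);
    log.info("received message: id={}, msg={}, sendTime={}",
            message.getId(), message.getMsg(), message.getSendTime());
}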
Note that the topic test here has already been created in Kafka. If it has not been created, messages cannot be sent to it; how to create a topic is not covered further here.