Kafka 概述html
Apache Kafka 是一個分佈式流處理平臺,用於構建實時的數據管道和流式的應用.它可讓你發佈和訂閱流式的記錄,能夠儲存流式的記錄,而且有較好的容錯性,能夠在流式記錄產生時就進行處理。java
Apache Kafka是分佈式發佈-訂閱消息系統,在 kafka官網上對 Kafka 的定義:一個分佈式發佈-訂閱消息傳遞系統。 git
Kafka 特性github
Kafka 使用場景web
Spring Boot2.0 + Kafkaspring
1,安裝配置Kafka ,Zookeeperapache
安裝和配置過程很簡單,就不詳細說了,參考官網:http://kafka.apache.org/quick...json
使用命令啓動Kafka: bin/kafka-server-start
.sh config/server
.properties bootstrap
下面給出個人環境:緩存
Centos 7.5, Kafka 2.11, Zookeeper-3.4.13, JDK1.8+
2,建立 Spring Boot 項目
注意版本:該項目使用Spring Boot 2.0 +,低版本不保證正確
pom.xml引用
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.47</version> </dependency>
/** * 消息生產者 * * @author Jarvis * @date 2018/8/3 */ @Component public class KafkaSender<T> { private Logger logger = LoggerFactory.getLogger(KafkaSender.class); @Autowired private KafkaTemplate<String, Object> kafkaTemplate; /** * kafka 發送消息 * * @param obj 消息對象 */ public void send(T obj) { String jsonObj = JSON.toJSONString(obj); logger.info("------------ message = {}", jsonObj); //發送消息 ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("kafka.tut", jsonObj); future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() { @Override public void onFailure(Throwable throwable) { logger.info("Produce: The message failed to be sent:" + throwable.getMessage()); } @Override public void onSuccess(SendResult<String, Object> stringObjectSendResult) { //TODO 業務處理 logger.info("Produce: The message was sent successfully:"); logger.info("Produce: _+_+_+_+_+_+_+ result: " + stringObjectSendResult.toString()); } }); } }
使用 @KafkaListener
註解監聽 topics 消息,此處的 topics 必須和 send函數中的 一致@Header(KafkaHeaders.RECEIVED_TOPI
直接獲取 topic
/** * 消息消費者 * * @author Jarvis * @date 2018/8/3 */ @Component public class KafkaConsumer { private Logger logger = LoggerFactory.getLogger(KafkaConsumer.class); /** * 監聽kafka.tut 的topic,不作其餘業務 * * @param record * @param topic topic */ @KafkaListener(id = "tut", topics = "kafka.tut") public void listen(ConsumerRecord<?, ?> record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { Optional<?> kafkaMessage = Optional.ofNullable(record.value()); if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); logger.info("Receive: +++++++++++++++ Topic:" + topic); logger.info("Receive: +++++++++++++++ Record:" + record); logger.info("Receive: +++++++++++++++ Message:" + message); } } }
spring: application: name: kafka-tutorial kafka: # 指定kafka 代理地址,能夠多個 bootstrap-servers: 192.168.10.100:9092 producer: retries: 0 # 每次批量發送消息的數量 batch-size: 16384 # 緩存容量 buffer-memory: 33554432 # 指定消息key和消息體的編解碼方式 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: # 指定默認消費者group id group-id: consumer-tutorial auto-commit-interval: 100 auto-offset-reset: earliest enable-auto-commit: true # 指定消息key和消息體的編解碼方式 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 指定listener 容器中的線程數,用於提升併發量 listener: concurrency: 3
直接使用 @Autowired 對類 KafkaSender<T> 自動裝配,而後調用 send 方法發送消息便可,下面給出代碼:
@Autowired private KafkaSender<User> kafkaSender; @Test public void kafkaSend() throws InterruptedException { //模擬發消息 for (int i = 0; i < 5; i++) { User user = new User(); user.setId(System.currentTimeMillis()); user.setMsg(UUID.randomUUID().toString()); user.setSendTime(new Date()); kafkaSender.send(message); Thread.sleep(3000); } }
控制檯能夠看到執行成功:
在服務器執行 bin/kafka-topics.sh --list --zookeeper localhost:2181
能夠看到topic
Kafka如何保證數據的不丟失
1.生產者數據的不丟失
若是是同步模式:ack機制可以保證數據的不丟失,若是ack設置爲0,風險很大,通常不建議設置爲0
producer.type=sync request.required.acks=1
若是是異步模式:經過buffer來進行控制數據的發送,有兩個值來進行控制,時間閾值與消息的數量閾值,若是buffer滿了數據尚未發送出去,若是設置的是當即清理模式,風險很大,必定要設置爲阻塞模式
producer.type=async request.required.acks=1 queue.buffering.max.ms=5000 queue.buffering.max.messages=10000 queue.enqueue.timeout.ms = -1 batch.num.messages=200
2.消費者數據的不丟失
若是在消息處理完成前就提交了offset,那麼就有可能形成數據的丟失。因爲Kafka consumer默認是自動提交位移的,因此在後臺提交位移前必定要保證消息被正常處理了,所以不建議採用很重的處理邏輯,若是處理耗時很長,則建議把邏輯放到另外一個線程中去作。爲了不數據丟失,現給出兩點建議:
enable.auto.commit=false 關閉自動提交位移 在消息被完整處理以後再手動提交位移
源碼github:https://github.com/jarvisqi/java-tutorial/tree/master/kafka-tutorial
參考:
加粗文字