Apache Kafka is a distributed stream-processing platform for building real-time data pipelines and streaming applications. It lets you publish and subscribe to streams of records, store those records with good fault tolerance, and process them as they are produced.
The official Kafka site defines Kafka as a distributed publish-subscribe messaging system.
1. Install and Configure Kafka and ZooKeeper
Installation and configuration are straightforward, so I won't go into detail here; see the official quickstart: http://kafka.apache.org/quickstart
Start Kafka with the command `bin/kafka-server-start.sh config/server.properties` (make sure ZooKeeper is running first).
My environment:
CentOS 7.5, Kafka 2.11, ZooKeeper 3.4.13, JDK 1.8+
2. Create a Spring Boot Project
Note the version: this project uses Spring Boot 2.0+; older versions may not work. Add the following dependencies to the project's pom.xml:
```xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.47</version>
</dependency>
```
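The post does not show the application entry point. Assuming a standard Spring Boot setup, a minimal sketch would look like this (the class and package naming are my own, not from the original project):

```java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

// Minimal Spring Boot entry point; spring-kafka is auto-configured
// from the kafka section of application.yml shown later in this post.
@SpringBootApplication
public class KafkaTutorialApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaTutorialApplication.class, args);
    }
}
```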
`KafkaSender<T>`

`T` is the type of the message object to send; serialization uses Alibaba's fastjson. After a message is sent, you can handle your own business logic in the callback class. `ListenableFutureCallback` has two methods, `onFailure` and `onSuccess`; in a real scenario you would put your specific business logic in these two methods, which is left unimplemented here.
```java
import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

/**
 * Message producer
 *
 * @author Jarvis
 * @date 2018/8/3
 */
@Component
public class KafkaSender<T> {

    private Logger logger = LoggerFactory.getLogger(KafkaSender.class);

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * Send a message to Kafka
     *
     * @param obj the message object
     */
    public void send(T obj) {
        String jsonObj = JSON.toJSONString(obj);
        logger.info("------------ message = {}", jsonObj);

        // Send the message
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("kafka.tut", jsonObj);
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable throwable) {
                logger.info("Produce: The message failed to be sent:" + throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
                // TODO business logic
                logger.info("Produce: The message was sent successfully:");
                logger.info("Produce: _+_+_+_+_+_+_+ result: " + stringObjectSendResult.toString());
            }
        });
    }
}
```
The `@KafkaListener` annotation listens for messages on the given `topics`; the topic here must match the one used in the `send` method. `@Header(KafkaHeaders.RECEIVED_TOPIC)` retrieves the topic name directly.
```java
/**
 * Listen to the kafka.tut topic
 *
 * @param record the consumer record
 * @param topic  the topic name
 */
@KafkaListener(id = "tut", topics = "kafka.tut")
public void listen(ConsumerRecord<?, ?> record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    // Guard against a null value
    Optional<?> kafkaMessage = Optional.ofNullable(record.value());
    if (kafkaMessage.isPresent()) {
        // Get the message
        Object message = kafkaMessage.get();
        logger.info("Receive: +++++++++++++++ Topic:" + topic);
        logger.info("Receive: +++++++++++++++ Record:" + record);
        logger.info("Receive: +++++++++++++++ Message:" + message);
    }
}
```
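Because the producer serializes the object to a JSON string with fastjson, the listener above receives a plain string. If the consumer needs the original object back, a possible variant is sketched below; this is my own illustration (listener id and class name are made up), using the `User` class outlined after the test further down:

```java
import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

// Sketch of a typed listener: maps the fastjson string produced by
// KafkaSender back to the original User object.
@Component
public class UserReceiver {

    private final Logger logger = LoggerFactory.getLogger(UserReceiver.class);

    @KafkaListener(id = "tut-typed", topics = "kafka.tut")
    public void listen(ConsumerRecord<String, String> record) {
        // record.value() is the JSON string sent by KafkaSender
        User user = JSON.parseObject(record.value(), User.class);
        logger.info("Receive: deserialized user id={}, msg={}", user.getId(), user.getMsg());
    }
}
```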
```yaml
spring:
  application:
    name: kafka-tutorial
  kafka:
    # Kafka broker address; multiple brokers can be listed
    bootstrap-servers: 192.168.10.100:9092
    producer:
      retries: 0
      # Producer batch size in bytes
      batch-size: 16384
      # Producer buffer memory in bytes
      buffer-memory: 33554432
      # Serializers for the message key and value
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      # Default consumer group id
      group-id: consumer-tutorial
      auto-commit-interval: 100
      auto-offset-reset: earliest
      enable-auto-commit: true
      # Deserializers for the message key and value
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    # Number of threads in the listener container, to increase concurrency
    listener:
      concurrency: 3
```
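For readers who prefer programmatic configuration, the consumer and listener parts of this YAML roughly correspond to the Spring Kafka Java configuration below. This is only an illustrative sketch (class and bean names are my own); with Spring Boot auto-configuration it is not required:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

// Sketch only: manual equivalent of the consumer/listener settings in application.yml.
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.10.100:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-tutorial");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Matches "listener.concurrency: 3" in the YAML above
        factory.setConcurrency(3);
        return factory;
    }
}
```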
```java
@Autowired
private KafkaSender<User> kafkaSender;

@Test
public void kafkaSend() throws InterruptedException {
    // Simulate sending messages
    for (int i = 0; i < 5; i++) {
        User user = new User();
        user.setId(System.currentTimeMillis());
        user.setMsg(UUID.randomUUID().toString());
        user.setSendTime(new Date());
        kafkaSender.send(user);
        Thread.sleep(3000);
    }
}
```
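The test refers to a `User` class that the post does not show. A minimal sketch consistent with the setters used above (field types are my assumption) could be:

```java
import java.util.Date;

// Simple message payload serialized to JSON by KafkaSender.
// Field names follow the setters used in the test; types are assumed.
public class User {

    private Long id;
    private String msg;
    private Date sendTime;

    public Long getId() { return id; }
    public void setId(Long id) { this.id = id; }

    public String getMsg() { return msg; }
    public void setMsg(String msg) { this.msg = msg; }

    public Date getSendTime() { return sendTime; }
    public void setSendTime(Date sendTime) { this.sendTime = sendTime; }
}
```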
You can see in the console that the run succeeded:
On the server, run `bin/kafka-topics.sh --list --zookeeper localhost:2181` and you can see the topic.
1. Preventing data loss on the producer side
Source code on GitHub: https://github.com/jarvisqi/java-tutorial/tree/master/kafka-tutorial