SpringBoot開發案例之整合Kafka實現消息隊列

前言

最近在作一款秒殺的案例,涉及到了同步鎖、數據庫鎖、分佈式鎖、進程內隊列以及分佈式消息隊列,這裏對SpringBoot集成Kafka實現消息隊列作一個簡單的記錄。spring

Kafka簡介

Kafka是由Apache軟件基金會開發的一個開源流處理平臺,由Scala和Java編寫。Kafka是一種高吞吐量的分佈式發佈訂閱消息系統,它能夠處理消費者規模的網站中的全部動做流數據。 這種動做(網頁瀏覽,搜索和其餘用戶的行動)是在現代網絡上的許多社會功能的一個關鍵因素。 這些數據一般是因爲吞吐量的要求而經過處理日誌和日誌聚合來解決。 對於像Hadoop的同樣的日誌數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka的目的是經過Hadoop的並行加載機制來統一線上和離線的消息處理,也是爲了經過集羣來提供實時的消息。數據庫

Kafka是一種高吞吐量的分佈式發佈訂閱消息系統,有以下特性:apache

  • 經過O(1)的磁盤數據結構提供消息的持久化,這種結構對於即便數以TB的消息存儲也可以保持長時間的穩定性能。bootstrap

  • 高吞吐量:即便是很是普通的硬件Kafka也能夠支持每秒數百萬的消息。安全

  • 支持經過Kafka服務器和消費機集羣來分區消息。服務器

  • 支持Hadoop並行數據加載。網絡

術語介紹

  • Broker Kafka集羣包含一個或多個服務器,這種服務器被稱爲broker數據結構

  • Topic 每條發佈到Kafka集羣的消息都有一個類別,這個類別被稱爲Topic。(物理上不一樣Topic的消息分開存儲,邏輯上一個Topic的消息雖然保存於一個或多個broker上但用戶只需指定消息的Topic便可生產或消費數據而沒必要關心數據存於何處)app

  • Partition Partition是物理上的概念,每一個Topic包含一個或多個Partition.分佈式

  • Producer 負責發佈消息到Kafka broker

  • Consumer 消息消費者,向Kafka broker讀取消息的客戶端。

  • Consumer Group 每一個Consumer屬於一個特定的Consumer Group(可爲每一個Consumer指定group name,若不指定group name則屬於默認的group)。

Kafka安裝

Kafka須要依賴JAVA環境運行,如何安裝JDK這裏不作介紹。

下載kafka:

 
  1. wget http://mirror.bit.edu.cn/apache/kafka/1.1.0/kafka_2.11-1.1.0.tgz

將包下載到執行目錄並解壓:

 
  1. cd /usr/local/

  2. tar -xzvf kafka_2.11-0.10.0.1.tgz

修改kafka配置文件:

 
  1. cd kafka_2.11-0.10.0.1/config/

  2. #編輯配置文件

  3. vi server.properties

  4. broker.id=0

  5. #端口號、記得開啓端口,雲服務器要開放安全組

  6. port=9092

  7. #服務器IP地址,修改成本身的服務器IP

  8. host.name=127.0.0.1

  9. #zookeeper地址和端口, Kafka支持內置的Zookeeper和引用外部的Zookeeper

  10. zookeeper.connect=localhost:2181

分別啓動 kafka 和 zookeeper:

 
  1. ./zookeeper-server-start.sh /usr/local/kafka_2.11-0.10.0.1/config/zookeeper.properties &

  2. ./kafka-server-start.sh /usr/local/kafka_2.11-0.10.0.1/config/server.properties &

SpringBoot集成

pom.xml引入:

 
  1. <!--kafka支持-->

  2. <dependency>

  3.    <groupId>org.springframework.kafka</groupId>

  4.    <artifactId>spring-kafka</artifactId>

  5.    <version>1.3.5.RELEASE</version><!--$NO-MVN-MAN-VER$-->

  6. </dependency>

application.properties配置:

 
  1. #kafka相關配置

  2. spring.kafka.bootstrap-servers=192.168.1.180:9092

  3. #設置一個默認組

  4. spring.kafka.consumer.group-id=0

  5. #key-value序列化反序列化

  6. spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer

  7. spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

  8. spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer

  9. spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

  10. #每次批量發送消息的數量

  11. spring.kafka.producer.batch-size=65536

  12. spring.kafka.producer.buffer-memory=524288

生產者KafkaSender:

 
  1. /**

  2. * 生產者

  3. * @author 科幫網 By https://blog.52itstyle.com

  4. */

  5. @Component

  6. public class KafkaSender {

  7.    @Autowired

  8.    private KafkaTemplate<String,String> kafkaTemplate;

  9.  

  10.    /**

  11.     * 發送消息到kafka

  12.     */

  13.    public void sendChannelMess(String channel, String message){

  14.        kafkaTemplate.send(channel,message);

  15.    }

  16. }

消費者:

 
  1. /**

  2. * 消費者 spring-kafka 2.0 + 依賴JDK8

  3. * @author 科幫網 By https://blog.52itstyle.com

  4. */

  5. @Component

  6. public class KafkaConsumer {

  7.    /**

  8.     * 監聽seckill主題,有消息就讀取

  9.     * @param message

  10.     */

  11.    @KafkaListener(topics = {"seckill"})

  12.    public void receiveMessage(String message){

  13.        //收到通道的消息以後執行秒殺操做

  14.    }

  15. }

參考

http://kafka.apache.org/

相關文章
相關標籤/搜索