Kafka 簡介 & 集成 SpringBoot

近期在作 SOFA 與 SpringCloud 的集成,但願經過一系列的 DEMO 工程去幫助你們更好的使用 SOFA 和 SpringCloud;同時也但願你們一塊兒來參與共建和 star。java

GitHub傳送門:spring-cloud-sofastack-sampleslinux

Kafka 簡介

官方網站:https://kafka.apache.org/git

img

功能提供

Apache Kafka™ 是 一個分佈式數據流平臺,從官方文檔的解釋來看,其職能大致以下:github

  • Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system。發佈和訂閱數據流,與消息隊列或企業級消息系統很像。
  • Store streams of records in a fault-tolerant durable way。具備很強容災性的存儲數據流
  • Process streams of records as they occur。及時的處理數據流。

做爲一個後端司機,大多數狀況下都是把 Kafka 做爲一個分佈式消息隊列來使用的,分佈式消息隊列能夠提供應用解耦、流量消峯、消息分發等功能,已是大型互聯網服務架構不可缺乏的基礎設置了。spring

基本概念

topic 和 partition

Kafka 對數據提供的核心抽象,topic 是發佈的數據流的類別或名稱。topic 在 Kafka 中,支持多訂閱者; 也就是說,topic 能夠有零個、一個或多個消費者訂閱寫到相應 topic 的數據。對應每個 topic,Kafka 集羣會維護像一個以下這樣的分區的日誌: apache

img
每一個 Partition 都是一個有序的、不可變的而且不斷被附加的記錄序列,也就是一個結構化提交日誌(commit log)。爲了保證惟一標性識 Partition 中的每一個數據記錄,Partition 中的記錄每一個都會被分配一個叫作偏移(offset)順序的ID號。經過一個可配置的保留期,Kafka 集羣會保留全部被髮布的數據,無論它們是否是已經被消費者處理。例如,若是保留期設置爲兩天,則在發佈記錄後的兩天內,數據均可以被消費,以後它將被丟棄以釋放空間。 Kafka 的性能是不爲由於數據量大小而受影響的,所以長時間存儲數據並不成問題。

img
事實上,在每一個消費者上保留的惟一元數據是消費者在日誌中的偏移位置,這個偏移由消費者控制:一般消費者會在讀取記錄時線性地提升其偏移值(offset++),但實際上,因爲偏移位置由消費者控制,它能夠以任何順序來處理數據記錄。 例如,消費者能夠重置爲較舊的偏移量以從新處理來自過去的數據,或者跳過以前的記錄,並從「如今」開始消費。 這種特徵的組合意味着 Kafka 消費者很是輕量級,隨意的開啓和關閉並不會對其餘的消費者有大的影響。

日誌中的 Partition 有幾個目的:bootstrap

  • 保證日誌的擴展性,topic 的大小不受單個服務器大小的限制。每一個單獨的 Partition 大小必須小於託管它的服務器磁盤大小,但 topic 可能有不少 Partition,所以它能夠處理任意數量的海量數據。
  • 做爲並行處理的單位 (知乎-Partition:Kafka能夠將主題劃分爲多個分區(Partition),會根據分區規則選擇把消息存儲到哪一個分區中,只要若是分區規則設置的合理,那麼全部的消息將會被均勻的分佈到不一樣的分區中,這樣就實現了負載均衡和水平擴展。另外,多個訂閱者能夠從一個或者多個分區中同時消費數據,以支撐海量數據處理能力)

kafka中的topic爲何要進行分區

原貼:kafka中的topic爲何要進行分區 ,因爲不能轉載,此處不摘抄原文~vim

生產者

生產者將數據發佈到他們選擇的 topic , 生產者負責選擇要吧數據分配給 topic 中哪一個 Partition。這能夠經過循環方式(round-robin)簡單地平衡負載,或者能夠根據某些語義進行分區(例如基於數據中的某些關鍵字)來完成。後端

消費者

消費者們使用消費羣組(consumer group )名稱來標註本身,幾個消費者共享一個 group,每個發佈到 topic 的數據會被傳遞到每一個消費羣組(consumer group )中的一個消費者實例。 消費者實例能夠在不一樣的進程中或不一樣的機器上。bash

若是全部的消費者實例具備相同的 consumer group,則記錄將在全部的消費者實例上有效地負載平衡

若是全部的消費者實例都有不一樣的 consumer group,那麼每一個記錄將被廣播給全部的消費者進程,每一個數據都發到了全部的消費者。

img

上圖解釋源自《Kafka 官方文檔》 介紹

如上圖,一個兩個服務器節點的Kafka集羣, 託管着4個分區(P0-P3),分爲兩個消費者羣. 消費者羣A有2個消費者實例,消費者羣B有4個. 然而,更常見的是,咱們發現主題具備少許的消費者羣,每一個消費者羣表明一個「邏輯訂戶」。每一個組由許多消費者實例組成,保證可擴展性和容錯能力。這能夠說是「發佈-訂閱」語義,但用戶是一組消費者而不是單個進程。 在Kafka中實現消費的方式,是經過將日誌中的分區均分到消費者實例上,以便每一個實例在任什麼時候間都是「相應大小的一塊」分區的惟一消費者。維護消費者組成員資格的過程,由卡夫卡協議動態處理。 若是新的實例加入組,他們將從組中的其餘成員接管一些分區; 若是一個實例消失,其分區將被分發到剩餘的實例。 Kafka僅提供單個分區內的記錄的順序,而不是主題中的不一樣分區之間的總順序。 每一個分區排序結合按鍵分區,足以知足大多數應用程序的需求。 可是,若是您須要使用總順序,則能夠經過僅具備一個分區的主題來實現,儘管這僅意味着每一個消費者組只有一個消費者進程。

Kafka 做爲消息系統

消息系統傳統上有兩種模式: 隊列發佈-訂閱

  • 隊列模式中,消費者池能夠從服務器讀取,每條記錄只會被某一個消費者消費
    • 容許在多個消費者實例上分配數據處理,可是一旦數據被消費以後,數據就沒有了
  • 發佈訂閱模式中,記錄將廣播給全部消費者
    • 容許將數據廣播到多個進程,但沒法縮放和擴容,由於每一個消息都發送給每一個訂閱用戶

本篇只介紹 Kafka 做爲消息隊列的一些基本概念,更多介紹請參考官方文檔

Kafka 安裝

這裏來看下如何安裝 kafka,下載地址:https://kafka.apache.org/downloads。本篇使用的版本是 kafka_2.12-1.1.1

  • 獲取包文件

    > wget http://mirrors.shu.edu.cn/apache/kafka/1.1.1/kafka_2.12-1.1.1.tgz
    複製代碼
  • 解壓壓縮包

    > tar -zxvf kafka_2.12-1.1.1.tgz
    複製代碼
  • 修改配置文件

    > cd kafka_2.12-1.1.1/config
    > vim server.properties
    複製代碼

    我這裏主要修改項包括如下幾個:

    # The id of the broker. This must be set to a unique integer for each broker.
    broker.id=0
    
    listeners=PLAINTEXT://192.168.0.1:9092
    
    advertised.listeners=PLAINTEXT://192.168.0.1:9092
    # zookeeper 地址,能夠多個
    zookeeper.connect=192.168.0.6:2181
    複製代碼

    Kafka 服務啓動須要依賴 Zookeeper ,因此在配置文件中須要指定 Zookeeper 集羣地址。Kafka 本身的安裝包中解壓以後是包括 Zookeeper 的,能夠經過如下的方式來啓動一個單節點 Zookeeper 實例:

    > sh zookeeper-server-start.sh -daemon config/zookeeper.properties
    複製代碼

    這裏我是指定了以前部署的一臺ZK機器,因此能夠直接將ZK地址指到已部署好的地址。Zookeeper 安裝能夠參考: Linux 下安裝 Zookeeper

    經過上述操做,下面就能夠直接來啓動Kafka 服務了:

    > sh kafka-server-start.sh config/server.properties
    複製代碼

SpringBoot 集成 Kafka

構建一個簡單的 Kafka Producer 工具依賴

  • 依賴引入
<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
	<version>1.3.5.RELEASE</version><!--$NO-MVN-MAN-VER$-->
</dependency>
複製代碼
  • producer

爲了能夠把 Kafka 封裝已提供給其餘模塊使用,你們能夠將 Kafka 的生產端工具類使用 SpringBoot 的自動配置機制進行包裝,以下:

@Configuration
public class KafkaProducerAutoConfiguration {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    @Bean
    public KafkaSender kafkaSender(){
        return new KafkaSender(kafkaTemplate);
    }
}
複製代碼
  • KafkaSender
public class KafkaSender {
    private KafkaTemplate<String, String> kafkaTemplate;
    public KafkaSender(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }
    /** * send message */
    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }
}
複製代碼
  • 自動配置
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
io.sofastack.cloud.core.kafka.configuration.KafkaProducerAutoConfiguration
複製代碼

工程模塊以下: image-20190306151759441.png

案例測試

在測試工程中引入依賴,這個依賴就是上面工程打包來的:

<dependency>
	<groupId>io.sofastack.cloud</groupId>
	<artifactId>sofastack-cloud-core-kafka</artifactId>
</dependency>
複製代碼
  • 在 resources 目錄下新建 application.properties 配置文件
#============== kafka ===================
# 指定kafka 代理地址,能夠多個,這裏的192.168.0.1是上面Kafka 啓動配置文件中對應的
# 注:網上一些帖子中說 Kafka 這裏的配置只能是主機名,不支持 ip,沒有驗證過,
# 若是您在驗證時出現問題,能夠嘗試本機綁定下 host
spring.kafka.bootstrap-servers= 192.168.0.1:9092
#=============== provider  =======================
spring.kafka.producer.retries=0
# 每次批量發送消息的數量
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
# 指定消息key和消息體的編解碼方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#=============== consumer  =======================
# 指定默認消費者group id
spring.kafka.consumer.group-id=test-consumer-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100ms
# 指定消息key和消息體的編解碼方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.application.name=kafka-test
logging.path=./logs
複製代碼
  • 啓動類中模擬發送消息
@SpringBootApplication
@PropertySource("classpath:application-kafka.properties")
public class ProviderApplication {
    public static void main(String[] args) {
        ConfigurableApplicationContext run = SpringApplication.run(ProviderApplication.class, args);
        // 這裏經過容器獲取,正常使用狀況下,能夠直接使用 Autowired 注入
        KafkaSender bean = run.getBean(KafkaSender.class);
        for (int i = 0; i < 3; i++) {
            //調用消息發送類中的消息發送方法
            bean.sendMessage(KafkaContants.TRADE_TOPIC, "send a test message");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
複製代碼
  • 編寫消費者,在 SpringBoot 工程中,消費者實現很是簡單
@Component
public class KafkaReceiver {
    // 配置監聽的主體,groupId 和配置文件中的保持一致
    @KafkaListener(topics = { KafkaContants.TRADE_TOPIC }, groupId = "test-consumer-group")
    public void listen(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            System.out.println(message);
        }
    }
}
複製代碼

啓動工程後,能夠在控制檯看下消費者打印的信息:

這裏保持應用正常運行,再經過服務端來手動發送消息,看下是當前消費者可以正確監聽到對應的 topic 並消費。

> sh kafka-console-producer.sh --broker-list 192.168.0.1:9092 --topic trading
複製代碼

執行上述命令以後,命令行將會等待輸入,這裏輸入前後輸入 glmapper 和 sofa :

而後再看下應用程序控制臺輸入結果以下: image-20190306153452565.png

參考

相關文章
相關標籤/搜索