Spring Boot 2.x 快速集成整合消息中間件 Kafka

歡迎關注我的微信公衆號: 小哈學Java, 每日推送 Java 領域乾貨文章,關注即免費無套路附送 100G 海量學習、面試資源喲!!

我的網站: https://www.exception.site/springboot/spring-boot2-kafkajava

什麼是 Kafka?

Kafka 是 Apache 基金會開源的一個分佈式發佈 - 訂閱消息中間件,流處理平臺。 它起源於 LinkedIn,由 Scala 和 Java兩種語言編寫而成。於 2011 年成爲 Apache 項目,2012 成爲 Apache 基金會下頂級項目。git

Kafka 專爲分佈式高吞吐系統而設計。相比較其餘消息中間件,如 RabbitMq 等,Kafka 具備更好的吞吐量,內置分區,複製和固有的容錯能力,使得它很是適合應用在大數據領域。另外,Kafka 還支持離線、在線消費消息。github

爲何要用 Kafka

  • 低延遲 - Kafka 支持低延遲消息傳遞,速度極快,能達到 200w 寫/秒;
  • 高性能 - Kafka對於消息的發佈、訂閱都具備高吞吐量。即便存儲了 TB 級的消息,依然可以保證穩定的性能;
  • 可靠性 - Kafka 是分佈式,分區,複製和容錯的,保證零停機和零數據丟失。
  • 可拓展性 - Kafka 支持集羣水平拓展。
  • 耐用性 - Kafka 使用"分佈式提交日誌",消息可以快速的持久化的磁盤上。

image

Kafka 環境安裝

接下來,小哈爲你們演示一下,在 Linux 系統中,採用最簡單的單機安裝方式, 由於本文着重點仍是介紹 Spring Boot 2.x 快速集成整合 Kafka.web

下載 Kafka

訪問 Kafka 官網 http://kafka.apache.org/downloads,下載 tgz 包, 這裏演示版本爲最新的 2.3.0 版本。面試

image

解壓,進入目錄

下載下來事後,放置到指定位置,執行命令解壓:spring

tar -zxvf kafka_2.11-2.3.0.tgz

解壓完成後,進入 Kafka 目錄下:apache

cd kafka_2.11-2.3.0

啓動 zookeeper

經過 bin 目錄下的 zookeeper-server-start.sh 啓動腳本,來啓動 zk 單節點實例:json

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

啓動 Kafka

經過 bin 目錄下的 kafka-server-start.sh 來啓動 :bootstrap

bin/kafka-server-start.sh  config/server.properties
注意:Kafka 默認使用 9092 端口,注意關閉防火牆,阿里雲服務器的話,記得添加安全組。

Spring Boot 2.x 開始整合

新建一個 Spring Boot 2.x Web 工程。數組

項目結構

image

添加 maven 依賴

小哈這裏完整的 maven 依賴以下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>site.exception</groupId>
    <artifactId>spring-boot-kafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-boot-kafka</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
      
           <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <!-- 阿里巴巴 fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.58</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

添加 kafka 配置

修改 application.yml 文件,添加 kafka 相關配置:

spring:
  kafka:
    # 指定 kafka 地址,我這裏在本地,直接就 localhost, 若外網地址,注意修改【PS: 能夠指定多個】
    bootstrap-servers: localhost:9092
    consumer:
      # 指定 group_id
      group-id: group_id
      auto-offset-reset: earliest
      # 指定消息key和消息體的編解碼方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      # 指定消息key和消息體的編解碼方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

關於 auto-offset-reset

auto.offset.reset 配置有3個值能夠設置,分別以下:

  • earliest:當各分區下有已提交的 offset 時,從提交的 offset 開始消費;無提交的 offset時,從頭開始消費;
  • latest:當各分區下有已提交的 offset 時,從提交的 offset 開始消費;無提交的 offset 時,消費新產生的該分區下的數據;
  • none: topic 各分區都存在已提交的 offset 時,從 offset 後開始消費;只要有一個分區不存在已提交的 offset,則拋出異常;

默認建議用 earliest, 設置該參數後 kafka出錯後重啓,找到未消費的offset能夠繼續消費。

而 latest 這個設置容易丟失消息,假如 kafka 出現問題,還有數據往topic中寫,這個時候重啓kafka,這個設置會從最新的offset開始消費, 中間出問題的哪些就無論了。

none 這個設置沒有用過,兼容性太差,常常出問題。

新增一個訂單類

模擬業務系統中,用戶每下一筆訂單,就發送一個消息,供其餘服務消費:

/**
 * @author 犬小哈(公衆號:小哈學Java)
 * @date 2019/4/12
 * @time 下午3:05
 * @discription 訂單實體類
 **/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class Order {
    /**
     * 訂單id
     */
    private long orderId;
    /**
     * 訂單號
     */
    private String orderNum;
    /**
     * 訂單建立時間
     */
    private LocalDateTime createTime;
}

添加一個消息發佈者

新建一個 KafkaProvider 消息提供者類,源碼以下:

/**
 * @author 犬小哈(公衆號:小哈學Java)
 * @date 2019/4/12
 * @time 下午3:05
 * @discription 消息提供者
 **/
@Component
@Slf4j
public class KafkaProvider {

    /**
     * 消息 TOPIC
     */
    private static final String TOPIC = "xiaoha";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(long orderId, String orderNum, LocalDateTime createTime) {
        // 構建一個訂單類
        Order order = Order.builder()
                .orderId(orderId)
                .orderNum(orderNum)
                .createTime(createTime)
                .build();

        // 發送消息,訂單類的 json 做爲消息體
        ListenableFuture<SendResult<String, String>> future =
                kafkaTemplate.send(TOPIC, JSONObject.toJSONString(order));

        // 監聽回調
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                log.info("## Send message fail ...");
            }

            @Override
            public void onSuccess(SendResult<String, String> result) {
                log.info("## Send message success ...");
            }
        });
    }
}

添加一個消息消費者

消息發送出去了,固然就須要一個消費者,消費者拿到消息後,再作相關的業務處理,這裏,小哈僅僅是打印消息體。

添加 KafkaConsumer 消費者類:

/**
 * @author 犬小哈(公衆號:小哈學Java)
 * @date 2019/4/12
 * @time 下午3:05
 * @discription 消息消費者
 **/
@Component
@Slf4j
public class KafkaConsumer {

    @KafkaListener(topics = "xiaoha", groupId = "group_id")
    public void consume(String message) {
        log.info("## consume message: {}", message);
    }
}

經過 @KafkaListener註解,咱們能夠指定須要監聽的 topic 以及 groupId, 注意,這裏的 topics 是個數組,意味着咱們能夠指定多個 topic,如:@KafkaListener(topics = {"xiaoha", "xiaoha2"}, groupId = "group_id")

注意:消息發佈者的 TOPIC 須要保持與消費者監聽的 TOPIC 一致,否者消費不到消息。

單元測試

新建單元測試,功能測試消息發佈,以及消費。

/**
 * @author 犬小哈(公衆號:小哈學Java)
 * @date 2019/4/12
 * @time 下午3:05
 * @discription
 **/
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringBootKafkaApplicationTests {

    @Autowired
    private KafkaProvider kafkaProvider;

    @Test
    public void sendMessage() throws InterruptedException {
        // 發送 1000 個消息
        for (int i = 0; i < 1000; i++) {
            long orderId = i+1;
            String orderNum = UUID.randomUUID().toString();
            kafkaProvider.sendMessage(orderId, orderNum, LocalDateTime.now());
        }

        TimeUnit.MINUTES.sleep(1);
    }
}

發送 1000 個消息,看消息是否可以被正常發佈與消費,控制檯日誌以下:

image

能夠發現,1000 個消息被成功發送,且被正常消費。

咱們再驗證下 Kafka 的 topic 列表,看 xiaoha 這個 topic 是否正常被建立, 執行 bin 目錄下查看 topic 列表的 kafka-topics.sh 腳本:

bin/kafka-topics.sh --list --zookeeper localhost:2181

image

好了,大功告成!

總結

小哈今天主要和你們分享了,如何安裝單機版的 kafka 環境、如何在 Spring Boot 2.x 中快速集成消息中間件 Kafka,以及演示了相關示例代碼來發布消息、消費消息,但願你們看完事後有所收穫,下期見!

GitHub 源碼地址

https://github.com/weiwosuoai/spring-boot-tutorial/tree/master/spring-boot-kafka

參考資料

https://zh.wikipedia.org/wiki/Kafka

https://www.w3cschool.cn/apache_kafka/

http://www.javashuo.com/article/p-uhappwig-bb.html

https://www.jianshu.com/p/e1df7d18bb8f

免費分享 | 面試&學習福利資源

最近在網上發現一個不錯的 PDF 資源《Java 核心知識&面試.pdf》分享給你們,不光是面試,學習,你都值得擁有!!!

獲取方式: 關注公衆號: 小哈學Java, 後臺回覆資源,既可免費無套路獲取資源連接,下面是目錄以及部分截圖:

關注微信公衆號【小哈學Java】,回覆【資源】,便可免費無套路領取資源連接哦

關注微信公衆號【小哈學Java】,回覆【資源】,便可免費無套路領取資源連接哦

關注微信公衆號【小哈學Java】,回覆【資源】,便可免費無套路領取資源連接哦

關注微信公衆號【小哈學Java】,回覆【資源】,便可免費無套路領取資源連接哦

關注微信公衆號【小哈學Java】,回覆【資源】,便可免費無套路領取資源連接哦

關注微信公衆號【小哈學Java】,回覆【資源】,便可免費無套路領取資源連接哦

重要的事情說兩遍,關注公衆號: 小哈學Java, 後臺回覆資源,既可免費無套路獲取資源連接 !!!

歡迎關注微信公衆號: 小哈學Java

關注微信公衆號【小哈學Java】,回覆【資源】,便可免費無套路領取資源連接哦

相關文章
相關標籤/搜索