Spring Boot 2.0 快速集成整合消息中間件 Kafka

歡迎關注筆者微信公衆號: 小哈學Java, 每日持續推送 Java 領域乾貨文章,月送福利哦!!php

我的網站: www.exception.site/springboot/…java

什麼是 Kafka?

Kafka 是 Apache 基金會開源的一個分佈式發佈 - 訂閱消息中間件,流處理平臺。 它起源於 LinkedIn,由 Scala 和 Java兩種語言編寫而成。於 2011 年成爲 Apache 項目,2012 成爲 Apache 基金會下頂級項目。git

Kafka 專爲分佈式高吞吐系統而設計。相比較其餘消息中間件,如 RabbitMq 等,Kafka 具備更好的吞吐量,內置分區,複製和固有的容錯能力,使得它很是適合應用在大數據領域。另外,Kafka 還支持離線、在線消費消息。github

爲何要用 Kafka

  • 低延遲 - Kafka 支持低延遲消息傳遞,速度極快,能達到 200w 寫/秒;
  • 高性能 - Kafka對於消息的發佈、訂閱都具備高吞吐量。即便存儲了 TB 級的消息,依然可以保證穩定的性能;
  • 可靠性 - Kafka 是分佈式,分區,複製和容錯的,保證零停機和零數據丟失。
  • 可拓展性 - Kafka 支持集羣水平拓展。
  • 耐用性 - Kafka 使用"分佈式提交日誌",消息可以快速的持久化的磁盤上。

Kafka 環境安裝

接下來,小哈爲你們演示一下,在 Linux 系統中,採用最簡單的單機安裝方式, 由於本文着重點仍是介紹 Spring Boot 2.x 快速集成整合 Kafka.web

下載 Kafka

訪問 Kafka 官網 kafka.apache.org/downloads,下載 tgz 包, 這裏演示版本爲最新的 2.3.0 版本。spring

解壓,進入目錄

下載下來事後,放置到指定位置,執行命令解壓:apache

tar -zxvf kafka_2.11-2.3.0.tgz 
複製代碼

解壓完成後,進入 Kafka 目錄下:json

cd kafka_2.11-2.3.0
複製代碼

啓動 zookeeper

經過 bin 目錄下的 zookeeper-server-start.sh 啓動腳本,來啓動 zk 單節點實例:bootstrap

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
複製代碼

啓動 Kafka

經過 bin 目錄下的 kafka-server-start.sh 來啓動 :數組

bin/kafka-server-start.sh  config/server.properties
複製代碼

注意:Kafka 默認使用 9092 端口,注意關閉防火牆,阿里雲服務器的話,記得添加安全組。

Spring Boot 2.x 開始整合

新建一個 Spring Boot 2.x Web 工程。

項目結構

添加 maven 依賴

小哈這裏完整的 maven 依賴以下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>site.exception</groupId>
    <artifactId>spring-boot-kafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-boot-kafka</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
      
       	<dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <!-- 阿里巴巴 fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.58</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>
複製代碼

添加 kafka 配置

修改 application.yml 文件,添加 kafka 相關配置:

spring:
 kafka:
    # 指定 kafka 地址,我這裏在本地,直接就 localhost, 若外網地址,注意修改【PS: 能夠指定多個】
 bootstrap-servers: localhost:9092
 consumer:
      # 指定 group_id
 group-id: group_id
 auto-offset-reset: earliest
      # 指定消息key和消息體的編解碼方式
 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
 value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
 producer:
      # 指定消息key和消息體的編解碼方式
 key-deserializer: org.apache.kafka.common.serialization.StringSerializer
 value-deserializer: org.apache.kafka.common.serialization.StringSerializer
複製代碼

關於 auto-offset-reset

auto.offset.reset 配置有3個值能夠設置,分別以下:

  • earliest:當各分區下有已提交的 offset 時,從提交的 offset 開始消費;無提交的 offset時,從頭開始消費;
  • latest:當各分區下有已提交的 offset 時,從提交的 offset 開始消費;無提交的 offset 時,消費新產生的該分區下的數據;
  • none: topic各分區都存在已提交的 offset 時,從 offset 後開始消費;只要有一個分區不存在已提交的 offset,則拋出異常;

默認建議用 earliest, 設置該參數後 kafka出錯後重啓,找到未消費的offset能夠繼續消費。

而 latest 這個設置容易丟失消息,假如 kafka 出現問題,還有數據往topic中寫,這個時候重啓kafka,這個設置會從最新的offset開始消費, 中間出問題的哪些就無論了。

none 這個設置沒有用過,兼容性太差,常常出問題。

新增一個訂單類

模擬業務系統中,用戶每下一筆訂單,就發送一個消息,供其餘服務消費:

/** * @author 犬小哈(公衆號:小哈學Java) * @date 2019/4/12 * @time 下午3:05 * @discription 訂單實體類 **/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class Order {
    /** * 訂單id */
    private long orderId;
    /** * 訂單號 */
    private String orderNum;
    /** * 訂單建立時間 */
    private LocalDateTime createTime;
}

複製代碼

添加一個消息發佈者

新建一個 KafkaProvider 消息提供者類,源碼以下:

/** * @author 犬小哈(公衆號:小哈學Java) * @date 2019/4/12 * @time 下午3:05 * @discription 消息提供者 **/
@Component
@Slf4j
public class KafkaProvider {

    /** * 消息 TOPIC */
    private static final String TOPIC = "xiaoha";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(long orderId, String orderNum, LocalDateTime createTime) {
        // 構建一個訂單類
        Order order = Order.builder()
                .orderId(orderId)
                .orderNum(orderNum)
                .createTime(createTime)
                .build();

        // 發送消息,訂單類的 json 做爲消息體
        ListenableFuture<SendResult<String, String>> future =
                kafkaTemplate.send(TOPIC, JSONObject.toJSONString(order));

        // 監聽回調
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                log.info("## Send message fail ...");
            }

            @Override
            public void onSuccess(SendResult<String, String> result) {
                log.info("## Send message success ...");
            }
        });
    }
}
複製代碼

添加一個消息消費者

消息發送出去了,固然就須要一個消費者,消費者拿到消息後,再作相關的業務處理,這裏,小哈僅僅是打印消息體。

添加 KafkaConsumer 消費者類:

/** * @author 犬小哈(公衆號:小哈學Java) * @date 2019/4/12 * @time 下午3:05 * @discription 消息消費者 **/
@Component
@Slf4j
public class KafkaConsumer {

    @KafkaListener(topics = "xiaoha", groupId = "group_id")
    public void consume(String message) {
        log.info("## consume message: {}", message);
    }
}
複製代碼

經過 @KafkaListener註解,咱們能夠指定須要監聽的 topic 以及 groupId, 注意,這裏的 topics 是個數組,意味着咱們能夠指定多個 topic,如:@KafkaListener(topics = {"xiaoha", "xiaoha2"}, groupId = "group_id")

注意:消息發佈者的 TOPIC 須要保持與消費者監聽的 TOPIC 一致,否者消費不到消息。

單元測試

新建單元測試,功能測試消息發佈,以及消費。

/** * @author 犬小哈(公衆號:小哈學Java) * @date 2019/4/12 * @time 下午3:05 * @discription **/
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringBootKafkaApplicationTests {

    @Autowired
    private KafkaProvider kafkaProvider;

    @Test
    public void sendMessage() throws InterruptedException {
        // 發送 1000 個消息
        for (int i = 0; i < 1000; i++) {
            long orderId = i+1;
            String orderNum = UUID.randomUUID().toString();
            kafkaProvider.sendMessage(orderId, orderNum, LocalDateTime.now());
        }

        TimeUnit.MINUTES.sleep(1);
    }
}
複製代碼

發送 1000 個消息,看消息是否可以被正常發佈與消費,控制檯日誌以下:

能夠發現,1000 個消息被成功發送,且被正常消費。

咱們再驗證下 Kafka 的 topic 列表,看 xiaoha 這個topic 是否正常被建立, 執行 bin 目錄下查看 topic 列表的 kafka-topics.sh 腳本:

bin/kafka-topics.sh --list --zookeeper localhost:2181
複製代碼

好了,大功告成!

總結

小哈今天主要和你們分享了,如何安裝單機版的 kafka 環境、如何在 Spring Boot 2.x 中快速集成消息中間件 Kafka,以及演示了相關示例代碼來發布消息、消費消息,但願你們看完事後有所收穫,下期見!

GitHub 源碼地址

github.com/weiwosuoai/…

參考資料

zh.wikipedia.org/wiki/Kafka

www.w3cschool.cn/apache_kafk…

juejin.im/post/5d406a…

www.jianshu.com/p/e1df7d18b…

歡迎關注微信公衆號: 小哈學Java

關注微信公衆號【小哈學Java】,回覆【資源】,便可免費無套路領取資源連接哦
關注微信公衆號【小哈學Java】,回覆【資源】,便可免費無套路領取資源連接哦
相關文章
相關標籤/搜索