RocketMQ學習分享

消息隊列的流派

 

什麼是 MQ

Message Queue(MQ),消息隊列中間件。不少人都說:MQ 經過將消息的發送和接收分離來實現應用程序的異步和解偶,這個給人的直覺是——MQ 是異步的,用來解耦的,可是這個只是 MQ 的效果而不是目的。MQ 真正的目的是爲了通信,屏蔽底層複雜的通信協議,定義了一套應用層的、更加簡單的通信協議。一個分佈式系統中兩個模塊之間通信要麼是 HTTP,要麼是本身開發的 TCP,可是這兩種協議其實都是原始的協議。HTTP 協議很難實現兩端通信——模塊 A 能夠調用 B,B 也能夠主動調用 A,若是要作到這個兩端都要背上 WebServer,並且還不支持長鏈接(HTTP 2.0 的庫根本找不到)。TCP 就更加原始了,粘包、心跳、私有的協議,想想頭皮就發麻。MQ 所要作的就是在這些協議之上構建一個簡單的「協議」——生產者/消費者模型。MQ 帶給個人「協議」不是具體的通信協議,而是更高層次通信模型。它定義了兩個對象——發送數據的叫生產者;接收數據的叫消費者, 提供一個 SDK 讓咱們能夠定義本身的生產者和消費者實現消息通信而無視底層通信協議git

 

有 Broker 的 MQ

這個流派一般有一臺服務器做爲 Broker,全部的消息都經過它中轉。生產者把消息發送給它就結束本身的任務了,Broker 則把消息主動推送給消費者(或者消費者主動輪詢)github

重 Topic

kafka、JMS(ActiveMQ)就屬於這個流派,生產者會發送 key 和數據到 Broker,由 Broker 比較 key 以後決定給哪一個消費者。這種模式是咱們最多見的模式,是咱們對 MQ 最多的印象。在這種模式下一個 topic 每每是一個比較大的概念,甚至一個系統中就可能只有一個 topic,topic 某種意義上就是 queue,生產者發送 key 至關於說:「hi,把數據放到 key 的隊列中」web

如上圖所示,Broker 定義了三個隊列,key1,key2,key3,生產者發送數據的時候會發送 key1 和 data,Broker 在推送數據的時候則推送 data(也可能把 key 帶上)。spring

雖然架構同樣可是 kafka 的性能要比 jms 的性能不知道高到多少倍,因此基本這種類型的 MQ 只有 kafka 一種備選方案。若是你須要一條暴力的數據流(在意性能而非靈活性)那麼 kafka 是最好的選擇docker

輕 Topic

這種的表明是 RabbitMQ(或者說是 AMQP)。生產者發送 key 和數據,消費者定義訂閱的隊列,Broker 收到數據以後會經過必定的邏輯計算出 key 對應的隊列,而後把數據交給隊列express

這種模式下解耦了 key 和 queue,在這種架構中 queue 是很是輕量級的(在 RabbitMQ 中它的上限取決於你的內存),消費者關心的只是本身的 queue;生產者沒必要關心數據最終給誰只要指定 key 就好了,中間的那層映射在 AMQP 中叫 exchange(交換機)。apache

AMQP 中有四種 exchangejson

  • Direct exchange:key 就等於 queue
  • Fanout exchange:無視 key,給全部的 queue 都來一份
  • Topic exchange:key 能夠用「寬字符」模糊匹配 queue
  • Headers exchange:無視 key,經過查看消息的頭部元數據來決定發給那個 queue(AMQP 頭部元數據很是豐富並且能夠自定義)

這種結構的架構給通信帶來了很大的靈活性,咱們能想到的通信方式均可以用這四種 exchange 表達出來。若是你須要一個企業數據總線(在意靈活性)那麼 RabbitMQ 絕對的值得一用服務器

 

無 Broker 的 MQ

無 Broker 的 MQ 的表明是 ZeroMQ。該做者很是睿智,他很是敏銳的意識到——MQ 是更高級的 Socket,它是解決通信問題的。因此 ZeroMQ 被設計成了一個「庫」而不是一箇中間件,這種實現也能夠達到——沒有 Broker 的目的:網絡

節點之間通信的消息都是發送到彼此的隊列中,每一個節點都既是生產者又是消費者。ZeroMQ 作的事情就是封裝出一套相似於 Socket 的 API 能夠完成發送數據,讀取數據

ZeroMQ 其實就是一個跨語言的、重量級的 Actor 模型郵箱庫。你能夠把本身的程序想象成一個 Actor,ZeroMQ 就是提供郵箱功能的庫;ZeroMQ 能夠實現同一臺機器的 RPC 通信也能夠實現不一樣機器的 TCP、UDP 通信,若是你須要一個強大的、靈活、野蠻的通信能力,別猶豫 ZeroMQ

 

附:Queue 和 Topic 的區別

  • Queue: 一個發佈者發佈消息,下面的接收者按隊列順序接收,好比發佈了 10 個消息,兩個接收者 A,B 那就是 A,B 總共 會收到 10 條消息,不重複。
  • Topic: 一個發佈者發佈消息,有兩個接收者 A,B 來訂閱,那麼發佈了 10 條消息,A,B 各收到 10 條消息。

 

1

 

RocketMQ 簡介

 

概述

消息隊列做爲高併發系統的核心組件之一,可以幫助業務系統解構提高開發效率和系統穩定性。主要具備如下優點:

  • 削峯填谷: 主要解決瞬時寫壓力大於應用服務能力致使消息丟失、系統奔潰等問題
  • 系統解耦: 解決不一樣重要程度、不一樣能力級別系統之間依賴致使一死全死
  • 提高性能: 當存在一對多調用時,能夠發一條消息給消息系統,讓消息系統通知相關係統
  • 蓄流壓測: 線上有些鏈路很差壓測,能夠經過堆積必定量消息再放開來壓測

 

RocketMQ

Apache Alibaba RocketMQ 是一個消息中間件。消息中間件中有兩個角色:消息生產者和消息消費者。RocketMQ 裏一樣有這兩個概念,消息生產者負責建立消息併發送到 RocketMQ 服務器,RocketMQ 服務器會將消息持久化到磁盤,消息消費者從 RocketMQ 服務器拉取消息並提交給應用消費。

 

RocketMQ 特色

RocketMQ 是一款分佈式、隊列模型的消息中間件,具備如下特色:

  • 支持嚴格的消息順序
  • 支持 Topic 與 Queue 兩種模式
  • 億級消息堆積能力
  • 比較友好的分佈式特性
  • 同時支持 Push 與 Pull 方式消費消息
  • 歷經屢次天貓雙十一海量消息考驗

 

RocketMQ 優點

目前主流的 MQ 主要是 RocketMQ、kafka、RabbitMQ,其主要優點有:

  • 支持事務型消息(消息發送和 DB 操做保持兩方的最終一致性,RabbitMQ 和 Kafka 不支持)
  • 支持結合 RocketMQ 的多個系統之間數據最終一致性(多方事務,二方事務是前提)
  • 支持 18 個級別的延遲消息(RabbitMQ 和 Kafka 不支持)
  • 支持指定次數和時間間隔的失敗消息重發(Kafka 不支持,RabbitMQ 須要手動確認)
  • 支持 Consumer 端 Tag 過濾,減小沒必要要的網絡傳輸(RabbitMQ 和 Kafka 不支持)
  • 支持重複消費(RabbitMQ 不支持,Kafka 支持)

 

消息隊列對比參照表

 

基於 Docker 安裝 RocketMQ

 

docker-compose.yml

注意:啓動 RocketMQ Server + Broker + Console 至少須要 2G 內存

  1 version: '3.5'
  2 services:
  3   rmqnamesrv:
  4     image: foxiswho/rocketmq:server
  5     container_name: rmqnamesrv
  6     ports:
  7       - 9876:9876
  8     volumes:
  9       - ./data/logs:/opt/logs
 10       - ./data/store:/opt/store
 11     networks:
 12         rmq:
 13           aliases:
 14             - rmqnamesrv
 15 
 16   rmqbroker:
 17     image: foxiswho/rocketmq:broker
 18     container_name: rmqbroker
 19     ports:
 20       - 10909:10909
 21       - 10911:10911
 22     volumes:
 23       - ./data/logs:/opt/logs
 24       - ./data/store:/opt/store
 25       - ./data/brokerconf/broker.conf:/etc/rocketmq/broker.conf
 26     environment:
 27         NAMESRV_ADDR: "rmqnamesrv:9876"
 28         JAVA_OPTS: " -Duser.home=/opt"
 29         JAVA_OPT_EXT: "-server -Xms128m -Xmx128m -Xmn128m"
 30     command: mqbroker -c /etc/rocketmq/broker.conf
 31     depends_on:
 32       - rmqnamesrv
 33     networks:
 34       rmq:
 35         aliases:
 36           - rmqbroker
 37 
 38   rmqconsole:
 39     image: styletang/rocketmq-console-ng
 40     container_name: rmqconsole
 41     ports:
 42       - 8080:8080
 43     environment:
 44         JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"
 45     depends_on:
 46       - rmqnamesrv
 47     networks:
 48       rmq:
 49         aliases:
 50           - rmqconsole
 51 
 52 networks:
 53   rmq:
 54     name: rmq
 55     driver: bridge

 

broker.conf

RocketMQ Broker 須要一個配置文件,按照上面的 Compose 配置,咱們須要在 ./data/brokerconf/ 目錄下建立一個名爲 broker.conf 的配置文件,內容以下:

  1 # Licensed to the Apache Software Foundation (ASF) under one or more
  2 # contributor license agreements.  See the NOTICE file distributed with
  3 # this work for additional information regarding copyright ownership.
  4 # The ASF licenses this file to You under the Apache License, Version 2.0
  5 # (the "License"); you may not use this file except in compliance with
  6 # the License.  You may obtain a copy of the License at
  7 #
  8 #     http://www.apache.org/licenses/LICENSE-2.0
  9 #
 10 #  Unless required by applicable law or agreed to in writing, software
 11 #  distributed under the License is distributed on an "AS IS" BASIS,
 12 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 13 #  See the License for the specific language governing permissions and
 14 #  limitations under the License.
 15 
 16 
 17 # 所屬集羣名字
 18 brokerClusterName=DefaultCluster
 19 
 20 # broker 名字,注意此處不一樣的配置文件填寫的不同,若是在 broker-a.properties 使用: broker-a,
 21 # 在 broker-b.properties 使用: broker-b
 22 brokerName=broker-a
 23 
 24 # 0 表示 Master,> 0 表示 Slave
 25 brokerId=0
 26 
 27 # nameServer地址,分號分割
 28 # namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
 29 
 30 # 啓動IP,若是 docker 報 com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.0.120:10909> failed
 31 # 解決方式1 加上一句 producer.setVipChannelEnabled(false);,解決方式2 brokerIP1 設置宿主機IP,不要使用docker 內部IP
 32 # brokerIP1=192.168.0.253
 33 
 34 # 在發送消息時,自動建立服務器不存在的topic,默認建立的隊列數
 35 defaultTopicQueueNums=4
 36 
 37 # 是否容許 Broker 自動建立 Topic,建議線下開啓,線上關閉 !!!這裏仔細看是 false,false,false
 38 autoCreateTopicEnable=true
 39 
 40 # 是否容許 Broker 自動建立訂閱組,建議線下開啓,線上關閉
 41 autoCreateSubscriptionGroup=true
 42 
 43 # Broker 對外服務的監聽端口
 44 listenPort=10911
 45 
 46 # 刪除文件時間點,默認凌晨4點
 47 deleteWhen=04
 48 
 49 # 文件保留時間,默認48小時
 50 fileReservedTime=120
 51 
 52 # commitLog 每一個文件的大小默認1G
 53 mapedFileSizeCommitLog=1073741824
 54 
 55 # ConsumeQueue 每一個文件默認存 30W 條,根據業務狀況調整
 56 mapedFileSizeConsumeQueue=300000
 57 
 58 # destroyMapedFileIntervalForcibly=120000
 59 # redeleteHangedFileInterval=120000
 60 # 檢測物理文件磁盤空間
 61 diskMaxUsedSpaceRatio=88
 62 # 存儲路徑
 63 # storePathRootDir=/home/ztztdata/rocketmq-all-4.1.0-incubating/store
 64 # commitLog 存儲路徑
 65 # storePathCommitLog=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/commitlog
 66 # 消費隊列存儲
 67 # storePathConsumeQueue=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/consumequeue
 68 # 消息索引存儲路徑
 69 # storePathIndex=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/index
 70 # checkpoint 文件存儲路徑
 71 # storeCheckpoint=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/checkpoint
 72 # abort 文件存儲路徑
 73 # abortFile=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/abort
 74 # 限制的消息大小
 75 maxMessageSize=65536
 76 
 77 # flushCommitLogLeastPages=4
 78 # flushConsumeQueueLeastPages=2
 79 # flushCommitLogThoroughInterval=10000
 80 # flushConsumeQueueThoroughInterval=60000
 81 
 82 # Broker 的角色
 83 # - ASYNC_MASTER 異步複製Master
 84 # - SYNC_MASTER 同步雙寫Master
 85 # - SLAVE
 86 brokerRole=ASYNC_MASTER
 87 
 88 # 刷盤方式
 89 # - ASYNC_FLUSH 異步刷盤
 90 # - SYNC_FLUSH 同步刷盤
 91 flushDiskType=ASYNC_FLUSH
 92 
 93 # 發消息線程池數量
 94 # sendMessageThreadPoolNums=128
 95 # 拉消息線程池數量
 96 # pullMessageThreadPoolNums=128

 

RocketMQ 控制檯

訪問 http://rmqIP:8080 登入控制檯

 

RocketMQ 生產者

 

概述

RocketMQ 是一款開源的分佈式消息系統,基於高可用分佈式集羣技術,提供低延時的、高可靠的消息發佈與訂閱服務。

因爲本教程整個案例基於 Spring Cloud,故咱們採用 Spring Cloud Stream 完成一次發佈和訂閱

官方教程

 

Spring Cloud Stream

Spring Cloud Stream 是一個用於構建基於消息的微服務應用框架。它基於 Spring Boot 來建立具備生產級別的單機 Spring 應用,而且使用 Spring Integration 與 Broker 進行鏈接。

Spring Cloud Stream 提供了消息中間件配置的統一抽象,推出了 publish-subscribeconsumer groupspartition 這些統一的概念。

Spring Cloud Stream 內部有兩個概念:

  • Binder: 跟外部消息中間件集成的組件,用來建立 Binding,各消息中間件都有本身的 Binder 實現。
  • Binding: 包括 Input Binding 和 Output Binding。

Binding 在消息中間件與應用程序提供的 Provider 和 Consumer 之間提供了一個橋樑,實現了開發者只需使用應用程序的 Provider 或 Consumer 生產或消費數據便可,屏蔽了開發者與底層消息中間件的接觸。

 

解決鏈接超時問題

在以前的章節中,咱們採用 Docker 部署了 RocketMQ 服務,此時 RocketMQ Broker 暴露的地址和端口(10909,10911)是基於容器的,會致使咱們開發機沒法鏈接,從而引起 org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout 異常

注意下圖中的 IP 地址,這個是容器的 IP,開發機與容器不在一個局域網因此沒法鏈接。

解決方案是在 broker.conf 配置文件中增長 brokerIP1=宿主機IP 便可

 

POM

主要增長了 org.springframework.cloud:spring-cloud-starter-stream-rocketmq 依賴

  1 <?xml version="1.0" encoding="UTF-8"?>
  2 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  4     <modelVersion>4.0.0</modelVersion>
  5 
  6     <parent>
  7         <groupId>com.funtl</groupId>
  8         <artifactId>hello-spring-cloud-alibaba-dependencies</artifactId>
  9         <version>1.0.0-SNAPSHOT</version>
 10         <relativePath>../hello-spring-cloud-alibaba-dependencies/pom.xml</relativePath>
 11     </parent>
 12 
 13     <artifactId>hello-spring-cloud-alibaba-rocketmq-provider</artifactId>
 14     <packaging>jar</packaging>
 15 
 16     <name>hello-spring-cloud-alibaba-rocketmq-provider</name>
 17     <url>http://www.funtl.com</url>
 18     <inceptionYear>2018-Now</inceptionYear>
 19 
 20     <dependencies>
 21         <!-- Spring Boot Begin -->
 22         <dependency>
 23             <groupId>org.springframework.boot</groupId>
 24             <artifactId>spring-boot-starter-web</artifactId>
 25         </dependency>
 26         <dependency>
 27             <groupId>org.springframework.boot</groupId>
 28             <artifactId>spring-boot-starter-actuator</artifactId>
 29         </dependency>
 30         <dependency>
 31             <groupId>org.springframework.boot</groupId>
 32             <artifactId>spring-boot-starter-test</artifactId>
 33             <scope>test</scope>
 34         </dependency>
 35         <!-- Spring Boot End -->
 36 
 37         <!-- Spring Cloud Begin -->
 38         <dependency>
 39             <groupId>org.springframework.cloud</groupId>
 40             <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
 41         </dependency>
 42         <!-- Spring Cloud End -->
 43     </dependencies>
 44 
 45     <build>
 46         <plugins>
 47             <plugin>
 48                 <groupId>org.springframework.boot</groupId>
 49                 <artifactId>spring-boot-maven-plugin</artifactId>
 50                 <configuration>
 51                     <mainClass>com.funtl.hello.spring.cloud.alibaba.rocketmq.provider.RocketMQProviderApplication</mainClass>
 52                 </configuration>
 53             </plugin>
 54         </plugins>
 55     </build>
 56 </project>

 

消息生產者服務

  1 package com.funtl.hello.spring.cloud.alibaba.rocketmq.provider.service;
  2 
  3 import org.springframework.beans.factory.annotation.Autowired;
  4 import org.springframework.messaging.MessageChannel;
  5 import org.springframework.messaging.support.MessageBuilder;
  6 import org.springframework.stereotype.Service;
  7 
  8 @Service
  9 public class ProviderService {
 10     @Autowired
 11     private MessageChannel output;
 12 
 13     public void send(String message) {
 14         output.send(MessageBuilder.withPayload(message).build());
 15     }
 16 }

 

Application

配置 Output(Source.class) 的 Binding 信息並配合 @EnableBinding 註解使其生效

  1 package com.funtl.hello.spring.cloud.alibaba.rocketmq.provider;
  2 
  3 import com.funtl.hello.spring.cloud.alibaba.rocketmq.provider.service.ProviderService;
  4 import org.springframework.beans.factory.annotation.Autowired;
  5 import org.springframework.boot.CommandLineRunner;
  6 import org.springframework.boot.SpringApplication;
  7 import org.springframework.boot.autoconfigure.SpringBootApplication;
  8 import org.springframework.cloud.stream.annotation.EnableBinding;
  9 import org.springframework.cloud.stream.messaging.Source;
 10 
 11 @SpringBootApplication
 12 @EnableBinding({Source.class})
 13 public class RocketMQProviderApplication implements CommandLineRunner {
 14 
 15     @Autowired
 16     private ProviderService providerService;
 17 
 18     public static void main(String[] args) {
 19         SpringApplication.run(RocketMQProviderApplication.class, args);
 20     }
 21 
 22     /**
 23      * 實現了 CommandLineRunner 接口,只是爲了 Spring Boot 啓動時執行任務,沒必要特別在乎
 24      * @param args
 25      * @throws Exception
 26      */
 27     @Override
 28     public void run(String... args) throws Exception {
 29         providerService.send("Hello RocketMQ");
 30     }
 31 }

application.yml

  1 spring:
  2   application:
  3     name: rocketmq-provider
  4   cloud:
  5     stream:
  6       rocketmq:
  7         binder:
  8           # RocketMQ 服務器地址
  9           namesrv-addr: 192.168.10.149:9876
 10       bindings:
 11         # 這裏是個 Map 類型參數,{} 爲 YAML 中 Map 的行內寫法
 12         output: {destination: test-topic, content-type: application/json}
 13 
 14 server:
 15   port: 9093
 16 
 17 management:
 18   endpoints:
 19     web:
 20       exposure:
 21         include: '*'

運行成功後便可在 RocketMQ 控制檯的 消息 列表中選擇 test-topic 主題便可看到發送的消息。

 

RocketMQ 消費者

 

POM

主要增長了 org.springframework.cloud:spring-cloud-starter-stream-rocketmq 依賴

  1 <?xml version="1.0" encoding="UTF-8"?>
  2 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  4     <modelVersion>4.0.0</modelVersion>
  5 
  6     <parent>
  7         <groupId>com.funtl</groupId>
  8         <artifactId>hello-spring-cloud-alibaba-dependencies</artifactId>
  9         <version>1.0.0-SNAPSHOT</version>
 10         <relativePath>../hello-spring-cloud-alibaba-dependencies/pom.xml</relativePath>
 11     </parent>
 12 
 13     <artifactId>hello-spring-cloud-alibaba-rocketmq-consumer</artifactId>
 14     <packaging>jar</packaging>
 15 
 16     <name>hello-spring-cloud-alibaba-rocketmq-consumer</name>
 17     <url>http://www.funtl.com</url>
 18     <inceptionYear>2018-Now</inceptionYear>
 19 
 20     <dependencies>
 21         <!-- Spring Boot Begin -->
 22         <dependency>
 23             <groupId>org.springframework.boot</groupId>
 24             <artifactId>spring-boot-starter-web</artifactId>
 25         </dependency>
 26         <dependency>
 27             <groupId>org.springframework.boot</groupId>
 28             <artifactId>spring-boot-starter-actuator</artifactId>
 29         </dependency>
 30         <dependency>
 31             <groupId>org.springframework.boot</groupId>
 32             <artifactId>spring-boot-starter-test</artifactId>
 33             <scope>test</scope>
 34         </dependency>
 35         <!-- Spring Boot End -->
 36 
 37         <!-- Spring Cloud Begin -->
 38         <dependency>
 39             <groupId>org.springframework.cloud</groupId>
 40             <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
 41         </dependency>
 42         <!-- Spring Cloud End -->
 43     </dependencies>
 44 
 45     <build>
 46         <plugins>
 47             <plugin>
 48                 <groupId>org.springframework.boot</groupId>
 49                 <artifactId>spring-boot-maven-plugin</artifactId>
 50                 <configuration>
 51                     <mainClass>com.funtl.hello.spring.cloud.alibaba.rocketmq.consumer.RocketMQConsumerApplication</mainClass>
 52                 </configuration>
 53             </plugin>
 54         </plugins>
 55     </build>
 56 </project>

消息消費者服務

主要使用 @StreamListener("input") 註解來訂閱從名爲 input 的 Binding 中接收的消息

  1 package com.funtl.hello.spring.cloud.alibaba.rocketmq.consumer.receive;
  2 
  3 import org.springframework.cloud.stream.annotation.StreamListener;
  4 import org.springframework.stereotype.Service;
  5 
  6 @Service
  7 public class ConsumerReceive {
  8 
  9     @StreamListener("input")
 10     public void receiveInput(String message) {
 11         System.out.println("Receive input: " + message);
 12     }
 13 }

Application

配置 Input(Sink.class) 的 Binding 信息並配合 @EnableBinding 註解使其生效

  1 package com.funtl.hello.spring.cloud.alibaba.rocketmq.consumer;
  2 
  3 import org.springframework.boot.SpringApplication;
  4 import org.springframework.boot.autoconfigure.SpringBootApplication;
  5 import org.springframework.cloud.stream.annotation.EnableBinding;
  6 import org.springframework.cloud.stream.messaging.Sink;
  7 
  8 @SpringBootApplication
  9 @EnableBinding({Sink.class})
 10 public class RocketMQConsumerApplication {
 11     public static void main(String[] args) {
 12         SpringApplication.run(RocketMQConsumerApplication.class, args);
 13     }
 14 }

application.yml

  1 spring:
  2   application:
  3     name: rocketmq-consumer
  4   cloud:
  5     stream:
  6       rocketmq:
  7         binder:
  8           namesrv-addr: 192.168.10.149:9876
  9         bindings:
 10           input: {consumer.orderly: true}
 11       bindings:
 12         input: {destination: test-topic, content-type: text/plain, group: test-group, consumer.maxAttempts: 1}
 13 
 14 server:
 15   port: 9094
 16 
 17 management:
 18   endpoints:
 19     web:
 20       exposure:
 21         include: '*'

運行成功後便可在控制檯接收到消息:Receive input: Hello RocketMQ

 

RocketMQ 自定義 Binding

 

概述

在實際生產中,咱們須要發佈和訂閱的消息可能不止一種 Topic ,故此時就須要使用自定義 Binding 來幫咱們實現多 Topic 的發佈和訂閱功能

 

生產者

自定義 Output 接口,代碼以下:

  1 public interface MySource {
  2     @Output("output1")
  3     MessageChannel output1();
  4 
  5     @Output("output2")
  6     MessageChannel output2();
  7 }

發佈消息的案例代碼以下:

  1 @Autowired
  2 private MySource source;
  3 
  4 public void send(String msg) throws Exception {
  5     source.output1().send(MessageBuilder.withPayload(msg).build());
  6 }

消費者

自定義 Input 接口,代碼以下:

  1 public interface MySink {
  2     @Input("input1")
  3     SubscribableChannel input1();
  4 
  5     @Input("input2")
  6     SubscribableChannel input2();
  7 
  8     @Input("input3")
  9     SubscribableChannel input3();
 10 
 11     @Input("input4")
 12     SubscribableChannel input4();
 13 }

接收消息的案例代碼以下:

  1 @StreamListener("input1")
  2 public void receiveInput1(String receiveMsg) {
  3     System.out.println("input1 receive: " + receiveMsg);
  4 }

Application

配置 Input 和 Output 的 Binding 信息並配合 @EnableBinding 註解使其生效,代碼以下:

  1 @SpringBootApplication
  2 @EnableBinding({ MySource.class, MySink.class })
  3 public class RocketMQApplication {
  4 	public static void main(String[] args) {
  5 		SpringApplication.run(RocketMQApplication.class, args);
  6 	}
  7 }

application.yml

生產者
  1 spring:
  2   application:
  3     name: rocketmq-provider
  4   cloud:
  5     stream:
  6       rocketmq:
  7         binder:
  8           namesrv-addr: 192.168.10.149:9876
  9       bindings:
 10         output1: {destination: test-topic1, content-type: application/json}
 11         output2: {destination: test-topic2, content-type: application/json}
消費者
  1 spring:
  2   application:
  3     name: rocketmq-consumer
  4   cloud:
  5     stream:
  6       rocketmq:
  7         binder:
  8           namesrv-addr: 192.168.10.149:9876
  9         bindings:
 10           input: {consumer.orderly: true}
 11       bindings:
 12         input1: {destination: test-topic1, content-type: text/plain, group: test-group, consumer.maxAttempts: 1}
 13         input2: {destination: test-topic1, content-type: text/plain, group: test-group, consumer.maxAttempts: 1}
 14         input3: {destination: test-topic2, content-type: text/plain, group: test-group, consumer.maxAttempts: 1}
 15         input4: {destination: test-topic2, content-type: text/plain, group: test-group, consumer.maxAttempts: 1}

 

#感謝閱覽#

引用:千峯

相關文章
相關標籤/搜索