Message Queue(MQ),消息隊列中間件。不少人都說:MQ 經過將消息的發送和接收分離來實現應用程序的異步和解偶,這個給人的直覺是——MQ 是異步的,用來解耦的,可是這個只是 MQ 的效果而不是目的。MQ 真正的目的是爲了通信,屏蔽底層複雜的通信協議,定義了一套應用層的、更加簡單的通信協議。一個分佈式系統中兩個模塊之間通信要麼是 HTTP,要麼是本身開發的 TCP,可是這兩種協議其實都是原始的協議。HTTP 協議很難實現兩端通信——模塊 A 能夠調用 B,B 也能夠主動調用 A,若是要作到這個兩端都要背上 WebServer,並且還不支持長鏈接(HTTP 2.0 的庫根本找不到)。TCP 就更加原始了,粘包、心跳、私有的協議,想想頭皮就發麻。MQ 所要作的就是在這些協議之上構建一個簡單的「協議」——生產者/消費者模型。MQ 帶給個人「協議」不是具體的通信協議,而是更高層次通信模型。它定義了兩個對象——發送數據的叫生產者;接收數據的叫消費者, 提供一個 SDK 讓咱們能夠定義本身的生產者和消費者實現消息通信而無視底層通信協議git
這個流派一般有一臺服務器做爲 Broker,全部的消息都經過它中轉。生產者把消息發送給它就結束本身的任務了,Broker 則把消息主動推送給消費者(或者消費者主動輪詢)github
kafka、JMS(ActiveMQ)就屬於這個流派,生產者會發送 key 和數據到 Broker,由 Broker 比較 key 以後決定給哪一個消費者。這種模式是咱們最多見的模式,是咱們對 MQ 最多的印象。在這種模式下一個 topic 每每是一個比較大的概念,甚至一個系統中就可能只有一個 topic,topic 某種意義上就是 queue,生產者發送 key 至關於說:「hi,把數據放到 key 的隊列中」web
如上圖所示,Broker 定義了三個隊列,key1,key2,key3,生產者發送數據的時候會發送 key1 和 data,Broker 在推送數據的時候則推送 data(也可能把 key 帶上)。spring
雖然架構同樣可是 kafka 的性能要比 jms 的性能不知道高到多少倍,因此基本這種類型的 MQ 只有 kafka 一種備選方案。若是你須要一條暴力的數據流(在意性能而非靈活性)那麼 kafka 是最好的選擇docker
這種的表明是 RabbitMQ(或者說是 AMQP)。生產者發送 key 和數據,消費者定義訂閱的隊列,Broker 收到數據以後會經過必定的邏輯計算出 key 對應的隊列,而後把數據交給隊列express
這種模式下解耦了 key 和 queue,在這種架構中 queue 是很是輕量級的(在 RabbitMQ 中它的上限取決於你的內存),消費者關心的只是本身的 queue;生產者沒必要關心數據最終給誰只要指定 key 就好了,中間的那層映射在 AMQP 中叫 exchange(交換機)。apache
AMQP 中有四種 exchangejson
這種結構的架構給通信帶來了很大的靈活性,咱們能想到的通信方式均可以用這四種 exchange 表達出來。若是你須要一個企業數據總線(在意靈活性)那麼 RabbitMQ 絕對的值得一用服務器
無 Broker 的 MQ 的表明是 ZeroMQ。該做者很是睿智,他很是敏銳的意識到——MQ 是更高級的 Socket,它是解決通信問題的。因此 ZeroMQ 被設計成了一個「庫」而不是一箇中間件,這種實現也能夠達到——沒有 Broker 的目的:網絡
節點之間通信的消息都是發送到彼此的隊列中,每一個節點都既是生產者又是消費者。ZeroMQ 作的事情就是封裝出一套相似於 Socket 的 API 能夠完成發送數據,讀取數據
ZeroMQ 其實就是一個跨語言的、重量級的 Actor 模型郵箱庫。你能夠把本身的程序想象成一個 Actor,ZeroMQ 就是提供郵箱功能的庫;ZeroMQ 能夠實現同一臺機器的 RPC 通信也能夠實現不一樣機器的 TCP、UDP 通信,若是你須要一個強大的、靈活、野蠻的通信能力,別猶豫 ZeroMQ
消息隊列做爲高併發系統的核心組件之一,可以幫助業務系統解構提高開發效率和系統穩定性。主要具備如下優點:
Apache Alibaba RocketMQ 是一個消息中間件。消息中間件中有兩個角色:消息生產者和消息消費者。RocketMQ 裏一樣有這兩個概念,消息生產者負責建立消息併發送到 RocketMQ 服務器,RocketMQ 服務器會將消息持久化到磁盤,消息消費者從 RocketMQ 服務器拉取消息並提交給應用消費。
RocketMQ 是一款分佈式、隊列模型的消息中間件,具備如下特色:
目前主流的 MQ 主要是 RocketMQ、kafka、RabbitMQ,其主要優點有:
注意:啓動 RocketMQ Server + Broker + Console 至少須要 2G 內存
1 version: '3.5' 2 services: 3 rmqnamesrv: 4 image: foxiswho/rocketmq:server 5 container_name: rmqnamesrv 6 ports: 7 - 9876:9876 8 volumes: 9 - ./data/logs:/opt/logs 10 - ./data/store:/opt/store 11 networks: 12 rmq: 13 aliases: 14 - rmqnamesrv 15 16 rmqbroker: 17 image: foxiswho/rocketmq:broker 18 container_name: rmqbroker 19 ports: 20 - 10909:10909 21 - 10911:10911 22 volumes: 23 - ./data/logs:/opt/logs 24 - ./data/store:/opt/store 25 - ./data/brokerconf/broker.conf:/etc/rocketmq/broker.conf 26 environment: 27 NAMESRV_ADDR: "rmqnamesrv:9876" 28 JAVA_OPTS: " -Duser.home=/opt" 29 JAVA_OPT_EXT: "-server -Xms128m -Xmx128m -Xmn128m" 30 command: mqbroker -c /etc/rocketmq/broker.conf 31 depends_on: 32 - rmqnamesrv 33 networks: 34 rmq: 35 aliases: 36 - rmqbroker 37 38 rmqconsole: 39 image: styletang/rocketmq-console-ng 40 container_name: rmqconsole 41 ports: 42 - 8080:8080 43 environment: 44 JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" 45 depends_on: 46 - rmqnamesrv 47 networks: 48 rmq: 49 aliases: 50 - rmqconsole 51 52 networks: 53 rmq: 54 name: rmq 55 driver: bridge
RocketMQ Broker 須要一個配置文件,按照上面的 Compose 配置,咱們須要在 ./data/brokerconf/
目錄下建立一個名爲 broker.conf
的配置文件,內容以下:
1 # Licensed to the Apache Software Foundation (ASF) under one or more 2 # contributor license agreements. See the NOTICE file distributed with 3 # this work for additional information regarding copyright ownership. 4 # The ASF licenses this file to You under the Apache License, Version 2.0 5 # (the "License"); you may not use this file except in compliance with 6 # the License. You may obtain a copy of the License at 7 # 8 # http://www.apache.org/licenses/LICENSE-2.0 9 # 10 # Unless required by applicable law or agreed to in writing, software 11 # distributed under the License is distributed on an "AS IS" BASIS, 12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 # See the License for the specific language governing permissions and 14 # limitations under the License. 15 16 17 # 所屬集羣名字 18 brokerClusterName=DefaultCluster 19 20 # broker 名字,注意此處不一樣的配置文件填寫的不同,若是在 broker-a.properties 使用: broker-a, 21 # 在 broker-b.properties 使用: broker-b 22 brokerName=broker-a 23 24 # 0 表示 Master,> 0 表示 Slave 25 brokerId=0 26 27 # nameServer地址,分號分割 28 # namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876 29 30 # 啓動IP,若是 docker 報 com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.0.120:10909> failed 31 # 解決方式1 加上一句 producer.setVipChannelEnabled(false);,解決方式2 brokerIP1 設置宿主機IP,不要使用docker 內部IP 32 # brokerIP1=192.168.0.253 33 34 # 在發送消息時,自動建立服務器不存在的topic,默認建立的隊列數 35 defaultTopicQueueNums=4 36 37 # 是否容許 Broker 自動建立 Topic,建議線下開啓,線上關閉 !!!這裏仔細看是 false,false,false 38 autoCreateTopicEnable=true 39 40 # 是否容許 Broker 自動建立訂閱組,建議線下開啓,線上關閉 41 autoCreateSubscriptionGroup=true 42 43 # Broker 對外服務的監聽端口 44 listenPort=10911 45 46 # 刪除文件時間點,默認凌晨4點 47 deleteWhen=04 48 49 # 文件保留時間,默認48小時 50 fileReservedTime=120 51 52 # commitLog 每一個文件的大小默認1G 53 mapedFileSizeCommitLog=1073741824 54 55 # ConsumeQueue 每一個文件默認存 30W 條,根據業務狀況調整 56 mapedFileSizeConsumeQueue=300000 57 58 # destroyMapedFileIntervalForcibly=120000 59 # redeleteHangedFileInterval=120000 60 # 檢測物理文件磁盤空間 61 diskMaxUsedSpaceRatio=88 62 # 存儲路徑 63 # storePathRootDir=/home/ztztdata/rocketmq-all-4.1.0-incubating/store 64 # commitLog 存儲路徑 65 # storePathCommitLog=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/commitlog 66 # 消費隊列存儲 67 # storePathConsumeQueue=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/consumequeue 68 # 消息索引存儲路徑 69 # storePathIndex=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/index 70 # checkpoint 文件存儲路徑 71 # storeCheckpoint=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/checkpoint 72 # abort 文件存儲路徑 73 # abortFile=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/abort 74 # 限制的消息大小 75 maxMessageSize=65536 76 77 # flushCommitLogLeastPages=4 78 # flushConsumeQueueLeastPages=2 79 # flushCommitLogThoroughInterval=10000 80 # flushConsumeQueueThoroughInterval=60000 81 82 # Broker 的角色 83 # - ASYNC_MASTER 異步複製Master 84 # - SYNC_MASTER 同步雙寫Master 85 # - SLAVE 86 brokerRole=ASYNC_MASTER 87 88 # 刷盤方式 89 # - ASYNC_FLUSH 異步刷盤 90 # - SYNC_FLUSH 同步刷盤 91 flushDiskType=ASYNC_FLUSH 92 93 # 發消息線程池數量 94 # sendMessageThreadPoolNums=128 95 # 拉消息線程池數量 96 # pullMessageThreadPoolNums=128
訪問 http://rmqIP:8080 登入控制檯
RocketMQ 是一款開源的分佈式消息系統,基於高可用分佈式集羣技術,提供低延時的、高可靠的消息發佈與訂閱服務。
因爲本教程整個案例基於 Spring Cloud,故咱們採用 Spring Cloud Stream 完成一次發佈和訂閱
Spring Cloud Stream 是一個用於構建基於消息的微服務應用框架。它基於 Spring Boot 來建立具備生產級別的單機 Spring 應用,而且使用 Spring Integration
與 Broker 進行鏈接。
Spring Cloud Stream 提供了消息中間件配置的統一抽象,推出了 publish-subscribe
、consumer groups
、partition
這些統一的概念。
Spring Cloud Stream 內部有兩個概念:
Binding 在消息中間件與應用程序提供的 Provider 和 Consumer 之間提供了一個橋樑,實現了開發者只需使用應用程序的 Provider 或 Consumer 生產或消費數據便可,屏蔽了開發者與底層消息中間件的接觸。
在以前的章節中,咱們採用 Docker 部署了 RocketMQ 服務,此時 RocketMQ Broker 暴露的地址和端口(10909,10911)是基於容器的,會致使咱們開發機沒法鏈接,從而引起 org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout
異常
注意下圖中的 IP 地址,這個是容器的 IP,開發機與容器不在一個局域網因此沒法鏈接。
解決方案是在 broker.conf
配置文件中增長 brokerIP1=宿主機IP
便可
主要增長了 org.springframework.cloud:spring-cloud-starter-stream-rocketmq
依賴
1 <?xml version="1.0" encoding="UTF-8"?> 2 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 3 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4 <modelVersion>4.0.0</modelVersion> 5 6 <parent> 7 <groupId>com.funtl</groupId> 8 <artifactId>hello-spring-cloud-alibaba-dependencies</artifactId> 9 <version>1.0.0-SNAPSHOT</version> 10 <relativePath>../hello-spring-cloud-alibaba-dependencies/pom.xml</relativePath> 11 </parent> 12 13 <artifactId>hello-spring-cloud-alibaba-rocketmq-provider</artifactId> 14 <packaging>jar</packaging> 15 16 <name>hello-spring-cloud-alibaba-rocketmq-provider</name> 17 <url>http://www.funtl.com</url> 18 <inceptionYear>2018-Now</inceptionYear> 19 20 <dependencies> 21 <!-- Spring Boot Begin --> 22 <dependency> 23 <groupId>org.springframework.boot</groupId> 24 <artifactId>spring-boot-starter-web</artifactId> 25 </dependency> 26 <dependency> 27 <groupId>org.springframework.boot</groupId> 28 <artifactId>spring-boot-starter-actuator</artifactId> 29 </dependency> 30 <dependency> 31 <groupId>org.springframework.boot</groupId> 32 <artifactId>spring-boot-starter-test</artifactId> 33 <scope>test</scope> 34 </dependency> 35 <!-- Spring Boot End --> 36 37 <!-- Spring Cloud Begin --> 38 <dependency> 39 <groupId>org.springframework.cloud</groupId> 40 <artifactId>spring-cloud-starter-stream-rocketmq</artifactId> 41 </dependency> 42 <!-- Spring Cloud End --> 43 </dependencies> 44 45 <build> 46 <plugins> 47 <plugin> 48 <groupId>org.springframework.boot</groupId> 49 <artifactId>spring-boot-maven-plugin</artifactId> 50 <configuration> 51 <mainClass>com.funtl.hello.spring.cloud.alibaba.rocketmq.provider.RocketMQProviderApplication</mainClass> 52 </configuration> 53 </plugin> 54 </plugins> 55 </build> 56 </project>
1 package com.funtl.hello.spring.cloud.alibaba.rocketmq.provider.service; 2 3 import org.springframework.beans.factory.annotation.Autowired; 4 import org.springframework.messaging.MessageChannel; 5 import org.springframework.messaging.support.MessageBuilder; 6 import org.springframework.stereotype.Service; 7 8 @Service 9 public class ProviderService { 10 @Autowired 11 private MessageChannel output; 12 13 public void send(String message) { 14 output.send(MessageBuilder.withPayload(message).build()); 15 } 16 }
配置 Output(Source.class
) 的 Binding 信息並配合 @EnableBinding
註解使其生效
1 package com.funtl.hello.spring.cloud.alibaba.rocketmq.provider; 2 3 import com.funtl.hello.spring.cloud.alibaba.rocketmq.provider.service.ProviderService; 4 import org.springframework.beans.factory.annotation.Autowired; 5 import org.springframework.boot.CommandLineRunner; 6 import org.springframework.boot.SpringApplication; 7 import org.springframework.boot.autoconfigure.SpringBootApplication; 8 import org.springframework.cloud.stream.annotation.EnableBinding; 9 import org.springframework.cloud.stream.messaging.Source; 10 11 @SpringBootApplication 12 @EnableBinding({Source.class}) 13 public class RocketMQProviderApplication implements CommandLineRunner { 14 15 @Autowired 16 private ProviderService providerService; 17 18 public static void main(String[] args) { 19 SpringApplication.run(RocketMQProviderApplication.class, args); 20 } 21 22 /** 23 * 實現了 CommandLineRunner 接口,只是爲了 Spring Boot 啓動時執行任務,沒必要特別在乎 24 * @param args 25 * @throws Exception 26 */ 27 @Override 28 public void run(String... args) throws Exception { 29 providerService.send("Hello RocketMQ"); 30 } 31 }
1 spring: 2 application: 3 name: rocketmq-provider 4 cloud: 5 stream: 6 rocketmq: 7 binder: 8 # RocketMQ 服務器地址 9 namesrv-addr: 192.168.10.149:9876 10 bindings: 11 # 這裏是個 Map 類型參數,{} 爲 YAML 中 Map 的行內寫法 12 output: {destination: test-topic, content-type: application/json} 13 14 server: 15 port: 9093 16 17 management: 18 endpoints: 19 web: 20 exposure: 21 include: '*'
運行成功後便可在 RocketMQ 控制檯的 消息
列表中選擇 test-topic
主題便可看到發送的消息。
主要增長了 org.springframework.cloud:spring-cloud-starter-stream-rocketmq
依賴
1 <?xml version="1.0" encoding="UTF-8"?> 2 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 3 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4 <modelVersion>4.0.0</modelVersion> 5 6 <parent> 7 <groupId>com.funtl</groupId> 8 <artifactId>hello-spring-cloud-alibaba-dependencies</artifactId> 9 <version>1.0.0-SNAPSHOT</version> 10 <relativePath>../hello-spring-cloud-alibaba-dependencies/pom.xml</relativePath> 11 </parent> 12 13 <artifactId>hello-spring-cloud-alibaba-rocketmq-consumer</artifactId> 14 <packaging>jar</packaging> 15 16 <name>hello-spring-cloud-alibaba-rocketmq-consumer</name> 17 <url>http://www.funtl.com</url> 18 <inceptionYear>2018-Now</inceptionYear> 19 20 <dependencies> 21 <!-- Spring Boot Begin --> 22 <dependency> 23 <groupId>org.springframework.boot</groupId> 24 <artifactId>spring-boot-starter-web</artifactId> 25 </dependency> 26 <dependency> 27 <groupId>org.springframework.boot</groupId> 28 <artifactId>spring-boot-starter-actuator</artifactId> 29 </dependency> 30 <dependency> 31 <groupId>org.springframework.boot</groupId> 32 <artifactId>spring-boot-starter-test</artifactId> 33 <scope>test</scope> 34 </dependency> 35 <!-- Spring Boot End --> 36 37 <!-- Spring Cloud Begin --> 38 <dependency> 39 <groupId>org.springframework.cloud</groupId> 40 <artifactId>spring-cloud-starter-stream-rocketmq</artifactId> 41 </dependency> 42 <!-- Spring Cloud End --> 43 </dependencies> 44 45 <build> 46 <plugins> 47 <plugin> 48 <groupId>org.springframework.boot</groupId> 49 <artifactId>spring-boot-maven-plugin</artifactId> 50 <configuration> 51 <mainClass>com.funtl.hello.spring.cloud.alibaba.rocketmq.consumer.RocketMQConsumerApplication</mainClass> 52 </configuration> 53 </plugin> 54 </plugins> 55 </build> 56 </project>
主要使用 @StreamListener("input")
註解來訂閱從名爲 input
的 Binding 中接收的消息
1 package com.funtl.hello.spring.cloud.alibaba.rocketmq.consumer.receive; 2 3 import org.springframework.cloud.stream.annotation.StreamListener; 4 import org.springframework.stereotype.Service; 5 6 @Service 7 public class ConsumerReceive { 8 9 @StreamListener("input") 10 public void receiveInput(String message) { 11 System.out.println("Receive input: " + message); 12 } 13 }
配置 Input(Sink.class
) 的 Binding 信息並配合 @EnableBinding
註解使其生效
1 package com.funtl.hello.spring.cloud.alibaba.rocketmq.consumer; 2 3 import org.springframework.boot.SpringApplication; 4 import org.springframework.boot.autoconfigure.SpringBootApplication; 5 import org.springframework.cloud.stream.annotation.EnableBinding; 6 import org.springframework.cloud.stream.messaging.Sink; 7 8 @SpringBootApplication 9 @EnableBinding({Sink.class}) 10 public class RocketMQConsumerApplication { 11 public static void main(String[] args) { 12 SpringApplication.run(RocketMQConsumerApplication.class, args); 13 } 14 }
1 spring: 2 application: 3 name: rocketmq-consumer 4 cloud: 5 stream: 6 rocketmq: 7 binder: 8 namesrv-addr: 192.168.10.149:9876 9 bindings: 10 input: {consumer.orderly: true} 11 bindings: 12 input: {destination: test-topic, content-type: text/plain, group: test-group, consumer.maxAttempts: 1} 13 14 server: 15 port: 9094 16 17 management: 18 endpoints: 19 web: 20 exposure: 21 include: '*'
運行成功後便可在控制檯接收到消息:Receive input: Hello RocketMQ
在實際生產中,咱們須要發佈和訂閱的消息可能不止一種 Topic ,故此時就須要使用自定義 Binding 來幫咱們實現多 Topic 的發佈和訂閱功能
自定義 Output 接口,代碼以下:
1 public interface MySource { 2 @Output("output1") 3 MessageChannel output1(); 4 5 @Output("output2") 6 MessageChannel output2(); 7 }
發佈消息的案例代碼以下:
1 @Autowired 2 private MySource source; 3 4 public void send(String msg) throws Exception { 5 source.output1().send(MessageBuilder.withPayload(msg).build()); 6 }
自定義 Input 接口,代碼以下:
1 public interface MySink { 2 @Input("input1") 3 SubscribableChannel input1(); 4 5 @Input("input2") 6 SubscribableChannel input2(); 7 8 @Input("input3") 9 SubscribableChannel input3(); 10 11 @Input("input4") 12 SubscribableChannel input4(); 13 }
接收消息的案例代碼以下:
1 @StreamListener("input1") 2 public void receiveInput1(String receiveMsg) { 3 System.out.println("input1 receive: " + receiveMsg); 4 }
配置 Input 和 Output 的 Binding 信息並配合 @EnableBinding
註解使其生效,代碼以下:
1 @SpringBootApplication 2 @EnableBinding({ MySource.class, MySink.class }) 3 public class RocketMQApplication { 4 public static void main(String[] args) { 5 SpringApplication.run(RocketMQApplication.class, args); 6 } 7 }
1 spring: 2 application: 3 name: rocketmq-provider 4 cloud: 5 stream: 6 rocketmq: 7 binder: 8 namesrv-addr: 192.168.10.149:9876 9 bindings: 10 output1: {destination: test-topic1, content-type: application/json} 11 output2: {destination: test-topic2, content-type: application/json}
1 spring: 2 application: 3 name: rocketmq-consumer 4 cloud: 5 stream: 6 rocketmq: 7 binder: 8 namesrv-addr: 192.168.10.149:9876 9 bindings: 10 input: {consumer.orderly: true} 11 bindings: 12 input1: {destination: test-topic1, content-type: text/plain, group: test-group, consumer.maxAttempts: 1} 13 input2: {destination: test-topic1, content-type: text/plain, group: test-group, consumer.maxAttempts: 1} 14 input3: {destination: test-topic2, content-type: text/plain, group: test-group, consumer.maxAttempts: 1} 15 input4: {destination: test-topic2, content-type: text/plain, group: test-group, consumer.maxAttempts: 1}
引用:千峯