MQ,Message Queue,基於TCP協議構建的簡單協議,區別於具體的通訊協議。java
基於通訊協議定義和抽象的更高層次的通訊模型,通常都是生產者和消費者模型,又或者說服務端和客戶端模型。web
生產者/消費者模型:通常經過定義生產者和消費者實現消息通訊從而屏蔽複雜的底層通訊協議。應用於分佈式應用系統,並且爲之提供異步解耦和削峯填谷的能力,同時也具有互聯網應用所需的海量堆積,高吞吐和可靠性重試機制的特性。spring
數據結構:消息隊列的數據結構採用FIFO方式來定義與實現
設計模式:採用觀察者模式docker
觀察者模式:定義對象間一對多的依賴關係,當一個對象的狀態發生改變時,全部依賴於它的對象都獲得通知自動更新apache
NameServer:名稱服務器[MQ命名空間服務器],大體至關於 jndi技術,更新和發現 broker服務。用於保存Broker相關元信息,並給生產者和消費者查找Broker消息。每一個Broker在啓動都會在名稱服務器[NameServer]註冊,生產者在發送消息前會根據消息主題到名稱服務器查詢獲取Broker路由消息,消費者也會定時獲取主題的路由消息。編程
ps:屬於無狀態服務設計,可橫向擴展,節點之間無通訊,能夠部署多臺機器來標識僞集羣。centos
Broker:消息存儲中心[消息中轉角色],負責存儲和轉發消息。接收來自生產者的消息並進行存儲,消費者從這拉取消息。存儲與消息相關的元數據,主要包括用戶組,消息進度偏移量,隊列消息等。其中Broker分爲Master和Slave節點:設計模式
ps:一個Master能夠對應多個Slave,可是一個Slave只能對應一個Master。Master和Slave的對應關係經過指定相同的BrokerName,不一樣的BrokerId來定義。BrokerId爲0表示Master,BrokerId非0表示Slave。而後全部的Broker和Name Server上的節點創建長鏈接,定時註冊Topic信息到全部Name Server。服務器
Producer:消息生產者->負責生產消息,生產者向消息服務器發送業務應用程序生成的消息。主要有同步發送和異步發送方式兩種,其中:網絡
ps:Producer與Name Server其中一個節點創建鏈接。按期從Name Server取Topic信息。並與提供該Topic信息的Master創建長鏈接。Producer也能夠集羣部署。
Consumer:消息消費者 負責消費消息,從消息服務器拉取消息並將其輸入用戶應用程序中。主要分爲拉取型消費者和推送型消費者:
拉取型消費者:Pull Consumer->主動從消息服務器拉取消息,只要批量拉取消息,用戶就會啓動消費過程
推送型消費者:Push Consumer->封裝消息的拉取,消費進度和其它內部維護工做,消息到達以後便執行回調接口留給用戶應用程序來實現。屬於被動消費類型,Push拉取時須要註冊消息費者監聽器,當監聽器被觸發以後開始消費消息。
ps:Consumer 與Name Server 集羣中的其中一個節點(隨機選擇)創建長鏈接,按期從Name Server 取Topic 路由信息,並向提供Topic服務的Master、Slave創建長鏈接,且定時向Master、Slave發送心跳。Consumer既能夠從Master訂閱消息,也能夠從Slave訂閱消息,訂閱規則由Broker配置決定。
拉取rocketmq鏡像:docker pull foxiswho/rocketmq
1.查詢鏡像images:docker search rocketmq
[root@centos-meteor ~]# docker search rocketmq NAME DESCRIPTION STARS OFFICIAL AUTOMATED styletang/rocketmq-console-ng rocketmq-console-ng 20 rocketmqinc/rocketmq Image repository for Apache RocketMQ 17 foxiswho/rocketmq rocketmq 14 laoyumi/rocketmq 10 [OK] xlxwhy/rocketmq alibaba's rocketmq 4 huanwei/rocketmq-broker 2 2019liurui/rocketmq-broker RocketMQ broker image for RocketMQ-Operator 1 2019liurui/rocketmq-namesrv RocketMQ name service image for RocketMQ-Ope… 1 apacherocketmq/rocketmq Docker Image for Apache RocketMQ 1 rocketmqinc/rocketmq-operator The Kubernetes operator for RocketMQ 0 2019liurui/rocketmq-operator Kubernetes Operator for RocketMQ ! 0 apacherocketmq/rocketmq-operator RocketMQ Operator is to manage RocketMQ serv… 0 coder4/rocketmq rocketmq 0 [OK] rocketmqinc/rocketmq-namesrv Customized RocketMQ Name Server Image for Ro… 0 rocketmqinc/rocketmq-broker Customized RocketMQ Broker Image for RocketM… 0 slpcat/rocketmq-console-ng 0 huanwei/rocketmq 0 huanwei/rocketmq-broker-k8s 0 king019/rocketmq rocketmq 0 pengzu/rocketmq-console-ng web console for rocketmq ,this code is from … 0 fengzt/rocketmq-broker apache rocketmq 4.2.0 broker server(官方文檔… 0 huanwei/rocketmq-operator 0 slpcat/rocketmq 0 fengzt/rocketmq-nameserver apache rocketmq 4.2.0 nameserver 0 icyblazek/rocketmq RocketMQ 0 [root@centos-meteor ~]#
2.執行:docker pull foxiswho/rocketmq
3.建立docker存儲根目錄而且受權:
mkdir docker && chmod -R 777 docker/
[root@centos-meteor /]# cd docker/ [root@centos-meteor docker]# pwd /docker [root@centos-meteor docker]#
4.部署名稱服務器rocketmq-namesrv-server[9876]:
docker run -itd --restart=always --privileged=true -p 9876:9876 --name rocketmq-namesrv-server -v /docker/rocketmq/namesrv/logs:/root/rocketmq/logs -v /docker/rocketmq/namesrv/store:/root/rocketmq/store -e "MAX_POSSIBLE_HEAP=100000000" -e "JAVA_OPTS=-Duser.home=/opt" -e "JAVA_OPT_EXT=-server -Xms512m -Xmx512m -Xmn256m" rocketmqinc/rocketmq sh mqnamesrv
docker run -itd --restart=always --privileged=true -p 9876:9876 --name rocketmq-namesrv-server -v /docker/rocketmq/namesrv/logs:/root/rocketmq/logs -v /docker/rocketmq/namesrv/store:/root/rocketmq/store -e "MAX_POSSIBLE_HEAP=100000000" -e "JAVA_OPTS=-Duser.home=/opt" -e "JAVA_OPT_EXT=-server -Xms512m -Xmx512m -Xmn256m" rocketmqinc/rocketmq sh mqnamesrv
ps:建立/docker/rocketmq/namesrv/logs和/docker/rocketmq/namesrv/store目錄
5.部署消息服務器rocketmq-broker-server[10911]:
docker run -itd --restart=always --privileged=true -p 10909:10909 -p 10911:10911 -p 10912:10912 --name rocketmq-broker-server --link rocketmq-namesrv-server:namesrv -v /docker/rocketmq/broker/logs:/root/rocketmq/logs -v /docker/rocketmq/broker/store:/root/rocketmq/store -v /docker/rocketmq/broker/conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf -e "NAMESRV_ADDR=namesrv:9876" -e "MAX_POSSIBLE_HEAP=200000000" -e "JAVA_OPTS=-Duser.home=/opt" -e "JAVA_OPT_EXT=-server -Xms512m -Xmx512m -Xmn256m" rocketmqinc/rocketmq sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf
docker run -itd --restart=always --privileged=true -p 10909:10909 -p 10911:10911 -p 10912:10912 --name rocketmq-broker-server --link rocketmq-namesrv-server:namesrv -v /docker/rocketmq/broker/logs:/root/rocketmq/logs -v /docker/rocketmq/broker/store:/root/rocketmq/store -v /docker/rocketmq/broker/conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf -e "NAMESRV_ADDR=namesrv:9876" -e "MAX_POSSIBLE_HEAP=200000000" -e "JAVA_OPTS=-Duser.home=/opt" -e "JAVA_OPT_EXT=-server -Xms512m -Xmx512m -Xmn256m" rocketmqinc/rocketmq sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf
ps:
1.建立/docker/rocketmq/broker/logs和/docker/rocketmq/broker/store目錄
2.編寫broker.conf配置文件:
brokerClusterName = rocketmq-cluster brokerName = broker-server brokerId = 0 deleteWhen = 04 fileReservedTime = 48 # Broker 的角色 # - ASYNC_MASTER 異步複製Master # - SYNC_MASTER 同步雙寫Master # - SLAVE brokerRote=ASYNC_MASTER brokerRole = ASYNC_MASTER flushDiskType = ASYNC_FLUSH # 若是是本地程序調用雲主機 mq,這個須要設置成 雲主機 IP brokerIP1=Server-IP #限制的消息大小 maxMessageSize=65536 # 檢測物理文件磁盤空間 diskMaxUsedSpaceRatio=88 mapedFileSizeCommitLog=1073741824 mapedFileSizeConsumeQueue=50000000 #併發send線程數,多線程來發送消息可能會出現broker busy sendMessageThreadPoolNums=128 useReentrantLockWhenPutMessage=true #在發送消息時,自動建立服務器不存在的topic,默認建立的隊列數 defaultTopicQueueNums=8 highSpeedMode=false commercialBaseCount=1 maxErrorRateOfBloomFilter=20 accessMessageInMemoryMaxRatio=40 #無讀寫客戶端存活時間 clientChannelMaxIdleTimeSeconds=120 flushDelayOffsetInterval=10000 serverSocketRcvBufSize=131072 #單次 Pull 消息(內存)傳輸的 最大字節數 maxTransferBytesOnMessageInMemory=262144 clientManageThreadPoolNums=32 serverChannelMaxIdleTimeSeconds=120 serverCallbackExecutorThreads=0 enablePropertyFilter=false transientStorePoolSize=5 enableConsumeQueueExt=false #rocketmq server config serverPooledByteBufAllocatorEnable=true serverSocketRcvBufSize=131072 #rocketmq client config
6.部署控制後臺rocketmq-consloe-server[8082]:
docker run -itd -p 8082:8080 --restart=always --privileged=true --name rocketmq-console-server -v /docker/rocketmq/console/data:/tmp -e "JAVA_OPTS=-Drocketmq.namesrv.addr=47.104.22.10:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -e "JAVA_OPT_EXT=-server -Xms512m -Xmx512m -Xmn256m -Duser.home=/opt" styletang/rocketmq-console-ng:latest
docker run -itd -p 8082:8080 --restart=always --privileged=true --name rocketmq-console-server -v /docker/rocketmq/console/data:/tmp -e "JAVA_OPTS=-Drocketmq.namesrv.addr=Server-IP:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -e "JAVA_OPT_EXT=-server -Xms512m -Xmx512m -Xmn256m -Duser.home=/opt" styletang/rocketmq-console-ng:latest
ps:
1.建立/docker/rocketmq/console/data目錄
2.Server-IP爲broker.conf配置brokerIP1值
7.最終部署結果:
1.配置Rocketmq的Maven依賴:
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.4.0</version> </dependency>
ps:
1.目前rocketmq最新版本是4.7.1:
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.7.1</version> </dependency>
2.這裏選擇的是rocketmq版本[4.4.0],由於部署是4.4.0版本,推薦部署版本相同的rocketmq
2.建立pivotal-cloud-queue工程:
ps:建立消息隊列統一模塊工程,在業務應用程序服務模塊依賴該工程,並封裝消息生產者[Producer]和消息消費者[Consumer]處理類。
3.封裝rocketmq屬性配置類:
添加spring-boot-configuration-processor依賴:
<dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.12</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> <optional>true</optional> </dependency>
編寫RocketmqProperties屬性配置類:
package com.pivotal.cloud.queue.properties; import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration; /** * @className: com.pivotal.cloud.queue.properties.RocketmqProperties * @title: RocketmqProperties * @description: 封裝Pivotal項目RocketmqProperties類 * @content: PivotalCloud項目系統RocketmqProperties自定義屬性配置類 * @author: marklin * @datetime: 2020-07-07 01:32 * @version: 1.0.0 * @copyright: Copyright © 2018-2020 Pivotal Systems Incorporated. All rights reserved. */ @Data @Configuration @ConfigurationProperties(prefix = "pivotal.cloud.rocketmq") public class RocketmqProperties { /** * rocketmq消息隊列生產者-Producer */ private final Producer producer = new Producer(); /** * rocketmq消息隊列消費者-Consumer */ private final Consumer consumer = new Consumer(); @Data public static class Producer { } @Data public static class Consumer { } }
package com.pivotal.cloud.queue.configuration; import com.pivotal.cloud.queue.properties.RocketmqProperties; import lombok.AllArgsConstructor; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Configuration; /** * @className: com.pivotal.cloud.queue.configuration.RocketmqConfiguration * @title: RocketmqConfiguration * @description: 封裝Pivotal項目RocketmqConfiguration類 * @content: //TODO * @author: marklin * @datetime: 2020-07-07 02:25 * @version: 1.0.0 * @copyright: Copyright © 2018-2020 Pivotal Systems Incorporated. All rights reserved. */ @AllArgsConstructor @Configuration(proxyBeanMethods = false) @EnableConfigurationProperties({RocketmqProperties.class}) public class RocketmqConfiguration { }
編寫RocketmqTemplate生產者模板類:
package com.pivotal.cloud.queue.template; /** * @className: com.pivotal.cloud.queue.template.RocketmqTemplate * @title: RocketmqTemplate * @description: 封裝Pivotal項目RocketmqTemplate類 * @content: PivotalCloud項目系統RocketmqTemplate生產者模板類 * @author: marklin * @datetime: 2020-07-07 02:48 * @version: 1.0.0 * @copyright: Copyright © 2018-2020 Pivotal Systems Incorporated. All rights reserved. */ public class RocketmqTemplate { }
編寫RocketmqListener消費者監聽器類:
package com.pivotal.cloud.queue.listener; /** * @className: com.pivotal.cloud.queue.listener.RocketmqListener * @title: RocketmqListener * @description: 封裝Pivotal項目RocketmqListener類 * @content: PivotalCloud項目系統RocketmqListener消費者監聽器類 * @author: marklin * @datetime: 2020-07-07 02:52 * @version: 1.0.0 * @copyright: Copyright © 2018-2020 Pivotal Systems Incorporated. All rights reserved. */ public interface RocketmqListener { }
編寫META-INF/spring.factories工廠類:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ com.pivotal.cloud.queue.configuration.RocketmqConfiguration,\ com.pivotal.cloud.queue.configuration.RabbitmqConfiguration,\ com.pivotal.cloud.queue.configuration.ActivemqConfiguration,\ com.pivotal.cloud.queue.configuration.KafkaConfiguration,