rabbitMQ消息隊列原理

MQ:Message Queue,消息隊列,是一種應用程序對應用程序的通訊方法;應用程序經過讀寫出入隊列的消息(針對應用程序的數據)來通訊,而無需專用鏈接來連接它們。html

 

1      rabbitMQ入門及原理

rabbitMQ官網:http://www.rabbitmq.com/git

Erlang官網:http://www.erlang.org/github

1.1  rabbitMQ概述

RabbitMQ是一個由Erlang開發的AMQPAdvancedMessage Queue )的開源實現,支持多種客戶端,如:PythonRuby.NETJavaJMSCPHPActionScriptXMPPSTOMP等,支持AJAX。用於在分佈式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。web

借用網絡中一個rabbitMQ的系統架構圖:redis

d77e4cfd3d9dc6bf328c7db4b60fbc95.jpg

49e36a7d3c1523e781dac3fa753337d0.png

 

1.1.1     AMQP簡介

AMQP,即Advanced Message Queuing Protocol,一個提供統一消息服務的應用層標準高級消息隊列協議,是應用層協議的一個開放標準,爲面向消息的中間件設計。基於此協議的客戶端與消息中間件可傳遞消息,並不受客戶端/中間件不一樣產品,不一樣的開發語言等條件的限制。Erlang中的實現有 RabbitMQ等。--百度百科算法

Message BrokerAMQP簡介spring

Message Broker是一種消息驗證、傳輸、路由的架構模式,其設計目標主要應用於下面這些場景:編程

  • 消息路由到一個或多個目的地windows

  • 消息轉化爲其餘的表現方式緩存

  • 執行消息的彙集、消息的分解,並將結果發送到他們的目的地,而後從新組合相應返回給消息用戶

  • 調用Web服務來檢索數據

  • 響應事件或錯誤

  • 使用發佈-訂閱模式來提供內容或基於主題的消息路由

AMQP是Advanced Message QueuingProtocol的簡稱,它是一個面向消息中間件的開放式標準應用層協議。AMQP定義了這些特性:

  • 消息方向

  • 消息隊列

  • 消息路由(包括:點到點和發佈-訂閱模式)

  • 可靠性

  • 安全性

RabbitMQ就是以AMQP協議實現的一種中間件產品,它能夠支持多種操做系統,多種編程語言,幾乎能夠覆蓋全部主流的企業級技術平臺。

1.1.1.1             AMQP理論

AMQP的主要特徵是面向消息、隊列、路由(包括點對點和發佈/訂閱)、可靠性、安全。

簡單介紹AMQP的協議棧,AMQP協議自己包含三層,以下:

79c3f2ad461558298780c4cbe3bfdda6.png

Model Layer,位於協議最高層,主要定義了一些供客戶端調用的命令,客戶端能夠經過這些命令實現本身的業務邏輯,例如,客戶端能夠經過queue declare聲明一個隊列,利用consume命令獲取隊列的消息。

Session Layer,主要負責將客戶端命令發送給服務器,在將服務器端的應答返回給客戶端,主要爲客戶端與服務器之間通訊提供可靠性、同步機制和錯誤處理。

Transport Layer,主要傳輸二進制數據流,提供幀的處理、信道複用、錯誤檢測和數據表示。

這種分層架構相似於OSI網絡協議,可替換各層實現而不影響與其它層的交互。AMQP定義了合適的服務器端域模型,用於規範服務器的行爲(AMQP服務器端可稱爲broker)。在這裏Model層決定這些基本域模型所產生的行爲,這種行爲在AMQP中用command表示。Session層定義客戶端與broker之間的通訊(通訊雙方都是一個peer,可互稱作partner),爲command的可靠傳輸提供保障。Transport層專一於數據傳送,並與Session保持交互,接受上層的數據,組裝成二進制流,傳送到receiver後再解析數據,交付給Session層。Session層須要Transport層完成網絡異常狀況的彙報,順序傳送command等工做。

接下來了解下AMQP當中的一些概念。

BrokerServer):接受客戶端鏈接,實現AMQP消息隊列和路由功能的進程。

Virtual Host:實際上是一個虛擬概念,相似於權限控制組,一個Virtual Host裏面能夠有若干個ExchangeQueue,可是權限控制的最小粒度是Virtual Host

Exchange:接受生產者發送的消息,並根據Binding規則將消息路由給服務器中的隊列。ExchangeType決定了Exchange路由消息的行爲,例如,在RabbitMQ中,ExchangeTypedirectFanoutTopic三種,不一樣類型的Exchange路由的行爲是不同的。

Message Queue:消息隊列,用於存儲還未被消費者消費的消息。

Message:由HeaderBody組成,Header是由生產者添加的各類屬性的集合,包括Message是否被持久化、由哪一個Message Queue接受、優先級是多少等。而Body是真正須要傳輸的APP數據。

BindingBinding聯繫了ExchangeMessageQueueExchange在與多個MessageQueue發生Binding後會生成一張路由表,路由表中存儲着Message Queue所需消息的限制條件即Binding Key。當Exchange收到Message時會解析其Header獲得Routing KeyExchange根據Routing KeyExchangeTypeMessage路由到MessageQueueBinding KeyConsumerBinding ExchangeMessageQueue時指定,而Routing KeyProducer發送Message時指定,二者的匹配方式由ExchangeType決定。

Connection:鏈接,對於RabbitMQ而言,其實就是一個位於客戶端和Broker之間的TCP鏈接。

Channel:信道,僅僅建立了客戶端到Broker之間的鏈接後,客戶端仍是不能發送消息的。須要爲每個Connection建立ChannelAMQP協議規定只有經過Channel才能執行AMQP的命令。一個Connection能夠包含多個Channel。之因此須要Channel,是由於TCP鏈接的創建和釋放都是十分昂貴的,若是一個客戶端每個線程都須要與Broker交互,若是每個線程都創建一個TCP鏈接,暫且不考慮TCP鏈接是否浪費,就算操做系統也沒法承受每秒創建如此多的TCP鏈接。RabbitMQ建議客戶端線程之間不要共用Channel,至少要保證共用Channel的線程發送消息必須是串行的,可是建議儘可能共用Connection

CommandAMQP的命令,客戶端經過Command完成與AMQP服務器的交互來實現自身的邏輯。例如在RabbitMQ中,客戶端能夠經過publish命令發送消息,txSelect開啓一個事務,txCommit提交一個事務。

消息中間件的主要功能是消息的路由(Routing)和緩存(Buffering)。在AMQP中提供相似功能的兩種域模型:Exchange Messagequeue

c9b25308f76d038e2de2bee39b2d0762.jpg

Exchange接收消息生產者(MessageProducer)發送的消息根據不一樣的路由算法將消息發送往Message queueMessagequeue會在消息不能被正常消費時緩存這些消息,具體的緩存策略由實現者決定,當message queue與消息消費者(Messageconsumer)之間的鏈接通暢時,Message queue有將消息轉發到consumer的責任。

一個Message的處理流程相似於下圖:

a789e83e1de4d4dfa2233f9d87158ceb.jpg

Message是當前模型中所操縱的基本單位,它由Producer產生,通過BrokerConsumer所消費。它的基本結構有兩部分: HeaderBodyHeader是由Producer添加上的各類屬性的集合,這些屬性有控制Message是否可被緩存,接收的queue是哪一個,優先級是多少等。Body是真正須要傳送的數據,它是對Broker不可見的二進制數據流,在傳輸過程當中不該該受到影響。

一個broker中會存在多個Message queueExchange怎樣知道它要把消息發送到哪一個Message queue中去呢? 這就是上圖中所展現Binding的做用。Messagequeue的建立是由client application控制的,在建立Message queue後須要肯定它來接收並保存哪一個Exchange路由的結果。Binding是用來關聯ExchangeMessage queue的域模型。Clientapplication控制Exchange與某個特定Messagequeue關聯,並將這個queue接受哪一種消息的條件綁定到Exchange,這個條件也叫Binding key或是 Criteria

在與多個Messagequeue關聯後,Exchange中就會存在一個路由表,這個表中存儲着每一個Message queue所須要消息的限制條件。Exchange就會檢查它接受到的每一個MessageHeaderBody信息,來決定將Message路由到哪一個queue中去。MessageHeader中應該有個屬性叫Routing Key,它由Message發送者產生,提供給Exchange路由這條Message的標準。Exchange根據不一樣路由算法有不一樣有ExchangeType。好比有Direct相似,須要Bindingkey等於Routing key;也有BindingkeyRouting key符合一個模式關係;也有根據Message包含的某些屬性來判斷。一些基礎的路由算法由AMQP所提供,clientapplication也能夠自定義各類本身的擴展路由算法。

AMQP中,Client application想要與Broker溝通,就須要創建起與Brokerconnection,這種connection實際上是與Virtual Host相關聯的,也就是說,connection是創建在clientVirtual Host之間。能夠在一個connection上併發運行多個channel,每一個channel執行與Broker的通訊,咱們前面提供的session就是依附於channel上的。

這裏的Session能夠有多種定義,既能夠表示AMQP內部提供的command分發機制,也能夠說是在宏觀上區別與域模型的接口。正常理解就是咱們平時所說的交互context,主要做用就是在網絡上可靠地傳遞每個command。在AMQP的設計中,應當是借鑑了TCP的各類設計,用於保證這種可靠性。

Session層,爲上層所須要交互的每一個command分配一個唯一標識符(能夠是一個UUID),是爲了在傳輸過程當中能夠對command作校驗和重傳。Command發送端也須要記錄每一個發送出去的commandReplayBuffer,以期獲得接收方的回饋,保證這個command被接收方明確地接收或是已執行這個command。對於超時沒有收到反饋的command,發送方再次重傳。若是接收方已明確地回饋信息想要告知command發送方但這條信息在中途丟失或是其它問題發送方沒有收到,那麼發送方不斷重傳會對接收方產生影響,爲了下降這種影響,command接收方設置一個過濾器IdempotencyBarrier,來攔截那些已接收過的command關於這種重傳及確認機制,能夠參考下TCP的相關設計。

 

1.1.2     Erlang簡介

 

Erlang([':l])是一種通用的面向併發的編程語言,它由瑞典電信設備製造商愛立信所轄的CS-Lab開發,目的是創造一種能夠應對大規模併發活動的編程語言和運行環境。Erlang問世於1987年,通過十年的發展,於1998年發佈開源版本。Erlang是運行於虛擬機的解釋性語言,可是如今也包含有烏普薩拉大學高性能Erlang計劃(HiPE)開發的本地代碼編譯器,自R11B-4版本開始,Erlang也開始支持腳本式解釋器。在編程範型上,Erlang屬於多重範型編程語言,涵蓋函數式、併發式及分佈式。順序執行的Erlang是一個及早求值, 單次賦值和動態類型的函數式編程語言。

Erlang是一個結構化,動態類型編程語言,內建並行計算支持。最初是由愛立信專門爲通訊應用設計的,好比控制交換機或者變換協議等,所以很是適合於構建分佈式,實時軟並行計算系統。使用Erlang編寫出的應用運行時一般由成千上萬個輕量級進程組成,並經過消息傳遞相互通信。進程間上下文切換對於Erlang來講僅僅只是一兩個環節,比起C程序的線程切換要高效得多得多了。

使用Erlang來編寫分佈式應用要簡單的多,由於它的分佈式機制是透明的:對於程序來講並不知道本身是在分佈式運行。Erlang運行時環境是一個虛擬機,有點像Java虛擬機,這樣代碼一經編譯,一樣能夠隨處運行。它的運行時系統甚至容許代碼在不被中斷的狀況下更新。另外若是須要更高效的話,字節代碼也能夠編譯成本地代碼運行。

 

來自:百度百科

 

其餘MQ

 f97ab5699444eadf310e07173f50f83f.png

1.1.3     下載rabbitMQErlang

 ec3645273847c31033ff96f5c4d49d4f.png

首頁,下拉至:download下載,Tutorials教程

 5e996097280f3fbe68b2156b1eff7f64.png

下載windows版本:

rabbitmq-server-3.6.12.exe

 

教程RabbitMQ Tutorials

http://www.erlang.org/訪問比較慢,建議你們也能夠網上找資源下載。

0d2d1abea7347d9af2481204d39e4426.png

 

1.1.4     rabbitMQ基本概念

spring-boot-rabbitMQ項目源碼https://git.oschina.net/wyait/springboot1.5.4.git

 

config配置類:

@Autowired

private ConnectionFactoryconnectionFactory;

@Autowired

private Queue3Listenerqueue3Listener;

 

@Bean

@Primary

public RabbitTemplaterabbitTemplate() {

    RabbitTemplate rabbitTemplate = newRabbitTemplate(connectionFactory);

    rabbitTemplate.setMessageConverter(newJackson2JsonMessageConverter());

    return rabbitTemplate;

}

 

@Bean

@Primary

publicSimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory() {

    SimpleRabbitListenerContainerFactorysimpleRabbitListenerContainerFactory = newSimpleRabbitListenerContainerFactory();

   simpleRabbitListenerContainerFactory.setConnectionFactory(connectionFactory);

    simpleRabbitListenerContainerFactory.setMessageConverter(newJackson2JsonMessageConverter());

    returnsimpleRabbitListenerContainerFactory;

}

 

@Bean

publicSimpleMessageListenerContainer simpleMessageListenerContainer() {

    SimpleMessageListenerContainer container =new SimpleMessageListenerContainer(connectionFactory);

    container.setQueues(queue3());

   container.setMessageListener(queue3Listener);

    return container;

}

 

@Bean

public DirectExchangedirectExchange() {

    return new DirectExchange(EX_CHANGE_NAME1);

}

 

@Bean

public Queue queue1() {

    return new Queue(QUEUE1, true);

}

 

@Bean

public Queue queue2() {

    return new Queue(QUEUE2, true);

}

 

@Bean

public Queue queue3() {

    return new Queue(QUEUE3, true);

}

 

@Bean

public Binding binding1() {

    returnBindingBuilder.bind(queue1()).to(directExchange()).with(ROUTING_KEY1);

}

 

@Bean

public Binding binding2() {

    returnBindingBuilder.bind(queue2()).to(directExchange()).with(ROUTING_KEY2);

 

}

 

@Bean

public Binding binding3() {

    return BindingBuilder.bind(queue3()).to(directExchange()).with(ROUTING_KEY3);

}

 

基本概念:

  • ConnectionFactoryConnectionChannel

    connectionsocket鏈接的封裝,connectionFqactoryconnection的生產工程,channel是通訊的信道,實際進行數據交流的管道,由於創建connection的開銷明顯要比創建channel要大不少,因此數據傳輸真實發生在channel

  • Exchange,Queue

    exchange是能夠理解成一條特殊的傳輸通道,他會把消息投遞到綁定的消息池內。

    queue就是消息池了,使用前須要綁定exchange,以及本身的標誌。

  • exchange_key,routing_key

    exchange_key決定了publisher的消息投遞到哪條通道,routing_key決定了將消息放到哪一個池子裏

  • 綁定

    queue要接受消息必須與exchange進行綁定,並在綁定的時候給本身與exchange的綁定設置一個標記routing_key,之後用來匹配消息接收

    exchangequeue是一對多的關係,根據exchange不一樣類型,分別投遞到不一樣的消息池

 

下面來看看exchange的類型

 

  1. 1.       fanout

    直接將消息發送到與該exchange綁定的全部queue

  1. 2.       direct

    routing_key進行嚴格匹配,當消息來到的時候,只有exchange與某queue綁定的routing_key徹底匹配纔將消息投遞到該queue

  1. 3.       topic

    用特殊符號進行匹配,知足條件的queue都能收到消息,這裏的routing_key"."分隔,*匹配一個單詞,#匹配多個單詞,若是知足多個條件也不會投遞屢次

  1. 4.       headers

    不依賴routing_key匹配,根據消息體內的headers屬性匹配,綁定的時候能夠制定鍵值對

    接下來來看看配置文件

          1.@Bean統一注入到容器中,咱們聲明瞭connectionfactory,他會自動根據application裏面的屬性進行組裝,這個鏈接對於後面的容器都是要用到的,這裏要注意converter的設置,由於咱們要將pojo類型進行傳輸,通常程序外的傳輸都是創建在字節流的基礎上,converter就會自動轉換

          2.接下來咱們聲明queuetrue屬性設置爲持久型的池子,當鏈接斷開時,消息會唄保留,而後聲明exchange,這裏咱們使用的是directexchange,接下來將二者綁定起來

  1. 5.       聲明SimpleMessageListenerContainerSimpleRabbitListenerContainerFactory注意這裏聲明兩個是由於這是消息監聽的兩種方式

    首先講講SimpleMessageListenerContainer,這個須要設置確認方式,有較多屬性克設置,有興趣可自行設置,這裏我只是簡單的設置了一下,而後要設置listener

    listener須要實現ChannelAwareMessageListener裏面有

    public void onMessage(Message message,Channel channel) 的重載方法須要實現,消息體在Messagebody內,相對來講信息比較完備

    接下來看看SimpleRabbitListenerContainerFactory,這個有幾個注意點,須要再次設置converter由於,一個是發消息的時候解析成二進制,這個則是將二進制解析成具體的類,回調相對簡單一點

 

    @Component

    @RabbitListener(queues =RabbitMQConfig.QUEUE1, containerFactory ="simpleRabbitListenerContainerFactory")

    public class Queue1Listener  {

     private static Logger logger =LoggerFactory.getLogger(Queue1Listener.class);

     @RabbitHandler

     public void receive(@Payload String s) {

 

     logger.info("listener1 info: " +s);

 

     }

    }

 

    記得須要containerFactory具體寫出來

    在接收消息的方法上寫@RabbitHandler,消息體打上@payload久能夠接受消息了。

    其實還有個方法就是指定一個MessageAdapter,而後在container裏面就能夠指定接收的方法名,不是很推薦,明文反射總感受容易出問題

    固然publisher也是有消息的回調的

    RabbitTemplate下有ConfirmCallback實現confirm方法就行了

 

1.2  rabbitMQ入門

1.2.1     安裝rabbitMQwindows

安裝步驟:

1.    安裝Erland,經過官方下載頁面http://www.erlang.org/downloads獲取exe安裝包,直接打開並完成安裝。

2.    安裝RabbitMQ,經過官方下載頁面https://www.rabbitmq.com/download.html獲取exe安裝包。

3.    下載完成後,能夠直接運行安裝程序,或配置環境變量後運行rabbitMQ-server安裝程序。

4.    RabbitMQ Server安裝完成以後,會自動的註冊爲服務,並以默認配置啓動起來。

 

安裝過程請百度

 

安裝成功:訪問:http://127.0.0.1:15672   用戶密碼:guest/guest

19f5a378734d5d5164c22f58228dee5f.png

咱們能夠看到一些基本概念,好比:ConnectionsChannelsExchangesQueue等。第一次使用,能夠都點開看看都有些什麼內容,熟悉一下RabbitMQ Server的服務端。

 

點擊Admin標籤,在這裏能夠進行用戶的管理。

4cb5295ff0954532b14efa40468bc720.png

點擊admin,添加用戶:wyait/wyait並受權。

9ecaf7b7d635a42a1442d6a7f7828590.png

點擊all users表單中的用戶名「wyait」進行受權:

 1ae05b2095255154c1e6dd23f9e33b4f.png

1.2.1.1             Virtual Hosts設置界面:

160f8f3478f8a7e0a8237bbad7acb3d0.png

程序中和rabbitMQ交互的端口是:5672AMQP協議端口

 

1.2.2     建立spring-boot-MQ工程

項目源碼,

碼雲地址:https://git.oschina.net/wyait/springboot1.5.4.git

github地址:https://github.com/wyait/spring-boot-1.5.4.git

spring boot整合rabbitMQ項目博客連接:spring boot 1.5.4 整合rabbitMQ(十七)

 

1.2.3     消息隊列

官網:rabbitMQTutorials

04d7e56a4dd5d8b50c30711d96b72f2b.png

前提,rabbitMQ服務已經啓動;測試過程:

1spring Boot項目先啓動,監聽隊列;

2,啓動測試類發送消息到隊列中;、

3,消費者消費消息。

 

1.2.3.1             hello world

The simplest thingthat does something 簡單的消息隊列:

ae97423ac713c3dac0e57e861f95bd2b.png

P:消息的生產者;

C:消息的消費者;

紅色框:消息隊列;

 

demo參考1.2.2章節。

 

1.2.3.2             Work Queues

Distributing tasksamong workers (the competing consumers pattern)

9ef016c90da8e91d24f217825d6dff74.png

一個生產者對應一個消息隊列MQMQ能夠對應多個消費者,可是同一個消息只能被一個客戶端生產者所獲取;

 

同一個消息只能被一個客戶端所獲取。可是對於不一樣的消費者,接受消息,處理的效率不一樣,因此會有不合理的地方。

 

RabbitMqConfig中定義一個隊列workQueues

@Bean

   public Queue workQueue() {

      return new Queue("workQueues");

   }

 

消息生產者WorkSender:

@Component

public class WorkSender {

   @Autowired

   private AmqpTemplate rabbitMQTemplate;

 

   /**

    *

    * @描述:work模式

    * @建立人:wyait

    * @建立時間:2017914下午5:51:20

    */

   public void workSend(String msg) {

      String context = msg + new Date();

      System.out.println("workSender : " + context);

      this.rabbitMQTemplate.convertAndSend("workQueues",context);

   }

}

 

消息消費者1 WorkReceiver:

@Component

@RabbitListener(queues ="workQueues")

public class WorkReceiver {

 

   @RabbitHandler

   // handler註解來指定對消息的處理方法

   public void process(String hello) {

      System.out.println("workReceiver:" + hello);

   }

}

 

消息消費者2 WorkReceiverTwo:

@Component

@RabbitListener(queues ="workQueues")

public class WorkReceiverTwo {

 

   @RabbitHandler

   // handler註解來指定對消息的處理方法

   public void process(String hello) {

      System.out.println("workReceiverTwo:" + hello);

   }

}

 

測試消費消息結果:
503a2221941bc7043dad04eb1bcc8e89.png

平均分配消息原則(你一條,我一條)。

可經過更改channel設置,改變分配策略。

 

1.2.3.3             Publish/Subscribe訂閱模式

Sending messagesto many consumers at once.

一個生產者將同一條消息message發送到交換機exchange,經過exchange發送到多個隊列中,而對應的消費者都能獲取到該消息。

30edb6a834b95aabead6a9659c5caceb.png

注意:

問題1:消息是發送到交換機而不是隊列?答:消息能夠發送到隊列,也能夠發送到交換機。

問題2:消費者的消息來源只能是隊列;

問題3:若是將消息發送到沒有綁定隊列的交換機上,消息會去哪?答:消息丟失。

總結:消息只能存放於隊列,不能存放在交換機;交換機只能用於消息的傳遞,消息通道。

 

Fanout Exchange:

46015481703c70a380c08c87f758e852.png

不處理路由鍵(routingKey)。你只須要簡單的將隊列綁定到交換機上。一個發送到交換機的消息都會被轉發到與該交換機綁定的全部隊列上。很像子網廣播,每臺子網內的主機都得到了一份複製的消息。Fanout交換機轉發消息是最快的。

 

Fanout 就是咱們熟悉的廣播模式或者訂閱模式,給Fanout轉發器發送消息,綁定了這個轉發器的全部隊列都收到這個消息。

 

這裏使用三個隊列來測試(也就是在Application類中建立和綁定的fanout.Afanout.Bfanout.C)這三個隊列都和Application中建立的fanoutExchange轉發器綁定。

 

新增subscribe訂閱模式配置:

// ******************subscribe訂閱模式**********Start****************

   /**

    *

    * @描述:subscribe訂閱模式的隊列A;

    * @建立人:wyait

    * @建立時間:2017915下午3:24:31

    * @return

    */

   @Bean

   public Queue subscribeQueueA() {

      return new Queue("fanout.A");

   }

 

   /**

    *

    * @描述:subscribe訂閱模式的隊列B;

    * @建立人:wyait

    * @建立時間:2017915下午3:24:31

    * @return

    */

   @Bean

   public Queue subscribeQueueB() {

      return new Queue("fanout.B");

   }

 

   /**

    *

    * @描述:subscribe訂閱模式的隊列C;

    * @建立人:wyait

    * @建立時間:2017915下午3:24:31

    * @return

    */

   @Bean

   public Queue subscribeQueueC() {

      return new Queue("fanout.C");

   }

 

   /**

    *

    * @描述:fanoutExchange交換機

    * @建立人:wyait

    * @建立時間:2017915下午3:34:41

    * @return

    */

   @Bean

   public FanoutExchange fanoutExchange() {

      return new FanoutExchange("fanoutExchange");

   }

 

   /**

    *

    * @描述:subscribeQueue綁定fanoutExchange交換機

    * @建立人:wyait

    * @建立時間:2017915下午3:41:10

    * @param subscribeQueue

    * @param fanoutExchange

    * @return

    */

   @Bean

   Binding bindingExchangeA(Queue subscribeQueueA,

        FanoutExchange fanoutExchange) {

      // 綁定隊列AfanoutExchange交換機,也可使用:bind(subscribeQueueA())進行綁定;

      return BindingBuilder.bind(subscribeQueueA).to(fanoutExchange);

   }

 

   @Bean

   Binding bindingExchangeB(Queue subscribeQueueB,

        FanoutExchange fanoutExchange) {

      return BindingBuilder.bind(subscribeQueueB).to(fanoutExchange);

   }

 

   @Bean

   Binding bindingExchangeC(Queue subscribeQueueC,

        FanoutExchange fanoutExchange) {

      return BindingBuilder.bind(subscribeQueueC).to(fanoutExchange);

   }

 

   // ******************subscribe訂閱模式**********End****************

 

消息生產者SubscribeSender指定交換機:

@Component

public class SubscribeSender {

   @Autowired

   private AmqpTemplate rabbitTemplate;

 

   public void send(String msg) {

      String sendMsg = msg + new Date();

      System.out.println("---SubscribeSender : " +sendMsg);

      // convertAndSend(String exchange, String routingKey, Objectmessage)

      this.rabbitTemplate.convertAndSend("fanoutExchange","aaa", sendMsg);

   }

 

}

 

消息消費者SubscribeReveicerABC監聽隊列fanout.A/B/C:

@Component

@RabbitListener(queues ="fanout.A")

public class SubscribeReceiver{

   @RabbitHandler

   public void precess(String msg) {

      System.out.println("SubscribeReceiverA  : " + msg);

   }

}

 

測試test類:

@Autowired

   private SubscribeSender subSend;

 

   @Test

   public void subscribeTest() {

      System.out.println("==========subscribe發送消息!");

      for (int i = 0; i < 50; i++) {

        String msg = "==========msg_" + i;

        subSend.send(msg);

        try {

           Thread.sleep(100);

        } catch (InterruptedException e) {

           e.printStackTrace();

        }

      }

   }

 

三個消費者都接收到了每一條信息。

851514323debff33e0affe3d7a010e66.png

注意:subscribe訂閱模式和work模式的區別。

一、  work模式將消息發送到隊列

二、  訂閱模式將消息發送到交換機

三、  work模式是1個隊列N個消費者,訂閱模式是N個隊列N個消費者(N>0)

 

1.2.3.4             Routing路由模式

e1eec04aaba9c5ee0ec7c4c151e98eb4.png

路由模式:基於訂閱模式,

能夠在隊列綁定到交換機時指定一個規則,根據不一樣的消息規則,選擇是否接受該消息。

0f3bcce82aacc2088184af47292c1a86.png

處理路由鍵(routingKey)。須要將一個隊列綁定到交換機上,要求該消息與一個特定的路由鍵routingKey徹底匹配。

 

基於Subscribe訂閱模式,配置類中添加隊列、DirectExchange交換機並進行綁定:

/**

    *

    * @描述:routing路由模式的隊列A;

    * @建立人:wyait

    * @建立時間:2017915下午3:24:31

    * @return

    */

   @Bean

   public Queue routingQueueA() {

      return new Queue("routing.A");

   }

 

   /**

    *

    * @描述:routing路由模式的隊列B;

    * @建立人:wyait

    * @建立時間:2017915下午3:24:31

    * @return

    */

   @Bean

   public Queue routingQueueB() {

      return new Queue("routing.B");

   }

 

   /**

    *

    * @描述:DirectExchange交換機

    * @建立人:wyait

    * @建立時間:2017915下午3:34:41

    * @return

    */

   @Bean

   public DirectExchange directExchange() {

      return new DirectExchange("directExchange");

   }

 

   /**

    *

    * @描述:routingQueue綁定directExchange交換機

    * @建立人:wyait

    * @建立時間:2017915下午3:41:10

    * @param routingQueue

    * @param directExchange

    * @return

    */

   @Bean

   Binding bindingDirectExchangeA(Queue routingQueueA,

        DirectExchange directExchange) {

      // 綁定routing隊列AdirectExchange交換機,並指定routing路由規則;

      return BindingBuilder.bind(routingQueueA()).to(directExchange())

           .with("info");

   }

 

   @Bean

   Binding bindingDirectExchangeB(Queue routingQueueB,

        DirectExchange directExchange) {

      // 綁定routing隊列AdirectExchange交換機,並指定routing路由規則;

      returnBindingBuilder.bind(routingQueueB()).to(directExchange())

           .with("error");

   }

 

消息生產者:

@Component

public class RoutingSender {

   @Autowired

   private AmqpTemplate rabbitTemplate;

 

   public void send(String msg) {

      String sendMsg = msg + new Date();

      System.out.println("---RoutingSender : " + sendMsg);

      this.rabbitTemplate.convertAndSend("directExchange","info", sendMsg);

   }

 

   public void sendTwo(String msg) {

      String sendMsg = msg + new Date();

      System.out.println("---RoutingSender TWO: " +sendMsg);

      this.rabbitTemplate

           .convertAndSend("directExchange", "infoTwo",sendMsg);

   }

 

   public void sendError(String msg) {

      String sendMsg = msg + new Date();

      System.out.println("---RoutingSender Error: " +sendMsg);

      this.rabbitTemplate.convertAndSend("directExchange","error", sendMsg);

   }

 

   public void sendErrorTwo(String msg) {

      String sendMsg = msg + new Date();

      System.out.println("---RoutingSender ErrorTwo: " +sendMsg);

      this.rabbitTemplate.convertAndSend("directExchange","errorTwo",

           sendMsg);

   }

 

}

 

消息消費者A

@Component

@RabbitListener(queues ="routing.A")

public class RoutingReceiver {

   @RabbitHandler

   public void precess(String msg) {

      System.out.println("RoutingReceiverA === : " + msg);

   }

 

}

 

測試類:

@Autowired

   private RoutingSender routSend;

 

   @Test

   public void routingTest() {

      System.out.println("==========routing發送消息!");

      routSend.send("==========msg_info ");

      routSend.sendTwo("==========msg_infoTwo ");

      routSend.sendError("==========msg_error ");

      routSend.sendErrorTwo("==========msg_ErrorTwo ");

 

      System.out.println("==========routing發送消息   結束!");

   }

 

運行:

e1ea13567a2e8d26b56be095dc8fe33e.png

MqApplication控制檯:

33e294ec1cc02f99e7e6b190bd609584.png

由此能夠看出,routingKey符合規則的消息,會被消費方接收並消費。

 

1.2.3.5             Topics通配符模式

Receiving messagesbased on a pattern (topics)

9186caf294234d3acd57b894fc72a1ea.png

基於路由模式,使用通配符匹配隊列,發送消息

4ea0164e18715288b5c2314834987a09.png

將路由鍵和某模式進行匹配。

 

任何發送到Topic Exchange的消息都會被轉發到全部關心RouteKey中指定話題的Queue

 

1. 這種模式須要RouteKey,要提早綁定ExchangeQueue

 

2. 若是Exchange沒有發現可以與RouteKey匹配的Queue,則會拋棄此消息。

 

3. 在進行綁定時,要提供一個該隊列關心的主題,如「#.log.#」表示該隊列關心全部涉及log的消息(一個RouteKey爲」MQ.log.error」的消息會被轉發到該隊列)

 

4. #」表示0個或若干個關鍵字,「*」表示一個關鍵字。如「log.*」能與「log.warn」匹配,沒法與「log.warn.timeout」匹配;可是「log.#」能與上述二者匹配。

通配符#*的區別;

#:表明匹配一個或多個詞;

*:表明只匹配一個詞.

 

配置類中新增隊列綁定TopicExchange交換機,並指定routingKey和匹配模式:

@Bean

   public Queue topicQueueA() {

      return new Queue("topic.queueA", true); // true表示持久化該隊列

   }

 

   @Bean

   public Queue topicQueueB() {

      return new Queue("topic.queueB", true);

   }

 

   // 聲明交互器

   @Bean

   TopicExchange topicExchange() {

      return new TopicExchange("topicExchange");

   }

 

   // 綁定

   @Bean

   public Binding bindingA() {

      return BindingBuilder.bind(topicQueueA()).to(topicExchange())

           .with("topic.message");

   }

 

   @Bean

   public Binding bindingB() {

      return BindingBuilder.bind(topicQueueB()).to(topicExchange())

           .with("topic.#");

   }

 

消息生產者:

@Component

public class TopicSender {

   @Autowired

   private AmqpTemplate rabbitTemplate;

 

   public void send(String msg) {

      String sendMsg = msg + new Date();

      System.out.println("---TopicSender : " + sendMsg);

      this.rabbitTemplate.convertAndSend("topicExchange","topic.message",

           sendMsg);

   }

 

   public void sendTwo(String msg) {

      String sendMsg = msg + new Date();

      System.out.println("---TopicSender messages: " +sendMsg);

      this.rabbitTemplate.convertAndSend("topicExchange","topic.messages",

           sendMsg);

   }

 

}

 

消息消費者:

@Component

@RabbitListener(queues ="topic.queueA")

public class TopicReceiver {

   @RabbitHandler

   public void precess(String msg) {

      System.out.println("TopicReceiverA  : " + msg);

   }

}

 

test測試類:

   @Autowired

   private TopicSender topicSend;

 

   @Test

   public void topicTest() {

      System.out.println("==========topic發送消息!");

      topicSend.send("==========msg_info ");

      topicSend.sendTwo("==========msg_infoTwo ");

 

      System.out.println("==========topic發送消息   結束!");

   }

 

重啓MqApplication,運行test類:結果:

43a0b8c3dc037057b030a37d6838a34c.png

 92193865ac175fbe027e1cf8d04884fc.png

根據路由規則,接收不一樣生產者的消息。

1.2.3.6             交換機總結

RPC模式能夠百度查資料去了解!

 

FanoutExchange: 將消息分發到全部的綁定隊列,無routingkey的概念 

        HeadersExchange :經過添加屬性key-value匹配 

        DirectExchange:按照routingkey分發到指定隊列 

        TopicExchange:多關鍵字匹配 

 

1.2.4     管理界面操做隊列和交換機

 

進入Exchanges交換機界面,能夠看到全部的AMQP默認的交換機和定義的Exchange:

f3e237c7577f5ebd94d7330a5bd15911.png

選擇topicExchange:

5093d779e87925abfbf8fad052723d98.png

 

能夠對隊列進行添加和解綁操做!

 

1.2.5     隊列的持久化

RabbitMQ的隊列有2種,一種是內存隊列,一種是持久化隊列

一、  內存隊列

  1. 優勢:速度快,效率高

  2. 缺點:宕機,消息丟失

二、  持久化隊列

  1. 優勢:消息能夠持久化保存,宕機或斷電後消息不丟失

  2. 缺點:比內存存儲速度慢,性能差

設置方法:

@Bean

   public Queue topicQueueA() {

      return new Queue("topic.queueA", true); // true表示持久化該隊列

   }

 

管理界面查看是否持久化:

107a956b9ba6c55ce12636ecf6b0bfd7.png

D 持久化

 

項目源碼,

碼雲地址:https://git.oschina.net/wyait/springboot1.5.4.git

github地址:https://github.com/wyait/spring-boot-1.5.4.git



spring boot系列文章:

spring boot 1.5.4 概述(一)

spring boot 1.5.4入門和原理(二)

spring boot 1.5.4 之web開發(三)

spring boot 1.5.4 整合JSP(四)

spring boot 1.5.4 集成devTools(五)

spring boot 1.5.4 集成JdbcTemplate(六)

spring boot 1.5.4 集成spring-Data-JPA(七)

spring boot 1.5.4 配置文件詳解(八)

spring boot 1.5.4 統一異常處理(九)

spring boot 1.5.4 定時任務和異步調用(十)

spring boot 1.5.4 整合log4j2(十一)

spring boot 1.5.4 整合 mybatis(十二)

spring boot 1.5.4 整合 druid(十三)

spring boot 1.5.4 之監控Actuator(十四)

spring boot 1.5.4 整合webService(十五)

spring boot 1.5.4 整合redis、攔截器、過濾器、監聽器、靜態資源配置(十六)

spring boot 1.5.4 整合rabbitMQ(十七)

相關文章
相關標籤/搜索