RabbitMQ簡介以及與SpringBoot整合示例

什麼是消息隊列

消息(Message)是指在應用間傳送的數據。消息能夠很是簡單,好比只包含文本字符串,也能夠更復雜,可能包含嵌入對象。html

消息隊列(Message Queue)是一種應用間的通訊方式,消息發送後能夠當即返回,由消息系統來確保消息的可靠傳遞。消息發佈者只管把消息發佈到 MQ 中而不用管誰來取,消息使用者只管從 MQ 中取消息而不論是誰發佈的。這樣發佈者和使用者都不用知道對方的存在。java

RabbitMQ簡介

RabbitMQ安裝

1:下載spring

下載 rabbitMQ :http://www.rabbitmq.com/downl...windows

安裝rabbitmq須要erlang,下載erlang:http://www.erlang.org/downloa...服務器

2:Windows安裝RabbitMQ微信

rabbitMQ安裝,查看安裝文檔:http://www.rabbitmq.com/insta...網絡

3:安裝Erlangapp

下載完成Erlang後,直接打開文件下一步就能夠安裝完成了,安裝完成ERLANG後再回過來安裝RabbitMQ。分佈式

四、啓動RabbitMQide

若是是安裝版的直接在開始那裏找到安裝目錄點擊啓動便可

img

五、安裝管理工具

參考官方文檔:http://www.rabbitmq.com/manag...

操做起來很簡單,只須要在DOS下面,進入安裝目錄(F:RabbitMQ Serverrabbitmq_server-3.5.3sbin)執行以下命令就能夠成功安裝。

rabbitmq-plugins enable rabbitmq_management

能夠經過訪問http://localhost:15672進行測試,默認的登錄帳號爲:guest,密碼爲:guest。

若是訪問成功了,恭喜,整個rabbitMQ安裝完成了。

RabbitMQ安裝就不細講了,大致就這樣子,有什麼問題能夠具體查看其餘官網詳細的安裝文檔。

RabbitMQ特色

RabbitMQ 是一個由 Erlang 語言開發的 AMQP 的開源實現。

AMQP :Advanced Message Queue,高級消息隊列協議。它是應用層協議的一個開放標準,爲面向消息的中間件設計,基於此協議的客戶端與消息中間件可傳遞消息,並不受產品、開發語言等條件的限制。

RabbitMQ 最初起源於金融系統,用於在分佈式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。具體特色包括:

1:可靠性(Reliability)

RabbitMQ 使用一些機制來保證可靠性,如持久化、傳輸確認、發佈確認。

2:靈活的路由(Flexible Routing)

在消息進入隊列以前,經過 Exchange 來路由消息的。對於典型的路由功能,RabbitMQ 已經提供了一些內置的 Exchange 來實現。針對更復雜的路由功能,能夠將多個 Exchange 綁定在一塊兒,也經過插件機制實現本身的 Exchange 。

3:消息集羣(Clustering)

多個 RabbitMQ 服務器能夠組成一個集羣,造成一個邏輯 Broker 。

4:高可用(Highly Available Queues)

隊列能夠在集羣中的機器上進行鏡像,使得在部分節點出問題的狀況下隊列仍然可用。

5:多種協議(Multi-protocol)

RabbitMQ 支持多種消息隊列協議,好比 STOMP、MQTT 等等。

6:多語言客戶端(Many Clients)

RabbitMQ 幾乎支持全部經常使用語言,好比 Java、.NET、Ruby 等等。

7:管理界面(Management UI)

RabbitMQ 提供了一個易用的用戶界面,使得用戶能夠監控和管理消息 Broker 的許多方面。

8:跟蹤機制(Tracing)

若是消息異常,RabbitMQ 提供了消息跟蹤機制,使用者能夠找出發生了什麼。

9:插件機制(Plugin System)

RabbitMQ 提供了許多插件,來從多方面進行擴展,也能夠編寫本身的插件。

消息模型

全部MQ產品從模型抽象上來講都是同樣的過程:

消費者(consumer)訂閱某個隊列,生產者(producer)建立消息,而後發佈到隊列(queue)中去,消息將發送到訂閱了該隊列的消費者。

RabbitMQ基本概念

生產者(Producer) > 交換器(Exchange) > 隊列(Queue) > 消費者(Consumer)

1: Message

消息,消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對於其餘消息的優先權)、delivery-mode(指出該消息可能須要持久性存儲)等。

2: Publisher

消息的生產者,也是一個向交換器發佈消息的客戶端應用程序。

3: Exchange

交換器,用來接收生產者發送的消息並將這些消息路由給服務器中的隊列。

4: Binding

綁定,用於消息隊列和交換器之間的關聯。一個綁定就是基於路由鍵將交換器和消息隊列鏈接起來的路由規則,因此能夠將交換器理解成一個由綁定構成的路由表。

5: Queue

消息隊列,用來保存消息直到發送給消費者。它是消息的容器,也是消息的終點。一個消息可投入一個或多個隊列。消息一直在隊列裏面,等待消費者鏈接到這個隊列將其取走。

6: Connection

網絡鏈接,好比一個TCP鏈接。

7: Channel

信道,多路複用鏈接中的一條獨立的雙向數據流通道。信道是創建在真實的TCP鏈接內地虛擬鏈接,AMQP 命令都是經過信道發出去的,不論是發佈消息、訂閱隊列仍是接收消息,這些動做都是經過信道完成。由於對於操做系統來講創建和銷燬 TCP 都是很是昂貴的開銷,因此引入了信道的概念,以複用一條 TCP 鏈接。

8: Consumer

消息的消費者,表示一個從消息隊列中取得消息的客戶端應用程序。

9: Virtual Host

虛擬主機,表示一批交換器、消息隊列和相關對象。虛擬主機是共享相同的身份認證和加密環境的獨立服務器域。每一個 vhost 本質上就是一個 mini 版的 RabbitMQ 服務器,擁有本身的隊列、交換器、綁定和權限機制。vhost 是 AMQP 概念的基礎,必須在鏈接時指定,RabbitMQ 默認的 vhost 是 / 。

10: Broker

表示消息隊列服務器實體。

RabbitMQ三種模式

1: Direct

消息中的路由鍵(routing key)若是和 Binding 中的 binding key 一致, 交換器就將消息發到對應的隊列中。路由鍵與隊列名徹底匹配,若是一個隊列綁定到交換機要求路由鍵爲「dog」,則只轉發 routing key 標記爲「dog」的消息,不會轉發「dog.puppy」,也不會轉發「dog.guard」等等。它是徹底匹配、單播的模式。

2: Fanout

每一個發到 fanout 類型交換器的消息都會分到全部綁定的隊列上去。fanout 交換器不處理路由鍵,只是簡單的將隊列綁定到交換器上,每一個發送到交換器的消息都會被轉發到與該交換器綁定的全部隊列上。很像子網廣播,每臺子網內的主機都得到了一份複製的消息。fanout 類型轉發消息是最快的。

3: Topic

topic 交換器經過模式匹配分配消息的路由鍵屬性,將路由鍵和某個模式進行匹配,此時隊列須要綁定到一個模式上。它將路由鍵和綁定鍵的字符串切分紅單詞,這些單詞之間用點隔開。它一樣也會識別兩個通配符:符號「#」和符號「」。#匹配0個或多個單詞,匹配很少很多一個單詞。

4: headers

不經常使用,和direct功能接近,不討論。

上面只是簡單的介紹了一下RabbitMQ的三種模式,接下來結合代碼實例來看看。

SpringBoot整合RabbitMQ

SpringBoot就很少作介紹了,用idea建立SpringBoot項目,pom文件中導入以下包:

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

而後在applicaition.yml加入以下配置

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: magic
    password: 123456

這樣就算是引入RabbitMQ就算成功了,接下來說一下RabbitMQ三種模式結合SpringBoot。

Direct

Direct就是一對一模式,從上面能夠知道,RabbitMQ有發送者,交換機,隊列,接收者。Direct就是一個發送者對應一個接收者。若是有多個,只會有一個接收到消息。

先建立該模式的隊列(Queue)

@Configuration
public class OneByOneConfig {
    
    @Bean
    public Queue oneQueue(){
        return new Queue("OneByOne");
    }
    
}

能夠看到新建的隊列名字叫作OneByOne,接下來建立發送者類。

@Component
public class OneByOneSender {

    @Autowired
    AmqpTemplate rabbitTemplate;

    public void send() {
        String context = "OneByOneSender" + new Date();
        System.out.println("OneSender : " + context);
        this.rabbitTemplate.convertAndSend("OneByOne", context);
    }

}

這裏面AmqpTemplateSpringBoot包裝好的用來操做消息隊列的。convertAndSend是裏面的一個方法,用來發送者匹配交換機,隊列以及攜帶消息的。Drect裏面只須要匹配隊列以及攜帶消息便可。

接下來建立接收者

@Component
@RabbitListener(queues = "OneByOne")
public class OneByOneReceiver {

    @RabbitHandler
    public void receiver(String context){
        System.out.println("OneByOne-Receiver::::"+context);
    }
}

@RabbitListener是用來綁定隊列的,該接收者綁定了OneByOne這個隊列,下面的@RabbitHandler註解是用來表示該方法是接收者接收消息的方法。

接下來進行測試,SpringBoot有測試的test類

導入OneByOneSender類

@Autowired
    private OneByOneSender oneByOneSender;

    @Test
    public void oneByOneTest(){
        oneByOneSender.send();
    }

運行能夠看到結果輸出:

OneSender : OneByOneSenderThu Aug 22 17:00:56 CST 2019

OneByOne-Receiver::::OneByOneSenderThu Aug 22 17:00:56 CST 2019

能夠看到發送者發送打印的輸出,以及接收者接收到的消息打印出來的結果。

Fanout

Fanout模式就是發佈訂閱模式,發佈者發佈了消息時候順便綁定交換器,交換器又是跟隊列綁定的,那麼跟這個交換器綁定的全部隊列都會收到這個消息,相應的綁定了這些隊列的全部接收者都會接收到發送的消息。

具體看代碼再分析:

建立FanoutConfig類,而後建立隊列,交換器,以及綁定隊列與交換器。

@Configuration
public class FanoutConfig {

    //隊列1
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.a");
    }

    //隊列2
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.b");
    }

    //隊列3
    @Bean
    public Queue fanoutQueue3(){
        return new Queue("fanout.c");
    }

    //交換器
    @Bean
    FanoutExchange fanoutExchange(){
        return new FanoutExchange("fanoutExchange");
    }

    //綁定交換器和隊列1
    @Bean
    Binding bindingFanout1(){
        return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
    }

    //綁定交換器和隊列2
    @Bean
    Binding bindingFanout2(){
        return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
    }

    //綁定交換器和隊列3
    @Bean
    Binding bindingFanout3(){
        return BindingBuilder.bind(fanoutQueue3()).to(fanoutExchange());
    }
}

能夠看到上面建立了三個隊列,到時候再建立三個接收者,那麼這三個接收者再Fanout模式下,只要發佈者綁定了該fanoutExchange交換器,那麼他們就應該均可以收到消息。

建立發送者

@Component
public class FanoutSender {

    @Autowired
    AmqpTemplate rabbitTemplate;

    public void fanSender1(){
        Date date = new Date();
        String dateString = new SimpleDateFormat("yyyy-mm-DD hh:MM:ss").format(date);
        String message = "FanSender1111:"+dateString;
        this.rabbitTemplate.convertAndSend("fanoutExchange","",message);
    }

    public void fanSender2(){
        Date date = new Date();
        String dateString = new SimpleDateFormat("yyyy-mm-DD hh:MM:ss").format(date);
        String message = "FanSender2222:"+dateString;
        this.rabbitTemplate.convertAndSend("fanoutExchange","",message);
    }
}

建立了兩個發送者,分別綁定了fanoutExchange交換器,中間的是交換器選擇隊列是的條件routerKey,這個在後面的Topic模式中會用到,如今由於是全部隊列都會收到,全部就沒有條件。

建立三和接收者分別綁定三個隊列

@Component
@RabbitListener(queues = "fanout.a")
public class Fanout1Reciver {

    @RabbitHandler
    public void receiver(String message){
        System.out.println("FanoutReceiver---fanout.a:"+message);
    }
}

@Component
@RabbitListener(queues = "fanout.b")
public class Fanout2Reciver {

    @RabbitHandler
    public void receiver(String message){
        System.out.println("FanoutReceiver---fanout.b:"+message);
    }
}

@Component
@RabbitListener(queues = "fanout.c")
public class Fanout3Reciver {

    @RabbitHandler
    public void receiver(String message){
        System.out.println("FanoutReceiver---fanout.c:"+message);
    }
}

能夠看到接收者分別綁定了fanout.a/b/c三個隊列,接下來進行測試

運行下面代碼:

@Autowired
FanoutSender fanoutSender;

@Test
public void fanoutSenderTest(){
   fanoutSender.fanSender1();
   fanoutSender.fanSender2();
}

結果:

FanoutReceiver---fanout.b:FanSender1111:2019-40-234 05:08:32
FanoutReceiver---fanout.c:FanSender1111:2019-40-234 05:08:32
FanoutReceiver---fanout.a:FanSender1111:2019-40-234 05:08:32
FanoutReceiver---fanout.c:FanSender2222:2019-40-234 05:08:32
FanoutReceiver---fanout.b:FanSender2222:2019-40-234 05:08:32
FanoutReceiver---fanout.a:FanSender2222:2019-40-234 05:08:32

由於咱們的FanoutSender發送者沒有寫輸出,因此能夠看到上面六條都是接收者的輸出,兩個發送者分別發送了一條消息,三個接收者都收到了這兩個發送者發送的消息。

Topic

Topic模式就至關於發佈訂閱模式交換機和隊列之間加上了必定的匹配規則。只有符合規則的消息才能到這個隊列中去從而被接收者收到。看代碼

建立TopicConfig:

@Configuration
public class TopicConfig {

    @Bean
    public Queue topicQueue1(){
        return new Queue("topic.a");
    }
    @Bean
    public Queue topicQueue2(){
        return new Queue("topic.b");
    }
    @Bean
    public Queue topicQueue3(){
        return new Queue("topic.c");
    }

    @Bean
    TopicExchange topicExchange(){
        return new TopicExchange("topicExchange");
    }

    @Bean
    public Binding binding1(){
        return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("topic.msg");
    }

    @Bean
    public Binding binding2(){
        return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("topic.#");
    }

    @Bean
    public Binding binding3(){
        return BindingBuilder.bind(topicQueue3()).to(topicExchange()).with("topic.*.z");
    }
}

能夠看到建立了三個隊列和一個交換器,而且將交換器和隊列進行了綁定,在綁定的過程當中多了一個條件with,這是一種通配符方式的匹配,. 爲分隔符,*表明一個,#表示0個或者多個,如上面的topic.#就可已匹配,topic,topic.z,topic.ma.z.z.z等,而topic.*.z就能夠匹配topic.m.z,topic.z.z等,而topic.msg就只能匹配topic.msg條件的消息。

建立發送者:

@Component
public class TopicSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;
   public void topicSend1(){
       Date date = new Date();
       String dateString = new SimpleDateFormat("yyyy-mm-DD hh:MM:ss").format(date);
       dateString = "[topic.msg] Send1 msg:" + dateString;
       System.out.println(dateString);
       this.rabbitTemplate.convertAndSend("topicExchange","topic.msg",dateString);
   }

    public void topicSend2(){
        Date date = new Date();
        String dateString = new SimpleDateFormat("yyyy-mm-DD hh:MM:ss").format(date);
        dateString = "[topic.good.msg] Send2 msg:" + dateString;
        System.out.println(dateString);
        this.rabbitTemplate.convertAndSend("topicExchange","topic.good.msg",dateString);
    }

    public void topicSend3(){
        Date date = new Date();
        String dateString = new SimpleDateFormat("yyyy-mm-DD hh:MM:ss").format(date);
        dateString = "[topic.m.z] Send3 msg:" + dateString;
        System.out.println(dateString);
        this.rabbitTemplate.convertAndSend("topicExchange","topic.m.z",dateString);
    }
}

其中

this.rabbitTemplate.convertAndSend("topicExchange",
"topic.good.msg", dateString);

發送者發送消息時,傳的三個參數,第一個時你要傳給的交換機,第二個是傳給交換機的條件,在Topic模式中,隊列與交換機會有一個匹配的條件,若是如今有三個隊列和交換機綁定,分別條件是:A: topic.# ,B: topic.msg, C:topic.*.z(#表明多個,*表明一個)。

則上面代碼給的key時 topic.good.msg 就只能匹配到A隊列中去。若是時topic.msg,那麼就匹配到B隊列中了,若是是topic.good.z/topic.msg.z 那麼會匹配到A和C兩個隊列中去。

而同時,只要綁定了A,B,C的隊列的接收者,若是上面匹配成功,消息就會被髮布到隊列中,相應的綁定了該隊列的接收者就會獲取到該消息。

建立接收者:

@Component
@RabbitListener(queues = "topic.a")
public class Topic1Reciver {

    @RabbitHandler
    public void receiver(String message){
        System.out.println("topic.A--Receiver::::"+message);
    }
}

@Component
@RabbitListener(queues = "topic.b")
public class Topic2Reciver {

    @RabbitHandler
    public void receiver(String msg){
        System.out.println("topic.B--Receiver::::"+msg);
    }
}

@Component
@RabbitListener(queues = "topic.c")
public class Topic3Reciver {

    @RabbitHandler
    public void receiver(String msg){
        System.out.println("topic.C--Receiver::::"+msg);
    }
}

三個接收者分別綁定三個隊列,看看測試以及結果

@Autowired
    TopicSender topicSender;

    @Test
    public void topicSenderTest(){
        topicSender.topicSend1();
        topicSender.topicSend2();
        topicSender.topicSend3();
    }

結果:

[topic.msg] Send1 msg:2019-00-234 06:08:00
[topic.good.msg] Send2 msg:2019-00-234 06:08:00
[topic.m.z] Send3 msg:2019-00-234 06:08:00

----------------------------------------------------

topic.B--Receiver::::[topic.msg] Send1 msg:2019-00-234 06:08:00
topic.C--Receiver::::[topic.m.z] Send3 msg:2019-00-234 06:08:00
topic.A--Receiver::::[topic.msg] Send1 msg:2019-00-234 06:08:00
topic.B--Receiver::::[topic.good.msg] Send2 msg:2019-00-234 06:08:00
topic.B--Receiver::::[topic.m.z] Send3 msg:2019-00-234 06:08:00

分割線上面的是發送者的輸出,三個發送者分別發送了一條消息,根據發送者傳入的key與交換器與隊列綁定的匹配規則進行匹配,最終匹配經過的將消息從交換器發到隊列中,相應的綁定該隊列的接收者就能夠獲取到這條消息。

總結

在Java中,須要先建立queue隊列,接收消息着綁定的是隊列,若是隊列中有了消息,就會發給綁定它的接收者。而後交換機和隊列進行綁定,交換機是一個,全部隊列都在交換機上綁定着,發送者發送消息時,把消息給交換機,而後加上限制條件,topic是給知足條件的隊列,fanout時給全部綁定交換機的隊列。普通模式就是Direct模式也就是1對1模式,發送者直接綁定隊列,接收者也綁定隊列。互勉~


歡迎關注個人微信公衆號,一個喜歡代碼和NBA的年輕人,主要用來分享技術收穫。

相關文章
相關標籤/搜索