初窺RabbitMQ消息中間及SpringBoot整合

【本文版權歸微信公衆號"代碼藝術"(ID:onblog)全部,如果轉載請務必保留本段原創聲明,違者必究。如果文章有不足之處,歡迎關注微信公衆號私信與我進行交流!】java

一:RabbitMQ簡介

RabbitMQ介紹

RabbitMQ 即一個消息隊列,主要是用來實現應用程序的異步和解耦,同時也能起到消息緩衝,消息分發的做用。web

消息中間件最主要的做用是解耦,中間件最標準的用法是生產者生產消息傳送到隊列,消費者從隊列中拿取消息並處理,生產者不用關心是誰來消費,消費者不用關心誰在生產消息,從而達到解耦的目的。在分佈式的系統中,消息隊列也會被用在不少其它的方面,好比:分佈式事務的支持,RPC的調用等等。spring

RabbitMQ是實現AMQP(高級消息隊列協議)的消息中間件的一種,最初起源於金融系統,用於在分佈式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。RabbitMQ主要是爲了實現系統之間的雙向解耦而實現的。當生產者大量產生數據時,消費者沒法快速消費,那麼須要一箇中間層。保存這個數據。shell

AMQP,即Advanced Message Queuing Protocol,高級消息隊列協議,是應用層協議的一個開放標準,爲面向消息的中間件設計。消息中間件主要用於組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。AMQP的主要特徵是面向消息、隊列、路由(包括點對點和發佈/訂閱)、可靠性、安全。編程

RabbitMQ是一個開源的AMQP實現,服務器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。json

運行機制

一般咱們談到隊列服務, 會有三個概念: 發消息者、隊列、收消息者,RabbitMQ 在這個基本概念之上, 多作了一層抽象, 在發消息者和 隊列之間, 加入了交換器 (Exchange). 這樣發消息者和隊列就沒有直接聯繫, 轉而變成發消息者把消息給交換器, 交換器根據調度策略再把消息再給隊列。數組

  • 左側 P 表明 生產者,也就是往 RabbitMQ 發消息的程序。
  • 中間便是 RabbitMQ,其中包括了 交換機 和 隊列。
  • 右側 C 表明 消費者,也就是往 RabbitMQ 拿消息的程序。

那麼,其中比較重要的概念有 4 個,分別爲:虛擬主機,交換機,隊列,和綁定。安全

  • 虛擬主機:一個虛擬主機持有一組交換機、隊列和綁定。爲何須要多個虛擬主機呢?很簡單,RabbitMQ當中,用戶只能在虛擬主機的粒度進行權限控制。 所以,若是須要禁止A組訪問B組的交換機/隊列/綁定,必須爲A和B分別建立一個虛擬主機。每個RabbitMQ服務器都有一個默認的虛擬主機「/」。
  • 交換機:Exchange 用於轉發消息,可是它不會作存儲 ,若是沒有 Queue bind 到 Exchange 的話,它會直接丟棄掉 Producer 發送過來的消息。 這裏有一個比較重要的概念:路由鍵(routing_key) 。消息到交換機的時候,交互機會轉發到對應的隊列中,那麼究竟轉發到哪一個隊列,就要根據該路由鍵。
  • 綁定:也就是交換機須要和隊列相綁定,這其中如上圖所示,是多對多的關係。

交換機

交換機的功能主要是接收消息而且轉發到綁定的隊列,交換機不存儲消息,在啓用ack模式後,交換機找不到隊列會返回錯誤。交換機有四種類型:Direct, topic, Headers and Fanout服務器

  • Direct:direct 類型的行爲是」先匹配, 再投送」. 即在綁定時設定一個 routing_key, 消息的routing_key 匹配時, 纔會被交換器投送到綁定的隊列中去.
  • Topic:按規則轉發消息(最靈活)
  • Headers:設置header attribute參數類型的交換機
  • Fanout:轉發消息到全部綁定隊列(廣播:忽略routing_key )

一句話總結

RabbitMQ簡單來講,就是生產者發送消息到虛擬主機,虛擬主機把消息交給指定的交換機,交換機按照規則扔給消息隊列進行存儲,消息隊列等待消費者來消費。微信

由此我想到了商品買賣:廠家生產商品賣給批發部,批發部交給指定的超市出售,超市按照售價擺放在門店,並等待顧客上門購買。

完美~~

二:RabbitMQ安裝

由於RabbitMQ是由erlang語言寫的,就像Java程序須要jdk環境同樣,運行RabbitMQ也須要erlang環境。

1.安裝erlang

環境:Centos7.4

下載地址:http://erlang.org/download/

在Linux終端運行命令行

下載:

wget http://erlang.org/download/otp_src_18.3.tar.gz

下載必定要認準otp_src_字樣。

解壓:

tar -zxvf otp_src_18.3.tar.gz

進入解壓後的根目錄:

./configure --prefix=/usr/local/erlang --enable-hipe --enable-threads --enable-smp-support --enable-kernel-poll --without-javac
make && make install

上面有點慢。

把erlang加入環境變量:

vi /etc/profile
export ERLANG=/usr/local/erlang/erlang
export PATH=$ERLANG/bin:$PATH

使環境變量生效

source /etc/profile

而後,咱們測試下是否安裝成功:

[root@ystcode ~]# erl
Erlang/OTP 18 [erts-7.3] [source] [64-bit] [async-threads:10] [hipe] [kernel-poll:false]

Eshell V7.3  (abort with ^G)
1>

2.安裝RabbitMQ

下載地址:http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.1/

下載:

wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.1/rabbitmq-server-generic-unix-3.6.1.tar.xz

對於下載xz包進行解壓,首先先下載xz壓縮工具:

yum install xz

對rabbitmq包進行解壓:

xz -d xz -d rabbitmq-server-generic-unix-3.6.1.tar.xz
tar -xvf rabbitmq-server-generic-unix-3.6.1.tar

隨後在sbin目錄啓用MQ管理方式:

./rabbitmq-plugins enable rabbitmq_management   #啓動後臺管理
./rabbitmq-server -detached #後臺運行rabbitmq

添加用戶和權限 默認網頁guest用戶是不容許訪問的,須要增長一個用戶修改一下權限,代碼以下:

添加用戶:

./rabbitmqctl add_user admin admin

添加權限:

./rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"

修改用戶角色:

./rabbitmqctl set_user_tags admin administrator

而後就能夠遠程訪問了,而後可直接配置用戶權限等信息。

驗證

訪問http://localhost:15672/,輸入admin用戶密碼,登陸成功!

三:整合Spring Boot

新建一個1.5版本的Spring boot項目,選擇rabbitmq+web模塊。

1.導入依賴

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.yml配置rabbitmq

virtual-host: / 默認就是斜槓,具體查看rabbitmq後臺admin欄。若是默認這行不用寫。

spring:
  rabbitmq:
    host: 127.0.0.1
    username: admin
    password: admin
    virtual-host: / #能夠不用寫

3.發送direct消息

direct 類型的行爲是」先匹配, 再投送」. 即在綁定時設定一個 routing_key, 消息的routing_key 匹配時, 纔會被交換器投送到綁定的隊列中去.

1.咱們在RabbitMQ後臺新建一個交換機,demo-direct交換機名,direct交換機類型,Durable持久化:

2.再新建一個消息隊列,取名爲demo

3.點擊demo-direct交換機進入綁定消息隊列demo

若是沒有指定routingkey,消息隊列的名稱就是routingkey

4.綁定成功後查看:

你能夠直接在交換機頁面下方的Publish message發送消息,在消息隊列的Get message查看消息,不過咱們實際生產比較多使用編程:

在Spring Boot建立單元測試

demo-direct交換機名

demo是該交換機綁定的消息隊列名

發送消息

@Autowired
RabbitTemplate rabbitTemplate;

@Test
public void contextLoads() {
   //message須要本身構造一個,定義消息體內容和消息體
   //rabbitTemplate.send(exchange,routingkey,message);
   Map map = new HashMap();
   map.put("key","值");
   map.put("msg",true);
   //對象被默認序列化後發送
   rabbitTemplate.convertAndSend("demo-direct","demo",map);
}

此時發送消息咱們在rabbitmq網頁發現消息是經序列化後的,咱們若是想改變序列化機制爲JSON,也很簡單,只須要注入一我的Bean:

@Configuration
public class MyAMQPConfig {
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

而後咱們再啓動測試,發送。

接收消息

demo是消息隊列名,也就是消費者只須要獲得消息隊列的名字就能夠接收隊列中的消息。

@Test
public void receive(){
    Object o = rabbitTemplate.receiveAndConvert("demo");
    System.out.println(o.getClass());
    System.out.println(o);
}

查看打印

class java.util.HashMap
{msg=true, key=值}

4.廣播fanout消息

【本文版權歸微信公衆號"代碼藝術"(ID:onblog)全部,如果轉載請務必保留本段原創聲明,違者必究。如果文章有不足之處,歡迎關注微信公衆號私信與我進行交流!】

轉發消息到全部綁定隊列(廣播:忽略routing_key )

1.首先咱們須要在RabbitMQ後臺建立一個廣播交換機:

2.再建立一些(demo1,demo2)消息隊列,以一個做爲演示:

3.進入建立的交換機頁面:

4.交換機與消息隊列(demo1,demo2)進行綁定:

若是沒有指定routingkey,消息隊列的名稱就是routingkey

在Spring Boot建立單元測試

注意在廣播模式下會忽略忽略routing_key

發送消息

@Autowired
RabbitTemplate rabbitTemplate;

@Test
public void send() {
    Book book = new Book();
    book.setName("<西遊記>");
    book.setAnthony("吳承恩");
    //對象被默認序列化後發送
    rabbitTemplate.convertAndSend("demo-fanout","",book);
}

接收消息

@Test
public void receive(){
    Object o = rabbitTemplate.receiveAndConvert("demo");
    System.out.println(o.getClass());
    System.out.println(o);
}

打印輸出:

class cn.zyzpp.rabbitmq.entity.Book
Book{name='<西遊記>', anthony='吳承恩'}

5.發送topic消息

按規則轉發消息(使用通配符)

1.在rabbitmq後臺新建一個topic交換機

2.新建一個消息隊列demo

3.交換機綁定消息隊列,注意此處的路由鍵Routing key使用了通配符

  • *表示一個詞.
  • #表示零個或多個詞.

那咱們如何區分幾個字母爲一個單詞呢?

答案是經過」點分」的 routing_key 形式,好比兩個單詞是 *.demo hello.demo

若是路由鍵爲demo.#,那能夠匹配demo.開頭的全部路由鍵。

4.查看此時的交換機

在Spring Boot建立單元測試

發送消息

@Test
public void contextLoads() {
    //message須要本身構造一個,定義消息體內容和消息體
    //rabbitTemplate.send(exchange,routingkey,message);
    Map map = new HashMap();
    map.put("key","topic交換機");
    map.put("msg",true);
    //對象被默認序列化後發送
    rabbitTemplate.convertAndSend("demo-topic","demo.hello",map);
}

接收消息

@Test
public void receive(){
    Object o = rabbitTemplate.receiveAndConvert("demo");
    System.out.println(o.getClass());
    System.out.println(o);
}

打印輸出

class java.util.HashMap
{msg=true, key=topic交換機}

6.代碼建立交換機與消息隊列

1.上面演示的是經過RabbitMQ網頁後臺建立,經過編程的方式也很是簡單:

@Autowired
AmqpAdmin amqpAdmin;

/**
 * 代碼建立交換機與消息隊列並綁定
 */
@Test
public void createExChange(){
      new TopicExchange("topic.exChange");
      new FanoutExchange("fanout.exChange");
    amqpAdmin.declareExchange(new DirectExchange("amqp.exChange"));//建立交換機(remove爲刪除交換機)
    System.out.println("單播交換機建立完成");
    amqpAdmin.declareQueue(new Queue("amqp.queue",true)); //建立消息隊列
    amqpAdmin.declareBinding(new Binding("amqp.queue",
            Binding.DestinationType.QUEUE,"amqp.exChange","amqp.exChange",null));//綁定
}

2.登陸後臺查看,建立成功!

7.監聽消息隊列

監聽消息隊列,當有消息發到消息隊列,立馬獲取並操做。方法也很簡單。

1.開啓基於註解的Rabbit模式

@EnableRabbit   //開啓基於註解的Rabbit模式
@SpringBootApplication
public class RabbitmqApplication {

    public static void main(String[] args) {
        SpringApplication.run(RabbitmqApplication.class, args);
    }
}

2.@RabbitListener註解實現監聽

注意此時你的消息隊列已經有了demodemo.news

@Service
public class BookService {

    @RabbitListener(queues = "demo")
    public void receive(Book book){
        System.out.println("收到消息:"+ book);
    }

    @RabbitListener(queues = "demo.news")
    public void receiveMess(Message message){
        System.out.println("收到消息:"+ message);
        System.out.println(message.getBody()); //getBody返回的byte[]字節數組
        System.out.println(message.getMessageProperties());
    }
}

而後咱們在開啓主程序時再運行測試用例:

測試用例

@Test
public void send() {
    Book book = new Book();
    book.setName("<西遊記>");
    book.setAnthony("吳承恩");
    //對象被默認序列化後發送
    rabbitTemplate.convertAndSend("demo-fanout","",book);
}

查看主控制檯打印

收到消息:Book{name='<西遊記>', anthony='吳承恩'}

測試用例

@Test
public void contextLoads() {
    //message須要本身構造一個,定義消息體內容和消息體
    //rabbitTemplate.send(exchange,routingkey,message);
    Map map = new HashMap();
    map.put("key","topic交換機");
    map.put("msg",true);
    //對象被默認序列化後發送
    rabbitTemplate.convertAndSend("demo-topic","demo.hello",map);
}

查看主控制檯打印

收到消息:(Body:'{"msg":true,"key":"topic交換機"}' MessageProperties [headers={__ContentTypeId__=java.lang.Object, __KeyTypeId__=java.lang.Object, __TypeId__=java.util.HashMap}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=application/json, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=demo-topic, receivedRoutingKey=demo.hello, receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-KRZh2DyNETjEaGSH0JZ2dA, consumerQueue=demo.news])
[B@5332f99e
MessageProperties [headers={__ContentTypeId__=java.lang.Object, __KeyTypeId__=java.lang.Object, __TypeId__=java.util.HashMap}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=application/json, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=demo-topic, receivedRoutingKey=demo.hello, receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-KRZh2DyNETjEaGSH0JZ2dA, consumerQueue=demo.news]

版權聲明

【本文版權歸微信公衆號"代碼藝術"(ID:onblog)全部,如果轉載請務必保留本段原創聲明,違者必究。如果文章有不足之處,歡迎關注微信公衆號私信與我進行交流!】

相關文章
相關標籤/搜索