springboot(十)SpringBoot消息中間件RabbitMQ

 github地址:https://github.com/showkawa/springBoot_2017/tree/master/spb-demo/spb-brian-query-service

1.RabbitMQ簡介

是AMQP(Advanced Message Queue Protocol)的開源實現java

Message
消息,消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對於其餘消息的優先權)、delivery-mode(指出該消息可能須要持久性存儲)等。
​
Publisher
消息的生產者,也是一個向交換器發佈消息的客戶端應用程序。
​
Exchange
交換器,用來接收生產者發送的消息並將這些消息路由給服務器中的隊列。
Exchange有4種類型:direct(默認),fanout, topic, 和headers,不一樣類型的Exchange轉發消息的策略有所區別
Queue
消息隊列,用來保存消息直到發送給消費者。它是消息的容器,也是消息的終點。一個消息可投入一個或多個隊列。消息一直在隊列裏面,等待消費者鏈接到這個隊列將其取走。
​
Binding
綁定,用於消息隊列和交換器之間的關聯。一個綁定就是基於路由鍵將交換器和消息隊列鏈接起來的路由規則,因此能夠將交換器理解成一個由綁定構成的路由表。
Exchange 和Queue的綁定能夠是多對多的關係。
​
Connection
網絡鏈接,好比一個TCP鏈接。
​
Channel
信道,多路複用鏈接中的一條獨立的雙向數據流通道。信道是創建在真實的TCP鏈接內的虛擬鏈接,AMQP 命令都是經過信道發出去的,無論是發佈消息、訂閱隊列仍是接收消息,這些動做都是經過信道完成。由於對於操做系統來講創建和銷燬 TCP 都是很是昂貴的開銷,因此引入了信道的概念,以複用一條 TCP 鏈接。
Consumer
消息的消費者,表示一個從消息隊列中取得消息的客戶端應用程序。
​
Virtual Host
虛擬主機,表示一批交換器、消息隊列和相關對象。虛擬主機是共享相同的身份認證和加密環境的獨立服務器域。每一個 vhost 本質上就是一個 mini 版的 RabbitMQ 服務器,擁有本身的隊列、交換器、綁定和權限機制。vhost 是 AMQP 概念的基礎,必須在鏈接時指定,RabbitMQ 默認的 vhost 是 / 。
​
Broker
表示消息隊列服務器實體

 

2.RabbitMQ運行機制

2.1 Exchange類型git

Exchange分發消息時根據類型的不一樣分發策略有區別,目前共四種類型:direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由鍵, headers 交換器和 direct 交換器徹底一致,但性能差不少,目前幾乎用不到了,因此直接看另外三種類型:github

2.1.1 Direct Exchangeweb

消息中的路由鍵(routing key)若是和 Binding 中的 binding key 一致, 交換器就將消息發到對應的隊列中。
路由鍵與隊列名徹底匹配,若是一個隊列綁定到交換機要求路由鍵爲「dog」,則只轉發 routing key 標記爲「dog」的消息,
不會轉發「dog.puppy」,也不會轉發「dog.guard」等等。它是徹底匹配、單播的模式。

2.1.2 Fanout Exchangespring

每一個發到 fanout 類型交換器的消息都會分到全部綁定的隊列上去。
fanout 交換器不處理路由鍵,只是簡單的將隊列綁定到交換器上,每一個發送到交換器的消息都會被轉發到與該交換器綁定的全部隊列上。
很像子網廣播,每臺子網內的主機都得到了一份複製的消息。fanout 類型轉發消息是最快的。

2.1.3 Topic Exchangedocker

topic 交換器經過模式匹配分配消息的路由鍵屬性,將路由鍵和某個模式進行匹配,此時隊列須要綁定到一個模式上。
它將路由鍵和綁定鍵的字符串切分紅單詞,這些單詞之間用點隔開。它一樣也會識別兩個通配符:符號「#」和符號「*」。
#匹配0個或多個單詞,*匹配一個單詞。

3.RabbitMQ安裝

基於docker的國內鏡像安裝(3-management帶管理界面的rabbitmq)
> docker pull registry.docker-cn.com/library/rabbitmq:3-management
啓動rabbitmq(-d 後臺啓動 -p 端口映射 5672 鏈接rabbirmq的端口 15672訪問rabbitmq web管理界面的端口)
> docker run -d -p 5672:5672 -p 15672:15672 --name brianRabbitMQ xxxxxx(鏡像name或者鏡像ID)

rabbitmq的管理web訪問url: ip:15672,默認的帳戶密碼guest/guest服務器

4.RabbitTemplate發送接受消息&序列化機制

4.1 引用依賴網絡

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

4.2 AmqpAdmin建立和刪除 Queue, Exchange,Bindingspring-boot

ManageMQService.java性能

package com.kawa.mq;
​
import com.kawa.config.Contents;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
​
@Service
public class ManageMQService {
​
    @Autowired
    AmqpAdmin amqpAdmin;
​
    public void createExchange(String exchangeName,String mqType){
        if(mqType.equals(Contents.DIRECT_EXCHANGE)){
            amqpAdmin.declareExchange(new DirectExchange(exchangeName));
        }
        if(mqType.equals(Contents.FANOUT_EXCHANGE)){
            amqpAdmin.declareExchange(new FanoutExchange(exchangeName));
        }
        if(mqType.equals(Contents.TOPIC_EXCHANGE)){
            amqpAdmin.declareExchange(new TopicExchange(exchangeName));
        }
    }
​
    public void removeExchange(String exchangeName){
            amqpAdmin.deleteExchange(exchangeName);
    }
​
    public void createQueue(String queueName){
        amqpAdmin.declareQueue(new Queue(queueName,true));
    }
​
    public void removeQueue(String queueName){
        amqpAdmin.deleteQueue(queueName);
    }
​
    public void createBinding(String queueName, String exchangeName, String routingKey){
        amqpAdmin.declareBinding(new Binding(queueName, Binding.DestinationType.QUEUE,exchangeName,routingKey,null));
    }
​
    public void removeBinding(String queueName, String exchangeName, String routingKey){
        amqpAdmin.removeBinding(new Binding(queueName, Binding.DestinationType.QUEUE,exchangeName,routingKey,null));
    }
}

4.3 使用RabbitTemplate發送消息

SendMessageService.java

package com.kawa.mq;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class SendMessageService {

    @Autowired
    RabbitTemplate rabbitTemplate;

    public void sendMessage(String exchange,String routingKey,Object obj){
        //Message須要本身構造一個;定義消息體和消息頭
        //rabbitTemplate.send(exchange,routingKey,message);

        //object默認當成消息體,只須要傳入發送對象,自動序列化發送給rabbitmq
        rabbitTemplate.convertAndSend(exchange,routingKey,obj);
    }

}

4.4 測試建立 Queue, Exchange,Binding和發送消息

SpbDemoApplicationTests.java

 @Test
    public void sendMessage() {
        manageMQService.createQueue("brian.test");
        manageMQService.createExchange("brian",Contents.DIRECT_EXCHANGE);
        manageMQService.createBinding("brian.test","brian","mymq");
        Brian brian = new Brian();
        User user = new User();
        user.setId((long) 12345678);
        user.setUsername("cassiel");
        user.setPassword("#fyds");
        List<String> list = new ArrayList<>();
        list.add("我");
        list.add("愛");
        list.add("你");
        list.add("中");
        list.add("國");
        Map<String,Object> map = new HashMap<>();
        map.put("123","包郵");
        brian.setKawadate(new Date());
        brian.setLists(list);
        brian.setObj(map);
        brian.setUser(user);
        sendMessageService.sendMessage("brian","mymq",brian);
    }

查看結果

4.5 添加@RabbitListener監聽和處理消息

在使用RabbitListener註解接收消息時,須要在啓動類上加上數據@EnableRabbit

BrianService.java

package com.kawa.sercice;


import com.kawa.pojo.Brian;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
public class BrianService {

    @RabbitListener(queues = "brian.test")
    public void receiveMessage(Brian brian){
        System.out.println("接收到的消息體:" + brian);
    }
}

4.6 啓動工程查看測試結果

查看到隊列裏面消息已經沒有了

相關文章
相關標籤/搜索