SpringBoot整合RabbitMQ(一)快速入門

MQ全稱爲Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通訊方法。MQ是消費-生產者模型的一個典型的表明,一端往消息隊列中不斷寫入消息,而另外一端則能夠讀取隊列中的消息。消息中間件最主要的做用是解耦,中間件最標準的用法是生產者生產消息傳送到隊列,消費者從隊列中拿取消息並處理,生產者不用關心是誰來消費,消費者不用關心誰在生產消息,從而達到解耦的目的。在分佈式的系統中,消息隊列也會被用在不少其它的方面,好比:分佈式事務的支持,RPC的調用等等。java

RabbitMQ介紹

RabbitMQ是實現AMQP(高級消息隊列協議)的消息中間件的一種,最初起源於金融系統,用於在分佈式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。RabbitMQ主要是爲了實現系統之間的雙向解耦而實現的。當生產者大量產生數據時,消費者沒法快速消費,那麼須要一箇中間層。保存這個數據。spring

AMQP,即Advanced Message Queuing Protocol,高級消息隊列協議,是應用層協議的一個開放標準,爲面向消息的中間件設計。消息中間件主要用於組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。AMQP的主要特徵是面向消息、隊列、路由(包括點對點和發佈/訂閱)、可靠性、安全。安全

RabbitMQ是一個開源的AMQP實現,服務器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用於在分佈式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。springboot

相關概念

一般咱們談到隊列服務, 會有三個概念: 發消息者、隊列、收消息者,RabbitMQ 在這個基本概念之上, 多作了一層抽象, 在發消息者和 隊列之間, 加入了交換器 (Exchange). 這樣發消息者和隊列就沒有直接聯繫, 轉而變成發消息者把消息給交換器, 交換器根據調度策略再把消息再給隊列。bash

那麼,其中比較重要的概念有 4 個,分別爲:虛擬主機,交換機,隊列,和綁定。服務器

  • 虛擬主機:一個虛擬主機持有一組交換機、隊列和綁定。爲何須要多個虛擬主機呢?很簡單,RabbitMQ當中,用戶只能在虛擬主機的粒度進行權限控制。 所以,若是須要禁止A組訪問B組的交換機/隊列/綁定,必須爲A和B分別建立一個虛擬主機。每個RabbitMQ服務器都有一個默認的虛擬主機「/」。
  • 交換機:Exchange 用於轉發消息,可是它不會作存儲 ,若是沒有 Queue bind 到 Exchange 的話,它會直接丟棄掉 Producer 發送過來的消息。 這裏有一個比較重要的概念:路由鍵 。消息到交換機的時候,交互機會轉發到對應的隊列中,那麼究竟轉發到哪一個隊列,就要根據該路由鍵。
  • 綁定:也就是交換機須要和隊列相綁定,這其中如上圖所示,是多對多的關係。

四種交換機(Exchange)

交換機的功能主要是接收消息而且轉發到綁定的隊列,交換機不存儲消息,在啓用ack模式後,交換機找不到隊列會返回錯誤。交換機有四種類型:Direct, topic, Headers and Fanout架構

1. Direct Exchange

direct 類型的行爲是"先匹配, 再投送". 即在綁定時設定一個 routing_key, 消息的routing_key 匹配時, 纔會被交換器投送到綁定的隊列中去.是RabbitMQ默認的交換機模式,也是最簡單的模式,根據key全文匹配去尋找隊列。分佈式

配置:設置一個路由鍵spring-boot

public  static  final String QUEUE="queue";
    /**
     * direct 交換機模式
     */
    @Bean
    public Queue queue(){
        return new Queue(QUEUE,true);
    }

複製代碼

發送服務:測試

@Service
@Slf4j
public class MQSender {

    @Autowired
    AmqpTemplate amqpTemplate;

    public void send(Object message){
        String msg = (String) message;
        log.info("send msg"+message);
        amqpTemplate.convertAndSend(MQConfig.QUEUE,msg);
    }
}
複製代碼

接收服務:

@Service
@Slf4j
public class MQReceiver {

    //監聽的queue
    @RabbitListener(queues = MQConfig.QUEUE)
    public void receive(String msg){
        log.info("receive msg "+msg);
    }
}
複製代碼

測試:

@Autowired
    private MQSender sender;

    sender.send("hello direct Exchange");
複製代碼

2. Topic Exchange

按規則轉發消息(最靈活) 轉發消息主要是根據通配符。 在這種交換機下,隊列和交換機的綁定會定義一種路由模式,那麼,通配符就要在這種路由模式和路由鍵之間匹配後交換機才能轉發消息。

路由鍵必須是一串字符,用句號(.) 隔開,

路由模式必須包含一個 星號(*),主要用於匹配路由鍵指定位置的一個單詞, 井號(#)就表示至關於一個或者多個單詞

配置類:

public  static  final String TOPIC_QUEUE1="topic.queue1";
    public  static  final String TOPIC_QUEUE2="topic.queue2";
    public  static  final String ROUTING_KEY1="topic.key1";
    public  static  final String ROUTING_KEY2="topic.#";
    /**
     * Topic 交換機模式  能夠用通配符
     */
    @Bean
    public Queue topicQueue1(){
        return new Queue(TOPIC_QUEUE1,true);
    }
    @Bean
    public Queue topicQueue2(){
        return new Queue(TOPIC_QUEUE2,true);
    }

    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange(TOPIC_EXCHANGE);
    }
    @Bean
    public Binding topicBinding1(){
        return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with(ROUTING_KEY1);
    }
    @Bean
    public Binding topicBinding2(){
        return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with(ROUTING_KEY2);
    }
複製代碼

發送類:

public void sendTopic(Object message){
        String msg = (String) message;
        log.info("send topic message"+msg);
        amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE,"topic.key1",msg+"1");
        amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE,"topic.key2",msg+"2");
    }
複製代碼

接收類:

@RabbitListener(queues = MQConfig.TOPIC_QUEUE1)
    public void receiveTopic1(String msg){
        log.info("receive topic1 msg "+msg);
    }
複製代碼

測試:

@Autowired
    private MQSender sender;

    sender.sendTopic("hello topic Exchange");
複製代碼

3. Headers Exchange

設置header attribute參數類型的交換機,相較於 direct 和 topic 固定地使用 routing_key , headers 則是一個自定義匹配規則的類型. 在隊列與交換器綁定時, 會設定一組鍵值對規則, 消息中也包括一組鍵值對( headers 屬性), 當這些鍵值對有一對, 或所有匹配時, 消息被投送到對應隊列.

public static final String HEADER_EXCHANGE="headerExchange";
    /**
     * Header 交換機模式
     */
    @Bean
    public HeadersExchange headersExchange(){
        return new HeadersExchange(HEADER_EXCHANGE);
    }
    @Bean
    public Queue headerQueue(){
        return new Queue(HEADER_QUEUE2,true);
    }
    // 綁定須要指定header,若是不匹配 則不能使用
    @Bean
    public Binding headerBinding(){
        Map<String,Object> map = new HashMap();
        map.put("header1","value1");
        map.put("header2","value2");
        return BindingBuilder.bind(headerQueue()).to(headersExchange()).whereAll(map).match();
    }
複製代碼
public void sendHeader(Object massage){
        String msg = (String) massage;
        log.info("send fanout message: "+msg);
        MessageProperties properties = new MessageProperties();
        properties.setHeader("header1","value1");
        properties.setHeader("header2","value2");
        Message obj = new Message(msg.getBytes(),properties);
        amqpTemplate.convertAndSend(MQConfig.HEADER_EXCHANGE,"",obj);
    }
複製代碼

用MessageProperties來添加Header信息,而後與接收者的header比對。我都設置的是"header1","value1";"header2","value2"

//監聽 header模式的queue
    @RabbitListener(queues = MQConfig.HEADER_QUEUE2)
    //由於發送的是 byte 類型,因此接受也是該數據類型
    public void receiveHeader(byte[] message){
        log.info("header queue message"+new String(message));
    }
複製代碼

測試:

@Autowired
    private MQSender sender;

    sender.sendHeader("hello header Exchange");
複製代碼

4. Fanout Exchange

轉發消息到全部綁定隊列,消息廣播的模式,無論路由鍵或者是路由模式,會把消息發給綁定給它的所有隊列,若是配置了routing_key會被忽略。

public static final String FANOUT_EXCHANGE="fanoutExchange";
    /**
     * Fanout 交換機模式(廣播模式),不用綁定key
     */
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange(FANOUT_EXCHANGE);
    }
    @Bean
    public Binding fanoutBinding1(){
        return BindingBuilder.bind(topicQueue1()).to(fanoutExchange());
    }
    @Bean
    public Binding fanoutBinding2(){
        return BindingBuilder.bind(topicQueue2()).to(fanoutExchange());
    }

複製代碼
public void sendFanout(Object massage){
        String msg = (String) massage;
        log.info("send fanout message: "+msg);
        amqpTemplate.convertAndSend(MQConfig.FANOUT_EXCHANGE,"",msg);
    }
複製代碼

測試:

@Autowired
    private MQSender sender;

    sender.sendFanout("hello fanout Exchange");
複製代碼

補充

這個示例是基於springboot的示例。

pom依賴

<!--rabbbitMQ相關依賴-->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
複製代碼

配置文件

rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    # 這個帳號密碼只能鏈接本地的mq,遠程的話須要配置
    virtual-host: /
    listener:
      simple:
        concurrency: 10
        max-concurrency: 10
        prefetch: 1 # 從隊列每次取一個
        auto-startup: true
        default-requeue-rejected: true # 失敗後重試
複製代碼

後續會用它來實現一個小小的搶票功能。



若是你喜歡個人文章,那麻煩請關注個人公衆號PlayInJava,公衆號重點分析架構師技術,該公衆號還處於初始階段,謝謝你們的支持。

關注公衆號,回覆 java架構獲取架構視頻資源(後期還會分享不一樣的優質資源噢)。
相關文章
相關標籤/搜索