RabbitMq 初步

RabbitMQ的工做原理spring

它的基本結構異步

組成部分說明以下:測試

Broker:消息隊列服務進程,此進程包括兩個部分:Exchange和Queue。ui

Exchange:消息隊列交換機,按必定的規則將消息路由轉發到某個隊列,對消息進行過慮。spa

Queue:消息隊列,存儲消息的隊列,消息到達隊列並轉發給指定的消費方。code

Producer:消息生產者,即生產方客戶端,生產方客戶端將消息發送到MQ。orm

Consumer:消息消費者,即消費方客戶端,接收MQ轉發的消息。blog

 

Maven舉例配置rabbitmq

<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp‐client</artifactId>
<version>4.0.3</version><!‐‐此版本與spring boot 1.5.9版本匹配‐‐>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring‐boot‐starter‐logging</artifactId>
</dependency>

生產者舉例Demo隊列

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitmqConfig {
    public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
    public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
    public static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
    public static final String ROUTINGKEY_EMAIL="inform.#.email.#";
    public static final String ROUTINGKEY_SMS="inform.#.sms.#";

    //聲明交換機
    @Bean(EXCHANGE_TOPICS_INFORM)
    public Exchange EXCHANGE_TOPICS_INFORM(){
        //durable(true) 持久化,mq重啓以後交換機還在
        return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
    }

    //聲明QUEUE_INFORM_EMAIL隊列
    @Bean(QUEUE_INFORM_EMAIL)
    public Queue QUEUE_INFORM_EMAIL(){
        return new Queue(QUEUE_INFORM_EMAIL);
    }
    //聲明QUEUE_INFORM_SMS隊列
    @Bean(QUEUE_INFORM_SMS)
    public Queue QUEUE_INFORM_SMS(){
        return new Queue(QUEUE_INFORM_SMS);
    }

    //ROUTINGKEY_EMAIL隊列綁定交換機,指定routingKey
    @Bean
    public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue,
                                              @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();
    }
    //ROUTINGKEY_SMS隊列綁定交換機,指定routingKey
    @Bean
    public Binding BINDING_ROUTINGKEY_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue,
                                              @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();
    }
}

 

消費者舉例Demo

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitmqConfig {
    public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
    public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
    public static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
    public static final String ROUTINGKEY_EMAIL="inform.#.email.#";
    public static final String ROUTINGKEY_SMS="inform.#.sms.#";

    //聲明交換機
    @Bean(EXCHANGE_TOPICS_INFORM)
    public Exchange EXCHANGE_TOPICS_INFORM(){
        //durable(true) 持久化,mq重啓以後交換機還在
        return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
    }

    //聲明QUEUE_INFORM_EMAIL隊列
    @Bean(QUEUE_INFORM_EMAIL)
    public Queue QUEUE_INFORM_EMAIL(){
        return new Queue(QUEUE_INFORM_EMAIL);
    }
    //聲明QUEUE_INFORM_SMS隊列
    @Bean(QUEUE_INFORM_SMS)
    public Queue QUEUE_INFORM_SMS(){
        return new Queue(QUEUE_INFORM_SMS);
    }

    //ROUTINGKEY_EMAIL隊列綁定交換機,指定routingKey
    @Bean
    public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue,
                                              @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();
    }
    //ROUTINGKEY_SMS隊列綁定交換機,指定routingKey
    @Bean
    public Binding BINDING_ROUTINGKEY_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue,
                                              @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();
    }
}

工做模式 

RabbitMQ有如下幾種工做模式 :

一、Work queues

二、Publish/Subscribe

三、Routing

四、Topics

五、Header

六、RPC

 

Work queues

work queues與入門程序相比,多了一個消費端,兩個消費端共同消費同一個隊列中的消息。

應用場景:對於 任務太重或任務較多狀況使用工做隊列能夠提升任務處理的速度。

測試:

一、使用入門程序,啓動多個消費者。

二、生產者發送多個消息。

結果:

一、一條消息只會被一個消費者接收;

二、rabbit採用輪詢的方式將消息是平均發送給消費者的;

三、消費者在處理完某條消息後,纔會收到下一條消息。

 

Publish/subscribe 發佈訂閱模式

發佈訂閱模式:

一、每一個消費者監聽本身的隊列。

二、生產者將消息發給broker,由交換機將消息轉發到綁定此交換機的每一個隊列,每一個綁定交換機的隊列都將接收

到消息

 

Routin

路由模式:

一、每一個消費者監聽本身的隊列,而且設置routingkey。

二、生產者將消息發給交換機,由交換機根據routingkey來轉發消息到指定的隊列。

這是一種很是靈活的模式,常常被用到

 

Topics

 

 

路由模式:

一、每一個消費者監聽本身的隊列,而且設置帶統配符的routingkey。

二、生產者將消息發給broker,由交換機根據routingkey來轉發消息到指定的隊列。

 

Header模式

header模式與routing不一樣的地方在於,header模式取消routingkey,使用header中的 key/value(鍵值對)匹配

隊列。

案例:

根據用戶的通知設置去通知用戶,設置接收Email的用戶只接收Email,設置接收sms的用戶只接收sms,設置兩種

通知類型都接收的則兩種通知都有效。

 

生產者Demo:

 

Map<String, Object> headers_email = new Hashtable<String, Object>();
headers_email.put("inform_type", "email");
Map<String, Object> headers_sms = new Hashtable<String, Object>();
headers_sms.put("inform_type", "sms");
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_HEADERS_INFORM,"",headers_email);
channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_HEADERS_INFORM,"",headers_sms);

通知Demo :

String message = "email inform to user"+i;
Map<String,Object> headers = new Hashtable<String, Object>();
headers.put("inform_type", "email");//匹配email通知消費者綁定的header
//headers.put("inform_type", "sms");//匹配sms通知消費者綁定的header
AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder();
properties.headers(headers);
//Email通知
channel.basicPublish(EXCHANGE_HEADERS_INFORM, "", properties.build(), message.getBytes());

發送郵件消費者 : 

channel.exchangeDeclare(EXCHANGE_HEADERS_INFORM, BuiltinExchangeType.HEADERS);
Map<String, Object> headers_email = new Hashtable<String, Object>();
headers_email.put("inform_email", "email");
//交換機和隊列綁定
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_HEADERS_INFORM,"",headers_email);
//指定消費隊列
channel.basicConsume(QUEUE_INFORM_EMAIL, true, consumer);

 

RPC

 

 

RPC即客戶端遠程調用服務端的方法 ,使用MQ能夠實現RPC的異步調用,基於Direct交換機實現,流程以下:

一、客戶端便是生產者就是消費者,向RPC請求隊列發送RPC調用消息,同時監聽RPC響應隊列。

二、服務端監聽RPC請求隊列的消息,收到消息後執行服務端的方法,獲得方法返回的結果

三、服務端將RPC方法 的結果發送到RPC響應隊列

四、客戶端(RPC調用方)監聽RPC響應隊列,接收到RPC調用結果。

相關文章
相關標籤/搜索