基於springboot整合的rabbitmq

技術:springboot1.5.2 + maven3.0.5 + rabbitmq3.7.13 + jdk1.8
 

概述

RabbitMQ是對高級消息隊列協議(Advanced Message Queueing Protocol, AMQP)的實現,RabbitMQ是消息傳輸的中間者,能夠把它當作是一個消息代理,你把消息傳送給它,它再把消息發送給具體的接收人。 這就像是郵局同樣,你把郵件放入郵箱當中,郵件員會把郵件發送給你的收件人。不一樣的是RabbitMQ是接受,存儲和轉發二進制數據塊——消息。

詳細

RabbitMQ官方解釋:

消息系統容許軟件、應用相互鏈接和擴展。這些應用能夠相互連接起來組成一個更大的應用,或者將用戶設備和數據html

 

進行鏈接。消息系統經過將消息的發送和接收分離來實現應用程序的異步和解偶。
咱們白話文的理解就是:是一個消息代理 - 一個消息系統的媒介。它能夠爲你的應用提供一個通用的消息發送和接收平臺,而且保證消息在傳輸過程當中的安全。java

 


1、RabbitMQ模型簡介
spring

AMQP 的工做過程以下圖:消息(message)被髮布者(publisher)發送給交換機(exchange),交換機經常被比喻成郵局或者郵箱。而後交換機將收到的消息根據路由規則分發給綁定的隊列(queue)。最後AMQP代理會將消息投遞給訂閱了此隊列的消費者,或者消費者按照需求自行獲取。數據庫

image.png

2、RabbitMQ 交換機:json

Name Default pre-declared names
Direct exchange (Empty string) and amq.direct
Fanout exchange amq.fanout
Topic exchange amq.topic
Headers exchange amq.match (and amq.headers in RabbitMQ)

1. 默認交換機:windows

default exchange其實是一個由消息代理預先聲明好的沒有名字(名字爲空字符串)的直連交換機(direct exchange)。它有一個特殊的屬性使得它對於簡單應用特別有用處:那就是每一個新建隊列(queue)都會自動綁定到默認交換機上,綁定的路由鍵(routing key)名稱與隊列名稱相同。
如:當你聲明瞭一個名爲"search-indexing-online"的隊列,AMQP代理會自動將其綁定到默認交換機上,綁定(binding)的路由鍵名稱也是爲"search-indexing-online"。所以,當攜帶着名爲"search-indexing-online"的路由鍵的消息被髮送到默認交換機的時候,此消息會被默認交換機路由至名爲"search-indexing-online"的隊列中。換句話說,默認交換機看起來貌似可以直接將消息投遞給隊列,儘管技術上並無作相關的操做。數組

2.Direct 直連交換機:安全

直連型交換機(direct exchange)是根據消息攜帶的路由鍵(routing key)將消息投遞給對應隊列的。直連交換機用來處理消息的單播路由(unicast routing)(儘管它也能夠處理多播路由)。springboot


image.png

 

 

3.fanout扇形交換機:服務器

扇型交換機(funout exchange)將消息路由給綁定到它身上的全部隊列,而不理會綁定的路由鍵。若是N個隊列綁定到某個扇型交換機上,當有消息發送給此扇型交換機時,交換機會將消息的拷貝分別發送給這全部的N個隊列。

image.png

 

4.topic 主題交換機:

主題交換機(topic exchanges)經過對消息的路由鍵和隊列到交換機的綁定模式之間的匹配,將消息路由給一個或多個隊列。主題交換機常常用來實現各類分發/訂閱模式及其變種。主題交換機一般用來實現消息的多播路由(multicast routing)。

5.head交換機:

有時消息的路由操做會涉及到多個屬性,此時使用消息頭就比用路由鍵更容易表達,頭交換機(headers exchange)就是爲此而生的。頭交換機使用多個消息屬性來代替路由鍵創建路由規則。經過判斷消息頭的值可否與指定的綁定相匹配來確立路由規則。

2、隊列

AMQP中的隊列(queue)跟其餘消息隊列或任務隊列中的隊列是很類似的:它們存儲着即將被應用消費掉的消息。隊列跟交換機共享某些屬性,可是隊列也有一些另外的屬性。

  • Name

  • Durable(消息代理重啓後,隊列依舊存在)

  • Exclusive(只被一個鏈接(connection)使用,並且當鏈接關閉後隊列即被刪除)

  • Auto-delete(當最後一個消費者退訂後即被刪除)

  • Arguments(一些消息代理用他來完成相似與TTL的某些額外功能)

隊列在聲明(declare)後才能被使用。若是一個隊列尚不存在,聲明一個隊列會建立它。若是聲明的隊列已經存在,而且屬性徹底相同,那麼這次聲明不會對原有隊列產生任何影響。若是聲明中的屬性與已存在隊列的屬性有差別,那麼一個錯誤代碼爲406的通道級異常就會被拋出。

1.隊列名稱

隊列的名字能夠由應用(application)來取,也可讓消息代理(broker)直接生成一個。隊列的名字能夠是最多255字節的一個utf-8字符串。若但願AMQP消息代理生成隊列名,須要給隊列的name參數賦值一個空字符串:在同一個通道(channel)的後續的方法(method)中,咱們可使用空字符串來表示以前生成的隊列名稱。之因此以後的方法能夠獲取正確的隊列名是由於通道能夠默默地記住消息代理最後一次生成的隊列名稱。

以"amq."開始的隊列名稱被預留作消息代理內部使用。若是試圖在隊列聲明時打破這一規則的話,一個通道級的403 (ACCESS_REFUSED)錯誤會被拋出。

 

2.隊列持久化

持久化隊列(Durable queues)會被存儲在磁盤上,當消息代理(broker)重啓的時候,它依舊存在。沒有被持久化的隊列稱做暫存隊列(Transient queues)。並非全部的場景和案例都須要將隊列持久化。

持久化的隊列並不會使得路由到它的消息也具備持久性。假若消息代理掛掉了,從新啓動,那麼在重啓的過程當中持久化隊列會被從新聲明,不管怎樣,只有通過持久化的消息才能被從新恢復。

3.綁定

綁定(Binding)是交換機(exchange)將消息(message)路由給隊列(queue)所需遵循的規則。若是要指示交換機「E」將消息路由給隊列「Q」,那麼「Q」就須要與「E」進行綁定。綁定操做須要定義一個可選的路由鍵(routing key)屬性給某些類型的交換機。路由鍵的意義在於從發送給交換機的衆多消息中選擇出某些消息,將其路由給綁定的隊列。

打個比方:

  • 隊列(queue)是咱們想要去的位於紐約的目的地

  • 交換機(exchange)是JFK機場

  • 綁定(binding)就是JFK機場到目的地的路線。可以到達目的地的路線能夠是一條或者多條

擁有了交換機這個中間層,不少由發佈者直接到隊列難以實現的路由方案可以得以實現,而且避免了應用開發者的許多重複勞動。

若是AMQP的消息沒法路由到隊列(例如,發送到的交換機沒有綁定隊列),消息會被就地銷燬或者返還給發佈者。如何處理取決於發佈者設置的消息屬性。

 

4.消費者

消息若是隻是存儲在隊列裏是沒有任何用處的。被應用消費掉,消息的價值纔可以體現。在AMQP 模型中,有兩種途徑能夠達到此目的:

  • 將消息投遞給應用 ("push API")

  • 應用根據須要主動獲取消息 ("pull API")

使用push API,應用(application)須要明確表示出它在某個特定隊列裏所感興趣的,想要消費的消息。如是,咱們能夠說應用註冊了一個消費者,或者說訂閱了一個隊列。一個隊列能夠註冊多個消費者,也能夠註冊一個獨享的消費者(當獨享消費者存在時,其餘消費者即被排除在外)。

每一個消費者(訂閱者)都有一個叫作消費者標籤的標識符。它能夠被用來退訂消息。消費者標籤其實是一個字符串。

5.消息確認

消費者應用(Consumer applications) - 用來接受和處理消息的應用 - 在處理消息的時候偶爾會失敗或者有時會直接崩潰掉。並且網絡緣由也有可能引發各類問題。這就給咱們出了個難題,AMQP代理在何時刪除消息纔是正確的?AMQP 0-9-1 規範給咱們兩種建議:

  • 當消息代理(broker)將消息發送給應用後當即刪除。(使用AMQP方法:basic.deliver或basic.get-ok)

  • 待應用(application)發送一個確認回執(acknowledgement)後再刪除消息。(使用AMQP方法:basic.ack)

前者被稱做自動確認模式(automatic acknowledgement model),後者被稱做顯式確認模式(explicit acknowledgement model)。在顯式模式下,由消費者應用來選擇何時發送確認回執(acknowledgement)。應用能夠在收到消息後當即發送,或將未處理的消息存儲後發送,或等到消息被處理完畢後再發送確認回執(例如,成功獲取一個網頁內容並將其存儲以後)。

若是一個消費者在還沒有發送確認回執的狀況下掛掉了,那AMQP代理會將消息從新投遞給另外一個消費者。若是當時沒有可用的消費者了,消息代理會死等下一個註冊到此隊列的消費者,而後再次嘗試投遞。

6.拒絕消息

當一個消費者接收到某條消息後,處理過程有可能成功,有可能失敗。應用能夠向消息代理代表,本條消息因爲「拒絕消息(Rejecting Messages)」的緣由處理失敗了(或者未能在此時完成)。當拒絕某條消息時,應用能夠告訴消息代理如何處理這條消息——銷燬它或者從新放入隊列。當此隊列只有一個消費者時,請確認不要因爲拒絕消息而且選擇了從新放入隊列的行爲而引發消息在同一個消費者身上無限循環的狀況發生。

Negative Acknowledgements

在AMQP中,basic.reject方法用來執行拒絕消息的操做。但basic.reject有個限制:你不能使用它決絕多個帶有確認回執(acknowledgements)的消息。可是若是你使用的是RabbitMQ,那麼你可使用被稱做negative acknowledgements(也叫nacks)的AMQP 0-9-1擴展來解決這個問題。更多的信息請參考幫助頁面

7.預取消息

在多個消費者共享一個隊列的案例中,明確指定在收到下一個確認回執前每一個消費者一次能夠接受多少條消息是很是有用的。這能夠在試圖批量發佈消息的時候起到簡單的負載均衡和提升消息吞吐量的做用。For example, if a producing application sends messages every minute because of the nature of the work it is doing.(???例如,若是生產應用每分鐘才發送一條消息,這說明處理工做尚在運行。)

注意,RabbitMQ只支持通道級的預取計數,而不是鏈接級的或者基於大小的預取。

8.消息屬性和有效載荷(消息主體)

AMQP模型中的消息(Message)對象是帶有屬性(Attributes)的。有些屬性及其常見,以致於AMQP明確的定義了它們,而且應用開發者們無需費心思思考這些屬性名字所表明的具體含義。例如:

  • Content type(內容類型)

  • Content encoding(內容編碼)

  • Routing key(路由鍵)

  • Delivery mode (persistent or not)
    投遞模式(持久化 或 非持久化)

  • Message priority(消息優先權)

  • Message publishing timestamp(消息發佈的時間戳)

  • Expiration period(消息有效期)

  • Publisher application id(發佈應用的ID)

有些屬性是被AMQP代理所使用的,可是大多數是開放給接收它們的應用解釋器用的。有些屬性是可選的也被稱做消息頭(headers)。他們跟HTTP協議的X-Headers很類似。消息屬性須要在消息被髮布的時候定義。

AMQP的消息除屬性外,也含有一個有效載荷 - Payload(消息實際攜帶的數據),它被AMQP代理看成不透明的字節數組來對待。消息代理不會檢查或者修改有效載荷。消息能夠只包含屬性而不攜帶有效載荷。它一般會使用相似JSON這種序列化的格式數據,爲了節省,協議緩衝器和MessagePack將結構化數據序列化,以便以消息的有效載荷的形式發佈。AMQP及其同行者們一般使用"content-type" 和 "content-encoding" 這兩個字段來與消息溝通進行有效載荷的辨識工做,但這僅僅是基於約定而已。

消息可以以持久化的方式發佈,AMQP代理會將此消息存儲在磁盤上。若是服務器重啓,系統會確認收到的持久化消息未丟失。簡單地將消息發送給一個持久化的交換機或者路由給一個持久化的隊列,並不會使得此消息具備持久化性質:它徹底取決與消息自己的持久模式(persistence mode)。將消息以持久化方式發佈時,會對性能形成必定的影響(就像數據庫操做同樣,健壯性的存在一定形成一些性能犧牲)。

9.消息確認

因爲網絡的不肯定性和應用失敗的可能性,處理確認回執(acknowledgement)就變的十分重要。有時咱們確認消費者收到消息就能夠了,有時確認回執意味着消息已被驗證而且處理完畢,例如對某些數據已經驗證完畢而且進行了數據存儲或者索引操做。

這種情形很常見,因此 AMQP 內置了一個功能叫作 消息確認(message acknowledgements),消費者用它來確認消息已經被接收或者處理。若是一個應用崩潰掉(此時鏈接會斷掉,因此AMQP代理亦會得知),並且消息的確認回執功能已經被開啓,可是消息代理還沒有得到確認回執,那麼消息會被重新放入隊列(而且在還有還有其餘消費者存在於此隊列的前提下,當即投遞給另一個消費者)。

協議內置的消息確認功能將幫助開發者創建強大的軟件。

 

3、準備工做(windows10環境下的RabbitMQ安裝步驟)

 

第一步:下載並安裝erlang

  • 緣由:RabbitMQ服務端代碼是使用併發式語言Erlang編寫的,安裝Rabbit MQ的前提是安裝Erlang。

  • 下載地址:http://www.erlang.org/downloads

image.png

 

  • 安裝完事兒後要記得配置一下系統的環境變量。

此電腦-->鼠標右鍵「屬性」-->高級系統設置-->環境變量-->「新建」系統環境變量

image.png

 

變量名:ERLANG_HOME

變量值就是剛纔erlang的安裝地址,點擊肯定。

而後雙擊系統變量path

image.png

 

點擊「新建」,將%ERLANG_HOME%\bin加入到path中。

最後windows鍵+R鍵,輸入cmd,再輸入erl,看到版本號就說明erlang安裝成功了。

 

image.png

 

第二步:下載並安裝RabbitMQ

下載地址:http://www.rabbitmq.com/download.html

image.png

 

雙擊下載後的.exe文件,安裝過程與erlang的安裝過程相同。

RabbitMQ安裝好後接下來安裝RabbitMQ-Plugins。打開命令行cd,輸入RabbitMQ的sbin目錄。

個人目錄是:D:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.3\sbin

而後在後面輸入rabbitmq-plugins enable rabbitmq_management命令進行安裝

 

image.png

 

打開sbin目錄,雙擊rabbitmq-server.bat


image.png

等幾秒鐘看到這個界面後,訪問http://localhost:15672

而後能夠看到以下界面

image.png

默認用戶名和密碼都是guest,登錄便可。

4、程序實現

1.建立rabbitmqconfig配置文件類:

package com.zxh.config;


import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;

@EnableRabbit
@Configuration
public class RabbitMqConfig {
    public static final String EXCHANGE = "spring.boot.direct";
    public static final String ROUTINGKEY_FAIL = "spring.boot.routingKey.failure";
    public static final String ROUTINGKEY = "spring.boot.routingKey";
    public static final String QUEUE_NAME = "spring.demo";
    public static final String QUEUE_NAME_FAIL = "spring.demo.failure";

    //RabbitMQ的配置信息
    @Value("${spring.rabbitmq.host}")
    private String host;
    @Value("${spring.rabbitmq.port}")
    private Integer port;
    @Value("${spring.rabbitmq.username}")
    private String username;
    @Value("${spring.rabbitmq.password}")
    private String password;
    @Value("${spring.rabbitmq.virtual-host}")
    private String virtualHost;


    //創建一個鏈接容器,類型數據庫的鏈接池
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory =
                new CachingConnectionFactory(host, port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        connectionFactory.setPublisherConfirms(true);// 確認機制
//        connectionFactory.setPublisherReturns(true);
        //發佈確認,template要求CachingConnectionFactory的publisherConfirms屬性設置爲true
        return connectionFactory;
    }

    // RabbitMQ的使用入口
    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    //必須是prototype類型
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(this.connectionFactory());
        template.setMessageConverter(this.jsonMessageConverter());
        template.setMandatory(true);
        return template;
    }

    /**
     * 交換機
     * 針對消費者配置
     * FanoutExchange: 將消息分發到全部的綁定隊列,無routingkey的概念
     * HeadersExchange :經過添加屬性key-value匹配
     * DirectExchange:按照routingkey分發到指定隊列
     * DirectExchange:多關鍵字匹配
     */
    @Bean
    public DirectExchange exchange() {
        return new DirectExchange(EXCHANGE);
    }

    /**
     * 隊列
     *
     * @return
     */
    @Bean
    public Queue queue() {
        return new Queue(QUEUE_NAME, true); //隊列持久

    }
    @Bean
    public Queue queueFail() {
        return new Queue(QUEUE_NAME_FAIL, true); //隊列持久

    }


    /**
     * 綁定
     *
     * @return
     */
    @Bean
    public Binding binding(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queue()).to(exchange()).with(RabbitMqConfig.ROUTINGKEY);
    }
    @Bean
    public Binding bindingFail(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queueFail()).to(exchange()).with(RabbitMqConfig.ROUTINGKEY_FAIL);
    }


    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

//    @Bean
//    public CharacterEncodingFilter characterEncodingFilter() {
//        CharacterEncodingFilter filter = new CharacterEncodingFilter();
//        filter.setEncoding("UTF-8");
//        filter.setForceEncoding(true);
//        return filter;
//    }

}

 

2.生產者推送消息

package com.zxh.service;

import com.zxh.config.RabbitMqConfig;
import com.zxh.pojo.User;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;

import java.util.List;
import java.util.UUID;

@Service
public class UserService {
	
    private Logger logger = LoggerFactory.getLogger(this.getClass());
    @Autowired
    private RabbitTemplate template;

    /**
     * 增長用戶
     *
     */
    public boolean addPerson(User user) throws Exception {
        Assert.notNull(user, "添加對象信息不能爲空");

        Assert.hasText(user.getUserId(), "添加對象信息用戶編號不能爲空");
        Assert.notNull(user.getAge(), "添加對象信息年齡不能爲空");

        template.convertAndSend(RabbitMqConfig.EXCHANGE, RabbitMqConfig.ROUTINGKEY, user.toString());
//        template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
//            @Override
//            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
//                if (!ack) {
//                    logger.info("send message failed: " + cause); //+ correlationData.toString());
//                    throw new RuntimeException("send error " + cause);
//                } else {
//                    logger.info("send to broke ok" + correlationData.getId());
//                }
//            }
//        });

        return true;
    }

    private Message buildMessage(User user) throws Exception {
        Message message = MessageBuilder.withBody(user.toString().getBytes())
                .setMessageId(UUID.randomUUID().toString()).setContentType("application/json").build();
        return message;
    }


}

 

3.消費者訂閱消息

package com.zxh.rabbitmq;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class UserTopicRecive {

	@RabbitListener(queues="spring.demo")
	public void process(String user) throws InterruptedException {
		System.out.println("TopicRecive1接受的消息: "+user);
	}
}

 

5、程序演示

image.png

image.png

6、項目結構圖

 

image.png

 

7、小結 - RabbitMQ的工做流程介紹

 

一、創建信息。Publisher定義須要發送消息的結構和內容。

 

二、創建Conection和Channel。由Publisher和Consumer建立鏈接,鏈接到Broker的物理節點上,同時創建Channel。Channel是創建在Connection之上的,一個Connection能夠創建多個Channel。Publisher鏈接Virtual Host 創建Channel,Consumer鏈接到相應的Queue上創建Channel。

 

三、聲明交換機和隊列。聲明一個消息交換機(Exchange)和隊列(Queue),並設置相關屬性。

 

四、發送消息。由Publisher發送消息到Broker中的Exchange中

 

五、路由轉發。RabbitMQ收到消息後,根據消息指定的Exchange(交換機) 來查找Binding(綁定) 而後根據規則(Routing Key)分發到不一樣的Queue。這裏就是說使用Routing Key在消息交換機(Exchange)和消息隊列(Queue)中創建好綁定關係,而後將消息發送到綁定的隊列中去。

 

六、消息接收。Consumer監聽相應的Queue,一旦Queue中有能夠消費的消息,Queue就將消息發送給Consumer端。

 

七、消息確認。當Consumer完成某一條消息的處理以後,須要發送一條ACK消息給對應的Queue。

 

Consumer收到消息時須要顯式的向RabbitMQ Broker發送basic.ack消息或者Consumer訂閱消息時設置auto_ack參數爲true。

在通訊過程當中,隊列對ACK的處理有如下幾種狀況:

 

若是Consumer接收了消息,發送ack,RabbitMQ會刪除隊列中這個消息,發送另外一條消息給Consumer。

若是Consumer接收了消息, 但在發送ack以前斷開Channel,RabbitMQ會認爲這條消息沒有被deliver(遞送),若是有其餘的Channel,會該消息將被髮送給另外的Channel。若是沒有,當在Consumer再次鏈接的時候,這條消息會被redeliver(從新遞送)。

若是consumer接收了消息,可是忘記了ack,RabbitMQ不會重複發送消息。

新版RabbitMQ還支持Consumer reject某條(類)消息,能夠經過設置requeue參數中的reject爲true達到目地,那麼Consumer將會把消息發送給下一個註冊的Consumer。

八、關閉消息通道(channel)以及和服務器的鏈接。

 



 

 

注:本文著做權歸做者,由demo大師發表,拒絕轉載,轉載須要做者受權

相關文章
相關標籤/搜索