Spring Boot整合RabbitMQ實例

什麼是消息?java

消息是一個或者多個實體之間溝通的一種方式而且無處不在。linux

自從計算機發明以來,計算機以多種多樣的方式發送消息,消息定義了軟硬件或者應用程序之間的溝通方式。消息老是有一個發送者和多個接受者,消息有synchronous和asynchronous、pub-sub和peer-to-peer, RPC和enterprise-based, Message Broker, ESB (Enterprise Service Bus), MOM 
(Message Oriented Middleware)等。git

從這些咱們能夠確定,消息使分佈式溝通變得鬆耦合,意味着發送者如何發送和發送了什麼消息並不重要,接受者在消費消息時並不會告知發送者。github

Spring Boot RabbitMQweb

由於像Sun/Oracle/IBM的JMS,Microsoft的MSMQ,他們所使用的協議都是有全部權的,咱們也知道JMS僅僅只是提供了一些接口API,可是若是說要嘗試着去混合技術和編程語言去使用JMS,那這個就會至關的複雜。技術的不斷革新,給咱們帶來的一個結果就是有複雜的問題出現,就會有層出不窮的簡單的解決辦法被提出。在計算機消息領域,特別須要感謝JPMorgan團隊,由於他們發明了AMQP (Advance Message Queuing Protocol )協議。AMQP是爲MOM(Message Oriented Middleware)設計的一個開放式標準應用層協議。換句話說,你使用這個協議你就可使用任何技術或者語言。算法

在實現了AMQP協議的諸多消息系統當中,RabbitMQ是其中應用最普遍的一個之一,是MQ產品的典型表明,服務器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用於在分佈式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。spring

本文主要介紹在Spring Boot中如何整合RabbitMQ並使用RabbitMQ。shell

安裝RabbitMQapache

本實例是在CentOS7上安裝RabbitMQ,不一樣的系統請經過搜索引擎查找不一樣的安裝方式。編程

一、安裝erlang 語言環境

  • 下載安裝
wget http://www.erlang.org/download/otp_src_20.1.tar.gz  //下載erlang包
tar -xzvf otp_src_20.1.tar.gz  //解壓
cd otp_src_20.1/  //切換到安裝路徑
./configure --prefix=/usr/local/erlang  //生產安裝配置
make && make install  //編譯安裝
  • 配置環境變量
vi /etc/profile

在底部添加以下內容

#set erlang environment
ERL_HOME=/usr/local/erlang
PATH=$ERL_HOME/bin:$PATH
export ERL_HOME PATH

source /etc/profile  //生效

測試一下是否安裝成功,在控制檯輸入命令erl

erl

若是進入erlang的shell則證實安裝成功,退出便可。

注意:執行./configure –prefix=/usr/local/erlang命令時可能會報以下錯誤:

configure: error: No curses library functions found
configure: error: /bin/sh '/home/jiayi/otp_src_20.1/erts/configure' failed for erts

可嘗試執行以下命令解決:

yum -y install ncurses-devel

二、安裝RabbitMQ

  • 下載安裝
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.0/rabbitmq-server-generic-unix-3.7.0.tar.xz  //下載RabbitMQ安裝包
xz -d rabbitmq-server-generic-unix-3.7.0.tar.xz
tar -xvf rabbitmq-server-generic-unix-3.7.0.tar

解壓後多了個文件夾rabbitmq-server-3.7.0 ,重命名爲rabbitmq以便記憶。

mv rabbitmq_server-3.7.0/ rabbitmq
  • 配置rabbitmq環境變量
vi /etc/profile

在底部添加以下內容

#set rabbitmq environment
export PATH=$PATH:/data/rabbitmq/rabbitmq/sbin

source /etc/profile  //生效
  • 啓動服務
rabbitmq-server -detached //啓動rabbitmq,-detached表明後臺守護進程方式啓動。

查看狀態,若是顯示以下截圖說明安裝成功:

rabbitmqctl status

這裏寫圖片描述

其餘相關命令

啓動服務:rabbitmq-server -detached
查看狀態:rabbitmqctl status
關閉服務:rabbitmqctl stop
列出角色:rabbitmqctl list_users

配置網頁插件

首先建立目錄,不然可能報錯

mkdir /etc/rabbitmq

而後啓用插件

rabbitmq-plugins enable rabbitmq_management
  •  

配置防火牆

配置linux 端口 15672 網頁管理 5672 AMQP端口:

firewall-cmd --permanent --add-port=15672/tcp
firewall-cmd --permanent --add-port=5672/tcp
systemctl restart firewalld.service

如今你在瀏覽器中輸入服務器IP:15672 就能夠看到RabbitMQ的WEB管理頁面了,是否是很興奮,但是你沒有帳號密碼,別急。 
這裏寫圖片描述

配置訪問帳號密碼和權限

默認網頁是不容許訪問的,須要增長一個用戶修改一下權限,代碼以下

rabbitmqctl add_user superrd superrd  //添加用戶,後面兩個參數分別是用戶名和密碼,我這都用superrd了。
rabbitmqctl set_permissions -p / superrd ".*" ".*" ".*"  //添加權限
rabbitmqctl set_user_tags superrd administrator  //修改用戶角色
  • 1
  • 2
  • 3

而後就能夠遠程訪問了,而後可直接配置用戶權限等信息。 
這裏寫圖片描述

RabbitMQ/AMQP: Exchanges, Bindings和Queues

AMQP定義了三個很是容易理解的概念,分別是Exchanges、Bindings、Queues。

  • Exchanges:投遞消息到queue都是經由exchanges完成的,每一個exchange攜帶一個消息而且路由到Queues。

  • Bindings:消息的路由涉及到一個和exchanges類型及一些規則相關的算法,也就是說消息的路由須要遵照必定的規則,這些規則咱們就稱之爲Bindings。

  • Queues:Queues是Massage的落腳點和等待接收的地方。

AMQP定義了四個exchange類型,分別爲Direct, Fanout, Topic和Headers。下圖展現了不一樣exchange類型的特性。 
這裏寫圖片描述

雖然AMQP定義了不一樣的Exchange類型,可是其主要思想都是發送一個包含routing key的消息到exchange,而後exchange基於它本省的類型會分發消息到queue。若是沒有匹配到queue消息就會被扔到黑洞

default exchange將會自動與每一個已經建立的queue綁定。

direct exchange將會經過routing key與指定的queue綁定,你能夠看到這種exchange類型它是做爲一對一的綁定。

topic exchange和direct exchange比較相似,惟一的不一樣點就是在他的binding中你能夠在routing key中添加通配符。

headers exchange和topic exchange相似,惟一不一樣的地方就是binding是基於消息頭(message headers),這是一個很是強大的exchange,在消息頭裏面你能夠作全部事情且能夠添加任何表達式。

fanout exchange會拷貝消息發送給全部綁定的queues;這種exchange能夠做爲消息廣播。

本文經過使用默認exchange類型來描述如何整合Spring Boot與RabbitMQ。

一、構建消息消費者

配置pom.xml以下

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.springboot</groupId>
    <artifactId>rabbitereceiver</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>rabbitereceiver</name>
    <description>Demo project for Spring Boot</description>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.9.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

在pom.xml中包含了spring-boot-starter-amqp starter pom,這個pom包含了全部連接到RabbitMQ中間件所須要的spring-amqp和rabbitmq-client庫。

新建一個Spring Boot項目,添加Consumer類並寫入以下代碼。

package com.springboot.rabbitereceiver.rabbitmq;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Consumer {
    private static final Logger log = LoggerFactory.getLogger(Consumer.class);

    @RabbitListener(queues="${myqueue}")
    public void handler(String message){
        log.info("Consumer> " + message);
    }
}
  • @Component:這個註解把普通pojo實例化到spring容器中;

  • @RabbitListener:此註解標記的方法(該註解也能夠註解類)是爲全部接收到的消息所建立的處理器,意思就是說他能夠連接到RabbitMQ的queue上並建立一個監聽並將消息傳遞到該方法。這種監聽器能夠經過使用正確的消息轉換器盡它最大的努力將消息轉換成合適的類型。

在application.properties寫入以下內容。

myqueue=spring-boot
spring.rabbitmq.host=192.168.199.227
spring.rabbitmq.username=superrd
spring.rabbitmq.password=superrd
spring.rabbitmq.port=5672
spring.rabbitmq.dynamic=true

一、構建消息生產者

咱們在Spring Boot使用JDBC Template訪問數據一文中所建的項目的基礎上,將該項目改形成消息的生產者,咱們的需求是在每次刷新頁面時生產一條消息。

首先在pom.xml中添加amqp的依賴。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

新建rabbitmq包並增長Producer類,添加以下代碼。

package com.springboot.rabbitmq;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class Producer {
    private static final Logger log = LoggerFactory.getLogger(Producer.class);

    @Autowired
    RabbitTemplate rabbitTemplate;

    public void sendTo(String routingkey,String message){
        log.info("Sending> ...");
        this.rabbitTemplate.convertAndSend(routingkey,message);
    }
}
  • @Autowired RabbitTemplate:RabbitTemplate是一個同步訪問RabbitMQ發送和接受的一個簡化的幫助類。

  • sendTo(routingKey,message):該方法有兩個參數routing Key和message,在本實例中,routing Key就是queue的名字。該方法使用rabbitTemplate實例調用convertAndSend方法接受routing key和message。message將會被路由到exchange,而後exchange會將消息路由到正確的queue。

在application.properties中寫入的內容同消費者一直,這裏再也不給出。

在JournalService中增長以下代碼:

@Value("${myqueue}")
String queue;

@Bean
Queue queue(){
        return new Queue(queue,false);
 }

@Autowired
Producer producer;

public void  sendMsg(String msg){
        producer.sendTo(queue,  msg+" at " + new Date());
 }
  • @Value:從application.properties取值;

  • @Bean Queue:實例化一個Queue類型的bean並使用提供的queue字符串建立一個Queue。Queue的構造函數接受queue的名稱和是否在服務重啓時持久化queue。

在頁面加載時,咱們就經過service調用sendMsg方法來生產一條消息。

@Controller
public class JournalController {
    @Autowired
    JournalService service;

    @RequestMapping("/")
    public String index(Model model){
        model.addAttribute("journal", service.findAll());

        service.sendMsg("Load index page");

        return "index";
    }
}

在啓動項目以前,經過RabbitMQ網頁管理工具添加一個queue。 
這裏寫圖片描述

啓動兩個項目,刷星web頁面,在消息消費者的控制檯您能夠看到以下內容。

2017-12-18 15:54:09.947  INFO 4604 --- [cTaskExecutor-1] c.s.rabbitereceiver.rabbitmq.Consumer    : Consumer> Load index page at Mon Dec 18 15:54:09 CST 2017
2017-12-18 15:54:10.648  INFO 4604 --- [cTaskExecutor-1] c.s.rabbitereceiver.rabbitmq.Consumer    : Consumer> Load index page at Mon Dec 18 15:54:10 CST 2017
2017-12-18 15:54:10.827  INFO 4604 --- [cTaskExecutor-1] c.s.rabbitereceiver.rabbitmq.Consumer    : Consumer> Load index page at Mon Dec 18 15:54:10 CST 2017
2017-12-18 15:54:11.007  INFO 4604 --- [cTaskExecutor-1] c.s.rabbitereceiver.rabbitmq.Consumer    : Consumer> Load index page at Mon Dec 18 15:54:11 CST 2017
2017-12-18 15:54:11.183  INFO 4604 --- [cTaskExecutor-1] c.s.rabbitereceiver.rabbitmq.Consumer    : Consumer> Load index page at Mon Dec 18 15:54:11 CST 2017
2017-12-18 15:54:11.336  INFO 4604 --- [cTaskExecutor-1] c.s.rabbitereceiver.rabbitmq.Consumer    : Consumer> Load index page at Mon Dec 18 15:54:11 CST 2017
2017-12-18 15:54:19.452  INFO 4604 --- [cTaskExecutor-1] c.s.rabbitereceiver.rabbitmq.Consumer    : Consumer> Load index page at Mon Dec 18 15:54:19 CST 2017
2017-12-18 15:54:19.614  INFO 4604 --- [cTaskExecutor-1] c.s.rabbitereceiver.rabbitmq.Consumer    : Consumer> Load index page at Mon Dec 18 15:54:19 CST 2017
2017-12-18 15:54:19.797  INFO 4604 --- [cTaskExecutor-1] c.s.rabbitereceiver.rabbitmq.Consumer    : Consumer> Load index page at Mon Dec 18 15:54:19 CST 2017
2017-12-18 15:54:19.987  INFO 4604 --- [cTaskExecutor-1] c.s.rabbitereceiver.rabbitmq.Consumer    : Con
相關文章
相關標籤/搜索