什麼是消息?java
消息是一個或者多個實體之間溝通的一種方式而且無處不在。linux
自從計算機發明以來,計算機以多種多樣的方式發送消息,消息定義了軟硬件或者應用程序之間的溝通方式。消息老是有一個發送者和多個接受者,消息有synchronous和asynchronous、pub-sub和peer-to-peer, RPC和enterprise-based, Message Broker, ESB (Enterprise Service Bus), MOM
(Message Oriented Middleware)等。git
從這些咱們能夠確定,消息使分佈式溝通變得鬆耦合,意味着發送者如何發送和發送了什麼消息並不重要,接受者在消費消息時並不會告知發送者。github
Spring Boot RabbitMQweb
由於像Sun/Oracle/IBM的JMS,Microsoft的MSMQ,他們所使用的協議都是有全部權的,咱們也知道JMS僅僅只是提供了一些接口API,可是若是說要嘗試着去混合技術和編程語言去使用JMS,那這個就會至關的複雜。技術的不斷革新,給咱們帶來的一個結果就是有複雜的問題出現,就會有層出不窮的簡單的解決辦法被提出。在計算機消息領域,特別須要感謝JPMorgan團隊,由於他們發明了AMQP (Advance Message Queuing Protocol )協議。AMQP是爲MOM(Message Oriented Middleware)設計的一個開放式標準應用層協議。換句話說,你使用這個協議你就可使用任何技術或者語言。算法
在實現了AMQP協議的諸多消息系統當中,RabbitMQ是其中應用最普遍的一個之一,是MQ產品的典型表明,服務器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用於在分佈式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。spring
本文主要介紹在Spring Boot中如何整合RabbitMQ並使用RabbitMQ。shell
安裝RabbitMQapache
本實例是在CentOS7上安裝RabbitMQ,不一樣的系統請經過搜索引擎查找不一樣的安裝方式。編程
一、安裝erlang 語言環境
wget http://www.erlang.org/download/otp_src_20.1.tar.gz //下載erlang包 tar -xzvf otp_src_20.1.tar.gz //解壓 cd otp_src_20.1/ //切換到安裝路徑 ./configure --prefix=/usr/local/erlang //生產安裝配置 make && make install //編譯安裝
vi /etc/profile
在底部添加以下內容
#set erlang environment ERL_HOME=/usr/local/erlang PATH=$ERL_HOME/bin:$PATH export ERL_HOME PATH source /etc/profile //生效
測試一下是否安裝成功,在控制檯輸入命令erl
erl
若是進入erlang的shell則證實安裝成功,退出便可。
注意:執行./configure –prefix=/usr/local/erlang命令時可能會報以下錯誤:
configure: error: No curses library functions found configure: error: /bin/sh '/home/jiayi/otp_src_20.1/erts/configure' failed for erts
可嘗試執行以下命令解決:
yum -y install ncurses-devel
二、安裝RabbitMQ
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.0/rabbitmq-server-generic-unix-3.7.0.tar.xz //下載RabbitMQ安裝包 xz -d rabbitmq-server-generic-unix-3.7.0.tar.xz tar -xvf rabbitmq-server-generic-unix-3.7.0.tar
解壓後多了個文件夾rabbitmq-server-3.7.0 ,重命名爲rabbitmq以便記憶。
mv rabbitmq_server-3.7.0/ rabbitmq
vi /etc/profile
在底部添加以下內容
#set rabbitmq environment export PATH=$PATH:/data/rabbitmq/rabbitmq/sbin source /etc/profile //生效
rabbitmq-server -detached //啓動rabbitmq,-detached表明後臺守護進程方式啓動。
查看狀態,若是顯示以下截圖說明安裝成功:
rabbitmqctl status
其餘相關命令
啓動服務:rabbitmq-server -detached 查看狀態:rabbitmqctl status 關閉服務:rabbitmqctl stop 列出角色:rabbitmqctl list_users
配置網頁插件
首先建立目錄,不然可能報錯
mkdir /etc/rabbitmq
而後啓用插件
rabbitmq-plugins enable rabbitmq_management
配置防火牆
配置linux 端口 15672 網頁管理 5672 AMQP端口:
firewall-cmd --permanent --add-port=15672/tcp firewall-cmd --permanent --add-port=5672/tcp systemctl restart firewalld.service
如今你在瀏覽器中輸入服務器IP:15672 就能夠看到RabbitMQ的WEB管理頁面了,是否是很興奮,但是你沒有帳號密碼,別急。
配置訪問帳號密碼和權限
默認網頁是不容許訪問的,須要增長一個用戶修改一下權限,代碼以下
rabbitmqctl add_user superrd superrd //添加用戶,後面兩個參數分別是用戶名和密碼,我這都用superrd了。 rabbitmqctl set_permissions -p / superrd ".*" ".*" ".*" //添加權限 rabbitmqctl set_user_tags superrd administrator //修改用戶角色
而後就能夠遠程訪問了,而後可直接配置用戶權限等信息。
RabbitMQ/AMQP: Exchanges, Bindings和Queues
AMQP定義了三個很是容易理解的概念,分別是Exchanges、Bindings、Queues。
Exchanges:投遞消息到queue都是經由exchanges完成的,每一個exchange攜帶一個消息而且路由到Queues。
Bindings:消息的路由涉及到一個和exchanges類型及一些規則相關的算法,也就是說消息的路由須要遵照必定的規則,這些規則咱們就稱之爲Bindings。
Queues:Queues是Massage的落腳點和等待接收的地方。
AMQP定義了四個exchange類型,分別爲Direct, Fanout, Topic和Headers。下圖展現了不一樣exchange類型的特性。
雖然AMQP定義了不一樣的Exchange類型,可是其主要思想都是發送一個包含routing key的消息到exchange,而後exchange基於它本省的類型會分發消息到queue。若是沒有匹配到queue消息就會被扔到黑洞
default exchange將會自動與每一個已經建立的queue綁定。
direct exchange將會經過routing key與指定的queue綁定,你能夠看到這種exchange類型它是做爲一對一的綁定。
topic exchange和direct exchange比較相似,惟一的不一樣點就是在他的binding中你能夠在routing key中添加通配符。
headers exchange和topic exchange相似,惟一不一樣的地方就是binding是基於消息頭(message headers),這是一個很是強大的exchange,在消息頭裏面你能夠作全部事情且能夠添加任何表達式。
fanout exchange會拷貝消息發送給全部綁定的queues;這種exchange能夠做爲消息廣播。
本文經過使用默認exchange類型來描述如何整合Spring Boot與RabbitMQ。
一、構建消息消費者
配置pom.xml以下
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.springboot</groupId> <artifactId>rabbitereceiver</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>rabbitereceiver</name> <description>Demo project for Spring Boot</description> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.9.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
在pom.xml中包含了spring-boot-starter-amqp starter pom,這個pom包含了全部連接到RabbitMQ中間件所須要的spring-amqp和rabbitmq-client庫。
新建一個Spring Boot項目,添加Consumer類並寫入以下代碼。
package com.springboot.rabbitereceiver.rabbitmq; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class Consumer { private static final Logger log = LoggerFactory.getLogger(Consumer.class); @RabbitListener(queues="${myqueue}") public void handler(String message){ log.info("Consumer> " + message); } }
@Component:這個註解把普通pojo實例化到spring容器中;
@RabbitListener:此註解標記的方法(該註解也能夠註解類)是爲全部接收到的消息所建立的處理器,意思就是說他能夠連接到RabbitMQ的queue上並建立一個監聽並將消息傳遞到該方法。這種監聽器能夠經過使用正確的消息轉換器盡它最大的努力將消息轉換成合適的類型。
在application.properties寫入以下內容。
myqueue=spring-boot spring.rabbitmq.host=192.168.199.227 spring.rabbitmq.username=superrd spring.rabbitmq.password=superrd spring.rabbitmq.port=5672 spring.rabbitmq.dynamic=true
一、構建消息生產者
咱們在Spring Boot使用JDBC Template訪問數據一文中所建的項目的基礎上,將該項目改形成消息的生產者,咱們的需求是在每次刷新頁面時生產一條消息。
首先在pom.xml中添加amqp的依賴。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
新建rabbitmq包並增長Producer類,添加以下代碼。
package com.springboot.rabbitmq; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class Producer { private static final Logger log = LoggerFactory.getLogger(Producer.class); @Autowired RabbitTemplate rabbitTemplate; public void sendTo(String routingkey,String message){ log.info("Sending> ..."); this.rabbitTemplate.convertAndSend(routingkey,message); } }
@Autowired RabbitTemplate:RabbitTemplate是一個同步訪問RabbitMQ發送和接受的一個簡化的幫助類。
sendTo(routingKey,message):該方法有兩個參數routing Key和message,在本實例中,routing Key就是queue的名字。該方法使用rabbitTemplate實例調用convertAndSend方法接受routing key和message。message將會被路由到exchange,而後exchange會將消息路由到正確的queue。
在application.properties中寫入的內容同消費者一直,這裏再也不給出。
在JournalService中增長以下代碼:
@Value("${myqueue}") String queue; @Bean Queue queue(){ return new Queue(queue,false); } @Autowired Producer producer; public void sendMsg(String msg){ producer.sendTo(queue, msg+" at " + new Date()); }
@Value:從application.properties取值;
@Bean Queue:實例化一個Queue類型的bean並使用提供的queue字符串建立一個Queue。Queue的構造函數接受queue的名稱和是否在服務重啓時持久化queue。
在頁面加載時,咱們就經過service調用sendMsg方法來生產一條消息。
@Controller public class JournalController { @Autowired JournalService service; @RequestMapping("/") public String index(Model model){ model.addAttribute("journal", service.findAll()); service.sendMsg("Load index page"); return "index"; } }
在啓動項目以前,經過RabbitMQ網頁管理工具添加一個queue。
啓動兩個項目,刷星web頁面,在消息消費者的控制檯您能夠看到以下內容。
2017-12-18 15:54:09.947 INFO 4604 --- [cTaskExecutor-1] c.s.rabbitereceiver.rabbitmq.Consumer : Consumer> Load index page at Mon Dec 18 15:54:09 CST 2017 2017-12-18 15:54:10.648 INFO 4604 --- [cTaskExecutor-1] c.s.rabbitereceiver.rabbitmq.Consumer : Consumer> Load index page at Mon Dec 18 15:54:10 CST 2017 2017-12-18 15:54:10.827 INFO 4604 --- [cTaskExecutor-1] c.s.rabbitereceiver.rabbitmq.Consumer : Consumer> Load index page at Mon Dec 18 15:54:10 CST 2017 2017-12-18 15:54:11.007 INFO 4604 --- [cTaskExecutor-1] c.s.rabbitereceiver.rabbitmq.Consumer : Consumer> Load index page at Mon Dec 18 15:54:11 CST 2017 2017-12-18 15:54:11.183 INFO 4604 --- [cTaskExecutor-1] c.s.rabbitereceiver.rabbitmq.Consumer : Consumer> Load index page at Mon Dec 18 15:54:11 CST 2017 2017-12-18 15:54:11.336 INFO 4604 --- [cTaskExecutor-1] c.s.rabbitereceiver.rabbitmq.Consumer : Consumer> Load index page at Mon Dec 18 15:54:11 CST 2017 2017-12-18 15:54:19.452 INFO 4604 --- [cTaskExecutor-1] c.s.rabbitereceiver.rabbitmq.Consumer : Consumer> Load index page at Mon Dec 18 15:54:19 CST 2017 2017-12-18 15:54:19.614 INFO 4604 --- [cTaskExecutor-1] c.s.rabbitereceiver.rabbitmq.Consumer : Consumer> Load index page at Mon Dec 18 15:54:19 CST 2017 2017-12-18 15:54:19.797 INFO 4604 --- [cTaskExecutor-1] c.s.rabbitereceiver.rabbitmq.Consumer : Consumer> Load index page at Mon Dec 18 15:54:19 CST 2017 2017-12-18 15:54:19.987 INFO 4604 --- [cTaskExecutor-1] c.s.rabbitereceiver.rabbitmq.Consumer : Con