【本文版權歸微信公衆號"代碼藝術"(ID:onblog)全部,如果轉載請務必保留本段原創聲明,違者必究。如果文章有不足之處,歡迎關注微信公衆號私信與我進行交流!】java
RabbitMQ 即一個消息隊列,主要是用來實現應用程序的異步和解耦,同時也能起到消息緩衝,消息分發的做用。web
消息中間件最主要的做用是解耦,中間件最標準的用法是生產者生產消息傳送到隊列,消費者從隊列中拿取消息並處理,生產者不用關心是誰來消費,消費者不用關心誰在生產消息,從而達到解耦的目的。在分佈式的系統中,消息隊列也會被用在不少其它的方面,好比:分佈式事務的支持,RPC的調用等等。spring
RabbitMQ是實現AMQP(高級消息隊列協議)的消息中間件的一種,最初起源於金融系統,用於在分佈式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。RabbitMQ主要是爲了實現系統之間的雙向解耦而實現的。當生產者大量產生數據時,消費者沒法快速消費,那麼須要一箇中間層。保存這個數據。shell
AMQP,即Advanced Message Queuing Protocol,高級消息隊列協議,是應用層協議的一個開放標準,爲面向消息的中間件設計。消息中間件主要用於組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。AMQP的主要特徵是面向消息、隊列、路由(包括點對點和發佈/訂閱)、可靠性、安全。編程
RabbitMQ是一個開源的AMQP實現,服務器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。json
一般咱們談到隊列服務, 會有三個概念: 發消息者、隊列、收消息者,RabbitMQ 在這個基本概念之上, 多作了一層抽象, 在發消息者和 隊列之間, 加入了交換器 (Exchange). 這樣發消息者和隊列就沒有直接聯繫, 轉而變成發消息者把消息給交換器, 交換器根據調度策略再把消息再給隊列。數組
那麼,其中比較重要的概念有 4 個,分別爲:虛擬主機,交換機,隊列,和綁定。安全
交換機的功能主要是接收消息而且轉發到綁定的隊列,交換機不存儲消息,在啓用ack模式後,交換機找不到隊列會返回錯誤。交換機有四種類型:Direct, topic, Headers and Fanout服務器
RabbitMQ簡單來講,就是生產者發送消息到虛擬主機,虛擬主機把消息交給指定的交換機,交換機按照規則扔給消息隊列進行存儲,消息隊列等待消費者來消費。微信
由此我想到了商品買賣:廠家生產商品賣給批發部,批發部交給指定的超市出售,超市按照售價擺放在門店,並等待顧客上門購買。
完美~~
由於RabbitMQ是由erlang語言寫的,就像Java程序須要jdk環境同樣,運行RabbitMQ也須要erlang環境。
環境:Centos7.4
下載地址:http://erlang.org/download/
在Linux終端運行命令行
下載:
wget http://erlang.org/download/otp_src_18.3.tar.gz
下載必定要認準otp_src_字樣。
解壓:
tar -zxvf otp_src_18.3.tar.gz
進入解壓後的根目錄:
./configure --prefix=/usr/local/erlang --enable-hipe --enable-threads --enable-smp-support --enable-kernel-poll --without-javac
make && make install
上面有點慢。
把erlang加入環境變量:
vi /etc/profile
export ERLANG=/usr/local/erlang/erlang export PATH=$ERLANG/bin:$PATH
使環境變量生效
source /etc/profile
而後,咱們測試下是否安裝成功:
[root@ystcode ~]# erl Erlang/OTP 18 [erts-7.3] [source] [64-bit] [async-threads:10] [hipe] [kernel-poll:false] Eshell V7.3 (abort with ^G) 1>
下載地址:http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.1/
下載:
wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.1/rabbitmq-server-generic-unix-3.6.1.tar.xz
對於下載xz包進行解壓,首先先下載xz壓縮工具:
yum install xz
對rabbitmq包進行解壓:
xz -d xz -d rabbitmq-server-generic-unix-3.6.1.tar.xz
tar -xvf rabbitmq-server-generic-unix-3.6.1.tar
隨後在sbin
目錄啓用MQ管理方式:
./rabbitmq-plugins enable rabbitmq_management #啓動後臺管理
./rabbitmq-server -detached #後臺運行rabbitmq
添加用戶和權限 默認網頁guest用戶是不容許訪問的,須要增長一個用戶修改一下權限,代碼以下:
添加用戶:
./rabbitmqctl add_user admin admin
添加權限:
./rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
修改用戶角色:
./rabbitmqctl set_user_tags admin administrator
而後就能夠遠程訪問了,而後可直接配置用戶權限等信息。
驗證
訪問http://localhost:15672/,輸入admin用戶密碼,登陸成功!
新建一個1.5版本的Spring boot項目,選擇rabbitmq+web模塊。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
virtual-host: /
默認就是斜槓,具體查看rabbitmq後臺admin欄。若是默認這行不用寫。
spring: rabbitmq: host: 127.0.0.1 username: admin password: admin virtual-host: / #能夠不用寫
direct 類型的行爲是」先匹配, 再投送」. 即在綁定時設定一個 routing_key, 消息的routing_key 匹配時, 纔會被交換器投送到綁定的隊列中去.
1.咱們在RabbitMQ後臺新建一個交換機,demo-direct
交換機名,direct
交換機類型,Durable
持久化:
2.再新建一個消息隊列,取名爲demo
:
3.點擊demo-direct
交換機進入綁定消息隊列demo
:
若是沒有指定routingkey,消息隊列的名稱就是routingkey
4.綁定成功後查看:
你能夠直接在交換機頁面下方的Publish message
發送消息,在消息隊列的Get message
查看消息,不過咱們實際生產比較多使用編程:
在Spring Boot建立單元測試
demo-direct
交換機名
demo
是該交換機綁定的消息隊列名
發送消息
@Autowired RabbitTemplate rabbitTemplate; @Test public void contextLoads() { //message須要本身構造一個,定義消息體內容和消息體 //rabbitTemplate.send(exchange,routingkey,message); Map map = new HashMap(); map.put("key","值"); map.put("msg",true); //對象被默認序列化後發送 rabbitTemplate.convertAndSend("demo-direct","demo",map); }
此時發送消息咱們在rabbitmq網頁發現消息是經序列化後的,咱們若是想改變序列化機制爲JSON,也很簡單,只須要注入一我的Bean:
@Configuration public class MyAMQPConfig { @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } }
而後咱們再啓動測試,發送。
接收消息
demo
是消息隊列名,也就是消費者只須要獲得消息隊列的名字就能夠接收隊列中的消息。
@Test public void receive(){ Object o = rabbitTemplate.receiveAndConvert("demo"); System.out.println(o.getClass()); System.out.println(o); }
查看打印
class java.util.HashMap {msg=true, key=值}
【本文版權歸微信公衆號"代碼藝術"(ID:onblog)全部,如果轉載請務必保留本段原創聲明,違者必究。如果文章有不足之處,歡迎關注微信公衆號私信與我進行交流!】
轉發消息到全部綁定隊列(廣播:忽略routing_key )
1.首先咱們須要在RabbitMQ後臺建立一個廣播交換機:
2.再建立一些(demo1,demo2)消息隊列,以一個做爲演示:
3.進入建立的交換機頁面:
4.交換機與消息隊列(demo1,demo2)進行綁定:
若是沒有指定routingkey,消息隊列的名稱就是routingkey
在Spring Boot建立單元測試
注意在廣播模式下會忽略忽略routing_key
發送消息
@Autowired RabbitTemplate rabbitTemplate; @Test public void send() { Book book = new Book(); book.setName("<西遊記>"); book.setAnthony("吳承恩"); //對象被默認序列化後發送 rabbitTemplate.convertAndSend("demo-fanout","",book); }
接收消息
@Test public void receive(){ Object o = rabbitTemplate.receiveAndConvert("demo"); System.out.println(o.getClass()); System.out.println(o); }
打印輸出:
class cn.zyzpp.rabbitmq.entity.Book Book{name='<西遊記>', anthony='吳承恩'}
按規則轉發消息(使用通配符)
1.在rabbitmq後臺新建一個topic交換機
2.新建一個消息隊列demo
3.交換機綁定消息隊列,注意此處的路由鍵Routing key使用了通配符
*
表示一個詞.#
表示零個或多個詞.那咱們如何區分幾個字母爲一個單詞呢?
答案是經過」點分」的 routing_key 形式,好比兩個單詞是 *.demo
hello.demo
,
若是路由鍵爲demo.#
,那能夠匹配demo.
開頭的全部路由鍵。
4.查看此時的交換機
在Spring Boot建立單元測試
發送消息
@Test public void contextLoads() { //message須要本身構造一個,定義消息體內容和消息體 //rabbitTemplate.send(exchange,routingkey,message); Map map = new HashMap(); map.put("key","topic交換機"); map.put("msg",true); //對象被默認序列化後發送 rabbitTemplate.convertAndSend("demo-topic","demo.hello",map); }
接收消息
@Test public void receive(){ Object o = rabbitTemplate.receiveAndConvert("demo"); System.out.println(o.getClass()); System.out.println(o); }
打印輸出
class java.util.HashMap {msg=true, key=topic交換機}
1.上面演示的是經過RabbitMQ網頁後臺建立,經過編程的方式也很是簡單:
@Autowired AmqpAdmin amqpAdmin; /** * 代碼建立交換機與消息隊列並綁定 */ @Test public void createExChange(){ new TopicExchange("topic.exChange"); new FanoutExchange("fanout.exChange"); amqpAdmin.declareExchange(new DirectExchange("amqp.exChange"));//建立交換機(remove爲刪除交換機) System.out.println("單播交換機建立完成"); amqpAdmin.declareQueue(new Queue("amqp.queue",true)); //建立消息隊列 amqpAdmin.declareBinding(new Binding("amqp.queue", Binding.DestinationType.QUEUE,"amqp.exChange","amqp.exChange",null));//綁定 }
2.登陸後臺查看,建立成功!
監聽消息隊列,當有消息發到消息隊列,立馬獲取並操做。方法也很簡單。
1.開啓基於註解的Rabbit模式
@EnableRabbit //開啓基於註解的Rabbit模式 @SpringBootApplication public class RabbitmqApplication { public static void main(String[] args) { SpringApplication.run(RabbitmqApplication.class, args); } }
2.@RabbitListener註解實現監聽
注意此時你的消息隊列已經有了demo
和demo.news
@Service public class BookService { @RabbitListener(queues = "demo") public void receive(Book book){ System.out.println("收到消息:"+ book); } @RabbitListener(queues = "demo.news") public void receiveMess(Message message){ System.out.println("收到消息:"+ message); System.out.println(message.getBody()); //getBody返回的byte[]字節數組 System.out.println(message.getMessageProperties()); } }
而後咱們在開啓主程序時再運行測試用例:
測試用例
@Test public void send() { Book book = new Book(); book.setName("<西遊記>"); book.setAnthony("吳承恩"); //對象被默認序列化後發送 rabbitTemplate.convertAndSend("demo-fanout","",book); }
查看主控制檯打印
收到消息:Book{name='<西遊記>', anthony='吳承恩'}
測試用例
@Test public void contextLoads() { //message須要本身構造一個,定義消息體內容和消息體 //rabbitTemplate.send(exchange,routingkey,message); Map map = new HashMap(); map.put("key","topic交換機"); map.put("msg",true); //對象被默認序列化後發送 rabbitTemplate.convertAndSend("demo-topic","demo.hello",map); }
查看主控制檯打印
收到消息:(Body:'{"msg":true,"key":"topic交換機"}' MessageProperties [headers={__ContentTypeId__=java.lang.Object, __KeyTypeId__=java.lang.Object, __TypeId__=java.util.HashMap}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=application/json, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=demo-topic, receivedRoutingKey=demo.hello, receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-KRZh2DyNETjEaGSH0JZ2dA, consumerQueue=demo.news]) [B@5332f99e MessageProperties [headers={__ContentTypeId__=java.lang.Object, __KeyTypeId__=java.lang.Object, __TypeId__=java.util.HashMap}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=application/json, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=demo-topic, receivedRoutingKey=demo.hello, receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-KRZh2DyNETjEaGSH0JZ2dA, consumerQueue=demo.news]
【本文版權歸微信公衆號"代碼藝術"(ID:onblog)全部,如果轉載請務必保留本段原創聲明,違者必究。如果文章有不足之處,歡迎關注微信公衆號私信與我進行交流!】