RabbitMQ-從基礎到實戰(1)— Hello RabbitMQ

轉載請註明出處

0.目錄

RabbitMQ-從基礎到實戰(2)— 防止消息丟失
html

RabbitMQ-從基礎到實戰(3)— 消息的交換(上)java

RabbitMQ-從基礎到實戰(4)— 消息的交換(中)windows

RabbitMQ-從基礎到實戰(5)— 消息的交換(下)api

RabbitMQ-從基礎到實戰(6)— 與Spring集成數組

1.簡介

本篇博文介紹了在windows平臺下安裝RabbitMQ Server端,並用JAVA代碼實現收發消息maven

2.安裝RabbitMQ

  1. RabbitMQ是用Erlang開發的,因此須要先安裝Erlang環境,在這裏下載對應系統的Erlang安裝包進行安裝
  2. 點擊這裏下載對應平臺的RabbitMQ安裝包進行安裝

Windows平臺安裝完成後如圖ide

 

3.啓用RabbitMQ Web控制檯

RabbitMQ提供一個控制檯,用於管理和監控RabbitMQ,默認是不啓動的,須要運行如下命令進行啓動post

  1. 點擊上圖的Rabbit Command Prompt,打開rabbitMQ控制檯
  2. 官方介紹管理控制檯的頁面,能夠看到,輸入如下命令啓動後臺控制插件

    rabbitmq-plugins enable rabbitmq_managementthis

  3. 登陸後臺頁面:http://localhost:15672/   密碼和用戶名都是 guest ,界面以下

 

目前能夠先不用理會此界面,後面使用到時會詳細介紹,也能夠到這裏查看官方文檔。spa

4.編寫MessageSender

Spring對RabbitMQ已經進行了封裝,正常使用中,會使用Spring集成,第一個項目中,咱們先不考慮那麼多

在IDE中新建一個Maven項目,並在pom.xml中貼入以下依賴,RabbitMQ的最新版本依賴能夠在這裏找到

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.1.0</version>
</dependency>

等待Maven下載完成後,就能夠在Maven Dependencies中看到RabbitMQ的JAR

在這裏,咱們發現,RabbitMQ的日誌依賴了slf4j-api這個包,slf4j-api並非一個日誌實現,這樣子是打不出日誌的,因此,咱們給pom加上一個日誌實現,這裏用了logback

<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>1.2.1</version>
</dependency>

以後maven依賴以下,能夠放心寫代碼了

 

新建一個MessageSender類,代碼以下

 1 import java.io.IOException;
 2 import java.util.concurrent.TimeoutException;
 3 
 4 import org.slf4j.Logger;
 5 import org.slf4j.LoggerFactory;
 6 
 7 import com.rabbitmq.client.Channel;
 8 import com.rabbitmq.client.Connection;
 9 import com.rabbitmq.client.ConnectionFactory;
10 
11 public class MessageSender {
12     
13     private Logger logger = LoggerFactory.getLogger(MessageSender.class);
14 
15     //聲明一個隊列名字
16     private final static String QUEUE_NAME = "hello";
17     
18     public boolean sendMessage(String message){
19         //new一個RabbitMQ的鏈接工廠
20         ConnectionFactory factory = new ConnectionFactory();
21         //設置須要鏈接的RabbitMQ地址,這裏指向本機
22         factory.setHost("127.0.0.1");
23         Connection connection = null;
24         Channel channel = null;
25         try {
26             //嘗試獲取一個鏈接
27             connection = factory.newConnection();
28             //嘗試建立一個channel
29             channel = connection.createChannel();
30             //這裏的參數在後面詳解
31             channel.queueDeclare(QUEUE_NAME, false, false, false, null);
32             //注意這裏調用了getBytes(),發送的實際上是byte數組,接收方收到消息後,須要從新組裝成String
33             channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
34             logger.info("Sent '" + message + "'");
35             //關閉channel和鏈接
36             channel.close();
37             connection.close();
38         } catch (IOException | TimeoutException e) {
39             //失敗後記錄日誌,返回false,表明發送失敗
40             logger.error("send message faild!",e);
41             return false;
42         }
43         return true;
44     }
45 }

而後在App類的main方法中調用sendMessage

1 public class App {
2     public static void main( String[] args ){
3         MessageSender sender = new MessageSender();
4         sender.sendMessage("hello RabbitMQ!");
5     }
6 }

打印日誌以下

打開RabbitMQ的控制檯,能夠看到消息已經進到了RabbitMQ中

點進去,用控制檯自帶的getMessage功能,能夠看到消息已經成功由RabbitMQ管理了

至此,MessageSender已經寫好了,在該類的31和33行,咱們分別調用了隊列聲明和消息發送

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

queueDeclare,有不少參數,咱們能夠看一下他的源碼,註釋上有詳細的解釋,我簡單翻譯了一下

 1 /**
 2      * Declare a queue 聲明一個隊列
 3      * @see com.rabbitmq.client.AMQP.Queue.Declare
 4      * @see com.rabbitmq.client.AMQP.Queue.DeclareOk
 5      * @param queue the name of the queue隊列的名字
 6      * @param durable true if we are declaring a durable queue (the queue will survive a server restart)是否持久化,爲true則在rabbitMQ重啓後生存
 7      * @param exclusive true if we are declaring an exclusive queue (restricted to this connection)是不是排他性隊列(別人看不到),只對當前鏈接有效,當前鏈接斷開後,隊列刪除(設置了持久化也刪除)
 8      * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)自動刪除,在最後一個鏈接斷開後刪除隊列
 9      * @param arguments other properties (construction arguments) for the queue 其餘參數
10      * @return a declaration-confirm method to indicate the queue was successfully declared
11      * @throws java.io.IOException if an error is encountered
12      */
13     Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
14                                  Map<String, Object> arguments) throws IOException;

前面4個都很是好理解,最後一個「其餘參數」,究竟是什麼其餘參數,這個東西真的很難找,用到再解釋吧,官方文檔以下

  • TTL Time To Live  存活時間

basicPublish的翻譯以下

 1  /**
 2      * Publish a message.發送一條消息
 3      *
 4      * Publishing to a non-existent exchange will result in a channel-level
 5      * protocol exception, which closes the channel.
 6      *
 7      * Invocations of <code>Channel#basicPublish</code> will eventually block if a
 8      * <a href="http://www.rabbitmq.com/alarms.html">resource-driven alarm</a> is in effect.
 9      *
10      * @see com.rabbitmq.client.AMQP.Basic.Publish
11      * @see <a href="http://www.rabbitmq.com/alarms.html">Resource-driven alarms</a>
12      * @param exchange the exchange to publish the message to 交換模式,會在後面講,官方文檔在這裏 13      * @param routingKey the routing key 控制消息發送到哪一個隊列
14      * @param props other properties for the message - routing headers etc 其餘參數
15      * @param body the message body 消息,byte數組
16      * @throws java.io.IOException if an error is encountered
17      */
18     void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;

這裏又有個其餘參數,它的類型是這樣的,設置消息的一些詳細屬性

 

5.編寫MessageConsumer

爲了和Sender區分開,新建一個Maven項目MessageConsumer

 1 package com.liyang.ticktock.rabbitmq;
 2 
 3 import java.io.IOException;
 4 import java.util.concurrent.TimeoutException;
 5 
 6 import org.slf4j.Logger;
 7 import org.slf4j.LoggerFactory;
 8 
 9 import com.rabbitmq.client.AMQP;
10 import com.rabbitmq.client.Channel;
11 import com.rabbitmq.client.Connection;
12 import com.rabbitmq.client.ConnectionFactory;
13 import com.rabbitmq.client.Consumer;
14 import com.rabbitmq.client.DefaultConsumer;
15 import com.rabbitmq.client.Envelope;
16 
17 public class MessageConsumer {
18     
19     private Logger logger = LoggerFactory.getLogger(MessageConsumer.class);
20     
21     public boolean consume(String queueName){
22         //鏈接RabbitMQ
23         ConnectionFactory factory = new ConnectionFactory();
24         factory.setHost("127.0.0.1");
25         Connection connection = null;
26         Channel channel = null;
27         try {
28             connection = factory.newConnection();
29             channel = connection.createChannel();
30             //這裏聲明queue是爲了取消息的時候,queue確定會存在
31             //注意,queueDeclare是冪等的,也就是說,消費者和生產者,不論誰先聲明,都只會有一個queue
32             channel.queueDeclare(queueName, false, false, false, null);
33             
34             //這裏重寫了DefaultConsumer的handleDelivery方法,由於發送的時候對消息進行了getByte(),在這裏要從新組裝成String
35             Consumer consumer = new DefaultConsumer(channel){
36                 @Override
37                 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
38                           throws IOException {
39                         String message = new String(body, "UTF-8");
40                         logger.info("Received '" + message + "'");
41                 }
42             };
43             //上面是聲明消費者,這裏用聲明的消費者消費掉隊列中的消息
44             channel.basicConsume(queueName, true, consumer);
45             
46             //這裏不能關閉鏈接,調用了消費方法後,消費者會一直鏈接着rabbitMQ等待消費
47            
48         } catch (IOException | TimeoutException e) {
49             //失敗後記錄日誌,返回false,表明消費失敗
50             logger.error("send message faild!",e);
51             return false;
52         }
53         
54         
55         return true;
56     }
57 }

而後在App的main方法中調用Cunsumer進行消費

 1 public class App 
 2 {
 3     //這個隊列名字要和生產者中的名字同樣,不然找不到隊列
 4     private final static String QUEUE_NAME = "hello";
 5     
 6     public static void main( String[] args )
 7     {
 8         MessageConsumer consumer = new MessageConsumer();
 9         consumer.consume(QUEUE_NAME);
10     }
11 }

結果以下,消費者一直在等待消息,每次有消息進來,就會馬上消費掉

6.多個消費者同時消費一個隊列

改造一下Consumer

在App中new多個消費者

改造Sender,使它不停的往RabbitMQ中發送消息

啓動Sender

啓動Consumer,發現消息很平均的發給四個客戶端,一人一個,誰也不插隊

若是咱們把速度加快呢?把Sender的休息時間去掉,發現消費開始變得沒有規律了,其實呢,它仍是有規律的,這個是RabbitMQ的特性,稱做「Round-robin dispatching」,消息會平均的發送給每個消費者,能夠看第一第二行,消息分別是56981和56985,相應的8二、8二、84都被分給了其餘線程,只是在當前線程的時間片內,能夠處理這麼多任務,因此就一次打印出來了

 

7.結束語

這一章介紹了從安裝到用JAVA語言編寫生產者與消費者,在這裏只是簡單的消費消息並打印日誌,若是一個消息須要處理的時間很長,而處理的過程當中,這個消費者掛掉了,那消息會不會丟失呢?答案是確定的,並且已經分配給這個消費者,但還沒來得及處理的消息也會一併丟失掉,這個問題,RabbitMQ早就考慮到了,而且提供瞭解決方案,下一篇博文將進行詳細介紹

相關文章
相關標籤/搜索