RabbitMQ-從基礎到實戰(2)— 防止消息丟失
html
RabbitMQ-從基礎到實戰(3)— 消息的交換(上)java
RabbitMQ-從基礎到實戰(4)— 消息的交換(中)windows
RabbitMQ-從基礎到實戰(5)— 消息的交換(下)api
RabbitMQ-從基礎到實戰(6)— 與Spring集成數組
本篇博文介紹了在windows平臺下安裝RabbitMQ Server端,並用JAVA代碼實現收發消息maven
Windows平臺安裝完成後如圖ide
RabbitMQ提供一個控制檯,用於管理和監控RabbitMQ,默認是不啓動的,須要運行如下命令進行啓動post
rabbitmq-plugins enable rabbitmq_managementthis
目前能夠先不用理會此界面,後面使用到時會詳細介紹,也能夠到這裏查看官方文檔。spa
Spring對RabbitMQ已經進行了封裝,正常使用中,會使用Spring集成,第一個項目中,咱們先不考慮那麼多
在IDE中新建一個Maven項目,並在pom.xml中貼入以下依賴,RabbitMQ的最新版本依賴能夠在這裏找到
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>4.1.0</version> </dependency>
等待Maven下載完成後,就能夠在Maven Dependencies中看到RabbitMQ的JAR
在這裏,咱們發現,RabbitMQ的日誌依賴了slf4j-api這個包,slf4j-api並非一個日誌實現,這樣子是打不出日誌的,因此,咱們給pom加上一個日誌實現,這裏用了logback
<dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.2.1</version> </dependency>
以後maven依賴以下,能夠放心寫代碼了
新建一個MessageSender類,代碼以下
1 import java.io.IOException; 2 import java.util.concurrent.TimeoutException; 3 4 import org.slf4j.Logger; 5 import org.slf4j.LoggerFactory; 6 7 import com.rabbitmq.client.Channel; 8 import com.rabbitmq.client.Connection; 9 import com.rabbitmq.client.ConnectionFactory; 10 11 public class MessageSender { 12 13 private Logger logger = LoggerFactory.getLogger(MessageSender.class); 14 15 //聲明一個隊列名字 16 private final static String QUEUE_NAME = "hello"; 17 18 public boolean sendMessage(String message){ 19 //new一個RabbitMQ的鏈接工廠 20 ConnectionFactory factory = new ConnectionFactory(); 21 //設置須要鏈接的RabbitMQ地址,這裏指向本機 22 factory.setHost("127.0.0.1"); 23 Connection connection = null; 24 Channel channel = null; 25 try { 26 //嘗試獲取一個鏈接 27 connection = factory.newConnection(); 28 //嘗試建立一個channel 29 channel = connection.createChannel(); 30 //這裏的參數在後面詳解 31 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 32 //注意這裏調用了getBytes(),發送的實際上是byte數組,接收方收到消息後,須要從新組裝成String 33 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); 34 logger.info("Sent '" + message + "'"); 35 //關閉channel和鏈接 36 channel.close(); 37 connection.close(); 38 } catch (IOException | TimeoutException e) { 39 //失敗後記錄日誌,返回false,表明發送失敗 40 logger.error("send message faild!",e); 41 return false; 42 } 43 return true; 44 } 45 }
而後在App類的main方法中調用sendMessage
1 public class App { 2 public static void main( String[] args ){ 3 MessageSender sender = new MessageSender(); 4 sender.sendMessage("hello RabbitMQ!"); 5 } 6 }
打印日誌以下
打開RabbitMQ的控制檯,能夠看到消息已經進到了RabbitMQ中
點進去,用控制檯自帶的getMessage功能,能夠看到消息已經成功由RabbitMQ管理了
至此,MessageSender已經寫好了,在該類的31和33行,咱們分別調用了隊列聲明和消息發送
channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
queueDeclare,有不少參數,咱們能夠看一下他的源碼,註釋上有詳細的解釋,我簡單翻譯了一下
1 /** 2 * Declare a queue 聲明一個隊列 3 * @see com.rabbitmq.client.AMQP.Queue.Declare 4 * @see com.rabbitmq.client.AMQP.Queue.DeclareOk 5 * @param queue the name of the queue隊列的名字 6 * @param durable true if we are declaring a durable queue (the queue will survive a server restart)是否持久化,爲true則在rabbitMQ重啓後生存 7 * @param exclusive true if we are declaring an exclusive queue (restricted to this connection)是不是排他性隊列(別人看不到),只對當前鏈接有效,當前鏈接斷開後,隊列刪除(設置了持久化也刪除) 8 * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)自動刪除,在最後一個鏈接斷開後刪除隊列 9 * @param arguments other properties (construction arguments) for the queue 其餘參數 10 * @return a declaration-confirm method to indicate the queue was successfully declared 11 * @throws java.io.IOException if an error is encountered 12 */ 13 Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, 14 Map<String, Object> arguments) throws IOException;
前面4個都很是好理解,最後一個「其餘參數」,究竟是什麼其餘參數,這個東西真的很難找,用到再解釋吧,官方文檔以下
basicPublish的翻譯以下
1 /** 2 * Publish a message.發送一條消息 3 * 4 * Publishing to a non-existent exchange will result in a channel-level 5 * protocol exception, which closes the channel. 6 * 7 * Invocations of <code>Channel#basicPublish</code> will eventually block if a 8 * <a href="http://www.rabbitmq.com/alarms.html">resource-driven alarm</a> is in effect. 9 * 10 * @see com.rabbitmq.client.AMQP.Basic.Publish 11 * @see <a href="http://www.rabbitmq.com/alarms.html">Resource-driven alarms</a> 12 * @param exchange the exchange to publish the message to 交換模式,會在後面講,官方文檔在這裏 13 * @param routingKey the routing key 控制消息發送到哪一個隊列 14 * @param props other properties for the message - routing headers etc 其餘參數 15 * @param body the message body 消息,byte數組 16 * @throws java.io.IOException if an error is encountered 17 */ 18 void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
這裏又有個其餘參數,它的類型是這樣的,設置消息的一些詳細屬性
爲了和Sender區分開,新建一個Maven項目MessageConsumer
1 package com.liyang.ticktock.rabbitmq; 2 3 import java.io.IOException; 4 import java.util.concurrent.TimeoutException; 5 6 import org.slf4j.Logger; 7 import org.slf4j.LoggerFactory; 8 9 import com.rabbitmq.client.AMQP; 10 import com.rabbitmq.client.Channel; 11 import com.rabbitmq.client.Connection; 12 import com.rabbitmq.client.ConnectionFactory; 13 import com.rabbitmq.client.Consumer; 14 import com.rabbitmq.client.DefaultConsumer; 15 import com.rabbitmq.client.Envelope; 16 17 public class MessageConsumer { 18 19 private Logger logger = LoggerFactory.getLogger(MessageConsumer.class); 20 21 public boolean consume(String queueName){ 22 //鏈接RabbitMQ 23 ConnectionFactory factory = new ConnectionFactory(); 24 factory.setHost("127.0.0.1"); 25 Connection connection = null; 26 Channel channel = null; 27 try { 28 connection = factory.newConnection(); 29 channel = connection.createChannel(); 30 //這裏聲明queue是爲了取消息的時候,queue確定會存在 31 //注意,queueDeclare是冪等的,也就是說,消費者和生產者,不論誰先聲明,都只會有一個queue 32 channel.queueDeclare(queueName, false, false, false, null); 33 34 //這裏重寫了DefaultConsumer的handleDelivery方法,由於發送的時候對消息進行了getByte(),在這裏要從新組裝成String 35 Consumer consumer = new DefaultConsumer(channel){ 36 @Override 37 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) 38 throws IOException { 39 String message = new String(body, "UTF-8"); 40 logger.info("Received '" + message + "'"); 41 } 42 }; 43 //上面是聲明消費者,這裏用聲明的消費者消費掉隊列中的消息 44 channel.basicConsume(queueName, true, consumer); 45 46 //這裏不能關閉鏈接,調用了消費方法後,消費者會一直鏈接着rabbitMQ等待消費 47 48 } catch (IOException | TimeoutException e) { 49 //失敗後記錄日誌,返回false,表明消費失敗 50 logger.error("send message faild!",e); 51 return false; 52 } 53 54 55 return true; 56 } 57 }
而後在App的main方法中調用Cunsumer進行消費
1 public class App 2 { 3 //這個隊列名字要和生產者中的名字同樣,不然找不到隊列 4 private final static String QUEUE_NAME = "hello"; 5 6 public static void main( String[] args ) 7 { 8 MessageConsumer consumer = new MessageConsumer(); 9 consumer.consume(QUEUE_NAME); 10 } 11 }
結果以下,消費者一直在等待消息,每次有消息進來,就會馬上消費掉
改造一下Consumer
在App中new多個消費者
改造Sender,使它不停的往RabbitMQ中發送消息
啓動Sender
啓動Consumer,發現消息很平均的發給四個客戶端,一人一個,誰也不插隊
若是咱們把速度加快呢?把Sender的休息時間去掉,發現消費開始變得沒有規律了,其實呢,它仍是有規律的,這個是RabbitMQ的特性,稱做「Round-robin dispatching」,消息會平均的發送給每個消費者,能夠看第一第二行,消息分別是56981和56985,相應的8二、8二、84都被分給了其餘線程,只是在當前線程的時間片內,能夠處理這麼多任務,因此就一次打印出來了
這一章介紹了從安裝到用JAVA語言編寫生產者與消費者,在這裏只是簡單的消費消息並打印日誌,若是一個消息須要處理的時間很長,而處理的過程當中,這個消費者掛掉了,那消息會不會丟失呢?答案是確定的,並且已經分配給這個消費者,但還沒來得及處理的消息也會一併丟失掉,這個問題,RabbitMQ早就考慮到了,而且提供瞭解決方案,下一篇博文將進行詳細介紹