RabbitMQ是一個消息中間件,在一些須要異步處理、發佈/訂閱等場景的時候,使用RabbitMQ能夠完成咱們的需求。 下面是我在學習java語言實現RabbitMQ(自RabbitMQ官網的Tutorials)的一些記錄。html
首先有三個名稱瞭解一下(如下圖片來自rabbitMQ官網)java
producer
是用戶應用負責發送消息queue
是存儲消息的緩衝(buffer)consumer
是用戶應用負責接收消息下面是我使用rabbitMQ原生的jar包作的測試方法數組
maven pom.xml 加入服務器
<dependency>異步
<groupId>com.rabbitmq</groupId>maven
<artifactId>amqp-client</artifactId>ide
<version>3.5.6</version>學習
</dependency>測試
方法實現示意圖fetch
發送消息方法(Send.java)
1 import com.rabbitmq.client.Channel; 2 import com.rabbitmq.client.Connection; 3 import com.rabbitmq.client.ConnectionFactory; 4 5 public class Send { 6 7 private static final String QUEUE_NAME = "hello"; 8 9 public static void main(String[] args) throws Exception { 10 ConnectionFactory factory = new ConnectionFactory(); 11 factory.setHost("192.168.1.7"); 12 factory.setPort(5672); 13 factory.setUsername("admin"); 14 factory.setPassword("admin"); 15 Connection connection = factory.newConnection(); 16 Channel channel = connection.createChannel(); 17 18 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 19 String message = "Hello World!"; 20 // "" 表示默認exchange 21 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); 22 System.out.println(" [x] Sent '" + message + "'"); 23 24 channel.close(); 25 connection.close(); 26 } 27 28 }
10~16行 是獲取rabbitmq.client.Channel, rabbitMQ的API操做基本都是經過channel來完成的。
18行 channel.queueDeclare(QUEUE_NAME, false, false, false, null),這裏channel聲明瞭一個名字叫「hello」的queue,聲明queue的操做是冪等的,也就是說只有不存在相同名稱的queue的狀況下才會建立一個新的queue。
21行 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()),chaneel在這個queue裏發佈了消息(字節數組)。
24~25行 則是連接的關閉,注意關閉順序就行了。
接受消息方法 (Recv.java)
1 import com.rabbitmq.client.AMQP; 2 import com.rabbitmq.client.Channel; 3 import com.rabbitmq.client.Connection; 4 import com.rabbitmq.client.ConnectionFactory; 5 import com.rabbitmq.client.Consumer; 6 import com.rabbitmq.client.DefaultConsumer; 7 import com.rabbitmq.client.Envelope; 8 9 import java.io.IOException; 10 11 public class Recv { 12 13 private final static String QUEUE_NAME = "hello"; 14 15 public static void main(String[] argv) throws Exception { 16 ConnectionFactory factory = new ConnectionFactory(); 17 factory.setHost("192.168.1.7"); 18 factory.setPort(5672); 19 factory.setUsername("admin"); 20 factory.setPassword("admin"); 21 Connection connection = factory.newConnection(); 22 Channel channel = connection.createChannel(); 23 24 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 25 System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); 26 27 Consumer consumer = new DefaultConsumer(channel) { 28 @Override 29 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) 30 throws IOException { 31 String message = new String(body, "UTF-8"); 32 System.out.println(" [x] Received '" + message + "'"); 33 } 34 }; 35 channel.basicConsume(QUEUE_NAME, true, consumer); 36 } 37 }
16~22行 和Send類中同樣,也是獲取同一個rabbitMQ服務的channel,這也是能接受到消息的基礎。
24行 一樣聲明瞭一個和Send類中發佈的queue相同的queue。
27~35行 DefaultConsumer
類實現了Consumer
接口,因爲推送消息是異步的,所以在這裏提供了一個callback來緩衝接受到的消息。
先運行Recv 而後再運行Send,就能夠看到消息被接受輸出到控制檯了,若是多啓動幾個Recv,會發現消息被每一個消費者按順序分別消費了,
這也就是rabbitMQ默認採用Round-robin dispatching(輪詢分發機制)。
上面簡單的實現了rabbitMQ消息的發送和接受,可是不管Send類中的queueDeclare 、basicPublish方法還有Recv類中的basicConsume方法都有不少的參數,
下面咱們分析一下幾個重要的參數。
(一)Message acknowledgment 消息答覆
上面Recv.java的第35行中,channel.basicConsume(QUEUE_NAME, true, consumer),
在Channel接口中定義爲 String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
這個autoAck咱們當前實現爲true,表示服務器會自動確認ack,一旦RabbitMQ將一個消息傳遞到consumer,它立刻會被標記爲刪除狀態。
這樣若是consumer在正常執行任務過程當中,一旦consumer服務掛了,那麼咱們就永遠的失去了這個consumer正在處理的全部消息。
爲了防止這種狀況,rabbitMQ支持Message acknowledgment,當消息被一個consumer接受並處理完成後,consumer發送給rabbitMQ一個回執,而後rabbitMQ纔會刪除這個消息。
當一個消息掛了,rabbitMQ會給另外可用的consumer繼續發送上個consumer由於掛了而沒有處理成功的消息。
所以咱們能夠設置autoAck=false,來顯示的讓服務端作消息成功執行的確認。
(二)Message durability 消息持久化
Message acknowledgment 確保了consumer掛了的狀況下,消息還能夠被其餘consumer接受處理,可是若是rabbitMQ掛了呢?
在聲明隊列的方法中,Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
durable=true 意味着該隊列將在服務器重啓後繼續存在。Send和Recv兩個類中聲明隊列的方法都要設置durable=true。
如今,咱們須要將消息標記爲持久性——經過將MessageProperties(它實現BasicProperties)設置爲PERSISTENT_TEXT_PLAIN
(三)Fair dispatch 公平分發
rabbitMQ默認是輪詢分發,這樣對多個consumer而言,可能就會出現負載不均衡的問題,不管是任務自己難易度,仍是consumer處理能力的不一樣,都是致使這種問題。
爲了處理這種狀況咱們可使用basicQos
方法來設置prefetchCount = 1
。 這告訴rabbitMQ一次只給consumer一條消息,換句話來講,就是直到consumer發回ack,而後再向這個consumer發送下一條消息。
int
prefetchCount =
1
;
channel.basicQos(prefetchCount);
正是由於Fair dispatch是基於ack的,全部它最好和Message acknowledgment同時使用,不然在autoAck=true的狀況下,單獨設置Fair dispatch並無效果。
下面是本人測試以上三種狀況的測試代碼,能夠直接使用。
import java.util.Scanner; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; public class NewTask { private static final String QUEUE_NAME = "task_queue"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.1.7"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); boolean durable = true; //消息持久化 channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
// 多個消息使用空格分隔 Scanner sc = new Scanner(System.in); String[] splits = sc.nextLine().split(" "); for (int i = 0; i < splits.length; i++) { channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, splits[i].getBytes()); System.out.println(" [x] Sent '" + splits[i] + "'"); } channel.close(); connection.close(); } }
import com.rabbitmq.client.*; import java.io.IOException; public class Worker { private final static String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.1.7"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); // basicQos方法來設置prefetchCount = 1。 這告訴RabbitMQy一次只給worker一條消息,換句話來講,就是直到worker發回ack,而後再向這個worker發送下一條消息。 channel.basicQos(1); final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } finally { System.out.println(" [x] Done"); channel.basicAck(envelope.getDeliveryTag(), false); } } }; // 當consumer確認收到某個消息,而且已經處理完成,RabbitMQ能夠刪除它時,consumer會向RabbitMQ發送一個ack(nowledgement)。 boolean autoAck = true; channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer); } protected static void doWork(String message) throws InterruptedException { for (char ch: message.toCharArray()) { if (ch == '.') Thread.sleep(1000); } } }
一個完整的rabbitMQ消息模型是會有Exchange的。
rabbitMQ的消息模型的核心思想是producer永遠不會直接發送任何消息到queue中,實際上,在不少狀況下producer根本不知道一條消息是否被髮送到了哪一個queue中。
在rabbitMQ中,producer僅僅將消息發送到一個exchange中。要理解exchange也很是簡單,它一邊負責接收producer發送的消息, 另外一邊將消息推送到queue中。
exchange必須清楚的知道在收到消息以後該如何進行下一步的處理,好比是否應該將這條消息發送到某個queue中? 仍是應該發送到多個queue中?仍是應該直接丟棄這條消息等。
exchange模型以下:
exchange類型也有好幾種:direct
,topic
,headers
以及fanout。
下面咱們來建立一個fanout
類型的exchange,顧名思義,fanout會向全部的queue廣播全部收到的消息。
1 import java.io.IOException; 2 import java.util.Scanner; 3 import java.util.concurrent.TimeoutException; 4 5 import com.rabbitmq.client.Channel; 6 import com.rabbitmq.client.Connection; 7 import com.rabbitmq.client.ConnectionFactory; 8 9 import rabbitMQ.RabbitMQTestUtil; 10 11 public class EmitLog { 12 13 private static final String EXCHANGE_NAME = "logs"; 14 15 public static void main(String[] argv) throws IOException, TimeoutException { 16 17 ConnectionFactory factory = RabbitMQTestUtil.getConnectionFactory(); 18 Connection connection = factory.newConnection(); 19 Channel channel = connection.createChannel(); 20 21 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); 22 23 // 多個消息使用空格分隔 24 Scanner sc = new Scanner(System.in); 25 String[] splits = sc.nextLine().split(" "); 26 for (int i = 0; i < splits.length; i++) { 27 channel.basicPublish(EXCHANGE_NAME, "", null, splits[i].getBytes()); 28 System.out.println(" [x] Sent '" + splits[i] + "'"); 29 } 30 31 channel.close(); 32 connection.close(); 33 } 34 }
1 import java.io.IOException; 2 3 import com.rabbitmq.client.AMQP; 4 import com.rabbitmq.client.Channel; 5 import com.rabbitmq.client.Connection; 6 import com.rabbitmq.client.ConnectionFactory; 7 import com.rabbitmq.client.Consumer; 8 import com.rabbitmq.client.DefaultConsumer; 9 import com.rabbitmq.client.Envelope; 10 11 import rabbitMQ.RabbitMQTestUtil; 12 13 public class ReceiveLogs { 14 15 private static final String EXCHANGE_NAME = "logs"; 16 17 public static void main(String[] argv) throws Exception { 18 ConnectionFactory factory = RabbitMQTestUtil.getConnectionFactory(); 19 Connection connection = factory.newConnection(); 20 Channel channel = connection.createChannel(); 21 22 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); 23 String queueName = channel.queueDeclare().getQueue(); 24 channel.queueBind(queueName, EXCHANGE_NAME, ""); 25 26 Consumer consumer = new DefaultConsumer(channel) { 27 @Override 28 public void handleDelivery(String consumerTag, Envelope envelope, 29 AMQP.BasicProperties properties, byte[] body) throws IOException { 30 String message = new String(body, "UTF-8"); 31 System.out.println(" [x] Received '" + message + "'"); 32 } 33 }; 34 channel.basicConsume(queueName, true, consumer); 35 } 36 }
在fanout的exchange類型中,消息的發佈已經隊列的綁定方法中,routingKey參數都是默認空值,由於fanout類型會直接忽略這個值,
可是在其餘exchange類型中它擁有很重要的意義,
rabbitMQ支持以上兩種綁定,消息在發佈的時候,會指定一個routing key,而圖一中exchange會把routing key爲orange
發送的消息將會被路由到queue Q1
中,使用routing key爲black
或者green
的將會被路由到Q2
中。
將多個queue使用相同的binding key進行綁定也是可行的。能夠在X和Q1中間增長一個routing key black
。 它會向全部匹配的queue進行廣播,使用routing key爲black
發送的消息將會同時被Q1
和Q2
接收。
下面是我測試debug和error兩種routing key發佈消息並接受處理消息的代碼:
import java.util.Scanner; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import rabbitMQ.RabbitMQTestUtil; public class EmitLog { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws java.io.IOException, TimeoutException { ConnectionFactory factory = RabbitMQTestUtil.getConnectionFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); // 多個消息使用空格分隔 Scanner sc = new Scanner(System.in); String[] splits = sc.nextLine().split(" "); for (int i = 0; i < splits.length; i++) { channel.basicPublish(EXCHANGE_NAME, splits[i], null, splits[i].getBytes()); System.out.println(" [x] Sent '" + splits[i] + "'"); } channel.close(); connection.close(); } }
import java.io.IOException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import rabbitMQ.RabbitMQTestUtil; public class ReceiveLogsDebug { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = RabbitMQTestUtil.getConnectionFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, "debug"); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); } }
import java.io.IOException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import rabbitMQ.RabbitMQTestUtil; public class ReceiveLogsError { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = RabbitMQTestUtil.getConnectionFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, "error"); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); } }
發送輸入:
debug接受:
error接受:
發送到topic exchange中的消息不能有一個任意的routing_key
——它必須是一個使用點分隔的單詞列表。單詞能夠是任意的。一些有效的routing key例子:」stock.usd.nyse」,」nyse.vmw」,」quick.orange.rabbit」。
routing key的長度限制爲255個字節數。
binding key也必須是相同的形式。topic exchange背後的邏輯相似於direct——一條使用特定的routing key發送的消息將會被傳遞至全部使用與該routing key相同的binding key進行綁定的隊列中。 然而,對binding key來講有兩種特殊的狀況:
和Direct exchange差很少,代碼就不copy了,有興趣的直接看看教程http://www.rabbitmq.com/tutorials/tutorial-five-java.html