RabbitMQ是開源的消息中間件,它是輕量級的,支持多種消息傳遞協議,能夠部署在分佈式和聯合配置中,以知足高級別、高可用性需求。而且可在許多操做系統和雲環境上運行,併爲大多數流行語言提供了普遍的開發工具。(這裏只介紹JAVA下的RabbitMQ的使用,感興趣的能夠查看官方文檔:http://www.rabbitmq.com/getstarted.html);html
安裝:前端
參考:http://www.cnblogs.com/lfalex0831/p/8951955.html(windows安裝)java
應用場景:git
一、異步處理,主要爲了較少請求的響應時間和解耦。即將比較耗時又不須要同步的操做放入消息隊列中進行傳遞請求,只要保證消息格式(能夠理解爲接頭的暗號)不變,這樣消息的發送方和接收方互不干擾交互,即爲解耦;
windows
二、廣播,顧名思義,廣播的好處就是一次發送,你們共享,大大的減小了冗餘的操做,也下降了新增消費者的成本;緩存
三、流量削峯,好比秒殺活動,由於流量過大,致使應用掛掉,爲了不這個問題,會在應用前端加入消息隊列。
做用:
1.能夠控制進入後臺的服務,超過閥值的訂單直接丟棄;
2.能夠緩解瞬時的高流量壓垮應用;服務器
ps:秒殺系統優化思路能夠從將請求儘可能攔截在系統上游和充分利用緩存;
app
(若是還有別的應用場景,請你們多多指教。。。)dom
使用場景(本文使用的RabbitMQ的版本爲5.20版本):異步
This tutorial assumes RabbitMQ is installed and running on localhost on standard port (5672). In case you use a different host, port or credentials, connections settings would require adjusting.
注意:根據官方文檔說明,RabbitMQ的默認訪問端口爲5672,而管理端口爲15672,但願不要搞混了(我剛接觸時就沒注意,果斷亂了。。-_-||)。
基本概念:
在下圖中,P是咱們的生產者,C是咱們的消費者。中間的框是一個隊列——RabbitMQ表明消費者保存的消息緩衝區。
建立RabbitMQ的工廠類:
1 import com.rabbitmq.client.Connection; 2 import com.rabbitmq.client.ConnectionFactory; 3 import java.io.IOException; 4 import java.util.concurrent.TimeoutException; 5 6 public class ConnectionUtil { 7 8 private static final String RABBIT_HOST = "localhost"; 9 10 private static final String RABBIT_USERNAME = "guest"; 11 12 private static final String RABBIT_PASSWORD = "guest"; 13 14 private static Connection connection = null; 15 16 public static Connection getConnection() { 17 if(connection == null) { 18 ConnectionFactory connectionFactory = new ConnectionFactory(); 19 connectionFactory.setHost(RABBIT_HOST); 20 connectionFactory.setUsername(RABBIT_USERNAME); 21 connectionFactory.setPassword(RABBIT_PASSWORD); 22 try { 23 connection = connectionFactory.newConnection(); 24 } catch (IOException e) { 25 e.printStackTrace(); 26 } catch (TimeoutException e) { 27 e.printStackTrace(); 28 } 29 } 30 return connection; 31 } 32 33 }
建立生產者Producer:
1 import com.cn.ConnectionUtil; 2 import com.rabbitmq.client.Channel; 3 import com.rabbitmq.client.Connection; 4 import java.io.IOException; 5 import java.util.concurrent.TimeoutException; 6 7 public class Producer { 8 9 private static final String QUEUE_NAME="test_queue"; 10 11 public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { 12 //獲取鏈接 13 Connection connection = ConnectionUtil.getConnection(); 14 System.out.println(connection); 15 //建立通道 16 Channel channel = connection.createChannel(1); 17 /* 18 * 聲明(建立)隊列 19 * 參數1:隊列名稱 20 * 參數2:爲true時server重啓隊列不會消失 21 * 參數3:隊列是不是獨佔的,若是爲true只能被一個connection使用,其餘鏈接創建時會拋出異常 22 * 參數4:隊列再也不使用時是否自動刪除(沒有鏈接,而且沒有未處理的消息) 23 * 參數5:創建隊列時的其餘參數 24 */ 25 channel.queueDeclare(QUEUE_NAME,false,false,false,null); 26 String message = "Hello World!"; 27 for (int i = 0; i < 20; i++) { 28 message = message + i; 29 channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); 30 Thread.sleep(1000); 31 } 32 System.out.println("生產者 send :"+message); 33 channel.close(); 34 connection.close(); 35 } 36 37 }
建立消費者Consumer:
1 import com.cn.ConnectionUtil; 2 import com.rabbitmq.client.AMQP.BasicProperties; 3 import com.rabbitmq.client.Channel; 4 import com.rabbitmq.client.Connection; 5 import com.rabbitmq.client.DefaultConsumer; 6 import com.rabbitmq.client.Envelope; 7 import java.io.IOException; 8 9 public class Consumer { 10 11 private static final String QUEUE_NAME = "test_queue"; 12 13 public static void main(String[] args) throws IOException { 14 Connection connection = ConnectionUtil.getConnection(); 15 Channel channel = connection.createChannel(1); 16 channel.queueDeclare(QUEUE_NAME,false,false,false,null); 17 StringBuffer message = new StringBuffer(); 18 //自4.0+ 版本後沒法再使用QueueingConsumer,而官方推薦使用DefaultConsumer 19 com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) { 20 @Override 21 public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) 22 throws IOException { 23 try { 24 Thread.sleep(2000); 25 } catch (InterruptedException e) { 26 e.printStackTrace(); 27 } 28 super.handleDelivery(consumerTag, envelope, properties, body); 29 message.append(new String(body,"UTF-8")); 30 System.out.println(new String(body,"UTF-8")); 31 } 32 }; 33 //監聽隊列,當b爲true時,爲自動提交(只要消息從隊列中獲取,不管消費者獲取到消息後是否成功消息,都認爲是消息已經成功消費), 34 // 當b爲false時,爲手動提交(消費者從隊列中獲取消息後,服務器會將該消息標記爲不可用狀態,等待消費者的反饋, 35 // 若是消費者一直沒有反饋,那麼該消息將一直處於不可用狀態。 36 //若是選用自動確認,在消費者拿走消息執行過程當中出現宕機時,消息可能就會丟失!!) 37 //使用channel.basicAck(envelope.getDeliveryTag(),false);進行消息確認 38 channel.basicConsume(QUEUE_NAME,true,consumer); 39 System.out.println(message.toString()); 40 } 41 }
測試結果,Consumer收到Producer的消息。
上一個例子是一對一發送接收形式,而工做隊列爲一對多發送接收形式。工做隊列(即任務隊列)背後的主要思想是避免當即執行資源密集型任務,而且必須等待它完成。相反,咱們把任務安排在之後作。咱們將任務封裝爲消息並將其發送到隊列。在後臺運行的工做進程會彈出任務並最終執行任務。當你運行許多Consumer時,任務將在他們之間共享,以下圖:
因爲工廠類已經建立,直接使用便可。
建立生產者Producer:
package com.cn.work; import com.cn.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; /** * @program: rabbit-learn * @description: 生產者 * @create: 2018-04-26 16:18 **/ public class Producer { private final static String QUEUE_NAME = "test_queue_work"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false,false,false,null); for (int i = 0; i < 50; i++) { String message = "" + i; channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); Thread.sleep(100 * i); } channel.close(); connection.close(); } }
建立消費者1,2:
import com.cn.ConnectionUtil; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import java.io.IOException; public class Consumer1 { private final static String QUEUE_NAME = "test_queue_work"; public static void main(String[] args) throws IOException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.basicQos(1);//能者多勞模式 //聲明隊列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); //自4.0+ 版本後沒法再使用QueueingConsumer,而官方推薦使用DefaultConsumer Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); String message = new String(body,"UTF-8"); System.out.println(message); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } try { doWork(message); }finally { channel.basicAck(envelope.getDeliveryTag(),false); } } }; //監聽隊列,當b爲true時,爲自動提交(只要消息從隊列中獲取,不管消費者獲取到消息後是否成功消息,都認爲是消息已經成功消費), // 當b爲false時,爲手動提交(消費者從隊列中獲取消息後,服務器會將該消息標記爲不可用狀態,等待消費者的反饋, // 若是消費者一直沒有反饋,那麼該消息將一直處於不可用狀態。 //若是選用自動確認,在消費者拿走消息執行過程當中出現宕機時,消息可能就會丟失!!) //使用channel.basicAck(envelope.getDeliveryTag(),false);進行消息確認 channel.basicConsume(QUEUE_NAME,false,consumer); } private static void doWork(String task) { for (char ch : task.toCharArray()) { if (ch == '.') { try { Thread.sleep(1000); } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } } } } }
1 import com.cn.ConnectionUtil; 2 import com.rabbitmq.client.AMQP.BasicProperties; 3 import com.rabbitmq.client.Channel; 4 import com.rabbitmq.client.Connection; 5 import com.rabbitmq.client.Consumer; 6 import com.rabbitmq.client.DefaultConsumer; 7 import com.rabbitmq.client.Envelope; 8 import java.io.IOException; 9 10 public class Consumer2 { 11 12 private final static String QUEUE_NAME = "test_queue_work"; 13 14 public static void main(String[] args) throws IOException { 15 Connection connection = ConnectionUtil.getConnection(); 16 Channel channel = connection.createChannel(); 17 channel.basicQos(1);//能者多勞模式 18 //聲明隊列 19 channel.queueDeclare(QUEUE_NAME,false,false,false,null); 20 //自4.0+ 版本後沒法再使用QueueingConsumer,而官方推薦使用DefaultConsumer 21 Consumer consumer = new DefaultConsumer(channel) { 22 @Override 23 public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) 24 throws IOException { 25 super.handleDelivery(consumerTag, envelope, properties, body); 26 String message = new String(body,"UTF-8"); 27 System.out.println(message); 28 try { 29 Thread.sleep(1000); 30 } catch (InterruptedException e) { 31 e.printStackTrace(); 32 } 33 try { 34 doWork(message); 35 }finally { 36 channel.basicAck(envelope.getDeliveryTag(), false); 37 } 38 } 39 }; 40 //監聽隊列,當b爲true時,爲自動提交(只要消息從隊列中獲取,不管消費者獲取到消息後是否成功消息,都認爲是消息已經成功消費), 41 // 當b爲false時,爲手動提交(消費者從隊列中獲取消息後,服務器會將該消息標記爲不可用狀態,等待消費者的反饋, 42 // 若是消費者一直沒有反饋,那麼該消息將一直處於不可用狀態。 43 //若是選用自動確認,在消費者拿走消息執行過程當中出現宕機時,消息可能就會丟失!!) 44 //使用channel.basicAck(envelope.getDeliveryTag(),false);進行消息確認 45 channel.basicConsume(QUEUE_NAME,false,consumer); 46 } 47 48 /** 49 * @Description: 業務代碼 50 * @Param: 51 * @return: 52 * @Author: 535504 53 * @Date: 2018/4/26 54 */ 55 private static void doWork(String task) { 56 for (char ch : task.toCharArray()) { 57 if (ch == '.') { 58 try { 59 Thread.sleep(1000); 60 } catch (InterruptedException _ignored) { 61 Thread.currentThread().interrupt(); 62 } 63 } 64 } 65 } 66 67 }
測試結果,當消費者中的channel.basicQos(1);這行代碼的註釋打開時,執行會發現,休眠時間短的消費者執行的任務多,而註釋後,再次執行會發現消費者1和消費者2獲取到的消息內容是不一樣的,同一個消息只能被一個消費者獲取,消費者1和消費者2獲取到的消息的數量是相同的,一個是奇數一個是偶數。
在發佈訂閱模式中,消息須要發送到MQ的交換機exchange上,exchange根據配置的路由方式發到相應的Queue上,Queue又將消息發送給consumer,消息從queue到consumer, 消息隊列的使用過程大概以下:
1.客戶端鏈接到消息隊列服務器,打開一個channel。
2.客戶端聲明一個exchange,並設置相關屬性。
3.客戶端聲明一個queue,並設置相關屬性。
4.客戶端在exchange和queue之間創建好綁定關係。
5.客戶端投遞消息到exchange。
建立生產者Producer:
1 import com.cn.ConnectionUtil; 2 import com.rabbitmq.client.Channel; 3 import com.rabbitmq.client.Connection; 4 import java.io.IOException; 5 import java.util.concurrent.TimeoutException; 6 7 /** 8 * @program: rabbit-learn 9 * @description: 生產者,訂閱模式 10 * @author: 11 * @create: 12 * 消息發送到沒有隊列綁定的交換機時,消息將丟失,由於,交換機沒有存儲消息的能力,消息只能存在在隊列中。 13 **/ 14 public class Producer { 15 16 //交換機名稱 17 private static final String EXCHANGE_NAME = "test_exchange_fanout"; 18 19 public static void main(String[] args) throws IOException, TimeoutException { 20 Connection connection = ConnectionUtil.getConnection(); 21 Channel channel = connection.createChannel(); 22 /* 23 聲明exchange交換機 24 參數1:交換機名稱 25 參數2:交換機類型 26 參數3:交換機持久性,若是爲true則服務器重啓時不會丟失 27 參數4:交換機在不被使用時是否刪除 28 參數5:交換機的其餘屬性 29 */ 30 channel.exchangeDeclare(EXCHANGE_NAME,"fanout", true,true,null); 31 32 String message = "訂閱消息"; 33 channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes()); 34 System.out.println("生產者 send :"+message); 35 channel.close(); 36 connection.close(); 37 } 38 39 }
建立消費者Consumer一、2:
1 import com.cn.ConnectionUtil; 2 import com.rabbitmq.client.AMQP.BasicProperties; 3 import com.rabbitmq.client.Channel; 4 import com.rabbitmq.client.Connection; 5 import com.rabbitmq.client.Consumer; 6 import com.rabbitmq.client.DefaultConsumer; 7 import com.rabbitmq.client.Envelope; 8 import java.io.IOException; 9 10 public class Consumer1 { 11 12 private static final String QUEUE_NAME = "test_queue_exchange_1"; 13 14 private static final String EXCHANGE_NAME = "test_exchange_fanout"; 15 16 public static void main(String[] args) throws IOException { 17 Connection connection = ConnectionUtil.getConnection(); 18 Channel channel = connection.createChannel(); 19 20 //聲明隊列 21 channel.queueDeclare(QUEUE_NAME,false,false,false,null); 22 23 /* 24 綁定隊列到交換機(這個交換機名稱必定要和生產者的交換機名相同) 25 參數1:隊列名 26 參數2:交換機名 27 參數3:Routing key 路由鍵 28 */ 29 channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,""); 30 31 //同一時刻服務器只會發一條數據給消費者 32 channel.basicQos(1); 33 34 Consumer consumer = new DefaultConsumer(channel) { 35 @Override 36 public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) 37 throws IOException { 38 super.handleDelivery(consumerTag, envelope, properties, body); 39 String message = new String(body,"UTF-8"); 40 System.out.println("收到消息:"+message); 41 channel.basicAck(envelope.getDeliveryTag(),false); 42 } 43 }; 44 channel.basicConsume(QUEUE_NAME,false,consumer); 45 } 46 47 }
1 import com.cn.ConnectionUtil; 2 import com.rabbitmq.client.AMQP.BasicProperties; 3 import com.rabbitmq.client.Channel; 4 import com.rabbitmq.client.Connection; 5 import com.rabbitmq.client.Consumer; 6 import com.rabbitmq.client.DefaultConsumer; 7 import com.rabbitmq.client.Envelope; 8 import java.io.IOException; 9 10 public class Consumer2 { 11 12 private static final String QUEUE_NAME = "test_queue_exchange_2"; 13 14 private static final String EXCHANGE_NAME = "test_exchange_fanout"; 15 16 public static void main(String[] args) throws IOException { 17 Connection connection = ConnectionUtil.getConnection(); 18 Channel channel = connection.createChannel(); 19 20 //聲明隊列 21 channel.queueDeclare(QUEUE_NAME,false,false,false,null); 22 23 /* 24 綁定隊列到交換機(這個交換機名稱必定要和生產者的交換機名相同) 25 參數1:隊列名 26 參數2:交換機名 27 參數3:Routing key 路由鍵 28 */ 29 channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,""); 30 31 //同一時刻服務器只會發一條數據給消費者 32 channel.basicQos(1); 33 34 Consumer consumer = new DefaultConsumer(channel) { 35 @Override 36 public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) 37 throws IOException { 38 super.handleDelivery(consumerTag, envelope, properties, body); 39 String message = new String(body,"UTF-8"); 40 System.out.println("收到消息:"+message); 41 channel.basicAck(envelope.getDeliveryTag(),false); 42 } 43 }; 44 channel.basicConsume(QUEUE_NAME,false,consumer); 45 } 46 47 }
根據指定的路由鍵發送到對應的消息隊列中,以下圖,在這個設置中,咱們能夠看到與它綁定的兩個隊列的直接交換X。第一個隊列綁定了綁定鍵橙色,第二個隊列有兩個綁定,一個綁定鍵爲黑色,另外一個爲綠色。在這樣的設置中,將發送到與路由鍵橙色的交換的消息將被路由到隊列Q1。帶有黑色或綠色的路由鍵的消息將會進入Q2。全部其餘消息將被丟棄。
建立生產者Producer:
1 import com.cn.ConnectionUtil; 2 import com.rabbitmq.client.Channel; 3 import com.rabbitmq.client.Connection; 4 5 public class Producer { 6 7 private static final String EXCHANGE_NAME = "test_exchange_direct"; 8 9 public static void main(String[] argv) throws Exception { 10 // 獲取到鏈接以及mq通道 11 Connection connection = ConnectionUtil.getConnection(); 12 Channel channel = connection.createChannel(); 13 14 // 聲明exchange,路由模式聲明direct 15 channel.exchangeDeclare(EXCHANGE_NAME, "direct"); 16 17 // 消息內容 18 String message = "這是消息B"; 19 channel.basicPublish(EXCHANGE_NAME, "B", null, message.getBytes()); 20 String messageA = "這是消息A"; 21 channel.basicPublish(EXCHANGE_NAME, "A", null, messageA.getBytes()); 22 System.out.println(" [生產者] Sent '" + message + "'"); 23 24 channel.close(); 25 connection.close(); 26 } 27 28 }
建立消費者Consumer一、2:
1 package com.cn.routing; 2 3 import com.cn.ConnectionUtil; 4 import com.rabbitmq.client.AMQP.BasicProperties; 5 import com.rabbitmq.client.Channel; 6 import com.rabbitmq.client.Connection; 7 import com.rabbitmq.client.Consumer; 8 import com.rabbitmq.client.DefaultConsumer; 9 import com.rabbitmq.client.Envelope; 10 import java.io.IOException; 11 12 /** 13 * @program: rabbit-learn 14 * @description: 消費者1 15 * @author: 535504 16 * @create: 2018-04-26 17:52 17 **/ 18 public class Consumer1 { 19 private final static String QUEUE_NAME = "test_queue_direct_A"; 20 21 private final static String EXCHANGE_NAME = "test_exchange_direct"; 22 23 public static void main(String[] argv) throws Exception { 24 25 // 獲取到鏈接以及mq通道 26 Connection connection = ConnectionUtil.getConnection(); 27 Channel channel = connection.createChannel(); 28 29 // 聲明隊列 30 //channel.queueDeclare(QUEUE_NAME, false, false, false, null); 31 channel.exchangeDeclare(EXCHANGE_NAME,"direct"); 32 /* 33 * 綁定隊列到交換機 34 * 參數1:隊列的名稱 35 * 參數2:交換機的名稱 36 * 參數3:routingKey 37 */ 38 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "A"); 39 40 // 同一時刻服務器只會發一條消息給消費者 41 channel.basicQos(1); 42 43 // 定義隊列的消費者 44 Consumer consumer = new DefaultConsumer(channel) { 45 @Override 46 public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) 47 throws IOException { 48 super.handleDelivery(consumerTag, envelope, properties, body); 49 System.out.println(new String(body,"UTF-8")); 50 } 51 }; 52 channel.basicConsume(QUEUE_NAME,true,consumer); 53 } 54 }
1 import com.cn.ConnectionUtil; 2 import com.rabbitmq.client.AMQP.BasicProperties; 3 import com.rabbitmq.client.Channel; 4 import com.rabbitmq.client.Connection; 5 import com.rabbitmq.client.Consumer; 6 import com.rabbitmq.client.DefaultConsumer; 7 import com.rabbitmq.client.Envelope; 8 import java.io.IOException; 9 10 public class Consumer2 { 11 private final static String QUEUE_NAME = "test_queue_direct_B"; 12 13 private final static String EXCHANGE_NAME = "test_exchange_direct"; 14 15 public static void main(String[] argv) throws Exception { 16 17 // 獲取到鏈接以及mq通道 18 Connection connection = ConnectionUtil.getConnection(); 19 Channel channel = connection.createChannel(); 20 21 // 聲明隊列 22 //channel.queueDeclare(QUEUE_NAME, false, false, false, null); 23 channel.exchangeDeclare(EXCHANGE_NAME,"direct"); 24 /* 25 * 綁定隊列到交換機 26 * 參數1:隊列的名稱 27 * 參數2:交換機的名稱 28 * 參數3:routingKey 29 */ 30 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "B"); 31 32 // 同一時刻服務器只會發一條消息給消費者 33 channel.basicQos(1); 34 35 // 定義隊列的消費者 36 Consumer consumer = new DefaultConsumer(channel) { 37 @Override 38 public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) 39 throws IOException { 40 super.handleDelivery(consumerTag, envelope, properties, body); 41 System.out.println(new String(body,"UTF-8")); 42 } 43 }; 44 channel.basicConsume(QUEUE_NAME,true,consumer); 45 } 46 }
能夠理解爲Routing的通配符模式,以下圖:
「#」:表示匹配一個或多個詞;(lazy.a.b.c)
「*」:表示匹配一個詞;(a.orange.b)
建立生產者Producer:
1 import com.cn.ConnectionUtil; 2 import com.rabbitmq.client.Channel; 3 import com.rabbitmq.client.Connection; 4 5 import java.io.IOException; 6 import java.util.concurrent.TimeoutException; 7 8 public class Producer { 9 10 private static final String EXCHANGE_NAME = "test_exchange_topic"; 11 12 public static void main(String[] args) throws IOException, TimeoutException { 13 Connection connection = ConnectionUtil.getConnection(); 14 Channel channel = connection.createChannel(); 15 //聲明交換機 16 channel.exchangeDeclare(EXCHANGE_NAME,"topic"); 17 String message = "匹配insert"; 18 channel.basicPublish(EXCHANGE_NAME,"order.update",false,false,null,message.getBytes()); 19 20 channel.close(); 21 connection.close(); 22 } 23 }
建立消費者Consumer一、2:
1 import com.cn.ConnectionUtil; 2 import com.rabbitmq.client.AMQP.BasicProperties; 3 import com.rabbitmq.client.Channel; 4 import com.rabbitmq.client.Connection; 5 import com.rabbitmq.client.Consumer; 6 import com.rabbitmq.client.DefaultConsumer; 7 import com.rabbitmq.client.Envelope; 8 9 import java.io.IOException; 10 11 public class Consumer1 { 12 13 private static final String EXCHANGE_NAME = "test_exchange_topic"; 14 15 private static final String QUEUE_NAME = "test_queue_topic_1"; 16 17 public static void main(String[] args) throws IOException { 18 Connection connection = ConnectionUtil.getConnection(); 19 Channel channel = connection.createChannel(); 20 //channel.queueDeclare(QUEUE_NAME,false,false,false,null); 21 channel.exchangeDeclare(EXCHANGE_NAME,"topic"); 22 //order.# 23 channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"order.*"); 24 25 channel.basicQos(1); 26 27 Consumer consumer = new DefaultConsumer(channel) { 28 @Override 29 public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) 30 throws IOException { 31 super.handleDelivery(consumerTag, envelope, properties, body); 32 System.out.println(new String(body,"UTF-8")); 33 } 34 }; 35 channel.basicConsume(QUEUE_NAME,true,consumer); 36 37 } 38 }
1 import com.cn.ConnectionUtil; 2 import com.rabbitmq.client.AMQP.BasicProperties; 3 import com.rabbitmq.client.Channel; 4 import com.rabbitmq.client.Connection; 5 import com.rabbitmq.client.Consumer; 6 import com.rabbitmq.client.DefaultConsumer; 7 import com.rabbitmq.client.Envelope; 8 9 import java.io.IOException; 10 11 public class Consumer2 { 12 13 private static final String EXCHANGE_NAME = "test_exchange_topic"; 14 15 private static final String QUEUE_NAME = "test_queue_topic_2"; 16 17 public static void main(String[] args) throws IOException { 18 Connection connection = ConnectionUtil.getConnection(); 19 Channel channel = connection.createChannel(); 20 channel.queueDeclare(QUEUE_NAME,false,false,false,null); 21 22 channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"order.insert"); 23 24 channel.basicQos(1); 25 26 Consumer consumer = new DefaultConsumer(channel) { 27 @Override 28 public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { 29 super.handleDelivery(consumerTag, envelope, properties, body); 30 System.out.println("接收消息:" + new String(body, "UTF-8")); 31 } 32 }; 33 channel.basicConsume(QUEUE_NAME,true,consumer); 34 35 } 36 }
若是咱們須要在遠程計算機上運行一個函數並等待結果,這種模式一般稱爲遠程過程調用或RPC;
建立RPC服務:
1 import com.cn.ConnectionUtil; 2 import com.rabbitmq.client.AMQP; 3 import com.rabbitmq.client.Channel; 4 import com.rabbitmq.client.Connection; 5 import com.rabbitmq.client.Consumer; 6 import com.rabbitmq.client.DefaultConsumer; 7 import com.rabbitmq.client.Envelope; 8 9 import java.io.IOException; 10 import java.util.concurrent.TimeoutException; 11 12 public class RPCServer { 13 14 private static final String RPC_QUEUE_NAME = "rpc_queue"; 15 16 public static void main(String[] args) throws IOException, TimeoutException { 17 Connection connection = ConnectionUtil.getConnection(); 18 final Channel channel = connection.createChannel(); 19 channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); 20 channel.basicQos(1); 21 22 Consumer consumer = new DefaultConsumer(channel) { 23 @Override 24 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 25 super.handleDelivery(consumerTag, envelope, properties, body); 26 AMQP.BasicProperties properties1 = new AMQP.BasicProperties.Builder().correlationId(properties.getCorrelationId()).build(); 27 String mes = new String(body, "UTF-8"); 28 int num = Integer.valueOf(mes); 29 System.out.println("接收數據:" + num); 30 num = fib(num); 31 channel.basicPublish("", properties.getReplyTo(), properties1, String.valueOf(num).getBytes()); 32 channel.basicAck(envelope.getDeliveryTag(), false); 33 } 34 }; 35 channel.basicConsume(RPC_QUEUE_NAME, false, consumer); 36 while (true) { 37 synchronized (consumer) { 38 try { 39 consumer.wait(); 40 } catch (InterruptedException e) { 41 e.printStackTrace(); 42 } 43 } 44 } 45 } 46 47 /* 48 獲取斐波那契數列的第n個值得大小 49 */ 50 private static int fib(int n) { 51 System.out.println(n); 52 if (n == 0) 53 return 0; 54 if (n == 1) 55 return 1; 56 return fib(n - 1) + fib(n - 2); 57 } 58 }
建立RPC客戶端:
1 import com.cn.ConnectionUtil; 2 import com.rabbitmq.client.AMQP; 3 import com.rabbitmq.client.Channel; 4 import com.rabbitmq.client.Connection; 5 import com.rabbitmq.client.DefaultConsumer; 6 import com.rabbitmq.client.Envelope; 7 8 import java.io.IOException; 9 import java.util.UUID; 10 import java.util.concurrent.ArrayBlockingQueue; 11 import java.util.concurrent.BlockingQueue; 12 import java.util.concurrent.TimeoutException; 13 14 public class RPCClient { 15 16 private Connection connection; 17 private Channel channel; 18 private String requestQueueName = "rpc_queue"; 19 private String replyQueueName; 20 21 public RPCClient() throws IOException, TimeoutException { 22 connection = ConnectionUtil.getConnection(); 23 channel = connection.createChannel(); 24 25 replyQueueName = channel.queueDeclare().getQueue(); 26 } 27 28 public String call(String message) throws IOException, InterruptedException { 29 final String corrId = UUID.randomUUID().toString(); 30 31 AMQP.BasicProperties props = new AMQP.BasicProperties 32 .Builder() 33 .correlationId(corrId) 34 .replyTo(replyQueueName) 35 .build(); 36 37 channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8")); 38 39 final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1); 40 41 channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) { 42 @Override 43 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 44 if (properties.getCorrelationId().equals(corrId)) { 45 response.offer(new String(body, "UTF-8")); 46 } 47 } 48 }); 49 50 //close(); 51 return response.take(); 52 } 53 54 public void close() throws IOException { 55 connection.close(); 56 } 57 58 }
建立RPC測試類:
1 import java.io.IOException; 2 import java.util.concurrent.TimeoutException; 3 4 public class RPCTest { 5 6 public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { 7 RPCClient rpcClient = new RPCClient(); 8 System.out.println(rpcClient.call("2")); 9 } 10 }