RabbitMQ是由Erlang語言開發,基於AMQP(高級消息隊列協議)協議實現的消息隊列。spring
補充:JMS,Java提供的一套消息服務API標準。服務器
應用場景:併發
將不須要同步處理的而且耗時長的操做由消息隊列通知消息接收方進行異步處理,提升了應用程序的響應時間。app
MQ至關於一箇中介,生產方經過MQ與消費方交互,它將應用程序進行解耦合。異步
一、使用簡單,功能強大maven
二、基於AMQP協議分佈式
三、社區活躍,文檔完善ide
四、高併發性能好,得益於Erlang語言高併發
五、SpringBoot默認集成RabbitMQ工具
RabbitMQ的工做原理
基本結構以下:
組成部分說明:
消息發佈接收流程:
— — — — — —發送消息— — — — — —
— — — — — —接收消息— — — — — —
關於安裝
RabbitMQ有Erlang語言開發,Erlang語言用於併發及分佈式系統的開發,OTP(Open Telecon Platform)做爲Erlang語言的一部分,包含了不少基於Erlang開發的中間件及工具庫,安裝RabbitMQ須要安裝Erlang/OTP,並保持版本匹配。
咱們後面的案例採用Erlang/OTP 20.3版本和RabbitMQ3.7.3版本。
具體的安裝過程這裏就不贅述,網上搜索教程便可。
咱們首先用RabbitMQ官方提供的Java client測試,瞭解一下RabbitMQ的交互過程。
1 <dependency> 2 <groupId>com.rabbitmq</groupId> 3 <artifactId>amqp-client</artifactId> 4 <version>5.1.2</version> 5 </dependency>
1 public class Producer01 { 2 /** 3 * 隊列名稱 4 */ 5 private static final String QUEUE = "hello rabbitmq"; 6 7 public static void main(String[] args) { 8 Connection connection = null; 9 Channel channel = null; 10 11 ConnectionFactory factory = new ConnectionFactory(); 12 factory.setHost("localhost"); 13 factory.setPort(5672); 14 factory.setUsername("guest"); 15 factory.setPassword("guest"); 16 // RabbitMQ默認虛擬機名稱爲"/",虛擬機至關於一個獨立的MQ服務器 17 factory.setVirtualHost("/"); 18 19 try { 20 // 建立於RabbitMQ服務的鏈接 21 connection = factory.newConnection(); 22 // 建立與Exchange的通道,每一個鏈接能夠建立多個通道,每一個通道表明一個會話任務 23 channel = connection.createChannel(); 24 /** 25 * 聲明隊列,若是RabbitMQ中沒有此隊列將自動建立 26 * param1:隊列名稱 27 * param2:是否持久化 28 * param3:隊列是否獨佔此鏈接 29 * param4:隊列再也不使用時自動刪除此列 30 * param5:隊列參數 31 */ 32 channel.queueDeclare(QUEUE,true,false,false,null); 33 String message = "hello rabbit:" + System.currentTimeMillis(); 34 35 /** 36 * 消息發佈方法 37 * param1:Exchange的名稱,若是沒有指定,則使用Default Exchange 38 * 這裏沒有指定交換機,消息將發送給默認交換機,每一個隊列也會綁定默認交換機,可是不能顯示綁定或解除綁定 39 * 使用默認的交換機,routingkey等於隊列名稱 40 * param2:routingkey,消息的路由,適用於Exchange將消息轉發到指定的消息隊列 41 * param3:消息包含的屬性 42 * param4:消息體 43 */ 44 channel.basicPublish("",QUEUE,null,message.getBytes()); 45 } catch (IOException e) { 46 e.printStackTrace(); 47 } catch (TimeoutException e) { 48 e.printStackTrace(); 49 } finally { 50 if (channel != null){ 51 try { 52 channel.close(); 53 } catch (IOException e) { 54 e.printStackTrace(); 55 } catch (TimeoutException e) { 56 e.printStackTrace(); 57 } 58 } 59 if (connection != null){ 60 try { 61 connection.close(); 62 } catch (IOException e) { 63 e.printStackTrace(); 64 } 65 } 66 } 67 } 68 }
1 public class Consumer01 { 2 /** 3 * 隊列名稱 4 */ 5 private static final String QUEUE = "hello rabbitmq"; 6 7 public static void main(String[] args) { 8 Connection connection = null; 9 Channel channel = null; 10 11 ConnectionFactory factory = new ConnectionFactory(); 12 factory.setHost("localhost"); 13 factory.setPort(5672); 14 15 try { 16 // 建立於RabbitMQ服務的鏈接 17 connection = factory.newConnection(); 18 // 建立與Exchange的通道,每一個鏈接能夠建立多個通道,每一個通道表明一個會話任務 19 channel = connection.createChannel(); 20 /** 21 * 聲明隊列,若是RabbitMQ中沒有此隊列將自動建立 22 * param1:隊列名稱 23 * param2:是否持久化 24 * param3:隊列是否獨佔此鏈接 25 * param4:隊列再也不使用時自動刪除此列 26 * param5:隊列參數 27 */ 28 channel.queueDeclare(QUEUE,true,false,false,null); 29 // 定義消費方法 30 DefaultConsumer consumer = new DefaultConsumer(channel){ 31 32 /** 33 * 消費者接受消息調用此方法 34 * @param consumerTag 消費者標籤,在channel,basicConsumer()去指定 35 * @param envelope 消息包含的的內容,能夠從中獲取消息id,消息routingkey,交換機, 36 * 消息和重傳標誌(收到消息失敗後是否須要從新發送) 37 * @param properties 38 * @param body 39 * @throws IOException 40 */ 41 @Override 42 public void handleDelivery(String consumerTag, 43 Envelope envelope, 44 AMQP.BasicProperties properties, 45 byte[] body) throws IOException { 46 // 交換機 47 String exchange = envelope.getExchange(); 48 // routingkey 49 String routingKey = envelope.getRoutingKey(); 50 // 消息id 51 long deliveryTag = envelope.getDeliveryTag(); 52 // 消息內容 53 String msg = new String(body, "utf-8"); 54 System.out.println("receive message:" + msg); 55 } 56 }; 57 /** 58 * 監聽隊列String queue, boolean autoAck, Consumer callback 59 * param1:隊列名稱 60 * param2:是否自動回覆,設置爲true爲表示消息接收到自動向mq回覆收到了,mq接收到回覆會刪除消息 61 * 設置爲false則須要手動回覆 62 * param3:消費消息的方法,消費者接收到消息後調用此方法 63 */ 64 channel.basicConsume(QUEUE, true, consumer); 65 } catch (IOException e) { 66 e.printStackTrace(); 67 } catch (TimeoutException e) { 68 e.printStackTrace(); 69 } 70 } 71 }
啓動消費者工程中的main方法進行監聽,再啓動生產者中的main方法。注:二者的啓動順序能夠顛倒。控制檯打印到接收到的消息以下:
發送端操做流程:
建立鏈接——>建立通道——>聲明隊列——>發送消息
接收端:
建立鏈接——>建立通道——>聲明隊列——>監聽隊列——>接收消息——>ack回覆
RabbitMQ有如下幾種工做模式:
Work queues與入門程序比,多以消費端,兩個消費端同時消費同一個隊列中的消息
發佈訂閱模式
1 public class Producer02Publish { 2 /** 3 * 隊列名稱 4 */ 5 private static final String QUEUE_EMAIL_INFORM = "queue email inform"; 6 private static final String QUEUE_SMS_INFORM = "queue sms inform"; 7 /** 8 * 聲明EXCHANGE_FANOUT_INFORM交換機 9 */ 10 private static final String EXCHANGE_FANOUT_INFORM = "exchange fanout inform"; 11 12 public static void main(String[] args) { 13 Connection connection = null; 14 Channel channel = null; 15 16 // 建立一個與MQ的鏈接 17 ConnectionFactory factory = new ConnectionFactory(); 18 factory.setHost("localhost"); 19 factory.setPort(5672); 20 factory.setUsername("guest"); 21 factory.setPassword("guest"); 22 // RabbitMQ默認虛擬機名稱爲"/",虛擬機至關於一個獨立的MQ服務器 23 factory.setVirtualHost("/"); 24 25 try { 26 // 建立於RabbitMQ服務的鏈接 27 connection = factory.newConnection(); 28 // 建立與Exchange的通道,每一個鏈接能夠建立多個通道,每一個通道表明一個會話任務 29 channel = connection.createChannel(); 30 /** 31 * 聲明交換機 32 * param1:交換機名稱 33 * param2:交換機類型,fanout、topic、direct、headers 34 */ 35 channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT); 36 /** 37 * 聲明隊列,若是RabbitMQ中沒有此隊列將自動建立 38 * param1:隊列名稱 39 * param2:是否持久化 40 * param3:隊列是否獨佔此鏈接 41 * param4:隊列再也不使用時自動刪除此列 42 * param5:隊列參數 43 */ 44 channel.queueDeclare(QUEUE_EMAIL_INFORM,true,false,false,null); 45 channel.queueDeclare(QUEUE_SMS_INFORM,true,false,false,null); 46 /** 47 * 交換機和隊列綁定 48 * param1:隊列名稱 49 * param2:交換機名稱 50 * param3:路由key 51 */ 52 channel.queueBind(QUEUE_EMAIL_INFORM,EXCHANGE_FANOUT_INFORM,""); 53 channel.queueBind(QUEUE_SMS_INFORM,EXCHANGE_FANOUT_INFORM,""); 54 55 // 發送消息 56 for (int i = 0; i<10; i++){ 57 String message = "inform to user:" + i; 58 /** 59 * 消息發佈方法 60 * param1:Exchange的名稱,若是沒有指定,則使用Default Exchange 61 * param2:routingkey,消息的路由,適用於Exchange將消息轉發到指定的消息隊列 62 * param3:消息包含的屬性 63 * param4:消息體 64 */ 65 channel.basicPublish(EXCHANGE_FANOUT_INFORM,"",null,message.getBytes()); 66 67 } 68 } catch (IOException e) { 69 e.printStackTrace(); 70 } catch (TimeoutException e) { 71 e.printStackTrace(); 72 } finally { 73 if (channel != null){ 74 try { 75 channel.close(); 76 } catch (IOException e) { 77 e.printStackTrace(); 78 } catch (TimeoutException e) { 79 e.printStackTrace(); 80 } 81 } 82 if (connection != null){ 83 try { 84 connection.close(); 85 } catch (IOException e) { 86 e.printStackTrace(); 87 } 88 } 89 } 90 } 91 }
1 public class Consumer02SubscribeEmail { 2 /** 3 * 隊列名稱 4 */ 5 private static final String QUEUE_EMAIL_INFORM = "queue email inform"; 6 /** 7 * 聲明EXCHANGE_FANOUT_INFORM交換機 8 */ 9 private static final String EXCHANGE_FANOUT_INFORM = "exchange fanout inform"; 10 11 public static void main(String[] args) { 12 Connection connection = null; 13 Channel channel = null; 14 15 // 建立一個與MQ的鏈接 16 ConnectionFactory factory = new ConnectionFactory(); 17 factory.setHost("localhost"); 18 factory.setPort(5672); 19 factory.setUsername("guest"); 20 factory.setPassword("guest"); 21 // rabbitmq默認虛擬機名稱爲「/」,虛擬機至關於一個獨立的mq服務器 22 factory.setVirtualHost("/"); 23 try { 24 // 建立於RabbitMQ服務的鏈接 25 connection = factory.newConnection(); 26 // 建立與Exchange的通道,每一個鏈接能夠建立多個通道,每一個通道表明一個會話任務 27 channel = connection.createChannel(); 28 /** 29 * param1:交換機名稱 30 * param2:交換機類型,fanout、topic、direct、headers 31 */ 32 channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT); 33 /** 34 * 聲明隊列,若是RabbitMQ中沒有此隊列將自動建立 35 * param1:隊列名稱 36 * param2:是否持久化 37 * param3:隊列是否獨佔此鏈接 38 * param4:隊列再也不使用時自動刪除此列 39 * param5:隊列參數 40 */ 41 channel.queueDeclare(QUEUE_EMAIL_INFORM,true,false,false,null); 42 43 /** 44 * 交換機和隊列綁定 45 * param1:隊列名稱 46 * param2:交換機名稱 47 * param3:路由key 48 */ 49 channel.queueBind(QUEUE_EMAIL_INFORM, EXCHANGE_FANOUT_INFORM, ""); 50 // 定義消費方法 51 DefaultConsumer consumer = new DefaultConsumer(channel){ 52 53 /** 54 * 消費者接受消息調用此方法 55 * @param consumerTag 消費者標籤,在channel,basicConsumer()去指定 56 * @param envelope 消息包含的的內容,能夠從中獲取消息id,消息routingkey,交換機, 57 * 消息和重傳標誌(收到消息失敗後是否須要從新發送) 58 * @param properties 59 * @param body 60 * @throws IOException 61 */ 62 @Override 63 public void handleDelivery(String consumerTag, 64 Envelope envelope, 65 AMQP.BasicProperties properties, 66 byte[] body) throws IOException { 67 // 消息id 68 long deliveryTag = envelope.getDeliveryTag(); 69 // 交換機 70 String exchange = envelope.getExchange(); 71 // 消息內容 72 String msg = new String(body, "utf-8"); 73 System.out.println("receive message:" + msg); 74 } 75 }; 76 /** 77 * 監聽隊列String queue, boolean autoAck, Consumer callback 78 * param1:隊列名稱 79 * param2:是否自動回覆,設置爲true爲表示消息接收到自動向mq回覆收到了,mq接收到回覆會刪除消息 80 * 設置爲false則須要手動回覆 81 * param3:消費消息的方法,消費者接收到消息後調用此方法 82 */ 83 channel.basicConsume(QUEUE_EMAIL_INFORM, true, consumer); 84 } catch (IOException e) { 85 e.printStackTrace(); 86 } catch (TimeoutException e) { 87 e.printStackTrace(); 88 } 89 } 90 }
1 public class Consumer02SubscribeSms { 2 /** 3 * 隊列名稱 4 */ 5 private static final String QUEUE_SMS_INFORM = "queue sms inform"; 6 /** 7 * 聲明EXCHANGE_FANOUT_INFORM交換機 8 */ 9 private static final String EXCHANGE_FANOUT_INFORM = "exchange fanout inform"; 10 11 public static void main(String[] args) { 12 Connection connection = null; 13 Channel channel = null; 14 15 // 建立一個與MQ的鏈接 16 ConnectionFactory factory = new ConnectionFactory(); 17 factory.setHost("localhost"); 18 factory.setPort(5672); 19 factory.setUsername("guest"); 20 factory.setPassword("guest"); 21 // rabbitmq默認虛擬機名稱爲「/」,虛擬機至關於一個獨立的mq服務器 22 factory.setVirtualHost("/"); 23 try { 24 // 建立於RabbitMQ服務的鏈接 25 connection = factory.newConnection(); 26 // 建立與Exchange的通道,每一個鏈接能夠建立多個通道,每一個通道表明一個會話任務 27 channel = connection.createChannel(); 28 /** 29 * param1:交換機名稱 30 * param2:交換機類型,fanout、topic、direct、headers 31 */ 32 channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT); 33 /** 34 * 聲明隊列,若是RabbitMQ中沒有此隊列將自動建立 35 * param1:隊列名稱 36 * param2:是否持久化 37 * param3:隊列是否獨佔此鏈接 38 * param4:隊列再也不使用時自動刪除此列 39 * param5:隊列參數 40 */ 41 channel.queueDeclare(QUEUE_SMS_INFORM,true,false,false,null); 42 43 /** 44 * 交換機和隊列綁定 45 * param1:隊列名稱 46 * param2:交換機名稱 47 * param3:路由key 48 */ 49 channel.queueBind(QUEUE_SMS_INFORM, EXCHANGE_FANOUT_INFORM, ""); 50 // 定義消費方法 51 DefaultConsumer consumer = new DefaultConsumer(channel){ 52 53 /** 54 * 消費者接受消息調用此方法 55 * @param consumerTag 消費者標籤,在channel,basicConsumer()去指定 56 * @param envelope 消息包含的的內容,能夠從中獲取消息id,消息routingkey,交換機, 57 * 消息和重傳標誌(收到消息失敗後是否須要從新發送) 58 * @param properties 59 * @param body 60 * @throws IOException 61 */ 62 @Override 63 public void handleDelivery(String consumerTag, 64 Envelope envelope, 65 AMQP.BasicProperties properties, 66 byte[] body) throws IOException { 67 // 消息id 68 long deliveryTag = envelope.getDeliveryTag(); 69 // 交換機 70 String exchange = envelope.getExchange(); 71 // 消息內容 72 String msg = new String(body, "utf-8"); 73 System.out.println("receive message:" + msg); 74 } 75 }; 76 /** 77 * 監聽隊列String queue, boolean autoAck, Consumer callback 78 * param1:隊列名稱 79 * param2:是否自動回覆,設置爲true爲表示消息接收到自動向mq回覆收到了,mq接收到回覆會刪除消息 80 * 設置爲false則須要手動回覆 81 * param3:消費消息的方法,消費者接收到消息後調用此方法 82 */ 83 channel.basicConsume(QUEUE_SMS_INFORM, true, consumer); 84 } catch (IOException e) { 85 e.printStackTrace(); 86 } catch (TimeoutException e) { 87 e.printStackTrace(); 88 } 89 } 90 }
路由模式:
1 public class Producer03Routing { 2 /** 3 * 隊列名稱 4 * 路由名稱設定與隊列名稱同樣 5 */ 6 private static final String QUEUE_EMAIL_INFORM = "queue email inform"; 7 private static final String QUEUE_SMS_INFORM = "queue sms inform"; 8 /** 9 * 聲明EXCHANGE_ROUTING_INFORM交換機 10 */ 11 private static final String EXCHANGE_ROUTING_INFORM = "exchange routing inform"; 12 13 public static void main(String[] args) { 14 Connection connection = null; 15 Channel channel = null; 16 17 // 建立一個與MQ的鏈接 18 ConnectionFactory factory = new ConnectionFactory(); 19 factory.setHost("localhost"); 20 factory.setPort(5672); 21 factory.setUsername("guest"); 22 factory.setPassword("guest"); 23 // RabbitMQ默認虛擬機名稱爲"/",虛擬機至關於一個獨立的MQ服務器 24 factory.setVirtualHost("/"); 25 26 try { 27 // 建立於RabbitMQ服務的鏈接 28 connection = factory.newConnection(); 29 // 建立與Exchange的通道,每一個鏈接能夠建立多個通道,每一個通道表明一個會話任務 30 channel = connection.createChannel(); 31 /** 32 * 聲明交換機 33 * param1:交換機名稱 34 * param2:交換機類型,fanout、topic、direct、headers 35 */ 36 channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT); 37 /** 38 * 聲明隊列,若是RabbitMQ中沒有此隊列將自動建立 39 * param1:隊列名稱 40 * param2:是否持久化 41 * param3:隊列是否獨佔此鏈接 42 * param4:隊列再也不使用時自動刪除此列 43 * param5:隊列參數 44 */ 45 channel.queueDeclare(QUEUE_EMAIL_INFORM, true, false, false, null); 46 channel.queueDeclare(QUEUE_SMS_INFORM, true, false, false, null); 47 /** 48 * 交換機和隊列綁定 49 * param1:隊列名稱 50 * param2:交換機名稱 51 * param3:路由key 52 */ 53 channel.queueBind(QUEUE_EMAIL_INFORM, EXCHANGE_ROUTING_INFORM, QUEUE_EMAIL_INFORM); 54 channel.queueBind(QUEUE_SMS_INFORM, EXCHANGE_ROUTING_INFORM, QUEUE_SMS_INFORM); 55 56 // 發送消息 57 for (int i = 0; i < 10; i++) { 58 String message = "inform to user:" + i; 59 /** 60 * 消息發佈方法 61 * param1:Exchange的名稱,若是沒有指定,則使用Default Exchange 62 * param2:routingkey,消息的路由,適用於Exchange將消息轉發到指定的消息隊列 63 * param3:消息包含的屬性 64 * param4:消息體 65 */ 66 channel.basicPublish(EXCHANGE_ROUTING_INFORM, QUEUE_SMS_INFORM, null, message.getBytes()); 67 } 68 } catch (IOException e) { 69 e.printStackTrace(); 70 } catch (TimeoutException e) { 71 e.printStackTrace(); 72 } finally { 73 if (channel != null) { 74 try { 75 channel.close(); 76 } catch (IOException e) { 77 e.printStackTrace(); 78 } catch (TimeoutException e) { 79 e.printStackTrace(); 80 } 81 } 82 if (connection != null) { 83 try { 84 connection.close(); 85 } catch (IOException e) { 86 e.printStackTrace(); 87 } 88 } 89 } 90 } 91 }
1 public class Consumer03RoutingEmail { 2 /** 3 * 隊列名稱 4 * 路由名稱設定與隊列名稱同樣 5 */ 6 private static final String QUEUE_EMAIL_INFORM = "queue email inform"; 7 /** 8 * 聲明EXCHANGE_ROUTING_INFORM交換機 9 */ 10 private static final String EXCHANGE_ROUTING_INFORM = "exchange routing inform"; 11 12 public static void main(String[] args) { 13 Connection connection = null; 14 Channel channel = null; 15 16 // 建立一個與MQ的鏈接 17 ConnectionFactory factory = new ConnectionFactory(); 18 factory.setHost("localhost"); 19 factory.setPort(5672); 20 factory.setUsername("guest"); 21 factory.setPassword("guest"); 22 // rabbitmq默認虛擬機名稱爲「/」,虛擬機至關於一個獨立的mq服務器 23 factory.setVirtualHost("/"); 24 try { 25 // 建立於RabbitMQ服務的鏈接 26 connection = factory.newConnection(); 27 // 建立與Exchange的通道,每一個鏈接能夠建立多個通道,每一個通道表明一個會話任務 28 channel = connection.createChannel(); 29 /** 30 * param1:交換機名稱 31 * param2:交換機類型,fanout、topic、direct、headers 32 */ 33 channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT); 34 /** 35 * 聲明隊列,若是RabbitMQ中沒有此隊列將自動建立 36 * param1:隊列名稱 37 * param2:是否持久化 38 * param3:隊列是否獨佔此鏈接 39 * param4:隊列再也不使用時自動刪除此列 40 * param5:隊列參數 41 */ 42 channel.queueDeclare(QUEUE_EMAIL_INFORM, true, false, false, null); 43 44 /** 45 * 交換機和隊列綁定 46 * param1:隊列名稱 47 * param2:交換機名稱 48 * param3:路由key 49 */ 50 channel.queueBind(QUEUE_EMAIL_INFORM, EXCHANGE_ROUTING_INFORM, QUEUE_EMAIL_INFORM); 51 // 定義消費方法 52 DefaultConsumer consumer = new DefaultConsumer(channel) { 53 54 /** 55 * 消費者接受消息調用此方法 56 * @param consumerTag 消費者標籤,在channel,basicConsumer()去指定 57 * @param envelope 消息包含的的內容,能夠從中獲取消息id,消息routingkey,交換機, 58 * 消息和重傳標誌(收到消息失敗後是否須要從新發送) 59 * @param properties 60 * @param body 61 * @throws IOException 62 */ 63 @Override 64 public void handleDelivery(String consumerTag, 65 Envelope envelope, 66 AMQP.BasicProperties properties, 67 byte[] body) throws IOException { 68 // 消息id 69 long deliveryTag = envelope.getDeliveryTag(); 70 // 交換機 71 String exchange = envelope.getExchange(); 72 // 消息內容 73 String msg = new String(body, "utf-8"); 74 System.out.println("email receive message:" + msg); 75 } 76 }; 77 /** 78 * 監聽隊列String queue, boolean autoAck, Consumer callback 79 * param1:隊列名稱 80 * param2:是否自動回覆,設置爲true爲表示消息接收到自動向mq回覆收到了,mq接收到回覆會刪除消息 81 * 設置爲false則須要手動回覆 82 * param3:消費消息的方法,消費者接收到消息後調用此方法 83 */ 84 channel.basicConsume(QUEUE_EMAIL_INFORM, true, consumer); 85 } catch (IOException e) { 86 e.printStackTrace(); 87 } catch (TimeoutException e) { 88 e.printStackTrace(); 89 } 90 } 91 }
1 public class Consumer03RoutingSms { 2 /** 3 * 隊列名稱 4 * 路由名稱設定與隊列名稱同樣 5 */ 6 private static final String QUEUE_SMS_INFORM = "queue sms inform"; 7 /** 8 * 聲明EXCHANGE_ROUTING_INFORM交換機 9 */ 10 private static final String EXCHANGE_ROUTING_INFORM = "exchange routing inform"; 11 12 public static void main(String[] args) { 13 Connection connection = null; 14 Channel channel = null; 15 16 // 建立一個與MQ的鏈接 17 ConnectionFactory factory = new ConnectionFactory(); 18 factory.setHost("localhost"); 19 factory.setPort(5672); 20 factory.setUsername("guest"); 21 factory.setPassword("guest"); 22 // rabbitmq默認虛擬機名稱爲「/」,虛擬機至關於一個獨立的mq服務器 23 factory.setVirtualHost("/"); 24 try { 25 // 建立於RabbitMQ服務的鏈接 26 connection = factory.newConnection(); 27 // 建立與Exchange的通道,每一個鏈接能夠建立多個通道,每一個通道表明一個會話任務 28 channel = connection.createChannel(); 29 /** 30 * param1:交換機名稱 31 * param2:交換機類型,fanout、topic、direct、headers 32 */ 33 channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT); 34 /** 35 * 聲明隊列,若是RabbitMQ中沒有此隊列將自動建立 36 * param1:隊列名稱 37 * param2:是否持久化 38 * param3:隊列是否獨佔此鏈接 39 * param4:隊列再也不使用時自動刪除此列 40 * param5:隊列參數 41 */ 42 channel.queueDeclare(QUEUE_SMS_INFORM, true, false, false, null); 43 44 /** 45 * 交換機和隊列綁定 46 * param1:隊列名稱 47 * param2:交換機名稱 48 * param3:路由key 49 */ 50 channel.queueBind(QUEUE_SMS_INFORM, EXCHANGE_ROUTING_INFORM, QUEUE_SMS_INFORM); 51 // 定義消費方法 52 DefaultConsumer consumer = new DefaultConsumer(channel) { 53 54 /** 55 * 消費者接受消息調用此方法 56 * @param consumerTag 消費者標籤,在channel,basicConsumer()去指定 57 * @param envelope 消息包含的的內容,能夠從中獲取消息id,消息routingkey,交換機, 58 * 消息和重傳標誌(收到消息失敗後是否須要從新發送) 59 * @param properties 60 * @param body 61 * @throws IOException 62 */ 63 @Override 64 public void handleDelivery(String consumerTag, 65 Envelope envelope, 66 AMQP.BasicProperties properties, 67 byte[] body) throws IOException { 68 // 消息id 69 long deliveryTag = envelope.getDeliveryTag(); 70 // 交換機 71 String exchange = envelope.getExchange(); 72 // 消息內容 73 String msg = new String(body, "utf-8"); 74 System.out.println("sms receive message:" + msg); 75 } 76 }; 77 /** 78 * 監聽隊列String queue, boolean autoAck, Consumer callback 79 * param1:隊列名稱 80 * param2:是否自動回覆,設置爲true爲表示消息接收到自動向mq回覆收到了,mq接收到回覆會刪除消息 81 * 設置爲false則須要手動回覆 82 * param3:消費消息的方法,消費者接收到消息後調用此方法 83 */ 84 channel.basicConsume(QUEUE_SMS_INFORM, true, consumer); 85 } catch (IOException e) { 86 e.printStackTrace(); 87 } catch (TimeoutException e) { 88 e.printStackTrace(); 89 } 90 } 91 }
匹配模式
1 public class Producer04Topics { 2 /** 3 * 隊列名稱 4 */ 5 private static final String QUEUE_EMAIL_INFORM = "queue email inform"; 6 private static final String QUEUE_SMS_INFORM = "queue sms inform"; 7 /** 8 * 路由key,#表明能夠匹配多個詞,符號*能夠匹配一個詞語 9 */ 10 private static final String ROUTINGKEY_EMAIL = "inform.#.email.#"; 11 private static final String ROUTINGKEY_SMS = "inform.#.sms.#"; 12 /** 13 * 聲明EXCHANGE_TOPICS_INFORM交換機 14 */ 15 private static final String EXCHANGE_TOPICS_INFORM = "exchange topics inform"; 16 17 public static void main(String[] args) { 18 Connection connection = null; 19 Channel channel = null; 20 21 // 建立一個與MQ的鏈接 22 ConnectionFactory factory = new ConnectionFactory(); 23 factory.setHost("localhost"); 24 factory.setPort(5672); 25 factory.setUsername("guest"); 26 factory.setPassword("guest"); 27 // RabbitMQ默認虛擬機名稱爲"/",虛擬機至關於一個獨立的MQ服務器 28 factory.setVirtualHost("/"); 29 30 try { 31 // 建立於RabbitMQ服務的鏈接 32 connection = factory.newConnection(); 33 // 建立與Exchange的通道,每一個鏈接能夠建立多個通道,每一個通道表明一個會話任務 34 channel = connection.createChannel(); 35 /** 36 * 聲明交換機 37 * param1:交換機名稱 38 * param2:交換機類型,fanout、topic、direct、headers 39 */ 40 channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC); 41 /** 42 * 聲明隊列,若是RabbitMQ中沒有此隊列將自動建立 43 * param1:隊列名稱 44 * param2:是否持久化 45 * param3:隊列是否獨佔此鏈接 46 * param4:隊列再也不使用時自動刪除此列 47 * param5:隊列參數 48 */ 49 channel.queueDeclare(QUEUE_EMAIL_INFORM, true, false, false, null); 50 channel.queueDeclare(QUEUE_SMS_INFORM, true, false, false, null); 51 /** 52 * 交換機和隊列綁定 53 * param1:隊列名稱 54 * param2:交換機名稱 55 * param3:路由key 56 */ 57 channel.queueBind(QUEUE_EMAIL_INFORM, EXCHANGE_TOPICS_INFORM, ROUTINGKEY_EMAIL); 58 channel.queueBind(QUEUE_SMS_INFORM, EXCHANGE_TOPICS_INFORM, ROUTINGKEY_SMS); 59 60 61 /** 62 * 消息發佈方法 63 * param1:Exchange的名稱,若是沒有指定,則使用Default Exchange 64 * param2:routingkey,消息的路由,適用於Exchange將消息轉發到指定的消息隊列 65 * param3:消息包含的屬性 66 * param4:消息體 67 */ 68 for (int i = 0; i < 5; i++) { 69 //發送消息的時候指定routingKey 70 String message = "send email inform message to user"; 71 channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.email", null, message.getBytes()); 72 System.out.println("send to mq " + message); 73 } 74 for (int i = 0; i < 5; i++) { 75 //發送消息的時候指定routingKey 76 String message = "send sms inform message to user"; 77 channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.sms", null, message.getBytes()); 78 System.out.println("send to mq " + message); 79 } 80 for (int i = 0; i < 5; i++) { 81 //發送消息的時候指定routingKey 82 String message = "send sms and email inform message to user"; 83 channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.sms.email", null, message.getBytes()); 84 System.out.println("send to mq " + message); 85 } 86 } catch (IOException e) { 87 e.printStackTrace(); 88 } catch (TimeoutException e) { 89 e.printStackTrace(); 90 } finally { 91 if (channel != null) { 92 try { 93 channel.close(); 94 } catch (IOException e) { 95 e.printStackTrace(); 96 } catch (TimeoutException e) { 97 e.printStackTrace(); 98 } 99 } 100 if (connection != null) { 101 try { 102 connection.close(); 103 } catch (IOException e) { 104 e.printStackTrace(); 105 } 106 } 107 } 108 } 109 }
1 public class Consumer04TopicsEmail { 2 /** 3 * 隊列名稱 4 * 路由名稱設定與隊列名稱同樣 5 */ 6 private static final String QUEUE_SMS_INFORM = "queue sms inform"; 7 8 /** 9 * 路由key,#表明能夠匹配多個詞,符號*能夠匹配一個詞語 10 */ 11 private static final String ROUTINGKEY_EMAIL = "inform.#.email.#"; 12 /** 13 * 聲明EXCHANGE_TOPICS_INFORM交換機 14 */ 15 private static final String EXCHANGE_TOPICS_INFORM = "exchange topics inform"; 16 17 public static void main(String[] args) { 18 Connection connection = null; 19 Channel channel = null; 20 21 // 建立一個與MQ的鏈接 22 ConnectionFactory factory = new ConnectionFactory(); 23 factory.setHost("localhost"); 24 factory.setPort(5672); 25 factory.setUsername("guest"); 26 factory.setPassword("guest"); 27 // rabbitmq默認虛擬機名稱爲「/」,虛擬機至關於一個獨立的mq服務器 28 factory.setVirtualHost("/"); 29 try { 30 // 建立於RabbitMQ服務的鏈接 31 connection = factory.newConnection(); 32 // 建立與Exchange的通道,每一個鏈接能夠建立多個通道,每一個通道表明一個會話任務 33 channel = connection.createChannel(); 34 /** 35 * param1:交換機名稱 36 * param2:交換機類型,fanout、topic、direct、headers 37 */ 38 channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC); 39 /** 40 * 聲明隊列,若是RabbitMQ中沒有此隊列將自動建立 41 * param1:隊列名稱 42 * param2:是否持久化 43 * param3:隊列是否獨佔此鏈接 44 * param4:隊列再也不使用時自動刪除此列 45 * param5:隊列參數 46 */ 47 channel.queueDeclare(QUEUE_SMS_INFORM, true, false, false, null); 48 49 /** 50 * 交換機和隊列綁定 51 * param1:隊列名稱 52 * param2:交換機名稱 53 * param3:路由key 54 */ 55 channel.queueBind(QUEUE_SMS_INFORM, EXCHANGE_TOPICS_INFORM, ROUTINGKEY_EMAIL); 56 // 定義消費方法 57 DefaultConsumer consumer = new DefaultConsumer(channel) { 58 59 /** 60 * 消費者接受消息調用此方法 61 * @param consumerTag 消費者標籤,在channel,basicConsumer()去指定 62 * @param envelope 消息包含的的內容,能夠從中獲取消息id,消息routingkey,交換機, 63 * 消息和重傳標誌(收到消息失敗後是否須要從新發送) 64 * @param properties 65 * @param body 66 * @throws IOException 67 */ 68 @Override 69 public void handleDelivery(String consumerTag, 70 Envelope envelope, 71 AMQP.BasicProperties properties, 72 byte[] body) throws IOException { 73 // 消息id 74 long deliveryTag = envelope.getDeliveryTag(); 75 // 交換機 76 String exchange = envelope.getExchange(); 77 // 消息內容 78 String msg = new String(body, "utf-8"); 79 System.out.println("email receive message:" + msg); 80 } 81 }; 82 /** 83 * 監聽隊列String queue, boolean autoAck, Consumer callback 84 * param1:隊列名稱 85 * param2:是否自動回覆,設置爲true爲表示消息接收到自動向mq回覆收到了,mq接收到回覆會刪除消息 86 * 設置爲false則須要手動回覆 87 * param3:消費消息的方法,消費者接收到消息後調用此方法 88 */ 89 channel.basicConsume(QUEUE_SMS_INFORM, true, consumer); 90 } catch (IOException e) { 91 e.printStackTrace(); 92 } catch (TimeoutException e) { 93 e.printStackTrace(); 94 } 95 } 96 }
1 public class Consumer04TopicsSms { 2 /** 3 * 隊列名稱 4 * 路由名稱設定與隊列名稱同樣 5 */ 6 private static final String QUEUE_SMS_INFORM = "queue sms inform"; 7 /** 8 * 路由key,#表明能夠匹配多個詞,符號*能夠匹配一個詞語 9 */ 10 private static final String ROUTINGKEY_SMS = "inform.#.sms.#"; 11 /** 12 * 聲明EXCHANGE_TOPICS_INFORM交換機 13 */ 14 private static final String EXCHANGE_TOPICS_INFORM = "exchange topics inform"; 15 16 public static void main(String[] args) { 17 Connection connection = null; 18 Channel channel = null; 19 20 // 建立一個與MQ的鏈接 21 ConnectionFactory factory = new ConnectionFactory(); 22 factory.setHost("localhost"); 23 factory.setPort(5672); 24 factory.setUsername("guest"); 25 factory.setPassword("guest"); 26 // rabbitmq默認虛擬機名稱爲「/」,虛擬機至關於一個獨立的mq服務器 27 factory.setVirtualHost("/"); 28 try { 29 // 建立於RabbitMQ服務的鏈接 30 connection = factory.newConnection(); 31 // 建立與Exchange的通道,每一個鏈接能夠建立多個通道,每一個通道表明一個會話任務 32 channel = connection.createChannel(); 33 /** 34 * param1:交換機名稱 35 * param2:交換機類型,fanout、topic、direct、headers 36 */ 37 channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC); 38 /** 39 * 聲明隊列,若是RabbitMQ中沒有此隊列將自動建立 40 * param1:隊列名稱 41 * param2:是否持久化 42 * param3:隊列是否獨佔此鏈接 43 * param4:隊列再也不使用時自動刪除此列 44 * param5:隊列參數 45 */ 46 channel.queueDeclare(QUEUE_SMS_INFORM, true, false, false, null); 47 48 /** 49 * 交換機和隊列綁定 50 * param1:隊列名稱 51 * param2:交換機名稱 52 * param3:路由key 53 */ 54 channel.queueBind(QUEUE_SMS_INFORM, EXCHANGE_TOPICS_INFORM, ROUTINGKEY_SMS); 55 // 定義消費方法 56 DefaultConsumer consumer = new DefaultConsumer(channel) { 57 58 /** 59 * 消費者接受消息調用此方法 60 * @param consumerTag 消費者標籤,在channel,basicConsumer()去指定 61 * @param envelope 消息包含的的內容,能夠從中獲取消息id,消息routingkey,交換機, 62 * 消息和重傳標誌(收到消息失敗後是否須要從新發送) 63 * @param properties 64 * @param body 65 * @throws IOException 66 */ 67 @Override 68 public void handleDelivery(String consumerTag, 69 Envelope envelope, 70 AMQP.BasicProperties properties, 71 byte[] body) throws IOException { 72 // 消息id 73 long deliveryTag = envelope.getDeliveryTag(); 74 // 交換機 75 String exchange = envelope.getExchange(); 76 // 消息內容 77 String msg = new String(body, "utf-8"); 78 System.out.println("sms receive message:" + msg); 79 } 80 }; 81 /** 82 * 監聽隊列String queue, boolean autoAck, Consumer callback 83 * param1:隊列名稱 84 * param2:是否自動回覆,設置爲true爲表示消息接收到自動向mq回覆收到了,mq接收到回覆會刪除消息 85 * 設置爲false則須要手動回覆 86 * param3:消費消息的方法,消費者接收到消息後調用此方法 87 */ 88 channel.basicConsume(QUEUE_SMS_INFORM, true, consumer); 89 } catch (IOException e) { 90 e.printStackTrace(); 91 } catch (TimeoutException e) { 92 e.printStackTrace(); 93 } 94 } 95 }
1 Map<String, Object> headers_email = new Hashtable<String, Object>(); 2 headers_email.put("inform_type", "email"); 3 Map<String, Object> headers_sms = new Hashtable<String, Object>(); 4 headers_sms.put("inform_type", "sms"); 5 channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_HEADERS_INFORM,"",headers_email); 6 channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_HEADERS_INFORM,"",headers_sms);
1 String message = "email inform to user"+i; 2 Map<String,Object> headers = new Hashtable<String, Object>(); 3 headers.put("inform_type", "email");//匹配email通知消費者綁定的header 4 //headers.put("inform_type", "sms");//匹配sms通知消費者綁定的header 5 AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder(); 6 properties.headers(headers); 7 //Email通知 8 channel.basicPublish(EXCHANGE_HEADERS_INFORM, "", properties.build(), message.getBytes());
1 channel.exchangeDeclare(EXCHANGE_HEADERS_INFORM, BuiltinExchangeType.HEADERS); 2 Map<String, Object> headers_email = new Hashtable<String, Object>(); 3 headers_email.put("inform_email", "email"); 4 //交換機和隊列綁定 5 channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_HEADERS_INFORM,"",headers_email); 6 //指定消費隊列 7 channel.basicConsume(QUEUE_INFORM_EMAIL, true, consumer);
RPC即客戶端遠程調用服務端的方法,使用MQ能夠實現RPC的異步調用,基於Direct交換機實現,流程以下:
咱們選擇基於Spring-Rabbit去操做RabbitMQ
1 <dependency> 2 <groupId>org.springframework.boot</groupId> 3 <artifactId>spring‐boot‐starter‐amqp</artifactId> 4 </dependency> 5 <dependency> 6 <groupId>org.springframework.boot</groupId> 7 <artifactId>spring‐boot‐starter‐test</artifactId> 8 </dependency> 9 <dependency> 10 <groupId>org.springframework.boot</groupId> 11 <artifactId>spring‐boot‐starter‐logging</artifactId> 12 </dependency>
1 server: 2 port: 44000 3 spring: 4 application: 5 name: test‐rabbitmq‐producer/consumer #兩個工程的名稱 6 rabbitmq: 7 host: 127.0.0.1 8 port: 5672 9 username: guest 10 password: guest 11 virtualHost: /
1 @Configuration 2 public class RabbitmqConfig { 3 public static final String QUEUE_EMAIL_INFORM = "queue_email_inform"; 4 public static final String QUEUE_SMS_INFORM = "queue_sms_inform"; 5 public static final String EXCHANGE_TOPICS_INFORM = "exchange_topics_inform"; 6 7 /** 8 * 聲明交換機 9 * @return 10 */ 11 @Bean(EXCHANGE_TOPICS_INFORM) 12 public Exchange EXCHANGE_TOPICS_INFORM(){ 13 // durable(true):持久化,消息隊列重啓後交換機仍然存在 14 return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build(); 15 } 16 17 /** 18 * 聲明隊列 19 * @return 20 */ 21 @Bean(QUEUE_EMAIL_INFORM) 22 public Queue QUEUE_EMAIL_INFORM(){ 23 return new Queue(QUEUE_EMAIL_INFORM); 24 } 25 26 @Bean(QUEUE_SMS_INFORM) 27 public Queue QUEUE_SMS_INFORM(){ 28 return new Queue(QUEUE_SMS_INFORM); 29 } 30 31 /** 32 * 綁定隊列到交換機 33 * @param queue 34 * @param exchange 35 * @return 36 */ 37 public Binding BINDING_QUEUE_EMAIL(@Qualifier(QUEUE_EMAIL_INFORM) Queue queue, 38 @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){ 39 return BindingBuilder.bind(queue).to(exchange).with("inform.#.email.#").noargs(); 40 } 41 42 public Binding BINDING_QUEUE_SMS(@Qualifier(QUEUE_SMS_INFORM) Queue queue, 43 @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){ 44 return BindingBuilder.bind(queue).to(exchange).with("inform.#.sms.#").noargs(); 45 } 46 }
1 @Component 2 public class ReceiveHandler { 3 4 // 監聽email隊列 5 @RabbitListener(queues = {RabbitmqConfig.QUEUE_EMAIL_INFORM}) 6 public void receiveEmail(String msg) { 7 System.out.println("email隊列接收到消息:" + msg); 8 } 9 10 // 監聽sms隊列 11 @RabbitListener(queues = {RabbitmqConfig.QUEUE_SMS_INFORM}) 12 public void receiveSMS(String msg) { 13 System.out.println("sms隊列接收到消息:" + msg); 14 } 15 }
1 @RunWith(SpringRunner.class) 2 @SpringBootTest 3 public class SpringBootTopicsProducerTest { 4 5 @Autowired 6 public RabbitTemplate rabbitTemplate; 7 8 @Test 9 public void testSendMessageByTopics(){ 10 for (int i = 0; i < 5; i++) { 11 String message = "sms email inform to user"+i; 12 rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_TOPICS_INFORM,"inform.sms.email",message); 13 } 14 } 15 }
消費端執行工程中的SpringBoot的引導類的main方法執行消息的監聽,生產端執行測試方法testSendMessageByTopics()
控制檯打印接收到的消息:
由於隊列消息的監聽是異步的,因此會出現消息打印交替出現的現象,這裏簡單的SpringBoot整合RabbitMQ的案例就結束了。:)