RabbitMq是一個消息隊列,是以一種隊列的結構來存放message,遵循這FIFO的規則。主要能夠用來在不一樣的進程和線程之間進行通訊。服務器
爲何會產生消息隊列?markdown
不一樣進程(process)之間傳遞消息時,兩個進程之間耦合程度太高,改動一個進程,引起必須修改另外一個進程,爲了隔離這兩個進程,在兩進程間抽離出一層(一個模塊),全部兩進程之間傳遞的消息,都必須經過消息隊列來傳遞,單獨修改某一個進程,不會影響另外一個;app
不一樣進程(process)之間傳遞消息時,爲了實現標準化,將消息的格式規範化了,而且,某一個進程接受的消息太多,一會兒沒法處理完,而且也有前後順序,必須對收到的消息進行排隊,所以誕生了事實上的消息隊列;maven
RabbitMq官方網站:https://www.rabbitmq.com/#supportide
RabbitMQ有多種消息隊列模式,主要使用五種消息隊列模式,結構如圖所示:學習
其中有幾個概念須要先介紹一下,Mq中至關與一個消息的生產消費過程,因此主要有消息的生產者,消息隊列,消息的消費者三種組成。測試
message消息類型能夠是markdown等。 網站
在簡單的隊列裏,p表明producer生產者,紅色表明消息隊列,c爲consumer消費者。在最簡單的隊列模式中,生產者發送消息到消息隊列中,消費者只要隊列中有消息就取出消費。spa
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.6.0</version> </dependency>
RabbitMq的一些配置鏈接,封裝爲一個Util類。.net
1 public class ConnectionUtil { 2 3 public static Connection getConnection() throws Exception { 4 //定義鏈接工廠 5 ConnectionFactory factory = new ConnectionFactory(); 6 //設置服務地址 7 factory.setHost("localhost"); 8 //端口 9 factory.setPort(5672); 10 //設置帳號信息,用戶名、密碼、vhost 11 factory.setVirtualHost("testhost"); 12 factory.setUsername("admin"); 13 factory.setPassword("admin"); 14 // 經過工程獲取鏈接 15 Connection connection = factory.newConnection(); 16 return connection; 17 } 18 }
生產者發送消息到指定到隊列中,首先獲取鏈接到mq,而後建立通道。生產發送消息發送完以後就須要斷開與隊列的鏈接。
其中若是把隊列的持久化設置爲true,則隊列中的消息爲持久化消息,當消費者沒有消費時會一直堆積在隊列中。
1 public class Send { 2 3 //隊列名 4 private final static String QUEUE_NAME = "q_test_01"; 5 6 public static void main(String[] argv) throws Exception { 7 // 獲取到鏈接以及mq通道 8 Connection connection = ConnectionUtil.getConnection(); 9 // 從鏈接中建立通道 10 Channel channel = connection.createChannel(); 11 12 // 聲明(建立)隊列,第二個參數爲是否持久化隊列 13 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 14 15 // 消息內容 16 String message = "Hello World!"; 17 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); 18 System.out.println(" [x] Sent '" + message + "'"); 19 //關閉通道和鏈接 20 channel.close(); 21 connection.close(); 22 } 23 }
將獲取消息實現接口,使其在程序運行時被調起並執行。在接收消息的方法中,一樣首先和rabbitMq進行創建鏈接,建立通道而且聲明隊列。程序在運行到channel.basicConsume時會被阻塞,只有當有消息時,纔會執行上一步中的取消息後的相關操做。
1 public class ReceivingFromRabbitmq implements ApplicationListener<ApplicationReadyEvent> { 2 @Override 3 public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) { 4 try { 5 // 獲取到鏈接以及mq通道 6 Connection connection = connectionUtil.getConnection(); 7 // 從鏈接中建立通道 8 Channel channel = connection.createChannel(); 9 // 聲明隊列 10 channel.queueDeclare(queueName, queueDurable, false, false, null); 11 12 DeliverCallback deliverCallback = (consumerTag, delivery) -> { 13 String message = new String(delivery.getBody(), "UTF-8"); 14 System.out.println(" [x] Received '" + message + "'"); 15 16 //執行獲取消息後的相關操做 17 }; 18 19 channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { 20 }); 21 } catch (Exception e) { 22 e.printStackTrace(); 23 } 24 } 25 }
在work模式中包括一個生產者和兩個消費者,然而生產者發送的消息只能被一個消費者所獲取消費。
1 public class Recv { 2 3 private final static String QUEUE_NAME = "test_queue_work"; 4 5 public static void main(String[] argv) throws Exception { 6 7 // 獲取到鏈接以及mq通道 8 Connection connection = ConnectionUtil.getConnection(); 9 Channel channel = connection.createChannel(); 10 11 // 聲明隊列 12 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 13 14 // 同一時刻服務器只會發一條消息給消費者 15 //channel.basicQos(1); 16 17 // 定義隊列的消費者 18 QueueingConsumer consumer = new QueueingConsumer(channel); 19 // 監聽隊列,false表示手動返回完成狀態,true表示自動 20 channel.basicConsume(QUEUE_NAME, true, consumer); 21 22 // 獲取消息 23 while (true) { 24 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 25 String message = new String(delivery.getBody()); 26 System.out.println(" [y] Received '" + message + "'"); 27 //休眠 28 Thread.sleep(10); 29 // 返回確認狀態,註釋掉表示使用自動確認模式 30 //channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 31 } 32 }
1 public class Recv2 { 2 3 private final static String QUEUE_NAME = "test_queue_work"; 4 5 public static void main(String[] argv) throws Exception { 6 7 // 獲取到鏈接以及mq通道 8 Connection connection = ConnectionUtil.getConnection(); 9 Channel channel = connection.createChannel(); 10 11 // 聲明隊列 12 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 13 14 // 同一時刻服務器只會發一條消息給消費者 15 //channel.basicQos(1); 16 17 // 定義隊列的消費者 18 QueueingConsumer consumer = new QueueingConsumer(channel); 19 // 監聽隊列,false表示手動返回完成狀態,true表示自動 20 channel.basicConsume(QUEUE_NAME, true, consumer); 21 22 // 獲取消息 23 while (true) { 24 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 25 String message = new String(delivery.getBody()); 26 System.out.println(" [x] Received '" + message + "'"); 27 // 休眠1秒 28 Thread.sleep(1000); 29 //下面這行註釋掉表示使用自動確認模式 30 //channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 31 } 32 } 33 }
1 public class Send { 2 3 private final static String QUEUE_NAME = "test_queue_work"; 4 5 public static void main(String[] argv) throws Exception { 6 // 獲取到鏈接以及mq通道 7 Connection connection = ConnectionUtil.getConnection(); 8 Channel channel = connection.createChannel(); 9 10 // 聲明隊列 11 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 12 13 for (int i = 0; i < 100; i++) { 14 // 消息內容 15 String message = "" + i; 16 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); 17 System.out.println(" [x] Sent '" + message + "'"); 18 19 Thread.sleep(i * 10); 20 } 21 22 channel.close(); 23 connection.close(); 24 } 25 }
生產者向隊列中發送了一百條消息。
結果:
兩個消費者接收不一樣的消息,可是接收的數量是相同的。
雖然設置了每一個接收者的休眠時間不一樣,可是他們接收的消息的數量倒是相同的,由於RabbitMq使用輪詢方式進行信息分發,及默認將消息順序的發給下一個消費者。
同時在這種模式中,消息分發給消費者後,只有當消費者完成消費並向RabbitMq返回確認消息,Mq纔會對消息進行刪除。若是在執行中消費者死亡,或沒有發揮確認消息,則mq會將該消息即便分發給其餘消費者。
當Rabbitmq退出或奔潰時,隊列和消息會丟失,因此咱們須要將隊列進行持久化聲明:
boolean durable = true ; channel.queueDeclare(「hello」,durable,false,false,null);
打開上述代碼的註釋:
// 同一時刻服務器只會發一條消息給消費者 channel.basicQos(1); //開啓這行 表示使用手動確認模式 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); //同時改成手動模式 // 監聽隊列,false表示手動返回完成狀態,true表示自動 channel.basicConsume(QUEUE_NAME, false, consumer);
測試結果,消費者1比消費者2獲取的消息更多。
一次向多個消費者發送消息。
一、1個生產者,多個消費者
二、每個消費者都有本身的一個隊列
三、生產者沒有將消息直接發送到隊列,而是發送到了交換機
四、每一個隊列都要綁定到交換機
五、生產者發送的消息,通過交換機,到達隊列,實現,一個消息被多個消費者獲取的目的
注意:一個消費者隊列能夠有多個消費者實例,只有其中一個消費者實例會消費
向交換機X發送消息。可是若是交換機沒綁定隊列時,消息就會丟失。由於交換機沒有存儲消息的能力,消息只能存放在隊列中。
1 public class Send { 2 3 private final static String EXCHANGE_NAME = "test_exchange_fanout"; 4 5 public static void main(String[] argv) throws Exception { 6 // 獲取到鏈接以及mq通道 7 Connection connection = ConnectionUtil.getConnection(); 8 Channel channel = connection.createChannel(); 9 10 // 聲明exchange 11 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); 12 13 // 消息內容 14 String message = "Hello World!"; 15 channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); 16 System.out.println(" [x] Sent '" + message + "'"); 17 18 channel.close(); 19 connection.close(); 20 } 21 }
1 public class Recv { 2 3 private final static String QUEUE_NAME = "test_queue_work1"; 4 5 private final static String EXCHANGE_NAME = "test_exchange_fanout"; 6 7 public static void main(String[] argv) throws Exception { 8 9 // 獲取到鏈接以及mq通道 10 Connection connection = ConnectionUtil.getConnection(); 11 Channel channel = connection.createChannel(); 12 13 // 聲明隊列 14 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 15 16 // 綁定隊列到交換機 17 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); 18 19 // 同一時刻服務器只會發一條消息給消費者 20 channel.basicQos(1); 21 22 // 定義隊列的消費者 23 QueueingConsumer consumer = new QueueingConsumer(channel); 24 // 監聽隊列,手動返回完成 25 channel.basicConsume(QUEUE_NAME, false, consumer); 26 27 // 獲取消息 28 while (true) { 29 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 30 String message = new String(delivery.getBody()); 31 System.out.println(" [Recv] Received '" + message + "'"); 32 Thread.sleep(10); 33 34 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 35 } 36 } 37 }
1 public class Recv2 { 2 3 private final static String QUEUE_NAME = "test_queue_work2"; 4 5 private final static String EXCHANGE_NAME = "test_exchange_fanout"; 6 7 public static void main(String[] argv) throws Exception { 8 9 // 獲取到鏈接以及mq通道 10 Connection connection = ConnectionUtil.getConnection(); 11 Channel channel = connection.createChannel(); 12 13 // 聲明隊列 14 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 15 16 // 綁定隊列到交換機 17 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); 18 19 // 同一時刻服務器只會發一條消息給消費者 20 channel.basicQos(1); 21 22 // 定義隊列的消費者 23 QueueingConsumer consumer = new QueueingConsumer(channel); 24 // 監聽隊列,手動返回完成 25 channel.basicConsume(QUEUE_NAME, false, consumer); 26 27 // 獲取消息 28 while (true) { 29 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 30 String message = new String(delivery.getBody()); 31 System.out.println(" [Recv2] Received '" + message + "'"); 32 Thread.sleep(10); 33 34 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 35 } 36 } 37 }
測試結果:同一個消息被多個消費者獲取。一個消費者隊列能夠有多個消費者實例,只有其中一個消費者實例會消費到消息。
——持續更新——
學習自大佬: