RabbitMq消息隊列

一 Rabbitmq是什麼

  RabbitMq是一個消息隊列,是以一種隊列的結構來存放message,遵循這FIFO的規則。主要能夠用來在不一樣的進程和線程之間進行通訊。服務器

  爲何會產生消息隊列?markdown

不一樣進程(process)之間傳遞消息時,兩個進程之間耦合程度太高,改動一個進程,引起必須修改另外一個進程,爲了隔離這兩個進程,在兩進程間抽離出一層(一個模塊),全部兩進程之間傳遞的消息,都必須經過消息隊列來傳遞,單獨修改某一個進程,不會影響另外一個;app

不一樣進程(process)之間傳遞消息時,爲了實現標準化,將消息的格式規範化了,而且,某一個進程接受的消息太多,一會兒沒法處理完,而且也有前後順序,必須對收到的消息進行排隊,所以誕生了事實上的消息隊列;maven

  RabbitMq官方網站:https://www.rabbitmq.com/#supportide

二 RabbitMq五種消息隊列模式

  RabbitMQ有多種消息隊列模式,主要使用五種消息隊列模式,結構如圖所示:學習

 

  其中有幾個概念須要先介紹一下,Mq中至關與一個消息的生產消費過程,因此主要有消息的生產者,消息隊列,消息的消費者三種組成。測試

  message消息類型能夠是markdown等。 網站

1.簡單隊列

  在簡單的隊列裏,p表明producer生產者,紅色表明消息隊列,c爲consumer消費者。在最簡單的隊列模式中,生產者發送消息到消息隊列中,消費者只要隊列中有消息就取出消費。spa

1.1 maven配置

        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.6.0</version>
        </dependency>    

1.2 ConnectionUtil鏈接

  RabbitMq的一些配置鏈接,封裝爲一個Util類。.net

 1 public class ConnectionUtil {
 2 
 3     public static Connection getConnection() throws Exception {
 4         //定義鏈接工廠
 5         ConnectionFactory factory = new ConnectionFactory();
 6         //設置服務地址
 7         factory.setHost("localhost");
 8         //端口
 9         factory.setPort(5672);
10         //設置帳號信息,用戶名、密碼、vhost
11         factory.setVirtualHost("testhost");
12         factory.setUsername("admin");
13         factory.setPassword("admin");
14         // 經過工程獲取鏈接
15         Connection connection = factory.newConnection();
16         return connection;
17     }
18 }

1.3 生產者發送消息

  生產者發送消息到指定到隊列中,首先獲取鏈接到mq,而後建立通道。生產發送消息發送完以後就須要斷開與隊列的鏈接。

  其中若是把隊列的持久化設置爲true,則隊列中的消息爲持久化消息,當消費者沒有消費時會一直堆積在隊列中。

 1 public class Send {
 2 
 3     //隊列名
 4     private final static String QUEUE_NAME = "q_test_01";
 5 
 6     public static void main(String[] argv) throws Exception {
 7         // 獲取到鏈接以及mq通道
 8         Connection connection = ConnectionUtil.getConnection();
 9         // 從鏈接中建立通道
10         Channel channel = connection.createChannel();
11 
12         // 聲明(建立)隊列,第二個參數爲是否持久化隊列
13         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
14 
15         // 消息內容
16         String message = "Hello World!";
17         channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
18         System.out.println(" [x] Sent '" + message + "'");
19         //關閉通道和鏈接
20         channel.close();
21         connection.close();
22     }
23 }

1.4 消費者獲取消息

  將獲取消息實現接口,使其在程序運行時被調起並執行。在接收消息的方法中,一樣首先和rabbitMq進行創建鏈接,建立通道而且聲明隊列。程序在運行到channel.basicConsume時會被阻塞,只有當有消息時,纔會執行上一步中的取消息後的相關操做。

 1 public class ReceivingFromRabbitmq implements ApplicationListener<ApplicationReadyEvent> {
 2 @Override
 3     public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
 4         try {
 5             // 獲取到鏈接以及mq通道
 6             Connection connection = connectionUtil.getConnection();
 7             // 從鏈接中建立通道
 8             Channel channel = connection.createChannel();
 9             // 聲明隊列
10             channel.queueDeclare(queueName, queueDurable, false, false, null);
11 
12             DeliverCallback deliverCallback = (consumerTag, delivery) -> {
13                 String message = new String(delivery.getBody(), "UTF-8");
14                 System.out.println(" [x] Received '" + message + "'");
15 
16                 //執行獲取消息後的相關操做
17             };
18         
19             channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
20             });
21         } catch (Exception e) {
22             e.printStackTrace();
23         }
24     }
25 }    

 

2. Work模式 

  在work模式中包括一個生產者和兩個消費者,然而生產者發送的消息只能被一個消費者所獲取消費。

2.1 消費者1

 1 public class Recv {
 2 
 3     private final static String QUEUE_NAME = "test_queue_work";
 4 
 5     public static void main(String[] argv) throws Exception {
 6 
 7         // 獲取到鏈接以及mq通道
 8         Connection connection = ConnectionUtil.getConnection();
 9         Channel channel = connection.createChannel();
10 
11         // 聲明隊列
12         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
13 
14         // 同一時刻服務器只會發一條消息給消費者
15         //channel.basicQos(1);
16 
17         // 定義隊列的消費者
18         QueueingConsumer consumer = new QueueingConsumer(channel);
19         // 監聽隊列,false表示手動返回完成狀態,true表示自動
20         channel.basicConsume(QUEUE_NAME, true, consumer);
21 
22         // 獲取消息
23         while (true) {
24             QueueingConsumer.Delivery delivery = consumer.nextDelivery();
25             String message = new String(delivery.getBody());
26             System.out.println(" [y] Received '" + message + "'");
27             //休眠
28             Thread.sleep(10);
29             // 返回確認狀態,註釋掉表示使用自動確認模式
30             //channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
31         }
32     }

2.2 消費者2

 1 public class Recv2 {
 2 
 3     private final static String QUEUE_NAME = "test_queue_work";
 4 
 5     public static void main(String[] argv) throws Exception {
 6 
 7         // 獲取到鏈接以及mq通道
 8         Connection connection = ConnectionUtil.getConnection();
 9         Channel channel = connection.createChannel();
10 
11         // 聲明隊列
12         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
13 
14         // 同一時刻服務器只會發一條消息給消費者
15         //channel.basicQos(1);
16 
17         // 定義隊列的消費者
18         QueueingConsumer consumer = new QueueingConsumer(channel);
19         // 監聽隊列,false表示手動返回完成狀態,true表示自動
20         channel.basicConsume(QUEUE_NAME, true, consumer);
21 
22         // 獲取消息
23         while (true) {
24             QueueingConsumer.Delivery delivery = consumer.nextDelivery();
25             String message = new String(delivery.getBody());
26             System.out.println(" [x] Received '" + message + "'");
27             // 休眠1秒
28             Thread.sleep(1000);
29             //下面這行註釋掉表示使用自動確認模式
30             //channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
31         }
32     }
33 }

2.3 生產者

 1 public class Send {
 2 
 3     private final static String QUEUE_NAME = "test_queue_work";
 4 
 5     public static void main(String[] argv) throws Exception {
 6         // 獲取到鏈接以及mq通道
 7         Connection connection = ConnectionUtil.getConnection();
 8         Channel channel = connection.createChannel();
 9 
10         // 聲明隊列
11         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
12 
13         for (int i = 0; i < 100; i++) {
14             // 消息內容
15             String message = "" + i;
16             channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
17             System.out.println(" [x] Sent '" + message + "'");
18 
19             Thread.sleep(i * 10);
20         }
21 
22         channel.close();
23         connection.close();
24     }
25 }

  生產者向隊列中發送了一百條消息。

結果:

  兩個消費者接收不一樣的消息,可是接收的數量是相同的。

  雖然設置了每一個接收者的休眠時間不一樣,可是他們接收的消息的數量倒是相同的,由於RabbitMq使用輪詢方式進行信息分發,及默認將消息順序的發給下一個消費者。

  同時在這種模式中,消息分發給消費者後,只有當消費者完成消費並向RabbitMq返回確認消息,Mq纔會對消息進行刪除。若是在執行中消費者死亡,或沒有發揮確認消息,則mq會將該消息即便分發給其餘消費者。

  當Rabbitmq退出或奔潰時,隊列和消息會丟失,因此咱們須要將隊列進行持久化聲明:

boolean durable = true ; 
channel.queueDeclare(「hello」,durable,falsefalsenull);

2.4 work模式實現「能者多勞」

  打開上述代碼的註釋:

// 同一時刻服務器只會發一條消息給消費者
channel.basicQos(1);

//開啓這行 表示使用手動確認模式
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

//同時改成手動模式
// 監聽隊列,false表示手動返回完成狀態,true表示自動
channel.basicConsume(QUEUE_NAME, false, consumer);

  測試結果,消費者1比消費者2獲取的消息更多。

 

3. 訂閱模式

  一次向多個消費者發送消息。

  

一、1個生產者,多個消費者
二、每個消費者都有本身的一個隊列
三、生產者沒有將消息直接發送到隊列,而是發送到了交換機
四、每一個隊列都要綁定到交換機
五、生產者發送的消息,通過交換機,到達隊列,實現,一個消息被多個消費者獲取的目的
注意:一個消費者隊列能夠有多個消費者實例,只有其中一個消費者實例會消費

3.1 生產者發送消息(可看做後臺系統)

向交換機X發送消息。可是若是交換機沒綁定隊列時,消息就會丟失。由於交換機沒有存儲消息的能力,消息只能存放在隊列中。

 1 public class Send {
 2 
 3     private final static String EXCHANGE_NAME = "test_exchange_fanout";
 4 
 5     public static void main(String[] argv) throws Exception {
 6         // 獲取到鏈接以及mq通道
 7         Connection connection = ConnectionUtil.getConnection();
 8         Channel channel = connection.createChannel();
 9 
10         // 聲明exchange
11         channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
12 
13         // 消息內容
14         String message = "Hello World!";
15         channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
16         System.out.println(" [x] Sent '" + message + "'");
17 
18         channel.close();
19         connection.close();
20     }
21 }

3.2 消費者1(相似前臺系統)

 1 public class Recv {
 2 
 3     private final static String QUEUE_NAME = "test_queue_work1";
 4 
 5     private final static String EXCHANGE_NAME = "test_exchange_fanout";
 6 
 7     public static void main(String[] argv) throws Exception {
 8 
 9         // 獲取到鏈接以及mq通道
10         Connection connection = ConnectionUtil.getConnection();
11         Channel channel = connection.createChannel();
12 
13         // 聲明隊列
14         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
15 
16         // 綁定隊列到交換機
17         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
18 
19         // 同一時刻服務器只會發一條消息給消費者
20         channel.basicQos(1);
21 
22         // 定義隊列的消費者
23         QueueingConsumer consumer = new QueueingConsumer(channel);
24         // 監聽隊列,手動返回完成
25         channel.basicConsume(QUEUE_NAME, false, consumer);
26 
27         // 獲取消息
28         while (true) {
29             QueueingConsumer.Delivery delivery = consumer.nextDelivery();
30             String message = new String(delivery.getBody());
31             System.out.println(" [Recv] Received '" + message + "'");
32             Thread.sleep(10);
33 
34             channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
35         }
36     }
37 }

3.3 消費者2(搜索系統)

 1 public class Recv2 {
 2 
 3     private final static String QUEUE_NAME = "test_queue_work2";
 4 
 5     private final static String EXCHANGE_NAME = "test_exchange_fanout";
 6 
 7     public static void main(String[] argv) throws Exception {
 8 
 9         // 獲取到鏈接以及mq通道
10         Connection connection = ConnectionUtil.getConnection();
11         Channel channel = connection.createChannel();
12 
13         // 聲明隊列
14         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
15 
16         // 綁定隊列到交換機
17         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
18 
19         // 同一時刻服務器只會發一條消息給消費者
20         channel.basicQos(1);
21 
22         // 定義隊列的消費者
23         QueueingConsumer consumer = new QueueingConsumer(channel);
24         // 監聽隊列,手動返回完成
25         channel.basicConsume(QUEUE_NAME, false, consumer);
26 
27         // 獲取消息
28         while (true) {
29             QueueingConsumer.Delivery delivery = consumer.nextDelivery();
30             String message = new String(delivery.getBody());
31             System.out.println(" [Recv2] Received '" + message + "'");
32             Thread.sleep(10);
33 
34             channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
35         }
36     }
37 }

測試結果:同一個消息被多個消費者獲取。一個消費者隊列能夠有多個消費者實例,只有其中一個消費者實例會消費到消息。

4. 路由模式

 

——持續更新——

 

學習自大佬:

https://blog.csdn.net/hellozpc/article/details/81436980

相關文章
相關標籤/搜索