RabbitMQ-從基礎到實戰(3)— 消息的交換(上)

轉載請註明出處:http://www.cnblogs.com/4----/p/6549865.html

0.目錄

RabbitMQ-從基礎到實戰(1)— Hello RabbitMQhtml

RabbitMQ-從基礎到實戰(2)— 防止消息丟失java

RabbitMQ-從基礎到實戰(4)— 消息的交換(中)app

RabbitMQ-從基礎到實戰(5)— 消息的交換(下)ide

RabbitMQ-從基礎到實戰(6)— 與Spring集成函數

1.簡介

在前面的例子中,每一個消息都只對應一個消費者,即便有多個消費者在線,也只會有一個消費者接收並處理一條消息,這是消息中間件的一種經常使用方式。性能

另一種方式,生產者生產一條消息,廣播給一個或多個隊列,全部訂閱了這個隊列的消費者,均可以消費這條消息,這就是消息訂閱。this

官方教程列舉了這樣一個場景,生產者發出一條記錄日誌的消息,消費者1接收到後寫日誌到硬盤,消費者2接收到後打印日誌到屏幕。工做中還有不少這樣的場景有待發掘,適當的使用消息訂閱後能夠成倍的增長效率。idea

2.RabbitMQ的交換中心(Exchange)

在前兩章的例子中,咱們涉及到了三個概念spa

  1. 生產者
  2. 隊列
  3. 消費者

這不由讓咱們覺得,生產者生產消息後直接到發送到隊列,消費者從隊列中獲取消息,再消費掉。debug

其實這是錯誤的,在RabbitMQ中,生產者不會直接把消息發送給隊列,實際上,生產者甚至不知道一條消息會不會被髮送到隊列上。

正確的概念是,生產者會把消息發送給RabbitMQ的交換中心(Exchange),Exchange的一側是生產者,另外一側則是一個或多個隊列,由Exchange決定一條消息的生命週期--發送給某些隊列,或者直接丟棄掉。

這個概念在官方文檔中被稱做RabbitMQ消息模型的核心思想(core idea)

以下圖,其中X表明的是Exchange。

image

RabbitMQ中,有4種類型的Exchange

  • direct    經過消息的routing key比較queue的key,相等則發給該queue,經常使用於相同應用多實例之間的任務分發
    • 默認類型   自己是一個direct類型的exchange,routing key自動設置爲queue name。注意,direct不等於默認類型,默認類型是在queue沒有指定exchange時的默認處理方式,發消息時,exchange字段也要相應的填成空字符串「」
  • topic    話題,經過可配置的規則分發給綁定在該exchange上的隊列,經過地理位置推送等場景適用
  • headers    當分發規則很複雜,用routing key很差表達時適用,忽略routing key,用header取代之,header能夠爲非字符串,例如Integer或者String
  • fanout    分發給全部綁定到該exchange上的隊列,忽略routing key,適用於MMO遊戲、廣播、羣聊等場景

更詳細的介紹,請看官方文檔

3.臨時隊列

能夠對一個隊列命名是十分重要的,在消費者消費消息時,要指明消費哪一個隊列的消息(下面的queue),這樣就可讓多個消費者同時分享一個隊列

String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;

上述記錄日誌的場景中,有如下幾個特色

  • 全部消費者都須要監聽全部的日誌消息,所以每一個消費者都須要一個單獨的隊列,不須要和別人分享
  • 消費者只關心最新的消息,鏈接到RabbitMQ以前的消息不須要關心,所以,每次鏈接時須要建立一個隊列,綁定到相應的exchange上,鏈接斷開後,刪除該隊列

本身聲明隊列是比較麻煩的,所以,RabbitMQ提供了簡便的獲取臨時隊列的方法,該隊列會在鏈接斷開後銷燬

String queueName = channel.queueDeclare().getQueue();

這行代碼會獲取一個名字相似於「amq.gen-JzTY20BRgKO-HjmUJj0wLg」的臨時隊列

4.綁定

再次注意,在RabbitMQ中,消息是發送到Exchange的,不是直接發送的Queue。所以,須要把Queue和Exchange進行綁定,告訴RabbitMQ把指定的Exchange上的消息發送的這個隊列上來

綁定隊列使用此方法

Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException;

其中,queue是隊列名,exchange是要綁定的交換中心,routingKey就是這個queue的routingKey

5.實踐

下面來實現上述場景,生產者發送日誌消息,消費者1記錄日誌,消費者2打印日誌

下面的代碼中,把鏈接工廠等方法放到了構造函數中,也就是說,每new一個對象,都會建立一個鏈接,在生產環境這樣作是很浪費性能的,每次建立一個connection都會創建一次TCP鏈接,生產環境應使用鏈接池。而Channel又不同,多個Channel是共用一個TCP鏈接的,所以能夠放心的獲取Channel(本結論出自官方文檔對Channel的定義)

AMQP 0-9-1 connections are multiplexed with channels that can be thought of as "lightweight connections that share a single TCP connection".

For applications that use multiple threads/processes for processing, it is very common to open a new channel per thread/process and not share channels between them.

日誌消息發送類 LogSender

複製代碼
1 import java.io.IOException;
 2 import java.util.concurrent.TimeoutException;
 3 
 4 import org.slf4j.Logger;
 5 import org.slf4j.LoggerFactory;
 6 
 7 import com.rabbitmq.client.Channel;
 8 import com.rabbitmq.client.Connection;
 9 import com.rabbitmq.client.ConnectionFactory;
10 
11 public class LogSender {
12 
13     private Logger logger = LoggerFactory.getLogger(LogSender.class);
14     private  ConnectionFactory factory;
15     private  Connection connection;
16     private  Channel channel;
17     
18     /**
19      * 在構造函數中獲取鏈接
20      */
21     public LogSender(){
22         super();
23         try {
24             factory = new ConnectionFactory();
25             factory.setHost("127.0.0.1");
26             connection = factory.newConnection();
27             channel = connection.createChannel();
28         } catch (Exception e) {
29             logger.error(" [X] INIT ERROR!",e);
30         }
31     }
32     /**
33      * 提供個關閉方法,如今並無什麼卵用
34      * @return
35      */
36     public boolean closeAll(){
37         try {
38             this.channel.close();
39             this.connection.close();
40         } catch (IOException | TimeoutException e) {
41             logger.error(" [X] CLOSE ERROR!",e);
42             return false;
43         }
44         return true;
45     }
46     
47     /**
48      * 咱們更加關心的業務方法
49      * @param message
50      */
51     public void sendMessage(String message) {
52             try {
53                 //聲明一個exchange,命名爲logs,類型爲fanout
54                 channel.exchangeDeclare("logs", "fanout");
55                 //exchange是logs,表示發送到此Exchange上
56                 //fanout類型的exchange,忽略routingKey,因此第二個參數爲空
57                 channel.basicPublish("logs", "", null, message.getBytes());
58                 logger.debug(" [D] message sent:"+message);
59             } catch (IOException e) {
60                 e.printStackTrace();
61             }
62     }
63 }
複製代碼

在LogSender中,和以前的例子不同的地方是,咱們沒有直接聲明一個Queue,取而代之的是聲明瞭一個exchange

發佈消息時,第一個參數填了咱們聲明的exchange名字,routingKey留空,由於fanout類型忽略它。

在前面的例子中,咱們routingKey填的是隊列名,由於默認的exchange(exchange位空字符串時使用默認交換中心)會把隊列的routingKey設置爲queueName(聲明隊列的時候設置的,不是發送消息的時候),又是direct類型,因此能夠經過queueName當作routingKey找到隊列。

消費類 LogConsumer

複製代碼
1 package com.liyang.ticktock.rabbitmq;
 2 
 3 import java.io.IOException;
 4 import java.util.concurrent.TimeoutException;
 5 
 6 import org.slf4j.Logger;
 7 import org.slf4j.LoggerFactory;
 8 
 9 import com.rabbitmq.client.AMQP;
10 import com.rabbitmq.client.Channel;
11 import com.rabbitmq.client.Connection;
12 import com.rabbitmq.client.ConnectionFactory;
13 import com.rabbitmq.client.Consumer;
14 import com.rabbitmq.client.DefaultConsumer;
15 import com.rabbitmq.client.Envelope;
16 
17 public class LogConsumer {
18 
19     private Logger logger = LoggerFactory.getLogger(LogConsumer.class);
20     private ConnectionFactory factory;
21     private Connection connection;
22     private Channel channel;
23 
24     /**
25      * 在構造函數中獲取鏈接
26      */
27     public LogConsumer() {
28         super();
29         try {
30             factory = new ConnectionFactory();
31             factory.setHost("127.0.0.1");
32             connection = factory.newConnection();
33             channel = connection.createChannel();
34             // 聲明exchange,防止生產者沒啓動,exchange不存在
35             channel.exchangeDeclare("logs","fanout");
36         } catch (Exception e) {
37             logger.error(" [X] INIT ERROR!", e);
38         }
39     }
40 
41     /**
42      * 提供個關閉方法,如今並無什麼卵用
43      * 
44      * @return
45      */
46     public boolean closeAll() {
47         try {
48             this.channel.close();
49             this.connection.close();
50         } catch (IOException | TimeoutException e) {
51             logger.error(" [X] CLOSE ERROR!", e);
52             return false;
53         }
54         return true;
55     }
56 
57     /**
58      * 咱們更加關心的業務方法
59      */
60     public void consume() {
61         try {
62             // 獲取一個臨時隊列
63             String queueName = channel.queueDeclare().getQueue();
64             // 把剛剛獲取的隊列綁定到logs這個交換中心上,fanout類型忽略routingKey,因此第三個參數爲空
65             channel.queueBind(queueName, "logs", "");
66             //定義一個Consumer,消費Log消息
67             Consumer consumer = new DefaultConsumer(channel) {
68                 @Override
69                 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
70                         byte[] body) throws IOException {
71                     String message = new String(body, "UTF-8");
72                     logger.debug(" [D] 我是來打印日誌的:"+message);
73                 }
74             };
75             //這裏自動確認爲true,接收到消息後該消息就銷燬了
76             channel.basicConsume(queueName, true, consumer);
77         } catch (IOException e) {
78             e.printStackTrace();
79         }
80     }
81 }
複製代碼

複製一個項目,把72行改成以下代碼,表明兩個作不一樣工做的消費者

1 logger.debug(" [D] 我已經把消息寫到硬盤了:"+message);

消費者App

複製代碼
1 public class App 
2 {
3     public static void main( String[] args )
4     {
5         LogConsumer consumer = new LogConsumer();
6         consumer.consume();
7     }
8 }
複製代碼

生產者App

複製代碼
1 public class App {
2     public static void main( String[] args ) throws InterruptedException{
3         LogSender sender = new LogSender();
4         while(true){
5             sender.sendMessage(System.nanoTime()+"");
6             Thread.sleep(1000);
7         }
8     }
9 }
複製代碼

把消費者打包成兩個可執行的jar包,方便觀察控制檯

用java -jar 命令執行,結果以下

6.結束語

本章介紹了RabbitMQ中消息的交換,再次強調,RabbitMQ中,消息是經過交換中心轉發到隊列的,不要被默認的exchange混淆,默認的exchange會自動把queue的名字設置爲它的routingKey,因此消息發佈時,才能經過queueName找到該隊列,其實此時queueName扮演的角色就是routingKey。

本教程是參考官方文檔寫出來的,後續章節會介紹更多RabbitMQ的相關知識以及項目中的實戰技巧

相關文章
相關標籤/搜索