RabbitMQ中真的只有四種交換器嗎?

微信公衆號:跟着老萬學java
歡迎關注,瞭解更多編程技巧,一塊兒交流,一塊兒成長。html

若是你們看了我以前的文章,應該都知道,rabbitmq中經常使用的交換器有4中,分別是:direct、fanout、topic、headersjava

那麼rabbitmq中,真的只有4中交換器嗎?git

今天和你們一塊兒研究下。github

一個簡單的方式,經過啓動rabbitmq_management插件,在管理控制檯去嘗試建立exchange。
這裏說明下,我使用的3.8.3版本的rabbitmq。不一樣版本的rabbitmq可能存在一些查詢,有些類型的exchange可能須要安裝對應的插件後才能建立。web

在這裏插入圖片描述

能夠發現多了2種類型的exchange,分別是: x-consistent-hash和x-modulus-hash以及一個參數Alternate exchange.

常規的四種交換器類型,這裏就再也不贅述,有興趣的小夥伴能夠看看我整理的RabbitMQ入門知識整理
算法

x-consistent-hash和x-modulus-hash類型的exchange,是在從 RabbitMQ 3.6.0 版本開始,整合到 RabbitMQ 發佈包中的。sql

x-consistent-hash

git地址:https://github.com/rabbitmq/rabbitmq-consistent-hash-exchange
說明:
一致性hash交換器,主要是使用一致性hash算法將消息分發到綁定在交換器上的隊列上。編程

工做原理:
當使用「一致性哈希」做爲交換類型的狀況下,會根據消息屬性(最多見的是路由密鑰 routing key)計算一個hash值,而後根據這個hash值,將消息分發到綁定在該交換器下的隊列中。
所以,若是沒有發生綁定更改,具備相同路由關鍵字的消息將具備相同哈希值將被路由到同一隊列。微信

下面是工做原理圖:
app

在這裏插入圖片描述

權重
當隊列綁 定到Consistent Hash交換時,綁定密鑰 binding key會使用一個數字字符串,表示綁定權重: 這個桶的數量將與目標隊列關聯(範圍的部分)。

簡單的說,就是綁定鍵的數字越大,那麼綁定的隊列的權重就越大,分發消息的時候接受到的消息就越多。

請注意,綁定中的routing_keys是數字字符串。這是由於AMQP 0-9-1指定routing_key字段必須爲字符串。

另外,發佈消息的時候,路由鍵routing_key必定要是隨機的。

綁定鍵binding key決定隊列的權重 路由鍵routing_key決定消息的分發

x-modulus-hash

git地址:https://github.com/rabbitmq/rabbitmq-sharding
x-modulus-hash路由器對應rabbitmq sharding插件,主要是實現自動對隊列進行分片。
也就是說,一旦將一個exchange 定義爲x-modulus-hash,就能夠在每一個集羣節點上自動建立支持隊列,並在它們之間共享消息。rabbitmq sharding向使用者顯示了一個隊列,但可能在後臺運行了它對應的多個隊列。rabbitmq sharding插件爲您提供了一個集中的位置,經過向集羣中的其餘節點添加隊列,您能夠將消息以及跨多個節點的負載平衡發送到該位置。

工做原理圖:

在這裏插入圖片描述

主要特徵:新加節點後,新加自動分片
該插件的主要特性之一是,當將新節點添加到RabbitMQ集羣時,該插件將在新節點上自動建立更多分片。假設節點a上有一個帶有4個隊列的分片,而節點b剛加入集羣。該插件將在節點b上自動建立4個隊列,並將它們「加入」分片分區。已經傳遞的消息將不會從新平衡,可是新到達的消息將被劃分到新隊列中。

兩種路由器的選用:
若是隻須要消息分區,而不須要自動調整分片數量的話,可使用Consistent Hash Exchange;反之,若是須要根據策略或節點數量,動態調整分片數量的話,則選擇x-modulus-hash。

備份交換器Alternate Exchange

生產者在發送消息時不設置mandatory 參數,那麼消息達到路由器後匹配不到相應的隊列後消息將會丟失。
設置了mandatory 參數,那麼須要添加ReturnListener的編程邏輯。
若是既不想複雜化生產者的編程邏輯,又不想消息丟失,那麼可使用備份交換器。

顧名思義 備份交換器就是當第一個交換器未能有效匹配到隊列時,路由到備份交換器,再由備份交換器區匹配隊列

模型圖
從模型圖中能夠看到消息發送到名字爲TestAE的路由器中,可是由於沒有跟隊列匹配,這個時候消息就會被髮送到名字爲exchange-unroute的備份交換器,這個交換器通常會爲fanout型,隨後就會被路由到AE-queue隊列

在這裏插入圖片描述

核心代碼:

//存儲備份交換器的參數map
Map<String, Object> spare = new HashMap<String , Object>(2);
spare.put("alternate-exchange" , MY_SPARE);
//聲明瞭一個direct 類型的交換器,而且添加存儲備份交換器的map參數
channel.exchangeDeclare(EXCHANGE_NAME,"direct",true,false,spare);

完成示例代碼:

public class SendSpare {

    private final static String QUEUE_NAME = "wsd_test";
    private final static String QUEUE_NAME_2 = "wsd_test2";
    private final static String EXCHANGE_NAME = "wsd_exchange";
    private final static String ROUTING_KEY = "wsd_exchange";
    private final static String EXCHANGE_KEY = "wsd_exchange";
    private final static String MY_SPARE = "mySpare";
    private static Connection connection =null;
    private static Channel channel = null;
    public static void main(String[] args) {
        Map<String, Object> spare = new HashMap<String , Object>(2);
        spare.put("alternate-exchange" , MY_SPARE);
        try{
            // 獲取到鏈接以及mq通道
            connection = ConnectionUtil.getConnection();
            // 從鏈接中建立通道
           channel = connection.createChannel();
           //聲明瞭一個direct 類型的交換器
           channel.exchangeDeclare(EXCHANGE_NAME,"direct",true,false,spare);
           //聲明一個備胎路由器
            channel.exchangeDeclare(MY_SPARE,"fanout",true,false,null);
            // 聲明(建立)隊列
            channel.queueDeclare(QUEUE_NAME, falsefalsefalsenull);
            //將路由與隊列綁定,再爲綁定的路徑賦值一個綁定鍵
            channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY);

            // 聲明(建立)隊列
            channel.queueDeclare(QUEUE_NAME_2, falsefalsefalsenull);
            //綁定備胎路由器
            channel.queueBind(QUEUE_NAME_2,MY_SPARE,"");
            //發送數據
            for (int i=0;i<10;i++){
                // 消息內容
                String message = "Hello World!"+i;
                //指定發送消息到哪一個路由,以及他的路由鍵,消息等
                if (i%2==0){
                    channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY,null, message.getBytes());
                }else {
                    //匹配不到隊列
                    channel.basicPublish(EXCHANGE_NAME, "kkkk",null, message.getBytes());
                }
                System.out.println(" [x] Sent '" + message + "'");
                Thread.sleep(200);
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            //關閉通道和鏈接
            try {
                channel.close();
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        }
    }
}

聯邦交換器 Federation

Federation插件的高級目標是在代理之間傳輸消息而無需集羣。

該插件可讓交換器和隊列組成同盟。一個聯邦的交換或隊列能夠從一個或多個上游(遠程交換和其餘代理上的隊列)接收消息,能夠將上游發佈的消息路由到本地隊列。一個聯邦隊列可讓一個本地消費者從上游隊列接收消息。

rabbitmq-plugins  enable  rabbitmq_federation
rabbitmq-plugins enable rabbitmq_federation_management
在這裏插入圖片描述

工做原理:

在這裏插入圖片描述

其餘

經過執行rabbitmq-plugins list命令,查看支持的默認帶有哪些插件

在這裏插入圖片描述

rabbitmq_event_exchange
rabbitmq_jms_topic_exchange
rabbitmq_random_exchange
rabbitmq_recent_history_exchange

次通過命令安裝對應插件後:

rabbitmq-plugins  enable  rabbitmq_event_exchange
在這裏插入圖片描述

進一步研究,還會不會有其餘一些插件呢?
去官網的插件網址:
https://www.rabbitmq.com/community-plugins.html

在這裏插入圖片描述

rabbitmq_lvc_exchange
rabbitmq_rtopic_exchange
rabbitmq_delayed_message_exchange     延遲消息插件
rabbitmq_management_exchange
pgsql_listen_exchange

總結

3.8.3版本的rabbitmq,默認有6種類型的交換器類型,分別是:
direct、fanout、topic、headers,x-consistent-hash和x-modulus-hash,一個參數Alternate exchange來實現備份交換器。
經過插件擴展,還能夠聲明以下交換器:
rabbitmq_event_exchange
rabbitmq_jms_topic_exchange
rabbitmq_random_exchange
rabbitmq_recent_history_exchange
rabbitmq_lvc_exchange
rabbitmq_rtopic_exchange
rabbitmq_delayed_message_exchange     延遲消息插件
rabbitmq_management_exchange
pgsql_listen_exchange

那麼,你還知道哪些其餘的交換器器嗎?歡迎留言交流。

參考:
https://blog.csdn.net/wsdfym/article/details/101800624
https://www.rabbitmq.com/distributed.html

   更多精彩,關注我吧。
圖注:跟着老萬學java



本文分享自微信公衆號 - 跟着老萬學java(douzhe_2019)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索