微信公衆號:跟着老萬學java
歡迎關注,瞭解更多編程技巧,一塊兒交流,一塊兒成長。html
若是你們看了我以前的文章,應該都知道,rabbitmq中經常使用的交換器有4中,分別是:direct、fanout、topic、headers。java
那麼rabbitmq中,真的只有4中交換器嗎?git
今天和你們一塊兒研究下。github
一個簡單的方式,經過啓動rabbitmq_management插件,在管理控制檯去嘗試建立exchange。
這裏說明下,我使用的3.8.3版本的rabbitmq。不一樣版本的rabbitmq可能存在一些查詢,有些類型的exchange可能須要安裝對應的插件後才能建立。web
能夠發現多了2種類型的exchange,分別是: x-consistent-hash和x-modulus-hash以及一個參數Alternate exchange.
常規的四種交換器類型,這裏就再也不贅述,有興趣的小夥伴能夠看看我整理的RabbitMQ入門知識整理
算法
x-consistent-hash和x-modulus-hash類型的exchange,是在從 RabbitMQ 3.6.0 版本開始,整合到 RabbitMQ 發佈包中的。sql
x-consistent-hash
git地址:https://github.com/rabbitmq/rabbitmq-consistent-hash-exchange
說明:
一致性hash交換器,主要是使用一致性hash算法將消息分發到綁定在交換器上的隊列上。編程
工做原理:
當使用「一致性哈希」做爲交換類型的狀況下,會根據消息屬性(最多見的是路由密鑰 routing key)計算一個hash值,而後根據這個hash值,將消息分發到綁定在該交換器下的隊列中。
所以,若是沒有發生綁定更改,具備相同路由關鍵字的消息將具備相同哈希值將被路由到同一隊列。微信
下面是工做原理圖:
app
權重
簡單的說,就是綁定鍵的數字越大,那麼綁定的隊列的權重就越大,分發消息的時候接受到的消息就越多。
請注意,綁定中的routing_keys是數字字符串。這是由於AMQP 0-9-1指定routing_key字段必須爲字符串。
另外,發佈消息的時候,路由鍵routing_key必定要是隨機的。
綁定鍵binding key決定隊列的權重 路由鍵routing_key決定消息的分發
x-modulus-hash
git地址:https://github.com/rabbitmq/rabbitmq-sharding
x-modulus-hash路由器對應rabbitmq sharding插件,主要是實現自動對隊列進行分片。
也就是說,一旦將一個exchange 定義爲x-modulus-hash,就能夠在每一個集羣節點上自動建立支持隊列,並在它們之間共享消息。rabbitmq sharding向使用者顯示了一個隊列,但可能在後臺運行了它對應的多個隊列。rabbitmq sharding插件爲您提供了一個集中的位置,經過向集羣中的其餘節點添加隊列,您能夠將消息以及跨多個節點的負載平衡發送到該位置。
工做原理圖:
主要特徵:新加節點後,新加自動分片
該插件的主要特性之一是,當將新節點添加到RabbitMQ集羣時,該插件將在新節點上自動建立更多分片。假設節點a上有一個帶有4個隊列的分片,而節點b剛加入集羣。該插件將在節點b上自動建立4個隊列,並將它們「加入」分片分區。已經傳遞的消息將不會從新平衡,可是新到達的消息將被劃分到新隊列中。
兩種路由器的選用:
若是隻須要消息分區,而不須要自動調整分片數量的話,可使用Consistent Hash Exchange;反之,若是須要根據策略或節點數量,動態調整分片數量的話,則選擇x-modulus-hash。
備份交換器Alternate Exchange
生產者在發送消息時不設置mandatory 參數,那麼消息達到路由器後匹配不到相應的隊列後消息將會丟失。
設置了mandatory 參數,那麼須要添加ReturnListener的編程邏輯。
若是既不想複雜化生產者的編程邏輯,又不想消息丟失,那麼可使用備份交換器。
顧名思義 備份交換器就是當第一個交換器未能有效匹配到隊列時,路由到備份交換器,再由備份交換器區匹配隊列
模型圖
從模型圖中能夠看到消息發送到名字爲TestAE的路由器中,可是由於沒有跟隊列匹配,這個時候消息就會被髮送到名字爲exchange-unroute的備份交換器,這個交換器通常會爲fanout型,隨後就會被路由到AE-queue隊列
核心代碼:
//存儲備份交換器的參數map
Map<String, Object> spare = new HashMap<String , Object>(2);
spare.put("alternate-exchange" , MY_SPARE);
//聲明瞭一個direct 類型的交換器,而且添加存儲備份交換器的map參數
channel.exchangeDeclare(EXCHANGE_NAME,"direct",true,false,spare);
完成示例代碼:
public class SendSpare {
private final static String QUEUE_NAME = "wsd_test";
private final static String QUEUE_NAME_2 = "wsd_test2";
private final static String EXCHANGE_NAME = "wsd_exchange";
private final static String ROUTING_KEY = "wsd_exchange";
private final static String EXCHANGE_KEY = "wsd_exchange";
private final static String MY_SPARE = "mySpare";
private static Connection connection =null;
private static Channel channel = null;
public static void main(String[] args) {
Map<String, Object> spare = new HashMap<String , Object>(2);
spare.put("alternate-exchange" , MY_SPARE);
try{
// 獲取到鏈接以及mq通道
connection = ConnectionUtil.getConnection();
// 從鏈接中建立通道
channel = connection.createChannel();
//聲明瞭一個direct 類型的交換器
channel.exchangeDeclare(EXCHANGE_NAME,"direct",true,false,spare);
//聲明一個備胎路由器
channel.exchangeDeclare(MY_SPARE,"fanout",true,false,null);
// 聲明(建立)隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//將路由與隊列綁定,再爲綁定的路徑賦值一個綁定鍵
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY);
// 聲明(建立)隊列
channel.queueDeclare(QUEUE_NAME_2, false, false, false, null);
//綁定備胎路由器
channel.queueBind(QUEUE_NAME_2,MY_SPARE,"");
//發送數據
for (int i=0;i<10;i++){
// 消息內容
String message = "Hello World!"+i;
//指定發送消息到哪一個路由,以及他的路由鍵,消息等
if (i%2==0){
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY,null, message.getBytes());
}else {
//匹配不到隊列
channel.basicPublish(EXCHANGE_NAME, "kkkk",null, message.getBytes());
}
System.out.println(" [x] Sent '" + message + "'");
Thread.sleep(200);
}
}catch (Exception e){
e.printStackTrace();
}finally {
//關閉通道和鏈接
try {
channel.close();
connection.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
}
聯邦交換器 Federation
Federation插件的高級目標是在代理之間傳輸消息而無需集羣。
該插件可讓交換器和隊列組成同盟。一個聯邦的交換或隊列能夠從一個或多個上游(遠程交換和其餘代理上的隊列)接收消息,能夠將上游發佈的消息路由到本地隊列。一個聯邦隊列可讓一個本地消費者從上游隊列接收消息。
rabbitmq-plugins enable rabbitmq_federation
rabbitmq-plugins enable rabbitmq_federation_management
工做原理:
其餘
經過執行rabbitmq-plugins list命令,查看支持的默認帶有哪些插件
rabbitmq_event_exchange
rabbitmq_jms_topic_exchange
rabbitmq_random_exchange
rabbitmq_recent_history_exchange
依次通過命令安裝對應插件後:
rabbitmq-plugins enable rabbitmq_event_exchange
進一步研究,還會不會有其餘一些插件呢?
去官網的插件網址:
https://www.rabbitmq.com/community-plugins.html
rabbitmq_lvc_exchange
rabbitmq_rtopic_exchange
rabbitmq_delayed_message_exchange 延遲消息插件
rabbitmq_management_exchange
pgsql_listen_exchange
總結
3.8.3版本的rabbitmq,默認有6種類型的交換器類型,分別是:
direct、fanout、topic、headers,x-consistent-hash和x-modulus-hash,一個參數Alternate exchange來實現備份交換器。
經過插件擴展,還能夠聲明以下交換器:
rabbitmq_event_exchange
rabbitmq_jms_topic_exchange
rabbitmq_random_exchange
rabbitmq_recent_history_exchange
rabbitmq_lvc_exchange
rabbitmq_rtopic_exchange
rabbitmq_delayed_message_exchange 延遲消息插件
rabbitmq_management_exchange
pgsql_listen_exchange
那麼,你還知道哪些其餘的交換器器嗎?歡迎留言交流。
參考:
https://blog.csdn.net/wsdfym/article/details/101800624
https://www.rabbitmq.com/distributed.html
本文分享自微信公衆號 - 跟着老萬學java(douzhe_2019)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。