瞭解過RabbitMQ的Fanout模式,應該知道它本來的Fanout模式就是用來作廣播的。可是它的廣播有一點區別,來回顧下它的含義:Fanout類型沒有路由鍵的概念,只要隊列綁定到了改exchange上面,就會接收到全部的消息。java
使用過程通常就是先new 出一個Fanout類型的交換機,而後往這個交換機上綁定多個隊列queue,不一樣的消費者各自監聽不一樣的隊列,這就實現了廣播效果,由於同一個消息,會分發到全部隊列中。nginx
舉個例子:web
應用A監聽了隊列A,應用B監聽了隊列B,Fanout類型交換機同時綁定了隊列A和B.假設生產者端發送了一條消息到Fanout類型交換機,交換機就會把消息分發到全部隊列,這時應用A和應用B會收到同一條消息,這就是廣播。redis
說了上面一大堆,只是爲了強調,對於RabbitMQ的本來Fanout模式,它的設計就是多個消費者必須監聽不一樣的隊列,多個消費者之間纔會造成廣播關係。spring
那麼問題來了,假如在Fanout工做模式下,多個消費者同時監聽的是同一個隊列,會怎樣?實踐過的同窗應該都知道,這種狀況下,這些消費者會造成競爭關係,現象是同一個消息只會被其中一個消費者接收,達不到廣播的效果。。緩存
假如如今有一個需求,要作到對同一個應用的多個節點進行廣播,怎麼實現?服務器
注意,這裏所說的同一個應用多個節點,通俗點理解就是一個war包,布在多個服務器節點上。websocket
在實際部署集羣時,爲了高可用,同一個應用可能會部署多個節點,那假如工程裏已經經過配置定義某個隊列,那多個節點它們定義的隊列就會是相同的,那按照上面的背景,那這些節點間確定就會存在競爭關係,即使是Fanout模式的交換機,一條消息也只能被其中一個節點接收,其餘節點收不到,達不到廣播的效果。那該如何作?負載均衡
相信看到這裏,有人會問,爲什麼會有 對同一個應用的多個節點進行廣播的需求場景?爲何要有這個需求。生產中的業務系統不少,天然而然場景就不少。socket
1.想要同時刷新全部節點的緩存
業務系統離不開緩存,有時會用內存緩存,假如我要刷新全部節點的內存緩存,多個節點前可能有負載均衡例如nginx之類的,我只須要訪問其中一個節點,而後讓這個節點作廣播通知全部其餘節點刷緩存。(廣播刷緩存)
2.websocket會話尋找
websocket是比較受歡迎的實時消息推送方案。用過websocket應該知道,websocket只能與多個節點中的其中一個節點作長鏈接會話保持,也就是說用戶的會話只會存在於一個節點上,假設服務端要主動向用戶推一條消息,必需要知道用戶的會話在哪一個節點上,怎麼得知?能夠經過廣播,經過消息廣播,把消息發到多個節點上,而後節點收到消息只須要判斷用戶會話是否就在本節點上,假如在則主動推消息,不在,則丟棄這條消息。
相似上面這兩種需求,就須要用到廣播,而且是對同一個應用的多個節點進行廣播。固然不用廣播確定也有其餘通知方案,本文咱們只討論用MQ怎麼作到。
假如繼續用RabbitMQ的Fanout模式,怎麼作到對同一個應用的多個節點進行廣播?
要起到廣播效果,關鍵就是讓多個應用節點間不要存在競爭關係或者存在競爭關係時它們的消息怎麼共享?能夠從這兩個方向解決這個問題。
方法可能不少種,在這裏,我只描述兩種比較容易實現的方案。
方案1
過程大體以下
應用啓動,多個節點監聽同一個隊列(此時多個節點是競爭關係,一條消息只會發到其中一個節點上)
消息生產者發送消息,同一條消息只被其中一個節點收到
這種方案是最容易想到的,思路就是依賴其餘組件來作消息共享,例如redis這種能夠替換成其餘方案,只要能作到消息共享就行,那麼最終的效果就確定是廣播效果了。
方案2
過程大體以下
應用啓動,利用監聽器生成惟一ID
生成的惟一ID,經過文件寫入的方式寫到配置文件中
spring啓動,把這個惟一ID加載爲全局屬性(爲什麼要用惟一ID,就是爲了用這個ID做爲該節點的監聽隊列名,固然前綴能夠用相同的,後綴用惟一ID區分便可,舉個例子就是:節點1監聽隊列 kunghsu-123 節點2監聽隊列 kunghsu-456.必須保證它們的惟一ID是惟一的,否則仍是會存在競爭關係)
多個節點監聽了多個隊列(讓每一個隊列名都不一樣,目的就是讓他們不存在競爭關係,沒有競爭關係就不用作消息共享,只管由MQ分發便可,這時同一條消息就會發到多個節點上)
到MQ控制檯,將全部節點生成的隊列手動綁定到指定的Fanout交換機上(這一步是手動的,固然也能夠經過API作到,下面會說到)
生產者發送消息指定的Fanout交換機,交換機將同一條消息被分發到多個節點上
這種方案,也比較容易。這樣作,就是爲了讓多個節點間是廣播關係。總的來講不麻煩,其中第五步手動操做其實有點挫,這種手動操做步驟實際上是應該轉成自動化,讓應用程序來完成,方便之後自動化建設。
這種方案的spring配置也比較簡單,參考Fanout模式的配置便可。本文重點在這個思路的實現過程。
只列舉部分代碼以下:
消息生產者
<!-- 只申明交換機,不定義隊列 --> <rabbit:fanout-exchange name="exchangeFour" durable="true" auto-delete="false" > </rabbit:fanout-exchange> <!--定義rabbit template用於數據的接收和發送 --> <rabbit:template id="amqpTemplate4" connection-factory="connectionFactory2" exchange="exchangeFour" />
消息消費者
<rabbit:queue name="${queue-name-fanout}" durable="true" auto-delete="false" exclusive="false" declared-by="connectAdmin2" /> <bean id="fanoutTwoConsumer" class="com.lunch.foo.rabbitmq.FanoutTwoConsumer"></bean> <rabbit:listener-container connection-factory="connectionFactory2"> <rabbit:listener queues="${queue-name-fanout}" ref="fanoutOneConsumer" /> </rabbit:listener-container>
另外,RabbitMQ的客戶端API支持讓咱們 將隊列綁定到指定的交換機上。具體可參考個人工具類代碼。
代碼以下:
package com.lunch.foo.rabbitmq; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * Created by xuyaokun On 2019/3/10 2:26 * @desc: */ public class RabbitMQUtil { private static final String HOST = "192.168.3.128"; private static final int PORT = AMQP.PROTOCOL.PORT; private static final String USERNAME = "kunghsu"; private static final String PASSWORD = "123456"; private static final String VIRTUALHOST = "/"; public static void main(String[] args) { String QUEUE_NAME = "queueOneX"; String EXCHANGE_NAME = "exchangeFour"; try { queueBind(EXCHANGE_NAME, QUEUE_NAME); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } /** * 獲取會話連接 * * @return * @throws IOException * @throws TimeoutException */ private static Connection getConnection() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST); factory.setPort(PORT); factory.setUsername(USERNAME); factory.setPassword(PASSWORD); factory.setVirtualHost(VIRTUALHOST); return factory.newConnection(); } /** * 綁定隊列到指定交換機 * * @param exchangeName * @param queueName * @throws IOException * @throws TimeoutException */ public static void queueBind(String exchangeName, String queueName) throws IOException, TimeoutException { Channel channel = null; try{ channel = getConnection().createChannel(); } catch(Exception e){ System.out.println("獲取RabbitMQ會話鏈接失敗!取消作隊列綁定。"); return ; } //默認持久化 channel.queueDeclare(queueName, true, false, false, null); // 聲明交換機:指定交換機的名稱和類型(廣播:fanout) channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true); // 在消費者端隊列綁定 channel.queueBind(queueName, exchangeName, ""); channel.close(); } }
RabbitMQ的Fanout模式相關的文章,網上一抓一大把,可是幾乎沒有人講到 如何實現 對同一個應用的多個節點進行廣播。。但願經過這篇文章,能幫助到有須要的同窗。另外,假如你們有更好的方案,歡迎交流。感謝閱讀!
歡迎你們關注個人公衆號【風平浪靜如碼】,海量Java相關文章,學習資料都會在裏面更新,整理的資料也會放在裏面。
以爲寫的還不錯的就點個贊,加個關注唄!點關注,不迷路,持續更新!!!