在前面的兩篇博客中html
遇到的實例都是一個消息只發送給一個消費者(工做者),他們的消息模型分別爲(P表明生產者,C表明消費者,紅色表明隊列):ide
此次咱們來看下將一個消息發送給多個消費者(工做者),這種模式通常被稱爲「發佈/訂閱」模式。其工做模型爲(P表明生產者,X表明Exchange(路由器/交換機),C表明消費者,紅色表明隊列):ui
咱們發現,工做模型中首次出現路由器,而且每一個消費者有單獨的隊列。生產者生成消息後將其發送給路由器,而後路由器轉送到隊列,消費者各自到本身的隊列裏面獲取消息進行消費。在實際的應用場景中,生產者通常不會直接將消息發送給隊列,而是發送給路由器進行中轉,Exchange必須清楚的知道怎麼處理收到的消息:是將消息發送到一個特定隊列仍是多有隊列,或者直接廢棄消息。這種才符合RabbitMQ消息模型的核心思想。this
接下來咱們詳細展開今天的話題:spa
1、Exchange3d
Exchange在咱們的工做模型中首次出現,所以須要詳細介紹下。日誌
Exchange分爲4種類型:code
Direct:徹底根據key進行投遞的,例如,綁定時設置了routing key爲」abc」,那麼客戶端提交的消息,只有設置了key爲」abc」的纔會投遞到隊列。 Topic:對key進行模式匹配後進行投遞,符號」#」匹配一個或多個詞,符號」*」匹配正好一個詞。例如」abc.#」匹配」abc.def.ghi」,」abc.*」只匹配」abc.def」。 Fanout:不須要key,它採起廣播模式,一個消息進來時,投遞到與該交換機綁定的全部隊列。 Headers:咱們能夠不考慮它。
今天咱們的實例採用fanout類型的exchange。htm
儘管首次出現,可是其實咱們前面的案例中也有用到exchange,只是咱們沒有給他名字,用的是RabbitMQ默認的,好比下面這段代碼,咱們將路由器名這個參數傳入了「」,若是咱們須要本身聲明exchange的話,這個就不能傳入「」了,而是傳入本身定義好的值。blog
2、臨時隊列
前面兩篇博客中,咱們都在使用隊列的時候給出了定義好的名字,這在生產者和消費者共用相同隊列的時候頗有必要,可是咱們有了exchange,生產者不須要知道有哪些隊列,所以隊列名字能夠不用指定了,而是經過RabbitMQ 接口本身去生成臨時隊列,隊列名字也由RabbitMQ自動生成。經過
能夠聲明一個非持久的、通道獨佔的、自動刪除的隊列,getQueue()方法能夠獲取隨機隊列名字。這個名字用來在隊列和exchange之間創建binding關係的時候使用:
3、代碼實現
基於上面exchange和臨時隊列的知識鋪墊,能夠展開今天的代碼實現了。
public class Product { //exchange名字 public static String EXCHANGE_NAME = "exchange"; public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = null; Channel channel = null; try { // 1.建立鏈接和通道 connection = factory.newConnection(); channel = connection.createChannel(); // 2.爲通道聲明exchange和exchange的類型 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); String msg = " hello rabbitmq, this is publish/subscribe mode"; // 3.發送消息到指定的exchange,隊列指定爲空,由exchange根據狀況判斷須要發送到哪些隊列 channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes()); System.out.println("product send a msg: " + msg); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { // 4.關閉鏈接 if (channel != null) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
public class Consumer1 { public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = null; Channel channel = null; try { // 1.建立鏈接和通道 connection = factory.newConnection(); channel = connection.createChannel(); // 2.爲通道聲明exchange以及exchange類型 channel.exchangeDeclare(Product.EXCHANGE_NAME, BuiltinExchangeType.FANOUT); // 3.建立隨機名字的隊列 String queueName = channel.queueDeclare().getQueue(); // 4.創建exchange和隊列的綁定關係 channel.queueBind(queueName, Product.EXCHANGE_NAME, ""); System.out.println(" **** Consumer1 keep alive ,waiting for messages, and then deal them"); // 5.經過回調生成消費者並進行監聽 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException { // 獲取消息內容而後處理 String msg = new String(body, "UTF-8"); System.out.println("*********** Consumer1" + " get message :[" + msg + "]"); } }; // 6.消費消息 channel.basicConsume(queueName, true, consumer); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }
生產者: product send a msg: hello rabbitmq, this is publish/subscribe mode 消費者1: **** Consumer1 keep alive ,waiting for messages, and then deal them *********** Consumer1 get message :[ hello rabbitmq, this is publish/subscribe mode] 消費者2: **** Consumer2 keep alive ,waiting for messages, and then deal them *********** Consumer2 get message :[ hello rabbitmq, this is publish/subscribe mode]
能夠看到,當生產者發出消息後,兩個消費者最終都收到了消息。
在Exchanges 標籤頁裏面多了一個名爲「exchange」的路由器,他的類型是fanout。點exchange 的link進入詳細頁面:
發如今binding項目中有了兩條綁定關係,隊列的名字也能夠看到。將頁面切換到Queues標籤頁:
出現了兩個新的隊列,隊列名字和綁定關係中的同樣,而且隊列都是自動刪除的、通道獨佔的。