轉發請標明出處:http://blog.csdn.net/lmj623565791/article/details/37657225 java
本系列教程主要來自於官網入門教程的翻譯,而後本身進行了部分的修改與實驗,內容僅供參考。 服務器
上一篇博客中,咱們實現了工做隊列,而且咱們的工做隊列中的一個任務只會發給一個工做者,除非某個工做者未完成任務意外被殺死,會轉發給另外的工做者,若是你還不瞭解:RabbitMQ (二)工做隊列。這篇博客中,咱們會作一些改變,就是把一個消息發給多個消費者,這種模式稱之爲發佈/訂閱(相似觀察者模式)。 less
爲了驗證這種模式,咱們準備構建一個簡單的日誌系統。這個系統包含兩類程序,一類程序發動日誌,另外一類程序接收和處理日誌。 spa
在咱們的日誌系統中,每個運行的接收者程序都會收到日誌。而後咱們實現,一個接收者將接收到的數據寫到硬盤上,與此同時,另外一個接收者把接收到的消息展示在屏幕上。 .net
本質上來講,就是發佈的日誌消息會轉發給全部的接收者。 翻譯
一、轉發器(Exchanges)
前面的博客中咱們主要的介紹都是發送者發送消息給隊列,接收者從隊列接收消息。下面咱們會引入Exchanges,展現RabbitMQ的完整的消息模型。 日誌
RabbitMQ消息模型的核心理念是生產者永遠不會直接發送任何消息給隊列,通常的狀況生產者甚至不知道消息應該發送到哪些隊列。 code
相反的,生產者只能發送消息給轉發器(Exchange)。轉發器是很是簡單的,一邊接收從生產者發來的消息,另外一邊把消息推送到隊列中。轉發器必須清楚的知道消息如何處理它收到的每一條消息。是否應該追加到一個指定的隊列?是否應該追加到多個隊列?或者是否應該丟棄?這些規則經過轉發器的類型進行定義。 orm
下面列出一些可用的轉發器類型: blog
Direct
Topic
Headers
Fanout
目前咱們關注最後一個fanout,聲明轉發器類型的代碼:
channel.exchangeDeclare("logs","fanout");
fanout類型轉發器特別簡單,把全部它介紹到的消息,廣播到全部它所知道的隊列。不過這正是咱們前述的日誌系統所須要的。
二、匿名轉發器(nameless exchange)
前面說到生產者只能發送消息給轉發器(Exchange),可是咱們前兩篇博客中的例子並無使用到轉發器,咱們仍然能夠發送和接收消息。這是由於咱們使用了一個默認的轉發器,它的標識符爲」」。以前發送消息的代碼:
channel.basicPublish("", QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
第一個參數爲轉發器的名稱,咱們設置爲」」 : 若是存在routingKey(第二個參數),消息由routingKey決定發送到哪一個隊列。
如今咱們能夠指定消息發送到的轉發器:
channel.basicPublish( "logs","", null, message.getBytes());
三、臨時隊列(Temporary queues)
前面的博客中咱們都爲隊列指定了一個特定的名稱。可以爲隊列命名對咱們來講是很關鍵的,咱們須要指定消費者爲某個隊列。當咱們但願在生產者和消費者間共享隊列時,爲隊列命名是很重要的。
不過,對於咱們的日誌系統咱們並不關心隊列的名稱。咱們想要接收到全部的消息,並且咱們也只對當前正在傳遞的數據的感興趣。爲了知足咱們的需求,須要作兩件事:
第一, 不管什麼時間鏈接到Rabbit咱們都須要一個新的空的隊列。爲了實現,咱們可使用隨機數建立隊列,或者更好的,讓服務器給咱們提供一個隨機的名稱。
第二, 一旦消費者與Rabbit斷開,消費者所接收的那個隊列應該被自動刪除。
Java中咱們可使用queueDeclare()方法,不傳遞任何參數,來建立一個非持久的、惟一的、自動刪除的隊列且隊列名稱由服務器隨機產生。
String queueName = channel.queueDeclare().getQueue();
通常狀況這個名稱與amq.gen-JzTY20BRgKO-HjmUJj0wLg 相似。
四、綁定(Bindings)
咱們已經建立了一個fanout轉發器和隊列,咱們如今須要經過binding告訴轉發器把消息發送給咱們的隊列。
channel.queueBind(queueName, 「logs」, 」」)參數1:隊列名稱 ;參數2:轉發器名稱
五、完整的例子
日誌發送端:
- package com.zhy.rabbit._03_bindings_exchanges;
-
- import java.io.IOException;
- import java.util.Date;
-
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- public class EmitLog
- {
- private final static String EXCHANGE_NAME = "ex_log";
-
- public static void main(String[] args) throws IOException
- {
- // 建立鏈接和頻道
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- // 聲明轉發器和類型
- channel.exchangeDeclare(EXCHANGE_NAME, "fanout" );
-
- String message = new Date().toLocaleString()+" : log something";
- // 往轉發器上發送消息
- channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
-
- System.out.println(" [x] Sent '" + message + "'");
-
- channel.close();
- connection.close();
-
- }
-
- }
沒什麼太大的改變,聲明隊列的代碼,改成聲明轉發器了,一樣的消息的傳遞也交給了轉發器。
接收端1 :ReceiveLogsToSave.java:
- package com.zhy.rabbit._03_bindings_exchanges;
-
- import java.io.File;
- import java.io.FileNotFoundException;
- import java.io.FileOutputStream;
- import java.io.IOException;
- import java.text.SimpleDateFormat;
- import java.util.Date;
-
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.QueueingConsumer;
-
- public class ReceiveLogsToSave
- {
- private final static String EXCHANGE_NAME = "ex_log";
-
- public static void main(String[] argv) throws java.io.IOException,
- java.lang.InterruptedException
- {
- // 建立鏈接和頻道
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
-
- channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
- // 建立一個非持久的、惟一的且自動刪除的隊列
- String queueName = channel.queueDeclare().getQueue();
- // 爲轉發器指定隊列,設置binding
- channel.queueBind(queueName, EXCHANGE_NAME, "");
-
- System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
-
- QueueingConsumer consumer = new QueueingConsumer(channel);
- // 指定接收者,第二個參數爲自動應答,無需手動應答
- channel.basicConsume(queueName, true, consumer);
-
- while (true)
- {
- QueueingConsumer.Delivery delivery = consumer.nextDelivery();
- String message = new String(delivery.getBody());
-
- print2File(message);
- }
-
- }
-
- private static void print2File(String msg)
- {
- try
- {
- String dir = ReceiveLogsToSave.class.getClassLoader().getResource("").getPath();
- String logFileName = new SimpleDateFormat("yyyy-MM-dd")
- .format(new Date());
- File file = new File(dir, logFileName+".txt");
- FileOutputStream fos = new FileOutputStream(file, true);
- fos.write((msg + "\r\n").getBytes());
- fos.flush();
- fos.close();
- } catch (FileNotFoundException e)
- {
- e.printStackTrace();
- } catch (IOException e)
- {
- e.printStackTrace();
- }
- }
- }
隨機建立一個隊列,而後將隊列與轉發器綁定,而後將消費者與該隊列綁定,而後寫入日誌文件。
接收端2:ReceiveLogsToConsole.java
- package com.zhy.rabbit._03_bindings_exchanges;
-
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.QueueingConsumer;
-
- public class ReceiveLogsToConsole
- {
- private final static String EXCHANGE_NAME = "ex_log";
-
- public static void main(String[] argv) throws java.io.IOException,
- java.lang.InterruptedException
- {
- // 建立鏈接和頻道
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
-
- channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
- // 建立一個非持久的、惟一的且自動刪除的隊列
- String queueName = channel.queueDeclare().getQueue();
- // 爲轉發器指定隊列,設置binding
- channel.queueBind(queueName, EXCHANGE_NAME, "");
-
- System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
-
- QueueingConsumer consumer = new QueueingConsumer(channel);
- // 指定接收者,第二個參數爲自動應答,無需手動應答
- channel.basicConsume(queueName, true, consumer);
-
- while (true)
- {
- QueueingConsumer.Delivery delivery = consumer.nextDelivery();
- String message = new String(delivery.getBody());
- System.out.println(" [x] Received '" + message + "'");
-
- }
-
- }
-
- }
隨機建立一個隊列,而後將隊列與轉發器綁定,而後將消費者與該隊列綁定,而後打印到控制檯。
如今把兩個接收端運行,而後運行3次發送端:
輸出結果:
發送端:
[x] Sent '2014-7-10 16:04:54 : log something'
[x] Sent '2014-7-10 16:04:58 : log something'
[x] Sent '2014-7-10 16:05:02 : log something'
接收端1:
接收端2:
[*] Waiting for messages. To exit press CTRL+C
[x] Received '2014-7-10 16:04:54 : log something'
[x] Received '2014-7-10 16:04:58 : log something'
[x] Received '2014-7-10 16:05:02 : log something'
這個例子實現了咱們文章開頭所描述的日誌系統,利用了轉發器的類型:fanout。
本篇說明了,生產者將消息發送至轉發器,轉發器決定將消息發送至哪些隊列,消費者綁定隊列獲取消息。