前面咱們把每一個Message都是deliver到某個單一的Consumer。今天咱們將瞭解如何把同一個Message deliver到多個Consumer中。這個模式也被稱做 "publish / subscribe"。
首先咱們將建立一個日誌系統,它包含兩個部分:第一個部分是發出log(Producer),第二個部分接收到並打印(Consumer)。 咱們將構建兩個Consumer,第一個將log寫到物理磁盤上;第二個將log輸出的屏幕。java
關於exchange的概念在在這裏作一下簡單的介紹。web
RabbitMQ 的Messaging Model就是Producer並不會直接發送Message到queue。實際上,Producer並不知道它發送的Message是否已經到達queue。less
RabbitMQ消息模型的核心理念是生產者永遠不會直接發送給任何的消息隊列,通常狀況下Producer是不知道消息應該發送到那個隊列的。Producer發送的Message其實是發到了Exchange中。它的功能也很簡單:從Producer接收Message,而後投遞到queue中。Exchange須要知道如何處理Message,是把它放到某個queue中,仍是放到多個queue中?這個rule是經過Exchange 的類型定義的。
spa
咱們知道有三種類型的Exchange:direct, topic ,Headers和fanout。fanout就是廣播模式,會將全部的Message都放到它所知道的queue中。建立一個名字爲logs,類型爲fanout的Exchange:日誌
channel.exchange_declare(exchange='logs',type='fanout');orm
fanout類型轉發器特別簡單,吧全部他接受到的消息,廣播多有的他知道的隊列。rabbitmq
前面說到的生產者只能發送詳細給轉發器(Exchange),可是咱們以前的例子中並無使用到轉發器啊,咱們仍然能夠發送和接收消息,這是爲何呢?是匿名轉發器(nameless exchange)搞的鬼。由於咱們使用了一個默認轉發器。他的標識爲" ".隊列
channel.basicPublish("", QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());get
第一個參數爲轉發器名,第二個爲消息隊列名,若是不爲空由其決定發送到那個隊列中。消息隊列
如今咱們能夠指定消息發送到轉發器中。
channel.basicPublish( "logs","", null, message.getBytes());
Listing exchanges
經過rabbitmqctl能夠列出當前全部的Exchange:
$ sudo rabbitmqctl list_exchanges
Listing exchanges ...
logs fanout
amq.direct direct
amq.topic topic
amq.fanout fanout
amq.headers headers
...done.
注意 amq.* exchanges 和the default (unnamed)exchange是RabbitMQ默認建立的。
如今咱們能夠經過exchange,而不是routing_key來publish Message了:
channel.basic_publish(exchange='logs',
routing_key='',
body=message)
截至如今,咱們用的queue都是有名字的,可以爲隊列命名對咱們來講很關鍵。使用有名字的queue,使得在Producer和Consumer以前共享queue成爲可能。
可是對於咱們將要構建的日誌系統,並不須要有名字的queue。咱們但願獲得全部的log,而不是它們中間的一部分。並且咱們只對當前的log感興趣。爲了實現這個目標,咱們須要兩件事情:
1) 每當Consumer鏈接時,咱們須要一個新的,空的queue。由於咱們不對老的log感興趣。幸運的是,若是在聲明queue時不指定名字,那麼RabbitMQ會隨機爲咱們選擇這個名字。
2)當Consumer關閉鏈接時,這個queue要被deleted。
String queueName = channel.queueDeclare().getQueue();
經過result.method.queue 能夠取得queue的名字。基本上都是這個樣子:amq.gen-JzTY20BRgKO-HjmUJj1wLg。
如今咱們已經建立了fanout類型的exchange和沒有名字的queue(其實是RabbitMQ幫咱們取了名字)。那exchange怎麼樣知道它的Message發送到哪一個queue呢?答案就是經過bindings:綁定。
channel.queueBind(queueName, 「logs」, 」」)參數1:隊列名稱 ;參數2:轉發器名稱
如今logs的exchange就將它的Message附加到咱們建立的queue了。Listing bindings
使用命令rabbitmqctl list_bindings。
咱們最終實現的數據流圖以下:
package com.zhy.rabbit._03_bindings_exchanges;
import java.io.IOException;
import java.util.Date;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class EmitLog
{
private final static String EXCHANGE_NAME = "ex_log";
public static void main(String[] args) throws IOException
{
// 建立鏈接和頻道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 聲明轉發器和類型
channel.exchangeDeclare(EXCHANGE_NAME, "fanout" );
String message = new Date().toLocaleString()+" : log something";
// 往轉發器上發送消息
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
}
接收端:
package com.zhy.rabbit._03_bindings_exchanges;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class ReceiveLogsToSave
{
private final static String EXCHANGE_NAME = "ex_log";
public static void main(String[] argv) throws java.io.IOException,
java.lang.InterruptedException
{
// 建立鏈接和頻道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 建立一個非持久的、惟一的且自動刪除的隊列
String queueName = channel.queueDeclare().getQueue();
// 爲轉發器指定隊列,設置binding
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
QueueingConsumer consumer = new QueueingConsumer(channel);
// 指定接收者,第二個參數爲自動應答,無需手動應答
channel.basicConsume(queueName, true, consumer);
while (true)
{
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
print2File(message);
}
}
private static void print2File(String msg)
{
try
{
String dir = ReceiveLogsToSave.class.getClassLoader().getResource("").getPath();
String logFileName = new SimpleDateFormat("yyyy-MM-dd")
.format(new Date());
File file = new File(dir, logFileName+".txt");
FileOutputStream fos = new FileOutputStream(file, true);
fos.write((msg + "\r\n").getBytes());
fos.flush();
fos.close();
} catch (FileNotFoundException e)
{
e.printStackTrace();
} catch (IOException e)
{
e.printStackTrace();
}
}
}
接收端:
package com.zhy.rabbit._03_bindings_exchanges;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class ReceiveLogsToConsole
{
private final static String EXCHANGE_NAME = "ex_log";
public static void main(String[] argv) throws java.io.IOException,
java.lang.InterruptedException
{
// 建立鏈接和頻道
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 建立一個非持久的、惟一的且自動刪除的隊列
String queueName = channel.queueDeclare().getQueue();
// 爲轉發器指定隊列,設置binding
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
QueueingConsumer consumer = new QueueingConsumer(channel);
// 指定接收者,第二個參數爲自動應答,無需手動應答
channel.basicConsume(queueName, true, consumer);
while (true)
{
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
}
}
}