In the previous tutorial we created a work queue. The assumption behind a work queue is that each task is delivered to exactly one worker. In this part we'll do something completely different -- we'll deliver a message to multiple consumers. This pattern is known as "publish/subscribe".html
在上一個教程中,咱們建立了一個工做隊列。 工做隊列的意圖是每一個任務都交付給一個工做人員。 在這部分中,咱們會作一些徹底不一樣的事情 - 咱們將交付同一個消息給全部的Consumer。 這種模式被稱爲「發佈/訂閱」。java
To illustrate the pattern, we're going to build a simple logging system. It will consist of two programs -- the first will emit log messages and the second will receive and print them.git
咱們要創建一個簡單的日誌記錄系統來講明這個模式。 日誌記錄系統它分爲兩個模塊,第一個將發出日誌消息,第二個將接收並打印它們github
In our logging system every running copy of the receiver program will get the messages. That way we'll be able to run one receiver and direct the logs to disk; and at the same time we'll be able to run another receiver and see the logs on the screen.bash
在咱們的記錄系統中,每一個receiver都會收到同一個消息。 這樣咱們就能夠運行一個receiver用於將receive的log message寫入磁盤; 同時咱們能夠運行另外一個receiver將log message輸出到屏幕上服務器
Essentially, published log messages are going to be broadcast to all the receivers.app
本質上說,訂閱模式會將已發佈的日誌消息將被廣播到全部接收者。less
In previous parts of the tutorial we sent and received messages to and from a queue. Now it's time to introduce the full messaging model in Rabbit.dom
在以前的教程中,咱們是以隊列做爲中介來實現消息的傳遞。 如今,咱們將在Rabbit完整的消息傳遞模式。ide
Let's quickly Go over what we covered in the previous tutorials:
讓咱們快速回顧一下咱們在之前的教程中介紹的內容:
The core idea in the messaging model in RabbitMQ is that the producer never sends any messages directly to a queue. Actually, quite often the producer doesn't even know if a message will be delivered to any queue at all.
RabbitMQ中的消息傳遞模型的核心思想是,生產者不會把消息直接發送到隊列,而是發送到交換機上,消息傳送到隊列的過程有交換機完成,這部分生產者是不知道的。 實際上,生產者一般甚至不知道是否將消息傳遞到任何隊列。
Instead, the producer can only send messages to an exchange. An exchange is a very simple thing. On one side it receives messages from producers and the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded. The rules for that are defined by the exchange type.
相反,producer只能將message發送到exchange。exchange的功能也很簡單。 一方面,它收到來自生產者的消息,另外一方將它們推送到隊列。 exchange必然知道接收到的消息如何處理。 應該把它追加到特定隊列上? 仍是追加到多個隊列上? 或者丟棄它。 這個rule是經過exchange的類型定義。
There are a few exchange types available: direct, topic, headers and fanout. We'll focus on the last one -- the fanout. Let's create an exchange of this type, and call it logs:
exchange的類型爲:direct,topic,headers和fanout。
這裏咱們將重點關注最後一個 - fanout。 以下咱們建立一個fanout類型的exchange,並將其命名爲logs:
channel.exchangeDeclare("logs", "fanout");//create a fanout exchange,and named logs;
The fanout exchange is very simple. As you can probably guess from the name, it just broadcasts all the messages it receives to all the queues it knows. And that's exactly what we need for our logger.
fanout exchange的工做模式很是簡單。 它只是將全部收到的消息廣播給它知道的全部的隊列。 這正是咱們須要的logger
Listing exchangesTo list the exchanges on the server you can run the ever useful rabbitmqctl:sudo rabbitmqctl list_exchanges In this list there will be some amq.* exchanges and the default (unnamed) exchange. These are created by default, but it is unlikely you'll need to use them at the moment.Nameless exchangeIn previous parts of the tutorial we knew nothing about exchanges, but still were able to send messages to queues. That was possible because we were using a default exchange, which we identify by the empty string ("").Recall how we published a message before:channel.basicPublish("", "hello", null, message.getBytes()); The first parameter is the the name of the exchange. The empty string denotes the default or nameless exchange: messages are routed to the queue with the name specified by routingKey, if it exists.
能夠在rabbitmqctl中使用sudo rabbitmqctl list_exchanges命令,它會列舉出全部的exchanges。
在這個列表中有許多名爲amq.*的exchange和還有一個默認的exchange (AMQP default) 。 這些是默認建立的
Listing exchanges ...
amq.headers headers
amq.rabbitmq.log topic
amq.rabbitmq.trace topic
amq.match headers
amq.fanout fanout
amq.direct direct
amq.topic topic
(default exchange) direct
可是如今不太可能須要使用它們。咱們在以前的教程裏發送消息沒有使用過這些default (unnamed) exchange,但他們仍在咱們將消息發送到隊列過程當中起到了做用。以前咱們經過空字符串("")來標識exchange,就表明使用default (unnamed) exchange。
//在以前的教程中,咱們是這樣發佈消息的
channel.basicPublish(「」,「hello」,null,message.getBytes());
//第一個參數是exchange的名稱。 空字符串表示default (unnamed) exchange//第二個參數是routing key。消息將經過指定的exchange的指定routing key傳遞到綁定的隊列(若是routing key存在)。由於咱們使用的是默認的exchange,因此routing key就等於隊列名字
默認路由,官方的說明
The default exchange is implicitly bound to every queue, with a routing key equal to the queue name. It is not possible to explicitly bind to
, or unbind from the default exchange. It also cannot be deleted.
defalut exchange會隱式的綁定到全部隊列上每一個隊列上,routing key等於隊列名,任何隊列都不可以明確的指明綁定到default exchange,也不能從default exchange上解除綁定。default也不可以被刪除
Now, we can publish to our named exchange instead:
如今,咱們經過命名的exchange來發布消息了
channel.basicPublish( "logs", "", null, message.getBytes());
As you may remember previously we were using queues which had a specified name (remember hello and task_queue?). Being able to name a queue was crucial for us -- we needed to point the workers to the same queue. Giving a queue a name is important when you want to share the queue between producers and consumers.
截至如今,咱們用的queue都是有名字的:第一個是hello,第二個是task_queue。使用有名字的queue,使得在Producer和Consumer以前共享queue成爲可能。可見對隊列命名是十分重要的
But that's not the case for our logger. We want to hear about all log messages, not just a subset of them. We're also interested only in currently flowing messages not in the old ones. To solve that we need two things.
可是對於咱們將要構建的日誌系統,並不須要有名字的queue。咱們但願每一個consumer都能receive到全部的日誌message,而不只僅是它們中間的一部分。 咱們也只對當前流行的消息不感興趣。 要解決咱們須要兩件事情。
Firstly, whenever we connect to Rabbit we need a fresh, empty queue. To do this we could create a queue with a random name, or, even better - let the server choose a random queue name for us.
首先,當consumer connect to Rabbit,須要一個新的隊列。 爲此,咱們能夠建立一個具備隨機名稱的隊列,或者甚至是更好的 - 這裏讓服務器爲咱們選擇一個隨機隊列名稱。
Secondly, once we disconnect the consumer the queue should be automatically deleted.
其次,一旦consumer斷開鏈接,隊列應該被自動刪除。
In the Java client, when we supply no parameters to queueDeclare() we create a non-durable, exclusive, autodelete queue with a generated name:
在Java客戶端中,當咱們沒有爲queueDeclare()提供參數時,意味着咱們建立了一個具備隨機名稱的非持久,排他,自動刪除的隊列:
String queueName = channel.queueDeclare().getQueue();
At that point queueName contains a random queue name. For example it may look like amq.gen-JzTY20BRgKO-HjmUJj0wLg.
此時,queueName包含一個隨機隊列名稱。 例如,它可能看起來像amq.gen-JzTY20BRgKO-HjmUJj0wLg。
We've already created a fanout exchange and a queue. Now we need to tell the exchange to send messages to our queue. That relationship between exchange and a queue is called a binding.
咱們已經建立了一個fanout exchange和queue。 如今咱們須要告訴exchange發送消息給隊列。 exchange和隊列之間的關係稱爲綁定。
channel.queueBind(queueName, "logs", "");
From now on the logs exchange will append messages to our queue.
從如今開始,log exchange將追加消息到綁定的隊列中。
Listing bindingsYou can list existing bindings using, you guessed it,rabbitmqctl list_bindings
列出綁定您可使用,您猜想它,rabbitmqctl列表綁定列出現有綁定
The producer program, which emits log messages, doesn't look much different from the previous tutorial. The most important change is that we now want to publish messages to our logsexchange instead of the nameless one. We need to supply a routingKey when sending, but its value is ignored for fanout exchanges. Here goes the code for EmitLog.Java program:
發送log的producer與以前的教程說起的並無太大的區別。 最重要的改變是咱們如今發佈消息的是到logs exchange而不是default exchange。 發送時須要提供一個routingKey,可是對於faount類型的exchange來講,routing key的值是被忽略的,由於fanout是要廣播全部從producer接受到的消息給全部綁定的隊列。 如下是EmitLog.Java程序的代碼:
import java.io.IOException; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; public class EmitLog { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws java.io.IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } //... }
As you see, after establishing the connection we declared the exchange. This step is necessary as publishing to a non-existing exchange is forbidden.
如你所見,創建鏈接後,咱們申明瞭一個fanout類型的exchange。 這個步驟是必須的,由於publish 消息到不存在的exchange是禁止的
The messages will be lost if no queue is bound to the exchange yet, but that's okay for us; if no consumer is listening yet we can safely discard the message.
若是沒有任何隊列綁定到交換機,消息將丟失。
The code for ReceiveLogs.java:
import com.rabbitmq.client.*; import java.io.IOException; public class ReceiveLogs { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); } }
Compile as before and we're done.
javac -cp $CP EmitLog.java ReceiveLogs.java
If you want to save logs to a file, just open a console and type:
java -cp $CP ReceiveLogs > logs_from_rabbit.log
If you wish to see the logs on your screen, spawn a new terminal and run:
java -cp $CP ReceiveLogs
And of course, to emit logs type:
java -cp $CP EmitLog
Using rabbitmqctl list_bindings you can verify that the code actually creates bindings and queues as we want. With two ReceiveLogs.java programs running you should see something like:
sudo rabbitmqctl list_bindings # => Listing bindings ... # => logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue [] # => logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue [] # => ...done.
The interpretation of the result is straightforward: data from exchange logs goes to two queues with server-assigned names. And that's exactly what we intended.
To find out how to listen for a subset of messages, let's move on to tutorial 4