###發佈和訂閱 ###(使用java 客戶端)java
在先前的指南中,咱們建立了一個工做隊列。這工做隊列後面的假想是每個任務都被準確的傳遞給工做者。在這部分咱們將會作一些徹底不一樣的事情--咱們將一個消息傳遞給多個消費者。這部分被認知爲「發佈和訂閱」。python
爲了說明這個部分,咱們會創建一個簡單德日誌系統,它是由兩個程序組成--第一個發出日誌消息,第二個接收和打印它們。git
在咱們的日誌系統中,每個運行的接收者拷貝程序將會得到信息。經過這個方式咱們能夠運行一個接收者,直接的把日誌記錄到硬盤中;在同一時間咱們能夠運行另外一個接收者,在屏幕上看這些日誌。 本質上,發佈日誌消息等同於廣播到全部接收者。github
###交換 在先前指南部分,咱們將消息發送到隊列裏,並從隊列中接收消息。如今是時候介紹RabbitMQ中全消息模型。 讓咱們快速溫習下在先前指南中咱們掌握的:shell
一個發送消息的生產者是一個用戶程序。 一個存儲消息的隊列是一個緩衝。 一個接收消息的消費者是一個用戶程序。 在RabbitMQ消息模型中核心的思想是生產者從不直接將消息發送給隊列。實際上,生產者經常甚至不知道是否一個消息會被傳遞到隊列中。安全
相反,生產者僅能將消息發送到一個交換機。一個交換機是一個很是簡單的事物。在它的一遍,它從生產者那裏接收消息,另外一邊將消息推送到隊列中。這個交換所必須清楚的知道它所接收到的消息要如何處理。是否將它附加到一個特別的隊列中?是否將它附加到多個隊列中?或者是否它應該被丟棄。規則的定義是由交換類型決定的。 有幾個交換類型:
direct
,topic
,deaders
,fanout
。咱們來關注最後一個--fanout
。讓咱們建立一個這種類型的交換機而且稱呼它爲logs
:服務器
channel.exchangeDeclare("logs", "fanout");
這fanout
交換機是很是簡單的。經過這個名字你可能已經猜出它的用處了,它會將接收的全部消息都廣播到全部它所知道的全部隊列。這個真正是咱們的記錄器所須要的。日誌
交換機列表 爲了列出服務器中全部交換機,你能夠運行着有用的
rabbitmqctl
:code
$ sudo rabbitmqctl list_exchanges Listing exchanges ... direct amq.direct direct amq.fanout fanout amq.headers headers amq.match headers amq.rabbitmq.log topic amq.rabbitmq.trace topic amq.topic topic logs fanout ...done.
在這個列表裏有一些以
amq.
打頭的交換機和默認(未命名)的交換機。這些是默認建立的,可是不太可能你會在某個時刻使用它們。 匿名交換機 在先前的指南中咱們對交換機毫無瞭解,可是咱們依舊能將消息發送到隊列中。那是可能實現的,由於咱們使用的是默認交換機,經過咱們使用空字符串("")標識它。 回想一下咱們之前是如何發送消息的:three
channel.basicPublish("", "hello", null, message.getBytes());
這第一個參數是交換機的名字。空字符串說明它是默認的或者匿名的交換機:路由關鍵字存在的話,消息經過路由關鍵字的名字路由到特定的隊列上。
如今,咱們能夠發佈咱們本身命名的交換機:
channel.basicPublish( "logs", "", null, message.getBytes());
###臨時隊列 你可能會想起先前咱們使用的隊列是有特定的名字的(是否記得hello
和task_queue
)。命名一個隊列對咱們來講是相當重要的--咱們須要指定工做者到這相同的隊列上。當你想把隊列分享給生產者和消費者,給隊列名是重要的。 可是那不是咱們記錄器的實例。咱們想監聽全部日誌消息,不單單是它們中的子集。咱們一樣是對當前的消息流感興趣,而不是舊的。爲了解決這個咱們須要兩件事。 首先,不管咱們何時鏈接RabbitMQ,咱們須要一個新的,空的隊列。爲了作到這些,咱們能夠建立一個隨機名字的隊列或者更勝一籌-讓服務器爲咱們選擇一個隨機的名字。 第二部,一旦咱們將消費者的鏈接斷開,隊列應該自動刪除。 在Java客戶端裏,當咱們使用無參數調用queueDeclare()
方法,咱們建立一個自動產生的名字,不持久化,獨佔的,自動刪除的隊列。
String queueName = channel.queueDeclare().getQueue();
在這點,隊列名中包含一個隨機隊列名。例如名字像amq.gen-JzTY20BRgKO-HjmUJj0wLg
。 ###綁定
咱們已經建立了一個fanout
交換機和隊列。如今咱們須要告訴交換機發送消息給咱們的隊列上。這交換機和隊列之間的關係稱之爲一個綁定。
channel.queueBind(queueName, "logs", "");
從如今開始,日誌交換所將要附加消息到咱們的隊列中。
綁定列表 你能夠列出存在的綁定使用,使用
rabbitmqctl list_bindings
。
###把全部放在一塊兒 這發送日誌消息的生產者程序,跟之前指南中的程序沒有多少不一樣。這最重要的改變是咱們將匿名的交換機替換爲咱們想要消息發佈到的日誌交換機。當發送是咱們須要申請一個路由關鍵字,可是在廣播消息是它的值會被忽略。這是
EmitLog.java
程序的代碼:
import java.io.IOException; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; public class EmitLog { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws java.io.IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } //... }
(EmitLog.java source) 如你所知,創建鏈接後咱們聲明一個交換機。這個步驟是必須的,由於發佈到一個不存在的交換機是禁止的。
若是隊列尚未綁定到交換機上,消息將會丟失,可是這個對咱們來講是ok的;若是沒有消費者正在監聽,咱們能夠安全的丟棄消息。 ReceiveLogs.java
代碼:
import java.io.IOException; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer; public class ReceiveLogs { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); } } }
(ReceiveLogs.java source) 如之前那樣編譯,咱們已經作了。
$ javac -cp rabbitmq-client.jar EmitLog.java ReceiveLogs.java
若是你想把日誌保存到文件中,僅僅打開一個控制平臺,鍵入:
$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar ReceiveLogs > logs_from_rabbit.log
若是你想在你的屏幕上看這些日誌, 新建一個終端而且運行:
$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar ReceiveLogs
固然,爲了發出日誌鍵入:
$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar EmitLog
使用rabbitmactl list_bindings
你能夠驗證這代碼確實建立綁定和咱們想要的隊列。隨着兩個ReceiveLogs.java
程序的運行你能夠看到一些如:
$ sudo rabbitmqctl list_bindings Listing bindings ... logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue [] logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue [] ...done.
這結果的解釋是直白簡單的:來自交換機的日誌流向服務器安排的兩個隊列中。而且那確實咱們所指望的。 爲了弄明白如何監聽一個消息的子集,讓咱們移到指南的第四部分。