在以前的文章裏,建立了work queue。work queue中,每個task都會派發給一個worker。在本章中,咱們會完成徹底不同的事情 - 咱們會派發一條message給多個消費者。咱們稱之爲發佈訂閱模式。html
爲了更好來講明,咱們將要構建一個簡單的日誌系統。會由兩部分代碼構成,第一部分來發送日誌message,第二部分會接受並打印日誌。java
在咱們的日誌系統中,每個接收程序都會收到日誌message。這種方式下,咱們能夠運行一個接收程序將日誌保存到磁盤,同時使用另一個接收程序將日誌打印到屏幕。bash
本質上來講,發佈的日誌message會廣播到全部運行的接收者。服務器
在以前的章節咱們經過queue收發message。如今開始介紹Rabbit中的full messaging model。學習
首先讓咱們快速的回憶一下以前的章節spa
RabbitMQ的messaging model的核心思想是producer不會直接向queue發送message。實際上,不少時候producer也不知道message會發送到哪些queue。日誌
這裏,producer將message發送到exchange。exchange是一個很是簡單的東西。一方面它從producer側接收message,另外一方面它把message推送到queue去。 exchange必須知道對接收到的message接着要去作什麼。是轉發到特定的queue?仍是轉發到多個queue?仍是乾脆丟棄掉。這個規則取決於定義時exchange的類型。code
exchange有四種可選的類型:direct, topic, headers和fanout. 今天咱們聚焦於最後一種-fanout。讓咱們建立一個fanout類型的exchange,命名爲logshtm
channel.exchangeDeclare("logs","fanout");
fanout類型的exchange是很是簡單的。能夠從名字上大概猜出其用途,它廣播全部的message到它所知道的queue去。這也正是日誌應用所指望的。rabbitmq
列出全部的exhange,可使用rabbitmqctl命令 sudo rabbitmqctl list_exchanges,在列表總會出現一些amq.* 的exchange,和默認的exchange。這些是默認自動建立的,咱們不會使用到它們。沒有名字的exchange。在以前的章節裏咱們沒有提到過exchanges,咱們直接將message發送到queue。其實咱們是用到了默認的exchange,用空字符串」「來標識。回想一下,咱們像下面這樣發佈message:
channel.basicPublish("","hello",null,message.getBytes()); 第一個參數就是exchange的名字。空字符串表明了沒有名字的exchange:message被路由到了由routingKey指定名字的queue。
如今,咱們能夠向有名字的exchange發佈message。
channel.basicPublish("logs","",null,message.getBytes());
以前咱們使用queue時都會指定名字,如hello和task_queue。給一個queue命名是很重要的,由於咱們要給worker指出相同的queue。當須要在生產者和消費者間共享一個queue時,就必須給queue取好名字。
可是在咱們日誌應用中,狀況卻有所不一樣。咱們須要接收到全部的log message。咱們也關注當前流動的message。咱們須要搞定2個事情。
首先,當鏈接到Rabbit時,咱們須要一個全新的,空的queue。所以咱們能夠本身建立一個隨意名字的queue,或是由服務器選擇隨意的queue名字,這固然是更好的選擇。
其次,當咱們斷開接收者時,該queue能夠被自動刪除。
在java客戶端中,當咱們使用無參的queueDeclare()時,咱們建立的是使用自動生成名字的一個不持久的,自動刪除queue:
String queueName = channel.queueDeclare().getQueue();
能夠經過這裏來學習到exclusive標誌和其餘queue的相關屬性。
這時queue就具備一個隨機的名字,好比像amq.gen-JzTY20BRgKO-HjmUJj0wLg.
咱們已經建立了一個fanout exchange和queue.如今咱們要設置exchange,讓它把message發送到咱們的queue。exchange和queue這種關係的創建咱們稱之爲binding.
channel.queueBind(queueName,"logs","");
從如今開始logs這個exchange就會將message推向咱們的隊列了。
可使用命令rabbitmqctl list_bindings 來列出當前全部的binding。
生產者程序,和以前章節的代碼變化不大,主要的變化是咱們將message發送到exchange而不是一個queue。你發現咱們在發送的時候會填上一個routingKey,這個值在fanout類型的exchange中是被忽略的。下面是生產者EmitLog.java的代碼
public class EmitLog { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String message = argv.length < 1 ? "info: Hello World!" : String.join(" ", argv); channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); } } }
如你所見,在創建connection以後咱們聲明瞭exchange.這一步是必要的,發佈Message到一個不存在的exchange是不容許的。
若是沒有queue綁定到exchange的時候,發佈的message是會丟失的,但在如今這個場景是OK的。下面是ReceiveLogs.java的代碼:
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; public class ReceiveLogs { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
首先進行編譯
javac -cp $CP EmitLog.java ReceiveLog.java
若是要把日誌保存到文件,則
java -cp $CP ReceiveLogs > logs_from_rabbit.log
若是要在控制檯看日誌,在另外一個終端
java -cp $CP ReceiveLogs
最後來發送日誌
java -cp $CP EmitLog
使用rabbitmqctl list_bindings,來確認程序建立了咱們在代碼中指定的binding和queue. 運行兩個ReceiveLogs程序,你會看到像下面的輸出
sudo rabbitmqctl list_bindings # => Listing bindings ... # => logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue [] # => logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue [] # => ...done.