在以前的章節中,咱們建立了工做隊列,以前的工做隊列的假設是每一個任務只被分發到一個worker。在這一節中,咱們會作一些徹底不同的事--把一條消息發送給多個消費者,這個模式叫作「發佈/訂閱」(publish/subscribe)。javascript
舉個例子,咱們要構建一個簡易的日誌系統。由兩個程序組成---一個來發出日誌消息,另外一個接收並把消息顯示出來。java
在咱們的日誌系統當中,每個正在運行的接收程序都會收到消息。這樣,咱們能夠運行一個receiver並把log定向到磁盤,而後再跑一個receiver,看看它是否會在屏幕上顯示日誌。node
事實上,被髮布的消息會被廣播到全部的receiver那裏。git
在以前的引導中,咱們從一個隊列中作了收發的操做。是時候介紹在Rabbit中的所有的消息模型了。github
讓咱們先快速地回顧一下以前學習的,api
producer 是一個發送消息的應用安全
queue 是一個存儲消息的buffer服務器
consumer 是一個接收消息的應用less
RabbitMQ中,消息模型的核心思想是生產者毫不會把消息直接發到隊列。實際上,生產者一般不知道一條消息是否已經被髮送到任意一個隊列中。學習
生產者只能把消息發到交換器。交換器是個簡單的東西。一方面接收從生產者那邊來的消息,另外一方面把他們push到隊列中。交換器必定要知道當它們接收到消息以後要如何處理。是否要追加到一個特殊的隊列?是否要追加到許多的隊列?或者丟掉這條消息?這些規則被定義爲交換類型。
如下是可使用的交換類型:direct, topic, header, fanout
。咱們介紹一下最後一個--fanout。讓咱們先建立一個fanout
類型的交換器「logs」:
ch.assertExchange('logs', 'fanout', {durable: false})
fanout類型的交換器很是簡單,咱們能夠從單單從名字上猜想,它就是把它接收到的消息廣播給全部已知的隊列。這也就是咱們的logger
所須要的。
列出全部的交換器(Listing exchanges)
你可使用rabbitmqctl
$sudo rabbitmqctl list_exchanges Listing exchanges ... direct amq.direct direct amq.fanout fanout amq.headers headers amq.match headers amq.rabbitmq.log topic amq.rabbitmq.trace topic amq.topic topic logs fanout ...done.
在列表中,一些amq.*
的交換器和一些默認的(未命名的),都是被默認建立的,可是多是你用不到的
未命名的交換器(Nameless exchange)
在以前的章節中咱們未提過交換器,可是咱們仍然可以把消息傳到隊列中,這就是咱們使用了默認的交換器,由於咱們使用了空的字符串("")。
以前咱們是這樣發佈一條消息的ch.sendToQueue('hello', new Buffer('Hello World!'));
這裏咱們使用默認的或者未命名的交換器,若是第一個參數存在的話,消息會被路由到這個參數名的隊列。
如今,咱們可使用咱們定義好的交換器
ch.publish('logs', '', new Buffer('Hello World!'));
第二個參數爲空的話表明咱們不想把消息推到指定的隊列,只是想發佈到logs
的交換器中。
你還記得咱們以前用的聲明過的隊列(hello 和 task_queue)嗎?。可以指明一個隊列的名字對咱們來講是重要的--咱們須要把workers指到相同的隊列。
當你想要分享給消費者和生產者隊列的時候,給隊列起一個名字很重要。
但着不是咱們logger
這個程序須要的,咱們想監聽全部的log消息,不是一部分log消息。一樣的,咱們對正在流動的消息也感興趣(not in the old ones).咱們須要完成兩件事情:
第一,無論咱們何時鏈接Rabbit,都須要一個新的,空的隊列。咱們能夠建立一個隨機的隊列名字,或者讓服務器爲咱們隨機選擇一個隊列名字。
第二,無論咱們何時斷開與消費者的鏈接,隊列須要自動銷燬。
在amqp.node
的客戶端中,當咱們傳入字符串的時候,能夠建立一個帶有名字的未持久化的隊列
ch.assertQueue('', {exclusive: true});
這個方法返回一個帶有隨機名字的隊列實例,好比amq.gen-JzTY20BRgKO-HjmUJj0wLg
。
當鏈接被斷開的時候,這個隊列會被銷燬,由於咱們在聲明的時候{exclusive:true}
咱們已經建立了一個fanout
類型的交換器和一個隊列,如今咱們須要告訴交換器把消息發送給隊列,隊列與交換器之間的關係咱們稱之爲綁定。ch.bindQueue(queue_name, 'logs', '');
如今開始,logs
的交換器爲追加消息到咱們的隊列
Listing bindings:
你能夠列出已經存在的綁定關係,你應該猜到。rabbitmqctl list_bindings
。
生產者的程序,用來發出log消息,和以前章節沒有太多的不一樣,最重要的改變就是如今咱們是把消息發佈到咱們的logs
的交換器中,而不是以前的在未聲明的狀況下使用。發送的時候咱們須要提供一個路由鍵,可是在fanout
類型當中,這個能夠忽略。下面是emit_log.js
的代碼
#!/usr/bin/env node var amqp = require('amqplib/callback_api'); amqp.connect('amqp://localhost', function(err, conn) { conn.createChannel(function(err, ch) { var ex = 'logs'; var msg = process.argv.slice(2).join(' ') || 'Hello World!'; ch.assertExchange(ex, 'fanout', {durable: false}); ch.publish(ex, '', new Buffer(msg)); console.log(" [x] Sent %s", msg); }); setTimeout(function() { conn.close(); process.exit(0) }, 500); });
正如你所見,在與交換器創建鏈接以後。有一點很關鍵,向不存在的交換器發佈消息是被禁止的。
若是仍然沒有隊列綁定交換器,消息會丟失。可是對咱們來講還好,若是仍然沒有消費者監聽,咱們能夠安全地丟棄這些消息。
receive_logs.js
的代碼
#!/usr/bin/env node var amqp = require('amqplib/callback_api'); amqp.connect('amqp://localhost', function(err, conn) { conn.createChannel(function(err, ch) { var ex = 'logs'; ch.assertExchange(ex, 'fanout', {durable: false}); ch.assertQueue('', {exclusive: true}, function(err, q) { console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q.queue); ch.bindQueue(q.queue, ex, ''); ch.consume(q.queue, function(msg) { console.log(" [x] %s", msg.content.toString()); }, {noAck: true}); }); }); });
若是你想要保存log,你能夠打開控制檯輸入
$ ./receive_logs.js > logs_from_rabbit.log
若是你想在屏幕上看到log,再打開一個控制檯
$ ./receive_logs.js
固然,須要發出logs
$ ./emit_log.js
使用rabbitmqctl list_bindings
,你能夠肯定剛纔的代碼確實建立了交換器和隊列,有兩個receive_logs.js
的程序在運行。
$ sudo rabbitmqctl list_bindings Listing bindings ... logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue [] logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue [] ...done.
這個結果的簡要解釋:數據從logs交換器到兩個服務器分配的隊列。這也是咱們想要的結果。
要如何監聽一部分的消息?讓咱們移到下一章。