[譯] RabbitMQ tutorials (3) ---- 'Pub/Sub' (Javascript)

發佈與訂閱 (Publish/Subscribe)

在以前的章節中,咱們建立了工做隊列,以前的工做隊列的假設是每一個任務只被分發到一個worker。在這一節中,咱們會作一些徹底不同的事--把一條消息發送給多個消費者,這個模式叫作「發佈/訂閱」(publish/subscribe)。javascript

舉個例子,咱們要構建一個簡易的日誌系統。由兩個程序組成---一個來發出日誌消息,另外一個接收並把消息顯示出來。java

在咱們的日誌系統當中,每個正在運行的接收程序都會收到消息。這樣,咱們能夠運行一個receiver並把log定向到磁盤,而後再跑一個receiver,看看它是否會在屏幕上顯示日誌。node

事實上,被髮布的消息會被廣播到全部的receiver那裏。git

交換器(Exchanges)

在以前的引導中,咱們從一個隊列中作了收發的操做。是時候介紹在Rabbit中的所有的消息模型了。github

讓咱們先快速地回顧一下以前學習的,api

  • producer 是一個發送消息的應用安全

  • queue 是一個存儲消息的buffer服務器

  • consumer 是一個接收消息的應用less

RabbitMQ中,消息模型的核心思想是生產者毫不會把消息直接發到隊列。實際上,生產者一般不知道一條消息是否已經被髮送到任意一個隊列中。學習

生產者只能把消息發到交換器。交換器是個簡單的東西。一方面接收從生產者那邊來的消息,另外一方面把他們push到隊列中。交換器必定要知道當它們接收到消息以後要如何處理。是否要追加到一個特殊的隊列?是否要追加到許多的隊列?或者丟掉這條消息?這些規則被定義爲交換類型。

exchanges

如下是可使用的交換類型:direct, topic, header, fanout。咱們介紹一下最後一個--fanout。讓咱們先建立一個fanout類型的交換器「logs」:

ch.assertExchange('logs', 'fanout', {durable: false})

fanout類型的交換器很是簡單,咱們能夠從單單從名字上猜想,它就是把它接收到的消息廣播給全部已知的隊列。這也就是咱們的logger所須要的。


列出全部的交換器(Listing exchanges)
你可使用rabbitmqctl

$sudo rabbitmqctl list_exchanges
Listing exchanges ...
direct
amq.direct      direct
amq.fanout      fanout
amq.headers     headers
amq.match       headers
amq.rabbitmq.log        topic
amq.rabbitmq.trace      topic
amq.topic       topic
logs    fanout
...done.

在列表中,一些amq.*的交換器和一些默認的(未命名的),都是被默認建立的,可是多是你用不到的

未命名的交換器(Nameless exchange)
在以前的章節中咱們未提過交換器,可是咱們仍然可以把消息傳到隊列中,這就是咱們使用了默認的交換器,由於咱們使用了空的字符串("")。
以前咱們是這樣發佈一條消息的
ch.sendToQueue('hello', new Buffer('Hello World!'));
這裏咱們使用默認的或者未命名的交換器,若是第一個參數存在的話,消息會被路由到這個參數名的隊列。


如今,咱們可使用咱們定義好的交換器

ch.publish('logs', '', new Buffer('Hello World!'));

第二個參數爲空的話表明咱們不想把消息推到指定的隊列,只是想發佈到logs的交換器中。

臨時隊列 (Temporary queues)

你還記得咱們以前用的聲明過的隊列(hello 和 task_queue)嗎?。可以指明一個隊列的名字對咱們來講是重要的--咱們須要把workers指到相同的隊列。
當你想要分享給消費者和生產者隊列的時候,給隊列起一個名字很重要。

但着不是咱們logger這個程序須要的,咱們想監聽全部的log消息,不是一部分log消息。一樣的,咱們對正在流動的消息也感興趣(not in the old ones).咱們須要完成兩件事情:
第一,無論咱們何時鏈接Rabbit,都須要一個新的,空的隊列。咱們能夠建立一個隨機的隊列名字,或者讓服務器爲咱們隨機選擇一個隊列名字。
第二,無論咱們何時斷開與消費者的鏈接,隊列須要自動銷燬。

amqp.node的客戶端中,當咱們傳入字符串的時候,能夠建立一個帶有名字的未持久化的隊列

ch.assertQueue('', {exclusive: true});

這個方法返回一個帶有隨機名字的隊列實例,好比amq.gen-JzTY20BRgKO-HjmUJj0wLg
當鏈接被斷開的時候,這個隊列會被銷燬,由於咱們在聲明的時候{exclusive:true}

綁定 (Bindings)

binding

咱們已經建立了一個fanout類型的交換器和一個隊列,如今咱們須要告訴交換器把消息發送給隊列,隊列與交換器之間的關係咱們稱之爲綁定。
ch.bindQueue(queue_name, 'logs', '');
如今開始,logs的交換器爲追加消息到咱們的隊列

Listing bindings:

你能夠列出已經存在的綁定關係,你應該猜到。rabbitmqctl list_bindings

整合(Putting it all together)

all

生產者的程序,用來發出log消息,和以前章節沒有太多的不一樣,最重要的改變就是如今咱們是把消息發佈到咱們的logs的交換器中,而不是以前的在未聲明的狀況下使用。發送的時候咱們須要提供一個路由鍵,可是在fanout類型當中,這個能夠忽略。下面是emit_log.js的代碼

#!/usr/bin/env node

var amqp = require('amqplib/callback_api');

amqp.connect('amqp://localhost', function(err, conn) {
  conn.createChannel(function(err, ch) {
    var ex = 'logs';
    var msg = process.argv.slice(2).join(' ') || 'Hello World!';

    ch.assertExchange(ex, 'fanout', {durable: false});
    ch.publish(ex, '', new Buffer(msg));
    console.log(" [x] Sent %s", msg);
  });

  setTimeout(function() { conn.close(); process.exit(0) }, 500);
});

(emit_log.js 源碼)

正如你所見,在與交換器創建鏈接以後。有一點很關鍵,向不存在的交換器發佈消息是被禁止的。
若是仍然沒有隊列綁定交換器,消息會丟失。可是對咱們來講還好,若是仍然沒有消費者監聽,咱們能夠安全地丟棄這些消息。

receive_logs.js的代碼

#!/usr/bin/env node

var amqp = require('amqplib/callback_api');

amqp.connect('amqp://localhost', function(err, conn) {
  conn.createChannel(function(err, ch) {
    var ex = 'logs';

    ch.assertExchange(ex, 'fanout', {durable: false});

    ch.assertQueue('', {exclusive: true}, function(err, q) {
      console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q.queue);
      ch.bindQueue(q.queue, ex, '');

      ch.consume(q.queue, function(msg) {
        console.log(" [x] %s", msg.content.toString());
      }, {noAck: true});
    });
  });
});

(receive_logs,js源碼)

若是你想要保存log,你能夠打開控制檯輸入

$ ./receive_logs.js > logs_from_rabbit.log

若是你想在屏幕上看到log,再打開一個控制檯

$ ./receive_logs.js

固然,須要發出logs

$ ./emit_log.js

使用rabbitmqctl list_bindings,你能夠肯定剛纔的代碼確實建立了交換器和隊列,有兩個receive_logs.js的程序在運行。

$ sudo rabbitmqctl list_bindings
Listing bindings ...
logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
...done.

這個結果的簡要解釋:數據從logs交換器到兩個服務器分配的隊列。這也是咱們想要的結果。

要如何監聽一部分的消息?讓咱們移到下一章。

相關文章
相關標籤/搜索