【譯】RabbitMQ系列(三) - 發佈/訂閱模式

發佈訂閱模式

在以前的文章裏,建立了work queue。work queue中,每個task都會派發給一個worker。在本章中,咱們會完成徹底不同的事情 - 咱們會派發一條message給多個消費者。咱們稱之爲發佈訂閱模式。html

爲了更好來講明,咱們將要構建一個簡單的日誌系統。會由兩部分代碼構成,第一部分來發送日誌message,第二部分會接受並打印日誌。java

在咱們的日誌系統中,每個接收程序都會收到日誌message。這種方式下,咱們能夠運行一個接收程序將日誌保存到磁盤,同時使用另一個接收程序將日誌打印到屏幕。bash

本質上來講,發佈的日誌message會廣播到全部運行的接收者。服務器

Exchanges

在以前的章節咱們經過queue收發message。如今開始介紹Rabbit中的full messaging model。學習

首先讓咱們快速的回憶一下以前的章節spa

  • producer是一個發送message的用戶程序。
  • queue是保存message的緩衝區
  • consumer是接收message的用戶程序

RabbitMQ的messaging model的核心思想是producer不會直接向queue發送message。實際上,不少時候producer也不知道message會發送到哪些queue。日誌

這裏,producer將message發送到exchange。exchange是一個很是簡單的東西。一方面它從producer側接收message,另外一方面它把message推送到queue去。 exchange必須知道對接收到的message接着要去作什麼。是轉發到特定的queue?仍是轉發到多個queue?仍是乾脆丟棄掉。這個規則取決於定義時exchange的類型。code

圖片描述

exchange有四種可選的類型:direct, topic, headers和fanout. 今天咱們聚焦於最後一種-fanout。讓咱們建立一個fanout類型的exchange,命名爲logshtm

channel.exchangeDeclare("logs","fanout");

fanout類型的exchange是很是簡單的。能夠從名字上大概猜出其用途,它廣播全部的message到它所知道的queue去。這也正是日誌應用所指望的。rabbitmq

列出全部的exhange,可使用rabbitmqctl命令 sudo rabbitmqctl list_exchanges,在列表總會出現一些amq.* 的exchange,和默認的exchange。這些是默認自動建立的,咱們不會使用到它們。

沒有名字的exchange。在以前的章節裏咱們沒有提到過exchanges,咱們直接將message發送到queue。其實咱們是用到了默認的exchange,用空字符串」「來標識。回想一下,咱們像下面這樣發佈message:
channel.basicPublish("","hello",null,message.getBytes()); 第一個參數就是exchange的名字。空字符串表明了沒有名字的exchange:message被路由到了由routingKey指定名字的queue。

如今,咱們能夠向有名字的exchange發佈message。

channel.basicPublish("logs","",null,message.getBytes());

Temporary Queue

以前咱們使用queue時都會指定名字,如hello和task_queue。給一個queue命名是很重要的,由於咱們要給worker指出相同的queue。當須要在生產者和消費者間共享一個queue時,就必須給queue取好名字。

可是在咱們日誌應用中,狀況卻有所不一樣。咱們須要接收到全部的log message。咱們也關注當前流動的message。咱們須要搞定2個事情。

首先,當鏈接到Rabbit時,咱們須要一個全新的,空的queue。所以咱們能夠本身建立一個隨意名字的queue,或是由服務器選擇隨意的queue名字,這固然是更好的選擇。

其次,當咱們斷開接收者時,該queue能夠被自動刪除。

在java客戶端中,當咱們使用無參的queueDeclare()時,咱們建立的是使用自動生成名字的一個不持久的,自動刪除queue:

String queueName = channel.queueDeclare().getQueue();

能夠經過這裏來學習到exclusive標誌和其餘queue的相關屬性。

這時queue就具備一個隨機的名字,好比像amq.gen-JzTY20BRgKO-HjmUJj0wLg.

Bindings

圖片描述
咱們已經建立了一個fanout exchange和queue.如今咱們要設置exchange,讓它把message發送到咱們的queue。exchange和queue這種關係的創建咱們稱之爲binding.

channel.queueBind(queueName,"logs","");

從如今開始logs這個exchange就會將message推向咱們的隊列了。

可使用命令rabbitmqctl list_bindings 來列出當前全部的binding。

開始執行

圖片描述
生產者程序,和以前章節的代碼變化不大,主要的變化是咱們將message發送到exchange而不是一個queue。你發現咱們在發送的時候會填上一個routingKey,這個值在fanout類型的exchange中是被忽略的。下面是生產者EmitLog.java的代碼

public class EmitLog {

  private static final String EXCHANGE_NAME = "logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        String message = argv.length < 1 ? "info: Hello World!" :
                            String.join(" ", argv);

        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + message + "'");
    }
  }
}

如你所見,在創建connection以後咱們聲明瞭exchange.這一步是必要的,發佈Message到一個不存在的exchange是不容許的。

若是沒有queue綁定到exchange的時候,發佈的message是會丟失的,但在如今這個場景是OK的。下面是ReceiveLogs.java的代碼:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class ReceiveLogs {
  private static final String EXCHANGE_NAME = "logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, EXCHANGE_NAME, "");

    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");
        System.out.println(" [x] Received '" + message + "'");
    };
    channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
  }
}

首先進行編譯

javac -cp $CP EmitLog.java ReceiveLog.java

若是要把日誌保存到文件,則

java -cp $CP ReceiveLogs > logs_from_rabbit.log

若是要在控制檯看日誌,在另外一個終端

java -cp $CP ReceiveLogs

最後來發送日誌

java -cp $CP EmitLog

使用rabbitmqctl list_bindings,來確認程序建立了咱們在代碼中指定的binding和queue. 運行兩個ReceiveLogs程序,你會看到像下面的輸出

sudo rabbitmqctl list_bindings
# => Listing bindings ...
# => logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
# => logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
# => ...done.
相關文章
相關標籤/搜索