RabbitMQ學習之Publish/Subscribe(3)

 上一個教程中,咱們建立了一個work queue. 其中的每一個task都會被精確的傳送到一個worker. 這節,咱們將會講把一個message傳送到多個consumers. 這種模式叫作publish/subscribe(發佈/訂閱).html

爲了說明這種模式,咱們將建立一個簡單的日誌系統(logging system. 它由兩個程序組成,一個是發送日誌message而且另外一個接收。安全

最重要的,發佈的日誌message將會被廣播到全部的receivers服務器

Exchangs

前面咱們講的包含下面的:producer,queue,consumerspa

它的主要思想是producer毫不直接發送任何messagequeue. 不少狀況下,producer甚至不知道一個message是否會被髮送到任何queue.3d

如圖,它會直接發送messages到一個exchange. 而對於exchange,一方面它接收來自producermessage,另外一方面它把這些message推送到queues. 至於,messages是否會被髮送一個特定的queue或者發送到不少queue或者丟棄,這些規則都由exchange type定義。rest

Exchange type: direct , topic , headers , fanout.日誌

咱們這節主要講fanout,它會控制廣播。code

channel.ExchangeDeclare("logs", "fanout");

對於fanout exchange ,它會廣播它收到的全部的messages 到它知道的全部的queue.htm

Listing exchanges

對於列出服務器上的exchanges , 你可使用rabbitmqctlblog

sudo rabbitmqctl list_exchanges
The default exchange

在前面的教程中,咱們不知道exchanges,可是咱們仍然能夠發送messages queues. 由於咱們使用到了一個默認的exchange(a default exchange).這個默認的exchange是被空字符串(「」)定義。

回想下,咱們以前怎樣發送message

 var message = GetMessage(args);
    var body = Encoding.UTF8.GetBytes(message);
    channel.BasicPublish(exchange: "",  //默認的exchange
                         routingKey: "hello",
                         basicProperties: null,
                         body: body);

此時,messages會根據指定的routingKey被路由到queue.

如今,咱們能夠發佈到指定的exchange.

var message = GetMessage(args);var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "logs",
                     routingKey: "",
                     basicProperties: null,
                     body: body);

Temporary queues

以前咱們使用過不少指定名稱的queues(例如hellotask_queue). 能夠命名一個queue是很重要的,咱們能夠指定workers到同一個queue。 並且使你能夠在多個producersconsumers以前共享這個queue. 

We’re also interested only in currently flowing messages not in the old ones. 咱們想要最新的message而不是僅僅以前的。

這須要解決兩個事情。

  1. 首先,不管何時咱們鏈接Rabbit,咱們須要一個新的,空的queue。爲了達到這個目的,咱們能夠建立一個帶隨機名稱的queue。更好的辦法,咱們可讓服務器給咱們選擇一個隨機的queue名稱。
  2. 第二,一旦咱們斷開與consumer的鏈接,這個queue應該被自動刪除。 

.NET客戶端中,咱們使用下面的語句建立一個帶隨機名稱的queue (when we supply no parameters to QueueDeclare() we create a non-durable, exclusive, autodelete queue with a generated name)

var queueName = channel.QueueDeclare().QueueName;

Bindings

咱們已經建立好了exchangequeue,它們之間的關係咱們叫作binding. 用來告訴exchange發送messagesqueue. 

channel.QueueBind(queue: queueName,  //綁定
                  exchange: "logs",
                  routingKey: "");

如今,在logs exchange上會把messages發到咱們的queue

Listing bindings
rabbitmqctl list_bindings

代碼

這種fanout exchanges ,在發送時,會忽視routingKey的值。

EmitLog.cs(發送)

using System;using RabbitMQ.Client;using System.Text;
class EmitLog
{
    public static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: "logs", type: "fanout");  //聲明exchange

            var message = GetMessage(args);
            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(exchange: "logs",  //發送到logs exchange
                                 routingKey: "",
                                 basicProperties: null,
                                 body: body);
            Console.WriteLine(" [x] Sent {0}", message);
        }

        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();
    }

    private static string GetMessage(string[] args)
    {
        return ((args.Length > 0)
               ? string.Join(" ", args)
               : "info: Hello World!");
    }
}

不容許發送到一個不存在的exchange.

若是沒有queue綁定到exchangemessages將會丟失。若是沒有consumer正在監聽,咱們能夠安全的丟棄這些message.

ReceiveLogs.cs

using System;using RabbitMQ.Client;using RabbitMQ.Client.Events;using System.Text;
class ReceiveLogs
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: "logs", type: "fanout"); //聲明exchange

            var queueName = channel.QueueDeclare().QueueName;  //得到隨機queue name
            channel.QueueBind(queue: queueName,  //定義queue和exchange的關係
                              exchange: "logs",
                              routingKey: "");

            Console.WriteLine(" [*] Waiting for logs.");

            var consumer = new EventingBasicConsumer(channel);  //回調
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine(" [x] {0}", message);
            };
            channel.BasicConsume(queue: queueName,
                                 autoAck: true,
                                 consumer: consumer);

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

參考網址:

https://www.rabbitmq.com/tutorials/tutorial-three-dotnet.html

相關文章
相關標籤/搜索