上一個教程中,咱們建立了一個work queue. 其中的每一個task都會被精確的傳送到一個worker. 這節,咱們將會講把一個message傳送到多個consumers. 這種模式叫作publish/subscribe(發佈/訂閱).html
爲了說明這種模式,咱們將建立一個簡單的日誌系統(logging system). 它由兩個程序組成,一個是發送日誌message而且另外一個接收。安全
最重要的,發佈的日誌message將會被廣播到全部的receivers服務器
前面咱們講的包含下面的:producer,queue,consumerspa
它的主要思想是producer毫不直接發送任何message到queue. 不少狀況下,producer甚至不知道一個message是否會被髮送到任何queue.3d
如圖,它會直接發送messages到一個exchange. 而對於exchange,一方面它接收來自producer的message,另外一方面它把這些message推送到queues. 至於,messages是否會被髮送一個特定的queue或者發送到不少queue或者丟棄,這些規則都由exchange type定義。rest
Exchange type: direct , topic , headers , fanout.日誌
咱們這節主要講fanout,它會控制廣播。code
channel.ExchangeDeclare("logs", "fanout");
對於fanout exchange ,它會廣播它收到的全部的messages 到它知道的全部的queue.htm
對於列出服務器上的exchanges , 你可使用rabbitmqctlblog
sudo rabbitmqctl list_exchanges
在前面的教程中,咱們不知道exchanges,可是咱們仍然能夠發送messages 到queues. 由於咱們使用到了一個默認的exchange(a default exchange).這個默認的exchange是被空字符串(「」)定義。
回想下,咱們以前怎樣發送message
var message = GetMessage(args); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "", //默認的exchange routingKey: "hello", basicProperties: null, body: body);
此時,messages會根據指定的routingKey被路由到queue.
如今,咱們能夠發佈到指定的exchange.
var message = GetMessage(args);var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "logs", routingKey: "", basicProperties: null, body: body);
以前咱們使用過不少指定名稱的queues(例如hello和task_queue). 能夠命名一個queue是很重要的,咱們能夠指定workers到同一個queue。 並且使你能夠在多個producers和consumers以前共享這個queue.
We’re also interested only in currently flowing messages not in the old ones. 咱們想要最新的message而不是僅僅以前的。
這須要解決兩個事情。
第二,一旦咱們斷開與consumer的鏈接,這個queue應該被自動刪除。
在.NET客戶端中,咱們使用下面的語句建立一個帶隨機名稱的queue (when we supply no parameters to QueueDeclare() we create a non-durable, exclusive, autodelete queue with a generated name)
var queueName = channel.QueueDeclare().QueueName;
咱們已經建立好了exchange和queue,它們之間的關係咱們叫作binding. 用來告訴exchange發送messages到queue.
channel.QueueBind(queue: queueName, //綁定 exchange: "logs", routingKey: "");
如今,在logs exchange上會把messages發到咱們的queue。
rabbitmqctl list_bindings
這種fanout exchanges ,在發送時,會忽視routingKey的值。
EmitLog.cs(發送)
using System;using RabbitMQ.Client;using System.Text; class EmitLog { public static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using(var connection = factory.CreateConnection()) using(var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "logs", type: "fanout"); //聲明exchange var message = GetMessage(args); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "logs", //發送到logs exchange routingKey: "", basicProperties: null, body: body); Console.WriteLine(" [x] Sent {0}", message); } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } private static string GetMessage(string[] args) { return ((args.Length > 0) ? string.Join(" ", args) : "info: Hello World!"); } }
不容許發送到一個不存在的exchange.
若是沒有queue綁定到exchange,messages將會丟失。若是沒有consumer正在監聽,咱們能夠安全的丟棄這些message.
ReceiveLogs.cs
using System;using RabbitMQ.Client;using RabbitMQ.Client.Events;using System.Text; class ReceiveLogs { public static void Main() { var factory = new ConnectionFactory() { HostName = "localhost" }; using(var connection = factory.CreateConnection()) using(var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "logs", type: "fanout"); //聲明exchange var queueName = channel.QueueDeclare().QueueName; //得到隨機queue name channel.QueueBind(queue: queueName, //定義queue和exchange的關係 exchange: "logs", routingKey: ""); Console.WriteLine(" [*] Waiting for logs."); var consumer = new EventingBasicConsumer(channel); //回調 consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] {0}", message); }; channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } }
參考網址:
https://www.rabbitmq.com/tutorials/tutorial-three-dotnet.html