上一節,咱們講了direct exchange,這節咱們講下topic exchangehtml
發送到topic exchange的messages不能夠有一個隨意的routing_key, 它必須是使用.分隔的一些詞的集合。例如: "stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit" .ui
這些binding key 必須有一樣的樣式(in the same form). topic exchange背後的邏輯與direct exchange類似:帶有特定的routing key的message會發送到全部有匹配的binding key 的全部queues。spa
然而,對於binding keys有兩個重要的特例:code
*(star) 能替代一個詞orm
#(hash) 能替代0個或者多個詞htm
下面是一個簡單的例子blog
這個例子中,咱們將會發送描述動物的message. 這些message將會帶有一個routing key . 這個routing key 由三個詞組成. 第一個詞描述速度(speed),第二個詞描述膚色(colour),第三個詞描述物種。 <speed>.<colour>.<species>rabbitmq
咱們建立了3條bindings. Q1 綁定」*.orange.*」而且Q2綁定「*.*.rabbit」和「lazy.#」ip
這些bindings總結下就是:ci
Topic exchange是很強大的,而且能夠表現的像其餘exchange.
當一個queue綁定帶「#」的binding key時,它能夠接受全部的messages而不用管routing key.就像fanout exchange.
當特殊字符* 和#沒有在bindings中使用時,topic exchange表現的像direct exchange.
咱們將使用topic exchange.對於routing key ,咱們將使用這兩個詞:<facility>.<severity>.
EmitLogTopic.cs
using System;using System.Linq;using RabbitMQ.Client;using System.Text; class EmitLogTopic { public static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using(var connection = factory.CreateConnection()) using(var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "topic_logs", //topic類型的exchange type: "topic"); var routingKey = (args.Length > 0) ? args[0] : "anonymous.info"; var message = (args.Length > 1) ? string.Join(" ", args.Skip( 1 ).ToArray()) : "Hello World!"; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "topic_logs", //發送特定routing key的message routingKey: routingKey, basicProperties: null, body: body); Console.WriteLine(" [x] Sent '{0}':'{1}'", routingKey, message); } } }
使用示例:
cd EmitLogTopic dotnet run "kern.critical" "A critical kernel error"
ReceiveLogsTopic.cs
using System;using RabbitMQ.Client;using RabbitMQ.Client.Events;using System.Text; class ReceiveLogsTopic { public static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using(var connection = factory.CreateConnection()) using(var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "topic_logs", type: "topic"); //聲明topic exchange var queueName = channel.QueueDeclare().QueueName; //聲明隨機生成的queue name if(args.Length < 1) { Console.Error.WriteLine("Usage: {0} [binding_key...]", Environment.GetCommandLineArgs()[0]); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); Environment.ExitCode = 1; return; } foreach(var bindingKey in args) { channel.QueueBind(queue: queueName, //綁定bindingkey exchange: "topic_logs", routingKey: bindingKey); } Console.WriteLine(" [*] Waiting for messages. To exit press CTRL+C"); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); var routingKey = ea.RoutingKey; //message的routingkey Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message); }; channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } }
使用示例:
接收全部的logs
cd ReceiveLogsTopic dotnet run "#"
接收全部facility:kern的logs
cd ReceiveLogsTopic dotnet run "kern.*"
僅僅critical的logs
cd ReceiveLogsTopic dotnet run "*.critical"
建立多個bindings
cd ReceiveLogsTopic dotnet run "kern.*" "*.critical"
參考網址:
https://www.rabbitmq.com/tutorials/tutorial-five-dotnet.html