前兩章我們講了RabbitMQ的direct模式和fanout模式,本章介紹topic主題模式的應用。如果對direct模式下通過routingkey來匹配消息的模式已經有一定了解,那topic也很好理解。簡單地可以理解成:direct是通過routingkey精準匹配的,而topic是通過routingkey來模糊匹配。
在topic模式下支持兩個特殊字符的匹配。
* (星號) 代表任意一個單詞;# (井號) 代表0個或者多個單詞。
注意:上面說的是單詞(routingkey中以「.」分隔的每一段),不是字符。
如下圖所示,RabbitMQ direct模式通過RoutingKey來精準匹配:RoutingKey爲red的投遞到Queue1,RoutingKey爲black和white的投遞到Queue2。
我們可以假設一個場景:我們要做一個日誌模塊來收集處理不同的日誌,日誌的區分包含三個維度的標準:模塊、日誌緊急程度、日誌重要程度。模塊分爲:red、black、white;緊急程度分爲:critical、normal;重要程度分爲:medium、low、high。在RoutingKey字段中我們把這三個維度通過兩個「.」連接起來,例如「black.critical.high」。
現在我們需要把black模塊、緊急程度爲critical、重要程度爲high的日誌分配到隊列1打印到屏幕;把所有模塊中重要程度爲high的日誌,以及white模塊中緊急程度爲critical的日誌發送到隊列2持久化到硬盤。如下示例:
-
RoutingKey爲「black.critical.high」的日誌會投遞到queue1和queue2。
-
RoutingKey爲「red.critical.high」的日誌只會投遞到queue2。
-
RoutingKey爲「white.critical.high」的日誌會投遞到queue2,並且雖然queue2的兩個匹配規則都符合,但只會向queue2投遞一份。
新建TopicProduct用來發布三種routingkey的消息。
using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace TopicProduct
{
    /// <summary>
    /// Console publisher for the topic-exchange sample. Declares the
    /// "wytExchangeTopic" exchange and publishes three messages, each under its
    /// own three-word routing key, with the Persistent property set.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Connects to the broker, publishes the three sample messages, reports
        /// them on the console and then waits for Enter.
        /// </summary>
        /// <param name="args">Command-line arguments; not read.</param>
        static void Main(string[] args)
        {
            const string topicExchange = "wytExchangeTopic";

            // routingKeys[i] is the key that payloads[i] is published under.
            string[] routingKeys = { "black.critical.high", "red.critical.high", "white.critical.high" };
            string[] payloads = { "black-critical-high!", "red-critical-high!", "white-critical-high!" };

            var factory = new ConnectionFactory
            {
                HostName = "192.168.63.129",
                Port = 5672,
                VirtualHost = "/wyt",
                UserName = "wyt",
                Password = "wyt"
            };

            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: topicExchange, type: "topic", durable: true, autoDelete: false, arguments: null);

                // One property set, flagged persistent, is shared by all three publishes.
                var props = channel.CreateBasicProperties();
                props.Persistent = true;

                // Encode every payload up front, before anything is sent.
                var bodies = new byte[payloads.Length][];
                for (var i = 0; i < payloads.Length; i++)
                {
                    bodies[i] = Encoding.UTF8.GetBytes(payloads[i]);
                }

                // Publish all messages first ...
                for (var i = 0; i < bodies.Length; i++)
                {
                    channel.BasicPublish(exchange: topicExchange, routingKey: routingKeys[i], basicProperties: props, body: bodies[i]);
                }

                // ... and only then report them, so nothing is printed if a publish throws.
                foreach (var text in payloads)
                {
                    Console.WriteLine(" [x] Sent {0}", text);
                }
            }

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}
using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace TopicProduct
{
    /// <summary>
    /// Console publisher for the topic-exchange sample. Declares the
    /// "wytExchangeTopic" exchange and publishes three messages, each under its
    /// own three-word routing key, with the Persistent property set.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Connects to the broker, publishes the three sample messages, reports
        /// them on the console and then waits for Enter.
        /// </summary>
        /// <param name="args">Command-line arguments; not read.</param>
        static void Main(string[] args)
        {
            const string topicExchange = "wytExchangeTopic";

            // routingKeys[i] is the key that payloads[i] is published under.
            string[] routingKeys = { "black.critical.high", "red.critical.high", "white.critical.high" };
            string[] payloads = { "black-critical-high!", "red-critical-high!", "white-critical-high!" };

            var factory = new ConnectionFactory
            {
                HostName = "192.168.63.129",
                Port = 5672,
                VirtualHost = "/wyt",
                UserName = "wyt",
                Password = "wyt"
            };

            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: topicExchange, type: "topic", durable: true, autoDelete: false, arguments: null);

                // One property set, flagged persistent, is shared by all three publishes.
                var props = channel.CreateBasicProperties();
                props.Persistent = true;

                // Encode every payload up front, before anything is sent.
                var bodies = new byte[payloads.Length][];
                for (var i = 0; i < payloads.Length; i++)
                {
                    bodies[i] = Encoding.UTF8.GetBytes(payloads[i]);
                }

                // Publish all messages first ...
                for (var i = 0; i < bodies.Length; i++)
                {
                    channel.BasicPublish(exchange: topicExchange, routingKey: routingKeys[i], basicProperties: props, body: bodies[i]);
                }

                // ... and only then report them, so nothing is printed if a publish throws.
                foreach (var text in payloads)
                {
                    Console.WriteLine(" [x] Sent {0}", text);
                }
            }

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}
新建TopicCustomerA接收一種消息。
using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace TopicCustomerA
{
    /// <summary>
    /// Console subscriber for the topic-exchange sample. Binds one queue to
    /// "wytExchangeTopic" with the literal key "black.critical.high" and prints
    /// every delivery until Enter is pressed.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Connects to the broker, binds a freshly declared queue, starts a
        /// manually-acknowledged consumer and blocks on the console.
        /// </summary>
        /// <param name="args">Command-line arguments; not read.</param>
        static void Main(string[] args)
        {
            const string topicExchange = "wytExchangeTopic";
            const string bindingKey = "black.critical.high";

            var factory = new ConnectionFactory
            {
                HostName = "192.168.63.129",
                Port = 5672,
                VirtualHost = "/wyt",
                UserName = "wyt",
                Password = "wyt"
            };

            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: topicExchange, type: "topic", durable: true, autoDelete: false, arguments: null);

                // QueueDeclare() is called without arguments; the queue name is read
                // back from its result (presumably a broker-generated temporary
                // queue - confirm against the client documentation).
                var queue = channel.QueueDeclare().QueueName;
                channel.QueueBind(queue: queue, exchange: topicExchange, routingKey: bindingKey, arguments: null);

                // Print each delivery, then acknowledge exactly that one message.
                EventHandler<BasicDeliverEventArgs> onReceived = (sender, delivery) =>
                {
                    var payload = delivery.Body;
                    var text = Encoding.UTF8.GetString(payload);
                    var key = delivery.RoutingKey;
                    Console.WriteLine(" [x] Received '{0}':'{1}'", key, text);
                    channel.BasicAck(deliveryTag: delivery.DeliveryTag, multiple: false);
                };

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += onReceived;

                // autoAck is off, so the handler above is responsible for acking.
                channel.BasicConsume(queue: queue, autoAck: false, consumer: consumer);

                // Keep the channel open until the user presses Enter.
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}
using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace TopicCustomerA
{
    /// <summary>
    /// Console subscriber for the topic-exchange sample. Binds one queue to
    /// "wytExchangeTopic" with the literal key "black.critical.high" and prints
    /// every delivery until Enter is pressed.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Connects to the broker, binds a freshly declared queue, starts a
        /// manually-acknowledged consumer and blocks on the console.
        /// </summary>
        /// <param name="args">Command-line arguments; not read.</param>
        static void Main(string[] args)
        {
            const string topicExchange = "wytExchangeTopic";
            const string bindingKey = "black.critical.high";

            var factory = new ConnectionFactory
            {
                HostName = "192.168.63.129",
                Port = 5672,
                VirtualHost = "/wyt",
                UserName = "wyt",
                Password = "wyt"
            };

            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: topicExchange, type: "topic", durable: true, autoDelete: false, arguments: null);

                // QueueDeclare() is called without arguments; the queue name is read
                // back from its result (presumably a broker-generated temporary
                // queue - confirm against the client documentation).
                var queue = channel.QueueDeclare().QueueName;
                channel.QueueBind(queue: queue, exchange: topicExchange, routingKey: bindingKey, arguments: null);

                // Print each delivery, then acknowledge exactly that one message.
                EventHandler<BasicDeliverEventArgs> onReceived = (sender, delivery) =>
                {
                    var payload = delivery.Body;
                    var text = Encoding.UTF8.GetString(payload);
                    var key = delivery.RoutingKey;
                    Console.WriteLine(" [x] Received '{0}':'{1}'", key, text);
                    channel.BasicAck(deliveryTag: delivery.DeliveryTag, multiple: false);
                };

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += onReceived;

                // autoAck is off, so the handler above is responsible for acking.
                channel.BasicConsume(queue: queue, autoAck: false, consumer: consumer);

                // Keep the channel open until the user presses Enter.
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}
新建TopicCustomerB接收兩種消息。
using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace TopicCustomerB
{
    /// <summary>
    /// Console subscriber for the topic-exchange sample. Binds one queue to
    /// "wytExchangeTopic" with two wildcard keys, "red.critical.*" and
    /// "white.critical.*", and prints every delivery until Enter is pressed.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the accompanying article describes its second queue as
    /// receiving every high-importance log plus white critical logs, and says
    /// "black.critical.high" reaches it. The two keys bound here do not cover
    /// that routing key - confirm which behaviour is intended.
    /// </remarks>
    class Program
    {
        /// <summary>
        /// Connects to the broker, binds a freshly declared queue under both
        /// keys, starts a manually-acknowledged consumer and blocks on the console.
        /// </summary>
        /// <param name="args">Command-line arguments; not read.</param>
        static void Main(string[] args)
        {
            const string topicExchange = "wytExchangeTopic";

            // Bound in this order; "*" stands for exactly one word in a topic key.
            string[] bindingKeys = { "red.critical.*", "white.critical.*" };

            var factory = new ConnectionFactory
            {
                HostName = "192.168.63.129",
                Port = 5672,
                VirtualHost = "/wyt",
                UserName = "wyt",
                Password = "wyt"
            };

            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: topicExchange, type: "topic", durable: true, autoDelete: false, arguments: null);

                // QueueDeclare() is called without arguments; the queue name is read
                // back from its result (presumably a broker-generated temporary
                // queue - confirm against the client documentation).
                var queue = channel.QueueDeclare().QueueName;
                foreach (var bindingKey in bindingKeys)
                {
                    channel.QueueBind(queue: queue, exchange: topicExchange, routingKey: bindingKey, arguments: null);
                }

                // Print each delivery, then acknowledge exactly that one message.
                EventHandler<BasicDeliverEventArgs> onReceived = (sender, delivery) =>
                {
                    var payload = delivery.Body;
                    var text = Encoding.UTF8.GetString(payload);
                    var key = delivery.RoutingKey;
                    Console.WriteLine(" [x] Received '{0}':'{1}'", key, text);
                    channel.BasicAck(deliveryTag: delivery.DeliveryTag, multiple: false);
                };

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += onReceived;

                // autoAck is off, so the handler above is responsible for acking.
                channel.BasicConsume(queue: queue, autoAck: false, consumer: consumer);

                // Keep the channel open until the user presses Enter.
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}
using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace TopicCustomerB
{
    /// <summary>
    /// Console subscriber for the topic-exchange sample. Binds one queue to
    /// "wytExchangeTopic" with two wildcard keys, "red.critical.*" and
    /// "white.critical.*", and prints every delivery until Enter is pressed.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the accompanying article describes its second queue as
    /// receiving every high-importance log plus white critical logs, and says
    /// "black.critical.high" reaches it. The two keys bound here do not cover
    /// that routing key - confirm which behaviour is intended.
    /// </remarks>
    class Program
    {
        /// <summary>
        /// Connects to the broker, binds a freshly declared queue under both
        /// keys, starts a manually-acknowledged consumer and blocks on the console.
        /// </summary>
        /// <param name="args">Command-line arguments; not read.</param>
        static void Main(string[] args)
        {
            const string topicExchange = "wytExchangeTopic";

            // Bound in this order; "*" stands for exactly one word in a topic key.
            string[] bindingKeys = { "red.critical.*", "white.critical.*" };

            var factory = new ConnectionFactory
            {
                HostName = "192.168.63.129",
                Port = 5672,
                VirtualHost = "/wyt",
                UserName = "wyt",
                Password = "wyt"
            };

            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: topicExchange, type: "topic", durable: true, autoDelete: false, arguments: null);

                // QueueDeclare() is called without arguments; the queue name is read
                // back from its result (presumably a broker-generated temporary
                // queue - confirm against the client documentation).
                var queue = channel.QueueDeclare().QueueName;
                foreach (var bindingKey in bindingKeys)
                {
                    channel.QueueBind(queue: queue, exchange: topicExchange, routingKey: bindingKey, arguments: null);
                }

                // Print each delivery, then acknowledge exactly that one message.
                EventHandler<BasicDeliverEventArgs> onReceived = (sender, delivery) =>
                {
                    var payload = delivery.Body;
                    var text = Encoding.UTF8.GetString(payload);
                    var key = delivery.RoutingKey;
                    Console.WriteLine(" [x] Received '{0}':'{1}'", key, text);
                    channel.BasicAck(deliveryTag: delivery.DeliveryTag, multiple: false);
                };

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += onReceived;

                // autoAck is off, so the handler above is responsible for acking.
                channel.BasicConsume(queue: queue, autoAck: false, consumer: consumer);

                // Keep the channel open until the user presses Enter.
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}
先運行TopicCustomerA和TopicCustomerB保持訂閱狀態,然後執行TopicProduct發佈消息。TopicCustomerA和TopicCustomerB收到的消息如下: