在上一篇的最後,編寫了一個C#驅動RabbitMQ的簡單栗子,瞭解了C#驅動RabbitMQ的基本用法。本章介紹RabbitMQ的四種Exchange及各類Exchange的使用場景。html
上一篇最後一個栗子使用的Exchange就是direct類型的,direct類型的exchange路由規則很簡單:react
exchange在和queue進行binding時會設置routingkey(爲了不和下邊的routingKey混淆,不少時候把這裏的routingKey叫作BindingKey)ide
channel.QueueBind(queue:"Q1", exchange:"myexchange", routingKey:"orange");
將消息發送到Broker時會設置對應的routingkey:測試
channel.BasicPublish(exchange: "myexchange",routingKey: "orange", basicProperties: null, body: body);
只有RoutingKey和BindingKey徹底相同時,exchange纔會把消息路由到綁定的queue中去。網站
咱們知道了direact類型的交換機只有routingKey和bindingKey相同的時候纔會進行消息路由,根據這一特色咱們能夠經過routingKey將消息路由到不一樣的queue中。如在進行日誌處理時,需求是全部的日誌都保存到文本文件,出現錯誤日誌時則還須要短信通知以便及時處理。咱們能夠建立兩個隊列:只接收錯誤日誌的log_error隊列和接收全部日誌信息的log_all隊列。消費者C1處理log_error隊列中消息,將這些消息經過短信通知管理員,消費者C2處理log_all隊列的信息,將這些信息記錄到文本文件。ui
生產者用於發送日誌消息,代碼以下:spa
static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "127.0.0.1", UserName = "wyy",//用戶名 Password = "123321"//密碼 }; //建立鏈接connection using (var connection = factory.CreateConnection()) { //建立通道channel using (var channel = connection.CreateModel()) { //聲明交換機exchang channel.ExchangeDeclare(exchange: "myexchange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null); //聲明兩個隊列,log_all保存全部日誌,log_error保存error類型日誌 channel.QueueDeclare(queue: "log_all", durable: true, exclusive: false, autoDelete: false, arguments: null); channel.QueueDeclare(queue: "log_error", durable: true, exclusive: false, autoDelete: false, arguments: null); //綁定全部日誌類型到log_all隊列 string[] logtypes = new string[] { "debug", "info", "warn", "error" }; foreach (string item in logtypes) { channel.QueueBind(queue: "log_all", exchange: "myexchange", routingKey: item); } //綁定錯誤日誌到log_all隊列 channel.QueueBind(queue: "log_error", exchange: "myexchange", routingKey: "error"); //準備100條測試日誌信息 List<LogMsg> msgList = new List<LogMsg>(); for (int i = 1; i < 100; i++) { if (i%4==0) { msgList.Add(new LogMsg() { LogType = "info", Msg = Encoding.UTF8.GetBytes($"info第{i}條信息") }); } if (i % 4 == 1) { msgList.Add(new LogMsg() { LogType = "debug", Msg = Encoding.UTF8.GetBytes($"debug第{i}條信息") }); } if (i % 4 == 2) { msgList.Add(new LogMsg() { LogType = "warn", Msg = Encoding.UTF8.GetBytes($"warn第{i}條信息") }); } if (i % 4 == 3) { msgList.Add(new LogMsg() { LogType = "error", Msg = Encoding.UTF8.GetBytes($"error第{i}條信息") }); } } Console.WriteLine("生產者發送100條日誌信息"); //發送日誌信息 foreach (var item in msgList) { channel.BasicPublish(exchange: "myexchange", routingKey: item.LogType, basicProperties: null, body: item.Msg); } } } Console.ReadKey(); } }
消費者C1用於處理log_error隊列中的消息,錯誤消息進行短信通知,代碼以下:.net
static void Main(string[] args) { var factory = new ConnectionFactory() { //rabbitmq-server所在設備ip,這裏就是本機 HostName = "127.0.0.1", UserName = "wyy",//用戶名 Password = "123321"//密碼 }; //建立鏈接connection using (var connection = factory.CreateConnection()) { //建立通道channel using (var channel = connection.CreateModel()) { //聲明交換機exchang channel.ExchangeDeclare(exchange: "myexchange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null); //聲明隊列queue channel.QueueDeclare(queue: "log_all", durable: true, exclusive: false, autoDelete: false, arguments: null); //綁定 channel.QueueBind(queue: "log_error", exchange: "myexchange", routingKey: "error"); //定義消費者 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); //只是爲了演示,並無存入文本文件 Console.WriteLine($"接收成功!【{message}】,發送短信通知"); }; Console.WriteLine("消費者C1【接收錯誤日誌,發送短信通知】準備就緒...."); //處理消息 channel.BasicConsume(queue: "log_error", autoAck: true, consumer: consumer); Console.ReadLine(); } } }
消費者C2用於處理log_all隊列中的消息,全部消息記錄到文本文件中,代碼以下:debug
static void Main(string[] args) { var factory = new ConnectionFactory() { //rabbitmq-server所在設備ip,這裏就是本機 HostName = "127.0.0.1", UserName = "wyy",//用戶名 Password = "123321"//密碼 }; //建立鏈接connection using (var connection = factory.CreateConnection()) { //建立通道channel using (var channel = connection.CreateModel()) { //聲明交換機exchang channel.ExchangeDeclare(exchange: "myexchange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null); //聲明隊列queue channel.QueueDeclare(queue: "log_all", durable: true, exclusive: false, autoDelete: false, arguments: null); //綁定 string[] logtypes = new string[] { "debug", "info", "warn", "error" }; foreach (string item in logtypes) { channel.QueueBind(queue: "log_all", exchange: "myexchange", routingKey: item); } //定義消費者 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); //只是爲了演示,並無存入文本文件 Console.WriteLine($"接收成功!【{message}】,存入文本文件"); }; Console.WriteLine("消費者C2【接收全部日誌信息,存入文本文件】準備就緒...."); //處理消息 channel.BasicConsume(queue: "log_all", autoAck: true, consumer: consumer); Console.ReadLine(); } } }
運行這三個項目,執行結果以下:3d
fanout類型的exchange路由規則是最簡單的,交換機會把消息廣播到與該Exchange綁定的全部queue中,即全部和該exchange綁定的隊列都會收到消息。fanout類型exchange和隊列綁定時不須要指定routingKey,即便指定了routingKey也會被忽略掉。路由結構以下:
fanout類型交換機主要用於發佈/訂閱的一些場景,如用戶註冊了咱們的網站後,咱們經過短信和郵件兩種方式通知用戶
這裏經過代碼簡單演示將消息同時使用短信和郵件兩種方式通知用戶的流程。首先聲明一個fanout類型的exchange,而後聲明兩個隊列 SMSqueue和EMAILqueue,這兩個隊列都和這個exchange綁定。消費者1處理EMAILqueue的消息,經過郵件方式發送通知;消費者2處理SMSqueue的消息經過短信方式發送通知。
生產者發送信息,代碼以下:
static void Main(string[] args) { var factory = new ConnectionFactory() { //rabbitmq-server所在設備ip,這裏就是本機 HostName = "127.0.0.1", UserName = "wyy",//用戶名 Password = "123321"//密碼 }; //第一步:建立鏈接connection using (var connection = factory.CreateConnection()) { //建立通道channel using (var channel = connection.CreateModel()) { //聲明交換機exchang channel.ExchangeDeclare(exchange: "myfanoutexchange", type: ExchangeType.Fanout, durable: true, autoDelete: false, arguments: null); //聲明SMSqueue隊列,用於短信通知 channel.QueueDeclare(queue: "SMSqueue", durable: true, exclusive: false, autoDelete: false, arguments: null); //聲明隊列,Email隊列,用於郵件通知 channel.QueueDeclare(queue: "EMAILqueue", durable: true, exclusive: false, autoDelete: false, arguments: null); //綁定exchange和queue channel.QueueBind(queue: "SMSqueue", exchange: "myfanoutexchange", routingKey: string.Empty,arguments:null); channel.QueueBind(queue: "EMAILqueue", exchange: "myfanoutexchange", routingKey: string.Empty, arguments: null); Console.WriteLine("生產者準備就緒...."); string message = ""; //第六步:發送消息 //在控制檯輸入消息,按enter鍵發送消息 while (!message.Equals("quit", StringComparison.CurrentCultureIgnoreCase)) { message = Console.ReadLine(); var body = Encoding.UTF8.GetBytes(message); //基本發佈 channel.BasicPublish(exchange: "myfanoutexchange", routingKey: string.Empty, basicProperties: null, body: body); Console.WriteLine($"消息【{message}】已發送到隊列"); } } } Console.ReadKey(); }
消費者1將EMAILqueue的消息經過郵件方式發送通知,代碼以下:
static void Main(string[] args) { var factory = new ConnectionFactory() { //rabbitmq-server所在設備ip,這裏就是本機 HostName = "127.0.0.1", UserName = "wyy",//用戶名 Password = "123321"//密碼 }; //建立鏈接connection using (var connection = factory.CreateConnection()) { //建立通道channel using (var channel = connection.CreateModel()) { //聲明交換機exchang channel.ExchangeDeclare(exchange: "myfanoutexchange", type: ExchangeType.Fanout, durable: true, autoDelete: false, arguments: null); //聲明隊列queue channel.QueueDeclare(queue: "EMAILqueue", durable: true, exclusive: false, autoDelete: false, arguments: null); //綁定exchange和queue channel.QueueBind(queue: "EMAILqueue", exchange: "myfanoutexchange", routingKey: string.Empty, arguments: null); //定義消費者 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); //只是爲了演示,並無存入文本文件 Console.WriteLine($"接收成功!【{message}】,郵件通知"); }; Console.WriteLine("郵件通知服務準備就緒..."); //處理消息 channel.BasicConsume(queue: "EMAILqueue", autoAck: true, consumer: consumer); Console.ReadLine(); } } }
消費者2將SMSqueue的消息經過短信方式發送通知,代碼以下:
static void Main(string[] args) { var factory = new ConnectionFactory() { //rabbitmq-server所在設備ip,這裏就是本機 HostName = "127.0.0.1", UserName = "wyy",//用戶名 Password = "123321"//密碼 }; //建立鏈接connection using (var connection = factory.CreateConnection()) { //建立通道channel using (var channel = connection.CreateModel()) { //聲明交換機exchang channel.ExchangeDeclare(exchange: "myfanoutexchange", type: ExchangeType.Fanout, durable: true, autoDelete: false, arguments: null); //聲明隊列queue channel.QueueDeclare(queue: "SMSqueue", durable: true, exclusive: false, autoDelete: false, arguments: null); //綁定exchange和queue channel.QueueBind(queue: "SMSqueue", exchange: "myfanoutexchange", routingKey: string.Empty,arguments:null); //定義消費者 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); //只是爲了演示,並無存入文本文件 Console.WriteLine($"接收成功!【{message}】,短信通知"); }; Console.WriteLine("短信通知服務準備就緒..."); //處理消息 channel.BasicConsume(queue: "myfanoutqueue", autoAck: true, consumer: consumer); Console.ReadLine(); } } }
啓動這三個應用程序,執行結果以下:
topic類型exchange的路由規則也是基於routingKey和bindingKey的,其路由過程和direct類型基本一致,二者的區別在於direct類型的exchange要求routingKey和bindingKey必須相同才進行將消息路由到綁定的queue中,而topic類型的bindingKey是一個匹配規則,只要routingKey符合bindingKey的規則就能夠將消息路由到綁定的queue中去,結構以下圖所示。注意routingKey和bindingKey的結構都是一系列由點號鏈接單詞的字符串,例如【aaa.bbb.ccc】。
bindingKey的兩個特殊符號:*表示一個單詞,#表示0或多個單詞(注意是單詞,而不是字符)。以下圖,usa.news和usa.weather都和usa.#匹配,而usa.news和europe.news都和#.news匹配。
這裏使用代碼實現上圖中的例子,爲了方便咱們只定義兩個隊列:接收美國相關信息的usaQueue(bindingKey是usa.#)和接收新聞消息的newsQueue(bindingKey是#.news)。而後定義兩個消費者,消費者1處理useaQueue的消息,消費者2處理newsQueue的消息。
生產者代碼:
static void Main(string[] args) { var factory = new ConnectionFactory() { //rabbitmq-server所在設備ip,這裏就是本機 HostName = "127.0.0.1", UserName = "wyy",//用戶名 Password = "123321"//密碼 }; //建立鏈接connection using (var connection = factory.CreateConnection()) { //建立通道channel using (var channel = connection.CreateModel()) { //聲明交換機exchang channel.ExchangeDeclare(exchange: "mytopicExchange", type: ExchangeType.Topic, durable: true, autoDelete: false, arguments: null); //聲明隊列usaQueue channel.QueueDeclare(queue: "usaQueue", durable: true, exclusive: false, autoDelete: false, arguments: null); //聲明隊列newsQueue channel.QueueDeclare(queue: "newsQueue", durable: true, exclusive: false, autoDelete: false, arguments: null); Console.WriteLine("生產者準備就緒...."); //綁定usaQueue隊列到交互機,routingKey爲usa.# channel.QueueBind(queue: "usaQueue", exchange: "mytopicExchange", routingKey: "usa.#", arguments: null); //綁定newsQueue隊列到交互機,routingKey爲#.news channel.QueueBind(queue: "newsQueue", exchange: "mytopicExchange", routingKey: "#.news", arguments: null); ////--------------------開始發送消息 //1.發送美國新聞消息 string message1 = "美國新聞消息:內容balabala"; var body1 = Encoding.UTF8.GetBytes(message1); channel.BasicPublish(exchange: "mytopicExchange", routingKey: "usa.news", basicProperties: null, body: body1); Console.WriteLine($"消息【{message1}】已發送到隊列"); //2.發送美國天氣消息 string message2 = "美國天氣消息:內容balabala"; var body2 = Encoding.UTF8.GetBytes(message2); channel.BasicPublish(exchange: "mytopicExchange", routingKey: "usa.weather", basicProperties: null, body: body2); Console.WriteLine($"消息【{message2}】已發送到隊列"); //3.發送歐洲新聞消息 string message3 = "歐洲新聞消息:內容balabala"; var body3 = Encoding.UTF8.GetBytes(message3); channel.BasicPublish(exchange: "mytopicExchange", routingKey: "europe.news", basicProperties: null, body: body3); Console.WriteLine($"消息【{message3}】已發送到隊列"); //4.發送歐洲天氣消息 string message4 = "歐洲天氣消息:內容balabala"; var body4 = Encoding.UTF8.GetBytes(message4); //基本發佈 channel.BasicPublish(exchange: "mytopicExchange", routingKey: "europe.weather", basicProperties: null, body: body4); Console.WriteLine($"消息【{message4}】已發送到隊列"); } } Console.ReadKey(); }
消費者1代碼,只處理usaQueue中的消息:
static void Main(string[] args) { var factory = new ConnectionFactory() { //rabbitmq-server所在設備ip,這裏就是本機 HostName = "127.0.0.1", UserName = "wyy",//用戶名 Password = "123321"//密碼 }; //建立鏈接connection using (var connection = factory.CreateConnection()) { //建立通道channel using (var channel = connection.CreateModel()) { //聲明交換機exchang channel.ExchangeDeclare(exchange: "mytopicExchange", type: ExchangeType.Topic, durable: true, autoDelete: false, arguments: null); //聲明隊列queue channel.QueueDeclare(queue: "usaQueue", durable: true, exclusive: false, autoDelete: false, arguments: null); Console.WriteLine("usaQueue消費者準備就緒...."); //綁定usaQueue隊列到交互機 channel.QueueBind(queue: "usaQueue", exchange: "mytopicExchange", routingKey: "usa.#", arguments: null); //定義消費者 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine($"接收成功!【{message}】"); }; //處理消息 channel.BasicConsume(queue: "usaQueue", autoAck: true, consumer: consumer); Console.ReadLine(); } } }
消費者2代碼,只處理newsQueue中的消息:
static void Main(string[] args) { var factory = new ConnectionFactory() { //rabbitmq-server所在設備ip,這裏就是本機 HostName = "127.0.0.1", UserName = "wyy",//用戶名 Password = "123321"//密碼 }; //建立鏈接connection using (var connection = factory.CreateConnection()) { //建立通道channel using (var channel = connection.CreateModel()) { //聲明交換機exchang channel.ExchangeDeclare(exchange: "mytopicExchange", type: ExchangeType.Topic, durable: true, autoDelete: false, arguments: null); //聲明隊列queue channel.QueueDeclare(queue: "newsQueue", durable: true, exclusive: false, autoDelete: false, arguments: null); Console.WriteLine("newsQueue消費者準備就緒...."); //綁定usaQueue隊列到交互機 channel.QueueBind(queue: "newsQueue", exchange: "mytopicExchange", routingKey: "#.news", arguments: null); //定義消費者 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine($"接收成功!【{message}】"); }; //處理消息 channel.BasicConsume(queue: "newsQueue", autoAck: true, consumer: consumer); Console.ReadLine(); } } }
生成者發送的四條消息中,消息1的routingKey爲usa.news,同時符合usaQueue的bindingKey(usa.#)和newsQueue的bindingKey(#.news),因此消息1同時路由到兩個隊列中;消息2的routingKey爲usa.weather只符合usa.#,發送到usaQueue;消息的rouKey爲europe.news,只符合#.news,發送到newsQueue中;消息4的routingKey爲europe.weahter,和兩個隊列的bindingKey都不符合,因此被丟棄。執行這三個Console應用程序,結果以下:
一點補充:topic類型交換機十分靈活,能夠輕鬆實現direct和fanout類型交換機的功能。若是綁定隊列時全部的bindingKey都是#,則交換機和fanout類型交換機表現一致;若是全部的bindingKey都不包含*和#,則交換機和direct類型交換機表現一致。
header類型路由規則和上邊的幾種exchange都不同,header類型exchange不是經過routingKey進行路由的,而是經過Headers。exchange在和queue進行binding時能夠設置arguments:
channel.QueueBind(queue: "Allqueue", exchange: "myheaderExchange", routingKey: string.Empty, arguments: new Dictionary<string, object> { { "x-match","all"}, { "user","jack"}, { "pass","123"}
});
將消息發送到exchange時能夠設置消息的Header:
var props1 = channel.CreateBasicProperties(); props1.Headers = new Dictionary<string, object>() { { "user","jack"}, { "pass","123"} }; var body1 = Encoding.UTF8.GetBytes(msg1); //發送消息 channel.BasicPublish(exchange: "myheaderExchange", routingKey: string.Empty, basicProperties: props1, body: body1);
user和pass是普通的鍵值對,咱們也能夠設置其餘的鍵值對。x-match是一個特殊的屬性,當x-match爲all時,aguments和basicProrperties.Headers的全部鍵值對都相等時纔會路由到queue(AND關係);當x-match爲any時,aguments和basicProrperties.Headers的鍵值對只要有一個相同就能夠路由到queue(OR關係)。
看一個簡單的栗子,建立兩個隊列Allqueue和Anyqueue,其中Allqueue和exchange綁定時的x-match爲all,Anyqueue和exchange綁定時的x-match爲any;而後發送兩條消息,發送第一條消息時basicProperties.Headers中的user和pass都和綁定隊列時的agruments的user和pass相等,發送第二條消息是二者的pass不相等,代碼以下:
static void Main(string[] args) { var factory = new ConnectionFactory() { //rabbitmq-server所在設備ip,這裏就是本機 HostName = "127.0.0.1", UserName = "wyy",//用戶名 Password = "123321"//密碼 }; //建立鏈接connection using (var connection = factory.CreateConnection()) { //建立通道channel using (var channel = connection.CreateModel()) { //聲明交換機exchang channel.ExchangeDeclare(exchange: "myheaderExchange", type: ExchangeType.Headers, durable: true, autoDelete: false, arguments: null); //聲明Allqueue隊列 channel.QueueDeclare(queue: "Allqueue", durable: true, exclusive: false, autoDelete: false, arguments: null); //聲明Anyqueue隊列 channel.QueueDeclare(queue: "Anyqueue", durable: true, exclusive: false, autoDelete: false, arguments: null); Console.WriteLine("生產者準備就緒...."); //////發送消息消息1,user和pass都相同 //綁定exchange和Allqueue channel.QueueBind(queue: "Allqueue", exchange: "myheaderExchange", routingKey: string.Empty, arguments: new Dictionary<string, object> { { "x-match","all"}, { "user","jack"}, { "pass","123"}}); string msg1 = "user和pass都相同時發送的消息"; var props1 = channel.CreateBasicProperties(); props1.Headers = new Dictionary<string, object>() { { "user","jack"}, { "pass","123"} }; var body1 = Encoding.UTF8.GetBytes(msg1); //基本發佈 channel.BasicPublish(exchange: "myheaderExchange", routingKey: string.Empty, basicProperties: props1, body: body1); Console.WriteLine($"消息【{msg1}】已發送到隊列"); //////發送消息消息2,user和pass不徹底相同 //綁定exchange和Anyqueue channel.QueueBind(queue: "Anyqueue", exchange: "myheaderExchange", routingKey: string.Empty, arguments: new Dictionary<string, object> { { "x-match","any"}, { "user","jack"}, { "pass","123"},}); string msg2 = "user和pass不徹底相同時發送的消息"; var props2 = channel.CreateBasicProperties(); props2.Headers = new Dictionary<string, object>() { { "user","jack"}, { "pass","456"}//這裏的pass和BindQueue方法的中argumens中的pass不相同 }; var body2 = Encoding.UTF8.GetBytes(msg2); //基本發佈 channel.BasicPublish(exchange: "myheaderExchange", routingKey: string.Empty, basicProperties: props2, body: body2); Console.WriteLine($"消息【{msg2}】已發送到隊列"); } } Console.ReadKey(); } }
執行程序,打開WebUI管理界面,結果以下,咱們看到只有user和pass都相等時消息纔會路由到Allqueue;user和pass只要有一個相等就會路由到Anyqueue
RabbitMQ的交換機(exchange)的做用是路由消息,咱們能夠根據應用場景的不一樣選擇合適的交換機。若是須要精準路由到隊列,或者對消息進行單一維度分類(只對日誌的嚴重程度這一維度進行分類)可使用direct類型交換機;若是須要廣播消息,可使用fanout類型的交換機;若是對消息進行多維度分類(如例子中按照地區和消息內容類型兩個維度進行分類)使用topic類型的交換機;若是消息歸類的邏輯包含了較多的AND/OR邏輯判斷可使用header類型交換機(開發中不多用到Header類型,官網上關於Header類型的介紹也很少)。
【參考文章】
1. https://www.cnblogs.com/zhangweizhong/p/5713874.html
2.https://blog.csdn.net/ctwy291314/article/details/83147194