近些年微服務愈來愈火,讓我也忍不住想去一窺微服務究竟,講到微服務,就離不開分佈式,而分佈式,也離不開消息隊列,在消息隊列中,RabbitMQ能夠說是比較具備表明性的一款。網絡
這裏是一篇介紹消息隊列以及各類消息隊列產品對比的文章,講得很好,有興趣的能夠看一看。分佈式
https://cloud.tencent.com/developer/article/1006035微服務
在講RabbitMQ以前,首先須要在電腦上安裝和配置RabbitMQ,網絡上已經有不少這類文章,若是懶得去搜索,能夠看看這篇介紹如何安裝配置RabbitMQ的文章。ui
http://www.javashuo.com/article/p-uznunwyh-ho.htmlspa
其中,在安裝RabbitMQ的過程當中,遇到了一個坑,在啓用RabbltMQ的管理界面執行.net
rabbitmq-plugins enable rabbitmq_managementcode
命令時,出現瞭如下這樣的報錯blog
能夠在該指令前加上 .\ 即rabbitmq
.\rabbitmq-plugins enable rabbitmq_management隊列
祝安裝順利 !!
下面是在.Net中使用RabbitMQ要明白的一些名詞概念。
綜上所訴,他們之間的關係能夠用我下面的 醜圖 表示。
在圖中,沒有吧Routing key畫出。Producer每一次發送消息,除了發出消息自己,還會隨着消息帶上一個routingKey,並且每一次將Exchange和Queue綁定,大致須要三個參數,
string queueName, string exchangeName, string routingKey
其中也有一個routingKey,但此RoutingKey非彼Routingkey。
對這個過程,咱們能夠理解爲國家給災區發送救災物資,國家給當地政府劃撥物資的時候,會規定,誰才能拿到這批物資,如(房子倒了的.家裏有人受傷了的.家庭經濟困難的)。
而當地政府在分配這批物資以前,爲了方便物資的分配,會給每一個家庭貼上一個標籤,如
家庭A 經濟困難
家庭B 房子倒了.經濟困難
家庭C 家庭富有.房子倒了
家庭D 房子倒了的.家裏有人受傷了的.家庭經濟困難的
因此,發送消息時候的routingKey就是國家規定的那批物質分配規則。
而Exchange和Queue綁定時的RoutingKey能夠理解爲當地政府給每一個家庭貼上的一個標籤。
Exchange(交換機)轉發消息的規則也有不少種:direct, topic, headers(不經常使用) 和 fanout,咱們稱之爲交換類型。
咱們能夠把Exchange理解爲分配這批物質的政府,如今國家規定了宏觀的分配方向(發送消息時的routingKey),每一個家庭也有了家庭狀況的標籤(綁定Exchange時的routingKey),可是這個物資具體怎麼分,仍是當地政府說了算。
Direct 嚴格按照國家規定來,只有房子倒了的,家裏有人受傷了的並且家庭經濟困難的才能分到救災物資。 家庭D能分到
Fanout 只要是災區的居民都能分到, 無論家庭狀況如何。 家庭A\B\C\D都能分到
Topic 主題匹配: 只要家庭狀況在國家規定分配規則內的,都能分到物資,可是家庭C分不到,由於他家太有錢了,這個條件不在國家的分配規則裏。家庭A\B\D能分到
因此,咱們在聲明一個Exchange(交換機)的同時,還要指定該交換機的類型,即(當地政府怎麼來分救災物資)
其實,用這個例子,我是想說,生產者和消費者之間,就像國家與難民之間同樣,國家只知道,我要幫助難民,可是難民有誰,物資能不能分到難民手裏,還得當地政府說了算,你就說我這個例子恰不恰當吧!哈哈😄
好了,懂了概念,咱們再來結合具體例子看看。
Producer.cs的代碼
using System; using System.Text; using RabbitMQ.Client; namespace _2_Publish { class Program { static void Main(string[] args) { //建立鏈接工廠 var factory = new ConnectionFactory() { HostName = "localhost" }; //建立鏈接 using (var connection = factory.CreateConnection()) { //建立會話 using (var chancel = connection.CreateModel()) { //生命交換機 chancel.ExchangeDeclare(exchange: "FanoutDemo", type: ExchangeType.Fanout); string readMsg = "helloWorld"; while (readMsg.ToLower() != "exit") { var body = Encoding.UTF8.GetBytes(readMsg); //給交換機發送消息 chancel.BasicPublish(exchange: "FanoutDemo", routingKey: "", body: body); Console.WriteLine($"成功發送消息{readMsg}"); Console.WriteLine("請輸入要發送的內容!"); readMsg = Console.ReadLine(); } } } } } }
Customer.cs代碼
using System; using System.Text; using RabbitMQ.Client; using RabbitMQ.Client.Events; namespace _2_Receiver { class Program { static void Main(string[] args) { //建立鏈接工廠 var factory = new ConnectionFactory() { HostName = "localhost" }; //建立鏈接 using (var connection = factory.CreateConnection()) { //建立會話 using (var channel = connection.CreateModel()) { //聲明一個Fanout類型的交換機 channel.ExchangeDeclare(exchange: "FanoutDemo", type: ExchangeType.Fanout); //聲明一個消息隊列並獲取它的名字 var queueName = channel.QueueDeclare().QueueName; //把消息隊列和交換機綁定 channel.QueueBind(exchange: "FanoutDemo", queue: queueName, routingKey: ""); //建立消費者 var consume = new EventingBasicConsumer(channel); //把消費者和隊列綁定 channel.BasicConsume(queue: queueName, autoAck: true,consumer: consume); consume.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine($"收到消息{message}"); }; Console.ReadLine(); } } } } }
在上面的代碼中,不管是在生產者的發送消息裏
//給交換機發送消息
chancel.BasicPublish(exchange: "FanoutDemo", routingKey: "", body: body);
仍是消費者所在的Queue的綁定裏
//把消息隊列和交換機綁定
channel.QueueBind(exchange: "FanoutDemo", queue: queueName, routingKey: "");
咱們都沒有制定routingKey,由於沒我的都能獲取消息,因此此處,聲明routingKey就沒有意義了。
咱們看看運行效果。
運行了三個消費者,當生產者發出消息時,三個消費者都收到了相同的消息。能夠理解爲廣播模式。(Customer單詞拼寫錯了,圖片修改不方便,就不改了,你們將就一下)
Direct時嚴格匹配的,只有隊列綁定的RoutingKey與生產者發送消息時指定的RoutingKey徹底相同,才能接收成功。
Producer.cs
using System; using System.Text; using RabbitMQ.Client; namespace _2_Publish { class Program { static void Main(string[] args) { //建立鏈接工廠 var factory = new ConnectionFactory() { HostName = "localhost" }; //建立鏈接 using (var connection = factory.CreateConnection()) { //建立會話 using (var chancel = connection.CreateModel()) { //生命交換機 chancel.ExchangeDeclare(exchange: "DirectDemo", type: ExchangeType.Direct); string readMsg = "helloWorld"; while (readMsg.ToLower() != "exit") { var body = Encoding.UTF8.GetBytes(readMsg); //給交換機發送消息 chancel.BasicPublish(exchange: "DirectDemo", routingKey: "Direct.Key", body: body); Console.WriteLine($"成功發送消息{readMsg}"); Console.WriteLine("請輸入要發送的內容!"); readMsg = Console.ReadLine(); } } } } } }
我把Exchange的類型更改成Direct類型,而且發送消息的routingKey設置爲Direct.Key。
而後咱們來定義消費者
Customer.CS
using System; using System.Text; using RabbitMQ.Client; using RabbitMQ.Client.Events; namespace _2_Receiver { class Program { static void Main(string[] args) { //建立鏈接工廠 var factory = new ConnectionFactory() { HostName = "localhost" }; //建立鏈接 using (var connection = factory.CreateConnection()) { //建立會話 using (var channel = connection.CreateModel()) { //聲明一個Fanout類型的交換機 channel.ExchangeDeclare(exchange: "DirectDemo", type: ExchangeType.Direct); //聲明一個消息隊列並獲取它的名字 var queueName = channel.QueueDeclare().QueueName; Console.WriteLine("請輸入RoutingKey!"); var routingKey = Console.ReadLine(); //把消息隊列和交換機綁定 channel.QueueBind(exchange: "DirectDemo", queue: queueName, routingKey: routingKey); //建立消費者 var consume = new EventingBasicConsumer(channel); //把消費者和隊列綁定 channel.BasicConsume(queue: queueName, autoAck: true,consumer: consume); Console.WriteLine("開始監聽消息"); consume.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine($"收到消息{message}"); }; Console.ReadLine(); } } } } }
消費者的RoutingKey再控制檯輸入,
運行效果以下:
能夠看到,只有RoutingKey爲Direct.Key的消費者才收到了生產者發出的消息。
RabbitMQ 中的 RouteKey 支持綁定鍵表達式寫法,有兩種主要的綁定鍵:
*(星號)能夠代替一個單詞.
# (井號) 能夠代替0個或多個單詞.
好比在下面這個圖中(P爲發送者,X爲RabbitMQ中的Exchange,C爲消費者,Q爲隊列)
在這個示例中,咱們將發送一條關於動物描述的消息,也就是說 Name(routeKey) 字段中的內容包含 3 個單詞。第一個單詞是描述速度的(celerity),第二個單詞是描述顏色的(colour),第三個是描述哪一種動物的(species),它們組合起來相似:「..」。
而後在使用 CapSubscribe
綁定的時候,Q1綁定爲 CapSubscribe["*.orange.*"]
, Q2 綁定爲CapSubscribe["*.*.rabbit"]
和 [CapSubscribe["lazy.#]
。
那麼,當發送一個名爲 "quick.orange.rabbit" 消息的時候,這兩個隊列將會同時收到該消息。一樣名爲 lazy.orange.elephant
的消息也會被同時收到。另外,名爲 "quick.orange.fox" 的消息將僅會被髮送到Q1隊列,名爲 "lazy.brown.fox" 的消息僅會被髮送到Q2。"lazy.pink.rabbit" 僅會被髮送到Q2一次,即便它被綁定了2次。"quick.brown.fox" 沒有匹配到任何綁定的隊列,因此它將會被丟棄。
另一種狀況,若是你違反約定,好比使用 4個單詞進行組合,例如 "quick.orange.male.rabbit",那麼它將匹配不到任何的隊列,消息將會被丟棄。
可是,假如你的消息名爲 "lazy.orange.male.rabbit",那麼他們將會被髮送到Q2,由於 #(井號)能夠匹配 0 或者多個單詞。
咱們結合代碼來看一看。
Producer.cs
using System; using System.Text; using RabbitMQ.Client; namespace _2_Publish { class Program { static void Main(string[] args) { //建立鏈接工廠 var factory = new ConnectionFactory() { HostName = "localhost" }; //建立鏈接 using (var connection = factory.CreateConnection()) { //建立會話 using (var chancel = connection.CreateModel()) { //生命交換機 chancel.ExchangeDeclare(exchange: "TopicDemo", type: ExchangeType.Topic); string readMsg = "helloWorld"; while (readMsg.ToLower() != "exit") { var body = Encoding.UTF8.GetBytes(readMsg); //給交換機發送消息 chancel.BasicPublish(exchange: "TopicDemo", routingKey: "Topic.Demo.Key", body: body); Console.WriteLine($"成功發送消息{readMsg}"); Console.WriteLine("請輸入要發送的內容!"); readMsg = Console.ReadLine(); } } } } } }
我給發送消息的routingKey指定爲Topic.Demo.Key
再來看看消費者
Cuustomer.cs
using System; using System.Text; using RabbitMQ.Client; using RabbitMQ.Client.Events; namespace _2_Receiver { class Program { static void Main(string[] args) { //建立鏈接工廠 var factory = new ConnectionFactory() { HostName = "localhost" }; //建立鏈接 using (var connection = factory.CreateConnection()) { //建立會話 using (var channel = connection.CreateModel()) { //聲明一個Fanout類型的交換機 channel.ExchangeDeclare(exchange: "TopicDemo", type: ExchangeType.Topic); //聲明一個消息隊列並獲取它的名字 var queueName = channel.QueueDeclare().QueueName; Console.WriteLine("請輸入RoutingKey!"); var routingKey = Console.ReadLine(); //把消息隊列和交換機綁定 channel.QueueBind(exchange: "TopicDemo", queue: queueName, routingKey: routingKey); //建立消費者 var consume = new EventingBasicConsumer(channel); //把消費者和隊列綁定 channel.BasicConsume(queue: queueName, autoAck: true,consumer: consume); Console.WriteLine("開始監聽消息"); consume.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine($"收到消息{message}"); }; Console.ReadLine(); } } } } }
其RoutingKey也是在外部輸入。
咱們看看運行效果
由於Producer發佈消息的RoutingKey是Topic.Demo.Key
又由於#能夠表明0個或者多個單詞 ,*能表明一個單詞
因此*.*.Key Topic.#與Topic.Demo.Key匹配,而其餘兩個*.Key和test.1.2固然是不匹配的,因此沒有收到消息。
對於上面的例子,咱們能夠總結出,編寫一個生產者的過程以下:
建立鏈接工廠-》建立鏈接-》建立會話(Chanel)-》建立交換機(Exchange)-》發送消息
編寫一個生產者的過程以下:
建立鏈接工廠-》建立鏈接-》建立會話(Chanel)-》建立交換機(Exchange)-》建立隊列-》綁定隊列和交換機-》建立消費者-》把消費者和隊列綁定-》監聽消息
掌握這個大的方向,無論交換機怎麼分配,代碼應該都會寫了。
爲何在生產者中和消費者中都要建立交換機呢? 由於咱們不肯定是生產者先執行仍是消費者先執行,因此提早建立一下,避免鏈接時發現沒有建立交換機,出現錯誤,若是交換機已經建立了,那麼默認不會再次建立的。
另外,交換機建立後,同一名稱的交換機使用完不會自動刪除,可是第二次若是建立的名稱和上次同樣,可是交換機類型不同了,那麼便會出現報錯。
這裏總結的是一些RabbitMQ的基礎知識,後面還會繼續寫一些更深刻的使用技巧,若是不想錯過精彩信息,點擊關注一下吧(๑¯◡¯๑)!