RabbitMQ是由erlang語言開發的一個基於AMQP(Advanced Message Queuing Protocol)協議的企業級消息隊列中間件。可實現隊列,訂閱/發佈,路由,通配符等工做模式。html
安裝erlang語言運行環境
https://erlang.org/download/otp_win64_23.2.exe
下載後直接下一步便可linux
安裝RabbitMQ
https://www.rabbitmq.com/install-windows.html#installer
直接點擊安裝下一步便可按章程序員
安裝RabbitMQ的Web管理平臺shell
RabbitMQ的管理平臺是經過插件的形式使用,須要手動啓用管理平臺
在Windows下,RabbitMQ默認被安裝到C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.14 下。
打開sbin ,在cmd或者powershell中執行
rabbitmq-plugins.bat enable rabbitmq_management
數據庫
安裝完成後,瀏覽器打開 http://localhost:15672/#/ 便可看到RabbitMQ的管理界面。輸入默認帳號密碼 guest 成功登陸。windows
發送消息的端瀏覽器
獲取消息並處理的端網絡
一個終端鏈接。每個Connection均可以在RabbitMQ後臺看到異步
Channel是創建在Connection上的一個虛擬通訊管道。通常狀況下,往消息隊列中寫入多條消息,爲了避免每條消息都創建一個TCP鏈接,因此RabbitMQ的作法是多條消息能夠公用一個Connection,大大提升MQ的負載能力。分佈式
Exchange是一個虛擬交換機。每一條消息都必需要經過交換機才能能進入對應的隊列,能夠理解爲網絡設備中的交換機,是一個意思。
Queue是一個存儲消息的內部對象,全部的Rabbit MQ消息都存儲在Queue中。生產者所生產的消息會存儲在Queue中,消費者獲取的消息也是從Queue中獲取。
dotnet add package RabbitMQ.Client
const string QUEUENAME = "HELLO_MQ"; //建立鏈接對象工廠 var factory = new ConnectionFactory() { UserName = "guest", Password = "guest", HostName = "localhost", Port = 5672, //RabbitMQ默認的端口 }; while (true) { using var conn = factory.CreateConnection(); var chanel = conn.CreateModel(); chanel.QueueDeclare(QUEUENAME, true, false, false); Console.WriteLine("輸入生產內容:"); var input = Console.ReadLine(); chanel.BasicPublish("", QUEUENAME, null, Encoding.Default.GetBytes("hello rabbitmq:" + input)); }
在循環中,輸入一個值,按下enter,便可推送一條消息到隊列。
也能夠直接在RabbitMQ的管理後臺查看
能夠看到咱們發送的消息已經被RabbitMQ存儲在Queue中了。只等某個幸運的消費者前來消費。
const string QUEUENAME = "HELLO_MQ"; var factory = new ConnectionFactory() { UserName = "guest", Password = "guest", HostName = "localhost", Port = 5672, }; var conn = factory.CreateConnection(); var chanel = conn.CreateModel(); chanel.QueueDeclare(QUEUENAME, true, false, false); EventingBasicConsumer consumer = new EventingBasicConsumer(chanel); consumer.Received += (a, e) => { Console.WriteLine($"{DateTime.Now.ToString()}接收到消息:" + Encoding.Default.GetString(e.Body.ToArray())); chanel.BasicAck(e.DeliveryTag, true); //收到回覆後,RabbitMQ會直接在隊列中刪除這條消息 }; chanel.BasicConsume(QUEUENAME, false, consumer); Console.WriteLine("啓動成功"); Console.ReadLine();
啓動成功後,consumer的Received方法,會收到一條來自MQ的消息,
若是處理完成後,不調用chennel的BasicAck方法,那麼這條消息依然會存在,下次有消費者出現,會再次推送給消費者。
簡單的RabbitMQ Hello World到這裏就算完成了。接下來就是稍微高級一點的應用
工做隊列模式的意思就是一個生產者對應多個消費者。RabbitMQ會使用輪詢去給每一個消費者發送消息。
發佈訂閱模式是屬於比較用多的一種。
發佈訂閱,是由交換機發布消息給多個隊列。多個隊列再對應多個消費者。
發佈訂閱模式對應的交換機類型的fanout。
A
const string QUEUENAME = "HELLO_MQ_B"; const string TESTEXCHANGE = "TESTEXCHANGE"; var factory = new ConnectionFactory() { UserName = "guest", Password = "guest", HostName = "localhost", Port = 5672, }; var conn = factory.CreateConnection(); var channel = conn.CreateModel(); //定義隊列 channel.QueueDeclare(QUEUENAME, true, false, false); //定義交換機 channel.ExchangeDeclare(TESTEXCHANGE, ExchangeType.Fanout, true, false); //綁定隊列到交換機 channel.QueueBind(QUEUENAME, TESTEXCHANGE, ""); var consumer = new EventingBasicConsumer(channel); consumer.Received += (a, e) => { Console.WriteLine($"{DateTime.Now.ToString()}接收到消息:" + Encoding.Default.GetString(e.Body.ToArray())); channel.BasicAck(e.DeliveryTag, true); //收到回覆後,RabbitMQ會直接在隊列中刪除這條消息 }; channel.BasicConsume(QUEUENAME, false, consumer); Console.WriteLine("啓動成功"); Console.ReadLine();
B
const string QUEUENAME = "HELLO_MQ"; const string TESTEXCHANGE = "TESTEXCHANGE"; var factory = new ConnectionFactory() { UserName = "guest", Password = "guest", HostName = "localhost", Port = 5672, }; var conn = factory.CreateConnection(); var channel = conn.CreateModel(); //定義隊列 channel.QueueDeclare(QUEUENAME, true, false, false); //定義交換機 channel.ExchangeDeclare(TESTEXCHANGE, ExchangeType.Fanout, true, false); //綁定隊列到交換機 channel.QueueBind(QUEUENAME, TESTEXCHANGE, ""); var consumer = new EventingBasicConsumer(channel); consumer.Received += (a, e) => { Console.WriteLine($"{DateTime.Now.ToString()}接收到消息:" + Encoding.Default.GetString(e.Body.ToArray())); channel.BasicAck(e.DeliveryTag, true); //收到回覆後,RabbitMQ會直接在隊列中刪除這條消息 }; channel.BasicConsume(QUEUENAME, false, consumer); Console.WriteLine("啓動成功"); Console.ReadLine();
const string QUEUENAME = "HELLO_MQ"; const string QUEUENAME_B = "HELLO_MQ_B"; const string TESTEXCHANGE = "TESTEXCHANGE"; //建立鏈接對象工廠 var factory = new ConnectionFactory() { UserName = "guest", Password = "guest", HostName = "localhost", Port = 5672, //RabbitMQ默認的端口 }; using var conn = factory.CreateConnection(); while (true) { var channel = conn.CreateModel(); //定義交換機 channel.ExchangeDeclare(TESTEXCHANGE, ExchangeType.Fanout, true, false); Console.WriteLine("輸入生產內容:"); var input = Console.ReadLine(); channel.BasicPublish(TESTEXCHANGE,"", null, Encoding.Default.GetBytes("hello rabbitmq:" + input)); }
在生產者運行成功後,RabbitMQ後臺會出現一個交換機,點擊交換機會看到交換機下綁定了兩個隊列
從生產者發送消息到隊列,兩個消費者會同時收到消息
routing模式對應的交換機類型是direct,和發佈訂閱模式的區別在於:routing模式下,能夠指定一個routingkey,用於區分消息
生產者
var channel = conn.CreateModel(); //定義交換機 channel.ExchangeDeclare(TESTEXCHANGE, ExchangeType.Direct, true, false); //綁定隊列到交換機 Console.WriteLine("輸入生產內容:"); var input = Console.ReadLine(); channel.BasicPublish(TESTEXCHANGE, "INFO", null, Encoding.Default.GetBytes("hello rabbitmq:" + input));
消費者 A
//定義隊列 channel.QueueDeclare(QUEUENAME, true, false, false); //定義交換機 channel.ExchangeDeclare(TESTEXCHANGE, ExchangeType.Direct, true, false); //綁定隊列到交換機 channel.QueueBind(QUEUENAME, TESTEXCHANGE, "INFO");
消費者 B
//定義隊列 channel.QueueDeclare(QUEUENAME, true, false, false); //定義交換機 channel.ExchangeDeclare(TESTEXCHANGE, ExchangeType.Direct, true, false); //綁定隊列到交換機 channel.QueueBind(QUEUENAME, TESTEXCHANGE, "ERROR");
綁定成功後,發送消息,消費者A能夠收到消息,消費者B沒法收到消息。
若是遇到指定routingKey生產一條消息,結果 AB消費者都收到的狀況。建議在RabbitMQ後臺的交換機下看一下綁定的Queue是否重複綁定了多個routingKey.
在通配符模式下,RabbitMQ使用模糊匹配來決定把消息推送給哪一個生產者。通配符有兩個符號來匹配routingKey
其餘的操做基本和routing模式同樣。
header模式是把routingkey放到header中.取消掉了routingKey。並使用一個字典傳遞 K、V的方式來匹配。
好比同時要給用戶發送郵件和短信,可直接經過header的鍵值對來匹配綁定的值,把消息傳遞給發短信和郵件的生產者.
成都南門這邊有招BS方向高級.NET程序員的公司嗎? 有的話,請私聊我。或者加我QQ:862640563
博客地址:https://www.cnblogs.com/boxrice/ 轉載請註明出處