消息隊列核心目的:做爲第三方服務,完成消息的(時序)處理。html
RabbitMQ中,將這個消息中間件的流程,抽象成幾種元素的調度關係:web
Producer: 建立消息的主體安全
Exchange,交換器,負責按照消息的屬性,分配對應的隊列服務器
Queue,隊列,負責存放消息的數據結構數據結構
Consumer,負責處理消息的主體dom
RabbitMQ地址:https://www.rabbitmq.com/download.htmlide
Erlang地址:https://www.erlang.org/downloads測試
下載安裝便可,值得注意的時,RabbitMQ是基於Erlang完成的,因此還須要先安裝Erlangspa
安裝完畢,能夠啓用web管理3d
執行命令:
rabbitmq-plugins enable rabbitmq_management
重啓RabbitMQ:
rabbitmq-service stop rabbitmq-service start
Web管理默認端口爲15672
默認用戶名:guest / guest
若使用雲服務器,記得查看端口安全規則,是否放行15672,5672兩個端口
實現一個單隊列的消息隊列
經過網頁管理,建立一個指定的隊列TestMsgQueue,避免在後續代碼中申明,編寫額外代碼。
一、生成者,隨機生成10條消息
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using RabbitMQ.Client; namespace RabbitMQ_Producer { class Program { static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) { for (int i = 0; i < 10; i++) { System.Threading.Thread.Sleep(20); Task.Factory.StartNew(() => { using (var channel = connection.CreateModel()) { Random r = new Random(); int n = r.Next(2000); string message = "Hello World! Message Num:" + n; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "TestMsgExchange", routingKey: "TestMsgQueue", basicProperties: null, body: body); Console.WriteLine(" [x] Sent {0}", message); } }); } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } } }
二、消費者,監聽消息隊列進行處理
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Text; namespace RabbitRQ_Consumer { class Program { static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); //模擬消費者的處理時間 Random r = new Random(); int n = r.Next(2000); System.Threading.Thread.Sleep(n); Console.WriteLine(" [x] Received {0}", message); }; channel.BasicConsume(queue: "TestMsgQueue", autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } } }