RabbitMQ有成千上萬的用戶,是最受歡迎的開源消息代理之一。html
AMQP(高級消息隊列協議)是一個網絡協議。它支持符合要求的客戶端應用(application)和消息中間件代理(messaging middleware broker)之間進行通訊。docker
MQ 全稱爲Message Queue, 消息隊列。是一種應用程序對應用程序的通訊方法。應用程序經過讀寫出入隊列的消息(針對應用程序的數據)來通訊,而無需專用鏈接來連接它們。瀏覽器
經過docker進行安裝安全
首先,進入RabbitMQ官網 http://www.rabbitmq.com/download.html
服務器
而後,找到 Docker image 並進入
找到你須要安裝的版本, -management 表示有管理界面的,能夠瀏覽器訪問。
網絡
接着,接來下docker安裝,我這裏裝的 3-management:app
docker run -d --hostname my-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management
最後,瀏覽器訪問看下:http://localhost:15672/ ,用戶名/密碼: guest/guest
異步
RabbitMQ是消息代理:它接受並轉發消息。您能夠將其視爲郵局:將您要發佈的郵件放在郵箱中時,能夠確保郵遞員先生或女士最終將郵件傳遞給收件人。
在下圖中,「 P」是咱們的生產者,「 C」是咱們的消費者。中間的框是一個隊列
fetch
生產者代碼:spa
using RabbitMQ.Client; //1. 使用名稱空間 using System; using System.Text; namespace Example.RabbitMQ.HelloWorld.Producer.ConsoleApp { class Program { static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) //2. 建立到服務器的鏈接 using (var channel = connection.CreateModel()) //3. 建立一個通道 { channel.QueueDeclare(queue: "Example.RabbitMQ.HelloWorld", durable: false, exclusive: false, autoDelete: false, arguments: null); //4. 聲明要發送到的隊列 string message = "Hello World!"; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "", routingKey: "Example.RabbitMQ.HelloWorld", basicProperties: null, body: body);//5. 將消息發佈到隊列 Console.WriteLine(" 發送消息:{0}", message); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } } }
消費者代碼:使用命名空間,建立服務器鏈接,建立通道,聲明隊列都與生產者代碼一致,增長了將隊列中的消息傳遞給咱們。因爲它將異步地向咱們發送消息,所以咱們提供了回調。這就是EventingBasicConsumer.Received事件處理程序所作的。
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Text; namespace Example.RabbitMQ.HelloWorld.Consumer.ConsoleApp { class Program { static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "Example.RabbitMQ.HelloWorld", durable: false, exclusive: false, autoDelete: false, arguments: null); Console.WriteLine(" 等待消息。"); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); Console.WriteLine(" 接收消息:{0}", message); }; channel.BasicConsume(queue: "Example.RabbitMQ.HelloWorld", autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } } }
讓咱們來看看輸出結果:
發送端:
發送消息:Hello World! Press [enter] to exit.
接收端:
等待消息。 Press [enter] to exit. 接收消息:Hello World!
工做隊列(又稱任務隊列)的主要思想是避免當即執行資源密集型任務,而後必須等待其完成。相反,咱們安排任務在之後完成。咱們將任務封裝爲消息並將其發送到隊列。工做進行在後臺運行並不斷的從隊列中取出任務而後執行。當你運行了多個工做進程時,任務隊列中的任務將會被工做進程共享執行。
生產者代碼:
using RabbitMQ.Client; using System; using System.Text; namespace Example.RabbitMQ.WorkQueues.Producer.ConsoleApp { class Program { static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "Example.RabbitMQ.WorkQueues", durable: true, exclusive: false, autoDelete: false, arguments: null); var message = GetMessage(args); var body = Encoding.UTF8.GetBytes(message); var properties = channel.CreateBasicProperties(); properties.Persistent = true; channel.BasicPublish(exchange: "", routingKey: "Example.RabbitMQ.WorkQueues", basicProperties: properties, body: body); Console.WriteLine(" 發送消息:{0}", message); } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } private static string GetMessage(string[] args) { return args.Length > 0 ? string.Join(" ", args) : "Hello World!"; } } }
消費者代碼:
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Text; using System.Threading; namespace Example.RabbitMQ.WorkQueues.Consumer.ConsoleApp { class Program { static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "Example.RabbitMQ.WorkQueues", durable: true, exclusive: false, autoDelete: false, arguments: null); channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); Console.WriteLine(" 等待消息。"); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { byte[] body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); Console.WriteLine(" 接收消息:{0}", message); int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine(" 接收完成"); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; channel.BasicConsume(queue: "Example.RabbitMQ.WorkQueues", autoAck: false, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } } }
使用任務隊列的好處是可以很容易的並行工做。若是咱們積壓了不少工做,咱們僅僅經過增長更多的工做者就能夠解決問題,使系統的伸縮性更加容易。
讓咱們來看看輸出結果:
發送端:
\bin\Debug\net5.0>Example.RabbitMQ.WorkQueues.Producer.ConsoleApp 消息1 發送消息:消息1 Press [enter] to exit. \bin\Debug\net5.0>Example.RabbitMQ.WorkQueues.Producer.ConsoleApp 消息2 發送消息:消息2 Press [enter] to exit. \bin\Debug\net5.0>Example.RabbitMQ.WorkQueues.Producer.ConsoleApp 消息3 發送消息:消息3 Press [enter] to exit. \bin\Debug\net5.0>Example.RabbitMQ.WorkQueues.Producer.ConsoleApp 消息4 發送消息:消息4 Press [enter] to exit.
接收端1:
等待消息。 Press [enter] to exit. 接收消息:消息1 接收完成 接收消息:消息3 接收完成
接收端2:
等待消息。 Press [enter] to exit. 接收消息:消息2 接收完成 接收消息:消息4 接收完成
默認狀況下,RabbitMQ將按順序將每一個消息發送給下一個使用者。平均而言,每一個消費者都會收到相同數量的消息。這種分發消息的方式稱爲循環。與三個或更多的工人一塊兒嘗試。
爲了確保消息永不丟失,RabbitMQ支持消息確認。消費者發送回一個確認(acknowledgement),以告知RabbitMQ已經接收,處理了特定的消息,而且RabbitMQ能夠自由刪除它。
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
使用此代碼,咱們能夠確保,即便您在處理消息時使用CTRL + C殺死工做人員,也不會丟失任何信息。工人死亡後不久,全部未確認的消息將從新發送。
咱們已經學會了如何確保即便消費者死亡,任務也不會丟失。可是,若是RabbitMQ服務器中止,咱們的任務仍然會丟失。
當RabbitMQ退出或崩潰時,除非您告知不要這樣作,不然它將忘記隊列和消息。要確保消息不會丟失,須要作兩件事:咱們須要將隊列和消息都標記爲持久。
首先,咱們須要確保該隊列將在RabbitMQ節點重啓後繼續存在。爲此,咱們須要將其聲明爲持久的:
channel.QueueDeclare(queue: "Example.RabbitMQ.WorkQueues", durable: true, exclusive: false, autoDelete: false, arguments: null);
最後,咱們須要將消息標記爲持久性-經過將IBasicProperties.SetPersistent設置爲true。
var properties = channel.CreateBasicProperties(); properties.Persistent = true;
咱們能夠將BasicQos方法與 prefetchCount = 1設置一塊兒使用。這告訴RabbitMQ一次不要給工人一個以上的消息。換句話說,在處理並確認上一條消息以前,不要將新消息發送給工做人員。而是將其分派給不忙的下一個工做程序。
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
在上一個教程中,咱們建立了一個工做隊列。工做隊列背後的假設是,每一個任務都剛好交付給一個工人。在這一部分中,咱們將作一些徹底不一樣的事情-咱們將消息傳達給多個消費者。這種模式稱爲「發佈/訂閱」。
生產者代碼:
using RabbitMQ.Client; using System; using System.Text; namespace Example.RabbitMQ.PublishSubscribe.Producer.ConsoleApp { class Program { static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "Example.RabbitMQ.PublishSubscribe", type: ExchangeType.Fanout); var message = GetMessage(args); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "Example.RabbitMQ.PublishSubscribe", routingKey: "", basicProperties: null, body: body); Console.WriteLine(" 發送消息:{0}", message); } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } private static string GetMessage(string[] args) { return args.Length > 0 ? string.Join(" ", args) : "info: Hello World!"; } } }
生產者代碼與上一教程看起來沒有太大不一樣。最重要的變化是咱們如今但願將消息發佈到 Example.RabbitMQ.PublishSubscribe 交換器,而不是無名的消息交換器。交換類型有如下幾種:direct,topic,headers 和fanout,在這裏咱們採用fanout交換類型。
消費者代碼:
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Text; namespace Example.RabbitMQ.PublishSubscribe.Consumer.ConsoleApp { class Program { static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "Example.RabbitMQ.PublishSubscribe", type: ExchangeType.Fanout); var queueName = channel.QueueDeclare().QueueName; channel.QueueBind(queue: queueName, exchange: "Example.RabbitMQ.PublishSubscribe", routingKey: ""); Console.WriteLine(" 等待消息。"); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { byte[] body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); Console.WriteLine(" 接收消息:{0}", message); }; channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } } }
若是沒有隊列綁定到交換機,則消息將丟失,但這對咱們來講是能夠的。若是沒有消費者在聽,咱們能夠安全地丟棄該消息。
在上一個教程中,咱們建立了一個發佈/訂閱。咱們可以向許多接收者廣播消息。在本教程中,咱們將向其中添加功能-將消息分類指定給具體的訂閱者。
生產者代碼:
using RabbitMQ.Client; using System; using System.Linq; using System.Text; namespace Example.RabbitMQ.Routing.Producer.ConsoleApp { class Program { static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "Example.RabbitMQ.Routing", type: "direct"); var severity = (args.Length > 0) ? args[0] : "info"; var message = (args.Length > 1) ? string.Join(" ", args.Skip(1).ToArray()) : "Hello World!"; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "Example.RabbitMQ.Routing", routingKey: severity, basicProperties: null, body: body); Console.WriteLine(" 發送消息:'{0}':'{1}'", severity, message); } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } }
消費者代碼:
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Text; namespace Example.RabbitMQ.Routing.Consumer.ConsoleApp { class Program { static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "Example.RabbitMQ.Routing", type: "direct"); var queueName = channel.QueueDeclare().QueueName; if (args.Length < 1) { Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); Environment.ExitCode = 1; return; } foreach (var severity in args) { channel.QueueBind(queue: queueName, exchange: "Example.RabbitMQ.Routing", routingKey: severity); } Console.WriteLine(" 等待消息。"); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body.ToArray()); var routingKey = ea.RoutingKey; Console.WriteLine(" 接收消息:'{0}':'{1}'", routingKey, message); }; channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } } }
在上一個教程中,咱們改進了消息系統。代替使用僅能進行虛擬廣播的扇出交換機,咱們使用直接交換機,並有選擇地接收消息的可能性。
儘管使用直接交換對咱們的系統進行了改進,但它仍然存在侷限性-它沒法基於多個條件進行路由。
*(星號)能夠代替一個單詞。
#(哈希)能夠替代零個或多個單詞。
生產者代碼:
using RabbitMQ.Client; using System; using System.Linq; using System.Text; namespace Example.RabbitMQ.Topics.Producer.ConsoleApp { class Program { static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "Example.RabbitMQ.Topics", type: "topic"); var routingKey = (args.Length > 0) ? args[0] : "info"; var message = (args.Length > 1) ? string.Join(" ", args.Skip(1).ToArray()) : "Hello World!"; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "Example.RabbitMQ.Topics", routingKey: routingKey, basicProperties: null, body: body); Console.WriteLine(" 發送消息:'{0}':'{1}'", routingKey, message); } } } }
消費者代碼:
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Text; namespace Example.RabbitMQ.Topics.Consumer.ConsoleApp { class Program { static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "Example.RabbitMQ.Topics", type: "topic"); var queueName = channel.QueueDeclare().QueueName; if (args.Length < 1) { Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); Environment.ExitCode = 1; return; } foreach (var bindingKey in args) { channel.QueueBind(queue: queueName, exchange: "Example.RabbitMQ.Topics", routingKey: bindingKey); } Console.WriteLine(" 等待消息。 To exit press CTRL+C"); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body.ToArray()); var routingKey = ea.RoutingKey; Console.WriteLine(" 接收消息:'{0}':'{1}'", routingKey, message); }; channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } } }