RabbitMQ入門

1 簡介

RabbitMQ有成千上萬的用戶,是最受歡迎的開源消息代理之一。html

1.1 AMQP是什麼

AMQP(高級消息隊列協議)是一個網絡協議。它支持符合要求的客戶端應用(application)和消息中間件代理(messaging middleware broker)之間進行通訊。docker

1.2 消息隊列是什麼

MQ 全稱爲Message Queue, 消息隊列。是一種應用程序對應用程序的通訊方法。應用程序經過讀寫出入隊列的消息(針對應用程序的數據)來通訊,而無需專用鏈接來連接它們。瀏覽器

2 安裝

經過docker進行安裝安全

首先,進入RabbitMQ官網 http://www.rabbitmq.com/download.html
服務器

而後,找到 Docker image 並進入
找到你須要安裝的版本, -management 表示有管理界面的,能夠瀏覽器訪問。
網絡

接着,接來下docker安裝,我這裏裝的 3-management:app

docker run -d --hostname my-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management

最後,瀏覽器訪問看下:http://localhost:15672/ ,用戶名/密碼: guest/guest
異步

3 使用

3.1 「 Hello World!」

RabbitMQ是消息代理:它接受並轉發消息。您能夠將其視爲郵局:將您要發佈的郵件放在郵箱中時,能夠確保郵遞員先生或女士最終將郵件傳遞給收件人。
在下圖中,「 P」是咱們的生產者,「 C」是咱們的消費者。中間的框是一個隊列
fetch

生產者代碼:spa

using RabbitMQ.Client; //1. 使用名稱空間
using System;
using System.Text;

namespace Example.RabbitMQ.HelloWorld.Producer.ConsoleApp
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection()) //2. 建立到服務器的鏈接
            using (var channel = connection.CreateModel()) //3. 建立一個通道
            {
                channel.QueueDeclare(queue: "Example.RabbitMQ.HelloWorld", durable: false, exclusive: false, autoDelete: false, arguments: null); //4. 聲明要發送到的隊列

                string message = "Hello World!";
                var body = Encoding.UTF8.GetBytes(message);

                channel.BasicPublish(exchange: "", routingKey: "Example.RabbitMQ.HelloWorld", basicProperties: null, body: body);//5. 將消息發佈到隊列

                Console.WriteLine(" 發送消息:{0}", message);

                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}

消費者代碼:使用命名空間,建立服務器鏈接,建立通道,聲明隊列都與生產者代碼一致,增長了將隊列中的消息傳遞給咱們。因爲它將異步地向咱們發送消息,所以咱們提供了回調。這就是EventingBasicConsumer.Received事件處理程序所作的。

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace Example.RabbitMQ.HelloWorld.Consumer.ConsoleApp
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "Example.RabbitMQ.HelloWorld", durable: false, exclusive: false, autoDelete: false, arguments: null);

                Console.WriteLine(" 等待消息。");

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine(" 接收消息:{0}", message);
                };
                channel.BasicConsume(queue: "Example.RabbitMQ.HelloWorld", autoAck: true, consumer: consumer);

                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}

讓咱們來看看輸出結果:

發送端:

發送消息:Hello World!
 Press [enter] to exit.

接收端:

等待消息。
 Press [enter] to exit.
 接收消息:Hello World!

3.2 工做隊列

工做隊列(又稱任務隊列)的主要思想是避免當即執行資源密集型任務,而後必須等待其完成。相反,咱們安排任務在之後完成。咱們將任務封裝爲消息並將其發送到隊列。工做進行在後臺運行並不斷的從隊列中取出任務而後執行。當你運行了多個工做進程時,任務隊列中的任務將會被工做進程共享執行。

生產者代碼:

using RabbitMQ.Client;
using System;
using System.Text;

namespace Example.RabbitMQ.WorkQueues.Producer.ConsoleApp
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "Example.RabbitMQ.WorkQueues", durable: true, exclusive: false, autoDelete: false, arguments: null);

                var message = GetMessage(args);
                var body = Encoding.UTF8.GetBytes(message);

                var properties = channel.CreateBasicProperties();
                properties.Persistent = true;

                channel.BasicPublish(exchange: "", routingKey: "Example.RabbitMQ.WorkQueues", basicProperties: properties, body: body);
                Console.WriteLine(" 發送消息:{0}", message);
            }

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }

        private static string GetMessage(string[] args)
        {
            return args.Length > 0 ? string.Join(" ", args) : "Hello World!";
        }
    }
}

消費者代碼:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading;

namespace Example.RabbitMQ.WorkQueues.Consumer.ConsoleApp
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "Example.RabbitMQ.WorkQueues", durable: true, exclusive: false, autoDelete: false, arguments: null);

                channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

                Console.WriteLine(" 等待消息。");

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    byte[] body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine(" 接收消息:{0}", message);

                    int dots = message.Split('.').Length - 1;
                    Thread.Sleep(dots * 1000);

                    Console.WriteLine(" 接收完成");

                    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                };
                channel.BasicConsume(queue: "Example.RabbitMQ.WorkQueues", autoAck: false, consumer: consumer);

                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}

循環調度

使用任務隊列的好處是可以很容易的並行工做。若是咱們積壓了不少工做,咱們僅僅經過增長更多的工做者就能夠解決問題,使系統的伸縮性更加容易。

讓咱們來看看輸出結果:

發送端:

\bin\Debug\net5.0>Example.RabbitMQ.WorkQueues.Producer.ConsoleApp 消息1
 發送消息:消息1
 Press [enter] to exit.


\bin\Debug\net5.0>Example.RabbitMQ.WorkQueues.Producer.ConsoleApp 消息2
 發送消息:消息2
 Press [enter] to exit.


\bin\Debug\net5.0>Example.RabbitMQ.WorkQueues.Producer.ConsoleApp 消息3
 發送消息:消息3
 Press [enter] to exit.


\bin\Debug\net5.0>Example.RabbitMQ.WorkQueues.Producer.ConsoleApp 消息4
 發送消息:消息4
 Press [enter] to exit.

接收端1:

等待消息。
 Press [enter] to exit.
 接收消息:消息1
 接收完成
 接收消息:消息3
 接收完成

接收端2:

等待消息。
 Press [enter] to exit.
 接收消息:消息2
 接收完成
 接收消息:消息4
 接收完成

默認狀況下,RabbitMQ將按順序將每一個消息發送給下一個使用者。平均而言,每一個消費者都會收到相同數量的消息。這種分發消息的方式稱爲循環。與三個或更多的工人一塊兒嘗試。

消息確認

爲了確保消息永不丟失,RabbitMQ支持消息確認。消費者發送回一個確認(acknowledgement),以告知RabbitMQ已經接收,處理了特定的消息,而且RabbitMQ能夠自由刪除它。

channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);

使用此代碼,咱們能夠確保,即便您在處理消息時使用CTRL + C殺死工做人員,也不會丟失任何信息。工人死亡後不久,全部未確認的消息將從新發送。

消息持久性

咱們已經學會了如何確保即便消費者死亡,任務也不會丟失。可是,若是RabbitMQ服務器中止,咱們的任務仍然會丟失。

當RabbitMQ退出或崩潰時,除非您告知不要這樣作,不然它將忘記隊列和消息。要確保消息不會丟失,須要作兩件事:咱們須要將隊列和消息都標記爲持久。

首先,咱們須要確保該隊列將在RabbitMQ節點重啓後繼續存在。爲此,咱們須要將其聲明爲持久的:

channel.QueueDeclare(queue: "Example.RabbitMQ.WorkQueues", durable: true, exclusive: false, autoDelete: false, arguments: null);

最後,咱們須要將消息標記爲持久性-經過將IBasicProperties.SetPersistent設置爲true。

var properties = channel.CreateBasicProperties();
properties.Persistent = true;

公平派遣

咱們能夠將BasicQos方法與 prefetchCount = 1設置一塊兒使用。這告訴RabbitMQ一次不要給工人一個以上的消息。換句話說,在處理並確認上一條消息以前,不要將新消息發送給工做人員。而是將其分派給不忙的下一個工做程序。

channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

3.3 發佈/訂閱

在上一個教程中,咱們建立了一個工做隊列。工做隊列背後的假設是,每一個任務都剛好交付給一個工人。在這一部分中,咱們將作一些徹底不一樣的事情-咱們將消息傳達給多個消費者。這種模式稱爲「發佈/訂閱」。

生產者代碼:

using RabbitMQ.Client;
using System;
using System.Text;

namespace Example.RabbitMQ.PublishSubscribe.Producer.ConsoleApp
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "Example.RabbitMQ.PublishSubscribe", type: ExchangeType.Fanout);

                var message = GetMessage(args);
                var body = Encoding.UTF8.GetBytes(message);

                channel.BasicPublish(exchange: "Example.RabbitMQ.PublishSubscribe", routingKey: "", basicProperties: null, body: body);
                Console.WriteLine(" 發送消息:{0}", message);
            }

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }

        private static string GetMessage(string[] args)
        {
            return args.Length > 0 ? string.Join(" ", args) : "info: Hello World!";
        }
    }
}

生產者代碼與上一教程看起來沒有太大不一樣。最重要的變化是咱們如今但願將消息發佈到 Example.RabbitMQ.PublishSubscribe 交換器,而不是無名的消息交換器。交換類型有如下幾種:direct,topic,headers 和fanout,在這裏咱們採用fanout交換類型。

消費者代碼:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace Example.RabbitMQ.PublishSubscribe.Consumer.ConsoleApp
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "Example.RabbitMQ.PublishSubscribe", type: ExchangeType.Fanout);

                var queueName = channel.QueueDeclare().QueueName;
                channel.QueueBind(queue: queueName, exchange: "Example.RabbitMQ.PublishSubscribe", routingKey: "");

                Console.WriteLine(" 等待消息。");

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    byte[] body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine(" 接收消息:{0}", message);
                };
                channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);

                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}

若是沒有隊列綁定到交換機,則消息將丟失,但這對咱們來講是能夠的。若是沒有消費者在聽,咱們能夠安全地丟棄該消息。

3.4 路由

在上一個教程中,咱們建立了一個發佈/訂閱。咱們可以向許多接收者廣播消息。在本教程中,咱們將向其中添加功能-將消息分類指定給具體的訂閱者。

生產者代碼:

using RabbitMQ.Client;
using System;
using System.Linq;
using System.Text;

namespace Example.RabbitMQ.Routing.Producer.ConsoleApp
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "Example.RabbitMQ.Routing", type: "direct");

                var severity = (args.Length > 0) ? args[0] : "info";
                var message = (args.Length > 1) ? string.Join(" ", args.Skip(1).ToArray()) : "Hello World!";
                var body = Encoding.UTF8.GetBytes(message);
                channel.BasicPublish(exchange: "Example.RabbitMQ.Routing", routingKey: severity, basicProperties: null, body: body);
                Console.WriteLine(" 發送消息:'{0}':'{1}'", severity, message);
            }

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

消費者代碼:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace Example.RabbitMQ.Routing.Consumer.ConsoleApp
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "Example.RabbitMQ.Routing", type: "direct");
                var queueName = channel.QueueDeclare().QueueName;

                if (args.Length < 1)
                {
                    Console.WriteLine(" Press [enter] to exit.");
                    Console.ReadLine();
                    Environment.ExitCode = 1;
                    return;
                }

                foreach (var severity in args)
                {
                    channel.QueueBind(queue: queueName, exchange: "Example.RabbitMQ.Routing", routingKey: severity);
                }

                Console.WriteLine(" 等待消息。");

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                    var routingKey = ea.RoutingKey;
                    Console.WriteLine(" 接收消息:'{0}':'{1}'", routingKey, message);
                };
                channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);

                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}

3.5 話題

在上一個教程中,咱們改進了消息系統。代替使用僅能進行虛擬廣播的扇出交換機,咱們使用直接交換機,並有選擇地接收消息的可能性。

儘管使用直接交換對咱們的系統進行了改進,但它仍然存在侷限性-它沒法基於多個條件進行路由。

*(星號)能夠代替一個單詞。
#(哈希)能夠替代零個或多個單詞。

生產者代碼:

using RabbitMQ.Client;
using System;
using System.Linq;
using System.Text;

namespace Example.RabbitMQ.Topics.Producer.ConsoleApp
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "Example.RabbitMQ.Topics", type: "topic");

                var routingKey = (args.Length > 0) ? args[0] : "info";
                var message = (args.Length > 1) ? string.Join(" ", args.Skip(1).ToArray()) : "Hello World!";
                var body = Encoding.UTF8.GetBytes(message);
                channel.BasicPublish(exchange: "Example.RabbitMQ.Topics", routingKey: routingKey, basicProperties: null, body: body);
                Console.WriteLine(" 發送消息:'{0}':'{1}'", routingKey, message);
            }
        }
    }
}

消費者代碼:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace Example.RabbitMQ.Topics.Consumer.ConsoleApp
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "Example.RabbitMQ.Topics", type: "topic");
                var queueName = channel.QueueDeclare().QueueName;

                if (args.Length < 1)
                {
                    Console.WriteLine(" Press [enter] to exit.");
                    Console.ReadLine();
                    Environment.ExitCode = 1;
                    return;
                }

                foreach (var bindingKey in args)
                {
                    channel.QueueBind(queue: queueName, exchange: "Example.RabbitMQ.Topics", routingKey: bindingKey);
                }

                Console.WriteLine(" 等待消息。 To exit press CTRL+C");

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                    var routingKey = ea.RoutingKey;
                    Console.WriteLine(" 接收消息:'{0}':'{1}'", routingKey, message);
                };
                channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);

                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}
相關文章
相關標籤/搜索