消息隊列 RabbitMQ

基本概念

什麼叫消息隊列服務器

消息(Message)是指在應用間傳送的數據。消息能夠很是簡單,好比只包含文本字符串,也能夠更復雜,可能包含嵌入對象。網絡

消息隊列(Message Queue)是一種應用間的通訊方式,消息發送後能夠當即返回,由消息系統來確保消息的可靠傳遞。消息發佈者只管把消息發佈到 MQ 中而不用管誰來取,消息使用者只管從 MQ 中取消息而不論是誰發佈的。這樣發佈者和使用者都不用知道對方的存在。異步

爲何要用消息隊列分佈式

從上面的描述中能夠看出消息隊列是一種應用間的異步協做機制,那何時須要使用 MQ 呢?ide

以常見的訂單系統爲例,用戶點擊下單按鈕以後的業務邏輯可能包括:扣減庫存、生成相應單據、發紅包、發短信通知。在業務發展初期這些邏輯可能放在一塊兒同步執行,隨着業務的發展訂單量增加,須要提高系統服務的性能,這時能夠將一些不須要當即生效的操做拆分出來異步執行,好比發放紅包、發短信通知等。這種場景下就能夠用 MQ ,在下單的主流程(好比扣減庫存、生成相應單據)完成以後發送一條消息到 MQ 讓主流程快速完結,而由另外的單獨線程拉取MQ的消息(或者由 MQ 推送消息),當發現 MQ 中有發紅包或發短信之類的消息時,執行相應的業務邏輯。性能

以上是用於業務解耦的狀況,其它常見場景包括最終一致性、廣播、錯峯流控等等。fetch

RabbitMQ 特色

RabbitMQ 是一個由 Erlang 語言開發的 AMQP 的開源實現。加密

AMQP :Advanced Message Queue,高級消息隊列協議。它是應用層協議的一個開放標準,爲面向消息的中間件設計,基於此協議的客戶端與消息中間件可傳遞消息,並不受產品、開發語言等條件的限制。spa

RabbitMQ 最初起源於金融系統,用於在分佈式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。具體特色包括:操作系統

可靠性(Reliability)
RabbitMQ 使用一些機制來保證可靠性,如持久化、傳輸確認、發佈確認。

靈活的路由(Flexible Routing)
在消息進入隊列以前,經過 Exchange 來路由消息的。對於典型的路由功能,RabbitMQ 已經提供了一些內置的 Exchange 來實現。針對更復雜的路由功能,能夠將多個 Exchange 綁定在一塊兒,也經過插件機制實現本身的 Exchange 。

消息集羣(Clustering)
多個 RabbitMQ 服務器能夠組成一個集羣,造成一個邏輯 Broker 。

高可用(Highly Available Queues)
隊列能夠在集羣中的機器上進行鏡像,使得在部分節點出問題的狀況下隊列仍然可用。

多種協議(Multi-protocol)
RabbitMQ 支持多種消息隊列協議,好比 STOMP、MQTT 等等。

多語言客戶端(Many Clients)
RabbitMQ 幾乎支持全部經常使用語言,好比 Java、.NET、Ruby 等等。

管理界面(Management UI)
RabbitMQ 提供了一個易用的用戶界面,使得用戶能夠監控和管理消息 Broker 的許多方面。

跟蹤機制(Tracing)
若是消息異常,RabbitMQ 提供了消息跟蹤機制,使用者能夠找出發生了什麼。

插件機制(Plugin System)
RabbitMQ 提供了許多插件,來從多方面進行擴展,也能夠編寫本身的插件。

RabbitMQ 中的概念模型

消息模型

消費者(consumer)訂閱某個隊列。生產者(producer)建立消息,而後發佈到隊列(queue)中,最後將消息發送到監聽的消費者。

上面只是最簡單抽象的描述,具體到 RabbitMQ 則有更詳細的概念須要解釋。上面介紹過 RabbitMQ 是 AMQP 協議的一個開源實現,因此其內部實際上也是 AMQP 中的基本概念:

Message消息,消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對於其餘消息的優先權)、delivery-mode(指出該消息可能須要持久性存儲)等。
Publisher
消息的生產者,也是一個向交換器發佈消息的客戶端應用程序。
Exchange
交換器,用來接收生產者發送的消息並將這些消息路由給服務器中的隊列。
Binding
綁定,用於消息隊列和交換器之間的關聯。一個綁定就是基於路由鍵將交換器和消息隊列鏈接起來的路由規則,因此能夠將交換器理解成一個由綁定構成的路由表。
Queue
消息隊列,用來保存消息直到發送給消費者。它是消息的容器,也是消息的終點。一個消息可投入一個或多個隊列。消息一直在隊列裏面,等待消費者鏈接到這個隊列將其取走。
Connection
網絡鏈接,好比一個TCP鏈接。
Channel
信道,多路複用鏈接中的一條獨立的雙向數據流通道。信道是創建在真實的TCP鏈接內地虛擬鏈接,AMQP 命令都是經過信道發出去的,不論是發佈消息、訂閱隊列仍是接收消息,這些動做都是經過信道完成。由於對於操做系統來講創建和銷燬 TCP 都是很是昂貴的開銷,因此引入了信道的概念,以複用一條 TCP 鏈接。
Consumer
消息的消費者,表示一個從消息隊列中取得消息的客戶端應用程序。
Virtual Host
虛擬主機,表示一批交換器、消息隊列和相關對象。虛擬主機是共享相同的身份認證和加密環境的獨立服務器域。每一個 vhost 本質上就是一個 mini 版的 RabbitMQ 服務器,擁有本身的隊列、交換器、綁定和權限機制。vhost 是 AMQP 概念的基礎,必須在鏈接時指定,RabbitMQ 默認的 vhost 是 / 。
Broker
表示消息隊列服務器實體。

AMQP 中的消息路由

生產者把消息發佈到 Exchange 上,消息最終到達隊列並被消費者接收,而 Binding 決定交換器的消息應該發送到那個隊列。

Exchange 類型

Exchange分發消息時根據類型的不一樣分發策略有區別,目前共四種類型:direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由鍵,此外 headers 交換器和 direct 交換器徹底一致,但性能差不少,目前幾乎用不到了,因此直接看另外三種類型:

direct:消息中的路由鍵(routing key)若是和 Binding 中的 binding key 一致, 交換器就將消息發到對應的隊列中

fanout:每一個發到 fanout 類型交換器的消息都會分到全部綁定的隊列上去。fanout 交換器不處理路由鍵,只是簡單的將隊列綁定到交換器上,每一個發送到交換器的消息都會被轉發到與該交換器綁定的全部隊列上。很像子網廣播,每臺子網內的主機都得到了一份複製的消息。fanout 類型轉發消息是最快的。
topic :交換器經過模式匹配分配消息的路由鍵屬性,將路由鍵和某個模式進行匹配,此時隊列須要綁定到一個模式上。它將路由鍵和綁定鍵的字符串切分紅單詞,這些單詞之間用點隔開。它一樣也會識別兩個通配符:符號「#」和符號「 」。#匹配0個或多個單詞,匹配很少很多一個單詞。
安裝:
RabbitMQ官網: http://www.rabbitmq.com/  安裝這裏不作介紹了。

NET使用RabbitMQ

一、首先在生產者端和消費者端引入RabbitMQ
二、生產者
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using RabbitMQ.Client;

namespace ConsoleApp3
{
    class Program
    {
        /// <summary>
        /// 鏈接配置
        /// </summary>
        private static readonly ConnectionFactory rabbitMqFactory = new ConnectionFactory()
        {
            UserName = "admin",
            Password = "123456",
            Port = 5672,
            VirtualHost = "VirtualHost"
        };
        /// <summary>
        /// 路由名稱
        /// </summary>
        const string ExchangeName = "yinrq.exchange";

        //隊列名稱
        const string QueueName = "yinrq.queue";

        /// <summary>
        /// 路由名稱
        /// </summary>
        const string TopExchangeName = "topic.yinrq.exchange";

        //隊列名稱
        const string TopQueueName = "topic.yinrq.queue";


        static void Main(string[] args)
        {
            DirectExchangeSendMsg();
            // TopicExchangeSendMsg();
            Console.WriteLine("read key");
            Console.ReadKey();
        }
        /// <summary>
        ///  單點精確路由模式
        /// </summary>
        public static void DirectExchangeSendMsg()
        {
            using (IConnection conn = rabbitMqFactory.CreateConnection())
            {
                using (IModel channel = conn.CreateModel())
                {
                    channel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);//設置交換器的類型
                    channel.QueueDeclare(QueueName, durable: true, autoDelete: false, exclusive: false, arguments: null);//聲明一個隊列
                    channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName);//綁定消息隊列

                    var props = channel.CreateBasicProperties();
                    props.Persistent = true;
                    string vadata = Console.ReadLine();
                    while (vadata != "exit")
                    {
                        var msgBody = Encoding.UTF8.GetBytes(vadata);
                        channel.BasicPublish(exchange: ExchangeName, routingKey: QueueName, basicProperties: props, body: msgBody);
                        Console.WriteLine(string.Format("time{0}==", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")));
                        vadata = Console.ReadLine();
                    }
                }
            }
        }

        public static void TopicExchangeSendMsg()
        {
            using (IConnection conn = rabbitMqFactory.CreateConnection())
            {
                using (IModel channel = conn.CreateModel())
                {
                    channel.ExchangeDeclare(TopExchangeName, "topic", durable: false, autoDelete: false, arguments: null);
                    channel.QueueDeclare(TopQueueName, durable: false, autoDelete: false, exclusive: false, arguments: null);
                    channel.QueueBind(TopQueueName, TopExchangeName, routingKey: TopQueueName);
                    //var props = channel.CreateBasicProperties();
                    //props.Persistent = true;
                    string vadata = Console.ReadLine();
                    while (vadata != "exit")
                    {
                        var msgBody = Encoding.UTF8.GetBytes(vadata);
                        channel.BasicPublish(exchange: TopExchangeName, routingKey: TopQueueName, basicProperties: null, body: msgBody);
                        Console.WriteLine(string.Format("time:{0}==", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")));
                        vadata = Console.ReadLine();
                    }
                }
            }
        }
    }
}
View Code

上面的代碼分別建立了兩個路由和兩個隊列,一種是DirectExchange,一種是TopicExchange,驗證時須要生產者和消費者使用同一種的ExChange。

三、消費者

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace RMQCustomer
{
    class Program
    {
        /// <summary>
        /// 鏈接配置
        /// </summary>
        private static readonly ConnectionFactory rabbitMqFactory = new ConnectionFactory()
        {
            HostName = "127.0.0.1",
            UserName = "admin",
            Password = "123456",
            Port = 15672,
            VirtualHost = "VirtualHost"
        };
        /// <summary>
        /// 路由名稱
        /// </summary>
        const string ExchangeName = "yinrq.exchange";

        //隊列名稱
        const string QueueName = "yinrq.queue";

        /// <summary>
        /// 路由名稱
        /// </summary>
        const string TopExchangeName = "topic.yinrq.exchange";

        //隊列名稱
        const string TopQueueName = "topic.yinrq.queue";


        static void Main(string[] args)
        {
            DirectAcceptExchange();
            //DirectAcceptExchangeEvent();
            //DirectAcceptExchangeTask();
            //TopicAcceptExchange();
            Console.WriteLine("read key");
            Console.ReadKey();
        }

        public static void DirectAcceptExchange()
        {
            using (IConnection conn = rabbitMqFactory.CreateConnection())
            {
                using (IModel channel = conn.CreateModel())
                {
                    channel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);
                    channel.QueueDeclare(QueueName, durable: true, autoDelete: false, exclusive: false, arguments: null);
                    channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName);
                    while (true)
                    {
                        BasicGetResult msgResponse = channel.BasicGet(QueueName, noAck: true);
                        if (msgResponse != null)
                        {
                            var msgBody = Encoding.UTF8.GetString(msgResponse.Body);
                            Console.WriteLine(string.Format("time:{0},消息內容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody));
                        }

                        //BasicGetResult msgResponse2 = channel.BasicGet(QueueName, noAck: false);

                        ////process message ...

                        //channel.BasicAck(msgResponse2.DeliveryTag, multiple: false);
                        System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1));
                    }
                }
            }
        }

        public static void DirectAcceptExchangeEvent()
        {
            using (IConnection conn = rabbitMqFactory.CreateConnection())
            {
                using (IModel channel = conn.CreateModel())
                {
                    channel.QueueDeclare(QueueName, durable: true, autoDelete: false, exclusive: false, arguments: null);
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        var msgBody = Encoding.UTF8.GetString(ea.Body);
                        Console.WriteLine(string.Format("time :{0},消息內容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody));
                    };
                    channel.BasicConsume(QueueName, autoAck: true, consumer: consumer);

                    //已過期用EventingBasicConsumer代替
                    //var consumer2 = new QueueingBasicConsumer(channel);
                    //channel.BasicConsume(QueueName, noAck: true, consumer: consumer);
                    //var msgResponse = consumer2.Queue.Dequeue(); //blocking
                    //var msgBody2 = Encoding.UTF8.GetString(msgResponse.Body);

                    Console.WriteLine("read key");
                    Console.ReadKey();
                }
            }
        }

        public static void DirectAcceptExchangeTask()
        {
            using (IConnection conn = rabbitMqFactory.CreateConnection())
            {
                using (IModel channel = conn.CreateModel())
                {
                    //channel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);
                    channel.QueueDeclare(QueueName, durable: true, autoDelete: false, exclusive: false, arguments: null);
                    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);//告訴broker同一時間只處理一個消息
                                                                                       //channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName);
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        var msgBody = Encoding.UTF8.GetString(ea.Body);
                        Console.WriteLine(string.Format("time:{0},消息內容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody));
                        int dots = msgBody.Split('.').Length - 1;
                        System.Threading.Thread.Sleep(dots * 1000);
                        Console.WriteLine(" [x] Done");
                        //處理完成,告訴Broker能夠服務端能夠刪除消息,分配新的消息過來
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                    };
                    //noAck設置false,告訴broker,發送消息以後,消息暫時不要刪除,等消費者處理完成再說
                    channel.BasicConsume(QueueName, autoAck: false, consumer: consumer);

                    Console.WriteLine("按任意值,退出程序");
                    Console.ReadKey();
                }
            }
        }

        public static void TopicAcceptExchange()
        {
            using (IConnection conn = rabbitMqFactory.CreateConnection())
            {
                using (IModel channel = conn.CreateModel())
                {
                    channel.ExchangeDeclare(TopExchangeName, "topic", durable: false, autoDelete: false, arguments: null);
                    channel.QueueDeclare(TopQueueName, durable: false, autoDelete: false, exclusive: false, arguments: null);
                    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
                    channel.QueueBind(TopQueueName, TopExchangeName, routingKey: TopQueueName);
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        var msgBody = Encoding.UTF8.GetString(ea.Body);
                        Console.WriteLine(string.Format("time:{0},消息內容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody));
                        int dots = msgBody.Split('.').Length - 1;
                        System.Threading.Thread.Sleep(dots * 1000);
                        Console.WriteLine("ok");
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                    };
                    channel.BasicConsume(TopQueueName, autoAck: false, consumer: consumer);

                    Console.WriteLine("read key");
                    Console.ReadKey();
                }
            }
        }

    }
}
View Code

消費者端也是兩個路由兩個隊列,在實現DirectExchange時使用了三種方式,DirectAcceptExchange是基於時間輪詢的,每隔一段時間獲取一次,DirectAcceptExchangeEvent、DirectAcceptExchangeTask是基於事件的,當消息到達時觸發事件,獲取數據。

RabbitMq啓用插件管理

  1. 下載並安裝 Eralng OTP For Windows (vR16B03)    otp_win64_17.0.exe(erlang的環境)
  2. 運行安裝 Rabbit MQ Server Windows Installer (v3.2.3)   rabbitmq-server-3.3.3.exe

rabbitMq的應用,有一個快捷命令行

rabbitmq-plugins enable rabbitmq_management

net stop RabbitMQ && net start RabbitMQ

相關文章
相關標籤/搜索