什麼叫消息隊列服務器
消息(Message)是指在應用間傳送的數據。消息能夠很是簡單,好比只包含文本字符串,也能夠更復雜,可能包含嵌入對象。網絡
消息隊列(Message Queue)是一種應用間的通訊方式,消息發送後能夠當即返回,由消息系統來確保消息的可靠傳遞。消息發佈者只管把消息發佈到 MQ 中而不用管誰來取,消息使用者只管從 MQ 中取消息而不論是誰發佈的。這樣發佈者和使用者都不用知道對方的存在。異步
爲何要用消息隊列分佈式
從上面的描述中能夠看出消息隊列是一種應用間的異步協做機制,那何時須要使用 MQ 呢?ide
以常見的訂單系統爲例,用戶點擊下單按鈕以後的業務邏輯可能包括:扣減庫存、生成相應單據、發紅包、發短信通知。在業務發展初期這些邏輯可能放在一塊兒同步執行,隨着業務的發展訂單量增加,須要提高系統服務的性能,這時能夠將一些不須要當即生效的操做拆分出來異步執行,好比發放紅包、發短信通知等。這種場景下就能夠用 MQ ,在下單的主流程(好比扣減庫存、生成相應單據)完成以後發送一條消息到 MQ 讓主流程快速完結,而由另外的單獨線程拉取MQ的消息(或者由 MQ 推送消息),當發現 MQ 中有發紅包或發短信之類的消息時,執行相應的業務邏輯。性能
以上是用於業務解耦的狀況,其它常見場景包括最終一致性、廣播、錯峯流控等等。fetch
RabbitMQ 是一個由 Erlang 語言開發的 AMQP 的開源實現。加密
AMQP :Advanced Message Queue,高級消息隊列協議。它是應用層協議的一個開放標準,爲面向消息的中間件設計,基於此協議的客戶端與消息中間件可傳遞消息,並不受產品、開發語言等條件的限制。spa
RabbitMQ 最初起源於金融系統,用於在分佈式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。具體特色包括:操作系統
可靠性(Reliability)
RabbitMQ 使用一些機制來保證可靠性,如持久化、傳輸確認、發佈確認。
靈活的路由(Flexible Routing)
在消息進入隊列以前,經過 Exchange 來路由消息的。對於典型的路由功能,RabbitMQ 已經提供了一些內置的 Exchange 來實現。針對更復雜的路由功能,能夠將多個 Exchange 綁定在一塊兒,也經過插件機制實現本身的 Exchange 。
消息集羣(Clustering)
多個 RabbitMQ 服務器能夠組成一個集羣,造成一個邏輯 Broker 。
高可用(Highly Available Queues)
隊列能夠在集羣中的機器上進行鏡像,使得在部分節點出問題的狀況下隊列仍然可用。
多種協議(Multi-protocol)
RabbitMQ 支持多種消息隊列協議,好比 STOMP、MQTT 等等。
多語言客戶端(Many Clients)
RabbitMQ 幾乎支持全部經常使用語言,好比 Java、.NET、Ruby 等等。
管理界面(Management UI)
RabbitMQ 提供了一個易用的用戶界面,使得用戶能夠監控和管理消息 Broker 的許多方面。
跟蹤機制(Tracing)
若是消息異常,RabbitMQ 提供了消息跟蹤機制,使用者能夠找出發生了什麼。
插件機制(Plugin System)
RabbitMQ 提供了許多插件,來從多方面進行擴展,也能夠編寫本身的插件。
消息模型
消費者(consumer)訂閱某個隊列。生產者(producer)建立消息,而後發佈到隊列(queue)中,最後將消息發送到監聽的消費者。
上面只是最簡單抽象的描述,具體到 RabbitMQ 則有更詳細的概念須要解釋。上面介紹過 RabbitMQ 是 AMQP 協議的一個開源實現,因此其內部實際上也是 AMQP 中的基本概念:
Message消息,消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對於其餘消息的優先權)、delivery-mode(指出該消息可能須要持久性存儲)等。
Publisher
消息的生產者,也是一個向交換器發佈消息的客戶端應用程序。
Exchange
交換器,用來接收生產者發送的消息並將這些消息路由給服務器中的隊列。
Binding
綁定,用於消息隊列和交換器之間的關聯。一個綁定就是基於路由鍵將交換器和消息隊列鏈接起來的路由規則,因此能夠將交換器理解成一個由綁定構成的路由表。
Queue
消息隊列,用來保存消息直到發送給消費者。它是消息的容器,也是消息的終點。一個消息可投入一個或多個隊列。消息一直在隊列裏面,等待消費者鏈接到這個隊列將其取走。
Connection
網絡鏈接,好比一個TCP鏈接。
Channel
信道,多路複用鏈接中的一條獨立的雙向數據流通道。信道是創建在真實的TCP鏈接內地虛擬鏈接,AMQP 命令都是經過信道發出去的,不論是發佈消息、訂閱隊列仍是接收消息,這些動做都是經過信道完成。由於對於操做系統來講創建和銷燬 TCP 都是很是昂貴的開銷,因此引入了信道的概念,以複用一條 TCP 鏈接。
Consumer
消息的消費者,表示一個從消息隊列中取得消息的客戶端應用程序。
Virtual Host
虛擬主機,表示一批交換器、消息隊列和相關對象。虛擬主機是共享相同的身份認證和加密環境的獨立服務器域。每一個 vhost 本質上就是一個 mini 版的 RabbitMQ 服務器,擁有本身的隊列、交換器、綁定和權限機制。vhost 是 AMQP 概念的基礎,必須在鏈接時指定,RabbitMQ 默認的 vhost 是 / 。
Broker
表示消息隊列服務器實體。
生產者把消息發佈到 Exchange 上,消息最終到達隊列並被消費者接收,而 Binding 決定交換器的消息應該發送到那個隊列。
Exchange分發消息時根據類型的不一樣分發策略有區別,目前共四種類型:direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由鍵,此外 headers 交換器和 direct 交換器徹底一致,但性能差不少,目前幾乎用不到了,因此直接看另外三種類型:
direct:消息中的路由鍵(routing key)若是和 Binding 中的 binding key 一致, 交換器就將消息發到對應的隊列中
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using RabbitMQ.Client; namespace ConsoleApp3 { class Program { /// <summary> /// 鏈接配置 /// </summary> private static readonly ConnectionFactory rabbitMqFactory = new ConnectionFactory() { UserName = "admin", Password = "123456", Port = 5672, VirtualHost = "VirtualHost" }; /// <summary> /// 路由名稱 /// </summary> const string ExchangeName = "yinrq.exchange"; //隊列名稱 const string QueueName = "yinrq.queue"; /// <summary> /// 路由名稱 /// </summary> const string TopExchangeName = "topic.yinrq.exchange"; //隊列名稱 const string TopQueueName = "topic.yinrq.queue"; static void Main(string[] args) { DirectExchangeSendMsg(); // TopicExchangeSendMsg(); Console.WriteLine("read key"); Console.ReadKey(); } /// <summary> /// 單點精確路由模式 /// </summary> public static void DirectExchangeSendMsg() { using (IConnection conn = rabbitMqFactory.CreateConnection()) { using (IModel channel = conn.CreateModel()) { channel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);//設置交換器的類型 channel.QueueDeclare(QueueName, durable: true, autoDelete: false, exclusive: false, arguments: null);//聲明一個隊列 channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName);//綁定消息隊列 var props = channel.CreateBasicProperties(); props.Persistent = true; string vadata = Console.ReadLine(); while (vadata != "exit") { var msgBody = Encoding.UTF8.GetBytes(vadata); channel.BasicPublish(exchange: ExchangeName, routingKey: QueueName, basicProperties: props, body: msgBody); Console.WriteLine(string.Format("time{0}==", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"))); vadata = Console.ReadLine(); } } } } public static void TopicExchangeSendMsg() { using (IConnection conn = rabbitMqFactory.CreateConnection()) { using (IModel channel = conn.CreateModel()) { channel.ExchangeDeclare(TopExchangeName, "topic", durable: false, autoDelete: false, arguments: null); channel.QueueDeclare(TopQueueName, durable: false, autoDelete: false, exclusive: false, arguments: null); channel.QueueBind(TopQueueName, TopExchangeName, routingKey: TopQueueName); //var props = channel.CreateBasicProperties(); //props.Persistent = true; string vadata = Console.ReadLine(); while (vadata != "exit") { var msgBody = Encoding.UTF8.GetBytes(vadata); channel.BasicPublish(exchange: TopExchangeName, routingKey: TopQueueName, basicProperties: null, body: msgBody); Console.WriteLine(string.Format("time:{0}==", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"))); vadata = Console.ReadLine(); } } } } } }
上面的代碼分別建立了兩個路由和兩個隊列,一種是DirectExchange,一種是TopicExchange,驗證時須要生產者和消費者使用同一種的ExChange。
三、消費者
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace RMQCustomer { class Program { /// <summary> /// 鏈接配置 /// </summary> private static readonly ConnectionFactory rabbitMqFactory = new ConnectionFactory() { HostName = "127.0.0.1", UserName = "admin", Password = "123456", Port = 15672, VirtualHost = "VirtualHost" }; /// <summary> /// 路由名稱 /// </summary> const string ExchangeName = "yinrq.exchange"; //隊列名稱 const string QueueName = "yinrq.queue"; /// <summary> /// 路由名稱 /// </summary> const string TopExchangeName = "topic.yinrq.exchange"; //隊列名稱 const string TopQueueName = "topic.yinrq.queue"; static void Main(string[] args) { DirectAcceptExchange(); //DirectAcceptExchangeEvent(); //DirectAcceptExchangeTask(); //TopicAcceptExchange(); Console.WriteLine("read key"); Console.ReadKey(); } public static void DirectAcceptExchange() { using (IConnection conn = rabbitMqFactory.CreateConnection()) { using (IModel channel = conn.CreateModel()) { channel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null); channel.QueueDeclare(QueueName, durable: true, autoDelete: false, exclusive: false, arguments: null); channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName); while (true) { BasicGetResult msgResponse = channel.BasicGet(QueueName, noAck: true); if (msgResponse != null) { var msgBody = Encoding.UTF8.GetString(msgResponse.Body); Console.WriteLine(string.Format("time:{0},消息內容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody)); } //BasicGetResult msgResponse2 = channel.BasicGet(QueueName, noAck: false); ////process message ... //channel.BasicAck(msgResponse2.DeliveryTag, multiple: false); System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1)); } } } } public static void DirectAcceptExchangeEvent() { using (IConnection conn = rabbitMqFactory.CreateConnection()) { using (IModel channel = conn.CreateModel()) { channel.QueueDeclare(QueueName, durable: true, autoDelete: false, exclusive: false, arguments: null); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var msgBody = Encoding.UTF8.GetString(ea.Body); Console.WriteLine(string.Format("time :{0},消息內容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody)); }; channel.BasicConsume(QueueName, autoAck: true, consumer: consumer); //已過期用EventingBasicConsumer代替 //var consumer2 = new QueueingBasicConsumer(channel); //channel.BasicConsume(QueueName, noAck: true, consumer: consumer); //var msgResponse = consumer2.Queue.Dequeue(); //blocking //var msgBody2 = Encoding.UTF8.GetString(msgResponse.Body); Console.WriteLine("read key"); Console.ReadKey(); } } } public static void DirectAcceptExchangeTask() { using (IConnection conn = rabbitMqFactory.CreateConnection()) { using (IModel channel = conn.CreateModel()) { //channel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null); channel.QueueDeclare(QueueName, durable: true, autoDelete: false, exclusive: false, arguments: null); channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);//告訴broker同一時間只處理一個消息 //channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var msgBody = Encoding.UTF8.GetString(ea.Body); Console.WriteLine(string.Format("time:{0},消息內容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody)); int dots = msgBody.Split('.').Length - 1; System.Threading.Thread.Sleep(dots * 1000); Console.WriteLine(" [x] Done"); //處理完成,告訴Broker能夠服務端能夠刪除消息,分配新的消息過來 channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; //noAck設置false,告訴broker,發送消息以後,消息暫時不要刪除,等消費者處理完成再說 channel.BasicConsume(QueueName, autoAck: false, consumer: consumer); Console.WriteLine("按任意值,退出程序"); Console.ReadKey(); } } } public static void TopicAcceptExchange() { using (IConnection conn = rabbitMqFactory.CreateConnection()) { using (IModel channel = conn.CreateModel()) { channel.ExchangeDeclare(TopExchangeName, "topic", durable: false, autoDelete: false, arguments: null); channel.QueueDeclare(TopQueueName, durable: false, autoDelete: false, exclusive: false, arguments: null); channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); channel.QueueBind(TopQueueName, TopExchangeName, routingKey: TopQueueName); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var msgBody = Encoding.UTF8.GetString(ea.Body); Console.WriteLine(string.Format("time:{0},消息內容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody)); int dots = msgBody.Split('.').Length - 1; System.Threading.Thread.Sleep(dots * 1000); Console.WriteLine("ok"); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; channel.BasicConsume(TopQueueName, autoAck: false, consumer: consumer); Console.WriteLine("read key"); Console.ReadKey(); } } } } }
消費者端也是兩個路由兩個隊列,在實現DirectExchange時使用了三種方式,DirectAcceptExchange是基於時間輪詢的,每隔一段時間獲取一次,DirectAcceptExchangeEvent、DirectAcceptExchangeTask是基於事件的,當消息到達時觸發事件,獲取數據。
rabbitMq的應用,有一個快捷命令行
rabbitmq-plugins enable rabbitmq_management
net stop RabbitMQ && net start RabbitMQ