簡介算法
RabbitMQ是實現了高級消息隊列協議(AMQP)的開源消息中間件,基於Erlang語言編寫。數組
AMQP是什麼網絡
AMQP 0-9-1(高級消息隊列協議)是一種消息傳遞協議,它容許一致的客戶端應用程序與一致的消息傳遞中間件代理進行通訊。學習
消息傳遞代理接收來自發布者(發佈它們的應用程序,也稱爲生產者)的消息,並將它們路由到消費者(處理它們的應用程序)。spa
因爲它是一個網絡協議,發佈者、消費者和代理均可以駐留在不一樣的機器上。插件
AMQP 0-9-1模型簡介代理
AMQP 0-9-1模型具備如下世界視圖:消息發佈到交換,這一般與郵局或郵箱進行比較。交換而後使用名爲綁定的規則將消息副本分發到隊列。而後,代理將消息傳遞給訂閱隊列的消費者,或者消費者根據須要從隊列獲取/拉取消息。code
發佈消息時,發佈者能夠指定各類消息屬性(消息元數據)。有些元數據能夠由代理使用,可是,其他的元數據對代理是徹底不透明的,只能由接收消息的應用程序使用。中間件
網絡不可靠,應用程序可能沒法處理消息,所以AMQP 0-9-1模型具備消息確認的概念:當消息傳遞給消費者時,消費者會自動或在應用程序開發人員選擇時當即通知代理。當消息確認正在使用時,代理將僅在收到消息(或消息組)通知時從隊列中徹底刪除消息。對象
例如,在某些狀況下,當消息沒法路由時,消息可能會返回給發佈者、丟棄,或者,若是代理實現擴展,則將消息放入所謂的「死信隊列」。發佈者經過使用某些參數發佈消息來選擇如何處理這種狀況。
隊列、交換和綁定統稱爲AMQP實體。
交換和交換類型
Rabbitmq中的核心思想,生產者不會把消息直接發送到隊列當中,通常生產者是向交換機發送消息,交換機把消息推送到隊列當中。
交換機是發送消息的實體,交換機接收消息並將消息路由到零個或者多個隊列當中,使用的路由算法取決於綁定的交換類型和規則,所以AMQP 0-9-1提供瞭如下四種交換類型:
除了交換類型以外,還使用許多屬性聲明交換其中最重要的是:
交換能夠是持久的,也能夠是暫時的。持久性交易所能在經紀重啓後存活下來,而短暫性交易所則不能(當經紀從新上線時,必須從新申報)。並不是全部的場景和用例都須要持久的交換。
本文主要記錄了學習RabbitMQ
開發準備
1.建立RabbitMQHelper工程文件
2.安裝依賴
經過程序包管理器控制檯或者nuget安裝RabbitMQ.Client,默認項目選擇RabbitMQHelper
install-pack RabbitMQ.client
2.1 添加幫助類所需實體
添加交換機實體
using System; using System.Collections.Generic; using System.Text; namespace RabbitMQHelper.Model { /// <summary> /// 交換機實體 /// </summary> public class ExchangeModel { /// <summary> /// 交換機名稱 /// </summary> public string ExchangeName { get; set; } /// <summary> /// 交換機類型 /// </summary> public string ExchangeType { get; set; } /// <summary> /// 路由key /// </summary> public string RouteKey { get; set; } /// <summary> /// 是否持久化 /// </summary> public bool Durable { get; set; } } }
添加連接實體
using System; using System.Collections.Generic; using System.Text; namespace RabbitMQHelper.Model { /// <summary> /// 鏈接實體 /// </summary> public class HostModel { /// <summary> /// 客戶端帳號 /// </summary> public string UserName { get; set; } /// <summary> /// 客戶端密碼 /// </summary> public string PassWord { get; set; } /// <summary> /// 鏈接地址 /// </summary> public string Host { get; set; } /// <summary> /// 端口號 /// </summary> public int Port { get; set; } public ExChangeModel ExChangeModel { get; set; } /// <summary> /// 虛擬路徑 /// </summary> public string VirtualHost { get; set; } } /// <summary> /// RabbitMq實體 /// </summary> public class ExChangeModel { /// <summary> /// 隊列名稱 /// </summary> public string QueueName { get; set; } /// <summary> /// 路由名稱 /// </summary> public string RouteKey { get; set; } /// <summary> /// 交換機名稱 /// </summary> public string ExChangeName { get; set; } } }
2.3 添加持久化連接單例
此處主要是考慮到若是每一次請求都去建立一個鏈接的話,比較耗時,並且rabbitMQ官方也建議使用長鏈接的方式進行通訊,因此此處用一個單例進行存儲鏈接信息
using RabbitMQ.Client; using System; using System.Collections.Generic; using System.Text; namespace RabbitMQHelper.core { public class RabbitMQSingleton { private static RabbitMQSingleton rabbitServicesSingleton; static RabbitMQSingleton() { rabbitServicesSingleton = new RabbitMQSingleton(); } private Dictionary<string, IConnection> RabbitMQconn = new Dictionary<string, IConnection>(); public static RabbitMQSingleton GetInstance() { return rabbitServicesSingleton; } /// <summary> /// 添加MQ鏈接 /// </summary> /// <param name="key">鏈接名</param> /// <param name="value">內容</param> public void SetRabbitMqConn(string key, IConnection value) { if (!RabbitMQconn.ContainsKey(key)) { RabbitMQconn.Add(key, value); } } /// <summary> /// 獲取rabbitmq全部鏈接信息 /// </summary> /// <returns></returns> public Dictionary<string, IConnection> GetRabbitMQconn() { return RabbitMQconn; } /// <summary> /// 移除鏈接 /// </summary> /// <param name="key"></param> /// <returns></returns> public bool RemoveMQconn(string key) { bool sflag = true; try { if (RabbitMQconn.ContainsKey(key)) { RabbitMQconn.Remove(key); } } catch (Exception) { sflag = false; throw; } return sflag; } } }
2.4.添加輔助類
using RabbitMQ.Client; using RabbitMQ.Client.Events; using RabbitMQHelper.Model; using System; using System.Collections.Generic; using System.IO; using System.Linq; using System.Text; namespace RabbitMQHelper { public class RabbitHelper { private readonly ConnectionFactory factory = null; public RabbitHelper(HostModel hostModel) { // 建立鏈接工廠 factory = new ConnectionFactory { UserName = hostModel.UserName,//鏈接用戶名 Password = hostModel.PassWord,//鏈接密碼 HostName = "localhost",//鏈接地址 Port = hostModel.Port,//端口號 //VirtualHost = hostModel.VirtualHost }; } /// <summary> /// 建立鏈接 /// </summary> /// <returns></returns> public IConnection Connection() { IConnection connection = null; try { factory.AutomaticRecoveryEnabled = true;//自動重連 connection = factory.CreateConnection(); } catch (Exception) { throw new Exception("鏈接失敗!~~~~~"); } return connection; } /// <summary> /// /// </summary> /// <param name="conn"></param> /// <param name="queueName"></param> /// <param name="msg"></param> /// <param name="exchangeModel"></param> /// <returns></returns> public bool sendMsg(IConnection conn, string queueName, string msg, ExchangeModel exchangeModel) { bool sflag = true; try { //var channel = conn.CreateModel(); using (var channel = conn.CreateModel()) { //1交換機,交換機類型 channel.ExchangeDeclare(exchangeModel.ExchangeName, exchangeModel.ExchangeType); //隊列名稱,是否持久化,獨佔的隊列,不使用時是否自動刪除, channel.QueueDeclare(queueName, exchangeModel.Durable,false,false,null); //轉換成byte數組 var sendBytes = Encoding.UTF8.GetBytes(msg); //設置持久化參數 var properties = channel.CreateBasicProperties(); properties.DeliveryMode = 2;//1表示不持久,2表示持久化 if (!exchangeModel.Durable) { properties = null; } //發送消息:交換機名稱,路由,持久化參數,消息內容 channel.BasicPublish(exchangeModel.ExchangeName, exchangeModel.RouteKey, properties, sendBytes); } } catch (Exception) { sflag = true; throw; } return sflag; } /// <summary> /// 接收消息 /// </summary> /// <param name="connection"></param> /// <param name="queueName"></param> /// <param name="durable"></param> /// <returns></returns> public string ConsumMsg(IConnection connection,string queueName, ExchangeModel exchangeModel) { string msg = string.Empty; var channel = connection.CreateModel(); //隊列綁定:隊列名稱,交換機名稱,路由 channel.QueueBind(queueName, exchangeModel.ExchangeName, exchangeModel.RouteKey, null); var consumer = new EventingBasicConsumer(channel); //接收到消息事件 consumer.Received += (ch, ea) => { var message = Encoding.UTF8.GetString(ea.Body); msg = message; Console.WriteLine($"收到消息: {message}"); //確認該消息已被消費 channel.BasicAck(ea.DeliveryTag, false); }; //啓動消費者 設置爲手動應答消息 channel.BasicConsume(queueName, false, consumer); return msg; } } }
2.5添加生產者類
using RabbitMQ.Client; using RabbitMQHelper.core; using RabbitMQHelper.IServer; using RabbitMQHelper.Model; using System; using System.Collections.Generic; using System.Text; namespace RabbitMQHelper.Server { public class SendService : ISendService { IConnection connection;//rabbitmq鏈接地址 private object obj=new object();//對象 RabbitHelper rabbitHelper;//rabbitHelper類 public SendService() { RabbitMQSingleton rabbitMQSingleton = RabbitMQSingleton.GetInstance(); Dictionary<string, IConnection> connDictionary = rabbitMQSingleton.GetRabbitMQconn(); if (connDictionary != null && connDictionary.Count > 0) { connection = connDictionary["test"]; } else { HostModel hostModel = new HostModel(); hostModel.UserName = "admin"; hostModel.PassWord = "admin"; hostModel.Host = "127.0.0.1"; hostModel.Port = 5672; //hostModel.VirtualHost = "/"; lock (obj) { rabbitHelper = new RabbitHelper(hostModel); connection = rabbitHelper.Connection(); rabbitMQSingleton.SetRabbitMqConn("test", connection); } } } public bool CloseConnection() { throw new NotImplementedException(); } public void SendMsg(string queueName, string msg) { ExchangeModel exchangeModel = new ExchangeModel(); exchangeModel.Durable = false; exchangeModel.ExchangeName = "ClentName"; exchangeModel.ExchangeType = ExchangeType.Direct; exchangeModel.RouteKey = "ClentRoute"; rabbitHelper.sendMsg(connection, queueName, msg, exchangeModel); } //public void SendMsg(string msg) //{ // throw new NotImplementedException(); //} } }
2.6添加消費者
using RabbitMQ.Client; using RabbitMQHelper.core; using RabbitMQHelper.Model; using System; using System.Collections.Generic; using System.IO; using System.Linq; using System.Text; namespace RabbitMQHelper.Server { public class ConsumerServer { IConnection connection;//rabbitmq鏈接地址 private object obj = new object();//對象 RabbitHelper rabbitHelper;//rabbitHelper類 public ConsumerServer() { RabbitMQSingleton rabbitMQSingleton = RabbitMQSingleton.GetInstance(); Dictionary<string, IConnection> connDictionary = rabbitMQSingleton.GetRabbitMQconn(); if (connDictionary != null && connDictionary.Count > 0) { connection = connDictionary["test"]; } else { HostModel hostModel = new HostModel(); hostModel.UserName = "admin"; hostModel.PassWord = "admin"; hostModel.Host = "127.0.0.1"; hostModel.Port = 5672; //hostModel.VirtualHost = "/"; lock (obj) { rabbitHelper = new RabbitHelper(hostModel); connection = rabbitHelper.Connection(); rabbitMQSingleton.SetRabbitMqConn("test", connection); } } } public string GetMsg(string queueName) { ExchangeModel exchangeModel = new ExchangeModel(); exchangeModel.ExchangeName = "ClentName"; exchangeModel.ExchangeType = ExchangeType.Direct; exchangeModel.RouteKey = "ClentRoute"; return rabbitHelper.ConsumMsg(connection, queueName, exchangeModel); } } }
3.添加RibbitMQClient項目(消息生產者)
建立控制檯程序
using RabbitMQ.Client; using RabbitMQHelper.IServer; using RabbitMQHelper.Server; using System; namespace RibbitMQClient { class Program { static void Main(string[] args) { Console.WriteLine("消息生產者開始生產數據!"); Console.WriteLine("輸入exit退出!"); ISendService sendService = new SendService(); string input; do { input = Console.ReadLine(); sendService.SendMsg("Clent1", input); } while (input.Trim().ToLower() != "exit"); } } }
4.建立RibbitMQServer 消費者模式
using RabbitMQHelper.Server; using System; using System.Text; namespace RibbitMQServer { class Program { static void Main(string[] args) { Console.WriteLine("Hello World!"); ConsumerServer consumerServer = new ConsumerServer(); consumerServer.GetMsg("Clent1"); Console.WriteLine("消費者已啓動"); Console.ReadKey(); } } }
5.運行程序
可正常接收數據
標註:若有理解不對的還請多指教,或者有好的實現方式也可一塊兒討論。