這幾天呢,公司風波再起,去年一年公司CTO換啦4任,CEO換啦三個,這不剛來個新老大,感受還不錯,卻沒幹過3個月又要走,索性趁老大們走來走去的時候,就給本身空出來,稍稍總結一下剛寫的一個日誌服務組件中用到的RabbitMQ,在.net中的實戰中應用。html
首先不去討論個人日誌組件怎麼樣。由於有些日誌須要走網絡,有的又不須要走網路,也是有性能與業務場景的多般變化在其中,就把他拋開,咱們只談消息RabbitMQ。算法
那麼什麼是RabbitMQ,它是用來解決什麼問題的,性能如何,又怎麼用?我會在下面一一闡述,若有錯誤,不到之處,還望你們不吝賜教。安全
必須一提的是rabbitmq是由LShift提供的一個消息隊列協議(AMQP)的開源實現,由以高性能、健壯以及可伸縮性出名的Erlang寫成(所以也是繼承了這些優勢)。網絡
百度百科對RabbitMQ闡述也很是明確,建議去看下,還有amqp協議。負載均衡
RabbitMQ官網:http://www.rabbitmq.com/ 若是你要下載安裝,那麼必須先把Erlang語言裝上。性能
RabbitMQ的.net客戶端,能夠在nuget中輸入rabbitmq輕鬆得到。測試
RabbitMQ與其餘消息隊列的對比,早有仙人給寫出來。 Message Queue Shootoutspa
這篇文章中的測試案例爲:1百萬條1k的消息,每秒種的收發狀況以下圖。.net
若是你安裝好啦,rabbitmq,他會提供一個操做監控頁面,頁面以下,他幾乎提供啦,對rabbitmq的全部操做,與監控,因此,你裝上後,本身多看看,多操做下。3d
從上圖的標題中能夠看到一些陌生的英文單詞,讓咱們感受一無所知,更無從操做,那麼我給你們弄啦一個圖片你們能夠看下,或許對您理解這些新鮮的單詞有所幫助。
看過這些名詞,以後,或許你還毫無頭緒,那麼我把消息從生產到消費的整個流程給你們說一下,或許會更深刻一點,其中Exchange,與Queue都是能夠設置相關屬性,隊列的持久化,交換器類型制定。
Note:首先這個過程走分三個部分,一、客戶端(生產消息隊列),二、RabbitMQ服務端(負責路由規則的綁定與消息的分發),三、客戶端(消費消息隊列中的消息)
Note:由圖能夠看出,一個消息能夠走一次網絡卻被分發到不一樣的消息隊列中,而後被多個的客戶端消費,那麼這個過程就是RabbitMQ的核心機制,RabbitMQ的路由類型與消費模式。
類型有4種,direct,fanout,topic,headers。其中headers不經常使用,本篇不作介紹,其餘三種類型,會作詳細介紹。
那麼這些類型是什麼意思呢?就是Exchange與隊列進行綁定後,消息根據exchang的類型,按照不一樣的綁定規則分發消息到消息隊列中,能夠是一個消息被分發給多個消息隊列,也能夠是一個消息分發到一個消息隊列。具體請看下文。
介紹之初還要說下RoutingKey,這是個什麼玩意呢?他是exchange與消息隊列綁定中的一個標識。有些路由類型會按照標識對應消息隊列,有些路由類型忽略routingkey。具體看下文。
一、Exchange類型direct
他是根據交換器名稱與routingkey來找隊列的。
Note:消息從client發出,傳送給交換器ChangeA,RoutingKey爲routingkey.ZLH,那麼無論你發送給Queue1,仍是Queue2一個消息都會保存在Queue1,Queue2,Queue3,三個隊列中。這就是交換器的direct類型的路由規則。只要找到路由器與routingkey綁定的隊列,那麼他有多少隊列,他就分發給多少隊列。
二、Exchange類型fanout
這個類型忽略Routingkey,他爲廣播模式。
Note:消息從客戶端發出,只要queue與exchange有綁定,那麼他無論你的Routingkey是什麼他都會將消息分發給全部與該exchang綁定的隊列中。
三、Exchange類型topic
這個類型的路由規則若是你掌握啦,那是至關的好用,與靈活。他是根據RoutingKey的設置,來作匹配的,其中這裏還有兩個通配符爲:
*,表明任意的一個詞。例如topic.zlh.*,他可以匹配到,topic.zlh.one ,topic.zlh.two ,topic.zlh.abc, ....
#,表明任意多個詞。例如topic.#,他可以匹配到,topic.zlh.one ,topic.zlh.two ,topic.zlh.abc, ....
Note:這個圖看上去很亂,可是他是根據匹配符作匹配的,這裏我建議你本身作下消息隊列的具體操做。
具體操做以下
public static void Producer(int value) { try { var qName = "lhtest1"; var exchangeName = "fanoutchange1"; var exchangeType = "fanout";//topic、fanout var routingKey = "*"; var uri = new Uri("amqp://192.168.10.121:5672/"); var factory = new ConnectionFactory { UserName = "123", Password = "123", RequestedHeartbeat = 0, Endpoint = new AmqpTcpEndpoint(uri) }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { //設置交換器的類型 channel.ExchangeDeclare(exchangeName, exchangeType); //聲明一個隊列,設置隊列是否持久化,排他性,與自動刪除 channel.QueueDeclare(qName, true, false, false, null); //綁定消息隊列,交換器,routingkey channel.QueueBind(qName, exchangeName, routingKey); var properties = channel.CreateBasicProperties(); //隊列持久化 properties.Persistent = true; var m = new QMessage(DateTime.Now, value+""); var body = Encoding.UTF8.GetBytes(DoJson.ModelToJson<QMessage>(m)); //發送信息 channel.BasicPublish(exchangeName, routingKey, properties, body); } } } catch (Exception ex) { Console.WriteLine(ex.Message); } }
一、消息隊列的消費
Note:若是一個消息隊列中有大量消息等待操做時,咱們能夠用多個客戶端來處理消息,這裏的分發機制是採用負載均衡算法中的輪詢。第一個消息給A,下一個消息給B,下下一個消息給A,下下下一個消息給B......以此類推。
二、爲啦保證消息的安全性,保證此消息被正確處理後才能在服務端的消息隊列中刪除。那麼rabbitmq提供啦ack應答機制,來實現這一功能。
ack應答有兩種方式:一、自動應答,二、手動應答。具體實現以下。
public static void Consumer() { try { var qName = "lhtest1"; var exchangeName = "fanoutchange1"; var exchangeType = "fanout";//topic、fanout var routingKey = "*"; var uri = new Uri("amqp://192.168.10.121:5672/"); var factory = new ConnectionFactory { UserName = "123", Password = "123", RequestedHeartbeat = 0, Endpoint = new AmqpTcpEndpoint(uri) }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchangeName, exchangeType); channel.QueueDeclare(qName, true, false, false, null); channel.QueueBind(qName, exchangeName, routingKey); //定義這個隊列的消費者 QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel); //false爲手動應答,true爲自動應答 channel.BasicConsume(qName, false, consumer); while (true) { BasicDeliverEventArgs ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); byte[] bytes = ea.Body; var messageStr = Encoding.UTF8.GetString(bytes); var message = DoJson.JsonToModel<QMessage>(messageStr); Console.WriteLine("Receive a Message, DateTime:" + message.DateTime.ToString("yyyy-MM-dd HH:mm:ss") + " Title:" + message.Title); //若是是自動應答,下下面這句代碼不用寫啦。 if ((Convert.ToInt32(message.Title) % 2) == 1) { channel.BasicAck(ea.DeliveryTag, false); } } } } } catch (Exception ex) { Console.WriteLine(ex.Message); } }