Rabbitmq強大的【優先級隊列】

說到隊列的話,你們必定不會陌生,可是扯到優先級隊列的話,仍是有一部分同窗是不清楚的,多是不知道怎麼去實現吧,其實呢,,,這東西已redis

經爛大街了。。。很簡單,用「堆」去實現的,在咱們系統中有一個訂單催付的場景,咱們客戶的客戶在tmall,taobao下的訂單,taobao會及時將訂單推送給後端

咱們,若是在用戶設定的時間內未付款那麼就會給用戶推送一條短信提醒,很簡單的一個功能對吧,可是,tmall商家對咱們來講,確定是要分大客戶和小客測試

戶的對吧,好比像施華蔻,百雀林這樣大商家一年起碼可以給咱們貢獻幾百萬,因此理應固然,他們的訂單必須獲得優先處理,而曾今咱們的後端系統是使優化

用redis來存放的定時輪詢,你們都知道redis只能用List作一個簡簡單單的消息隊列,並不能實現一個優先級的場景,因此訂單量大了後採用rabbitmq進行ui

改造和優化,若是發現是大客戶的訂單給一個相對比較高的優先級,不然就是默認優先級,好了,廢話很少說,咱們來看看如何去設置。spa

一:優先級隊列

既然是優先級隊列,那麼必然要在Queue上開一個口子貼上一個優先級的標籤,爲了看怎麼設置,咱們用一下rabbitmq的監控UI,看看這個裏面是如何code

手工的建立優先級隊列。orm

214741-20161103222447924-150404743

 從這個圖中能夠看到在Arguments欄中有特別多的小屬性,其中有一項就是"Maximum priority",這項的意思就是說能夠定義優先級的最大值,其實blog

想一想也是,不可能咱們定義的優先級是一個很是大的數字,好比int.MaxValue,大多狀況下都是10之內的數字就能夠了,再或者咱們曾今接觸過的 MSMQ,rabbitmq

它的優先級只是一些枚舉值,什麼High,Normal,Low,不知道你們能否記得? 下面來看下代碼中該如何實現呢???

1. 在Queue上附加優先級屬性

Dictionary<string, object> dic = new Dictionary<string, object>(); dic.Add("x-max-priority", 20); channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments: dic);

 上面的代碼作了一個簡單的隊列聲明,queuename="hello",持久化,排外。。。而後把"x-max-priority"塞入到字典中做爲arguments參數,看起來還

是很是簡單吧~~~

2. 在Message上指定優先級屬性

var properties = channel.CreateBasicProperties(); properties.Priority = 1; channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body);

214741-20161103223717565-951723961

經過上面的代碼能夠看到,在Message上設置優先級,我是經過在channel通道上設置Priority屬性,以後塞到basicProperties中就能夠了,好了,有上面這兩

個基礎以後,下面就能夠開始測試了,準備向rabbitmq推送10條記錄,其中第5條的優先級最高,因此應該首先就print出來,以下圖:

static void Main(string[] args) { var sb = new StringBuilder(); for (int i = 0; i < 11; i++) { sb.Append(i); } var factory = new ConnectionFactory() { HostName = "192.168.23.136", UserName = "datamip", Password = "datamip" }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "mydirect", type: ExchangeType.Direct, durable: true); Dictionary<string, object> dic = new Dictionary<string, object>(); dic.Add("x-max-priority", 20); for (int i = 0; i < 10; i++) { channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments: dic); string message = string.Format("{0} {1}", i, sb.ToString()); var body = Encoding.UTF8.GetBytes(message); var properties = channel.CreateBasicProperties(); properties.Priority = (i == 5) ? (byte)10 : (byte)i; channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: properties, body: body); Console.WriteLine(" [x] Sent {0}", i); } } } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); }

214741-20161103224312174-179108976300

圖中能夠看到10條消息我都送到rabbitmq中去了,接下來打開consume端,來看看所謂的index=5 是否第一個送出來??

static void Main(string[] args) { for (int m = 0; m < int.MaxValue; m++) { var factory = new ConnectionFactory() { HostName = "192.168.23.136", UserName = "datamip", Password = "datamip" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { var result = channel.BasicGet("hello", true); if (result != null) { var str = Encoding.UTF8.GetString(result.Body); Console.WriteLine("{0} 消息內容 {1}", m, str); System.Threading.Thread.Sleep(1); } } } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); }

214741-20161103224809518-264108002

一切都是這麼的完美,接下來爲了進行可視化驗證,你能夠在WebUI中觀察觀察,能夠發如今Queue上面多了一個 Pri 標記,有意思吧。

214741-20161103225041080-189485375

好了,這麼重要的功能,是否是已經讓你足夠興奮啦, 但願你們可以好好的在實際場景中運用吧~~~

相關文章
相關標籤/搜索