使用Redis作消息隊列

基於內存的單線程數據庫,使Redis的線程安全性與性能極高。而Redis的雙向鏈表數據類型(List)天生就可做爲消息隊列存儲消息.redis

在這裏就不說消息隊列的等等一些優勢。可是補充一下Redis的List類型的幾個命令,你能夠指定將一個元素投送到列表的頭部(左邊)或者尾部(右邊),固然也能夠指定從列表的頭部或尾部取出數據.數據庫

LPush:添加元素至列表的頭部安全

 

 RPush:添加元素至列表的尾部性能

 

LPop:移除並獲取列表的頭部的第一個元素spa

 RPop:移除並獲取列表的尾部的第一個元素線程

 

BLpop:移出並獲取列表頭部的第一個元素, 若是列表沒有元素會阻塞列表直到等待超時或發現可彈出元素爲止。命令格式:blpop key timeout,當timeout=0時,表示一直阻塞等待,直到有其餘客戶端執行rpush或者lpush命令,插入數據後,阻塞才解除.3d

BRpop:與BLpop相同,不一樣的是它是移除列表尾部的第一個元素.code

以下,開啓兩個客戶端,一個客戶端先使用BLpop阻塞讀取數據,另外一個客戶端寫入數據.blog

OK,到此我想你已經明白了,List的做用已經顯而易見。生產者投入消息,消費者拿到消息。並且雙向鏈表的數據類型,投入和拿取數據都特別靈活。是否是感受很不錯?接着往下看😄隊列

下面在代碼中實現消息隊列的數據投遞與拾取.

引入NuGet包:StackExchange.Redis

 生產者:

static void Main(string[] args)
{
    var redis = new RedisContent();
    for (int i = 0; i < 10; i++)
    {
        redis.db.ListRightPush("datalist", $"data{i}");//向列表的尾部投遞消息
        Console.WriteLine($"{DateTime.Now} 已投遞消息data{i}!");
        Thread.Sleep(3000);
    }
    Console.ReadKey();
}

消費者:

static void Main(string[] args)
{
    var redis = new RedisContent();
    while (true)
    {
        string result = redis.db.ListRightPop("datalist");//從列表的尾部拾取消息

        if (string.IsNullOrEmpty(result)) { }
        else
        {
            Console.WriteLine($"{DateTime.Now} 已接收消息,Message={result.ToString()}");
            Thread.Sleep(1000);
        }
    }
}

爲了讓你們看到效果,我故意讓線程等待了幾秒鐘.

先進先出和先進後出的實現方式都比較靈活,若是要想實現先進先出的規則的話,要將上面的消費者代碼改成redis.db.ListLeftPop("datalist")=>從頭部開始讀取消息

可是上面的代碼有一個很大的弊端,雖然消息已經消費完了,可是仍然在不停的lpop,因此形成很大的浪費.就算是這裏使用了Sleep,必定程度上減小了CPU的佔用率,可是消息處理的時效性就削弱了.

不用擔憂,對此確定有解決的方法😀,咱們上面提到了Redis有兩個阻塞命令BRpop與BLpop,這兩個命令能夠解決上述問題.有消息的話它就會幫你拿出來,並且不用while(true)的方式也會減小CPU的開銷.由於列表沒有消息的話,它就會一直阻塞,能夠理解爲保持了一個長鏈接(就至關於你問你女友爲何生氣,而後她就說由於什麼什麼...,晚上你問她想吃點什麼,而後她說想吃點什麼什麼...,你每次都要去問她,時間久了她就以爲很煩,會以爲你不懂她。因此你就住進她內心面,她內心面想什麼你就能第一時間知道,用這個作比喻我相信大家都能懂😂)。

可是StackExchange.Redis並無提供BLpop與BRpop的API,咱們可使用使用pub/sub的方式.代碼以下:

生產者:

static void Main(string[] args)
{
    var redis = new RedisContent();
    var sub = RedisContent.redis.GetSubscriber();

    for (int i = 0; i < 10; i++)
    {
        sub.PublishAsync("datalist", $"data{i}").GetAwaiter();
        Console.WriteLine($"{DateTime.Now} 已投遞消息data{i}!");
        Thread.Sleep(3000);
    }
    Console.ReadKey();
}

消費者:

static void Main(string[] args)
{
    var redis = new RedisContent();
    var sub = RedisContent.redis.GetSubscriber();
    sub.Subscribe("datalist", (channel, message) =>
    {
        Console.WriteLine($"{DateTime.Now} 已接收消息,Message={message}");
        Thread.Sleep(1000);
    });
    Console.WriteLine("消費者0已啓動成功!");
    Console.ReadKey();
}

分別啓動兩個消費者客戶端

這種爲廣播模式,每個訂閱者都會收到消息。可是該消息不保證是否被接收,生產者投遞完消息若是沒有消費者接收的話,消息會丟失.

還有一種方式消息不會丟失,將消息存在列表裏面。首先生產者向列表投入數據,緊接着去通知訂閱者,讓訂閱者從列表中取出數據。可是有一個弊端,若是有多個消費者訂閱時,只有一個消費者能取到數據。代碼以下:

生產者:

var redis = new RedisContent();
var sub = RedisContent.redis.GetSubscriber();

for (int i = 0; i < 10; i++)
{
    redis.db.ListLeftPush("datalist", $"data{i}", flags: CommandFlags.FireAndForget);
    sub.PublishAsync("channel1", "").GetAwaiter();
    Console.WriteLine($"{DateTime.Now} 已投遞消息data{i}!");
    Thread.Sleep(3000);
}
Console.ReadKey();

消費者:

static void Main(string[] args)
{
    var redis = new RedisContent();
    var sub = RedisContent.redis.GetSubscriber();
//若是消費者後啓動,或者宕機重啓,要先查詢列表中是否有數據,若是有數據要消費掉
var len = redis.db.ListRange("datalist").Length; if (len > 0) { Task.Run(() => { for (int i = 0; i < len; i++) { string result = redis.db.ListRightPop("datalist"); //業務操做... } }); } sub.Subscribe("channel1", (channel, message) => { string result = redis.db.ListRightPop("datalist"); Console.WriteLine($"{DateTime.Now} 已接收消息,Message={result}"); Thread.Sleep(1000); }); Console.WriteLine("消費者0已啓動成功!"); Console.ReadKey();

若有不足,請見諒!今天是十月二號,昨天是中秋佳節又是國慶,在這裏祝你們雙節快樂。原本昨天晚上寫完這篇的,可是八九點的時候太困了,就😴... ...

昨晚坐在窗邊,偶然間向外面瞄了一眼,月亮的光芒太耀眼了,也太漂亮了,趕忙拍了一張🌕

相關文章
相關標籤/搜索