Redis 消息中間件 ServiceStack.Redis 輕量級

問題:

  • 公司開了個新項目,算上我一共3我的。車間裏機臺經過流水線連通聯動的玩意。一個管理控制系統鏈接各個機臺和硬件。專機類型就有5種,個數差很少20個左右。redis

  • 軟件規劃的時候採用總分的結構,管理控制系統和專機子系統之間經過消息中間件通信。原本也想TCP鏈接來着,可是開發時間不容許。並且每一個系統都得寫一遍這個玩意。數據庫

  • 消息中間件有不少個,好比 Kafka、RabbitMQ、RocketMQ等國內外的消息中間件。這些中間件不管宣稱的多麼輕量級都要啃一下,更要命的是就他娘三我的。並且後面還要這個雞兒系統可複製。服務器

  • 考慮到消息及時性、開發難易程度、維護簡便性等因素後決定用Redis的pub/sub功能來實現.軟件結構大概如相似結構。網絡

 

可用性:

  • 做爲消息通知屬於安裝了Redis就有的功能,由於Redis是用在系統中存儲一些熱數據,不用單獨維護,在Windows中屬於服務直接就開了。分佈式

  • 做爲能夠分佈式集羣使用的數據庫,消息傳遞應該比較OK了。雖然使用的client-server,可是server-server已經很好了。料想client-server也不會差測試

  • 試驗消息內容發送訂閱的狀況下,速度在30毫秒內,貌似能夠。看其餘博主說大於10K入隊比較慢,可是能夠不用入消息隊列啊,用發佈訂閱。this

  • .net 下通常使用ServiceStack.Redis,要命的是4.0之後收費,能夠破解的可是不支持List<T>型的數據直接存取,想用只能變成JSON字符串存着。spa

  • 若是隻是用訂閱發佈功能,不存儲熱數據或者不使用List<T>的數據可使用4.0以上的版本。文末會貼上兩個類型的下載包。想用其餘的包也能夠,我這裏只說一種思路。

實現:

模塊結構圖展現以下.net

public static class MSServer
    {
        // 定義一個object對象
        private static object objinstance = new object();

        private static ServerState CurState = ServerState.Free;

        static PooledRedisClientManager prcm;

        private static string clientmake = string.Empty;

        /// <summary>
        /// 鏈接的地址
        /// </summary>
        /// <param name="IP">地址127.0.0.1:6379</param>
        /// <param name="rechannels">接收通道 {"channel:1-13","channel:1-5"}</param>
        /// <returns></returns>
        public static int OpenServer(string IP ,string[] rechannels)
        {
            try
            {
                if (prcm == null)
                {
                    lock (objinstance)
                    {
                        if (prcm == null)
                        {
                            prcm = CreateManager(IP, IP);
                            CurState = ServerState.Init;
                            return CreateLink(rechannels);
                        }
                    }
                }
            }
            catch
            {
                prcm = null;
                CurState = ServerState.Free;
                return -1;
            }
            return 1;
        }

        private static int CreateLink(string[] SourceID)
        {
            if (CurState == ServerState.Init && SourceID.Length > 0)
            {
                try
                {
                    using (IRedisClient Redis = prcm.GetReadOnlyClient())
                    {
                        clientmake = SourceID[0];
                        var info = Redis.GetClientsInfo().Where(i => i["name"] == clientmake).ToList();
                        info.ForEach(i =>
                        {
                            Redis.KillClient(i["addr"]);
                        });
                        Redis.SetClient(clientmake);
                        IRedisSubscription sc = Redis.CreateSubscription();
                        Task.Run(() =>
                        {
                            try
                            {
                                sc.SubscribeToChannels(SourceID);
                            }
                            catch { }
                        });
                        sc.OnMessage += new Action<string, string>(showpub);
                    }
                    CurState = ServerState.Work;
                }
                catch
                {
                    string message = string.Empty;
                    prcm = null;
                    CurState = ServerState.Free;
                    return -1;
                }
                return 1;
            }
            else
            {
                return 0;
            }
        }


        public static Action<string, string> ReceiveMessage;
        static void showpub(string channel, string message)
        {
            if (ReceiveMessage != null)
            {
                ReceiveMessage(channel, message);
            }
        }

        private static PooledRedisClientManager CreateManager(string writeHost, string readHost)
        {
            var redisClientConfig = new RedisClientManagerConfig
            {
                MaxWritePoolSize = 1,//「寫」連接池連接數
                MaxReadPoolSize = 1,//「讀」連接池連接數
                DefaultDb = 0,
                AutoStart = true,
            };
            //讀的客戶端只能接受特定的命令,不能用於發送信息
            var RedisClientManager = new PooledRedisClientManager(
                new string[] { writeHost }//用於寫
                , new string[] { readHost }//用於讀
                , redisClientConfig);
            CurState = ServerState.Init;

            return RedisClientManager;
        }
        /// <summary>
        /// 發送信息
        /// </summary>
        /// <param name="channel">通信對象 "channel:1-13"</param>
        /// <param name="meesage">發送信息 "test send "</param>
        /// <returns>0 發送失敗 1 發送成功 -1 鏈接損毀 檢查網絡後重建</returns>
        public static long PubMessage(string channel, string meesage)
        {
            if (CurState == ServerState.Work)
            {
                if (!string.IsNullOrEmpty(channel) && !string.IsNullOrEmpty(meesage))
                {
                    try
                    {
                        using (IRedisClient Redis = prcm.GetClient())
                        {
                            Redis.SetClient(clientmake);
                            return Redis.PublishMessage(channel, meesage);
                        }
                    }
                    catch
                    {
                        prcm = null;
                        CurState = ServerState.Free;
                        return -1;
                    }
                }
                else
                {
                    return 0;
                }
            }
            else
            {
                return -1;
            }
        }
    }

public enum ServerState
    {
        Free,
        Init,
        Work,
        Del
    }
    

 

 

有一個問題,就是鏈接遠程的服務器時若是網絡斷開再重連,會殘留沒用的client ,這樣若是網絡斷斷續續的話,會留好多沒有清除的客戶端。code

這個在3.0.504版本中Redis 中也有這個問題,不知道是基於什麼考慮的。因此須要創建鏈接的時候,給個客戶端名稱,再初始化的時候刪掉全部同類型的名稱。

 

使用的時候大概相似操做 textbox2.text = "channel:1-5" .爲了簡便發佈的和監聽的都是本地的一個通道。

private void button1_Click(object sender, EventArgs e)
        {

            //11.1.7.152   192.168.12.173
            int result = ServerMS.MSServer.OpenServer("127.0.0.1:6379", new string[] { textBox2.Text });
            label1.Text = result.ToString();
            //1匿名事件
            ServerMS.MSServer.ReceiveMessage += new Action<string, string>(fuck);

            if (result == 0)
            {
                //發送失敗從新發送 檢查 通道和字符串後從新發送
            }
            else if (result == 1)
            {
                //發送成功
            }
            else if (result == -1)
            {
                //鏈接錯誤 須要 ServerMS.MSServer.OpenServer("192.168.12.173:6379", new string[] { textBox2.Text });
            }

        }

        void fuck(string channel, string message)
        {
            this.BeginInvoke(new Action(() =>
            {
                textBox4.Text = channel + message;
            }));
        }
        public bool sdfsd = true;

        private void button3_Click(object sender, EventArgs e)
        {long result = ServerMS.MSServer.PubMessage(textBox2.Text, DateTime.Now.ToString("yyyyyMMddhhmmssfff"));

            if (result == 0)
                {
                    //發送失敗從新發送
                }
            else if (result == 1)
                {
                            //發送成功
                }
            else if (result == -1)
                 {
                  //鏈接錯誤 須要 ServerMS.MSServer.OpenServer("192.168.12.173:6379", new string[] { textBox2.Text });
                 }
        }

 

爲了簡便channel:是通道的固定命令 ,能夠自定義channel:後面的內容,發送就有反饋。確保全部機臺都接收到。

若是有斷線的須要程序本身重連,接收通道的客戶端不能夠再給其餘的使用,Redis上說Redis client 進入訂閱模式時只能接受訂閱發佈等命令指令,不接受普通的存取和其餘命令

因此若是須要在讀取、寫入、發佈、執行其餘的指令須要使用其餘客戶端,不然就出錯了。跑了幾天了上億次的測試貌似沒有出現什麼問題。

 

 

發佈訂閱消息不會走AOF RDB只存在於內存中,即發即用,用完就沒了。沒在線就沒了。須要考慮使用環境。

還用ping pong來肯定鏈接狀態,也能夠自定義數據,使用場景要本身開發,要適合本身的纔是好的。

下載:

4.0 dll   

連接:https://pan.baidu.com/s/1966t0pduHxQXcxcxV3ZTeQ
提取碼:js8p

 

5.8 dll不可使用List<T>類型

連接:https://pan.baidu.com/s/1RFgY4V0ZO78Wvd7LOxr97g 提取碼:bxh2

相關文章
相關標籤/搜索