NetCore下搭建websocket集羣方案

介紹

最近在作一個基於netcore的實時消息服務。最初選用的是ASP.NET Core SignalR,可是後來發現目前它並無支持IOS的客戶端,因此本身只好又基於websocket從新搭建了一套服務。html

由於前期已經使用了SignalR,因此我直接在本來的項目裏面從新擴展了一套自定義websocket服務。前端

在網上有一篇博文介紹瞭如何在Asp.net Core中使用中間件來管理websocket,個人大部分代碼也是參考這篇文章。在這兒貼個連接web

在Asp.net Core中使用中間件來管理websocketredis

自定義WebSocket 中間件

要閱讀ASP.NET Core中的WebSockets支持,能夠在此處查看。若是你的項目跟我同樣,已經使用了Signalr,那麼你不須要在安裝Microsoft.AspNetCore.WebSockets包,不然在項目開始前,json

須要安裝此Nuget包。如今你能夠自定義你本身的中間件了。後端

/// <summary>
    /// websocket 協議擴展中間件
    /// </summary>
    public class CustomWebSocketMiddlewarr
    {
        private readonly RequestDelegate _next;

        public CustomWebSocketMiddlewarr(RequestDelegate next)
        {
            _next = next;
        }

        public async Task Invoke(HttpContext context, ICustomWebSocketFactory wsFactory, ICustomWebSocketMessageHandler wsmHandler)
        {
             if (context.WebSockets.IsWebSocketRequest)
                {
                    string ConId = context.Request.Query["sign"];
                    if (!string.IsNullOrEmpty(ConId))
                    {
                        WebSocket webSocket = await context.WebSockets.AcceptWebSocketAsync();
                        CustomWebSocket userWebSocket = new CustomWebSocket()
                        {
                            WebSocket = webSocket,
                            ConId = ConId
                        };
                        wsFactory.Add(userWebSocket);
                    //await wsmHandler.SendInitialMessages(userWebSocket);
                    await Listen(context, userWebSocket, wsFactory, wsmHandler);
                        
                    }
                }
                else
                {
                    context.Response.StatusCode = 400;
                }
            
            await _next(context);
        }
     //監聽客戶端發送過來的消息
        private async Task Listen(HttpContext context, CustomWebSocket userWebSocket, ICustomWebSocketFactory wsFactory, ICustomWebSocketMessageHandler wsmHandler)
        {
            WebSocket webSocket = userWebSocket.WebSocket;
            var buffer = new byte[1024 * 4];
            WebSocketReceiveResult result = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
            while (!result.CloseStatus.HasValue)
            {
                await wsmHandler.HandleMessage(result, buffer, userWebSocket, wsFactory);
                buffer = new byte[1024 * 4];
                result = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
            }
            wsFactory.Remove(userWebSocket.ConId);
            await webSocket.CloseAsync(result.CloseStatus.Value, result.CloseStatusDescription, CancellationToken.None);
        }
    }

在自定義的中間件中,首先判斷是不是websocket請求,若是是的話,在查看是否有對應的sign標識,知足條件後進入後續的處理環節。緩存

簡單講解一下這裏面的處理邏輯。由於個人項目中同時存在Signalr,而Signalr也會使用到websocket協議。可是Signalr的websocket請求傳入的參數是id,因此我在這兒自定義了一個參數sign爲了和Signalrwebsocket

作區分。那麼這個sign是作什麼用的呢? 其實sign是前端傳過來的惟一標識,和這次鏈接對應,也能夠理解爲Signalr裏面的connectionId。而後會把標識和對應websocket類到存入到一個list集合中。即代碼app

中的  wsFactory.Add(userWebSocket)。框架

CustomWebSocket是一個包含WebSocket和標識的類:

public  class CustomWebSocket
    {
        
        public string ConId { get; set; }

        public WebSocket WebSocket { get; set; }
    }

而後定義了一個Websocket工廠類,用來存取鏈接到服務的Websocket實例。

//接口
public
interface ICustomWebSocketFactory { void Add(CustomWebSocket uws); void Remove(string conId); List<CustomWebSocket> All(); List<CustomWebSocket> Others(CustomWebSocket client); CustomWebSocket Client(string conId); }
  

具體實現

public class CustomWebSocketFactory: ICustomWebSocketFactory
    {
        List<CustomWebSocket> List;
        public CustomWebSocketFactory()
        {
            List = new List<CustomWebSocket>();
        }
        public void Add(CustomWebSocket uws)
        {
            List.Add(uws);
        }
        public void Remove(string conId)
        {
            List.Remove(Client(conId));
           
        }
        public List<CustomWebSocket> All()
        {
            return List;
        }
       
        public List<CustomWebSocket> Others(CustomWebSocket client)
        {
            return List.Where(c => c.ConId != client.ConId).ToList();
        }
        public CustomWebSocket Client(string conId)
        {
            var uws= List.FirstOrDefault(c => c.ConId == conId);
            return uws;

        }
    }

能夠看到最終咱們存取websocket都是經過list來進行,因此在注入的時候必定要注意。注入成單例模式。

services.AddSingleton<ICustomWebSocketFactory, CustomWebSocketFactory>();

CustomWebSocketMessageHandle包含有關消息處理的邏輯(發送,接收)
public interface ICustomWebSocketMessageHandler
    {
        Task SendInitialMessages(CustomWebSocket userWebSocket);
        Task HandleMessage(WebSocketReceiveResult result, byte[] buffer, CustomWebSocket userWebSocket, ICustomWebSocketFactory wsFactory);
        Task SendMessageInfo(string conId, object data, ICustomWebSocketFactory wsFactory);


    }

public   class CustomWebSocketMessageHandler:ICustomWebSocketMessageHandler
    {
        public async Task SendInitialMessages(CustomWebSocket userWebSocket)
        {
            WebSocket webSocket = userWebSocket.WebSocket;
            var msg = new CustomWebSocketMessage
            {
                MessagDateTime = DateTime.Now,
                Type = WSMessageType.鏈接響應
            };

            string serialisedMessage = JsonConvert.SerializeObject(msg);
            byte[] bytes = Encoding.ASCII.GetBytes(serialisedMessage);
            await webSocket.SendAsync(new ArraySegment<byte>(bytes, 0, bytes.Length), WebSocketMessageType.Text, true, CancellationToken.None);
        }
        /// <summary>
        /// 推送消息到客戶端
        /// </summary>
        /// <returns></returns>
        public async Task SendMessageInfo(string conId,object data, ICustomWebSocketFactory wsFactory)
        {
            var uws = wsFactory.Client(conId);
            CustomWebSocketMessage message = new CustomWebSocketMessage();
            message.DataInfo = data;
            message.Type = WSMessageType.任務數量;
            message.MessagDateTime = DateTime.Now;
            if (uws == null)
            {
                //廣播到其餘集羣節點
                var listpush = new List<PushMsg>();

                var push = new PushMsg()
                {
                    sendjsonMsg = new WebSocketFanoutDto()
                    {
                        conId = conId,
                        data = message
                    },
                    exchangeName = "saas.reltimewsmes.exchange",
                    sendEnum = SendEnum.訂閱模式
                };
                listpush.Add(push);
                BTRabbitMQManage.PushMessageAsync(listpush);
                return;
            }
           
            var mesbuffer = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message));
            var mescount = Encoding.UTF8.GetByteCount(JsonConvert.SerializeObject(message));
           await uws.WebSocket.SendAsync(new ArraySegment<byte>(mesbuffer, 0, mescount), WebSocketMessageType.Text, true, CancellationToken.None);
        }

        /// <summary>
        /// 處理接收到的客戶端信息
        /// </summary>
        /// <param name="result"></param>
        /// <param name="buffer"></param>
        /// <param name="userWebSocket"></param>
        /// <param name="wsFactory"></param>
        /// <returns></returns>
        public async Task HandleMessage(WebSocketReceiveResult result, byte[] buffer, CustomWebSocket userWebSocket, ICustomWebSocketFactory wsFactory)
        {
            string msg = Encoding.UTF8.GetString(buffer);
            try
            {
                var message = JsonConvert.DeserializeObject<CustomWebSocketMessage>(msg);
                if (message.Type == WSMessageType.用戶信息)
                {
                    var logdto = JsonConvert.DeserializeObject<LoginInfoDto>(message.DataInfo.ToJsonString());
                    await InitUserInfo(logdto, userWebSocket, wsFactory);
                }
               
            }
            catch (Exception e)
            {
                var exbuffer = Encoding.UTF8.GetBytes(e.Message);
                var excount = Encoding.UTF8.GetByteCount(e.Message);
                await userWebSocket.WebSocket.SendAsync(new ArraySegment<byte>(exbuffer, 0, excount), result.MessageType, result.EndOfMessage, CancellationToken.None);
            }
        }
        /// <summary>
        /// 初始化用戶鏈接關係
        /// </summary>
        /// <param name="dto"></param>
        /// <param name="userWebSocket"></param>
        /// <param name="wsFactory"></param>
        /// <returns></returns>
        private async Task InitUserInfo(LoginInfoDto dto, CustomWebSocket userWebSocket, ICustomWebSocketFactory wsFactory)
        {
            if (dto.userId == 0)
                return;
            var contectid = userWebSocket.ConId;
            var key = "";
            if (dto.tenantId.HasValue)
                key += "T_" + dto.userId + "_" + dto.tenantId + "_" + "tenant_";
            if (dto.bankId.HasValue)
                key += "B_" + dto.userId + "_" + dto.bankId + "_" + "bank_";
            key += dto.fromeType;
            //添加緩存
            CacheInstace<string>.GetRedisInstanceDefaultMemery().AddOrUpdate(key, contectid, r =>
            {
                r = contectid;
                return r;
            });
            CacheInstace<string>.GetRedisInstanceDefaultMemery().Expire(key, new TimeSpan(12, 0, 0));
           
        }
       
    }
在這裏面,推送消息到客戶端的時候,若是未找到標識對應的Websocket對象,則將消息廣播到全部的集羣節點上。咱們知道Signalr裏面的集羣實現經過redis來作的,但在此處,由於
我項目裏面已經搭建了Rabbitmq的高可用集羣,因此我直接經過Rabbitmq來進行廣播。這樣無論我是在集羣的那個節點上來推送消息,均可以保證消息被正確推送到客戶端。
關於廣播消息的訂閱實現:
 public class WebSocketFanoutDto
    {
        public string conId { get; set; }

        public CustomWebSocketMessage data { get; set; }
    }

 public class FanoutMesConsume : IMessageConsume
    {
        public void Consume(string message)
        {
            var condto = JsonConvert.DeserializeObject<WebSocketFanoutDto>(message);
            var wsFactory = IOCManage.ServiceProvider.GetService<ICustomWebSocketFactory>();
            var uws = wsFactory.Client(condto.conId);
            if (uws != null)
            {
                //發送消息
                var mesbuffer = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(condto.data));
                var mescount = Encoding.UTF8.GetByteCount(JsonConvert.SerializeObject(condto.data));
                uws.WebSocket.SendAsync(new ArraySegment<byte>(mesbuffer, 0, mescount), WebSocketMessageType.Text, true, CancellationToken.None);
            }
        }
    }
 

最後在擴展類裏面添加消息監視和注入Websocket中間件。

固然不要忘記 消息處理類的依賴注入

services.AddSingleton<ICustomWebSocketMessageHandler, CustomWebSocketMessageHandler>();
 
 public static IApplicationBuilder UseCustomWebSocketManager(this IApplicationBuilder app)
        {
            //添加針對分佈式集羣的消息監視
            RabbitMQManage.Subscribe<FanoutMesConsume>(new MesArgs()
            {
                exchangeName = "reltimewsmes.exchange",
                sendEnum = SendEnum.訂閱模式
            });
            return app.UseMiddleware<CustomWebSocketMiddlewarr>();
        }

至此這個框架搭建完成,最後在startup類中注入。

關於Rabbitmq的使用,發送和接收是我基於easynetq封裝的一個幫助類,你們能夠自行實現。

這裏面最主要的邏輯就是每個websocket實例都有一個對應的標識,而後在鏈接成功後,前端會發送用戶信息,後端服務再把用戶信息和鏈接標識關聯。這樣若是想推送信息到某個用戶的話,就能夠經過

用戶信息來找到用戶對應的鏈接信息。至於爲何整個流程會這麼複雜的,就一言難盡(我能怎麼辦,我也很絕望啊)。大多數時候你們均可以直接經過token認證來綁定用戶和socket鏈接。

目前還有幾個問題一個廣播消息的時候,發送消息方也會收到這個消息,這挺尷尬,目前我還沒想到太好的解決辦法。

第二個是採用單例list字段存儲鏈接的websocket實例,少的時候還好,若是多的話,感受可能會存在堆棧溢出的問題,但沒實際測試過,因此目前還不知道最大的鏈接數多少。

相關文章
相關標籤/搜索