上一篇演示了泛型Hub的實現,微軟於6月17日更新了SignalR 2.1.0,而後自帶了泛型Hub,因而就不須要本身去實現了…(微軟你爲啥不早一個月自帶啊…)。不過不要緊,SignalR出彩之處不在泛型Hub,本篇爲各位觀衆帶來了基於SignalR的簡易集羣通信組件Demo,可用於分佈式定時任務。html
說到集羣,天然想到了NLB啊Cluster啊HPC啊等等。NLB受制於成員數量,Cluster用數量堆高可用性,HPC太複雜。本着SignalR的雙向異步通信的特色,實際上是能夠用來玩彈性計算的。初始狀態由一臺計算任務分發節點,一臺監控以及一臺計算節點構成。隨着任務分發隊列中的任務數愈來愈多,一臺執行節點沒法及時消耗待執行任務,達到某個閾值的時候,動態的加入一個計算節點來增長計算吞吐量。一樣的,當隊列中的任務基本處於很低的數量的時候,自動移除一個計算節點來減小資源消耗。固然,若是是大型的計算量之下,分發節點,隊列都應該是集羣的,還要考慮各類計算節點故障之類的問題,這不在本篇考慮的範疇內,本篇以初始狀態模型來一步步實現簡易集羣通信組件。jquery
好,廢話不說了,正篇開始。數據庫
任務分發節點只有一個公開的行爲,就是接受計算節點任務執行完成的消息。異步
下面是實現。分佈式
/// <summary> /// 集羣交換器 /// </summary> public class ClusterHub : Hub<IClusterClient> { /// <summary> /// /// </summary> static ClusterHub() { aliveDictionary = new ConcurrentDictionary<string, Guid>(); } /// <summary> /// /// </summary> /// <param name="dispatcher"></param> public ClusterHub(IDispatcher dispatcher) { this.dispatcher = dispatcher; db = OdbFactory.Open(localDbFileName); } /// <summary> /// 本地數據庫文件名 /// </summary> const string localDbFileName = "ClusterStorage.dll"; /// <summary> /// 監視器鏈接Id /// </summary> static string monitorConnectionId; /// <summary> /// 調度器 /// </summary> IDispatcher dispatcher; /// <summary> /// 在線詞典 /// </summary> static ConcurrentDictionary<string, Guid> aliveDictionary; /// <summary> /// /// </summary> static IOdb db; /// <summary> /// 完成任務 /// </summary> /// <param name="jobResult"></param> public void Finished(Contracts.Messages.JobResultDto jobResult) { lock (db) { var members = db.AsQueryable<MemberDo>(); var member = members.SingleOrDefault(m => m.Id == Guid.Parse(jobResult.Id)); if (member != null) { member.UpdateStatisticsInfo(jobResult.ProcessedTime); db.Store(member); if (!string.IsNullOrWhiteSpace(monitorConnectionId)) { Clients.Client(monitorConnectionId).UpdateMemberStatisticsInfo(new Contracts.Messages.MemberStatisticsInfoDto() { Id = member.Id.ToString(), AverageProcessedTime = member.AverageProcessedTime }); } } } Clients.Caller.RunJob(dispatcher.GetJobId()); } /// <summary> /// 加入 /// </summary> void Join() { object ip = string.Empty; var isMonitor = Context.Request.QueryString["ClientRole"] == "Monitor"; Context.Request.Environment.TryGetValue("server.RemoteIpAddress", out ip); lock (db) { var members = db.AsQueryable<MemberDo>(); var member = members.SingleOrDefault(m => m.Ip == ip.ToString() && m.IsMonitor == isMonitor); if (member != null) { member.MemberStatusType = MemberStatusTypeEnum.Connectioned; } else { member = new MemberDo(ip.ToString(), isMonitor); if (isMonitor) { monitorConnectionId = Context.ConnectionId; } } db.Store(member); aliveDictionary.TryAdd(Context.ConnectionId, member.Id); if (!isMonitor) { if (!string.IsNullOrWhiteSpace(monitorConnectionId)) { Clients.Client(monitorConnectionId).MemberJoin(member.Id); } Clients.Caller.GetId(member.Id.ToString()); Clients.Caller.RunJob(dispatcher.GetJobId()); } } } /// <summary> /// 離開 /// </summary> void Leave() { var id = Guid.Empty; aliveDictionary.TryRemove(Context.ConnectionId, out id); lock (db) { var members = db.AsQueryable<MemberDo>(); var member = members.SingleOrDefault(m => m.Id == id); if (member != null) { member.MemberStatusType = MemberStatusTypeEnum.Disconnectioned; db.Store(member); if (member.IsMonitor) { monitorConnectionId = string.Empty; } else if (!string.IsNullOrWhiteSpace(monitorConnectionId)) { Clients.Client(monitorConnectionId).MemberLeave(id); } } } } public override Task OnConnected() { Console.WriteLine(Context.ConnectionId+":Connected"); Join(); return base.OnConnected(); } public override Task OnDisconnected() { Console.WriteLine(Context.ConnectionId + ":Disconnected"); Leave(); return base.OnDisconnected(); } public override Task OnReconnected() { Console.WriteLine(Context.ConnectionId + ":Reconnected"); return base.OnReconnected(); } }
ClusterHub承載着2種客戶端角色的交互,計算節點和監控。ide
這邊採用了一個輕量級的基於C#開發的無引擎對象數據庫來存儲客戶端信息。函數
先說重載的部分:性能
OnConnected - 當有客戶端鏈接的時候,執行Join方法。ui
OnDisconnected - 當有客戶端離線的時候,執行Leave方法。this
而後是私有方法:
Join - 根據QueryString來區分客戶端類型是計算節點仍是監視器,若是是計算節點,就直接通知監視器有成員加入,而後經過IDispatcher來獲取任務Id,通知計算節點開始執行任務。
Leave - 計算節點離線的時候通知監視器。
公開方法:
Finished - 計算節點完成任務後就調用該方法,Hub將計算的一些統計信息更新到本地存儲,同時通知監視器更新計算結果。
私有變量:
IDispatcher– 任務調度器接口,由外部組件來負責具體的實現。
計算節點有兩個行爲:
GetId - 獲取節點身份。
RunJob - 執行任務。
/// <summary> /// 集羣客戶端 /// </summary> public class ClusterClient { /// <summary> /// /// </summary> /// <param name="jobProvider"></param> public ClusterClient(IJobProvider jobProvider) { this.jobProvider = jobProvider; url = ConfigurationManager.AppSettings["HubAddress"]; var queryStrings = new Dictionary<string, string>(); queryStrings.Add("ClientRole", "Normal"); connection = new HubConnection(url, queryStrings); hubProxy = connection.CreateHubProxy(typeof(IClusterHub).GetCustomAttributes(typeof(DescriptionAttribute), false).OfType<DescriptionAttribute>().First().Description); InitClientEvents(); connection.Start().Wait(); } string url; HubConnection connection; IHubProxy hubProxy; IJobProvider jobProvider; string id; /// <summary> /// /// </summary> void InitClientEvents() { hubProxy.On("GetId", (id) => GetId(id)); hubProxy.On("RunJob", (jobId) => RunJob(jobId)); } /// <summary> /// 執行任務 /// </summary> /// <param name="id"></param> void GetId(string id) { this.id = id; } /// <summary> /// 執行任務 /// </summary> /// <param name="jobId"></param> void RunJob(string jobId) { var startTime = DateTime.Now; jobProvider.Invoke(jobId); var stopTime = DateTime.Now; hubProxy.Invoke("Finished", new JobResultDto() { Id = id, JobId = jobId, ProcessedTime = (stopTime - startTime).TotalMilliseconds }); } }
客戶端的實現很簡單,核心就是經過構造函數注入任務提供接口,由接口經過任務Id來執行任務。
監視器具備三個公開行爲:
MemberJoin - 計算節點加入
MemberLeave - 計算節點離線
UpdateMemberStatisticsInfo - 更新節點統計信息
/// <reference path="jquery-2.1.1.js" /> /// <reference path="jquery.signalR-2.1.0.js" /> (function ($) { var members = []; var methods = { reloadList: function () { var list = ""; $.each(members, function (i, n) { list += "<li id='member_" + n.Id + "'>[" + n.Id + "]:AverageProcessedTime " + n.AverageProcessedTime + " Milliseconds</li>"; }); $('#members').html(list); } } var hubs = { clusterHub: $.connection.clusterHub, init: function () { $.connection.hub.logging = true; $.connection.hub.url = 'http://192.168.1.124:10086/signalr'; $.connection.hub.qs = { "ClientRole": "Monitor" } $.connection.hub.start().done(function () { }); } } var cluster = { on: { updateMemberStatisticsInfo: function (data) { $.each(members, function (i, n) { if (n.Id == data.Id) { n.AverageProcessedTime = data.AverageProcessedTime; return; } }); methods.reloadList(); }, memberJoin: function (id) { members.push({ "Id": id, "AverageProcessedTime": 0 }); methods.reloadList(); }, memberLeave: function (id) { members = $.grep(members, function (n) { return n.Id != id }); methods.reloadList(); } } } $(function () { hubs.clusterHub.client.UpdateMemberStatisticsInfo = cluster.on.updateMemberStatisticsInfo; hubs.clusterHub.client.MemberJoin = cluster.on.memberJoin; hubs.clusterHub.client.MemberLeave = cluster.on.memberLeave; hubs.init(); }); })(jQuery);
<!DOCTYPE html> <html xmlns="http://www.w3.org/1999/xhtml"> <head> <title>集羣監視器</title> </head> <body> <div> <ul id="members"></ul> </div> <script src="scripts/jquery-2.1.1.min.js"></script> <script src="scripts/jquery.signalR-2.1.0.min.js"></script> <script src="http://192.168.1.124:10086/signalr/hubs"></script> <script src="scripts/core.js"></script> </body> </html>
監視器用real-time的Web平臺實現,一共註冊三個方法的實現。
Hub端啓動後,先啓動監視器,而後在不一樣的機器上啓動計算端,圖上是2個計算節點,監視器上也顯示着2個節點,每一個節點執行一個JobId後,監視器上就會刷新結果。
簡易集羣組件就到這兒了,本篇演示的是一個思路,能夠在這個基礎上深度擴展成文章開頭所描述的那樣,高性能高可用的基於SignalR的集羣組件。歡迎各位有興趣的同窗進行討論和拍磚。
轉載請註明出處:http://www.cnblogs.com/royding/p/3811169.html