程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> .NET網頁編程 >> C# >> C#入門知識 >> SignalR循序漸進(三)簡易的集群通訊組件

SignalR循序漸進(三)簡易的集群通訊組件

編輯:C#入門知識

上一篇演示了泛型Hub的實現,微軟於6月17日更新了SignalR 2.1.0,然後自帶了泛型Hub,於是就不需要自己去實現了…(微軟你為啥不早一個月自帶啊…)。不過沒關系,SignalR出彩之處不在泛型Hub,本篇為各位觀眾帶來了基於SignalR的簡易集群通訊組件Demo,可用於分布式定時任務。

說到集群,自然想到了NLB啊Cluster啊HPC啊等等。NLB受制於成員數量,Cluster用數量堆高可用性,HPC太復雜。本著SignalR的雙向異步通訊的特點,其實是可以用來玩彈性計算的。初始狀態由一台計算任務分發節點,一台監控以及一台計算節點構成。隨著任務分發隊列中的任務數越來越多,一台執行節點無法及時消耗待執行任務,達到某個阈值的時候,動態的加入一個計算節點來增加計算吞吐量。同樣的,當隊列中的任務基本處於很低的數量的時候,自動移除一個計算節點來減少資源消耗。當然,如果是大型的計算量之下,分發節點,隊列都應該是集群的,還要考慮各種計算節點故障之類的問題,這不在本篇考慮的范疇內,本篇以初始狀態模型來一步步實現簡易集群通訊組件。

好,廢話不說了,正篇開始。

任務分發節點

/// <summary> /// 集群交換器 /// </summary> public class ClusterHub : Hub<IClusterClient> { /// <summary> /// /// </summary> static ClusterHub() { aliveDictionary = new ConcurrentDictionary<string, Guid>(); } /// <summary> /// /// </summary> /// <param name="dispatcher"></param> public ClusterHub(IDispatcher dispatcher) { this.dispatcher = dispatcher; db = OdbFactory.Open(localDbFileName); } /// <summary> /// 本地數據庫文件名 /// </summary> const string localDbFileName = "ClusterStorage.dll"; /// <summary> /// 監視器連接Id /// </summary> static string monitorConnectionId; /// <summary> /// 調度器 /// </summary> IDispatcher dispatcher; /// <summary> /// 在線詞典 /// </summary> static ConcurrentDictionary<string, Guid> aliveDictionary; /// <summary> /// /// </summary> static IOdb db; /// <summary> /// 完成任務 /// </summary> /// <param name="jobResult"></param> public void Finished(Contracts.Messages.JobResultDto jobResult) { lock (db) { var members = db.AsQueryable<MemberDo>(); var member = members.SingleOrDefault(m => m.Id == Guid.Parse(jobResult.Id)); if (member != null) { member.UpdateStatisticsInfo(jobResult.ProcessedTime); db.Store(member); if (!string.IsNullOrWhiteSpace(monitorConnectionId)) { Clients.Client(monitorConnectionId).UpdateMemberStatisticsInfo(new Contracts.Messages.MemberStatisticsInfoDto() { Id = member.Id.ToString(), AverageProcessedTime = member.AverageProcessedTime }); } } } Clients.Caller.RunJob(dispatcher.GetJobId()); } /// <summary> /// 加入 /// </summary> void Join() { object ip = string.Empty; var isMonitor = Context.Request.QueryString["ClientRole"] == "Monitor"; Context.Request.Environment.TryGetValue("server.RemoteIpAddress", out ip); lock (db) { var members = db.AsQueryable<MemberDo>(); var member = members.SingleOrDefault(m => m.Ip == ip.ToString() && m.IsMonitor == isMonitor); if (member != null) { member.MemberStatusType = MemberStatusTypeEnum.Connectioned; } else { member = new MemberDo(ip.ToString(), isMonitor); if (isMonitor) { monitorConnectionId = Context.ConnectionId; } } db.Store(member); aliveDictionary.TryAdd(Context.ConnectionId, member.Id); if (!isMonitor) { if (!string.IsNullOrWhiteSpace(monitorConnectionId)) { Clients.Client(monitorConnectionId).MemberJoin(member.Id); } Clients.Caller.GetId(member.Id.ToString()); Clients.Caller.RunJob(dispatcher.GetJobId()); } } } /// <summary> /// 離開 /// </summary> void Leave() { var id = Guid.Empty; aliveDictionary.TryRemove(Context.ConnectionId, out id); lock (db) { var members = db.AsQueryable<MemberDo>(); var member = members.SingleOrDefault(m => m.Id == id); if (member != null) { member.MemberStatusType = MemberStatusTypeEnum.Disconnectioned; db.Store(member); if (member.IsMonitor) { monitorConnectionId = string.Empty; } else if (!string.IsNullOrWhiteSpace(monitorConnectionId)) { Clients.Client(monitorConnectionId).MemberLeave(id); } } } } public override Task OnConnected() { Console.WriteLine(Context.ConnectionId+":Connected"); Join(); return base.OnConnected(); } public override Task OnDisconnected() { Console.WriteLine(Context.ConnectionId + ":Disconnected"); Leave(); return base.OnDisconnected(); } public override Task OnReconnected() { Console.WriteLine(Context.ConnectionId + ":Reconnected"); return base.OnReconnected(); } }

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved