上一篇演示了泛型Hub的實現,微軟於6月17日更新了SignalR 2.1.0,然後自帶了泛型Hub,於是就不需要自己去實現了…(微軟你為啥不早一個月自帶啊…)。不過沒關系,SignalR出彩之處不在泛型Hub,本篇為各位觀眾帶來了基於SignalR的簡易集群通訊組件Demo,可用於分布式定時任務。
說到集群,自然想到了NLB啊Cluster啊HPC啊等等。NLB受制於成員數量,Cluster用數量堆高可用性,HPC太復雜。本著SignalR的雙向異步通訊的特點,其實是可以用來玩彈性計算的。初始狀態由一台計算任務分發節點,一台監控以及一台計算節點構成。隨著任務分發隊列中的任務數越來越多,一台執行節點無法及時消耗待執行任務,達到某個阈值的時候,動態的加入一個計算節點來增加計算吞吐量。同樣的,當隊列中的任務基本處於很低的數量的時候,自動移除一個計算節點來減少資源消耗。當然,如果是大型的計算量之下,分發節點,隊列都應該是集群的,還要考慮各種計算節點故障之類的問題,這不在本篇考慮的范疇內,本篇以初始狀態模型來一步步實現簡易集群通訊組件。
好,廢話不說了,正篇開始。
/// <summary>
/// 集群交換器
/// </summary>
public class ClusterHub : Hub<IClusterClient>
{
/// <summary>
///
/// </summary>
static ClusterHub()
{
aliveDictionary = new ConcurrentDictionary<string, Guid>();
}
/// <summary>
///
/// </summary>
/// <param name="dispatcher"></param>
public ClusterHub(IDispatcher dispatcher)
{
this.dispatcher = dispatcher;
db = OdbFactory.Open(localDbFileName);
}
/// <summary>
/// 本地數據庫文件名
/// </summary>
const string localDbFileName = "ClusterStorage.dll";
/// <summary>
/// 監視器連接Id
/// </summary>
static string monitorConnectionId;
/// <summary>
/// 調度器
/// </summary>
IDispatcher dispatcher;
/// <summary>
/// 在線詞典
/// </summary>
static ConcurrentDictionary<string, Guid> aliveDictionary;
/// <summary>
///
/// </summary>
static IOdb db;
/// <summary>
/// 完成任務
/// </summary>
/// <param name="jobResult"></param>
public void Finished(Contracts.Messages.JobResultDto jobResult)
{
lock (db)
{
var members = db.AsQueryable<MemberDo>();
var member = members.SingleOrDefault(m => m.Id == Guid.Parse(jobResult.Id));
if (member != null)
{
member.UpdateStatisticsInfo(jobResult.ProcessedTime);
db.Store(member);
if (!string.IsNullOrWhiteSpace(monitorConnectionId))
{
Clients.Client(monitorConnectionId).UpdateMemberStatisticsInfo(new Contracts.Messages.MemberStatisticsInfoDto() { Id = member.Id.ToString(), AverageProcessedTime = member.AverageProcessedTime });
}
}
}
Clients.Caller.RunJob(dispatcher.GetJobId());
}
/// <summary>
/// 加入
/// </summary>
void Join()
{
object ip = string.Empty;
var isMonitor = Context.Request.QueryString["ClientRole"] == "Monitor";
Context.Request.Environment.TryGetValue("server.RemoteIpAddress", out ip);
lock (db)
{
var members = db.AsQueryable<MemberDo>();
var member = members.SingleOrDefault(m => m.Ip == ip.ToString() && m.IsMonitor == isMonitor);
if (member != null)
{
member.MemberStatusType = MemberStatusTypeEnum.Connectioned;
}
else
{
member = new MemberDo(ip.ToString(), isMonitor);
if (isMonitor)
{
monitorConnectionId = Context.ConnectionId;
}
}
db.Store(member);
aliveDictionary.TryAdd(Context.ConnectionId, member.Id);
if (!isMonitor)
{
if (!string.IsNullOrWhiteSpace(monitorConnectionId))
{
Clients.Client(monitorConnectionId).MemberJoin(member.Id);
}
Clients.Caller.GetId(member.Id.ToString());
Clients.Caller.RunJob(dispatcher.GetJobId());
}
}
}
/// <summary>
/// 離開
/// </summary>
void Leave()
{
var id = Guid.Empty;
aliveDictionary.TryRemove(Context.ConnectionId, out id);
lock (db)
{
var members = db.AsQueryable<MemberDo>();
var member = members.SingleOrDefault(m => m.Id == id);
if (member != null)
{
member.MemberStatusType = MemberStatusTypeEnum.Disconnectioned;
db.Store(member);
if (member.IsMonitor)
{
monitorConnectionId = string.Empty;
}
else if (!string.IsNullOrWhiteSpace(monitorConnectionId))
{
Clients.Client(monitorConnectionId).MemberLeave(id);
}
}
}
}
public override Task OnConnected()
{
Console.WriteLine(Context.ConnectionId+":Connected");
Join();
return base.OnConnected();
}
public override Task OnDisconnected()
{
Console.WriteLine(Context.ConnectionId + ":Disconnected");
Leave();
return base.OnDisconnected();
}
public override Task OnReconnected()
{
Console.WriteLine(Context.ConnectionId + ":Reconnected");
return base.OnReconnected();
}
}