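The event bus described in the earlier post "EventBus Implementation - Publish/Subscribe - XML Loading" (EventBus實現 - 發布訂閱 - XML加載) only propagates events within a single machine; it cannot handle events that must travel between multiple servers. People have solved this with MSMQ, with Node.js, and with Redis publish/subscribe. This time I use C# sockets, so that an event is pushed immediately to every server that has subscribed to it.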
The subsystem targets the following scenario:
It has two main parts: the Event Bus Broker used by each server, and the Event Bus Server.
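The Broker and the Server communicate with just three commands: ME, Subscribe, and Publish. They mean, respectively, "my name is", "the event I want to subscribe to is", and "I am raising an event".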
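To make the three commands concrete, here is a minimal sketch of how a Broker might frame them as text lines. It assumes the Server uses SuperSocket's built-in command-line protocol (command name, then space-separated parameters, terminated by CRLF), which is consistent with the `StringRequestInfo` usage in the command classes. The `WireFormat` class and its percent-style escaping are hypothetical: the actual scheme behind `EventBase.Helper.RestoreSpecialCharacters` is not shown in this post.

```csharp
using System;
using System.Text;

// Hypothetical helper for illustration only; not the article's EventBase.Helper.
public static class WireFormat
{
    // Escape characters that would break a space-delimited, CRLF-terminated line.
    public static string Escape(string value)
    {
        var sb = new StringBuilder(value.Length);
        foreach (char c in value)
        {
            switch (c)
            {
                case '%': sb.Append("%25"); break;
                case ' ': sb.Append("%20"); break;
                case '\r': sb.Append("%0D"); break;
                case '\n': sb.Append("%0A"); break;
                default: sb.Append(c); break;
            }
        }
        return sb.ToString();
    }

    // Reverse of Escape. "%25" is decoded last so that a decoded '%'
    // is never mistaken for the start of another escape sequence.
    public static string Restore(string value)
    {
        return value.Replace("%20", " ")
                    .Replace("%0D", "\r")
                    .Replace("%0A", "\n")
                    .Replace("%25", "%");
    }

    // "My name is <serverName>"
    public static string Me(string serverName)
    {
        return "ME " + Escape(serverName) + "\r\n";
    }

    // "The event I want to subscribe to is <classPath>"
    public static string Subscribe(string classPath)
    {
        return "Subscribe " + Escape(classPath) + "\r\n";
    }

    // "I am raising event <classPath> with payload <xml>"
    public static string Publish(string classPath, string xml)
    {
        return "Publish " + Escape(classPath) + " " + Escape(xml) + "\r\n";
    }
}
```

With this scheme, `WireFormat.Me("App server 2")` produces the line `ME App%20server%202` followed by CRLF, so the server name arrives as a single parameter even though it contains spaces.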
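The Event Bus Server is built on the open-source SuperSocket component, which makes the job very convenient. It implements three command classes, one per command (I recommend reading the SuperSocket documentation first):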
    using System;
    using SuperSocket.SocketBase.Command;
    using SuperSocket.SocketBase.Protocol;

    // ME <name>: records the name of the connecting server on its session.
    public class ME : CommandBase<EventDispatcherBusSession, StringRequestInfo>
    {
        public override void ExecuteCommand(EventDispatcherBusSession session, StringRequestInfo requestInfo)
        {
            session.AssociatedServerIdentity = EventBase.Helper.RestoreSpecialCharacters(requestInfo.Parameters[0]);
            Console.WriteLine(string.Format("[{0}]: connected.", session.AssociatedServerIdentity));
        }
    }

    // Publish <eventClass> <eventXml>: forwards the event to every other subscribed server.
    public class Publish : CommandBase<EventDispatcherBusSession, StringRequestInfo>
    {
        public override void ExecuteCommand(EventDispatcherBusSession session, StringRequestInfo requestInfo)
        {
            string evtClassPath = EventBase.Helper.RestoreSpecialCharacters(requestInfo.Parameters[0]);
            string evtXml = EventBase.Helper.RestoreSpecialCharacters(requestInfo.Parameters[1]);

            foreach (EventDispatcherBusSession s in session.AppServer.GetAllSessions())
            {
                // Do not echo the event back to the server that published it.
                if (s.AssociatedServerIdentity == session.AssociatedServerIdentity)
                    continue;

                if (s.SubscribedEventClasses.Contains(evtClassPath))
                {
                    s.Send("Publish " + requestInfo.Body); // forward only
                    Console.WriteLine(string.Format("Forwarding publish command from {1} to server: [{0}]", s.AssociatedServerIdentity, session.AssociatedServerIdentity));
                }
            }
        }
    }

    // Subscribe <eventClass>: adds the event class to the session's subscription list.
    public class Subscribe : CommandBase<EventDispatcherBusSession, StringRequestInfo>
    {
        public override void ExecuteCommand(EventDispatcherBusSession session, StringRequestInfo requestInfo)
        {
            string classPath = EventBase.Helper.RestoreSpecialCharacters(requestInfo.Parameters[0]);
            if (session.SubscribedEventClasses.Contains(classPath))
                return; // already subscribed

            session.SubscribedEventClasses.Add(classPath);

            string msg = "";
            msg += string.Format("[{0}], now subscribed these events:\r\n", session.AssociatedServerIdentity);
            foreach (string evtClass in session.SubscribedEventClasses)
                msg += string.Format(" {0}\r\n", evtClass);
            Console.WriteLine(msg);
        }
    }
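The forwarding rule in the Publish command can be checked in isolation. The sketch below models it without any SuperSocket dependency; `SessionModel` and `Router` are hypothetical stand-ins I made up for illustration, not types from the downloadable project.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for EventDispatcherBusSession: just a name and a subscription set.
public class SessionModel
{
    public string Identity { get; private set; }
    public HashSet<string> SubscribedEventClasses { get; private set; }

    public SessionModel(string identity, params string[] subscriptions)
    {
        Identity = identity;
        SubscribedEventClasses = new HashSet<string>(subscriptions);
    }
}

public static class Router
{
    // Same rule as the Publish command: skip sessions that share the sender's
    // identity, then keep only sessions subscribed to the event class.
    public static List<SessionModel> Recipients(
        IEnumerable<SessionModel> allSessions, SessionModel sender, string evtClassPath)
    {
        return allSessions
            .Where(s => s.Identity != sender.Identity)
            .Where(s => s.SubscribedEventClasses.Contains(evtClassPath))
            .ToList();
    }
}
```

One consequence of comparing identities rather than session objects: if two servers connect under the same name, neither receives events published by the other, so server names should probably be unique.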
The code that starts the server:
    static void Main(string[] args)
    {
        var appServer = new EventDispatcherBusServer();

        // Listen on port 2012.
        if (!appServer.Setup(2012))
        {
            Console.WriteLine("Failed to setup!");
            Console.ReadKey();
            return;
        }

        Console.WriteLine();

        if (!appServer.Start())
        {
            Console.WriteLine("Failed to start!");
            Console.ReadKey();
            return;
        }

        Console.WriteLine("The server started successfully, press key 'q' to stop it!");

        // Keep running until 'q' is pressed, as the message promises.
        while (Console.ReadKey().KeyChar != 'q')
        {
            Console.WriteLine();
        }

        appServer.Stop();
    }
There is not much more to say about the server side; it is all quite simple. Now let's look at the client:
    static void Main(string[] args)
    {
        EventBusClientBroker busBroker = new EventBusClientBroker("127.0.0.1", 2012, "App server 2", "AppEvents.dll");
        busBroker.NewEventReceived += new EventReceived(busBroker_NewEventReceived); // raised when a new event is pushed to this server
        busBroker.Connect();
        busBroker.Subscribe<NewUserRegisteredEvent>(); // subscribe to an event
        //busBroker.Close();
    }

    static void busBroker_NewEventReceived(string evtClass, object evt)
    {
        // The event object itself is delivered here.
        // evtClass is the full class path of evt.
        if (evt is NewUserRegisteredEvent)
            Console.WriteLine(((NewUserRegisteredEvent)evt).UserName);
        else if (evt is UserProfileUpdatedEvent)
            Console.WriteLine(((UserProfileUpdatedEvent)evt).UserID);
    }
To raise an event:
    NewUserRegisteredEvent evt = new NewUserRegisteredEvent();
    evt.RegisterDate = DateTime.Now;
    evt.UserName = "aaron";
    busBroker.Publish<NewUserRegisteredEvent>(evt);
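The Publish command carries the event as XML (`evtXml` on the server side), so the Broker has to serialize the event before sending it and rebuild the object on the receiving end. The sketch below shows one way that round trip could work using `XmlSerializer`. This is an assumption: the Broker's actual serialization code is not shown in this post, `EventXml` is a hypothetical helper, and the event class here is a simplified copy with only the two properties used above.

```csharp
using System;
using System.IO;
using System.Xml.Serialization;

// Simplified copy of the event class, for illustration only.
public class NewUserRegisteredEvent
{
    public string UserName { get; set; }
    public DateTime RegisterDate { get; set; }
}

// Hypothetical helper; not part of the downloadable project.
public static class EventXml
{
    // Publishing side: turn the event object into an XML string.
    public static string Serialize(object evt)
    {
        var serializer = new XmlSerializer(evt.GetType());
        using (var writer = new StringWriter())
        {
            serializer.Serialize(writer, evt);
            return writer.ToString();
        }
    }

    // Receiving side: rebuild the event from its type and XML payload.
    // A real broker would first resolve the type from the class path
    // sent with the Publish command (for example, from AppEvents.dll).
    public static object Deserialize(Type evtType, string xml)
    {
        var serializer = new XmlSerializer(evtType);
        using (var reader = new StringReader(xml))
        {
            return serializer.Deserialize(reader);
        }
    }
}
```

The receiving Broker would then pass the rebuilt object to the `NewEventReceived` handler, which is why the handler can test it with `is NewUserRegisteredEvent`.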
Here is a screenshot of the result:
Done. Code download: http://files.cnblogs.com/aarond/EventBus2.rar