程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> .NET網頁編程 >> C# >> 關於C# >> 分布式EventBus的Socket實現:發布訂閱

分布式EventBus的Socket實現:發布訂閱

編輯:關於C#

在這篇文章中,EventBus實現 - 發布訂閱 - XML加載 所適用的范圍只是本機的事件傳播,要是牽 涉到多台服務器之間的事件傳播就不行了,解決辦法有用msmq解決的,Node.js解決的,也有用redis的 發布訂閱解決的,這次用C# socket來實現,能實現立刻推送事件到所有訂閱了相關event的server上。

這次的子系統適用的場景如下:

主要分2個部分:各個server使用的Event Bus Broker以及Event Bus Server。

Broker與Server之間的通信協議就3個:ME、Subscribe、Publish。分別代表:我的名字是、我要訂 閱的事件是、我觸發事件。

Event Bus Server是基於SuperSocket開源組件寫的,非常方便。實現了3個對應的命令類(建議大 家先看看SuperSocket的文檔),如下:

public class ME : CommandBase<EventDispatcherBusSession, StringRequestInfo>
    {
        public override void ExecuteCommand(EventDispatcherBusSession session, StringRequestInfo requestInfo)
        {
            session.AssociatedServerIdentity = EventBase.Helper.RestoreSpecialCharacters(requestInfo.Parameters[0]);
            Console.WriteLine(string.Format("[{0}]: connected.", session.AssociatedServerIdentity));
        }
    }
    
public class Publish : CommandBase<EventDispatcherBusSession, StringRequestInfo>
    {
        public override void ExecuteCommand(EventDispatcherBusSession session, StringRequestInfo requestInfo)
        {
            string evtClassPath = EventBase.Helper.RestoreSpecialCharacters(requestInfo.Parameters[0]);
            string evtXml = EventBase.Helper.RestoreSpecialCharacters(requestInfo.Parameters[1]);
    
            foreach(EventDispatcherBusSession s in session.AppServer.GetAllSessions())
            {
                if (s.AssociatedServerIdentity == session.AssociatedServerIdentity)
                    continue;
    
                if (s.SubscribedEventClasses.Contains(evtClassPath))
                {
                    s.Send("Publish " + requestInfo.Body);//forward only
                    Console.WriteLine(string.Format("Forwarding publish command from {1} to server: [{0}]", s.AssociatedServerIdentity, session.AssociatedServerIdentity));
                }
            }
        }
    }
    
public class Subscribe : CommandBase<EventDispatcherBusSession, StringRequestInfo>
    {
        public override void ExecuteCommand(EventDispatcherBusSession session, StringRequestInfo requestInfo)
        {
            string classPath = EventBase.Helper.RestoreSpecialCharacters(requestInfo.Parameters[0]);
            if (session.SubscribedEventClasses.Contains(classPath))
                return;
    
            session.SubscribedEventClasses.Add(classPath);
    
            string msg = "";
            msg+=string.Format("[{0}], now subscribed these events:\r\n", session.AssociatedServerIdentity);
            foreach(string evtClass in session.SubscribedEventClasses)
                msg += string.Format("              {0}\r\n", evtClass);
            Console.WriteLine(msg);
        }
    }

啟動server代碼:

static void Main(string[] args)
        {
            var appServer = new EventDispatcherBusServer();
    
            if (!appServer.Setup(2012))
            {
                Console.WriteLine("Failed to setup!");
                Console.ReadKey();
                return;
            }
    
            Console.WriteLine();
    
            if (!appServer.Start())
            {
                Console.WriteLine("Failed to start!");
                Console.ReadKey();
                return;
            }
    
            Console.WriteLine("The server started successfully, press key 'q' to stop it!");
    
            Console.ReadKey();
    
            appServer.Stop();
        }

server端沒多少好說的,都是很簡單的,下面來看看客戶端:

static void Main(string[] args)
        {
            EventBusClientBroker busBroker = new EventBusClientBroker("127.0.0.1", 2012, "App server 2", "AppEvents.dll");
            busBroker.NewEventReceived += new EventReceived(busBroker_NewEventReceived);//新event被推送過來的事件
            busBroker.Connect();
    
            busBroker.Subscribe<NewUserRegisteredEvent>();//訂閱Event
    
            //busBroker.Close();
        }
    
        static void busBroker_NewEventReceived(string evtClass, object evt)
        {
            //這裡會將event對象發過來
            //evtClass是evt的class全路徑
            if (evt is NewUserRegisteredEvent)
                Console.WriteLine(((NewUserRegisteredEvent)evt).UserName);
            else if (evt is UserProfileUpdatedEvent)
                Console.WriteLine(((UserProfileUpdatedEvent)evt).UserID);
        }

如果要觸發一個事件,則:

NewUserRegisteredEvent evt = new NewUserRegisteredEvent();
 evt.RegisterDate = DateTime.Now;
 evt.UserName = "aaron";
 busBroker.Publish<NewUserRegisteredEvent>(evt);

我們來看看效果圖:

搞定,代碼下載:http://files.cnblogs.com/aarond/EventBus2.rar

查看本欄目

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved