先上一張本文所描述的適用場景圖
分布式場景,共3台server:
前端Server
Order App Server
Warehouse App Server
功能:
前端Server可以不停的發送Command到CommandBus,然後由CommandBus分配不 同的Command到各自的app server去處理。
前端Server可以只發送Command而不必等待Response
前端Server可以同步等待Response返回
MSMQ消息超過3.5M會自動轉為網絡共享方式傳輸消息
對於同一Command的處理,可以通過增加App Server的方式來提高並發處理速 度(比如:可以開2個app server instance來同時處理ACommand的處理)
在App server中能讀取前端Server的Session value(寫Session還不支持)
本文目標是用msmq實現分布式CommandBus的應用(已經更新到A2D Framework 中了)。
前端Server發送Command的方式(異步):
ACommand cmd = new ACommand() { Tag = "aaa" };
CommandBusDistributer<ACommand, ACommandResult> cmdDistributer = new CommandBusDistributer<ACommand, ACommandResult>();
cmdDistributer.ResultCatached += new CommandResultCatchedDelegate<ACommandResult> (cmdDistributer_ResultCatached);
cmdDistributer.SendRequest(cmd);
同步方式:
ACommand cmd = new ACommand() { Tag = "aaa" };
CommandBusDistributer<ACommand, ACommandResult> cmdDistributer = new CommandBusDistributer<ACommand, ACommandResult>();
cmdDistributer.SendRequest(cmd);
ACommandResult result=cmdDistributer.WaitResponse();
配置文件:
<?xml version="1.0" encoding="utf-8" ? >
<CommandBusSetting>
<AutoCreateIfNotExists>true</AutoCreateIfNotExists>
<WebSessionSupport>true</WebSessionSupport>
<CommandQueue>PC-20130606HCVP\private$\Commands_{0} </CommandQueue>
<ResponseQueue>PC- 20130606HCVP\private$\CommandResponses</ResponseQueue>
<NetworkLocation>\\PC- 20130606HCVP\network</NetworkLocation>
</CommandBusSetting>
Command的編寫方式:
[QueueName("ACommand")]//這個可選,沒有QueueName時,默認對應 的msmq隊列名為類名,此處為ACommand public class ACommand : BaseCommand //需要繼承自BaseCommand { public string Tag { get; set; }//自定義的屬性 }
後端App Server的編寫
CommandHandlerHost1: Console程序,相當於App Server 1,會處理部分 Command
static void Main(string[] args) { Thread.Sleep(2000); CommandHandlerListener listener = new CommandHandlerListener(); listener.AddHandler(new TestCommandHandlers()); listener.AddHandler(new Test2CommandHandlers()); listener.Start(); Console.ReadKey(); }
CommandHandlerHost2: Console程序,相當於App Server 2,會處理部分 Command
static void Main(string[] args) { Thread.Sleep(2000); CommandHandlerListener listener = new CommandHandlerListener(); listener.AddHandler(new Test3CommandHandlers()); listener.Start(); Console.ReadKey(); }
CommandHandlers: 所有的Command處理函數都會在這個項目中實現
public class TestCommandHandlers : ICommandHandlers, ICommandHandler<ACommand, ACommandResult>, ICommandHandler<BCommand, BCommandResult> { public ACommandResult Handler(ACommand cmd, ISessionContext session) { Console.WriteLine("From [public ACommandResult Handler(ACommand cmd)]: " + cmd.Tag); ACommandResult result = new ACommandResult(); result.Result = "result from ACommand"; return result; } public BCommandResult Handler(BCommand cmd, ISessionContext session) { Console.WriteLine("From [public BCommandResult Handler(BCommand cmd)]: " + cmd.Tag); if (session != null) { Console.WriteLine("session not null"); object o = session.GetWebSession("key1"); if (o != null) { Console.WriteLine("From Session['key1']: " + Convert.ToString(o)); //session.Set("key1", "changed from command handler: " + DateTime.Now); } } BCommandResult result = new BCommandResult(); result.Result = "result from BCommand"; return result; } }
下面是目前的性能測試:
查看本欄目