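I. One-way MEP vs. Responsive Service
We know that MSMQ is asynchronous by nature: it can only communicate using a one-way MEP (Message Exchange Pattern). With a one-way MEP between client and service, a call from the client returns immediately. The client cannot obtain the result of the service's execution, nor can it catch exceptions thrown while the service runs. The figure below gives a simple picture of the interaction between client and service in an MSMQ-based WCF service.
In some scenarios, however, this is intolerable. Take the Order Delivery example from my previous article: the client submits an order to the service but cannot confirm whether the service processed it correctly, which is clearly unacceptable. Today we discuss how to create a Responsive Service to solve this problem: the client is no longer in the dark about what happened on the service side and can learn whether its order was processed correctly.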
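II. Solution
Our goal is simple: after the client submits an order to the service, it should be able to learn the outcome in some way. On the service side, once the order has been retrieved from the message queue and processed, the service sends an acknowledge message to whoever submitted the order. To keep things simple, this acknowledge message carries two pieces of information:
Order No.: an ID that uniquely identifies the processed order.
Exception: the exception if processing failed, or null if processing succeeded.
With the Request/Reply MEP this would be simple and direct in WCF: the client submits the order and waits for the service's response, and the service returns the result to the client right after processing the order. But as we said, MSMQ is inherently asynchronous, so we have to reach the goal by an indirect route.
Our solution: in each client domain, also create a local MSMQ-based WCF service that receives the acknowledge messages sent from the order-processing side. After the order-processing service has handled an order, it sends an acknowledge message to the corresponding client. The figure below illustrates the whole process: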
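The flow described above can be sketched without WCF or MSMQ at all. The following is only an in-memory illustration: the type names (SubmittedOrder, Acknowledgement, AcknowledgementFlowDemo), the reply-address string, and the use of plain Queue objects in place of message queues are all assumptions made for this sketch, not part of the sample solution.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for an order travelling through the order queue.
public class SubmittedOrder
{
    public Guid OrderNo;
    public DateTime OrderDate;
    public string ReplyTo;   // address of the submitting client's response queue
}

// Hypothetical stand-in for the acknowledge message.
public class Acknowledgement
{
    public Guid OrderNo;
    public string Error;     // null when the order was processed successfully
}

public static class AcknowledgementFlowDemo
{
    // In-memory queues standing in for the MSMQ queues (assumption).
    static readonly Queue<SubmittedOrder> OrderQueue = new Queue<SubmittedOrder>();
    static readonly Dictionary<string, Queue<Acknowledgement>> ResponseQueues =
        new Dictionary<string, Queue<Acknowledgement>>();

    // Each client registers its own response queue under its own address.
    public static void RegisterClient(string replyTo)
    {
        ResponseQueues[replyTo] = new Queue<Acknowledgement>();
    }

    // One-way submit: the caller gets nothing back.
    public static void Submit(SubmittedOrder order)
    {
        OrderQueue.Enqueue(order);
    }

    // The processing side drains the order queue and acknowledges each order
    // on the response queue named by the order itself.
    public static void ProcessPendingOrders()
    {
        while (OrderQueue.Count > 0)
        {
            SubmittedOrder order = OrderQueue.Dequeue();
            string error = order.OrderDate < DateTime.Today ? "The order has expired" : null;
            Acknowledgement ack = new Acknowledgement();
            ack.OrderNo = order.OrderNo;
            ack.Error = error;
            ResponseQueues[order.ReplyTo].Enqueue(ack);
        }
    }

    // The client side reads whatever acknowledgements have arrived so far.
    public static List<Acknowledgement> ReadAcknowledgements(string replyTo)
    {
        List<Acknowledgement> result = new List<Acknowledgement>(ResponseQueues[replyTo]);
        ResponseQueues[replyTo].Clear();
        return result;
    }

    public static void Main()
    {
        const string replyTo = "client-a/orderresponse";   // made-up address
        RegisterClient(replyTo);

        SubmittedOrder valid = new SubmittedOrder();
        valid.OrderNo = Guid.NewGuid();
        valid.OrderDate = DateTime.Today.AddDays(5);
        valid.ReplyTo = replyTo;
        Submit(valid);

        SubmittedOrder expired = new SubmittedOrder();
        expired.OrderNo = Guid.NewGuid();
        expired.OrderDate = DateTime.Today.AddDays(-5);
        expired.ReplyTo = replyTo;
        Submit(expired);

        ProcessPendingOrders();
        foreach (Acknowledgement ack in ReadAcknowledgements(replyTo))
        {
            Console.WriteLine("{0}: {1}", ack.OrderNo, ack.Error ?? "processed");
        }
    }
}
```

The point of the sketch is that the reply address travels with each order, so the processing side needs no per-client configuration. The rest of the article is about how to carry that address in a real WCF message.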
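III. Implementation
Now that we understand the solution, let us look at what makes it difficult to implement. When the order-processing service sends an acknowledge message to the client, it must know the MSMQ address of that client's Response Service, along with any other context information related to the operation. (We need nothing beyond the address here, but with extensibility in mind we wrap the context information, including the address, in a class, here called OrderResponseContext.) This context cannot be set in the service's configuration, because the service may face many clients at once and each client's response queue has a different address. The OrderResponseContext must therefore be supplied by the client itself. This leaves us with the following two approaches:
Approach 1: change the Service Contract and make OrderResponseContext a parameter of the operation
This is the first idea that comes to mind. Suppose our operation was originally defined like this: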
namespace Artech.ResponsiveQueuedService.Contract
{
[ServiceContract]
[ServiceKnownType(typeof(Order))]
public interface IOrderProcessor
{
[OperationContract(IsOneWay = true)]
void Submit(Order order);
}
}
It now becomes:
namespace Artech.ResponsiveQueuedService.Contract
{
[ServiceContract]
[ServiceKnownType(typeof(Order))]
public interface IOrderProcessor
{
[OperationContract(IsOneWay = true)]
void Submit(Order order, OrderResponseContext responseContext);
}
}
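Although this approach looks fine, it is not recommended. In general a contract needs to be very stable. Once settled it cannot be changed lightly, because it is shared by every party involved in the interaction, and a small change ripples through all of them. Moreover, a Service Contract represents an interface of the service: it is an abstraction of the business logic and independent of any concrete implementation. In our example we are merely defining an operation for submitting an order, and from the business point of view OrderResponseContext has nothing to do with that abstract logic. We therefore look for a solution that does not touch the Service Contract:
Approach 2: put OrderResponseContext in the header of the SOAP message
The problem we need to solve is actually simple: place the OrderResponseContext information in the SOAP message that is sent to the service. SOAP headers are highly extensible, and in principle any control information can go into a header. The WCF programming model makes it easy to insert and retrieve SOAP headers.
The incoming and outgoing message headers of the current operation context are available as follows: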
OperationContext.Current.IncomingMessageHeaders
OperationContext.Current.OutgoingMessageHeaders
To insert an OrderResponseContext object into the outgoing message headers of the current operation context, we can use the following code:
OrderResponseContext context = new OrderResponseContext();
MessageHeader<OrderResponseContext> header = new MessageHeader<OrderResponseContext>( context);
OperationContext.Current.OutgoingMessageHeaders.Add(header.GetUntypedHeader("name", "namespace"));
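Correspondingly, the following code reads the OrderResponseContext back from the incoming message headers (the name and namespace must match the ones used when the header was added):
OrderResponseContext context = OperationContext.Current.IncomingMessageHeaders.GetHeader<OrderResponseContext>("name", "namespace");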
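The insert and retrieve calls above can be tried outside a running service by working on a Message object directly. The sketch below is only an illustration under stated assumptions: ReplyContext is a hypothetical stand-in for OrderResponseContext, the header name, the "urn:example:orders" namespace, and the action URI are made up, and the code assumes a reference to System.ServiceModel and System.Runtime.Serialization on the .NET Framework. It also uses FindHeader to check for the header first, since GetHeader with a name and namespace throws when the header is absent.

```csharp
using System;
using System.Runtime.Serialization;
using System.ServiceModel;
using System.ServiceModel.Channels;

// Hypothetical stand-in for OrderResponseContext.
[DataContract(Namespace = "urn:example:orders")]
public class ReplyContext
{
    private Uri _responseAddress;

    [DataMember]
    public Uri ResponseAddress
    {
        get { return _responseAddress; }
        set { _responseAddress = value; }
    }
}

public static class HeaderRoundTrip
{
    // Made-up header name and namespace for this sketch.
    const string HeaderName = "ReplyContext";
    const string HeaderNamespace = "urn:example:orders";

    // Wrap the context in a typed header and add it to the message headers.
    public static void Attach(Message message, ReplyContext context)
    {
        MessageHeader<ReplyContext> typed = new MessageHeader<ReplyContext>(context);
        message.Headers.Add(typed.GetUntypedHeader(HeaderName, HeaderNamespace));
    }

    // Return the context if the header is present, otherwise null.
    public static ReplyContext TryRead(Message message)
    {
        int index = message.Headers.FindHeader(HeaderName, HeaderNamespace);
        if (index < 0)
        {
            return null;
        }
        return message.Headers.GetHeader<ReplyContext>(index);
    }

    public static void Main()
    {
        Message message = Message.CreateMessage(
            MessageVersion.Soap12WSAddressing10, "urn:example:orders/Submit");

        Console.WriteLine("Header present before Attach: {0}", TryRead(message) != null);

        ReplyContext context = new ReplyContext();
        context.ResponseAddress = new Uri("net.msmq://localhost/private/orderresponse");
        Attach(message, context);

        ReplyContext roundTripped = TryRead(message);
        Console.WriteLine("Response address after Attach: {0}", roundTripped.ResponseAddress);
    }
}
```

Inside a service operation the same calls would go through OperationContext.Current.IncomingMessageHeaders and OutgoingMessageHeaders instead of message.Headers.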
IV. Sample
As usual, here is a complete sample. The structure of the whole solution is shown below:
In addition to the usual four-layer structure (Contract-Service-Hosting-Client), two more layers are added for the ResponseService:
LocalService: the ResponseService in the client domain.
LocalHosting: hosts the LocalService.
1. Contract: Artech.ResponsiveQueuedService.Contract
Service Contract: Artech.ResponsiveQueuedService.Contract.IOrderProcessor
using System;
using System.Collections.Generic;
using System.Text;
using System.ServiceModel;
namespace Artech.ResponsiveQueuedService.Contract
{
[ServiceContract]
[ServiceKnownType(typeof(Order))]
public interface IOrderProcessor
{
[OperationContract(IsOneWay = true)]
void Submit(Order order);
}
}
Service Contract: Artech.ResponsiveQueuedService.Contract.IOrderRessponse
using System;
using System.Collections.Generic;
using System.Text;
using System.ServiceModel;
namespace Artech.ResponsiveQueuedService.Contract
{
[ServiceContract]
public interface IOrderRessponse
{
[OperationContract(IsOneWay =true)]
void SubmitOrderResponse(Guid orderNo,FaultException exception);
}
}
This contract receives the response from the order-processing side: the Order No. and the Exception.
Data Contract: Artech.ResponsiveQueuedService.Contract.Order
using System;
using System.Collections.Generic;
using System.Text;
using System.Runtime.Serialization;
namespace Artech.ResponsiveQueuedService.Contract
{
    [DataContract]
    public class Order
    {
        #region Private Fields
        private Guid _orderNo;
        private DateTime _orderDate;
        private Guid _supplierID;
        private string _supplierName;
        #endregion

        #region Constructors
        public Order(Guid orderNo, DateTime orderDate, Guid supplierID, string supplierName)
        {
            this._orderNo = orderNo;
            this._orderDate = orderDate;
            this._supplierID = supplierID;
            this._supplierName = supplierName;
        }
        #endregion

        #region Public Properties
        [DataMember]
        public Guid OrderNo
        {
            get { return _orderNo; }
            set { _orderNo = value; }
        }
        [DataMember]
        public DateTime OrderDate
        {
            get { return _orderDate; }
            set { _orderDate = value; }
        }
        [DataMember]
        public Guid SupplierID
        {
            get { return _supplierID; }
            set { _supplierID = value; }
        }
        [DataMember]
        public string SupplierName
        {
            get { return _supplierName; }
            set { _supplierName = value; }
        }
        #endregion

        #region Public Methods
        public override string ToString()
        {
            string description = string.Format("Order No.\t: {0}\n\tOrder Date\t: {1}\n\tSupplier No.\t: {2}\n\tSupplier Name\t: {3}", this._orderNo, this._orderDate.ToString("yyyy/MM/dd"), this._supplierID, this._supplierName);
            return description;
        }
        #endregion
    }
}
This class encapsulates an order.
Data Contract: Artech.ResponsiveQueuedService.Contract.OrderResponseContext
using System;
using System.Collections.Generic;
using System.Text;
using System.Runtime.Serialization;
using System.ServiceModel;
namespace Artech.ResponsiveQueuedService.Contract
{
[DataContract]
public class OrderResponseContext
{
private Uri _responseAddress;
[DataMember]
public Uri ResponseAddress
{
get { return _responseAddress; }
set { _responseAddress = value; }
}
public static OrderResponseContext Current
{
get
{
if (OperationContext.Current == null)
{
return null;
}
return OperationContext.Current.IncomingMessageHeaders.GetHeader<OrderResponseContext>("OrderResponseContext", "Artech.ResponsiveQueuedService.Contract");
}
set
{
MessageHeader<OrderResponseContext> header = new MessageHeader<OrderResponseContext>(value);
OperationContext.Current.OutgoingMessageHeaders.Add(header.GetUntypedHeader("OrderResponseContext", "Artech.ResponsiveQueuedService.Contract"));
}
}
}
}
ResponseAddress is the address of the Response Service hosted in the client domain. The Current property inserts an OrderResponseContext into the outgoing message headers (setter) and reads it from the incoming message headers (getter).
2. Order Processing Service: Artech.ResponsiveQueuedService.Service
using System;
using System.Collections.Generic;
using System.Text;
using Artech.ResponsiveQueuedService.Contract;
using System.ServiceModel;
using System.Net.Security;
namespace Artech.ResponsiveQueuedService.Service
{
    public class OrderProcessorService : IOrderProcessor
    {
        // Not called by Submit in this sample; Submit performs the same date check inline.
        private void ProcessOrder(Order order)
        {
            if (order.OrderDate < DateTime.Today)
            {
                throw new Exception();
            }
        }

        #region IOrderProcessor Members
        public void Submit(Order order)
        {
            Console.WriteLine("Begin to process the order of the order No.: {0}", order.OrderNo);

            // An expired order produces a fault; a valid order leaves exception as null.
            FaultException exception = null;
            if (order.OrderDate < DateTime.Today)
            {
                exception = new FaultException(new FaultReason("The order has expired"), new FaultCode("sender"));
                Console.WriteLine("It's fail to process the order.\n\tOrder No.: {0}\n\tReason:{1}", order.OrderNo, "The order has expired");
            }
            else
            {
                Console.WriteLine("It's successful to process the order.\n\tOrder No.: {0}", order.OrderNo);
            }

            // The binding settings must match those of the client's Response Service endpoint.
            NetMsmqBinding binding = new NetMsmqBinding();
            binding.ExactlyOnce = false;
            binding.Security.Transport.MsmqAuthenticationMode = MsmqAuthenticationMode.None;
            binding.Security.Transport.MsmqProtectionLevel = ProtectionLevel.None;
            ChannelFactory<IOrderRessponse> channelFactory = new ChannelFactory<IOrderRessponse>(binding);

            // Read the response address that the client placed in the incoming SOAP header.
            OrderResponseContext responseContext = OrderResponseContext.Current;
            IOrderRessponse channel = channelFactory.CreateChannel(new EndpointAddress(responseContext.ResponseAddress));
            using (OperationContextScope contextScope = new OperationContextScope(channel as IContextChannel))
            {
                channel.SubmitOrderResponse(order.OrderNo, exception);
            }

            // Release the channel and the factory once the acknowledgement has been sent.
            ((IClientChannel)channel).Close();
            channelFactory.Close();
        }
        #endregion
    }
}
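Here we simulate the following scenario. The service first uses the order date to decide whether the order has expired. If it has, a FaultException is created; otherwise the order is processed normally. The service then uses OrderResponseContext.Current to read the response address wrapped in the OrderResponseContext object from the incoming message header, creates a binding, and calls the Response Service.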
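The sample performs the expiry check inline. If the processing logic lived in a method that throws, as a real implementation probably would, the outcome could be turned into the acknowledgement payload with a small wrapper. This is a sketch only: AcknowledgementBuilder and TryProcess are hypothetical names that do not appear in the sample, and the fault code "sender" is simply copied from the service code above. It assumes a reference to System.ServiceModel.

```csharp
using System;
using System.ServiceModel;

public static class AcknowledgementBuilder
{
    // Runs the processing step and returns null on success,
    // or a FaultException describing the failure.
    public static FaultException TryProcess(Action process)
    {
        try
        {
            process();
            return null;
        }
        catch (Exception ex)
        {
            // Only the message text is carried over to the fault reason.
            return new FaultException(new FaultReason(ex.Message), new FaultCode("sender"));
        }
    }

    public static void Main()
    {
        FaultException success = TryProcess(delegate { });
        Console.WriteLine("Success case returned null: {0}", success == null);

        FaultException failure = TryProcess(delegate
        {
            throw new InvalidOperationException("The order has expired");
        });
        Console.WriteLine("Failure reason: {0}", failure.Message);
    }
}
```

The value returned by TryProcess could then be passed as the second argument of SubmitOrderResponse, so that every order is acknowledged whether or not processing succeeded.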
3. Order Processing Service Hosting: Artech.ResponsiveQueuedService.Hosting
Configuration
<?xml version="1.0" encoding="utf-8" ?>
<configuration>
<appSettings>
<add key="msmqPath" value=".\private$\orderprocessor"/>
</appSettings>
<system.serviceModel>
<bindings>
<netMsmqBinding>
<binding name="MsmqBinding" exactlyOnce="false" useActiveDirectory="false">
<security>
<transport msmqAuthenticationMode="None" msmqProtectionLevel="None" />
</security>
</binding>
</netMsmqBinding>
</bindings>
<services>
<service name="Artech.ResponsiveQueuedService.Service.OrderProcessorService">
<endpoint address="net.msmq://localhost/private/orderprocessor" binding="netMsmqBinding"
bindingConfiguration="MsmqBinding" contract="Artech.ResponsiveQueuedService.Contract.IOrderProcessor" />
</service>
</services>
</system.serviceModel>
</configuration>
Program
using System;
using System.Collections.Generic;
using System.Text;
using Artech.ResponsiveQueuedService.Service;
using System.ServiceModel;
using System.Configuration;
using System.Messaging;
namespace Artech.ResponsiveQueuedService.Hosting
{
class Program
{
static void Main(string[] args)
{
string path = ConfigurationManager.AppSettings["msmqPath"];
if (!MessageQueue.Exists(path))
{
MessageQueue.Create(path);
}
using (ServiceHost host = new ServiceHost(typeof(OrderProcessorService)))
{
host.Opened += delegate
{
Console.WriteLine("The Order Processor service has begun to listen");
};
host.Open();
Console.Read();
}
}
}
}
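The hosting program creates the queue from the MSMQ path .\private$\orderprocessor, while the endpoint listens on net.msmq://localhost/private/orderprocessor. Both name the same private queue. The helper below is a minimal sketch of that correspondence; QueueAddress and ToNetMsmqUri are hypothetical names, and the sketch handles only private queue paths of the form machine\private$\queue.

```csharp
using System;

public static class QueueAddress
{
    // Converts an MSMQ private queue path such as ".\private$\orderprocessor"
    // into the matching net.msmq endpoint address.
    public static Uri ToNetMsmqUri(string queuePath)
    {
        if (queuePath == null)
        {
            throw new ArgumentNullException("queuePath");
        }

        string[] parts = queuePath.Split('\\');
        if (parts.Length != 3 ||
            !string.Equals(parts[1], "private$", StringComparison.OrdinalIgnoreCase))
        {
            throw new ArgumentException(
                "Expected a path of the form machine\\private$\\queue", "queuePath");
        }

        // "." means the local machine in an MSMQ path.
        string host = parts[0] == "." ? "localhost" : parts[0];
        return new Uri("net.msmq://" + host + "/private/" + parts[2]);
    }

    public static void Main()
    {
        Console.WriteLine(ToNetMsmqUri(@".\private$\orderprocessor").AbsoluteUri);
        Console.WriteLine(ToNetMsmqUri(@".\private$\orderresponse").AbsoluteUri);
    }
}
```

Note also that MessageQueue.Create(path) with a single argument creates a non-transactional queue, which is what a binding with exactlyOnce="false" expects.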
4. Response Service: Artech.ResponsiveQueuedService.LocalService.OrderRessponseService
using System;
using System.Collections.Generic;
using System.Text;
using Artech.ResponsiveQueuedService.Contract;
using System.ServiceModel;
namespace Artech.ResponsiveQueuedService.LocalService
{
    public class OrderRessponseService : IOrderRessponse
    {
        #region IOrderRessponse Members
        public void SubmitOrderResponse(Guid orderNo, FaultException exception)
        {
            if (exception == null)
            {
                Console.WriteLine("It's successful to process the order!\n\tOrder No.: {0}", orderNo);
            }
            else
            {
                Console.WriteLine("It's fail to process the order!\n\tOrder No.: {0}\n\tReason: {1}", orderNo, exception.Message);
            }
        }
        #endregion
    }
}
5. Response Service Hosting: Artech.ResponsiveQueuedService.LocalhHosting
Configuration
<?xml version="1.0" encoding="utf-8" ?>
<configuration>
<appSettings>
<add key="msmqPath" value=".\private$\orderresponse"/>
</appSettings>
<system.serviceModel>
<bindings>
<netMsmqBinding>
<binding name="msmqBinding" exactlyOnce="false">
<security>
<transport msmqAuthenticationMode="None" msmqProtectionLevel="None" />
</security>
</binding>
</netMsmqBinding>
</bindings>
<services>
<service name="Artech.ResponsiveQueuedService.LocalService.OrderRessponseService">
<endpoint address="net.msmq://localhost/private/orderresponse" binding="netMsmqBinding"
bindingConfiguration="msmqBinding" contract="Artech.ResponsiveQueuedService.Contract.IOrderRessponse" />
</service>
</services>
</system.serviceModel>
</configuration>
Program
using System;
using System.Collections.Generic;
using System.Text;
using Artech.ResponsiveQueuedService.LocalService;
using System.Configuration;
using System.ServiceModel;
using System.Messaging;
namespace Artech.ResponsiveQueuedService.LocalhHosting
{
class Program
{
static void Main(string[] args)
{
string path = ConfigurationManager.AppSettings["msmqPath"];
if (!MessageQueue.Exists(path))
{
MessageQueue.Create(path);
}
using (ServiceHost host = new ServiceHost(typeof(OrderRessponseService)))
{
host.Opened += delegate
{
Console.WriteLine("The Order Response service has begun to listen");
};
host.Open();
Console.Read();
}
}
}
}
6. Client: Artech.ResponsiveQueuedService.Client
Configuration:
<?xml version="1.0" encoding="utf-8" ?>
<configuration>
<appSettings>
<add key="msmqPath" value="net.msmq://localhost/private/orderresponse"/>
</appSettings>
<system.serviceModel>
<bindings>
<netMsmqBinding>
<binding name="MsmqBinding" exactlyOnce="false" useActiveDirectory="false">
<security>
<transport msmqAuthenticationMode="None" msmqProtectionLevel="None" />
</security>
</binding>
</netMsmqBinding>
</bindings>
<client>
<endpoint address="net.msmq://localhost/private/orderprocessor" binding="netMsmqBinding"
bindingConfiguration="MsmqBinding" contract="Artech.ResponsiveQueuedService.Contract.IOrderProcessor" name="defaultEndpoint" />
</client>
</system.serviceModel>
</configuration>
Program:
using System;
using System.Collections.Generic;
using System.Text;
using System.Configuration;
using System.ServiceModel;
using Artech.ResponsiveQueuedService.Contract;
using System.Messaging;
namespace Artech.ResponsiveQueuedService.Clinet
{
class Program
{
static void Main(string[] args)
{
Order order1 = new Order(Guid.NewGuid(), DateTime.Today.AddDays(5), Guid.NewGuid(), "Supplier A");
Order order2 = new Order(Guid.NewGuid(), DateTime.Today.AddDays(-5), Guid.NewGuid(), "Supplier A");
string path = ConfigurationManager.AppSettings["msmqPath"];
Uri address = new Uri(path);
OrderResponseContext context = new OrderResponseContext();
context.ResponseAddress = address;
ChannelFactory<IOrderProcessor> channelFactory = new ChannelFactory<IOrderProcessor>("defaultEndpoint");
IOrderProcessor orderProcessor = channelFactory.CreateChannel();
using (OperationContextScope contextScope = new OperationContextScope(orderProcessor as IContextChannel))
{
Console.WriteLine("Submit the order of order No.: {0}", order1.OrderNo);
OrderResponseContext.Current = context;
orderProcessor.Submit(order1);
}
using (OperationContextScope contextScope = new OperationContextScope(orderProcessor as IContextChannel))
{
Console.WriteLine("Submit the order of order No.: {0}", order2.OrderNo);
OrderResponseContext.Current = context;
orderProcessor.Submit(order2);
}
Console.Read();
}
}
}
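I create two Order objects, one of which has already expired. The response address is read from configuration and used to build an OrderResponseContext, and the two orders are then submitted to the Order Processing Service one after the other. Inside the operation context scope of each call to the Order Processing Service, OrderResponseContext.Current inserts the OrderResponseContext object into the outgoing message header.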
Now let us run the whole program and look at the final output:
[Screenshots: console output of the Client, Order Processing, and Order Response programs]
Source code accompanying this article