一、為什麼要使用MSMQ
在一個分布式的環境中,我們往往需要根據具體的情況采用不同的方式進行數據的傳輸。比如在一個Intranet內,我們一般通過TCP進行高效的數據通信;而在一個Internet的環境中,我們則通常使用Http進行跨平台的數據交換。而這些通信方式具有一個顯著的特點,那就是他們是基於Connection的,也就是說,交互雙方在進行通信的時候必須保證有一個可用的Connection存在於他們之間。而在某些時候,比如那些使用撥號連接的用戶、以及使用便攜式計算機的用戶,我們不能保證在他們和需要訪問的Server之間有一個的可靠的連接,在這種情況下,基於Messaging Queue的連接就顯得尤為重要了。我們今天就來談談在WCF中如何使用MSMQ。
MSMQ不僅僅是作為支持客戶端連接工具而存在,合理的使用MSMQ可以在很大程度上提升系統的Performance和Scalability。我們先來看看MSMQ能給我們帶來怎樣的好處:
1.MSMQ是基於Disconnection
MSMQ通過Message Queue進行通信,這種通信方式為離線工作成為了可能。比如在介紹MSMQ時都會提到的Order Delivery的例子:在一個基於B2C的系統中,訂單從各種各樣的客戶傳來,由於 客戶的各異性,不能保證每個客戶在每時每刻都和用於接收訂單的Server保持一個可靠的連接,我們有時候甚至允許客戶即使在離線的情況下也可以遞交訂單(雖然訂單不能發送到訂單的接收方,但是我們可以通過某種機制保證先在本地保存該訂單,一旦連接建立,則馬上向接收方遞交訂單),而MSMQ則有效地提供了這樣的機制:Server端建立一個Message Queue來接收來個客戶的訂單,客戶端通過向該Message Queue發送承載了訂單數據的Message實現訂單的遞交。如果在客戶離線的情況下,他仍然可以通過客戶端程序進行訂單遞交的操作,存儲著訂單數據的Message會被暫時保存在本地的Message Queue中,一旦客戶聯機,MSMQ將Message從中取出,發送到真正的接收方,而這個動作對於用戶的透明的。
2.MSMQ天生是One-way、異步的
在MSMQ中,Message始終以One-way的方式進行發送,所以MSMQ具有天生的異步特性。所以MSMQ使用於那些對於用戶的請求,Server端無需立即響應的場景。也就是說Server對數據的處理無需和Client的數據的發送進行同步,它可以獨自地按照自己的Schedule進行工作。這可以避免峰值負載。比如Server端可以在一個相對低負載的時段(比如深夜)來對接收到的Order進行批處理,而無需一天24小時一直進行Order的監聽、接收和處理。
3.MSMQ能夠提供高質量的Reliable Messaging
我們知道,在一般的情況下,如果Client端以異步的方式對Service進行調用就意味著:Client無法獲知Message是否成功抵達Service端;也不會獲得Service端執行的結果和出錯信息。但是我們仍然說MSMQ為我們提供了可靠的傳輸(Reliable Messaging),這主要是因為MSMQ為我們提供一些列Reliable Messaging的機制:
超時機制(Timeout):可以設置發送和接收的時間,超出該時間則被認為操作失敗。
確認機制(Acknowledgement):當Message成功抵達Destination Queue,或者被成功接收,向發送端發送一個Acknowledgement message用以確認操作的狀態。
日志機制(Journaling):當Message被發送或接收後,被Copy一份存放在Journal Queue中。
此外,MSMQ還提供了死信隊列(Dead letter Queue)用以保存發送失敗的message。這一切保證了保證了Reliable Messaging。
二、MSMQ在WCF的運用
在WCF中,MSMQ提供的數據傳輸功能被封裝在一個Binding中,提供WCF Endpoint之間、以及Endpoint和現有的基於MSMQ的Application進行通信的實現。為此WCF為我們提供了兩種不同的built-in binding:
NetMsmqBinding:從提供的功能和使用 方式上看,NetMsmqBinding和一般使用的binding,比如basicHttpBinding,netTcpBinding沒有什麼區別:在兩個Endpoint之間實現了數據的通信,所不同的是,它提供的是基於MSMQ的Reliable Messaging。從變成模式上看,和一般的binding完全一樣。
MsmqIntegrationBinding:從命名上我們可以看出,MsmqIntegrationBinding主要用於需要將我們的WCF Application和現有的基於MSMQ的Application集成的情況。MsmqIntegrationBinding實現了WCF Endpoint和某個Message Queue進行數據的通信,具體來說,就是實現了單一的向某個Message Queue 發送Message,和從某個Message Queue中接收Message的功能。從編程模式上看,也有所不同,比如Operation只接收一個MsmqMessage<T>的參數。
這是Client和Service通信的圖示:
三、MSMQ和Transaction
MSMQ提供對Transaction的支持。在一般的情況下,MSMQ通過Message Queue Transaction實現對Transaction的原生的支持,借助Message Queue Transaction,可以把基於一個或多個Message Queue的相關操作納入同一個Transaction中。
Message Queue Transaction僅僅限於基於Message Queue的操作,倘若操作涉及到另外一些資源,比如SQL Server, 則可以使用基於DTC的分布式Transaction。
對於WCF中MSMQ,由於Client和Service的相對獨立(可能Client發送Message到Service處理Message會相隔很長一段時間),所以Client和Service的操作只能納入不同的Transaction中,如下圖。
四、Sample1:NetMsmqBinding
我們首先做一個基於NetMsmqBinding Sample,實現的功能就是我們開篇所提出的Order Delivery。我們說過,NetMsmqBinding和一般的binding在實現的功能和變成模式上完全一樣。下面是我們熟悉的4層結構:
1.Contract
DataContract:Order & OrderItem
using System; using System.Collections.Generic; using System.Text; using System.Runtime.Serialization; namespace Artech.QueuedService.Contract { [DataContract] [KnownType(typeof(OrderItem))] public class Order { Private Fields#region Private Fields private Guid _orderNo; private DateTime _orderDate; private Guid _supplierID; private string _supplierName; private IList<OrderItem> _orderItems; #endregion Constructors#region Constructors public Order() { this._orderItems = new List<OrderItem>(); } public Order(Guid orderNo, DateTime orderDate, Guid supplierID, string supplierName) { this._orderNo = orderNo; this._orderDate = orderDate; this._supplierID = supplierID; this._supplierName = supplierName; this._orderItems = new List<OrderItem>(); } #endregion Public Properties#region Public Properties [DataMember] public Guid OrderNo { get { return _orderNo; } set { _orderNo = value; } } [DataMember] public DateTime OrderDate { get { return _orderDate; } set { _orderDate = value; } } [DataMember] public Guid SupplierID { get { return _supplierID; } set { _supplierID = value; } } [DataMember] public string SupplierName { get { return _supplierName; } set { _supplierName = value; } } [DataMember] public IList<OrderItem> OrderItems { get { return _orderItems; } set { _orderItems = value; } } #endregion Public Methods#region Public Methods public override string ToString() { string description = string.Format("General Informaion:\n\tOrder No.\t: {0}\n\tOrder Date\t: {1}\n\tSupplier No.\t: {2}\n\tSupplier Name\t: {3}", this._orderNo, this._orderDate.ToString("yyyy/MM/dd"), this._supplierID, this._supplierName); StringBuilder productList = new StringBuilder(); productList.AppendLine("\nProducts:"); int index = 0; foreach (OrderItem item in this._orderItems) { productList.AppendLine(string.Format("\n{4}. \tNo.\t\t: {0}\n\tName\t\t: {1}\n\tPrice\t\t: {2}\n\tQuantity\t: {3}", item.ProductID, item.ProductName, item.UnitPrice, item.Quantity, ++index)); } return description + productList.ToString(); } #endregion } }
3.Hosting
Configuration
<?xml version="1.0" encoding="utf-8" ?>
<configuration>
<system.serviceModel>
<bindings>
<netMsmqBinding>
<binding name="msmqBinding">
<security>
<transport msmqAuthenticationMode="None" msmqProtectionLevel="None" />
<message clientCredentialType="None" />
</security>
</binding>
</netMsmqBinding>
</bindings>
<services>
<service name="Artech.QueuedService.Service. OrderProcessorService">
<endpoint address="net.msmq://localhost/private/orders" binding="netMsmqBinding"
bindingConfiguration="msmqBinding" contract="Artech.QueuedService.Contract.IOrderProcessor" />
</service>
</services>
</system.serviceModel>
</configuration>
在默認的情況下,netMsmqBinding 的msmqAuthenticationMode為WindowsDomain,由於基於WindowsDomain必須安裝AD,利於在本機模擬,我把msmqAuthenticationMode改為None,相應的ProtectionLevel和clientCredentialType改為None。
Program:
using System; using System.Collections.Generic; using System.Text; using System.Messaging; using System.ServiceModel; using Artech.QueuedService.Service; namespace Artech.QueuedService.Hosting { class Program { static void Main(string[] args) { string path = @".\private$\orders"; if(!MessageQueue.Exists(path)) { MessageQueue.Create(path,true); } using (ServiceHost host = new ServiceHost(typeof(OrderProcessorService))) { host.Opened += delegate { Console.WriteLine("Service has begun to listen\n\n"); }; host.Open(); Console.Read(); } } } }
在Host Service之前,通過MessageQueue.Create創建一個Message Queue,第二個參數為表明Queue是否支持Transaction的indicator,這裡支持Transaction。
4.Client:
Configuration
<?xml version="1.0" encoding="utf-8" ?>
<configuration>
<system.serviceModel>
<bindings>
<netMsmqBinding>
<binding name="msmqBinding">
<security>
<transport msmqAuthenticationMode="None" msmqProtectionLevel="None" />
<message clientCredentialType="None" />
</security>
</binding>
</netMsmqBinding>
</bindings>
<client>
<endpoint address="net.msmq://localhost/private/orders" binding="netMsmqBinding"
bindingConfiguration="msmqBinding" contract="Artech.QueuedService.Contract.IOrderProcessor"
name="defaultEndpoint" />
</client>
</system.serviceModel>
</configuration>
Program
using System;
using System.Collections.Generic;
using System.Text;
using Artech.QueuedService.Contract;
using System.ServiceModel;
using System.Transactions;
namespace Artech.QueuedService.Client
{
class Program
{
static void Main(string[] args)
{
ChannelFactory<IOrderProcessor> channelFactory = new ChannelFactory<IOrderProcessor>("defaultEndpoint");
IOrderProcessor channel = channelFactory.CreateChannel();
Order order = new Order(Guid.NewGuid(),DateTime.Today,Guid.NewGuid(),"A Company");
order.OrderItems.Add(new OrderItem(Guid.NewGuid(),"PC",5000,20));
order.OrderItems.Add(new OrderItem(Guid.NewGuid(),"Printer",7000,2));
Console.WriteLine("Submit order to server");
using (TransactionScope scope = new TransactionScope(TransactionScopeOption.Required))
{
channel.Submit(order);
scope.Complete();
}
Console.Read();
}
}
}
先後運行Host和Client,Host端有下面的輸出: