在我 2009 年 10 月的專欄文章“服務總線中的路由器”(msdn.microsoft.com/magazine/ee335696) 中,我提出 Windows Azure AppFabric 服務總線未來可能的發展方向:成為最終的偵聽器。我提出了路由器功能,並承諾下一步將寫寫隊列。
自那之後,路由器和隊列已被推遲到服務總線的第二個版本,暫時代之以由服務總線提供緩沖區。未來版本可能會增加日志記錄、診斷和各種檢測選項。我會在以後的文章中講述這些方面。在本文中,我將對緩沖區加以說明,也會向您展示一些先進的 Windows Communication Foundation (WCF) 編程技術。
服務總線緩沖區
在服務總線中,服務命名空間的每一個 URI 實際上都是一個可尋址的消息系統交接點。客戶端可以將消息發送到這個交接點,交接點可以將其轉發到服務。不過,每個交接點也可以作為一個緩沖區(請參見圖 1)。
圖 1 服務總線中的緩沖區
即使沒有服務正在監控緩沖區,該消息也會在緩沖區內存儲配置的時間段。請注意,有多種服務可以監控緩沖區,但除非您明確查看並鎖定消息,否則只有其中一種服務可以檢索消息。
客戶端會在緩沖區後面與服務分離,並且客戶端和服務可不必在同一時間運行。由於客戶端與緩沖區交互,並沒有與實際的服務端點交互,因此所有的消息都是單向的,也沒有現成的辦法獲取消息調用的結果或任何錯誤。
該服務總線緩沖區不應等同於 Microsoft 消息隊列 (MSMQ) 等隊列或 WCF 排隊服務,它們之間有一系列關鍵性差異:
該服務總線緩沖區不持久,而且消息存儲在內存中。也就是說,如果服務總線本身發生災難性故障(雖然有點不太可能),消息會有丟失的風險。
該服務總線緩沖區不是事務性的,不可以作為事務處理的一部分來完成對消息的發送或檢索。
緩沖區無法處理持久消息。服務必須在 10 分鐘內從緩沖區檢索消息,否則消息會被丟棄。盡管基於 WCF MSMQ 的消息也有生存時間,不過這個時間段要長得多,默認為一天。這極大地增加了真正脫節的操作和斷開的應用程序的范圍。
緩沖區的大小有限,所保留的消息不能超過 50 條。
所緩沖消息的大小也有上限,每個 64KB。雖然 MSMQ 也對消息規定了最大尺寸,它的上限卻要大得多(每條消息 4MB)。
因此,緩沖區無法通過雲系統提供真正的排隊調用,不過,如果連接會在排隊調用和“即發即棄”的異步調用之間的某處丟失調用,在某種程度上緩沖區可以恢復這些連接。
緩沖區在兩種情況下會有用。一種是應用程序的客戶端和服務之間進行交互的連接不穩固,而只要消息在短期離線期間得到緩沖,連接的丟棄和重新連接是可以容忍的。第二個(和更常見)的情況是客戶端發出異步單向調用,並利用響應緩沖區(如後面的“響應服務”一節所述)來處理該調用的結果。這樣的互動更多地是將網絡連接視為有彈性的繩索,而不是沒有存儲容量的剛性網絡連接線。
使用緩沖區
緩沖區地址必須是唯一的;您只能將一個緩沖區與一個地址相關聯,且此地址不能為緩沖區或服務所使用。然而,多方都可以從同一緩沖區檢索消息。此外,緩沖區地址對於方案必須使用 HTTP 或 HTTPS。為發送消息並從緩沖區檢索消息,服務總線提供了類似於 System.Messaging API 的 API;也就是說,它要求您與原始消息進行互動。服務總線管理器獨立於服務或客戶端管理緩沖區。每個緩沖區必須有一個策略管理其行為和生存期。默認情況下,服務總線管理器必須執行編程調用,以創建和管理緩沖區。
如圖 2 所示,每個緩沖區策略都通過 MessageBufferPolicy 類的實例表達。
圖 2 MessageBufferPolicy 類
[DataContract]
public class MessageBufferPolicy : ...
{
public MessageBufferPolicy();
public MessageBufferPolicy(MessageBufferPolicy policyToCopy);
public DiscoverabilityPolicy Discoverability
{get;set;}
public TimeSpan ExpiresAfter
{get;set;}
public int MaxMessageCount
{get;set;}
public OverflowPolicy OverflowPolicy
{get;set;}
public AuthorizationPolicy Authorization
{get;set;}
public TransportProtectionPolicy TransportProtection
{get;set;}
}
Discoverability 策略屬性是類型 DiscoverabilityPolicy 的枚舉,控制服務總線注冊表(Atom 源)是否包括緩沖區:
public enum DiscoverabilityPolicy
{
Managers,
ManagersListeners,
ManagersListenersSenders,
Public
}
Discoverability 默認為 DiscoverabilityPolicy.Managers,這意味著它需要有托管授權聲明。將其設置為 DiscoverabilityPolicy.Public 會在沒有任何授權的情況下將其發布到源。
ExpiresAfter 屬性控制消息在緩沖區中的生存期。默認為 5 分鐘,最低值是 1 分鐘,允許的最大值是 10 分鐘。任何配置更長生存期的嘗試都會被忽略且無提示。
MaxMessageCount 屬性設置了緩沖區大小的上限。該策略的默認值為 10 條消息,最低值當然設置為 1。如前所述,緩沖區大小最大為 50,配置更大尺寸的嘗試都會被忽略且無提示。
OverflowPolicy 屬性屬枚舉類型,其單個值定義為:
public enum OverflowPolicy
{
RejectIncomingMessage
}
OverflowPolicy 控制當緩沖區消息數達到最大值(由 MaxMessageCount 定義)時,如何處理消息。唯一可行的選擇是拒絕此消息:將其發送回發件人,並提示錯誤。
單值枚舉可作為未來選項的占位符:如在不通知發件人的情況下丟棄此消息,或自緩沖區刪除信息,接受新消息。
最後兩個屬性負責安全配置。AuthorizationPolicy 屬性指示服務總線是否批准客戶端的令牌:
public enum AuthorizationPolicy
{
NotRequired,
RequiredToSend,
RequiredToReceive,
Required
}
該 AuthorizationPolicy.Required 的默認值要求同時授權發送和接收客戶端。
最後,TransportProtection 屬性使用類型 TransportProtectionPolicy 的枚舉值,規定了消息傳輸到緩沖區的傳輸安全的最低水平:
public enum TransportProtectionPolicy
{
None,
AllPaths,
}
通過 TransportProtectionPolicy.AllPaths 實現傳輸安全是所有緩沖區策略的默認值,它會強制使用 HTTPS 地址。
如圖 3 所示,您可以使用 MessageBufferClient 類管理緩沖區。
圖 3 MessageBufferClient 類
public sealed class MessageBufferClient
{
public Uri MessageBufferUri
{get;}
public static MessageBufferClient CreateMessageBuffer(
TransportClientEndpointBehavior credential,
Uri messageBufferUri,MessageBufferPolicy policy);
public static MessageBufferClient GetMessageBuffer(
TransportClientEndpointBehavior credential,Uri messageBufferUri);
public MessageBufferPolicy GetPolicy();
public void DeleteMessageBuffer();
// More members
}
通過向靜態方法提供類型 TransportClientEndpointBehavior 的服務總線憑據,您使用 MessageBufferClient 的靜態方法來獲取 MessageBufferClient 經過身份驗證的實例。每次使用 MessageBufferClient 時,您通常都需要調用 GetMessageBuffer 方法,檢查緩沖區是否已經存在於服務總線中。如果沒有緩沖區,GetMessageBuffer 會引發異常。
以下說明如何通過編程方式創建緩沖區:
Uri bufferAddress =
new Uri(@"https://MyNamespace.servicebus.windows.net/MyBuffer/");
TransportClientEndpointBehavior credential = ...
MessageBufferPolicy bufferPolicy = new MessageBufferPolicy();
bufferPolicy.MaxMessageCount = 12;
bufferPolicy.ExpiresAfter = TimeSpan.FromMinutes(3);
bufferPolicy.Discoverability = DiscoverabilityPolicy.Public;
MessageBufferClient.CreateMessageBuffer(credential,bufferAddress,
bufferPolicy);
在這個例子中,您對緩沖區策略對象進行了實例化並將策略設置為某些所需的值。要安裝緩沖區,所需的操作就是提供策略和某些有效憑據,調用 MessageBufferClient 的 CreateMessageBuffer 方法。
作為編程調用的替代方案,您可以使用服務總線資源管理器(在我的路由器文章中有所說明;也可在線查看並找到此文章的示例代碼)查看和修改緩沖區。圖 4 說明如何通過指定地址和各項策略屬性創建新的緩沖區。您還可以刪除服務命名空間中的所有緩沖區,方式大致相同。
圖 4 使用服務總線資源管理器創建緩沖區
如圖 5 所示,您還可以通過在服務命名空間樹中選擇緩沖區,並與右窗格中的緩沖區屬性進行互動,查看和修改現有緩沖區的策略,從緩沖區中清除消息,甚至刪除緩沖區。
圖 5 服務總線資源管理器中的緩沖區
簡化管理
在創建緩沖區時,最好盡可能擴大緩沖區的大小和使用壽命,給客戶端和服務更多的交互時間。此外,使緩沖區可發現是一個好主意,這樣您就可以在服務總線注冊表中查看緩沖區。涉及到緩沖區的使用時,客戶端和服務應驗證緩沖區是否已創建,如果未創建會著手創建它。
為自動完成這些步驟,我創建了 ServiceBusHelper 類:
public static partial class ServiceBusHelper
{
public static void CreateBuffer(string bufferAddress,string secret);
public static void CreateBuffer(string bufferAddress,string issuer,
string secret);
public static void VerifyBuffer(string bufferAddress,string secret);
public static void VerifyBuffer(string bufferAddress,string issuer,
string secret);
public static void PurgeBuffer(Uri bufferAddress,
TransportClientEndpointBehavior credential);
public static void DeleteBuffer(Uri bufferAddress,
TransportClientEndpointBehavior credential);
}
該 CreateBuffer 方法創建了一個可發現的新緩沖區,最高容量有 50 條消息,持續時間為 10 分鐘。如果緩沖區已經存在,CreateBuffer 會刪除舊的緩沖區。該 VerifyBuffer 方法會驗證緩沖區是否存在,如果沒有,則創建新的緩沖區。PurgeBuffer 可用於在診斷或調試過程中清除所有緩存的信息。DeleteBuffer 會完全刪除此緩沖區。圖 6 顯示了這些方法的部分實現列表。
圖 6 緩沖區幫助程序方法的部分列表
public static partial class ServiceBusHelper
{
public static void CreateBuffer(string bufferAddress,
string issuer,string secret)
{
TransportClientEndpointBehavior credentials = ...;
CreateBuffer(bufferAddress,credentials);
}
static void CreateBuffer(string bufferAddress,
TransportClientEndpointBehavior credentials)
{
MessageBufferPolicy policy = CreateBufferPolicy();
CreateBuffer(bufferAddress,policy,credentials);
}
static internal MessageBufferPolicy CreateBufferPolicy()
{
MessageBufferPolicy policy = new MessageBufferPolicy();
policy.Discoverability = DiscoverabilityPolicy.Public;
policy.ExpiresAfter = TimeSpan.Fromminutes(10);
policy.MaxMessageCount = 50;
return policy;
}
public static void PurgeBuffer(Uri bufferAddress,
TransportClientEndpointBehavior credentials)
{
Debug.Assert(BufferExists(bufferAddress,credentials));
MessageBufferClient client =
MessageBufferClient.GetMessageBuffer(credentials,bufferAddress);
MessageBufferPolicy policy = client.GetPolicy();
client.DeleteMessageBuffer();
MessageBufferClient.CreateMessageBuffer(credential,bufferAddress,policy);
}
public static void VerifyBuffer(string bufferAddress,
string issuer,string secret)
{
TransportClientEndpointBehavior credentials = ...;
VerifyBuffer(bufferAddress,credentials);
}
internal static void VerifyBuffer(string bufferAddress,
TransportClientEndpointBehavior credentials)
{
if(BufferExists(bufferAddress,credentials))
{
return;
}
CreateBuffer(bufferAddress,credentials);
}
internal static bool BufferExists(Uri bufferAddress,
TransportClientEndpointBehavior credentials)
{
try
{
MessageBufferClient client =
MessageBufferClient.GetMessageBuffer(credentials,bufferAddress);
client.GetPolicy();
return true;
}
catch(FaultException)
{}
return false;
}
static void CreateBuffer(string bufferAddress,
MessageBufferPolicy policy,
TransportClientEndpointBehavior credentials)
{
Uri address = new Uri(bufferAddress);
if(BufferExists(address,credentials))
{
MessageBufferClient client =
MessageBufferClient.GetMessageBuffer(credentials,address);
client.DeleteMessageBuffer();
}
MessageBufferClient.CreateMessageBuffer(credentials,address,policy);
}
}
該 BufferExists 方法使用 MessageBufferClient 的 GetPolicy 方法查看是否存在緩沖區,如果緩沖區不存在,它會以一條錯誤消息來表征。清除緩沖區的做法是:復制其策略,刪除緩沖區,使用 舊的策略創建新的緩沖區(地址相同)。
發送和檢索消息
如前所述,服務總線緩沖區需要與原始 WCF 消息互動。這是通過 MessageBufferClient(創建或得到緩沖區時獲得)的 Send 和 Retrieve 方法實現的:
public sealed class MessageBufferClient
{
public void Send(Message message);
public void Send(Message message,TimeSpan timeout);
public Message Retrieve();
public Message Retrieve(TimeSpan timeout);
// More members
}
這兩種方法都有超時設置,對於無參數版本,默認為 1 分鐘。對於發送者而言,超時是指在緩沖區已滿的情況下等待多長時間。對於檢索者而言,超時是指在緩沖區已空的情況下等待多長時間。
下面是將原始信息發送到緩沖區的發送端代碼:
TransportClientEndpointBehavior credential = ...;
Uri bufferUri = new Uri(@"sb://MyNamespace.servicebus.windows.net/MyBuffer/");
MessageBufferClient client =
MessageBufferClient.GetMessageBuffer(credential,bufferUri);
Message message = Message.CreateMessage(MessageVersion.Default,"Hello");
client.Send(message,TimeSpan.MaxValue);
發送者首先創建憑據對象,並使用它來獲得 MessageBufferClient 的實例。然後發送者會創建 WCF 消息並將其發送到緩沖區。下面是從緩沖區檢索原始信息的檢索端代碼:
TransportClientEndpointBehavior credential = ...;
Uri bufferUri = new Uri(@"sb://MyNamespace.servicebus.windows.net/MyBuffer/");
MessageBufferClient client =
MessageBufferClient.GetMessageBuffer(credential,bufferUri);
Message message = client.Retrieve();
Debug.Assert(message.Headers.Action == "Hello");
緩沖的服務
像前面的代碼段一樣使用原始 WCF 消息是服務總線必須提供的功能。但是,這種編程模型有很多不足。它繁瑣、冗長、非結構化、非面向對象、非類型安全。這是倒退到以前沒有 WCF 的時期,使用 System.Messaging API,編程方式明顯背離 MSMQ。您需要解析消息內容並開啟其元素。
幸運的是,您可以改善這項基本功能。您不應與原始消息進行互動,而是應該將互動提升為客戶端與服務之間的結構化調用。雖然這需要提前做相當程度的低級別工作,不過,我可以通過一小組幫助程序類實現這項功能。
為在服務端提供結構化的緩沖調用,我編寫了 BufferedServiceBusHost<T>,定義如下:
// Generic type parameter based host
public class ServiceHost<T> : ServiceHost
{...}
public class BufferedServiceBusHost<T> : ServiceHost<T>,...
{
public BufferedServiceBusHost(params Uri[] bufferAddresses);
public BufferedServiceBusHost(
T singleton,params Uri[] bufferAddresses);
/* Additional constructors */
}
在隨 MSMQ 綁定使用了 WCF 之後,我構建了 BufferedServiceBusHost<T>。您需要向構造函數提供要從中檢索消息的緩沖區地址。其余工作與處理常規 WCF 服務主機類似:
Uri buffer = new Uri(@"https://MyNamespace.servicebus.windows.net/MyBuffer");
ServiceHost host = new BufferedServiceBusHost<MyService>(buffer);
host.Open();
請注意,您可以為構造函數提供多個要監控的緩沖區地址,就像 WCF 服務主機可以打開不同隊列的多個端點。不需要(也無法)在配置文件的服務端點部分提供這些緩沖區的任何地址(盡管緩沖區地址可以來自應用程序設置部分,如果您如此設計的話)。
雖然與服務總線緩沖區的實際通信是通過原始 WCF 消息完成的,不過,對這項工作已進行了封裝。BufferedServiceBusHost<T> 將驗證是否實際存在所提供的緩沖區,不存在的話,會利用如圖 6 所示的 ServiceBusHelper.VerifyBuffer 緩沖區策略加以創建。BufferedServiceBusHost<T> 將使用保護所有路徑的默認傳輸安全策略。它還將驗證所提供的服務一般類型參數 T 的約定都是單向的,即它們都只有單向操作(與單項中繼綁定一樣)。最後一項功能:當關閉主機,只采用調試版本時,BufferedServiceBusHost<T> 將清除所有緩沖區,以確保順利啟動下一個調試會話。
BufferedServiceBusHost<T> 的操作方法是,在本地承載指定服務。對於類型參數 T 的每個服務約定,BufferedServiceBusHost<T> 增加了 IPC(命名管道)上的端點。IPC 同這些端點的綁定被配置為永不超時。
即便是在每次會話服務被看作每次調用服務的情況下,IPC 也總是有一個傳輸會話模擬 MSMQ 的行為。每個被從隊列中取消的 WCF 消息都將被播放到此服務的新實例,可能會與以前的消息同時運行,這一點與 MSMQ 綁定一樣。如果所提供的服務類型是一個單例,BufferedServiceBusHost<T> 會認可這一點,將所有消息發送到同一服務實例的所有緩沖區和端點,就像對待 MSMQ 綁定一樣。
BufferedServiceBusHost<T> 會在各個後台工作線程上監控每個指定的緩沖區。如果有消息被存放在緩沖區中,BufferedServiceBusHost<T> 會檢索該消息,並將原始 WCF 消息轉換成對 IPC 上相應端點的調用。
圖 7 提供了 BufferedServiceBusHost<T> 的部分列表,刪除了大部分錯誤處理和安全措施。
圖 7 BufferedServiceBusHost<T> 的部分列表
public class BufferedServiceBusHost<T> :
ServiceHost<T>,IServiceBusProperties
{
Uri[] m_BufferAddresses;
List<Thread> m_RetrievingThreads;
IChannelFactory<IDuplexSessionChannel>
m_Factory;
Dictionary<string,IDuplexSessionChannel>
m_Proxies;
const string CloseAction =
"BufferedServiceBusHost.CloseThread";
public BufferedServiceBusHost(params Uri[]
bufferAddresses)
{
m_BufferAddresses = bufferAddresses;
Binding binding = new NetNamedPipeBinding();
binding.SendTimeout = TimeSpan.MaxValue;
Type[] interfaces =
typeof(T).GetInterfaces();
foreach(Type interfaceType in interfaces)
{
VerifyOneway(interfaceType);
string address =
@"net.pipe://localhost/" + Guid.NewGuid();
AddServiceEndpoint(interfaceType,binding,
address);
}
m_Factory =
binding.BuildChannelFactory
<IDuplexSessionChannel>();
m_Factory.Open();
}
protected override void OnOpened()
{
CreateProxies();
CreateListeners();
base.OnOpened();
}
protected override void OnClosing()
{
CloseListeners();
foreach(IDuplexSessionChannel proxy in
m_Proxies.Values)
{
proxy.Close();
}
m_Factory.Close();
PurgeBuffers();
base.OnClosing();
}
// Verify all operations are one-way
void VerifyOneway(Type interfaceType)
{...}
void CreateProxies()
{
m_Proxies =
new Dictionary
<string,IDuplexSessionChannel>();
foreach(ServiceEndpoint endpoint in
Description.Endpoints)
{
IDuplexSessionChannel channel =
m_Factory.CreateChannel(endpoint.Address);
channel.Open();
m_Proxies[endpoint.Contract.Name] =
channel;
}
}
void CreateListeners()
{
m_RetrievingThreads = new List<Thread>();
foreach(Uri bufferAddress in
m_BufferAddresses)
{ ? ServiceBusHelper.VerifyBuffer(
bufferAddress.AbsoluteUri,m_Credential);
Thread thread = new Thread(Dequeue);
m_RetrievingThreads.Add(thread);
thread.IsBackground = true;
thread.Start(bufferAddress);
}
}
void Dequeue(object arg)
{
Uri bufferAddress = arg as Uri;
MessageBufferClient bufferClient = ? MessageBufferClient.GetMessageBuffer(
m_Credential,bufferAddress);
while(true)
{
Message message =
bufferClient.Retrieve(TimeSpan.MaxValue);
if(message.Headers.Action == CloseAction)
{
return;
}
else
{
Dispatch(message);
}
}
}
void Dispatch(Message message)
{
string contract = ExtractContract(message);
m_Proxies[contract].Send(message);
}
string ExtractContract(Message message)
{
string[] elements =
message.Headers.Action.Split('/');
return elements[elements.Length-2];
}
protected override void OnClosing()
{
CloseListeners();
foreach(IDuplexSessionChannel proxy in
m_Proxies.Values)
{
proxy.Close();
}
m_Factory.Close();
PurgeBuffers();
base.OnClosing();
}
void SendCloseMessages()
{
foreach(Uri bufferAddress in
m_BufferAddresses)
{
MessageBufferClient bufferClient = ? MessageBufferClient.GetMessageBuffer(
m_Credential,bufferAddress);
Message message =
Message.CreateMessage(
MessageVersion.Default,CloseAction);
bufferClient.Send(message);
}
}
void CloseListeners()
{
SendCloseMessages();
foreach(Thread thread in m_RetrievingThreads)
{
thread.Join();
}
}
[Conditional("DEBUG")]
void PurgeBuffers()
{
foreach(Uri bufferAddress in
m_BufferAddresses)
{
ServiceBusHelper.PurgeBuffer(
bufferAddress,m_Credential);
}
}
}
進入字典的鑰匙是端點的約定類型名稱。
構造函數會存儲所提供的緩沖區地址,然後使用反射來獲取此服務類型上所有接口的集合。對於每一個接口,BufferedServiceBusHost<T> 會驗證它只有單向操作,然後調用基 AddServiceEndpoint 為該約定類型添加端點。地址是 IPC 地址,將 GUID 用作管道名稱。該構造函數使用 IPC 綁定建立類型 IChannelFactory<IDuplexSessionChannel> 的通道工廠。IChannelFactory<T> 用於創建綁定上的非強類型化通道:
public interface IChannelFactory<T> : IChannelFactory
{
T CreateChannel(EndpointAddress to);
// More members
}
打開內部主機及其所有 IPC 端點後,OnOpened 方法將創建這些端點的內部代理和緩沖的偵聽器。這兩個步驟是 BufferedServiceBusHost<T> 的核心部分。為創建代理,它將循環訪問端點的集合。它會獲取每個端點的地址,並使用 IChannelFactory<IDuplexSessionChannel> 針對該地址創建通道。然後該通道(或代理)將存儲在字典中。CreateListeners 方法會循環訪問指定的緩沖區地址。對於每一個地址,它會驗證緩沖區,並創建工作線程,從隊列中取消消息。
該 Dequeue 方法使用 MessageBufferClient 檢索無限循環中的消息,並使用 Dispatch 方法對它們進行調度。Dispatch 從消息中提取目標約定名稱,並用它來從代理字典中查找 IDuplexChannel,然後通過 IPC 發送消息。IDuplexChannel 受底層的 IPC 通道支持,為發送原始消息提供了途徑:
public interface IOutputChannel : ...
{
void Send(Message message,TimeSpan timeout);
// More members
}
public interface IDuplexSessionChannel : IOutputChannel,...
{}
如果在 IPC 調用過程中發生了錯誤,BufferedServiceBusHost<T> 會針對該端點重建它所管理的通道(這一點未在圖 7 中顯示出來)。當您關閉主機時,您需要關閉代理。該操作將正常等待進展中的調用完成。因為 MessageBufferClient.Retrieve 是阻塞操作,沒有內置的方法來中止它,所以如何正常關閉所有檢索線程是個問題。解決方案是將特殊的私有消息張貼到每個監控的緩沖區,其操作會指示檢索線程退出。這正是 SendCloseMessages 方法要進行的操作。該 CloseListeners 方法會將私有消息張貼到緩沖區,然後通過加入它們等待所有偵聽線程終止。關閉偵聽線程會停止將消息饋送到內部代理,一旦代理被關閉(當所有目前正在進行的調用已返回時),主機將准備關閉。BufferedServiceBusHost<T> 還支持非正常 Abort 方法,該方法將完全中止所有線程(未在圖 7 中顯示出來)。
最後,請注意 BufferedServiceBusHost<T> 支持接口 IServiceBusProperties,我的定義如下:
public interface IServiceBusProperties
{
TransportClientEndpointBehavior Credential
{get;set;}
Uri[] Addresses
{get;}
}
在構建我的框架中的幾個地方(尤其是在簡化緩沖方面),我需要這類接口。我針對客戶端,編寫了類 BufferedServiceBusClient<T>,定義如下:
public abstract class BufferedServiceBusClient<T> :
HeaderClientBase<T,ResponseContext>,IServiceBusProperties
{
// Buffer address from config
public BufferedServiceBusClient()
{}
// No need for config file
public BufferedServiceBusClient(Uri bufferAddress);
/* Additional constructors with different credentials */
protected virtual void Enqueue(Action action);
}
BufferedServiceBusClient<T> 派生自我的 HeaderClientBase<T,H>(用來在消息頭中傳遞信息的幫助程序代理。請參閱我 2007 年 11 月的文章“WCF 中的同步環境”,網址為 msdn.microsoft.com/magazine/cc163321):
public abstract class HeaderClientBase<T,H> : InterceptorClientBase<T>
where T : class
{
protected H Header
{get;set;}
// More members
}
該基類的目的是支持響應服務,如下面一節中所述。對於純客戶端的緩沖服務,這種推導無關大局。
無論有沒有客戶端配置文件,您都可以使用 BufferedServiceBusClient<T>。接受緩沖區地址的構造函數不需要配置文件。該無參數構造函數或接受端點名稱的構造函數要求配置文件包含與帶有單向中繼綁定的約定類型相匹配的端點(盡管該綁定會被 BufferedServiceBusClient<T> 完全忽略)。
如果您的代理派生於 BufferedServiceBusClient<T>,您需要使用受保護的 Enqueue 方法,而不是直接使用 Channel 屬性:
[ServiceContract]
interface IMyContract
{
[OperationContract(IsOneWay = true)]
void MyMethod(int number);
}
class MyContractClient : BufferedServiceBusClient<IMyContract>,IMyContract
{
public void MyMethod(int number)
{
Enqueue(()=>Channel.MyMethod(number));
}
}
Enqueue 接受覆蓋了對 Channel 屬性使用的委托(或 lambda 表達式)。結果仍屬於類型安全。圖 8 顯示了 BufferedServiceBusClient<T> 類的部分列表。
圖 8 BufferedServiceBusClient<T> 的部分列表
public abstract class BufferedServiceBusClient<T> :
HeaderClientBase<T,ResponseContext>,IServiceBusProperties where T : class
{
MessageBufferClient m_BufferClient;
public BufferedServiceBusClient(Uri bufferAddress) :
base(new NetOnewayRelayBinding(),new EndpointAddress(bufferAddress))
{}
protected virtual void Enqueue(Action action)
{
try
{
action();
}
catch(InvalidOperationException exception)
{
Debug.Assert(exception.Message ==
"This message cannot support the operation " +
"because it has been written.");
}
}
protected override T CreateChannel()
{
ServiceBusHelper.VerifyBuffer(Endpoint.Address.Uri.AbsoluteUri,Credential);
m_BufferClient = ? MessageBufferClient.GetMessageBuffer(Credential,m_BufferAddress);
return base.CreateChannel();
}
protected override void PreInvoke(ref Message request)
{
base.PreInvoke(ref request);
m_BufferClient.Send(request);
}
protected TransportClientEndpointBehavior Credential
{
get
{...}
set
{...}
}
}
對 BufferedServiceBusClient<T> 的構造函數向其基構造函數提供緩沖區地址和綁定,這一直是執行單向操作驗證的單向中繼綁定。該 CreateChannel 方法會驗證目標緩沖區存在,並且獲得了代表它的 MessageBufferClient。BufferedServiceBusClient<T> 的核心是 PreInvoke 方法。PreInvoke 是一個由 HeaderClientBase<T,H> 的基類 InterceptorClientBase<T> 提供的虛擬方法:
public abstract class InterceptorClientBase<T> : ClientBase<T> where T : class
{
protected virtual void PreInvoke(ref Message request);
// Rest of the implementation
}
PreInvoke 讓您在 WCF 消息由客戶端調度之前,輕松處理消息。BufferedServiceBusClient<T> 覆蓋了 PreInvoke,並使用緩沖區客戶端將消息發送到緩沖區。這樣,客戶端會維護結構化的編程模型,且 BufferedServiceBusClient<T> 會封裝與 WCF 消息的互動。缺點是該消息只能發送一次,當 ClientBase 根類嘗試發送它時,會引發 InvalidOperationException。這正是 Enqueue 大顯身手的時候,它可以平息該異常。
響應服務
在我 2007 年 2 月的專欄文章“構建排隊 WCF 響應服務”(msdn.microsoft.com/magazine/cc163482) 中,我解釋說,收到排隊調用的結果(或錯誤)的唯一方法是使用排隊響應服務。我展示了如何在消息頭中傳遞包含邏輯方法 ID 和響應地址的響應上下文對象:
[DataContract]
public class ResponseContext
{
[DataMember]
public readonly string ResponseAddress;
[DataMember]
public readonly string MethodId;
public ResponseContext(string responseAddress,string methodId);
public static ResponseContext Current
{get;set;}
// More members
}
處理緩沖時適用同一設計模式。客戶端需要為緩沖響應的服務提供專門的響應緩沖區。客戶端還需要通過消息頭傳遞響應地址和方法 ID,如同對基於 MSMQ 的調用一樣。如圖 9 所示,基於 MSMQ 的響應服務與服務總線之間的主要區別是,響應緩沖區必須也駐留在服務總線中。
圖 9 服務總線緩沖的響應服務
為了簡化客戶端,我編寫了類 ClientBufferResponseBase<T>,定義如下:
public abstract class ClientBufferResponseBase<T> :
BufferedServiceBusClient<T> where T : class
{
protected readonly Uri ResponseAddress;
public ClientBufferResponseBase(Uri responseAddress);
/* Additional constructors with different credentials */
protected virtual string GenerateMethodId();
}
ClientBufferResponseBase<T> 是專屬於 BufferedServiceBusClient<T> 的子類,它還在消息頭中增加了響應上下文。正是基於此原因,我的 BufferedServiceBusClient<T> 是派生自 HeaderClientBase<T,H>,而不僅僅是派生自 InterceptorClientBase<T>。如圖 10 所示,您可以像使用 BufferedServiceBusClient 一樣來使用 ClientBufferResponseBase<T>。
圖 10 簡化客戶端
[ServiceContract]
interface ICalculator
{
[OperationContract(IsOneWay = true)]
void Add(int number1,int number2);
}
class CalculatorClient : ClientBufferResponseBase<ICalculator>,ICalculator
{
public CalculatorClient(Uri responseAddress) : base(responseAddress)
{}
public void Add(int number1,int number2)
{
Enqueue(()=>Channel.Add(number1,number2));
}
}
使用 ClientBufferResponseBase<T> 的子類非常簡單:
Uri resposeAddress =
new Uri(@"sb://MyNamespace.servicebus.windows.net/MyResponseBuffer/");
CalculatorClient proxy = new CalculatorClient(responseAddress);
proxy.Add(2,3);
proxy.Close();
在管理客戶端的響應方面,讓調用客戶端獲取用來調度此呼叫的方法 ID 很方便。通過 Header 屬性很容易完成這一點:
CalculatorClient proxy = new CalculatorClient(responseAddress);
proxy.Add(2,3);
string methodId = proxy.Header.MethodId;
圖 11 列出了對 ClientBufferResponseBase<T> 的實施方式。ClientBufferResponseBase<T> 覆蓋了 HeaderClientBase<T,H> 的 PreInvoke 方便,這樣它就可以為每個呼叫生成新的方法 ID,並將其設置到標頭中。
圖 11 實現 ClientBufferResponseBase<T>
public abstract class ClientBufferResponseBase<T> :
BufferedServiceBusClient<T> where T : class
{
public readonly Uri ResponseAddress;
public ClientBufferResponseBase(Uri responseAddress)
{
ResponseAddress = responseAddress;
}
/* More Constructors */
protected override void PreInvoke(ref Message request)
{
string methodId = GenerateMethodId();
Header = new ResponseContext(ResponseAddress.AbsoluteUri,methodId);
base.PreInvoke(ref request);
}
protected virtual string GenerateMethodId()
{
return Guid.NewGuid().ToString();
}
// Rest of the implementation
}
如圖 12 所示,為了簡化緩沖服務調用響應服務所需的工作,我編寫了類 ServiceBufferResponseBase<T>。
圖 12 ServiceBufferResponseBase<T> 類
public abstract class ServiceBufferResponseBase<T> :
BufferedServiceBusClient<T> where T : class
{
public ServiceBufferResponseBase() :
base(new Uri(ResponseContext.Current.ResponseAddress))
{
Header = ResponseContext.Current;
// Grab the credentials the host was using
IServiceBusProperties properties =
OperationContext.Current.Host as IServiceBusProperties;
Credential = properties.Credential;
}
}
雖然這項服務可以完全使用 BufferedServiceBusClient<T> 將響應排入隊列,您還是需要從標頭提取響應緩沖區地址,並以某種方式獲取登錄到服務總線緩沖區的憑據。您還需要向傳出調用的標頭提供響應上下文。所有這些步驟都可以通過 ServiceBufferResponseBase<T> 得到簡化。ServiceBufferResponseBase<T> 會從響應上下文中向基本構造函數提供地址,還會將上下文設置到傳出標頭中。
ServiceBufferResponseBase<T> 作出的另一種簡化假設是響應服務可以使用它的主機用於從自己的緩沖區檢索信息的同一憑據將消息發送到響應緩沖。為此,ServiceBufferResponseBase<T> 從操作上下文獲得對自己主機的引用,並使用主機的 IServiceBusProperties 實現讀取憑據。ServiceBufferResponseBase<T> 會復制這些憑據以供自用(在 BufferedServiceBusClient<T> 內完成)。當然,這首先會強制使用 BufferedServiceBusHost<T>。您的服務需要從 ServiceBufferResponseBase<T> 派生服務代理類並將其用於響應。例如,假設此服務約定如下所示:
[ServiceContract]
interface ICalculatorResponse
{
[OperationContract(IsOneWay = true)]
void OnAddCompleted(int result,ExceptionDetail error);
}
This would be the definition of the proxy to the response service:
class CalculatorResponseClient :
ServiceBufferResponseBase<ICalculatorResponse>,ICalculatorResponse
{
public void OnAddCompleted(int result,ExceptionDetail error)
{
Enqueue(()=>Channel.OnAddCompleted(result,error));
}
}
圖 13 說明了響應其客戶端的簡單緩沖服務。
圖 13 使用 ServiceBufferResponseBase<T>
class MyCalculator : ICalculator
{
[OperationBehavior(TransactionScopeRequired = true)]
public void Add(int number1,int number2)
{
int result = 0;
ExceptionDetail error = null;
try
{
result = number1 + number2;
}
// Don’t rethrow
catch(Exception exception)
{
error = new ExceptionDetail(exception);
}
finally
{
CalculatorResponseClient proxy = new CalculatorResponseClient();
proxy.OnAddCompleted(result,error);
proxy.Close();
}
}
}
如下所示,所有的響應服務都需要從消息頭訪問方法 ID:
class MyCalculatorResponse : ICalculatorResponse
{
public void OnAddCompleted(int result,ExceptionDetail error)
{
string methodId = ResponseContext.Current.MethodId;
...
}
}
我們會進一步研究服務總線,敬請關注。
下載示例代碼:http://code.msdn.microsoft.com/mag201005ServBus