對於一般的多線程操作,比如異步地進行基於文件系統的IO操作;異步地調用Web Service;或者是異步地進行數據庫訪問等等,是和具體的線程無關的。也就是說,對於這些操作,任意創建一個新的線程來執行都是等效的。但是有些情況下,有些操作卻只能在固定的線程下執行。比如,在GUI應用下,對控件的訪問就需要在創建該控件的線程下執行;或者我們在某個固定的線程中通過TLS(Thread Local Storage)設置了一些Context信息,供具體的操作使用,我們把操作和某個固定的線程的依賴稱為線程關聯性(Thread Affinity)。在這種情況下,我們的異步操作就需要被Marshal到固定的線程執行。在WCF並發或者Callback的情況下也具有這樣的基於線程關聯性的問題。
一、從基於Windows Application客戶端的WCF回調失敗談起
在"我的WCF之旅"系列文章中,有一篇(WinForm Application中調用Duplex Service出現TimeoutException的原因和解決方案)專門介紹在一個Windows Application客戶端應用, 通過WCF 的Duplex通信方式進行回調失敗的文章.我們今天以此作為出發點介紹WCF在Thread Affinity下的表現和解決方案.
我們來創建一個WCF的應用來模擬該場景: 客戶端是一個基於Windows Form應用, 完成一個計算器的功能, 用戶輸入操作數,點擊"計算"按鈕, 後台通過調用WCF service, 並傳遞一個用於顯示計算結果的Callback對象; service進行相應的計算得到最後的運算結果,調用該Callback對象將運算結果顯示到客戶端界面.這是我們的WCF四層結構:
1、Contract:ICalculate & ICalculateCallback
namespace Artech.ThreadAffinity.Contracts
{
[ServiceContract(CallbackContract = typeof(ICalculateCallback))]
public interface ICalculate
{
[OperationContract]
void Add(double op1, double op2);
}
}
這是Service Contract,下面是Callback Contract,用於顯示運算結果:
namespace Artech.ThreadAffinity.Contracts
{
public interface ICalculateCallback
{
[OperationContract]
void DisplayResult(double result);
}
}
2、Service:CalculateService
namespace Artech.ThreadAffinity.Services
由於需要進行callback, 我們把ConcurrencyMode 設為Reentrant。當得到運算的結果後,通過OperationContext.Current.GetCallbackChannel得到callback對象,並調用之。還有一點需要提的是,該service是通過一個Windows Form application進行host的。並且有一個ListBox列出所有service執行的結果,就像這樣:
{
[ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Reentrant)]
public class CalculateService:ICalculate
{
public static ListBox DisplayPanel
{ get; set; }
#region ICalculate Members
public void Add(double op1, double op2)
{
double result = op1 + op2;
ICalculateCallback callback = OperationContext.Current.GetCallbackChannel<ICalculateCallback>();
DisplayPanel.Items.Add(string.Format("{0} + {1} = {2}", op1, op2, result));
callback.DisplayResult(result);
}
#endregion
}
}
3、Hosting
Hosting的代碼寫在Form的Load事件中:
private void HostForm_Load(object sender, EventArgs e)
{
this._serviceHost = new ServiceHost(typeof(CalculateService));
CalculateService.DisplayPanel = this.listBoxResult;
CalculateService.SynchronizationContext = SynchronizationContext.Current;
this._serviceHost.Opened += delegate
{
this.Text = "The calculate service has been started up!";
};
this._serviceHost.Open();
}
我們注意到了CalculateService使用到的用於顯示所有預算結果的ListBox就是在這了通過static property傳遞的。
這麼配置文件
<configuration>
<system.serviceModel>
<services>
<service name="Artech.ThreadAffinity.Services.CalculateService">
<endpoint binding="netTcpBinding" bindingConfiguration="" contract="Artech.ThreadAffinity.Contracts.ICalculate" />
<host>
<baseAddresses>
<add baseAddress="net.tcp://127.0.0.1:8888/calculateservice" />
</baseAddresses>
</host>
</service>
</services>
</system.serviceModel>
</configuration>
4、Client
Client的界面很簡單:輸入兩個操作數,點擊“=”按鈕,將運算結果顯示出來。
先來看看client端對callback contract的實現:
namespace Clients
{
public class CalculateCallback : ICalculateCallback
{
public static TextBox ResultPanel;
#region ICalculateCallback Members
public void DisplayResult(double result)
{
ResultPanel.Text = result.ToString();
}
#endregion
}
}
這是配置:
<configuration>
<system.serviceModel>
<client>
<endpoint address="net.tcp://127.0.0.1:8888/calculateservice"
binding="netTcpBinding" bindingConfiguration="" contract="Artech.ThreadAffinity.Contracts.ICalculate"
name="calculateservice" />
</client>
</system.serviceModel>
</configuration>
然後是我們“=”按鈕的單擊事件對運算的實現:
private void buttonCalculate_Click(object sender, EventArgs e)
{
CalculateCallback.ResultPanel = this.textBoxResult;
DuplexChannelFactory<ICalculate> channelFactory = new DuplexChannelFactory<ICalculate>(new CalculateCallback(), "calculateservice");
ICalculate calculator = channelFactory.CreateChannel();
calculator.Add(double.Parse(this.textBoxOp1.Text), double.Parse(this.textBoxOp2.Text));
}
CalculateCallback 用於顯示運算結果的TextBox通過statis property實現傳遞。
這個實現很簡單,貌似沒有什麼問題,但是我們運行程序,在客戶端就會拋出這樣的exception。可以看出是一個TimeoutException。
二、是什麼導致TimeoutException?
我們現在來分析是什麼導致了TimeoutException的拋出。原因很簡單:由於我們對service的調用的是在UI 線程調用的,所以在開始調用到最終得到結果,這個UI Thread會被鎖住;但是當service進行了相應的運算的到運算的結果後,需要調用callback對象對client進行回調,默認的情況下,Callback的執行是在UI線程執行的。當Callback試圖執行的時候,發現UI 線程被鎖,只能等待。這樣形成一個死鎖,UI線程需要等待CalculateService執行返回後才能解鎖,而CalculateService需要Callback執行完成;而Callback需要等到UI線程解鎖才能執行。
基於上門的原因,我們有兩種解決方案:
1、CalculateService不必等到Callback執行完成就返回,我們可以通過異步調用Callback。或者讓Client異步方式調用CalculateService,以便及時釋放UI線程,我們可以通過One-way的方式來進行service的調用。
2、讓Callback的執行不必綁定到UI線程
三、解決方案一:通過異步調用或者One-way回調
為了簡單起見,我們通過ThreadPool實現了異步回調:
public void Add(double op1, double op2)
{
double result = op1 + op2;
ICalculateCallback callback = OperationContext.Current.GetCallbackChannel<ICalculateCallback>();
ThreadPool.QueueUserWorkItem(delegate{ callback.DisplayResult(result); }, null);
}
這是一種方案,另一種是將Add操作設成One-way的:
namespace Artech.ThreadAffinity.Contracts
{
[ServiceContract(CallbackContract = typeof(ICalculateCallback))]
public interface ICalculate
{
[OperationContract(IsOneWay = true)]
void Add(double op1, double op2);
}
}
這兩種方案都可以解決問題。
四、方案二、通過解除Callback操作和UI線程的關聯性
現在我們才進入我們今天討論的主題:WCF並發操作的線程關聯性問題。在這之前,我們需要了解一個重要的對象:SynchonizationContext(System.Threading.SynchronizationContext)。SynchonizationContext就是為了解決這種線程關聯性問題而設計的。SynchonizationContext提供了兩個主要的API將操作和對應的Thread關聯:Post和Send。
public virtual void Post(SendOrPostCallback d, object state)
public virtual void Send(SendOrPostCallback d, object state)
Send和Post分別以同步和異步的方式將以Delegate表示的具體的操作和SynchonizationContext對象對應的Thread關聯,而SendOrPostCallback delegate對象代表你需要的線程關聯操作,state代表傳入delegate的參數:
public delegate void SendOrPostCallback(object state);
對於某些具有線程關聯的應用,比如Windows Form application,在程序啟動的時候,會設置當前的SynchonizationContext對象(Windows Form application使用的是繼承了SynchonizationContext的WindowsFormsSynchronizationContext :System.Windows.Forms.WindowsFormsSynchronizationContext)。當前SynchonizationContext被成功初始化後,你就可以通過SynchonizationContext的靜態屬性Current得到它。在你自己的應用中,如何有需要,你也可以自定義SynchonizationContext,並通過靜態方法SetSynchronizationContext將其設置為current SynchronizationContext。
對應WCF來說,無論是host一個service,還是在調用service時制定callback,在默認的情況下,service和callback的操作將自動和當前的SynchonizationContext進行關聯(如何有的話)。也就是說,如過我們的service被host到Windows Form application下,那麼service的操作將在UI 線程下執行;同理,如何我們在一個Windows Forms UI線程下調用duplex service並制定callback,那麼callback的最終執行將在UI線程。
關於WCF對線程關聯性的控制,可以通過ServiceBehavior或者CallbackBehavior的UseSynchronizationContext屬性進行設定,該屬性默認為true,這正式WCF默認具有線程關聯性的原因。
現在我們來實現我們的第二套方案:讓Callback的執行不必綁定到UI線程。為此我們只需要加上如何的CallbackBehavior attribute就可以了。
namespace Artech.ThreadAffinity.Clients
{
[CallbackBehavior(UseSynchronizationContext = false)]
public class CalculateCallback : ICalculateCallback
{
public static TextBox ResultPanel;
#region ICalculateCallback Members
public void DisplayResult(double result)
{
ResultPanel.Text = result.ToString();
}
#endregion
}
}
但是現在我們運行我們的程序,將會出現如下的InvalidOperation異常:
原因很簡單,由於我們將callbaclk的UseSynchronizationContext 設置成false,那麼callback的操作將不會再UI線程下執行。但是我們需要運算的結果輸入到UI的TextBox上,對UI上控件的操作需要在UI線程上執行,顯然會拋出異常了。
為了我們引入SynchonizationContext到CalculateCallback中:將SynchonizationContext定義成一個static屬性,通過Post方法異步地實現對運算結果的顯示。
namespace Artech.ThreadAffinity.Clients
{
[CallbackBehavior(UseSynchronizationContext = false)]
public class CalculateCallback : ICalculateCallback
{
public static TextBox ResultPanel;
public static SynchronizationContext SynchronizationContext;
#region ICalculateCallback Members
public void DisplayResult(double result)
{
SynchronizationContext.Post(delegate { ResultPanel.Text = result.ToString(); }, null);
}
#endregion
}
}
SynchonizationContext在調用service的時候指定:
private void buttonCalculate_Click(object sender, EventArgs e)
{
CalculateCallback.ResultPanel = this.textBoxResult;
CalculateCallback.SynchronizationContext = SynchronizationContext.Current;
DuplexChannelFactory<ICalculate> channelFactory = new DuplexChannelFactory<ICalculate>(new CalculateCallback(), "calculateservice");
ICalculate calculator = channelFactory.CreateChannel();
calculator.Add(double.Parse(this.textBoxOp1.Text), double.Parse(this.textBoxOp2.Text));
}
現在我們程序能夠正常運行了。
五、另一種可選方案:通過ISynchronizeInvoke的Invoke/BeginInvoke
熟悉Windows Form編程的讀者應該都知道,WinForm空間的基類Control(System.Windows.Forms.Control)都實現了System.ComponentModel.ISynchronizeInvoke接口,而Control對ISynchronizeInvoke的實現就是為了解決Control的操作必須在創建Control線程的問題,ISynchronizeInvoke定義Invoke和BeginInvoke方法方面我們以同步或者異步的方式操作Control:
public interface ISynchronizeInvoke
{
// Methods
[HostProtection(SecurityAction.LinkDemand, Synchronization=true, ExternalThreading=true)]
IAsyncResult BeginInvoke(Delegate method, object[] args);
object EndInvoke(IAsyncResult result);
object Invoke(Delegate method, object[] args);
// Properties
bool InvokeRequired { get; }
}
如何我們放棄基於SynchonizationContext的解決方案,我們也可以通過基於ISynchronizeInvoke的方式來解決這個問題。為此我們這樣定義CalculateCallback:
namespace Artech.ThreadAffinity.Clients
{
[CallbackBehavior(UseSynchronizationContext = false)]
public class CalculateCallback : ICalculateCallback
{
public static TextBox ResultPanel;
public delegate void DisplayResultDelegate(TextBox resultPanel, double result);
#region ICalculateCallback Members
public void DisplayResult(double result)
{
DisplayResultDelegate displayResultDelegate = new DisplayResultDelegate(DisplayResult);
ResultPanel.BeginInvoke(displayResultDelegate, new object[] { ResultPanel, result });
}
private void DisplayResult(TextBox resultPanel, double result)
{
resultPanel.Text = result.ToString();
}
#endregion
}
}
由於BeginInvoke方式只能接受一個具體的delegate對象(不能使用匿名方法),所以需要定義一個具體的Delegate(DisplayResultDelegate)和對應的方法(DisplayResult),參數通過一個object[]傳入。
從本質上將,這兩種方式的實現完全是一樣的,如何你查看System.Windows.Forms.WindowsFormsSynchronizationContext的代碼,你會發現其Send和Post方方法就是通過調用Invoke和BeginInvoke方式實現的。
六、Service Hosting的線程關聯性
我們花了很多的精力介紹了WCF Duplex通信中Callback操作的線程關聯性問題,實際上我們使用到更多的還是service操作的線程關聯性問題。就以我們上面的程序為例,我們通過一個Windows Form application來host我們的service,並且要求service的運算結束後將結果輸出到server端的Window form的ListBox中,對ListBox的操作肯定需要的Host程序的UI線程中執行。
按照我們一般的想法,我們的Service面向若干client,肯定是並發的接收client端的請求,以多線程的方式執行service的操作,那麼操作中UI 控件的操作肯定會出現錯誤。
我們的程序依然可以正常運行,其根本原因是WCF的service操作默認實現了對Host service的當前線程的SynchonizationContext實現了關聯。與Callback操作的線程關聯性通過CallbackBehavior的UseSynchronizationContext 進行控制一樣,service的線程關聯性通過ServiceBehavir的UseSynchronizationContext 進行設定。UseSynchronizationContext 的默認值為true。
如何我們將CalculateService的UseSynchronizationContext 設為false:
namespace Artech.ThreadAffinity.Services
{
[ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Reentrant,UseSynchronizationContext = false)]
public class CalculateService:ICalculate
{
public static ListBox DisplayPanel
{ get; set; }
#region ICalculate Members
public void Add(double op1, double op2)
{
double result = op1 + op2;
ICalculateCallback callback = OperationContext.Current.GetCallbackChannel<ICalculateCallback>();
DisplayPanel.Items.Add(string.Format("{0} + {1} = {2}", op1, op2, result));
callback.DisplayResult(result);
}
#endregion
}
}
有control被不是創建它的線程操作,肯定會拋出一個InvalidOperationException,就像這樣:
我們一樣可以通過SynchonizationContext或者ISynchronizeInvoke的方式來解決這樣的問題,我們只討論前面一種,為此我們改變了CalculateService的定義:通過SynchonizationContext的Post方法實現對ListBox的訪問。
namespace Artech.ThreadAffinity.Services
{
[ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Reentrant,UseSynchronizationContext = false)]
public class CalculateService:ICalculate
{
public static ListBox DisplayPanel
{ get; set; }
public static SynchronizationContext SynchronizationContext
{ get; set; }
#region ICalculate Members
public void Add(double op1, double op2)
{
double result = op1 + op2;
ICalculateCallback callback = OperationContext.Current.GetCallbackChannel<ICalculateCallback>();
SynchronizationContext.Post(delegate
{
DisplayPanel.Items.Add(string.Format("{0} + {1} = {2}", op1, op2, result));
}, null);
callback.DisplayResult(result);
}
#endregion
}
}
通過static屬性定義的SynchonizationContext在host的時候指定:
private void HostForm_Load(object sender, EventArgs e)
{
this._serviceHost = new ServiceHost(typeof(CalculateService));
CalculateService.DisplayPanel = this.listBoxResult;
CalculateService.SynchronizationContext = SynchronizationContext.Current;
this._serviceHost.Opened += delegate
{
this.Text = "The calculate service has been started up!";
};
this._serviceHost.Open();
}
這樣我們的程序又可以正常運行了。
本文配套源碼