概述
.NET基於委托的APM(Asynchronous Programming Model)模型通過BeginInvoke, EndInvoke, AsyncCallback,IAsyncResult的組合使用,讓程序員可以方便的進行異步調用、異步回調和同步等待等操作。但.NET平台還沒有為線程的中止(abort)提供安全可靠的機制,也許正是基於這個原因APM並沒有包含異步調用的超時機制,而是把這個可能引起爭議的工作交給使用者自己來把握。
作為APM模型的補充,本文通過CancellableTask類提供了一個異步調用超時機制。CancellableTask類的設計有兩個主要的考慮:
1.保持APM風格,使用者依然可以使用熟悉的BeginInvoke, EndInvoke, IAsyncResult, AsyncCallback等;
2.提供基於Thread.Abort的默認超時處理,同時支持用戶自定義cancel回調。
使用
CancellableTask的構造函數包含workCallbak和cancelCallback(可選)兩參數,分別對應work回調和cancel回調。CancellableTask的BeginInvoke保持了APM的風格,可以看作是增加了timeout參數(單位:ms)的擴展版;而EndInvoke,AsyncCallback以及IAsyncResult的使用都和APM保持一致。Work委托產生的異常會在EndInvoke時拋出,同時若線程被超時中止,EndInvoke則會拋出ThreadAbortException異常。
下面是一段CancellableTask的使用示例:
class Program { static void Main(string[] args) { //默認超時直接abort線程 { Console.WriteLine("[case 1]"); CancellableTask cancellableTask = new CancellableTask(Work); State arg = new State { Loop = 20, Stop = false }; IAsyncResult asyncResult = cancellableTask.BeginInvoke( arg, (ar => Console.WriteLine("Async Callback")), null, 10 * 1000); asyncResult.AsyncWaitHandle.WaitOne(); try { object r = cancellableTask.EndInvoke(asyncResult); Console.WriteLine("return " + r); } catch (ThreadAbortException) { Console.WriteLine("Thread Aborted"); } catch (Exception exp) { Console.WriteLine(exp.ToString()); } } //自定義Cancel回調 { Console.WriteLine(Environment.NewLine + "[case 2]"); CancellableTask cancellableTask = new CancellableTask(Work, Cancel); State arg = new State { Loop = 20, Stop = false }; IAsyncResult asyncResult = cancellableTask.BeginInvoke( arg, (ar => { try { object r = cancellableTask.EndInvoke(ar); Console.WriteLine("return " + r); } catch (ThreadAbortException) { Console.WriteLine("Thread Aborted"); } catch (Exception exp) { Console.WriteLine(exp.ToString()); } } ), arg, 10 * 1000); } Console.ReadLine(); } static object Work(object arg) { State state = arg as State; int i; for (i = 0; i < state.Loop; i++) { if (state.Stop) break; Console.WriteLine(i); Thread.Sleep(1000); } return i; } static void Cancel(object state) { State st = state as State; st.Stop = true; } } internal class State { public int Loop { get; set; } public bool Stop { get; set; } }
實現
CancellableTask通過wrapper對workCallback進行包裝。Wrapper內部首先創建等待事件e,並通過ThreadPool.RegisterWaitForSingleObject注冊事件和WaitOrTimeout回調,然後調用workCallback。若workCallback提前返回,調用e.Set(),ThreadPool會調用WaitOrTimeout回調,isTimeout參數為false,不進行處理;否則,當workCallback超時未返回,ThreadPool會調用WaitOrTimeout回調,isTimeout參數為true。WaitOrTimeout回調在isTimeout情況下,首先判斷是否有自定義cancel回調,如果有則采用自定義回調;否則,默認情況下調用Thread.Abort終止work線程。下面是CancellableTask的實現細節:
public class CancellableTask { public delegate object WorkCallback(object arg); public delegate void CancelCallback(object state); protected class TimeoutState { internal Thread thread; internal object state; public TimeoutState(Thread thread, object state) { this.thread = thread; this.state = state; } } protected WorkCallback workCallback; protected CancelCallback cancelCallback; protected WorkCallback wrapper; public CancellableTask(WorkCallback workCallback) { this.workCallback = workCallback; } public CancellableTask(WorkCallback workCallback, CancelCallback cancelCallback) { this.workCallback = workCallback; this.cancelCallback = cancelCallback; } public IAsyncResult BeginInvoke(object arg, AsyncCallback asyncCallback, object state, int timeout) { wrapper = delegate(object argv) { AutoResetEvent e = new AutoResetEvent(false); try { TimeoutState waitOrTimeoutState = new TimeoutState(Thread.CurrentThread, state); ThreadPool.RegisterWaitForSingleObject(e, WaitOrTimeout, waitOrTimeoutState, timeout, true); return workCallback(argv); } finally { e.Set(); } }; IAsyncResult asyncResult = wrapper.BeginInvoke(arg, asyncCallback, state); return asyncResult; } public object EndInvoke(IAsyncResult result) { return wrapper.EndInvoke(result); } protected void WaitOrTimeout(object state, bool isTimeout) { try { if (isTimeout) { TimeoutState waitOrTimeoutState = state as TimeoutState; if (null != cancelCallback) { cancelCallback(waitOrTimeoutState.state); } else { waitOrTimeoutState.thread.Abort(); } } } catch { } } }
總結
本文為.NET APM模型提供了異步超時機制擴展,一方面保持了APM編程風格,另一方面支持用戶自定義cancel回調。需要注意的是,默認的cancel方式Thread.Abort的安全性問題,使用時應注意資源釋放等。