程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> .NET網頁編程 >> 關於.NET >> 並發事件: 實現CLR異步編程模型

並發事件: 實現CLR異步編程模型

編輯:關於.NET

通常 I/O 操作的特點是速度慢、不可預見。當應用程序執行同步 I/O 操作時,基本上會放棄對正在 完成實際工作的設備的控制。例如,如果應用程序調用 StreamRead 方法從 FileStream 或 NetworkStream 讀取某些字節,我們無法預計該方法需要多長時間才能返回。如果正在被讀取的文件位於 本地硬盤上,那麼 Read 操作可能會立即返回。如果存儲文件的遠程服務器脫機,則 Read 方法可能會等 待幾分鐘,然後超時並引發異常。在此期間,發出同步請求的線程會被占用。如果該線程是 UI 線程,則 應用程序將被凍結並停止響應用戶的輸入。

正在等待同步 I/O 完成的線程受阻,意味著該線程雖 然空閒,但無法執行有用工作。為提高可伸縮性,許多應用程序開發人員會創建更多線程。但遺憾的是, 每個線程都會帶來相當大的管理開銷,如其內核對象、用戶模式和內核模式堆棧、增加的環境切換、調用 帶有線程附加/分離通知的 DllMain 方法等。最終的結果是可伸縮性被降低了。

如果應用程序希 望保持對用戶的響應能力、提高可伸縮性和吞吐量並提高可靠性,則不應同步執行 I/O 操作。該應用程 序應使用公共語言運行庫 (CLR) 異步編程模型 (APM) 來執行異步 I/O 操作。關於如何使用 CLR APM 有 許多書面資料,其中包括我的著作《CLR via C#》第二版 (Microsoft Press®, 2006) 的第 23 章。 但我沒有注意到有哪個資料解釋了如何定義一個類,提供用於實施 APM 的方法。因此我決定在此專欄中 著重關注這一主題。

開發人員希望實施 APM 基本上有四個原因。第一,您可能要構建一個類,用 於直接與硬件(如硬盤上的文件系統、網絡、串行端口或並行端口)進行通信。正如上面提到的,設備 I/O 是不可預見的,因此應用程序在與硬件通信時應該執行異步 I/O 操作,以使應用程序保持響應能力 、伸縮性和可靠性。

幸運的是,Microsoft® .NET Framework 已經包含了與許多硬件設備通 信的類。因此,除非您要定義一個用於與 Framework 類庫 (FCL) 不支持的硬件設備(如並行端口)進行 通信的類,否則不需要親自實施 APM。可是盡管 FCL 支持某些設備,但不支持其中的某些特定子功能。 在這種情況下,如果您希望執行 I/O 操作,則可能需要實施 APM。例如,FCL 雖然提供了允許應用程序 與磁盤進行通信的 FileStream 類,但 FileStream 不允許您訪問伺機鎖定 (microsoft.com/msj/0100/win32/win320100.aspx)、稀疏文件流 (microsoft.com/msj/1198/ntfs/ntfs.aspx) 或 NTFS 文件系統提供的其他新奇的功能。如果要編寫 P/Invoke 包裝來調用提供這些功能的 Win32® API,那麼您可能會希望包裝支持 APM,從而可以異步 執行操作。

第二,您可能要在一個已經定義的與硬件直接通信的類上構建一個抽象層。.NET Framework 中已經提供了幾個這樣的示例。例如,假設您要將數據發送給一個 Web 服務方法。在 Web 服 務客戶端代理類上,有一個方法可接受您的參數,這些參數可能是一個復雜的數據結構集。在內部,該方 法將這些復雜的數據結構序列化為一個字節數組。然後使用 NetworkStream 類(該類已經具備了使用異 步 I/O 與硬件通信的能力)通過網絡發送該字節數組。另一個示例出現在訪問數據庫時。ADO.NET SqlCommand 類型提供了 BeginExecuteNonQuery、BeginExecuteReader 和其他 BeginXxx 方法,這些方 法可以分析參數,以便將數據通過網絡發送到數據庫。在 I/O 完成時,會調用相應的 EndExecuteNonQuery、EndExecuteReader 和其他 EndXxx 方法。在內部,這些 EndXxx 方法分析得到的 數據並將富數據對象返回給調用方。

第三,您的類可能會提供一種方法,這種方法執行起來可能 需要很長時間。在這種情況下,您可能希望提供 BeginXxx 和 EndXxx 方法,為調用方提供方便的 APM。 前面的示例最終是 I/O 密集型操作,與之不同,這次您的方法是執行計算密集型操作。由於是計算密集 型操作,因此必須使用一個線程來執行該工作;定義 BeginXxx 和 EndXxx 方法只是為了方便您的類用戶 。

最後,您的類可能包含執行同步 I/O 的 Win32 方法。遺憾的是,有大量執行 I/O 的 Win32 方法,但對於這些方法 Win32 無法異步執行這些 I/O 操作。例如,Win32 注冊表和事件日志函數可能會 與本地或遠程注冊表/事件日志通信。Microsoft 可能會創建這些 Win32 函數的異步版本,使線程在這些 版本上不受阻。但到目前為止,這些 Win32 函數的異步版本還不存在。當我用托管代碼來包裝這類方法 時,我始終提供 BeginXxx 和 EndXxx 方法,因此即使在我的應用程序不夠高效時也能用托管代碼完成操 作,因為在 Windows® 執行同步 I/O 操作時我的方法肯定存在線程受阻。但是,如果 Microsoft 將 這些方法的異步版本添加到 Win32,我可能會更改我的包裝,以便利用新方法提高效率,而根本不必更改 我的客戶端代碼。

APM 的核心:IAsyncResult

CLR APM 的核心是 IAsyncResult 接口,定義如圖 1 所示。

Figure 1 IAsyncResult

public interface IAsyncResult {
  WaitHandle AsyncWaitHandle { get; } // For Wait-Until-Done technique
  Boolean  IsCompleted   { get; } // For Polling technique
  Object   AsyncState   { get; } // For Callback technique
  Boolean  CompletedSynchronously { get; } // Almost never used
}

當調用任何 BeginXxx 方法時,該方法必須在內部構建一個對象,其類型用於實施 IAsyncResult 及其四個只讀屬性。該對象可識別剛剛啟動的異步操作的狀態。在 BeginXxx 方法返回給 應用程序代碼後,應用程序可以查詢這些屬性,以確定操作是否已完成。此對象還包含已完成操作的狀態 :如果操作成功完成則是結果值,如果操作沒有成功完成則是異常。應用程序將 IAsyncResult 對象傳遞 給 EndXxx 方法,該方法等待操作完成(假定其尚未完成)。EndXxx 方法會返回結果,或引發異常,使 調用方獲知操作的結果或操作錯誤。

圖 2 定義了一個 AsyncResultNoResult 類,用於實施 IAsyncResult 接口。這個簡單的類可以用於沒有返回值的異步操作 - 具體來說就是,操作成功或者失敗 。Stream 的 BeginWrite 和 EndWrite 方法就屬於這種情況。當您啟動對流的異步寫操作時,結果是成 功或失敗 - EndWrite 方法還原為返回 void。

Figure 2 AsyncResultNoResult 類

internal class AsyncResultNoResult : IAsyncResult
{
  // Fields set at construction which never change while
  // operation is pending
  private readonly AsyncCallback m_AsyncCallback;
  private readonly Object m_AsyncState;
  // Fields set at construction which do change after
  // operation completes
  private const Int32 c_StatePending = 0;
  private const Int32 c_StateCompletedSynchronously = 1;
  private const Int32 c_StateCompletedAsynchronously = 2;
  private Int32 m_CompletedState = c_StatePending;
  // Field that may or may not get set depending on usage
  private ManualResetEvent m_AsyncWaitHandle;
  // Fields set when operation completes
  private Exception m_exception;
  public AsyncResultNoResult(AsyncCallback asyncCallback, Object state)
  {
   m_AsyncCallback = asyncCallback;
   m_AsyncState = state;
  }
  public void SetAsCompleted(
   Exception exception, Boolean completedSynchronously)
  {
   // Passing null for exception means no error occurred.
   // This is the common case
   m_exception = exception;
   // The m_CompletedState field MUST be set prior calling the callback
   Int32 prevState = Interlocked.Exchange(ref m_CompletedState,
     completedSynchronously ? c_StateCompletedSynchronously :
     c_StateCompletedAsynchronously);
   if (prevState != c_StatePending)
     throw new InvalidOperationException(
       "You can set a result only once");
   // If the event exists, set it
   if (m_AsyncWaitHandle != null) m_AsyncWaitHandle.Set();
   // If a callback method was set, call it
   if (m_AsyncCallback != null) m_AsyncCallback(this);
  }
  public void EndInvoke()
  {
   // This method assumes that only 1 thread calls EndInvoke
   // for this object
   if (!IsCompleted)
   {
     // If the operation isn't done, wait for it
     AsyncWaitHandle.WaitOne();
     AsyncWaitHandle.Close();
     m_AsyncWaitHandle = null; // Allow early GC
   }
   // Operation is done: if an exception occured, throw it
   if (m_exception != null) throw m_exception;
  }
  #region Implementation of IAsyncResult
  public Object AsyncState { get { return m_AsyncState; } }
  public Boolean CompletedSynchronously {
   get { return Thread.VolatileRead(ref m_CompletedState) ==
        c_StateCompletedSynchronously; }
  }
  public WaitHandle AsyncWaitHandle
  {
   get
   {
     if (m_AsyncWaitHandle == null)
     {
      Boolean done = IsCompleted;
      ManualResetEvent mre = new ManualResetEvent(done);
      if (Interlocked.CompareExchange(ref m_AsyncWaitHandle,
        mre, null) != null)
      {
        // Another thread created this object's event; dispose
        // the event we just created
        mre.Close();
      }
      else
      {
        if (!done && IsCompleted)
        {
         // If the operation wasn't done when we created
         // the event but now it is done, set the event
         m_AsyncWaitHandle.Set();
        }
      }
     }
     return m_AsyncWaitHandle;
   }
  }
  public Boolean IsCompleted {
   get { return Thread.VolatileRead(ref m_CompletedState) !=
        c_StatePending; }
  }
  #endregion
}

正如您所見,AsyncResultNoResult 類有一個構造函數,可接受 AsyncCallback 和 Object 參數,這些參數用於啟動所有異步操作。該構造函數僅將這些參數保存在私有字段中。IAsyncResult 的 AsyncState 屬性將 Object 字段返回給調用方。該類定義了用於實施 IAsyncResult 的 IsCompleted 和 CompletedSynchronously 屬性的 m_CompletedState 字段。它還定義了用於實施 IAsyncResult 的 AsyncWaitHandle 屬性的 m_AsyncWaitHandle 字段。最後,該類定義了 m_exception 字段。當操作完成 時設置此字段。如果操作成功完成,則該字段被設置為 null(與其初始值相同);如果操作失敗,則該 字段被設置為異常派生的對象,表明失敗的原因。

如果對 AsyncResultNoResult 類進行分析,您會發現整個代碼非常直觀 - 處理 m_AsyncWaitHandle 字段的部分出外。此字段作為對 ManualResetEvent 對象的引用,只有在啟動異步操作的代碼查詢 AsyncWaitHandle 屬性或代碼在操作實際完成執行之前調用 EndInvoke 方法時才需要該字段。使用 APM 的最常見(也是推薦的)方法就是指定一種 AsyncCallback 方法,當操作完成時應自動調用該方法。對 於這種最常見的使用方法,根本不需要 ManualResetEvent 對象。因此,我能夠在很大程度上避免創建和 使用此對象,除非使用 AsyncResultNoResult 對象的代碼確實需要它。

我要極力避免這一點的原 因在於,創建和使用內核對象(如 ManualResetEvent)相對來說成本高昂。有關使用內核對象帶來的性 能損失的詳細信息,請參閱我在 2005 年 10 月撰寫的“並發事件”專欄 (msdn.microsoft.com/msdnmag/issues/05/10/ConcurrentAffairs)。

當異步操作完成時,一些代 碼必須調用 AsyncResultNoResult 的 SetAsCompleted 方法,如果操作成功完成則傳入空值,如果操作 失敗則傳入對異常派生的對象的引用。該代碼還會表明操作是同步完成(幾乎從不)還是異步完成(幾乎 總是)。IAsyncResult 的 CompletedSynchronously 屬性會返回這一信息,但應用程序很少會真正關心 這一點。

在內部,SetAsCompleted 會將異常保存在 m_exception 字段中,並更改 m_completedSynchronously 字段的狀態。然後,如果創建了手動重置事件對象,則會對其進行設置。最 後,如果在構建 AsyncResultNoResult 對象時指定了 AsyncCallback 方法,則回調該方法,使應用程序 代碼獲知異步操作是否已完成,從而使其可以處理結果(或失敗)。

為獲得操作的結果,應用程 序代碼將調用某種 EndXxx 方法,而該方法將調用 AsyncResultNoResult 的 EndInvoke 方法,以確定操 作是否已成功。如果在操作完成前調用了 EndInvoke,則 EndInvoke 會使用手動重置事件掛起調用線程 ,直到操作完成。如果操作完成,EndInvoke 會返回或引發先前在調用 SetAsCompleted 時保存的異常。

由於許多異步操作都有返回值,我也定義了一個類以支持這種情況: AsyncResult<TResult>(參見圖 3)。此泛型類由 AsyncResultNoResult 派生而來,實際上只是 增加了對 TResult 類型的返回值的支持。此支持采用私有字段的形式來存放結果 (m_result)、接受 TResult 值的 SetAsCompleted 方法的重載以及等待操作完成的新的 EndInvoke 方法,然後如果操作成 功完成則返回結果,如果操作失敗則引發異常。

Figure 3 帶返回值的 AsyncResult

internal class AsyncResult<TResult> : AsyncResultNoResult
{
  // Field set when operation completes
  private TResult m_result = default(TResult);
  public AsyncResult(AsyncCallback asyncCallback, Object state) :
   base(asyncCallback, state) { }
  public void SetAsCompleted(TResult result,
   Boolean completedSynchronously)
  {
   // Save the asynchronous operation's result
   m_result = result;
   // Tell the base class that the operation completed
   // sucessfully (no exception)
   base.SetAsCompleted(null, completedSynchronously);
  }
  new public TResult EndInvoke()
  {
   base.EndInvoke(); // Wait until operation has completed
   return m_result; // Return the result (if above didn't throw)
  }
}

許多 BeginXxx 方法也接受除 AsyncCallback 和 Object 之外的參數。例如,Socket 類具有 一種帶有 IPAddress(地址)和 Int32(端口)參數的 BeginAccept 方法。如果您希望使用具有 BeginXxx 方法(帶有附加的參數)的 AsyncResultNoResult 或 AsyncResult<TResult> 類,您會 希望定義由這兩個基類中的一個派生出的特有類型(取決於您的 EndXxx 方法是否返回 void)。在您的 類中,為每個參數定義一個附加的字段,並在您的類的構造函數中對它們進行設置。然後完成實際工作的 方法就可以在適當的時候從您的類的字段提取這些參數值。

實施 APM

現在您了解了如何定 義一個類型用以實施 IAsyncResult 接口,接下來我將介紹如何使用我的 AsyncResult<TResult> 和 AsyncResultNoResult 類。我定義了一個 LongTask 類(參見圖 4),其中提供了一種同步 DoTask 方法,該方法執行時間較長,會返回一個 DateTime 實例,表明操作是何時完成的。

Figure 4 LongTask 模擬異步 I/O

internal sealed class LongTask
{
  private Int32 m_ms; // Milliseconds;
  public LongTask(Int32 seconds)
  {
   m_ms = seconds * 1000;
  }
  // Synchronous version of time-consuming method
  public DateTime DoTask()
  {
   Thread.Sleep(m_ms); // Simulate time-consuming task
   return DateTime.Now; // Indicate when task completed
  }
  // Asynchronous version of time-consuming method (Begin part)
  public IAsyncResult BeginDoTask(AsyncCallback callback, Object state)
  {
   // Create IAsyncResult object identifying the
   // asynchronous operation
   AsyncResult<DateTime> ar = new AsyncResult<DateTime>(
     callback, state);
   // Use a thread pool thread to perform the operation
   ThreadPool.QueueUserWorkItem(DoTaskHelper, ar);
   return ar; // Return the IAsyncResult to the caller
  }
  // Asynchronous version of time-consuming method (End part)
  public DateTime EndDoTask(IAsyncResult asyncResult)
  {
   // We know that the IAsyncResult is really an
   // AsyncResult<DateTime> object
   AsyncResult<DateTime> ar = (AsyncResult<DateTime>)asyncResult;
   // Wait for operation to complete, then return result or
   // throw exception
   return ar.EndInvoke();
  }
  // Asynchronous version of time-consuming method (private part
  // to set completion result/exception)
  private void DoTaskHelper(Object asyncResult)
  {
   // We know that it's really an AsyncResult<DateTime> object
   AsyncResult<DateTime> ar = (AsyncResult<DateTime>)asyncResult;
   try
   {
     // Perform the operation; if sucessful set the result
     DateTime dt = DoTask();
     ar.SetAsCompleted(dt, false);
   }
   catch (Exception e)
   {
     // If operation fails, set the exception
     ar.SetAsCompleted(e, false);
   }
  }
}

為方便起見,我還提供了遵循 CLR APM 的 BeginDoTask 和 EndDoTask 方法,使用戶可以異 步執行 DoTask 方法。當用戶調用我的 BeginDoTask 方法時,我會構造一個 AsyncResult<DateTime> 對象。然後我用一個線程池線程調用一個小的幫助器方法 DoTaskHelper ,該方法包含對該同步 DoTask 方法的調用。

DoTaskHelper 方法只是通過一個 try 塊調用同步 版本的方法。如果 DoTask 方法從開始運行到完成的過程中沒有出現故障(引發異常),那麼我將調用 SetAsCompleted 來設置操作的返回值。如果 DoTask 方法引發異常,則 DoTaskHelper 的 catch 塊將捕 獲異常,並通過調用 SetAsCompleted 表明該操作已完成,傳入對異常派生的對象的引用。

應用 程序代碼調用 LongTask 的 EndDoTask 方法,以獲得操作結果。將 IAsyncResult 傳遞給所有 EndXxx 方法。在內部,EndDoTask 方法獲知傳遞給它的 IAsyncResult 對象是真正的 AsyncResult<DateTime> 對象,對其進行轉換並利用它調用 EndInvoke。正如上面討論的, AsyncResult<TResult> 的 EndInvoke 方法等待操作完成(如果必要),然後返回結果或引發異常 ,表明已將異步操作的結果返回給調用方。

測試和性能

FunctionalTest 方法(參見圖 5 )顯示了使用我的 APM 實施方法的部分代碼。它對 APM 提供的三種集合方法進行了測試:等待直到完成 、輪詢和回調方法。如果檢查該代碼,您會發現它與您所見過的其他 APM 的用法完全相同。當然,這是 整個練習的關鍵所在。

Figure 5 使用 LongTask

private static void FunctionalTest()
{
 IAsyncResult ar;
 LongTask lt = new LongTask(5);
 // Prove that the Wait-until-done technique works
 ar = lt.BeginDoTask(null, null);
 Console.WriteLine("Task completed at: {0}", lt.EndDoTask(ar));
 // Prove that the Polling technique works
 ar = lt.BeginDoTask(null, null);
 while (!ar.IsCompleted)
 {
   Console.WriteLine("Not completed yet.");
   Thread.Sleep(1000);
 }
 Console.WriteLine("Task completed at: {0}", lt.EndDoTask(ar));
 // Prove that the Callback technique works
 lt.BeginDoTask(TaskCompleted, lt);
 Console.ReadLine();
}
private static void TaskCompleted(IAsyncResult ar)
{
 LongTask lt = (LongTask)ar.AsyncState;
 Console.WriteLine("Task completed at: {0}", lt.EndDoTask(ar));
 Console.WriteLine("All done, hit Enter to exit app.");
}

PerformanceTest 方法(參見圖 6)將我的 IAsyncResult 實施方法與當使用委托的 BeginInvoke 和 EndInvoke 方法時 CLR 提供的實施方法進行了比較。我的實施執行情況似乎比 FCL 的 當前實施情況要好,顯然是由於後者無論何時創建其 IAsyncResult 對象時始終都要構造 ManualResetEvent,無論應用程序是否需要該事件。

Figure 6 測試 IAsyncResult 性能

private const Int32 c_iterations = 100 * 1000; // 

100 thousand
private static Int32 s_numDone;
private delegate DateTime DoTaskDelegate ();
private static void PerformanceTest()
{
AutoResetEvent are = new AutoResetEvent(false);
LongTask lt = new LongTask(0);
Stopwatch sw;
s_numDone = 0;
sw = Stopwatch.StartNew();
for (Int32 n = 0; n < c_iterations; n++)
{
lt.BeginDoTask(delegate(IAsyncResult ar)
{
if (Interlocked.Increment(ref s_numDone) == c_iterations)
are.Set();
}, null);
}
are.WaitOne();
Console.WriteLine("AsyncResult Time: {0} ", sw.Elapsed);
s_numDone = 0;
DoTaskDelegate doTaskDelegate = lt.DoTask;
sw = Stopwatch.StartNew();
for (Int32 n = 0; n < c_iterations; n++)
{
doTaskDelegate.BeginInvoke(delegate(IAsyncResult ar)
{
if (Interlocked.Increment(ref s_numDone) == c_iterations)
are.Set();
}, null);
}
are.WaitOne();
Console.WriteLine("Delegate Time: {0}", sw.Elapsed);
}

總結

我認為在我們使用 APM 這樣的機制時了解 CLR 內部發生的情況非常有趣。在考察了我在本文中介紹 的實施方法後,您會對 IAsyncResult 對象的大小、其狀態以及它們如何管理其狀態有大致的了解。了解 這些內容可以幫助您改善構建應用程序的方法,並獲得更好的性能。

在本專欄中,我使用我的 IAsyncResult 實施方法來執行采用線程池線程的計算密集型任務。在今後 的專欄中,我將介紹如何使用我的 IAsyncResult 實施方法處理 I/O 密集型操作。

將您想向 Jeffrey 詢問的問題和提出的意見發送至:[email protected] [email protected].

本文配套源碼:http://www.bianceng.net/dotnet/201212/749.htm

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved