把一個流拷貝到另一個流是有用且常見的操作。Stream.CopyTo 方法在.Net 4中就已經加入來滿足要求這個功能的場景,例如在一個指定的URL處下載數據:
public static byte[] DownloadData(string url) { using(var request = WebRequest.Create(url)) using(var response = request.GetResponse()) using(var responseStream = response.GetResponseStream()) using(var result = new MemoryStream()) { responseStream.CopyTo(result); return result.ToArray(); } }
為了提高響應能力和伸縮性,我們想使用基於TAP模式來實現上面的功能。可以嘗試按下面的來做:
public static async Task<byte[]> DownloadDataAsync(string url) { using(var request = WebRequest.Create(url)) { return await Task.Run(() => { using(var response = request.GetResponse()) using(var responseStream = response.GetResponseStream()) using(var result = new MemoryStream()) { responseStream.CopyTo(result); return result.ToArray(); } } } }
此實現如果用於UI線程會提升響應能力,因為它脫離了從網絡流下載數據任務的調用線程以及把該網絡流復制到最終將下載的數據轉成一個數組的內存流。然而,該實現對伸縮性沒有效果,因為它在等待數據下載的過程中,仍舊執行同步I/O和阻塞線程池線程。反之,我們想要的是下面的功能代碼:
public static async Task<byte[]> DownloadDataAsync(string url) { using(var request = WebRequest.Create(url)) using(var response = await request.GetResponseAsync()) using(var responseStream = response.GetResponseStream()) using(var result = new MemoryStream()) { await responseStream.CopyToAsync(result); return result.ToArray(); } }
不幸的是,在.Net 4中缺少異步的CopyToAsync方法,只有Stream類有一個同步的CopyTo方法。現在我們就自己提供一個實現:
public static void CopyTo(this Stream source, Stream destination) { var buffer = new byte[0x1000]; int bytesRead; while((bytesRead = source.Read(buffer, 0, buffer.Length)) > 0) { destination.Write(buffer, 0, bytesRead); } }
為了提供一個異步的CopyTo實現,我們可以利用編譯器實現TAP的能力,稍微地修改這個實現:
public static async Task CopyToAsync(this Stream source, Stream destination) { var buffer = new byte[0x1000]; int bytesRead; while((bytesRead = await source.ReadAsync(buffer, 0, buffer.Length)) > 0) { await destination.WriteAsync(buffer, 0, bytesRead); } }
這裡我們將返回類型從void改成了Task,將Read和Write分別換成了ReadAsync和WriteAsync,並且在ReadAsync和WriteAsync的調用前加了與上下文相關的await關鍵字前綴。.Net 4 中不存在ReadAsycn和WriteAsync,但是可以通過基於Task.Factory.FromAsync實現,關於這個描述在上一篇隨筆中的“Tasks和APM”章節講過:
public static Task<int> ReadAsync( this Stream source, byte [] buffer, int offset, int count) { return Task<int>.Factory.FromAsync(source.BeginRead, source.EndRead, buffer, offset, count, null); } public static Task WriteAsync( this Stream destination, byte [] buffer, int offset, int count) { return Task.Factory.FromAsync( destination.BeginWrite, destination.EndWrite, buffer, offset, count, null); }
有了這些方法,我們可以成功地實現CopyToAsync方法。我們也可以通過添加一個CancellationToken到方法中以支持撤銷請求,該CancellationToken將會在復制過程中的每次讀寫之後被監控到(如果ReadAsync和WriteAsync支持撤銷,那麼也可以將CancellationToken線程化到那些調用中):
public static async Task CopyToAsync( this Stream source, Stream destination, CancellationToken cancellationToken) { var buffer = new byte[0x1000]; int bytesRead; while((bytesRead = await source.ReadAsync(buffer, 0, buffer.Length)) > 0) { await destination.WriteAsync(buffer, 0, bytesRead); cancellationToken.ThrowIfCancellationRequested(); } }
【注意這種撤銷在同步的CopyTo實現中也是有用的,傳入的CancellationToken會啟用撤銷。實現會依賴一個從該方法返回的可取消的對象,但實現接收到那麼對象已經太晚了,因為同步調用完成時,已經沒有留下要取消的東西了。】
我們也加入了進度通知的支持,包括至今已經復制了多少數據:
public static async Task CopyToAsync( this Stream source, Stream destination, CancellationToken cancellationToken, IProgress<long> progress) { var buffer = new byte[0x1000]; int bytesRead; long totalRead = 0; while((bytesRead = await source.ReadAsync(buffer, 0, buffer.Length)) > 0) { await destination.WriteAsync(buffer, 0, bytesRead); cancellationToken.ThrowIfCancellationRequested(); totalRead += bytesRead; progress.Report(totalRead); } }
有了該方法,我們現在可以完全實現我們的DownloadDataAsync方法了,包括加入撤銷和進度支持:
public static async Task<byte[]> DownloadDataAsync( string url, CancellationToken cancellationToken, IProgress<long> progress) { using(var request = WebRequest.Create(url)) using(var response = await request.GetResponseAsync()) using(var responseStream = response.GetResponseStream()) using(var result = new MemoryStream()) { await responseStream.CopyToAsync( result, cancellationToken, progress); return result.ToArray(); } }
給我們的CopyToAsync方法做進一步的優化也是可能的。比如,如果我們要使用兩個buffer而不是一個,就可以在讀取下一片數據時寫入之前讀取的數據,因此如果讀取和寫入都使用了異步了I/O就會產生交叉延遲:
public static async Task CopyToAsync(this Stream source, Stream destination) { int i = 0; var buffers = new [] { new byte[0x1000], new byte[0x1000] }; Task writeTask = null; while(true) { var readTask = source.ReadAsync(buffers[i], 0, buffers[i].Length))>0; if (writeTask != null) await Task.WhenAll(readTask, writeTask); int bytesRead = await readTask; if (bytesRead == 0) break; writeTask = destination.WriteAsync(buffers[i], 0, bytesRead); i ^= 1; // swap buffers } }
消除不必要的上下文轉換是另一個優化。正如之前提到的,默認await一個Task開始執行的時候,會傳輸回到當前的SynchronizationContext。在CopyToAsynch實現的情況下,使用這樣的轉換時沒必要的,因為我們沒有操作任何UI狀態。我們可以發揮Task.ConfigureAwait的優勢類關閉這個自動的轉換。為了簡化,上面的原始異步的實現修改如下:
public static Task CopyToAsync(this Stream source, Stream destination) { var buffer = new byte[0x1000]; int bytesRead; while((bytesRead = await source.ReadAsync(buffer, 0, buffer.Length).ConfigureAwait(false)) > 0) { await destination.WriteAsync(buffer, 0, bytesRead) .ConfigureAwait(false); } }
該系列終於結束了,謝謝大家多多支持。請繼續關注我們博客,我會陸續出些系列性的文章。
作者:tkb至簡 出處:http://www.cnblogs.com/farb/
QQ:782762625
歡迎各位多多交流!
本文版權歸作者和博客園共有,歡迎轉載。未經作者同意下,必須在文章頁面明顯標出原文鏈接及作者,否則保留追究法律責任的權利。
如果您認為這篇文章還不錯或者有所收獲,可以點擊右下角的【推薦】按鈕,因為你的支持是我繼續寫作,分享的最大動力!