程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> .NET網頁編程 >> 關於.NET >> .NET相關問題:流管道

.NET相關問題:流管道

編輯:關於.NET

問:在我的應用程序中,我要對大量數據進行加密和壓縮操作。由於這些屬於計算密集型操作,我原以為“任務管理器”中 CPU 的使用率會高達 100%,但結果卻發現在雙核計算機上,CPU 的使用率最高才達到 50% 左右。我認為這是因為只使用了一個核心,使得執行任務時需要花費非平凡的時間,計算性能因此顯得十分遜色。請問有沒有辦法使用兩個核心處理加密和壓縮這兩項任務?我使用的是 Microsoft® .NET Framework 中提供的 CryptoStream 和 GzipStream。

問:在我的應用程序中,我要對大量數據進行加密和壓縮操作。由於這些屬於計算密集型操作,我原以為“任務管理器”中 CPU 的使用率會高達 100%,但結果卻發現在雙核計算機上,CPU 的使用率最高才達到 50% 左右。我認為這是因為只使用了一個核心,使得執行任務時需要花費非平凡的時間,計算性能因此顯得十分遜色。請問有沒有辦法使用兩個核心處理加密和壓縮這兩項任務?我使用的是 Microsoft® .NET Framework 中提供的 CryptoStream 和 GzipStream。

答:首先要確認一下,您是先進行壓縮後進行加密嗎?如果不是,請更改操作順序。安全有效的加密會產生相對不可壓縮的數據。如果您將操作順序更改為先壓縮後加密,不僅可以得到更小的文件,而且由於要加密的數據更少,加密時間也可以更短。為了舉例說明,我從古登堡計劃 (Gutenberg Project, http://www.gutenberg.org/) 下載了《戰爭與和平》的文本,然後按照兩種順序分別進行了操作。先加密(使用 RijndaelManaged 和默認密鑰長度)後壓縮生成的數據流要比與先壓縮後加密的大 250%,而且執行時間也增加了 50%。

答:首先要確認一下,您是先進行壓縮後進行加密嗎?如果不是,請更改操作順序。安全有效的加密會產生相對不可壓縮的數據。如果您將操作順序更改為先壓縮後加密,不僅可以得到更小的文件,而且由於要加密的數據更少,加密時間也可以更短。為了舉例說明,我從古登堡計劃 (Gutenberg Project, http://www.gutenberg.org/) 下載了《戰爭與和平》的文本,然後按照兩種順序分別進行了操作。先加密(使用 RijndaelManaged 和默認密鑰長度)後壓縮生成的數據流要比與先壓縮後加密的大 250%,而且執行時間也增加了 50%。

現在言歸正題,我來回答您的實際問題:是的,解決這一問題的方法很多。第一種方法是並行執行實際的壓縮和加密操作。您可能不希望(也不應該)重新實現 GZipStream 和 CryptoStream 中的功能,因此,在 .NET Framework 團隊實現這兩項任務的並行操作之前,您需要一套替代的解決方案。

如果您不在乎實際的輸出格式(例如,只進行壓縮,但並不關心壓縮實際上是否符合 gzip 標准),可以將輸入分塊,然後並行處理每個塊。例如,在雙核計算機上,您可以將當前傳送到 GzipStream 的輸入字節數組分成兩半,使用一個 GZipStream 處理其中一半,使用另一個 GzipStream 處理另一半。然後再將這些輸出內容依次保存到輸出文件。

但是請注意,您很可能需要在輸出內容中包含某些標頭信息,以便解壓縮進程能夠准確地確定一個 GZipStream 的結束位置和下一個 GzipStream 的開始位置。當前 GZipStream 在讀取輸入流中數據的同時會對數據進行緩存,因此,它最終從輸入流獲得的數據有時會超出其實際所需,這意味著,您需要在第一個 GZipStream 完成解壓縮之後重置輸入流中的位置標記。

分塊做法的好處之一在於,當處理器的核心多於兩個時,由於您可以根據需要盡可能多地分割數據以充分利用各個處理器的運算能力,因此您能夠相對較好地擴展進程。但是如果只有兩個核心,並且只進行兩種操作(壓縮和加密),那麼創建並行流管道的方法對您來說可能更為實用。您可以讓一個處理器執行一種操作,同時讓另一個處理器執行另一種操作。很顯然,目前我們無法對同一數據同時進行壓縮和加密,因為這種方法相應地產生兩種輸出(壓縮輸出和加密輸出),這對於解決您的問題(得到經過壓縮和加密的輸出內容)並無太大的意義。但是,您可以模仿現在可能執行的操作,使用裝飾模式將一個流的輸出作為輸入傳送到下一個流(如圖 1 所示):

圖 1 通過流傳送數據

using (CryptoStream encrypt = new CryptoStream(
    output, transform, CryptoStreamMode.Write))
using (GZipStream compress = new GZipStream(
    encrypt, CompressionMode.Compress, true))
  CopyStream(input, compress);
...
static void CopyStream(Stream input, Stream output){
  byte[] buffer = new byte[0x1000];
  int read;
  while ((read = input.Read(buffer, 0, buffer.Length)) > 0)
    output.Write(buffer, 0, read);
}

此處,CopyStream 方法正在將數據從輸入流復制到壓縮流。壓縮流壓縮完一個數據緩存塊之後,會將該數據輸出作為輸入依次寫入到加密流。同樣,當加密流完成對一個數據緩存塊的加密之後,會將其依次寫出到輸出流。這一過程被稱為管道。

並行管道是現實生活中常見的一個概念。假設有一組人要向外發送邀請信。甲負責將信件折成可裝進信封的大小,乙負責將折好的信件放進信封,丙負責密封信封和粘貼郵票。對於第一封信,最開始只有甲在折信,其他人處於空閒狀態。但隨後甲會將折好的信件交給乙裝進信封。此時,甲忙於折疊下一封信,而乙將忙於裝信封,但丙仍然空閒。甲和乙完成各自的任務後,會將裝好的信封交給丙(由丙密封信封和粘貼郵票),而甲折好的信件會交給乙(由乙裝信),這時甲又開始折疊下一封信。從此點開始一直到只剩兩封信件的這段時間內,三人都全部得到充分利用(即,除了人與人之間交送信件和信封的時間外,其余時間每個人都在忙碌)。因此,盡管每人每次只負責整個流程的一個環節,如果待發送的信件足夠多,則從理論上講,在大部分時間內采用此方法完成的工作量是三人各自完成整個流程工作總量的三倍。

流管道任務的操作流程與上述流程相同。當然,不是傳遞折好的信件或信封,而是傳送數據緩沖塊,因為這是流進行通信所使用的機制。但與順序傳送方式(將一個流直接寫入另一個流)不同,由於每個流在各自獨立的線程上並行運行,因此無法再進行直接同步通信。我們需要另一種機制來重新啟用這種通信。為此,我編寫了一個 BlockingStream,如圖 2 所示。

Figure 2 BlockingStream 實現

public class BlockingStream : Stream
{
  private object _lockForRead;
  private object _lockForAll;
  private Queue<byte[]> _chunks;
  private byte[] _currentChunk;
  private int _currentChunkPosition;
  private ManualResetEvent _doneWriting;
  private ManualResetEvent _dataAvailable;
  private WaitHandle[] _events;
  private int _doneWritingHandleIndex;
  private volatile bool _illegalToWrite;
  public BlockingStream()
  {
    _chunks = new Queue<byte[]>();
    _doneWriting = new ManualResetEvent(false);
    _dataAvailable = new ManualResetEvent(false);
    _events = new WaitHandle[] { _dataAvailable, _doneWriting };
    _doneWritingHandleIndex = 1;
    _lockForRead = new object();
    _lockForAll = new object();
  }
  public override bool CanRead { get { return true; } }
  public override bool CanSeek { get { return false; } }
  public override bool CanWrite { get { return !_illegalToWrite; } }
  public override void Flush() { }
  public override long Length {
    get { throw new NotSupportedException(); } }
  public override long Position {
    get { throw new NotSupportedException(); }
    set { throw new NotSupportedException(); } }
  public override long Seek(long offset, SeekOrigin origin) {
    throw new NotSupportedException(); }
  public override void SetLength(long value) {
    throw new NotSupportedException(); }
  public override int Read(byte[] buffer, int offset, int count)
  {
    if (buffer == null) throw new ArgumentNullException("buffer");
    if (offset < 0 || offset >= buffer.Length)
      throw new ArgumentOutOfRangeException("offset");
    if (count < 0 || offset + count > buffer.Length)
      throw new ArgumentOutOfRangeException("count");
    if (_dataAvailable == null)
      throw new ObjectDisposedException(GetType().Name);
    if (count == 0) return 0;
    while (true)
    {
      int handleIndex = WaitHandle.WaitAny(_events);
      lock (_lockForRead)
      {
        lock (_lockForAll)
        {
          if (_currentChunk == null)
          {
            if (_chunks.Count == 0)
            {
              if (handleIndex == _doneWritingHandleIndex)
                return 0;
              else continue;
            }
            _currentChunk = _chunks.Dequeue();
            _currentChunkPosition = 0;
          }
        }
        int bytesAvailable =
          _currentChunk.Length - _currentChunkPosition;
        int bytesToCopy;
        if (bytesAvailable > count)
        {
          bytesToCopy = count;
          Buffer.BlockCopy(_currentChunk, _currentChunkPosition,
            buffer, offset, count);
          _currentChunkPosition += count;
        }
        else
        {
          bytesToCopy = bytesAvailable;
          Buffer.BlockCopy(_currentChunk, _currentChunkPosition,
            buffer, offset, bytesToCopy);
          _currentChunk = null;
          _currentChunkPosition = 0;
          lock (_lockForAll)
          {
            if (_chunks.Count == 0) _dataAvailable.Reset();
          }
        }
        return bytesToCopy;
      }
    }
  }
  public override void Write(byte[] buffer, int offset, int count)
  {
    if (buffer == null) throw new ArgumentNullException("buffer");
    if (offset < 0 || offset >= buffer.Length)
      throw new ArgumentOutOfRangeException("offset");
    if (count < 0 || offset + count > buffer.Length)
      throw new ArgumentOutOfRangeException("count");
    if (_dataAvailable == null)
      throw new ObjectDisposedException(GetType().Name);
    if (count == 0) return;
    byte[] chunk = new byte[count];
    Buffer.BlockCopy(buffer, offset, chunk, 0, count);
    lock (_lockForAll)
    {
      if (_illegalToWrite)
        throw new InvalidOperationException(
          "Writing has already been completed.");
      _chunks.Enqueue(chunk);
      _dataAvailable.Set();
    }
  }
  public void SetEndOfStream()
  {
    if (_dataAvailable == null)
      throw new ObjectDisposedException(GetType().Name);
    lock (_lockForAll)
    {
      _illegalToWrite = true;
      _doneWriting.Set();
    }
  }
  public override void Close()
  {
    base.Close();
    if (_dataAvailable != null)
    {
      _dataAvailable.Close();
      _dataAvailable = null;
    }
    if (_doneWriting != null)
    {
      _doneWriting.Close();
      _doneWriting = null;
    }
  }
}

BlockingStream 背後的概念與 .NET Framework 中其他大多數流的有些不同。與其他流一樣,它也是從 System.IO.Stream 派生而來的,並重寫了其所有抽象方法和屬性。但涉及到位置和線程安全性時,它的行為略有特別之處。.NET Framework 中的大多數流都不是線程安全的,即多個線程無法同時安全地訪問該流的一個實例,並且多數流的下一次讀寫發生的位置僅有一個。而 BlockingStream 則是線程安全的,在某種意義上來說,它隱式地擁有兩個位置,盡管這兩個位置都不以數字數值的形式向該用戶類型公開。

BlockingStream 通過將寫入的數據緩存塊排入內部隊列的方式進行工作。將數據寫入該流時,寫入的緩存塊就會被加入到隊列。當從該流中讀取數據時,會以先進先出 (FIFO) 的順序將緩存塊從隊列中取出,緩存塊中的數據將返回給調用方。這樣,在流中就要留出兩個位置分別用於下次寫入數據和讀取數據。

但是請注意,並不是每個讀取請求都有必要修改隊列。如果讀取器請求的數據量等於或大於隊列中下一個緩存塊的大小,則該緩存塊可以滿足讀取請求(即使返回的數據少於請求的數據,讀取也可以成功返回)。但如果用戶請求的數據量小於下一個緩存塊的大小,則緩存塊中剩余的數據會被單獨存儲(而不存儲在隊列中),以便這些數據能夠用來滿足下一個讀取請求(參見圖 2 中的 _currentChunk)。如果使用上一個讀取請求後剩余的數據能夠完全滿足下一個讀取請求,則該請求將不修改隊列。

BlockingStream 通過 ManualResetEvent 發出信號通知使用者數據是否可用。如果某個線程嘗試從 BlockingStream 讀取數據,而此時沒有可供讀取的數據,則該線程會因 ManualResetEvent 的值未設置而阻塞在這裡,直到有數據被寫入為止。數據被寫入之後,寫入器會為該事件設置一個值,喚醒使用者並通知它有數據可供讀取。實際上,如果有多個讀取器在等待讀取數據,這些讀取器都將被喚醒,這只需要在 Read 方法中另外添加一些邏輯。

Read 方法的實現就像一個大的循環,它會持續循環直到出現可用於執行讀取請求的數據或寫入器調用 SetEndOfStream(通知流已經沒有可用數據);在這種情況下,繼續阻塞等待數據的讀取器是不明智的,因為這樣做會導致讀取器等待很長時間。上文提及的每種情況可以各用一個 ManualResetEvent 來表示,一個已在前文提到,另一個用來表示寫入是否已經完成。Read 進入循環之後要做的第一件事是等待上述任一事件被設置一個值(使用 WaitHandle.WaitAny)。

BlockingStream 對於我們的流管道十分有用,因為它支持與順序傳送方式中隱式使用的相同的流到流模式,而現在還支持跨線程實現。思路是,我可以編寫幾個方法來代表每一個我需要執行的基於流的操作,如圖 3 中所示。

Figure 3 基於流的操作方法

static void Compress(Stream input, Stream output){
  using (GZipStream compressor = new GZipStream(
      output, CompressionMode.Compress, true))
    CopyStream(input, compressor);
}
static void Encrypt(Stream input, Stream output) {
  RijndaelManaged rijndael = new RijndaelManaged();
  ... // setup crypto keys
  using (ICryptoTransform transform = rijndael.CreateEncryptor())
  using (CryptoStream encryptor = new CryptoStream(
      output, transform, CryptoStreamMode.Write))
    CopyStream(input, encryptor);
}

編寫的每種方法都可以在各自獨立的線程上執行,一個方法的輸出作為輸入通過管道傳送給另一個方法,例如 BlockingStream。最初,第二個操作將被阻塞並在其輸入 BlockingStream 上等待。第一個操作將數據寫入流之後,第二個操作立即被喚醒,檢索剛剛寫入的數據並對其進行處理。我已編寫代碼將此功能加入到 StreamPipeline 類中,如圖 4 所示。

Figure 4 StreamPipeline

public class StreamPipeline : IDisposable
{
  private Action<Stream, Stream>[] _filters;
  private List<BlockingStream> _blockingStreams;
  public StreamPipeline(params Action<Stream, Stream>[] filters)
  {
    if (filters == null) throw new ArgumentNullException("filters");
    if (filters.Length == 0 || Array.IndexOf(filters, null) >= 0)
      throw new ArgumentException("filters");
    _filters = filters;
    _blockingStreams = new List<BlockingStream>(_filters.Length - 1);
    for (int i = 0; i < filters.Length-1; i++)
    {
      _blockingStreams.Add(new BlockingStream());
    }
  }
  public void Run(Stream input, Stream output)
  {
    if (_blockingStreams == null)
      throw new ObjectDisposedException(GetType().Name);
    if (input == null) throw new ArgumentNullException("input");
    if (!input.CanRead) throw new ArgumentException("input");
    if (output == null) throw new ArgumentNullException("output");
    if (!output.CanWrite) throw new ArgumentException("output");
    ThreadStart lastStage = null;
    for (int i = 0; i < _filters.Length; i++)
    {
      Stream stageInput = i == 0 ? input : _blockingStreams[i - 1];
      Stream stageOutput =
        i == _filters.Length - 1 ? output : _blockingStreams[i];
      Action<Stream, Stream> filter = _filters[i];
      ThreadStart stage = delegate
      {
        filter(stageInput, stageOutput);
        if (stageOutput is BlockingStream)
          ((BlockingStream)stageOutput).SetEndOfStream();
      };
      if (i < _filters.Length - 1)
      {
        Thread t = new Thread(stage);
        t.IsBackground = true;
        t.Start();
      }
      else lastStage = stage;
    }
    lastStage();
  }
  public void Dispose()
  {
    if (_blockingStreams != null)
    {
      foreach (BlockingStream stream in _blockingStreams)
        stream.Dispose();
      _blockingStreams = null;
    }
  }
}

StreamPipeline 使用起來非常簡單。構造函數接受一組由委托組成的參數數組,每個參數數組需要一個輸入流和一個輸出流;這些被稱為篩選器。然後,它將創建和存儲一個 BlockingStream,以供在每對篩選器之間使用。請注意,上文所示的 Compress 和 Encrypt 方法符合必要的委托簽名,因此可以用作篩選器(如圖 5 所示)。

圖 5 操作中的 StreamPipeline

Run 方法接受應位於管道開始和結尾處的輸入和輸出流。在本示例中,這可能是代表待壓縮和加密的磁盤數據的 FileStream 和用於存儲返回給磁盤的結果數據的輸出 FileStream:

using (FileStream input = File.OpenRead("inputData.bin"))
using (FileStream output = File.OpenWrite("outputData.bin"))
using (StreamPipeline pipeline = new StreamPipeline(Compress, Encrypt))
{
  pipeline.Run(input, output);
}

從內部看,Run 方法會為管道中的每個階段生成一個委托,該委托使用合適的輸入和輸出流運行相應篩選器。該方法創建新線程來運行每個工作項(最後一個除外),最後一個工作項會在調用 Run 方法的線程上運行。當前線程需要阻塞並等待工作完成,因此重用該線程來完成某些處理是明智的,這樣可以避免浪費其資源。

請注意在本裡我並未使用 ThreadPool。通常,我首先想到的是將工作項排入 ThreadPool,而不是啟動新線程。但在這種情況下,我正在排列的工作會阻塞並等待其他已排列的工作項執行完某些操作。這樣就有可能出現死鎖問題(開發人員使用 .NET Framework 1.x 經常出現此問題,請參閱 msdn.microsoft.com/msdnmag/issues/04/12/NETMatters 提供的示例),因此我為每個操作創建了自定義線程,以避免出現這種問題。

當然,這種方法本身也有一定的缺點。例如,由於創建和終止線程的開銷很高,因此如果篩選器的壽命較短,此處的開銷可能會制約計算在其他方面的開銷。如果您要通過同一 StreamPipeline 運行大量不同的輸入和輸出流,那麼重寫該類的相關部分,保持線程處於活動狀態並在每次運行時重用這些線程可能會比較有利。如果您的應用程序存在限制,則使用 ThreadPool 也可能更合適。

現在,我們已經構建了 StreamPipeline,但關鍵問題是其執行效果如何?畢竟,本練習的關鍵在要充分利用計算機上的兩個內核以縮短計算時間。我使用與之前提及的《戰爭與和平》文本相同的數據在雙核計算機上進行了測試,與順序傳送方式相比,這種順序管道實現方法的運行速度提升了 20-25%。您是不是有些失望?我猜此時您心中會有這樣的疑問:運行速度為什麼沒出現 100% 的提升?是這種實現方法存在一些問題嗎?畢竟現在是在兩個處理器上執行操作,運行速度難道不應該提升一倍嗎?

本專欄開始部分有關封裝信封的示例可能會對您有些誤導。我並不了解您的情況,但是對我而言,將信件折成三折要比將折好的信件裝入信封更加費時。如果是那樣,負責將信件裝入所有信封的乙可能在裝完每個信封之後就閒坐著,等待管道中的甲折好下一封信。因此,盡管我們投入更多人員來解決該問題,但問題的解決並不是與投入的人數成正比的。

壓縮和加密示例也是同樣的道理。我記錄了順序實現方法中進行壓縮和加密各自花費的時間。猜猜結果如何?加密僅用了 20% 的時間。因此,管道實現方法在這種情況下差不多已達到了最大速度優化。

但並不是說這種實現就已經發揮了最佳性能。它可能存在著一些性能問題,這些問題會隨著處理器數量的增加而越來越明顯,最明顯的問題就是您至少要為每個處理器配備一個篩選器,才有機會充分利用計算機的所有處理能力。其他可能的問題還包括,在 BlockingStream 實現中我們可能會用到多個鎖(如果一次只有一個讀取器使用實例,就像 StreamPipeline 使用 BlockingStream 一樣,則不必這樣)。但是鑒於需要編寫的代碼數量相對較少,即便是像本文的壓縮和加密示例中所體現的 20-25% 的性能提升也是非常有價值的。

另請注意,當管道各個階段自身可以並行處理多個數據段時,管道就會非常有效。例如,在封裝信封示例中,如果有多人折疊信件(或者如果乙在沒有裝信工作時幫助甲折疊信件),則整個系統的吞吐量將提高。對於壓縮/加密示例,這類似於上文提及的分塊方法:在管道的壓縮階段對塊進行並行壓縮,然後傳送到加密階段,對壓縮的塊進行並行加密,等等。引入應用程序的並發操作越精細,計算機中所有處理器能夠互相援助從而提高解決方案速度的機會就越大,尤其是隨著技術的發展,我們在通用個人計算機中可以使用的處理器越來越多,這種提升效果越明顯。

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved