本文及程序不是介紹WCF怎麼用,而是研究如何在WCF通信時的通道兩端自動進行數據壓縮和解壓縮, 從而增加分布式數據傳輸速度。
而且,這個過程是完全透明的,用戶及編程人員根本不需要知道它的存在,相當於HOOK在兩端的一個 組件。可以使用中網絡帶寬較小的網絡環境中。當WCF在兩個實體間通訊的時候,便自動創建一個信息通道轉接通訊,這個消息包含數 據請求和相應。WCF使用特殊的編碼器將請求和響應數據轉換成一系列的字節。
我所帶的項目裡遇到大文件分布式傳輸問題,經過分析考慮采用WCF通道壓縮技術來解決此問題。執行 這樣的編碼是需要傳輸大文件(XML格式)由一台機器到另一台機器傳輸,而連接有速度限制。我不用寫一個特殊的函數邊壓縮 和邊解壓,而是配置傳輸通道可以做到這一點,這種方式壓縮可重復使用的任何契約。我發現自己編寫的消息編碼器是最簡單的方式 來實現功能,真正的問題是如何編寫信息編碼器,在MSDN上沒有找到任何關於此應用的實例。消息契約編碼器的想法是Hook連接兩端發 送和接收信息的渠道。程序是采用Microsoft Visual Studio 2008 WCF設計。
圖1 WCF消息通道編碼過程時序圖
發送方:代碼中加入方法,該方法及其參數的序列化成SOAP消息,消息編碼序列化的信息將成為一個 字節數組,字節數組發送傳輸層。
接收方:傳輸層接收字節數組,消息編碼器並行化字節數組到一條消息,該方法及其參數並行化到一 個SOAP消息,方法是被監聽的。
當加入壓縮信息編碼器,該方法要求有一點改變:
發送方:代碼中加入方法,該方法及其參數的序列化成SOAP消息,消息契約編碼讓其內在的信息編碼 序列的信息成為一個字節數組,
消息契約編碼壓縮的字節數組第二個字節數組,字節數組發送傳輸層。
接收方:傳輸層接收字節數組,消息契約編碼的字節數組解壓到第二字節數組,消息契約編碼讓其內 在的信息編碼化的第二個字節數
組消息,該方法及其參並行化到SOAP消息,方法是被監聽的。
這個消息契約編碼分為幾個類:
CompactMessageEncoder //這個類提供了信息編碼實施。
CompactMessageEncoderFactory //這個類是負責提供契約信息編碼實例。
CompactMessageEncodingBindingElement //這個類負責通道的協議約束規范。
CompactMessageEncodingElement //這個類使信息編碼通過增加應用程序配置文件。
圖2 消息通道編碼器靜態類圖
壓縮方法:契約消息編碼器是使用gzip壓縮的NET Framework范圍內執行的,是調用 System.IO.Compression.GZipStream名字空間類中。
加入引用CompactMessageEncoder.dll,修改app.config文件引用,應用程序必須要在客戶端和服務器 端。
壓縮緩沖代碼:
private static ArraySegment<byte> CompressBuffer(ArraySegment<byte> buffer, BufferManager bufferManager, int messageOffset) { // Create a memory stream for the final message MemoryStream memoryStream = new MemoryStream(); // Copy the bytes that should not be compressed into the stream memoryStream.Write(buffer.Array, 0, messageOffset); // Compress the message into the stream using (GZipStream gzStream = new GZipStream(memoryStream, CompressionMode.Compress, true)) { gzStream.Write(buffer.Array, messageOffset, buffer.Count); } // Convert the stream into a bytes array byte[] compressedBytes = memoryStream.ToArray(); // Allocate a new buffer to hold the new bytes array byte[] bufferedBytes = bufferManager.TakeBuffer(compressedBytes.Length); // Copy the compressed data into the allocated buffer Array.Copy(compressedBytes, 0, bufferedBytes, 0, compressedBytes.Length); // Release the original buffer we got as an argument bufferManager.ReturnBuffer(buffer.Array); // Create a new ArraySegment that points to the new message buffer ArraySegment<byte> byteArray = new ArraySegment<byte>(bufferedBytes, messageOffset, compressedBytes.Length - messageOffset); return byteArray; }
解壓緩沖代碼:
private static ArraySegment<byte> DecompressBuffer(ArraySegment<byte> buffer, BufferManager bufferManager) { // Create a new memory stream, and copy into it the buffer to decompress MemoryStream memoryStream = new MemoryStream(buffer.Array, buffer.Offset, buffer.Count); // Create a memory stream to store the decompressed data MemoryStream decompressedStream = new MemoryStream(); // The totalRead stores the number of decompressed bytes int totalRead = 0; int blockSize = 1024; // Allocate a temporary buffer to use with the decompression byte[] tempBuffer = bufferManager.TakeBuffer(blockSize); // Uncompress the compressed data using (GZipStream gzStream = new GZipStream(memoryStream, CompressionMode.Decompress)) { while (true) { // Read from the compressed data stream int bytesRead = gzStream.Read(tempBuffer, 0, blockSize); if (bytesRead == 0) break; // Write to the decompressed data stream decompressedStream.Write(tempBuffer, 0, bytesRead); totalRead += bytesRead; } } // Release the temporary buffer bufferManager.ReturnBuffer(tempBuffer); // Convert the decompressed data stream into bytes array byte[] decompressedBytes = decompressedStream.ToArray(); // Allocate a new buffer to store the message byte[] bufferManagerBuffer = bufferManager.TakeBuffer(decompressedBytes.Length + buffer.Offset); // Copy the bytes that comes before the compressed message in the buffer argument Array.Copy(buffer.Array, 0, bufferManagerBuffer, 0, buffer.Offset); // Copy the decompressed data Array.Copy(decompressedBytes, 0, bufferManagerBuffer, buffer.Offset, decompressedBytes.Length); // Create a new ArraySegment that points to the new message buffer ArraySegment<byte> byteArray = new ArraySegment<byte> (bufferManagerBuffer, buffer.Offset, decompressedBytes.Length); // Release the original message buffer bufferManager.ReturnBuffer(buffer.Array); return byteArray; }
改變服務端配置
加入消息契約編碼器之前app.config的實例:
<?xml version="1.0" encoding="utf-8" ?> <configuration> <system.serviceModel> <services> <service name="Server.MyService"> <endpoint address="net.tcp://localhost:1234/MyService" binding="netTcpBinding" contract="Server.IMyService" /> </service> </services> </system.serviceModel> </configuration>
加入消息契約編碼器後app.config的例子:
<?xml version="1.0" encoding="utf-8" ?> <configuration> <system.serviceModel> <services> <service name="Server.MyService"> <!-- Set the binding of the endpoint to customBinding --> <endpoint address="net.tcp://localhost:1234/MyService" binding="customBinding" contract="Server.IMyService" /> </service> </services> <!-- Defines a new customBinding that contains the compactMessageEncoding --> <bindings> <customBinding> <binding name="compactBinding"> <compactMessageEncoding> <!-- Defines the inner message encoder as binary encoder --> <binaryMessageEncoding /> </compactMessageEncoding> <tcpTransport /> </binding> </customBinding> </bindings> <!-- Adds the extension dll so the WCF can find the compactMessageEncoding --> <extensions> <bindingElementExtensions> <add name="compactMessageEncoding" type="Amib.WCF.CompactMessageEncodingElement, CompactMessageEncoder, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null" /> </bindingElementExtensions> </extensions> </system.serviceModel> </configuration>
客戶端配置變化
加入消息契約編碼器之前app.config的實例:
<?xml version="1.0" encoding="utf-8" ?> <configuration> <system.serviceModel> <client> <endpoint address="net.tcp://localhost:1234/MyService" binding="customBinding" bindingConfiguration="compactBinding" contract="Client.IMyService" /> </client> </system.serviceModel> </configuration>
加入消息契約編碼器後app.config的例子:
<?xml version="1.0" encoding="utf-8" ?> <configuration> <system.serviceModel> <client> <endpoint address="net.tcp://localhost:1234/MyService" binding="customBinding" bindingConfiguration="compactBinding" contract="Client.IMyService" /> </client> <!-- Defines a new customBinding that contains the compactMessageEncoding -- >
<bindings> <customBinding> <binding name="compactBinding"> <compactMessageEncoding> <binaryMessageEncoding/> </compactMessageEncoding> <tcpTransport /> </binding> </customBinding> </bindings> <!-- Adds the extension dll so the WCF can find the compactMessageEncoding --> <extensions> <bindingElementExtensions> <add name="compactMessageEncoding" type="Amib.WCF.CompactMessageEncodingElement, CompactMessageEncoder, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null" /> </bindingElementExtensions> </extensions> </system.serviceModel> </configuration>
這種壓縮方法,消息堵塞的幾率很小。使用CompactMessageEncoder在同一台機器運行客戶端和服務器 上可能會降低效率。
本文配套源碼:http://www.bianceng.net/dotnet/201212/785.htm