這次給大家演示一下利用IOCP的在線程間傳遞數據的例子,順便打算講一些細節和注意的地方。
概述:這裡主要使用IOCP的三個API,CreateIoCompletionPort,PostQueuedCompletionStatus,GetQueuedCompletionStatus,第一個是用來創建一個完成端口對象,第二個是向一個端口發送數據,第三個是接受數據,基本上用著三個函數,就可以寫一個使用IOCP的簡單示例。
其中完成端口一個內核對象,所以創建的時候會耗費性能,CPU得切換到內核模式,而且一旦創建了內核對象,我們都要記著要不用的時候顯式的釋放它的句柄,釋放非托管資源的最佳實踐肯定是使用Dispose模式,這個博客園有人講過N次了。而一般要獲取一個內核對象的引用,最好用SafeHandle來引用它,這個類可以幫你管理引用計數,而且用它引用內核對象,代碼更健壯,如果用指針引用內核對象,在創建成功內核對象並復制給指針這個時間段,如果拋了ThreadAbortException,這個內核對象就洩漏了,而用SafeHandle去應用內核對象就不會在賦值的時候發生ThreadAbortException。另外SafeHandle類繼承自CriticalFinalizerObject類,並實現了IDispose接口,CLR對CriticalFinalizerObject及其子類有特殊照顧,比如說在編譯的時候優先編譯,在調用非CriticalFinalizerObject類的Finalize方法後再調用CriticalFinalizerObject類的Finalize類的Finalize方法等。在win32裡,一般一個句柄是-1或者0的時候表示這個句柄是無效的,所以.net有一個SafeHandle的派生類SafeHandleZeroOrMinusOneIsInvalid ,但是這個類是一個抽象類,你要引用自己使用的內核對象或者非托管對象,要從這個類派生一個類並重寫Relseas方法。另外在.net框架裡它有兩個實現幾乎一模一樣的子類,一個是SafeFileHandle一個是SafeWaitHandle,前者表示文件句柄,後者表示等待句柄,我們這裡為了方便就直接用SafeFileHandle來引用完成端口對象了。
CreateIoCompletionPort函數的原型如下 [DllImport("kernel32.dll", CharSet = CharSet.Auto, SetLastError = true)] public static extern SafeFileHandle CreateIoCompletionPort(IntPtr FileHandle, IntPtr ExistingCompletionPort, IntPtr CompletionKey, uint NumberOfConcurrentThreads);
FileHandle參數表示要綁定在完成端口上的句柄,比如說一個已經accept的socket句柄。
ExistingCompletionPort參數表示一個已有的完成端口句柄,第一次創建完成端口的時候顯然隨便傳個值就行,所以這個參數直接定義成IntPtr類型了。當你創建了工作線程來為I/O請求服務的時候,才會把句柄和完成端口關聯在一起,而之前第一次創建完成端口的時候這個參數傳一個zero指針就O了,而FileHandle參數傳一個-1的指針就行了。
CompletionKey是完成鍵的意思,它可以是任意想傳遞給工作線程的數據,學名叫做單句柄數據,就是說跟隨FileHandle參數走的一些狀態數據,一般在socket的iocp程序裡是把socket傳進去,以便在工作線程裡拿到這個socket句柄,在收到異步操作完成的通知及處理後繼續進行下一個異步操作的投遞,如發送和接受數據等。
NumberOfConcurrentThreads參數表示在一個完成端口上同時允許執行的最大線程數量。如果傳0,就是說你有幾個CPU,就是允許最大有幾個線程,這也是最理想情況,因為一個CPU一個線程可以防止線程上下文切換。關於這個值要和創建工作線程的數量的關系,大家要理解清楚,不一定CPU有多少個,你的工作線程就創建多少個。因為你的工作線程有時候會阻塞或者等待,而如果你正好創建了CPU個數個工作線程,有一個等待的話,因為你分配了同時最多有CPU個數多個最大IOCP線程,這時候就不能效率最大化了。所以一般工作線程創建的要比CPU個數多一些,除非你保證你的工作線程不會阻塞。
PostQueuedCompletionStatus函數原型如下 [DllImport("Kernel32", CharSet = CharSet.Auto)] private static extern bool PostQueuedCompletionStatus(SafeFileHandle CompletionPort, uint dwNumberOfBytesTransferred, IntPtr dwCompletionKey, IntPtr lpOverlapped); 該方法用於給完成端口投遞自定義信息,一般情況下如果把某個句柄和完成端口綁定後,當有數據收發操作完成時會自動同時工作線程,工作線程裡的GetQueuedCompletionStatus就不會阻塞,而繼續往下走,來進行接收到IO操作完成通知的流程。而有時候我們需要手工向工作者線程投遞一些消息,比如說我們主線程知道所有的socket句柄都關閉了,工作線程可以退出了,我們就可以給工作線程發一個自定義數據,工作線程收到後判斷是否是退出指令,然後退出。
CompletionPort參數表示向哪個完成端口對象投遞信息,在這個完成端口上等待消息的工作線程就會收到消息了。 dwNumberOfBytesTransferred表示你投遞的數據有多大,我們一般投遞的是一個對象的指針,在32位系統裡,int指針就是4個字節了,直接寫4就O了,要不就用sizeof你傳的數據,如sizeof(IntPtr)。
dwCompletionKey同CreateIoCompletionPort的解釋,是單句柄數據,本示例用不到,不細說,直接用IntPtr.Zero填充了事。
lpOverlapped參數,本意是一個win32的overlapped結構的指針,本示例中不用,所以不詳細講。它叫單IO數據,是相對單據並拘束CompletionKey來講的,前者是一個句柄的每次IO操作的上下文,比如單詞IO操作的數據、操作類型等,後者是整個句柄的上下文。但這裡我們表示你要投遞的數據,可以是任何類型的數據(誰讓它是個指針呢,所以傳啥都行),值得注意的一點就是,這個數據傳遞到工作線程的時候,中間這個數據走的是非托管代碼。所以不能直接傳一個引用進去,這裡要使用到GCHandle類。先大致介紹一下這個類吧。它有個靜態方法Alloc來給把一個對象在GC句柄表裡注冊,GC句柄表示CLR為沒個應用程序域提供的一個表,它允許你來監視和管理對象的生命周期,你可以往裡加一個對象的引用,也可以從裡面移除一個對象,往裡加對象的時候,還可以指定一個標記來表示我們希望如何監視和控制這個對象。而加入一個條目的操作就是GCHandle的Alloc對象,它有兩個參數,第一個參數是對象,第二參數是GCHandleType類型的枚舉,第二個參數表示我們如何來監視和控制這個對象的生命周期。當這個參數是GCHandleType.Normal時,表示我們告訴垃圾收集器,及時托管代碼裡沒有該對象的根,也不要回收該對象,但垃圾收集器可以移動它,一般我們向非托管代碼傳遞一個對象,而又從非托管代碼傳遞回來的時候用這個類型非常好,它不會讓垃圾收集器在非托管代碼返回托管代碼的時候回收掉該對象,還不怎麼影響GC的性能,因為GC還可以移動它。dwCompletionKey就是我們在托管-非托管-托管之間傳遞的一個很典型的場景。所以這裡用它,另外還有GCHandleType.Pinned,它和GCHandleType.Normal不同的一點就是GC除了在沒有根的時候不能回收這個對象外,還不能移動它,應用場景是給非托管代碼傳遞一個byte[]的buffer,讓托管代碼去填充,如果用GCHandleType.Normal有可能在非托管代碼返回托管代碼的時候寫錯內存位置,因為有可能GC移動了這個對象的內存地址。關於根、GC原理,大家可以參考相關資料。另外在你的數據從非托管代碼傳遞會托管代碼後,要調用GCHandle的實例方法free來在GC句柄表裡移除該對象,這時候你的托管代碼還有個該對象的引用,也就是根,GC也不會給你回收的,當你用完了後,GC就給你回收了。GCHandle的Target屬性用來訪問GCHandle指向的對象。其它兩個GCHandleType的成員是關於弱引用的,和本文關系不大,就不介紹了。
GetQueuedCompletionStatus原型如下 [DllImport("kernel32.dll", CharSet = CharSet.Auto, SetLastError = true)] public static extern bool GetQueuedCompletionStatus(SafeFileHandle CompletionPort, out uint lpNumberOfBytesTransferred, out IntPtr lpCompletionKey, out IntPtr lpOverlapped, uint dwMilliseconds); 前幾個參數和PostQueuedCompletionStatus差不多, CompletionPort表示在哪個完成端口上等待PostQueuedCompletionStatus發來的消息,或者IO操作完成的通知,
lpNumberOfBytesTransferred表示收到數據的大小,這個大小不是說CompletionKey的大小,而是在單次I/O操作完成後(WSASend或者WSAReceve),實際傳輸的字節數,我在這裡理解的不是很透徹,我覺得如果是接受PostQueuedCompletionStatus的消息的話,應該是收到lpOverlapped的大小,因為它才是單IO數據嘛。
lpCompletionKey用來接收單據並數據,我們沒傳遞啥,後來也沒用,在socket程序裡,一般接socket句柄。
lpOverlapped用來接收單IO數據,或者我們的自定義消息。
dwMilliseconds表示等待一個自定義消息或者IO完成通知消息在完成端口上出現的時間,傳遞INIFINITE(0xffffffff)表示無限等待下去。
好了,API大概介紹這麼多,下面介紹代碼 1、主線程創建一個完成端口對象,不和任何句柄綁定,前幾個參數都寫0,NumberOfConcurrentThreads參數我們寫1,因為我們的示例就一個工作線程。 2、創建一個工作線程,把第一步創建的完成端口傳進去 3、創建兩個單IO數據,分別發投遞給第一步創建的完成端口 4、在工作線程裡執行一個死循環,循環在傳遞進來的完成端口上等待消息,沒有消息的時候GetQueuedCompletionStatus處於休息狀態,有消息來的時候把指針轉換成對象,然後輸出 5、如果收到退出指令,就退出循環,從而結束工作者線程。下面是完整代碼,需要打開不安全代碼的編譯選項。
using System;
using System.Runtime.InteropServices;
using System.Threading;
using Microsoft.Win32.SafeHandles;
[StructLayout(LayoutKind.Sequential)]
class PER_IO_DATA
{
public string Data;
}
public class IOCPApiTest
{
[DllImport("kernel32.dll", CharSet = CharSet.Auto, SetLastError = true)]
public static extern SafeFileHandle CreateIoCompletionPort(IntPtr FileHandle, IntPtr ExistingCompletionPort, IntPtr CompletionKey, uint NumberOfConcurrentThreads);
[DllImport("kernel32.dll", CharSet = CharSet.Auto, SetLastError = true)]
public static extern bool GetQueuedCompletionStatus(SafeFileHandle CompletionPort,
out uint lpNumberOfBytesTransferred, out IntPtr lpCompletionKey,
out IntPtr lpOverlapped, uint dwMilliseconds);
[DllImport("Kernel32", CharSet = CharSet.Auto)]
private static extern bool PostQueuedCompletionStatus(SafeFileHandle CompletionPort, uint dwNumberOfBytesTransferred, IntPtr dwCompletionKey, IntPtr lpOverlapped);
public static unsafe void TestIOCPApi()
{
var CompletionPort = CreateIoCompletionPort(new IntPtr(-1), IntPtr.Zero, IntPtr.Zero, 1);
if(CompletionPort.IsInvalid)
{
Console.WriteLine("CreateIoCompletionPort 出錯:{0}",Marshal.GetLastWin32Error());
}
var thread = new Thread(ThreadProc);
thread.Start(CompletionPort);
var PerIOData = new PER_IO_DATA() ;
var gch = GCHandle.Alloc(PerIOData);
PerIOData.Data = "hi,我是蛙蛙王子,你是誰?";
Console.WriteLine("{0}-主線程發送數據",Thread.CurrentThread.GetHashCode());
PostQueuedCompletionStatus(CompletionPort, (uint)sizeof(IntPtr), IntPtr.Zero, (IntPtr)gch);
var PerIOData2 = new PER_IO_DATA();
var gch2 = GCHandle.Alloc(PerIOData2);
PerIOData2.Data = "關閉工作線程吧";
Console.WriteLine("{0}-主線程發送數據", Thread.CurrentThread.GetHashCode());
PostQueuedCompletionStatus(CompletionPort, 4, IntPtr.Zero, (IntPtr)gch2);
Console.WriteLine("主線程執行完畢");
Console.ReadKey();
}
static void ThreadProc(object CompletionPortID)
{
var CompletionPort = (SafeFileHandle)CompletionPortID;
while (true)
{
uint BytesTransferred;
IntPtr PerHandleData;
IntPtr lpOverlapped;
Console.WriteLine("{0}-工作線程准備接受數據",Thread.CurrentThread.GetHashCode());
GetQueuedCompletionStatus(CompletionPort, out BytesTransferred,
out PerHandleData, out lpOverlapped, 0xffffffff);
if(BytesTransferred <= 0)
continue;
GCHandle gch = GCHandle.FromIntPtr(lpOverlapped);
var per_HANDLE_DATA = (PER_IO_DATA)gch.Target;
Console.WriteLine("{0}-工作線程收到數據:{1}", Thread.CurrentThread.GetHashCode(), per_HANDLE_DATA.Data);
gch.Free();
if (per_HANDLE_DATA.Data != "關閉工作線程吧") continue;
Console.WriteLine("收到退出指令,正在退出");
CompletionPort.Dispose();
break;
}
}
public static int Main(String[] args)
{
TestIOCPApi();
return 0;
}
}
開始我也一樣迷惑怎樣傳送對象引用,後來經過研究發現可以這樣解決完成端口傳送對象的問題。使用以下方式來聲明api:
[DllImport("Kernel32")]
private static extern bool PostQueuedCompletionStatus(UInt32 completionPort, int numberOfBytesTransferred,IntPtr completionKey, IntPtr overlapped);
[DllImport("Kernel32")]
private static extern bool GetQueuedCompletionStatus(UInt32 completionPort, ref int numberOfBytes,ref IntPtr completionKey, ref IntPtr overlapped,UInt32 milliseconds);
使用GetQueuedCompletionStatus和PostQueuedCompletionStatus的時候,用GCHandle對象來取得對象的指針,再傳送給它們作為參數就可以了。
int i=0;
object Value=new object();
GCHandle gcValue=GCHandle.Alloc(Value);
GCHandle gcKey=GCHandle.Alloc(i);
// Post an event into the IOCP Thread Pool
PostQueuedCompletionStatus(GetHandle, 4, (IntPtr)gcKey, (IntPtr)gcValue);
其它的按這個思路去解決就可以了。