這裡用到了一些技術點,比如平台調用、反射,多線程等,當然還有iocp和winsock的api,及 GCHandle,SafeHandle,Marshal類的使用等,不過相當多的東西,我上篇帖子講的都很細了,如果對 winsock api不了解可以查閱MSDN。也沒什麼技術難點,說幾個細節的地方吧。
1、.net自帶的System.Threading.NativeOverlapped類型是完全按照win32的Overlapped結構實現的, 因為我們在WSASend和WSAReceive的時候想要傳遞更多的數據,而不只是一個重疊結構,所以我自己定義 了一個WaOverlapped,在原有結構的末尾加了一個指針,指向一個自定義類的GC句柄,這樣在工作線程裡 就可以拿到自定義的單IO數據了,這個是我想了N種辦法不行後的一個可行的辦法。
2、注意GCHandle在取到數據後不用的話記著Free掉,否則就有可能造成內存洩漏。
3、如果調用WSASend或者WSAReceive返回6的話,多半是你准備的單IO數據不對,6表示無效的句柄。
4、如果傳遞給WSASend或者WSAReceive的Overlapped沒pin住,會拋異常的,等不到 GetLastWin32Error,所以用GCHandle.Alloc(PerIoData.Overlapped, GCHandleType.Pinned)把它pin住 。
5、這個類還沒有進行各方面的優化,其中的單IO數據,socket等都可以做成對象池來重用,Accept還 可以替換成AcceptEx來用一個現成的Socket來接受新的連接,而不是自動創建一個新的,還有緩沖區可以 做成環狀的,關於性能方面的優化,下次有機會再給大家做實驗。
完整代碼如下,windows2008打開不安全代碼進行編譯,然後可以用telnet進行測試。
using System;
using System.Net;
using System.Net.Sockets;
using System.Reflection;
using System.Runtime.ConstrainedExecution;
using System.Runtime.InteropServices;
using System.Threading;
using Microsoft.Win32.SafeHandles;
namespace WawaSocket.Net.Iocp
{
用IOCP和winsock api實現一個echo服務器#region 用IOCP和winsock api實現一個echo服務器
class IocpTest
{
private static readonly IntPtr INVALID_HANDLE_VALUE = new IntPtr(-1); //無 效句柄
const int PORT = 5150; //要監聽的端口
const int DATA_BUFSIZE = 8192; //默認緩沖區
const int ERROR_IO_PENDING = 997; //表示數據正在接受或者發送中
const uint INIFINITE = 0xffffffff; //表示等待無限時長
private static readonly Logger _logger = Logger.GetLogger(typeof (IocpTest));
單IO數據#region 單IO數據
[StructLayout(LayoutKind.Sequential)]
class PerIoOperationData
{
public WaOverlapped Overlapped;
public WSABuffer DataBuf;
public readonly byte[] Buffer = new byte[DATA_BUFSIZE];
public uint BytesSEND;
public uint BytesRECV;
}
#endregion
單句柄數據#region 單句柄數據
[StructLayout(LayoutKind.Sequential)]
class PerHandleData
{
public SafeSocketHandle Socket;
}
#endregion
public static void Run()
{
WSAData wsaData;
SocketError Ret;
初始化套接字#region 初始化套接字
_logger.Log("初始化socket");
if ((Ret = Win32Api.WSAStartup(0x0202, out wsaData)) != SocketError.Success)
{
_logger.Error("WSAStartup failed with error {0}\n", Ret);
return;
}
#endregion
創建一個完成端口內核對象#region 創建一個完成端口內核對象
_logger.Log("創建完成端口");
// Setup an I/O completion port.
SafeFileHandle CompletionPort = Win32Api.CreateIoCompletionPort (INVALID_HANDLE_VALUE, IntPtr.Zero, IntPtr.Zero, 0);
if (CompletionPort.IsInvalid)
{
_logger.Error("CreateIoCompletionPort failed with error: {0}\n", Marshal.GetLastWin32Error());
Marshal.ThrowExceptionForHR(Marshal.GetLastWin32Error ());
return;
}
#endregion
創建工作線程#region 創建工作線程
int processorCount = Environment.ProcessorCount;
_logger.Log("創建{0}個工作線程", processorCount);
for (int i = 0; i < processorCount; i++)
{
// Create a server worker thread and pass the completion port to the thread.
var thread = new Thread(ThreadProc);
thread.Start(CompletionPort);
}
#endregion
創建監聽用的套接字#region 創建監聽用的套接字
_logger.Log("創建監聽套接字");
// Create a listening socket
SafeSocketHandle Listen = Win32Api.WSASocket (AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp, IntPtr.Zero, 0, SocketConstructorFlags.WSA_FLAG_OVERLAPPED);
if (Listen.IsInvalid)
{
Listen.SetHandleAsInvalid();
_logger.Error("WSASocket() failed with error {0}\n", Win32Api.WSAGetLastError());
Marshal.ThrowExceptionForHR(Win32Api.WSAGetLastError ());
return;
}
#endregion
將套接字與本地端口綁定#region 將套接字與本地端口綁定
IPEndPoint InternetAddr = new IPEndPoint(IPAddress.Any, PORT);
SocketAddress socketAddress = InternetAddr.Serialize();
byte[] adress_buffer;
int adress_size;
_logger.Log("進行套接字綁定");
if (!DoBind(Listen, socketAddress, out adress_buffer, out adress_size))
{
_logger.Error("bind() failed with error {0}\n", Win32Api.WSAGetLastError());
Marshal.ThrowExceptionForHR(Win32Api.WSAGetLastError ());
return;
}
#endregion
開始監聽端口#region 開始監聽端口
_logger.Log("開始監聽:{0}-{1}", InternetAddr.Address, InternetAddr.Port);
// Prepare socket for listening
if (Win32Api.listen(Listen, 5) == SocketError.SocketError)
{
_logger.Error("listen() failed with error {0}\n", Win32Api.WSAGetLastError());
Marshal.ThrowExceptionForHR(Win32Api.WSAGetLastError ());
return;
}
#endregion
起一個循環來接受新連接#region 起一個循環來接受新連接
// Accept connections and assign to the completion port.
while (true)
unsafe
{
接受新連接#region 接受新連接
_logger.Log("開始接受入站連接");
SafeSocketHandle Accept = Win32Api.accept (Listen.DangerousGetHandle(), adress_buffer, ref adress_size);
if (Accept.IsInvalid)
{
_logger.Error("WSAAccept() failed with error {0}\n", Win32Api.WSAGetLastError());
Marshal.ThrowExceptionForHR (Win32Api.WSAGetLastError());
}
_logger.Log("有新連接進入:{0}", Accept.GetHashCode ());
#endregion
創建單句柄數據#region 創建單句柄數據
// Create a socket information structure to associate with the socket
PerHandleData PerHandleData = new PerHandleData ();
GCHandle gch_PerHandleData = GCHandle.Alloc (PerHandleData);
// Associate the accepted socket with the original completion port.
PerHandleData.Socket = Accept;
#endregion
把新接受的套接字與完成端口綁定#region 把新接受的套 接字與完成端口綁定
SafeFileHandle iocp = Win32Api.CreateIoCompletionPort(Accept.DangerousGetHandle(),
CompletionPort.DangerousGetHandle(),
GCHandle.ToIntPtr(gch_PerHandleData), 0);
if (iocp == null)
{
_logger.Error("CreateIoCompletionPort failed with error {0}\n", Marshal.GetLastWin32Error());
Marshal.ThrowExceptionForHR (Marshal.GetLastWin32Error());
return;
}
#endregion
准備單IO數據#region 准備單IO數據
// Create per I/O socket information structure to associate with the
// WSARecv call below.
PerIoOperationData PerIoData = new PerIoOperationData();
GCHandle gchPerIoData = GCHandle.Alloc (PerIoData);
PerIoData.Overlapped = new WaOverlapped { State = ((IntPtr)gchPerIoData) };
GCHandle gcHandle = GCHandle.Alloc (PerIoData.Overlapped, GCHandleType.Pinned);
PerIoData.BytesSEND = 0;
PerIoData.BytesRECV = 0;
PerIoData.DataBuf.Length = DATA_BUFSIZE;
PerIoData.DataBuf.Pointer = Marshal.UnsafeAddrOfPinnedArrayElement(PerIoData.Buffer, 0);
#endregion
開始投遞異步接受數據的請求#region 開始投遞異步接受 數據的請求
SocketFlags Flags = SocketFlags.None;
_logger.Log("開始異步接受數據");
int RecvBytes;
SocketError error = Win32Api.WSARecv(Accept, ref PerIoData.DataBuf,
1, out RecvBytes, ref Flags, gcHandle.AddrOfPinnedObject(),
IntPtr.Zero);
if (error == SocketError.SocketError)
{
if (Win32Api.WSAGetLastError() != ERROR_IO_PENDING)
{
_logger.Error("WSARecv() failed with error {0}\n", Win32Api.WSAGetLastError());
Marshal.ThrowExceptionForHR (Win32Api.WSAGetLastError());
//其實在主線程退出之前都應該用 PostQueuedCompletionStatus通知工作線程退出
return;
}
}
#endregion
}
#endregion
}
把一個套接字綁定在一個端口上的工具方法#region 把一個套接字綁定在一個端口上的工具方法
private static bool DoBind(SafeSocketHandle Listen, SocketAddress address, out byte[] buffer, out int size)
{
FieldInfo socketAddress_m_Buffer = typeof(SocketAddress).GetField ("m_Buffer",
BindingFlags.Instance | BindingFlags.NonPublic);
FieldInfo socketAddress_m_Size = typeof(SocketAddress).GetField ("m_Size",
BindingFlags.Instance | BindingFlags.NonPublic);
var m_buffer = (byte[])socketAddress_m_Buffer.GetValue (address);
var m_Size = (int)socketAddress_m_Size.GetValue(address);
buffer = m_buffer;
size = m_Size;
if (Win32Api.bind(Listen, m_buffer, m_Size) != SocketError.Success)
{
return false;
}
return true;
}
#endregion
工作線程#region 工作線程
static unsafe void ThreadProc(object CompletionPortID)
{
var CompletionPort = (SafeFileHandle)CompletionPortID; //接受通知的 完成端口
SocketFlags Flags;
IntPtr intptr_per_io_data, intptr_per_handle_data; //單句柄數據,單 實例數據的指針
GCHandle gcHandle_per_io_data, gcHandle_per_handle_data;//單句柄數 據,單實例數據的gc句柄
uint BytesTransferred; //接受或發送的數據
PerHandleData PerHandleData; //單據並數據
PerIoOperationData PerIoData; //單IO數據
int SendBytes; //發送出的字節
int RecvBytes; //接受到的字節
在循環裡接受和發送數據#region 在循環裡接受和發送數據
while (true)
{
在完成端口上等消息#region 在完成端口上等消息
if (!Win32Api.GetQueuedCompletionStatus(CompletionPort, out BytesTransferred,
out intptr_per_handle_data, out intptr_per_io_data, INIFINITE))
{
_logger.Error("GetQueuedCompletionStatus failed with error {0}\n",
Marshal.GetLastWin32Error());
return;
}
#endregion
拿到單據並數據#region 拿到單據並數據
gcHandle_per_handle_data = GCHandle.FromIntPtr (intptr_per_handle_data);
PerHandleData = (PerHandleData) gcHandle_per_handle_data.Target;
#endregion
拿到單IO數據#region 拿到單IO數據
WaOverlapped o = new WaOverlapped();
Marshal.PtrToStructure(intptr_per_io_data, o);
gcHandle_per_io_data = GCHandle.FromIntPtr(o.State);
PerIoData = (PerIoOperationData) gcHandle_per_io_data.Target;
#endregion
判斷是否為斷開請求#region 判斷是否為斷開請求
if (BytesTransferred == 0)
{
_logger.Log("斷開連接 {0}", PerHandleData.Socket.GetHashCode());
PerHandleData.Socket.Close();
gcHandle_per_handle_data.Free();
gcHandle_per_io_data.Free();
continue;
}
#endregion
根據異步操作的類型來更新單IO數據#region 根據異步操作的類型 來更新單IO數據
// Check to see if the BytesRECV field equals zero. If this is so, then
// this means a WSARecv call just completed so update the BytesRECV field
// with the BytesTransferred value from the completed WSARecv() call.
if (PerIoData.BytesRECV == 0)
{
PerIoData.BytesRECV = BytesTransferred;
PerIoData.BytesSEND = 0;
}
else
{
PerIoData.BytesSEND += BytesTransferred;
}
#endregion
try
{
if (PerIoData.BytesRECV > PerIoData.BytesSEND)
{
如果收到消息就原封不動發給發送者#region 如 果收到消息就原封不動發給發送者
_logger.Log("開始異步發送數據:{0}-{1}", PerHandleData.Socket.GetHashCode(),
PerIoData.Overlapped.GetHashCode ());
更新單IO數據#region 更新單IO數據
// Post another WSASend() request.
// Since WSASend() is not gauranteed to send all of the bytes requested,
// continue posting WSASend() calls until all received bytes are sent.
GCHandle gchPerIoData = GCHandle.Alloc (PerIoData);
PerIoData.Overlapped = new WaOverlapped { State = ((IntPtr)gchPerIoData) };
GCHandle gchOverlapped = GCHandle.Alloc (PerIoData.Overlapped, GCHandleType.Pinned);
PerIoData.DataBuf.Pointer = Marshal.UnsafeAddrOfPinnedArrayElement(
PerIoData.Buffer, (int) PerIoData.BytesSEND);
PerIoData.DataBuf.Length = PerIoData.BytesRECV - PerIoData.BytesSEND;
#endregion
投遞異步發送數據請求#region 投遞異步發送數 據請求
SocketError error = Win32Api.WSASend (PerHandleData.Socket, ref PerIoData.DataBuf,
1, out SendBytes, 0, gchOverlapped.AddrOfPinnedObject(), IntPtr.Zero);
if (error == SocketError.SocketError)
{
if (Win32Api.WSAGetLastError() != ERROR_IO_PENDING)
{
_logger.Error("WSASend() failed with error {0}", Win32Api.WSAGetLastError());
return;
}
}
#endregion
#endregion
}
else
{
如果沒有需要發送的數據,就投遞一個異步接受 數據請求#region 如果沒有需要發送的數據,就投遞一個異步接受數據請求
_logger.Log("開始異步接受數據:{0}-{1}", PerHandleData.Socket.GetHashCode(),
PerIoData.Overlapped.GetHashCode());
更新單IO數據#region 更新單IO數據
PerIoData.BytesRECV = 0;
// Now that there are no more bytes to send post another WSARecv() request.
Flags = SocketFlags.None;
GCHandle gchPerIoData = GCHandle.Alloc (PerIoData);
PerIoData.Overlapped = new WaOverlapped { State = ((IntPtr)gchPerIoData) };
GCHandle gchOverlapped = GCHandle.Alloc (PerIoData.Overlapped, GCHandleType.Pinned);
PerIoData.DataBuf.Length = DATA_BUFSIZE;
PerIoData.DataBuf.Pointer = Marshal.UnsafeAddrOfPinnedArrayElement(PerIoData.Buffer, 0);
#endregion
投遞異步接受請求#region 投遞異步接受請求
SocketError error = Win32Api.WSARecv (PerHandleData.Socket, ref PerIoData.DataBuf,
1, out RecvBytes, ref Flags, gchOverlapped.AddrOfPinnedObject(),
IntPtr.Zero);
if (error == SocketError.SocketError)
{
if (Win32Api.WSAGetLastError() != ERROR_IO_PENDING)
{
_logger.Error("WSARecv() failed with error{0}", Win32Api.WSAGetLastError());
return;
}
}
#endregion
#endregion
}
}
finally
{
if (gcHandle_per_handle_data.IsAllocated)
gcHandle_per_io_data.Free();
}
}
#endregion
}
#endregion
}
#endregion
封裝原生的socket對象#region 封裝原生的socket對象
public class SafeSocketHandle : SafeHandleMinusOneIsInvalid
{
private Logger _logger = Logger.GetLogger(typeof(SafeSocketHandle));
public SafeSocketHandle()
: base(true)
{
}
public SafeSocketHandle(bool ownsHandle)
: base(ownsHandle)
{
}
protected override bool ReleaseHandle()
{
if (Win32Api.closesocket(base.handle) == SocketError.SocketError)
{
_logger.Error("closesocket() failed with error {0}\n", Win32Api.WSAGetLastError());
}
return true;
}
}
#endregion
日志類#region 日志類
class Logger
{
public static Logger GetLogger(Type type)
{
return new Logger();
}
public void Log(object o)
{
Console.WriteLine(o);
}
public void Log(string format, params object[] objects)
{
Console.WriteLine(format, objects);
}
public void Error(object o)
{
Console.ForegroundColor = ConsoleColor.Red;
Console.WriteLine(o);
Console.ForegroundColor = ConsoleColor.White;
}
public void Error(string format, params object[] objects)
{
Console.ForegroundColor = ConsoleColor.Red;
Console.WriteLine(format, objects);
Console.ForegroundColor = ConsoleColor.White;
}
}
#endregion
win32 structs#region win32 structs
[StructLayout(LayoutKind.Sequential)]
public class WaOverlapped
{
public IntPtr InternalLow;
public IntPtr InternalHigh;
public int OffsetLow;
public int OffsetHigh;
public IntPtr EventHandle;
public IntPtr State;
}
[StructLayout(LayoutKind.Sequential)]
internal struct WSABuffer
{
internal uint Length;
internal IntPtr Pointer;
}
[StructLayout(LayoutKind.Sequential)]
internal struct WSAData
{
internal short wVersion;
internal short wHighVersion;
[MarshalAs(UnmanagedType.ByValTStr, SizeConst = 0x101)]
internal string szDescription;
[MarshalAs(UnmanagedType.ByValTStr, SizeConst = 0x81)]
internal string szSystemStatus;
internal short iMaxSockets;
internal short iMaxUdpDg;
internal IntPtr lpVendorInfo;
}
[Flags]
internal enum SocketConstructorFlags
{
WSA_FLAG_MULTIPOINT_C_LEAF = 4,
WSA_FLAG_MULTIPOINT_C_ROOT = 2,
WSA_FLAG_MULTIPOINT_D_LEAF = 0x10,
WSA_FLAG_MULTIPOINT_D_ROOT = 8,
WSA_FLAG_OVERLAPPED = 1
}
#endregion
封裝winsock和iocp的相關API原型#region 封裝winsock和iocp的相關API原型
class Win32Api
{
[DllImport("ws2_32.dll", CharSet = CharSet.Ansi, SetLastError = true)]
internal static extern SocketError WSAStartup([In] short wVersionRequested, out WSAData lpWSAData);
[DllImport("kernel32.dll", CharSet = CharSet.Auto, SetLastError = true)]
public static extern SafeFileHandle CreateIoCompletionPort(IntPtr FileHandle, IntPtr ExistingCompletionPort, IntPtr CompletionKey, uint NumberOfConcurrentThreads);
[DllImport("ws2_32.dll", CharSet = CharSet.Auto, SetLastError = true)]
internal static extern SafeSocketHandle WSASocket([In] AddressFamily addressFamily, [In] SocketType socketType, [In] ProtocolType protocolType, [In] IntPtr protocolInfo, [In] uint group, [In] SocketConstructorFlags flags);
[DllImport("Ws2_32.dll", EntryPoint = "WSAGetLastError", SetLastError = true, CharSet = CharSet.Ansi, ExactSpelling = true, CallingConvention = CallingConvention.StdCall)]
public static extern int WSAGetLastError();
[DllImport("ws2_32.dll", SetLastError = true)]
internal static extern SocketError bind([In] SafeSocketHandle socketHandle, [In] byte[] socketAddress, [In] int socketAddressSize);
[DllImport("ws2_32.dll", SetLastError = true)]
internal static extern SocketError listen([In] SafeSocketHandle socketHandle, [In] int backlog);
[DllImport("ws2_32.dll", SetLastError = true, ExactSpelling = true)]
internal static extern SafeSocketHandle accept([In] IntPtr socketHandle, [Out] byte[] socketAddress, [In, Out] ref int socketAddressSize);
[DllImport("ws2_32.dll", SetLastError = true)]
internal static extern SocketError WSARecv([In] SafeSocketHandle socketHandle, [In, Out] ref WSABuffer buffer, [In] int bufferCount, out int bytesTransferred, [In, Out] ref SocketFlags socketFlags, [In] IntPtr overlapped, [In] IntPtr completionRoutine);
[DllImport("kernel32.dll", CharSet = CharSet.Auto, SetLastError = true)]
public static extern unsafe bool GetQueuedCompletionStatus(SafeFileHandle CompletionPort,
out uint lpNumberOfBytes, out IntPtr lpCompletionKey,
out IntPtr lpOverlapped, uint dwMilliseconds);
[ReliabilityContract(Consistency.WillNotCorruptState, Cer.Success), DllImport("ws2_32.dll", SetLastError = true, ExactSpelling = true)]
internal static extern SocketError closesocket([In] IntPtr socketHandle);
[DllImport("ws2_32.dll", SetLastError = true)]
internal static extern SocketError WSASend([In] SafeSocketHandle socketHandle, [In] ref WSABuffer buffer, [In] int bufferCount, out int bytesTransferred, [In] SocketFlags socketFlags, [In] IntPtr overlapped, [In] IntPtr completionRoutine);
}
#endregion
public class WawaIocpTest
{
public static void Main(String[] args)
{
IocpTest.Run();
}
}
}
小結:希望通過這3篇帖子能加深大家對c#開發windows上的網絡應用的了解。