前面用C++實現了windows平台上的網絡完成端口組件,那麼到C#中如何做了?起初我打算通過PInvoke來調用win底層API來仿照C++實現,但問題很快就出來了--C#中的Unsafe指針無法穩定的指向一塊緩沖區的首地址,也就是說當垃圾回收進行的時候,我們的unsafe指針的值可能已經無效了。用pin?我也想過,可是鎖住所有的TCP接收緩沖區,會極大的降低運行時的效率。難道沒有辦法了嗎?想想完成端口模型的本質思想是將"啟動異步操作的線程"和"提供服務的線程"(即工作者線程)拆伙。做到這一點不就夠了嗎,這是本質的東西。
分析一下我們需要幾種類型的線程,首先我們需要一個線程來接收TCP連接請求,這就是所謂監聽線程,當成功的接收到一個連接後,就向連接發送一個異步接收數據的請求,由於是異步操作,所以會立即返回,然後再去接收新的連接請求,如此監聽線程就循環運作起來了。值得提出的是,在異步接收的回調函數中,應該對接收到的數據進行處理,完成端口模型所做的就是將接收到的數據放在了完成端口隊列中,注意,是一個隊列。
第二種線程類型,就是工作者線程。工作者線程的個數有個經驗值是( Cpu個數×2 + 2),當然具體取多少,還要取決於你的應用的要求。工作者線程的任務就是不斷地從完成端口隊列中取出數據,並處理它,然後如果有回復,再將回復寫入對應的連接。
剛才已經提到,我打算在C#中不使用Pinvoke來實現完成端口,而在.NET平台中是沒有特定的完成端口隊列這個組件或類的,所以我們首先要實現一個這樣的隊列。
好,讓我們來定義接口IRequestQueueManager,用於模擬完成端口的隊列。
下面給出其定義和默認實現。
using System;
using System.Collections ;
namespace EnterpriseServerBase.Network
{
/// <summary>
/// IRequestQueueManager 用於模擬完成端口的隊列。
/// </summary>
public interface IRequestQueueManager :IRequestPusher
{
//void Push(object package) ;
object Pop() ;
void Clear() ;
int Length {get ;}
}
//向隊列中的Push一個請求包
public interface IRequestPusher
{
void Push(object package) ;
}
/// <summary>
/// IRequestQueueManager 的默認實現
/// </summary>
public class RequestQueueManager :IRequestQueueManager
{
private Queue queue = null ;
public RequestQueueManager()
{
this.queue = new Queue() ;
}
public void Push(object package)
{
lock(this.queue)
{
this.queue.Enqueue(package) ;
}
}
public object Pop()
{
object package = null ;
lock(this.queue)
{
if(this.queue.Count > 0)
{
package = this.queue.Dequeue() ;
}
}
return package ;
}
public void Clear()
{
lock(this.queue)
{
this.queue.Clear() ;
}
}
public int Length
{
get
{
return this.queue.Count ;
}
}
}
}
在IRequestQueueManager的基礎上,可以將工作者線程和啟動異步操作的線程拆開了。由於工作者線程只與端口隊列相關,所以我決定將它們一起封裝起來--IIOCPManager
來看看完成端口類如何實現:
using System;
using System.Threading ;
namespace EnterpriseServerBase.Network
{
/// <summary>
/// IIOCPManager 完成端口管理者,主要管理工作者線程和完成端口隊列。
/// 2005.05.23
/// </summary>
public interface IIOCPManager : IRequestPusher
{
void Initialize(IOCPPackageHandler i_packageHandler ,int threadCount) ;
void Start() ; //啟動工作者線程
void Stop() ; //退出工作者線程
int WorkThreadCount{get ;}
event CallBackPackageHandled PackageHandled ;
}
//IOCPPackageHandler 用於處理從完成端口隊列中取出的package
public interface IOCPPackageHandler
{
void HandlerPackage(object package) ; //一般以同步實現
}
public delegate void CallBackPackageHandled(object package) ;
/// <summary>
/// IOCPManager 是IIOCPManager的默認實現
/// </summary>
public class IOCPManager :IIOCPManager
{
private IRequestQueueManager requestQueueMgr = null ;
private IOCPPackageHandler packageHandler ;
private int workThreadCount = 0 ; //實際的工作者線程數
private int MaxThreadCount = 0 ;
private bool stateIsStop = true ;
public IOCPManager()
{
}
#region ICPWorkThreadManager 成員
public event CallBackPackageHandled PackageHandled ;
public void Initialize(IOCPPackageHandler i_packageHandler ,int threadCount )
{
this.requestQueueMgr = new RequestQueueManager() ;
this.MaxThreadCount = threadCount ;
this.packageHandler = i_packageHandler ;
}
public void Push(object package)
{
this.requestQueueMgr.Push(package) ;
}
public void Start()
{
if(! this.stateIsStop)
{
return ;
}
this.stateIsStop = false ;
this.CreateWorkThreads() ;
}
public void Stop()
{
if(this.stateIsStop)
{
return ;
}
this.stateIsStop = true ;
//等待所有工作者線程結束
int count = 0 ;
while(this.workThreadCount != 0)
{
if(count < 10)
{
Thread.Sleep(200) ;
}
else
{
throw new Exception("WorkThread Not Terminated !") ;
}
++ count ;
}
this.requestQueueMgr.Clear() ;
}
public int WorkThreadCount
{
get
{
return this.workThreadCount ;
}
}
#endregion
#region CreateWorkThreads
private void CreateWorkThreads()
{
for(int i= 0 ;i< this.MaxThreadCount ;i++)
{
Thread t = new Thread(new ThreadStart(this.ServeOverLap)) ;
Interlocked.Increment(ref this.workThreadCount) ;
t.Start() ;
}
}
#endregion
#region ServeOverLap 工作者線程
private void ServeOverLap()
{
while(! this.stateIsStop)
{
object package = this.requestQueueMgr.Pop() ;
&