程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> .NET網頁編程 >> C# >> C#入門知識 >> C# 完成端口組件實現精髓

C# 完成端口組件實現精髓

編輯:C#入門知識

前面用C++實現了windows平台上的網絡完成端口組件,那麼到C#中如何做了?起初我打算通過PInvoke來調用win底層API來仿照C++實現,但問題很快就出來了--C#中的Unsafe指針無法穩定的指向一塊緩沖區的首地址,也就是說當垃圾回收進行的時候,我們的unsafe指針的值可能已經無效了。用pin?我也想過,可是鎖住所有的TCP接收緩沖區,會極大的降低運行時的效率。難道沒有辦法了嗎?想想完成端口模型的本質思想是將"啟動異步操作的線程"和"提供服務的線程"(即工作者線程)拆伙。做到這一點不就夠了嗎,這是本質的東西。

        分析一下我們需要幾種類型的線程,首先我們需要一個線程來接收TCP連接請求,這就是所謂監聽線程,當成功的接收到一個連接後,就向連接發送一個異步接收數據的請求,由於是異步操作,所以會立即返回,然後再去接收新的連接請求,如此監聽線程就循環運作起來了。值得提出的是,在異步接收的回調函數中,應該對接收到的數據進行處理,完成端口模型所做的就是將接收到的數據放在了完成端口隊列中,注意,是一個隊列。
        第二種線程類型,就是工作者線程。工作者線程的個數有個經驗值是( Cpu個數×2 + 2),當然具體取多少,還要取決於你的應用的要求。工作者線程的任務就是不斷地從完成端口隊列中取出數據,並處理它,然後如果有回復,再將回復寫入對應的連接。

剛才已經提到,我打算在C#中不使用Pinvoke來實現完成端口,而在.NET平台中是沒有特定的完成端口隊列這個組件或類的,所以我們首先要實現一個這樣的隊列。

         好,讓我們來定義接口IRequestQueueManager,用於模擬完成端口的隊列。

下面給出其定義和默認實現。

using System;
using System.Collections ;

namespace EnterpriseServerBase.Network
{
 /// <summary>
 /// IRequestQueueManager 用於模擬完成端口的隊列。
 /// </summary>
 public interface IRequestQueueManager :IRequestPusher
 {
  //void Push(object package) ;
  object Pop() ;
  void Clear() ;
  int Length {get ;}
 }

 //向隊列中的Push一個請求包
 public interface IRequestPusher
 {
  void Push(object package) ;
 }
 
 /// <summary>
 /// IRequestQueueManager 的默認實現
 /// </summary>
 public class RequestQueueManager :IRequestQueueManager
 {
  private Queue queue = null ;

  public RequestQueueManager()
  {
   this.queue = new Queue() ;
  }

  public void Push(object package)
  {
   lock(this.queue)
   {
    this.queue.Enqueue(package) ;
   }
  }

  public object Pop()
  {
   object package = null ;

   lock(this.queue)
   {
    if(this.queue.Count > 0)
    {   
     package = this.queue.Dequeue() ;
    }
   }

   return package ;
  }

  public void Clear()
  {
   lock(this.queue)
   {
    this.queue.Clear() ;
   }
  }

  public int Length
  {
   get
   {
    return this.queue.Count ;
   }
  }

 }
}

在IRequestQueueManager的基礎上,可以將工作者線程和啟動異步操作的線程拆開了。由於工作者線程只與端口隊列相關,所以我決定將它們一起封裝起來--IIOCPManager

來看看完成端口類如何實現:

using System;
using System.Threading ;

namespace EnterpriseServerBase.Network
{
 /// <summary>
 /// IIOCPManager 完成端口管理者,主要管理工作者線程和完成端口隊列。
 /// 2005.05.23
 /// </summary>
 public interface IIOCPManager : IRequestPusher
 {
  void Initialize(IOCPPackageHandler i_packageHandler ,int threadCount) ;
  void Start() ; //啟動工作者線程
  void Stop() ;  //退出工作者線程

  int WorkThreadCount{get ;}

  event CallBackPackageHandled PackageHandled ;
 }

 //IOCPPackageHandler 用於處理從完成端口隊列中取出的package
 public interface IOCPPackageHandler
 {
   void HandlerPackage(object package) ; //一般以同步實現
 }

 public delegate void CallBackPackageHandled(object package) ;

 /// <summary>
 /// IOCPManager 是IIOCPManager的默認實現
 /// </summary>
 public class IOCPManager :IIOCPManager
 {
  private IRequestQueueManager requestQueueMgr = null ;
  private IOCPPackageHandler packageHandler ;
  private int workThreadCount = 0 ; //實際的工作者線程數
  private int MaxThreadCount  = 0 ;

  private bool stateIsStop = true ;
 
  public IOCPManager()
  {  
  }


  #region ICPWorkThreadManager 成員
  public event CallBackPackageHandled PackageHandled ;

  public void Initialize(IOCPPackageHandler i_packageHandler ,int threadCount )
  {
   this.requestQueueMgr = new RequestQueueManager() ;
   this.MaxThreadCount = threadCount ;
   this.packageHandler = i_packageHandler ;
  }

  public void Push(object package)
  {
   this.requestQueueMgr.Push(package) ;
  }

  public void Start()
  {
   if(! this.stateIsStop)
   {
    return ;
   }

   this.stateIsStop = false ;
   this.CreateWorkThreads() ;

  }

  public void Stop()
  {
   if(this.stateIsStop)
   {
    return ;
   }

   this.stateIsStop = true ;

   //等待所有工作者線程結束
   int count = 0 ;
   while(this.workThreadCount != 0)
   {
    if(count < 10)
    {
     Thread.Sleep(200) ;
    }
    else
    {   
     throw new Exception("WorkThread Not Terminated !") ;
    }

    ++ count ;
   }

   this.requestQueueMgr.Clear() ;
  }  

  public int WorkThreadCount
  {
   get
   {
    return this.workThreadCount ;
   }
  }
  #endregion

  #region CreateWorkThreads
  private void CreateWorkThreads()
  {
   for(int i= 0 ;i< this.MaxThreadCount ;i++)
   {
    Thread t = new Thread(new ThreadStart(this.ServeOverLap)) ;
    Interlocked.Increment(ref this.workThreadCount) ;
    t.Start() ;
   }
  }
  #endregion

  #region ServeOverLap 工作者線程
  private void ServeOverLap()
  {
   while(! this.stateIsStop)
   {
    object package = this.requestQueueMgr.Pop() ;
&

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved