程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> .NET網頁編程 >> 關於.NET >> ActorLite:一個輕量級Actor模型實現(中)

ActorLite:一個輕量級Actor模型實現(中)

編輯:關於.NET

從上一篇文章的反響來看,似乎大家對於這一話題並沒有太大興趣。而這篇文章將會 為大家帶來一個簡單但完整的Actor模型實現。此外,在下一篇文章中……可能會出現一 些讓您覺得有趣的東西。:)

任務分配邏輯

如上文所述,這次要實現的是一個非常簡單的Actor模型,使用基於事件的分配方式, 直接把任務交給.NET自帶的線程池去使用。不過我們又該什麼時候把一個Actor推入線程 池的執行隊列呢?這其實取決於我們執行Actor的兩個“基本原則”:

如果Actor的郵箱中包含消息,那麼要盡早執行。

對於單個Actor對象來說,它的消息是順序執行的。

因此,我們有兩個“時機”可以把一個Actor交由線程池去執行:

當Actor接收到一個消息(且該Actor處於“等待”狀態)

當Actor執行完一個消息(且Actor的郵箱中存在更多消息)

顯然,在進行操作時需要小心處理並發造成的問題,因為一個“執行完”和多個“接 受到”事件可能同時出現。如果操作不當,則容易出現各種錯誤的情況:

某個Actor的郵箱未空,卻已停止執行。

同一個Actor的兩個消息被並行地處理。

Actor的郵箱已經沒有消息,卻被要求再次執行。

至於並行控制的方式,就請關注下面的實現吧。

簡單的Actor模型實現

Actor模型中最關鍵的莫過於Actor對象的實現。一個Actor的功能有如下三種:

將消息放入郵箱

接受並處理消息

循環/退出循環

因此Actor抽象類對外的接口大致如下:

public abstract class Actor<T> : IActor
{
   protected abstract void Receive(T message);

   protected void Exit() { ... }

   public void Post(T message) { ... }
}

三個方法的簽名應該已經充分說明了各自的含義。不過IActor又是什麼呢?請看它的 定義:

internal interface IActor
{
   void Execute();

   bool Existed { get; }

   int MessageCount { get; }

   ActorContext Context { get; }
}

這是一個internal修飾的類型,這意味著它的訪問級別被限制在程序集內部。IActor 接口的作用是作為一個統一的類型,交給Dispatcher——也就是Actor模型的任務分發邏 輯所使用的。IActor接口的前三個成員很容易從名稱上理解其含義,那麼ActorContext又 是做什麼用的呢?

internal class ActorContext
{
   public ActorContext(IActor actor)
   {
     this.Actor = actor;
   }

   public IActor Actor { get; private set; }

   ...
}

public abstract class Actor<T> : IActor
{
   protected Actor()
   {
     this.m_context = new ActorContext(this);
   }

   private ActorContext m_context;
   ActorContext IActor.Context
   {
     get
     {
       return this.m_context;
     }
   }

   ...
}

在多線程的環境中,進行一些同步控制是非常重要的事情。線程同步的常用手段是 lock,不過如果要減小鎖的粒度,那麼勢必會使用Interlocked類下的CAS等原子操作,而 那些操作只能針對最基礎的域變量,而不能針對經過封裝的屬性或方法等成員。 ActorContext便包含了用於同步控制,以及其他直接表示Actor內部狀態各種字段的對象 。這樣,我們便可以通過ActorContext對象來實現一個Lock-Free的鏈表或隊列。您可以 會說,那麼為什麼要用獨立的ActorContext類型,而不直接把字段放置在統一的基類(例 如ActorBase)中呢?這有兩點原因,第一點是所謂的“統一控制”便於管理,而第二點 才是更為關鍵的:後文會涉及到F#對這Actor模型的使用,只可惜F#在對待父類的 internal成員時有一個bug,因此不得不把相關實現替換成接口(IActor)。不過這不是 本文的主題,我們下次再討論F#的問題。

ActorContext目前只有一個字段——沒錯,只需要一個,這個字段便是表示狀態的 m_status。

internal class ActorContext
{
   ...

   public const int WAITING = 0;
   public const int EXECUTING = 1;
   public const int EXITED = 2;

   public int m_status;
}

m_status字段的類型為int,而不是枚舉,這是為了可以使用Interlocked中的CAS操作 。而對這個狀態的操作,也正好形成了我們同步操作過程中的“壁壘”。我們的每個 Actor在任意時刻都處於三種狀態之一:

等待(Waiting):郵箱為空,或剛執行完一個消息,正等待分配任務。

執行(Executing):正在執行一個消息(確切地說,由於線程池的緣故,它也可能是 還在隊列中等待,不過從概念上理解,我們認為它“已經”執行了)。

退出(Exited):已經退出,不會再執行任何消息。

顯然,只有當m_status為WAITING時才能夠為Actor分配運算資源(線程)以便執行, 而分配好資源(將其推入.NET線程池)之後,它的狀態就要變成EXECUTING。這恰好可以 用一個原子操作形成我們需要的“壁壘”,可以讓多個“請求”,“有且只有一個”成功 ,即“把Actor的執行任務塞入線程池”。如下:

internal class Dispatcher
{
   ...

   public void ReadyToExecute(IActor actor)
   {
     if (actor.Existed) return;

     int status = Interlocked.CompareExchange(
       ref actor.Context.m_status,
       ActorContext.EXECUTING,
       ActorContext.WAITING);

     if (status == ActorContext.WAITING)
     {
       ThreadPool.QueueUserWorkItem(this.Execute, actor);
     }
   }

   ...
}

CompareExchange方法返回這次原子操作前m_status的值,如果它為WAITING,那麼這 次操作(也僅有這次操作)成功地將m_status修改為EXECUTING。在這個情況下,Actor將 會被放入線程池,將會由Execute方法來執行。從上述實現中我們可以發現,這個方法在 多線程的情況下也能夠正常工作。那麼ReadyToExecute方法該在什麼地方被調用呢?應該 說是在任何“可能”讓Actor開始執行的時候得到調用。按照文章開始的說法,其中一個 情況便是“當Actor接收到一個消息時”:

public abstract class Actor<T> : IActor
{
   ...

   private Queue<T> m_messageQueue = new Queue<T> ();

   ...

   public void Post(T message)
   {
     if (this.m_exited) return;

     lock (this.m_messageQueue)
     {
       this.m_messageQueue.Enqueue(message);
     }

     Dispatcher.Instance.ReadyToExecute(this);
   }
}

而另一個地方,自然是消息“執行完畢”,且Actor的郵箱中還擁有消息的時候,則再 次為其分配運算資源。這便是Dispatcher.Execute方法的邏輯:

public abstract class Actor<T> : IActor
{
   ...

   bool IActor.Existed
   {
     get
     {
       return this.m_exited;
     }
   }

   int IActor.MessageCount
   {
     get
     {
       return this.m_messageQueue.Count;
     }
   }

   void IActor.Execute()
   {
     T message;
     lock (this.m_messageQueue)
     {
       message = this.m_messageQueue.Dequeue();
     }

     this.Receive(message);
   }

   private bool m_exited = false;

   protected void Exit()
   {
     this.m_exited = true;
   }

   ...
}

internal class Dispatcher
{
   ...

   private void Execute(object o)
   {
     IActor actor = (IActor)o;
     actor.Execute();

當程序執行到此處時,actor的Execute方法已經從郵箱尾部獲取了一條消息,並交由 用戶實現的Receive方法執行。同時,Actor的Exit方法也可能被調用,使它的Exited屬性 返回true。不過到目前為止,因為ActorContext.m_status一直保持為EXECUTING,因此這 段時間中任意新消息所造成的ReadyToExecute方法的調用都不會為Actor再次分配運算資 源。不過接下來,我們將會修改m_status,這可能會造成競爭。那麼我們又該怎麼處理呢 ?

如果用戶調用了Actor.Exit方法,那麼它的Exited屬性則會返回true,我們可以將 m_status設為EXITED,這樣Actor再也不會回到WAITING狀態,也就避免了無謂的資源分配 :

if (actor.Existed)
     {
       Thread.VolatileWrite(
         ref actor.Context.m_status,
         ActorContext.EXITED);
     }
     else
     {

如果Actor沒有退出,那麼它會被短暫地切換為WAITING狀態。此後如果Actor的郵箱中 存在剩余的消息,那麼我們會再次調用ReadyToExecute方法“嘗試”再次為Actor分配運 算資源:

Thread.VolatileWrite(
         ref actor.Context.m_status,
         ActorContext.WAITING);

       if (actor.MessageCount > 0)
       {
         this.ReadyToExecute(actor);
       }
     }
   }
}

顯然,在VolatileWrite和ReadyToExecute方法之間,可能會到來一條新的消息,因而 再次引發一次並行地ReadyToExecute調用。不過根據我們之前的分析,這樣的競爭並不會 造成問題,因此在這方面我們可以完全放心。

至此,我們已經完整地實現了一個簡單的Actor模型,邏輯清晰,功能完整——而這一 切,僅僅用了不到150行代碼。不用懷疑,這的確是事實。

使用示例

Actor模型的關鍵在於消息傳遞形式(Message Passing Style)的工作方式,通信的 唯一手段便是傳遞消息。在使用我們的Actor模型之前,我們需要繼承Actor<T>類 來構建一個真正的Actor類型。例如一個最簡單的計數器:

public class Counter : Actor<int>
{
   private int m_value;

   public Counter() : this(0) { }

   public Counter(int initial)
   {
     this.m_value = initial;
   }

   protected override void Receive(int message)
   {
     this.m_value += message;

     if (message == -1)
     {
       Console.WriteLine(this.m_value);
       this.Exit();
     }
   }
}

當計數器收到-1以外的數值時,便會累加到它的計數器上,否則便會打印出當前的值 並退出。這裡無需做任何同步方面的考慮,因為對於單個Actor來說,所有的消息都是依 次處理,不會出現並發的情況。Counter的使用自然非常簡單:

static void Main(string[] args)
{
   Counter counter = new Counter();
   for (int i = 0; i < 10000; i++)
   {
     counter.Post(i);
   }

   counter.Post(-1);

   Console.ReadLine();
}

不過您可能會問,這樣的調用又有什麼作用,又能實現什麼呢?您現在可以去網上搜 索一些Actor模型解決問題的示例,或者您可以等待下一篇文章中,我們使用F#來操作這 個Actor模型。您會發現,配合F#的一些特性,這個Actor模型會變得更加實用,更為有趣 。

此外,在下一篇文章裡我們也會對這個Actor模型進行簡單的性能分析。如果您要把它 用在生產環境中,那麼可能還需要對它再進行一些細微地調整。

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved