從上一篇文章的反響來看,似乎大家對於這一話題並沒有太大興趣。而這篇文章將會 為大家帶來一個簡單但完整的Actor模型實現。此外,在下一篇文章中……可能會出現一 些讓您覺得有趣的東西。:)
任務分配邏輯
如上文所述,這次要實現的是一個非常簡單的Actor模型,使用基於事件的分配方式, 直接把任務交給.NET自帶的線程池去使用。不過我們又該什麼時候把一個Actor推入線程 池的執行隊列呢?這其實取決於我們執行Actor的兩個“基本原則”:
如果Actor的郵箱中包含消息,那麼要盡早執行。
對於單個Actor對象來說,它的消息是順序執行的。
因此,我們有兩個“時機”可以把一個Actor交由線程池去執行:
當Actor接收到一個消息(且該Actor處於“等待”狀態)
當Actor執行完一個消息(且Actor的郵箱中存在更多消息)
顯然,在進行操作時需要小心處理並發造成的問題,因為一個“執行完”和多個“接 受到”事件可能同時出現。如果操作不當,則容易出現各種錯誤的情況:
某個Actor的郵箱未空,卻已停止執行。
同一個Actor的兩個消息被並行地處理。
Actor的郵箱已經沒有消息,卻被要求再次執行。
至於並行控制的方式,就請關注下面的實現吧。
簡單的Actor模型實現
Actor模型中最關鍵的莫過於Actor對象的實現。一個Actor的功能有如下三種:
將消息放入郵箱
接受並處理消息
循環/退出循環
因此Actor抽象類對外的接口大致如下:
public abstract class Actor<T> : IActor
{
protected abstract void Receive(T message);
protected void Exit() { ... }
public void Post(T message) { ... }
}
三個方法的簽名應該已經充分說明了各自的含義。不過IActor又是什麼呢?請看它的 定義:
internal interface IActor
{
void Execute();
bool Existed { get; }
int MessageCount { get; }
ActorContext Context { get; }
}
這是一個internal修飾的類型,這意味著它的訪問級別被限制在程序集內部。IActor 接口的作用是作為一個統一的類型,交給Dispatcher——也就是Actor模型的任務分發邏 輯所使用的。IActor接口的前三個成員很容易從名稱上理解其含義,那麼ActorContext又 是做什麼用的呢?
internal class ActorContext
{
public ActorContext(IActor actor)
{
this.Actor = actor;
}
public IActor Actor { get; private set; }
...
}
public abstract class Actor<T> : IActor
{
protected Actor()
{
this.m_context = new ActorContext(this);
}
private ActorContext m_context;
ActorContext IActor.Context
{
get
{
return this.m_context;
}
}
...
}
在多線程的環境中,進行一些同步控制是非常重要的事情。線程同步的常用手段是 lock,不過如果要減小鎖的粒度,那麼勢必會使用Interlocked類下的CAS等原子操作,而 那些操作只能針對最基礎的域變量,而不能針對經過封裝的屬性或方法等成員。 ActorContext便包含了用於同步控制,以及其他直接表示Actor內部狀態各種字段的對象 。這樣,我們便可以通過ActorContext對象來實現一個Lock-Free的鏈表或隊列。您可以 會說,那麼為什麼要用獨立的ActorContext類型,而不直接把字段放置在統一的基類(例 如ActorBase)中呢?這有兩點原因,第一點是所謂的“統一控制”便於管理,而第二點 才是更為關鍵的:後文會涉及到F#對這Actor模型的使用,只可惜F#在對待父類的 internal成員時有一個bug,因此不得不把相關實現替換成接口(IActor)。不過這不是 本文的主題,我們下次再討論F#的問題。
ActorContext目前只有一個字段——沒錯,只需要一個,這個字段便是表示狀態的 m_status。
internal class ActorContext
{
...
public const int WAITING = 0;
public const int EXECUTING = 1;
public const int EXITED = 2;
public int m_status;
}
m_status字段的類型為int,而不是枚舉,這是為了可以使用Interlocked中的CAS操作 。而對這個狀態的操作,也正好形成了我們同步操作過程中的“壁壘”。我們的每個 Actor在任意時刻都處於三種狀態之一:
等待(Waiting):郵箱為空,或剛執行完一個消息,正等待分配任務。
執行(Executing):正在執行一個消息(確切地說,由於線程池的緣故,它也可能是 還在隊列中等待,不過從概念上理解,我們認為它“已經”執行了)。
退出(Exited):已經退出,不會再執行任何消息。
顯然,只有當m_status為WAITING時才能夠為Actor分配運算資源(線程)以便執行, 而分配好資源(將其推入.NET線程池)之後,它的狀態就要變成EXECUTING。這恰好可以 用一個原子操作形成我們需要的“壁壘”,可以讓多個“請求”,“有且只有一個”成功 ,即“把Actor的執行任務塞入線程池”。如下:
internal class Dispatcher
{
...
public void ReadyToExecute(IActor actor)
{
if (actor.Existed) return;
int status = Interlocked.CompareExchange(
ref actor.Context.m_status,
ActorContext.EXECUTING,
ActorContext.WAITING);
if (status == ActorContext.WAITING)
{
ThreadPool.QueueUserWorkItem(this.Execute, actor);
}
}
...
}
CompareExchange方法返回這次原子操作前m_status的值,如果它為WAITING,那麼這 次操作(也僅有這次操作)成功地將m_status修改為EXECUTING。在這個情況下,Actor將 會被放入線程池,將會由Execute方法來執行。從上述實現中我們可以發現,這個方法在 多線程的情況下也能夠正常工作。那麼ReadyToExecute方法該在什麼地方被調用呢?應該 說是在任何“可能”讓Actor開始執行的時候得到調用。按照文章開始的說法,其中一個 情況便是“當Actor接收到一個消息時”:
public abstract class Actor<T> : IActor
{
...
private Queue<T> m_messageQueue = new Queue<T> ();
...
public void Post(T message)
{
if (this.m_exited) return;
lock (this.m_messageQueue)
{
this.m_messageQueue.Enqueue(message);
}
Dispatcher.Instance.ReadyToExecute(this);
}
}
而另一個地方,自然是消息“執行完畢”,且Actor的郵箱中還擁有消息的時候,則再 次為其分配運算資源。這便是Dispatcher.Execute方法的邏輯:
public abstract class Actor<T> : IActor
{
...
bool IActor.Existed
{
get
{
return this.m_exited;
}
}
int IActor.MessageCount
{
get
{
return this.m_messageQueue.Count;
}
}
void IActor.Execute()
{
T message;
lock (this.m_messageQueue)
{
message = this.m_messageQueue.Dequeue();
}
this.Receive(message);
}
private bool m_exited = false;
protected void Exit()
{
this.m_exited = true;
}
...
}
internal class Dispatcher
{
...
private void Execute(object o)
{
IActor actor = (IActor)o;
actor.Execute();
當程序執行到此處時,actor的Execute方法已經從郵箱尾部獲取了一條消息,並交由 用戶實現的Receive方法執行。同時,Actor的Exit方法也可能被調用,使它的Exited屬性 返回true。不過到目前為止,因為ActorContext.m_status一直保持為EXECUTING,因此這 段時間中任意新消息所造成的ReadyToExecute方法的調用都不會為Actor再次分配運算資 源。不過接下來,我們將會修改m_status,這可能會造成競爭。那麼我們又該怎麼處理呢 ?
如果用戶調用了Actor.Exit方法,那麼它的Exited屬性則會返回true,我們可以將 m_status設為EXITED,這樣Actor再也不會回到WAITING狀態,也就避免了無謂的資源分配 :
if (actor.Existed)
{
Thread.VolatileWrite(
ref actor.Context.m_status,
ActorContext.EXITED);
}
else
{
如果Actor沒有退出,那麼它會被短暫地切換為WAITING狀態。此後如果Actor的郵箱中 存在剩余的消息,那麼我們會再次調用ReadyToExecute方法“嘗試”再次為Actor分配運 算資源:
Thread.VolatileWrite(
ref actor.Context.m_status,
ActorContext.WAITING);
if (actor.MessageCount > 0)
{
this.ReadyToExecute(actor);
}
}
}
}
顯然,在VolatileWrite和ReadyToExecute方法之間,可能會到來一條新的消息,因而 再次引發一次並行地ReadyToExecute調用。不過根據我們之前的分析,這樣的競爭並不會 造成問題,因此在這方面我們可以完全放心。
至此,我們已經完整地實現了一個簡單的Actor模型,邏輯清晰,功能完整——而這一 切,僅僅用了不到150行代碼。不用懷疑,這的確是事實。
使用示例
Actor模型的關鍵在於消息傳遞形式(Message Passing Style)的工作方式,通信的 唯一手段便是傳遞消息。在使用我們的Actor模型之前,我們需要繼承Actor<T>類 來構建一個真正的Actor類型。例如一個最簡單的計數器:
public class Counter : Actor<int>
{
private int m_value;
public Counter() : this(0) { }
public Counter(int initial)
{
this.m_value = initial;
}
protected override void Receive(int message)
{
this.m_value += message;
if (message == -1)
{
Console.WriteLine(this.m_value);
this.Exit();
}
}
}
當計數器收到-1以外的數值時,便會累加到它的計數器上,否則便會打印出當前的值 並退出。這裡無需做任何同步方面的考慮,因為對於單個Actor來說,所有的消息都是依 次處理,不會出現並發的情況。Counter的使用自然非常簡單:
static void Main(string[] args)
{
Counter counter = new Counter();
for (int i = 0; i < 10000; i++)
{
counter.Post(i);
}
counter.Post(-1);
Console.ReadLine();
}
不過您可能會問,這樣的調用又有什麼作用,又能實現什麼呢?您現在可以去網上搜 索一些Actor模型解決問題的示例,或者您可以等待下一篇文章中,我們使用F#來操作這 個Actor模型。您會發現,配合F#的一些特性,這個Actor模型會變得更加實用,更為有趣 。
此外,在下一篇文章裡我們也會對這個Actor模型進行簡單的性能分析。如果您要把它 用在生產環境中,那麼可能還需要對它再進行一些細微地調整。