即使 Java 6 和 Java 7 中引入並發性更新,Java 語言仍然無法讓並行編程變得特別容易。Java 線程、synchronized 代碼塊、wait/notify 和 java.util.concurrent 包都擁有自己的位置,但面對多核系統的容量壓力,Java 開發人員正在 依靠其他語言中開創的技術。actor 模型就是這樣一項技術,它已在 Erlang、Groovy 和 Scala 中實現。本文為那些希望 體驗 actor 但又要繼續編寫 Java 代碼的開發人員帶來了 μJavaActors 庫。
μJavaActors 庫 是一個緊湊的庫, 用於在 Java 平台上實現基於 actor 的系統(μ 表示希臘字母 Mμ,意指 “微型”)。在本文中,我使用 μJavaActors 探討 actor 在 Producer/Consumer 和 Map/Reduce 等常見設計模式中的工作原理。
您隨時可以 下載 μJavaActors 庫的源代碼。
Java 平台上的 actor 並發性
這個名稱有何含義?具有任何其他名稱的 actor 也適用!
基於 actor 的系統 通過實現一種消息傳遞 模式,使並行處理更容易編碼。在此模式中,系統中的每個 actor 都可接 收消息;執行該消息所表示的操作;然後將消息發送給其他 actor(包括它們自己)以執行復雜的操作序列。actor 之間的 所有消息是異步的,這意味著發送者會在收到任何回復之前繼續進行處理。因此,一個 actor 可能終生都陷入接收和處理 消息的無限循環中。
當使用多個 actor 時,獨立的活動可輕松分配到多個可並行執行消息的線程上(進而分配在多 個處理器上)。一般而言,每個 actor 都在一個獨立線程上處理消息。一些 actor 系統靜態地向 actor 分配線程;而其 他系統(比如本文中介紹的系統)則會動態地分配它們。
μJavaActors 簡介
μJavaActors 是 actor 系統 的一個簡單的 Java 實現。只有 1,200 行代碼,μJavaActors 雖然很小,但很強大。在下面的練習中,您將學習如何使用 μJavaActors 動態地創建和管理 actor,將消息傳送給它們。
μJavaActors 圍繞 3 個核心界面而構建:
消息 是在 actor 之間發送的消息。Message 是 3 個(可選的)值和一些行為的容器:
source 是發送 actor。
subject 是定義消息含義的字符串(也稱為命令)。
data 是消息的任何參數數據;通常是一個映射、列表或數組。參數可以是要處理和/或其他 actor 要與之交互的數據 。
subjectMatches() 檢查消息主題是否與字符串或正則表達式匹配。
μJavaActors 包的默認消息類是 DefaultMessage。
ActorManager 是一個 actor 管理器。它負責向 actor 分配線程(進而分配處理器)來處理消息。ActorManager 擁 有以下關鍵行為或特征:
createActor() 創建一個 actor 並將它與此管理器相關聯。
startActor() 啟動一個 actor。
detachActor() 停止一個 actor 並將它與此管理器斷開。
send()/broadcast() 將一條消息發送給一個 actor、一組 actor、一個類別中的任何 actor 或所有 actor。
在大部分程序中,只有一個 ActorManager,但如果您希望管理多個線程和/或 actor 池,也可以有多個 ActorManager。 此接口的默認實現是 DefaultActorManager。
Actor 是一個執行單元,一次處理一條消息。Actor 具有以下關鍵行為或特征:
每個 actor 有一個 name,該名稱在每個 ActorManager 中必須是惟一的。
每個 actor 屬於一個 category;類別是一種向一組 actor 中的一個成員發送消息的方式。一個 actor 一次只能屬 於一個類別。
只要 ActorManager 可以提供一個執行 actor 的線程,系統就會調用 receive()。為了保持最高效率,actor 應該迅 速處理消息,而不要進入漫長的等待狀態(比如等待人為輸入)。
willReceive() 允許 actor 過濾潛在的消息主題。
peek() 允許該 actor 和其他 actor 查看是否存在掛起的消息(或許是為了選擇主題)。
remove() 允許該 actor 和其他 actor 刪除或取消任何尚未處理的消息。
getMessageCount() 允許該 actor 和其他 actor 獲取掛起的消息數量。
getMaxMessageCount() 允許 actor 限制支持的掛起消息數量;此方法可用於預防不受控制地發送。
大部分程序都有許多 actor,這些 actor 常常具有不同的類型。actor 可在程序啟動時創建或在程序執行時創建(和銷 毀)。本文中的 actor 包 包含一個名為 AbstractActor 的抽象類,actor 實現基於該類。
圖 1 顯示了 actor 之間的關系。每個 actor 可向其他 actor 發送消息。這些消息保存在一個消息隊列(也稱為郵 箱;從概念上講,每個 actor 有一個隊列,當 ActorManager 看到某個線程可用於處理消息時,就會從隊列中刪除該消息 ,並將它傳送給在線程下運行的 actor,以便處理該消息。
圖 1. actor 之間的關系
μJavaActors 的並行 執行功能
現在您已可開始使用 μJavaActors 實現並行執行了。首先要創建一組 actor。這些是簡單的 actor,因 為它們所做的只是延遲少量時間並將消息發送給其他 actor。這樣做的效果是創建一個消息風暴,您首先會看到如何創建 actor,然後會看到如何逐步分派它們來處理消息。
有兩種消息類型:
initialization (init) 會導致 actor 初始化。僅需為每個 actor 發送一次這種類型的消息。
repeat 會導致 actor 發送 N-1 條消息,其中 N 是一個傳入的消息參數。
清單 1 中的 TestActor 實現從 AbstractActor 繼承的抽象方法。activate 和 deactivate 方法向 actor 通知它的 壽命信息;此示例中不會執行任何其他操作。runBody 方法是在收到任何消息之前、首次創建 actor 的時候調用的。它通 常用於將第一批消息引導至 actor。testMessage 方法在 actor 即將收到消息時調用;這裡 actor 可拒絕或接受消息。在 本例中,actor 使用繼承的 testMessage 方法測試消息接受情況;因此接受了所有消息。
清單 1. TestActor
class TestActor extends AbstractActor { @Override public void activate() { super.activate(); } @Override public void deactivate() { super.deactivate(); } @Override protected void runBody() { sleeper(1); // delay up to 1 second DefaultMessage dm = new DefaultMessage("init", 8); getManager().send(dm, null, this); } @Override protected Message testMessage() { return super.testMessage(); }
loopBody 方法(如清單 2 中所示)在 actor 收到一條消息時調用。在通過較短延遲來模擬某種一般性處理 之後,才開始處理該消息。如果消息為 “repeat”,那麼 actor 基於 count 參數開始發送另外 N-1 條消息。這些消息通 過調用 actor 管理器的 send 方法發送給一個隨機 actor。
清單 2. loopBody()
@Override protected void loopBody(Message m) { sleeper(1); String subject = m.getSubject(); if ("repeat".equals(subject)) { int count = (Integer) m.getData(); if (count > 0) { DefaultMessage dm = new DefaultMessage("repeat", count - 1); String toName = "actor" + rand.nextInt(TEST_ACTOR_COUNT); Actor to = testActors.get(toName); getManager().send(dm, this, to); } }
如果消息為 “init”,那麼 actor 通過向隨機選擇的 actor 或一個屬於 common 類別的 actor 發送兩組消 息,啟動 repeat 消息隊列。一些消息可立即處理(實際上在 actor 准備接收它們且有一個線程可用時即可處理);其他 消息則必須等待幾秒才能運行。這種延遲的消息處理對本示例不是很重要,但它可用於實現對長期運行的流程(比如等待用 戶輸入或等待對網絡請求的響應到達)的輪詢。
清單 3. 一個初始化序列
else if ("init".equals (subject)) { int count = (Integer) m.getData(); count = rand.nextInt(count) + 1; for (int i = 0; i < count; i++) { DefaultMessage dm = new DefaultMessage("repeat", count); String toName = "actor" + rand.nextInt(TEST_ACTOR_COUNT); Actor to = testActors.get(toName); getManager().send(dm, this, to); dm = new DefaultMessage("repeat", count); dm.setDelayUntil(new Date().getTime() + (rand.nextInt(5) + 1) * 1000); getManager().send(dm, this, "common"); } }
否則,表明消息不適合並會報告一個錯誤:
else { System.out.printf("TestActor:%s loopBody unknown subject: %s%n", getName(), subject); } } }
主要程序包含清單 4 中的代碼,它在 common 類別中創建了 2 個 actor,在 default 類別中創建了 5 個 actor,然後啟動它們。然後 main 至多會等待 120 秒(sleeper 等待它的參數值的時間約為 1000ms),定期顯示進度消 息。
清單 4. createActor、startActor
DefaultActorManager am = DefaultActorManager.getDefaultInstance(); : Map<String, Actor> testActors = new HashMap<String, Actor>(); for (int i = 0; i < 2; i++) { Actor a = am.createActor(TestActor.class, "common" + i); a.setCategory("common"); testActors.put(a.getName(), a); } for (int i = 0; i < 5; i++) { Actor a = am.createActor(TestActor.class, "actor" + i); testActors.put(a.getName(), a); } for (String key : testActors.keySet()) { am.startActor(testActors.get(key)); } for (int i = 120; i > 0; i--) { if (i < 10 || i % 10 == 0) { System.out.printf("main waiting: %d...%n", i); } sleeper(1); } : am.terminateAndWait();
跟蹤輸出
要理解剛執行的流程,讓我們看看來自 actor 的一些跟蹤輸出。(請 注意,因為對計數和延遲使用了隨機數,所以每次執行的輸出可能有所不同。)在清單 5 中,可以看到在程序啟動後不久 出現的消息。左列(括號中)是執行的線程名稱。在此次運行中,有 25 個線程可用於處理消息。每行的剩余部分(經過刪 減)是跟蹤輸出,顯示了收到的每條消息。請注意,repeat 計數 — 也就是參數數據,它在減少不斷。(另請注意,線程 名稱與 actor 的名稱毫無關系,盡管該名稱是以 actor 開頭。)
清單 5. 跟蹤輸出:程序啟動
[main ] - main waiting: 120... [actor17 ] - TestActor:actor4 repeat(4) [actor0 ] - TestActor:actor1 repeat(4) [actor10 ] - TestActor:common1 repeat(4) [actor1 ] - TestActor:actor2 repeat(4) [actor3 ] - TestActor:actor0 init(8) [actor22 ] - TestActor:actor3 repeat(4) [actor17 ] - TestActor:actor4 init(7) [actor20 ] - TestActor:common0 repeat(4) [actor24 ] - TestActor:actor0 repeat(4) [actor0 ] - TestActor:actor1 init(3) [actor1 ] - TestActor:actor2 repeat(4) [actor20 ] - TestActor:common0 repeat(4) [actor17 ] - TestActor:actor4 repeat(4) [actor17 ] - TestActor:actor4 repeat(3) [actor0 ] - TestActor:actor1 repeat(8) [actor10 ] - TestActor:common1 repeat(4) [actor24 ] - TestActor:actor0 repeat(8) [actor0 ] - TestActor:actor1 repeat(8) [actor24 ] - TestActor:actor0 repeat(7) [actor22 ] - TestActor:actor3 repeat(4) [actor1 ] - TestActor:actor2 repeat(3) [actor20 ] - TestActor:common0 repeat(4) [actor22 ] - TestActor:actor3 init(5) [actor24 ] - TestActor:actor0 repeat(7) [actor10 ] - TestActor:common1 repeat(4) [actor17 ] - TestActor:actor4 repeat(8) [actor1 ] - TestActor:actor2 repeat(3) [actor17 ] - TestActor:actor4 repeat(8) [actor0 ] - TestActor:actor1 repeat(8) [actor10 ] - TestActor:common1 repeat(4) [actor22 ] - TestActor:actor3 repeat(8) [actor0 ] - TestActor:actor1 repeat(7) [actor1 ] - TestActor:actor2 repeat(3) [actor0 ] - TestActor:actor1 repeat(3) [actor20 ] - TestActor:common0 repeat(4) [actor24 ] - TestActor:actor0 repeat(7) [actor24 ] - TestActor:actor0 repeat(6) [actor10 ] - TestActor:common1 repeat(8) [actor17 ] - TestActor:actor4 repeat(7)
在清單 6 中,可以看到在程序即將結束時出現的消息,這時 repeat 計數已減小。如果觀察此程序的執行,您將能夠看到生成各行的速度在逐漸減慢;這是因為生成的消息數量在逐漸 減少。如果等待足夠長時間,發送給 actor 的消息會完全停止(與清單 6 中所示的 common actor 上發生的一樣)。請注 意,消息處理工作合理地分散在可用的線程上,並且沒有任何 actor 被綁定到特定的線程上。
清單 6. 跟蹤輸出: 程序結束
[main ] - main waiting: 20... [actor0 ] - TestActor:actor4 repeat(0) [actor2 ] - TestActor:actor2 repeat(1) [actor3 ] - TestActor:actor0 repeat(0) [actor17 ] - TestActor:actor4 repeat(0) [actor0 ] - TestActor:actor1 repeat(2) [actor3 ] - TestActor:actor2 repeat(1) [actor14 ] - TestActor:actor1 repeat(2) [actor5 ] - TestActor:actor4 repeat(0) [actor14 ] - TestActor:actor2 repeat(0) [actor21 ] - TestActor:actor1 repeat(0) [actor14 ] - TestActor:actor0 repeat(1) [actor14 ] - TestActor:actor4 repeat(0) [actor5 ] - TestActor:actor2 repeat(1) [actor5 ] - TestActor:actor4 repeat(1) [actor6 ] - TestActor:actor1 repeat(1) [actor5 ] - TestActor:actor3 repeat(0) [actor6 ] - TestActor:actor2 repeat(1) [actor4 ] - TestActor:actor0 repeat(0) [actor5 ] - TestActor:actor4 repeat(1) [actor12 ] - TestActor:actor1 repeat(0) [actor20 ] - TestActor:actor2 repeat(2) [main ] - main waiting: 10... [actor7 ] - TestActor:actor4 repeat(2) [actor23 ] - TestActor:actor1 repeat(0) [actor13 ] - TestActor:actor2 repeat(1) [actor8 ] - TestActor:actor0 repeat(0) [main ] - main waiting: 9... [actor2 ] - TestActor:actor1 repeat(0) [main ] - main waiting: 8... [actor7 ] - TestActor:actor2 repeat(0) [actor13 ] - TestActor:actor1 repeat(0) [main ] - main waiting: 7... [actor2 ] - TestActor:actor2 repeat(2) [main ] - main waiting: 6... [main ] - main waiting: 5... [actor18 ] - TestActor:actor1 repeat(1) [main ] - main waiting: 4... [actor15 ] - TestActor:actor2 repeat(0) [actor16 ] - TestActor:actor1 repeat(1) [main ] - main waiting: 3... [main ] - main waiting: 2... [main ] - main waiting: 1... [actor4 ] - TestActor:actor1 repeat(0) [actor6 ] - TestActor:actor2 repeat(0)
模擬屏幕截圖
很難從前面的跟蹤信息中全面了解 actor 系統的行為,很大程度上是因為並不是所有跟蹤格式都有用。可以使用一個類 似 actor 模擬執行的快照圖像,以圖形格式查看相同的信息。每個圖像顯示一段固定時期之後的模擬情況。以下視頻演示 了一些未被代碼示例和屏幕截圖采集到的 Java actor 流程。可以在本地或在 土豆 上查看下面的視頻。
在 此處 查看腳本。
圖 2 顯示了在運行任何模擬之前模擬的用戶界面。請注意右側顯示的模擬菜單。
圖 2. 運行任何模擬之前的 actor 模擬器
在 此處 查看此圖的完整版本。
屏幕的頂部區域顯示了一個包含多種變體的模擬菜單;除非另行說明,否則下列模擬將如跟蹤輸出和以下屏幕截圖中所 示:
倒計時模擬 (0:15) 創建將一個值倒計時到 0 並發送更多請求的 actor。
Producer/Consumer 模擬 (2:40) 在經典的 Producer/Consumer 並發性問題上創建了一個變體。
Map/Reduce 模擬 (5:28) 創建對 1000 個整數的平方和的並行執行操作。
病毒掃描 模擬 (6:45) 掃描一個磁盤目錄樹來查找 “.txt” 文件(限制掃描的數量),檢測可疑的內容模式。這個沒 有 CPU 限制的模擬未在以下屏幕截圖中顯示,但它是 視頻演示的一部分。
所有模擬並發運行,但這僅是在視頻演示中 (8:18)。
該視頻格式顯示了按順序運行的所有這些模擬,每個模擬之間具有較短的暫停時間。
除了 Start 和 Stop 之外 ,圖 2 中的屏幕截圖還顯示了以下控件和設置。(請注意,Stop 不會停止線程,所以某些操作在停止線程後可能仍在進行 。)
Redistribute 半隨機地在 actor 圓圈中重新分配 actor(默認順序是創建順序)。這使您能夠更容易地通過重新放置 actor 來查看分組到一起的鄰近 actor 之間的消息。它還可以向 actor 分配新顏色。
Add Task 和 Remove Task 在啟動工具中添加或刪除任務(線程)。Remove Task 將僅刪除添加的(而不是原始的)任 務。
Maximum steps(使用值的 log2)限制模擬的持續時間,僅在模擬啟動之前生效。各個步驟大約持續 1 秒。
Show actors as transparent 使用戶更容易看到鄰近 actor 之間的消息。不透明的 actor 常常更容易看到。可以在模 擬運行時更改此設置。
Number of threads to use spinner 僅在模擬啟動之前生效。許多模擬的運行速度要比更多線程快得多。
控件下面的顯示區域顯示了當前的線程使用情況(顯示為過去的 1 秒中的平均值)。大型的中心區域會顯示模擬。底部 區域會顯示模擬歷史。右側區域會顯示完整的模擬軌跡。當運行時,模擬框架按如下方式配置:
控制區域是大約每秒更新一次的儀表顯示:
每秒接受的消息數。
每秒完成的消息數。
每秒接受的消息數與完成的消息數的比率。
如果活動顯示在右側,那麼到達的消息比正在處理的消息要多一些;最 終,消息緩沖區會發生溢出。如果活動顯示在左側,正在處理的消息比到達的消息要多一些;最終,系統會空閒下來。平衡 的系統會顯示 0 或者僅較長時間內顯示綠色水平線。
中心區域之上是一個包含綠條的網格;每個綠條表示一個線程(就像外部圓圈中一樣)。全綠的條帶表示線程正被全 面利用,全黃的條帶表示線程完全空閒。
在中央區域,正方形的外環表示線程(在這些模擬中為 10 個,在以前的跟蹤軌跡中有 25 個)。綠色線程附加到一 個 actor 來執行收到的消息;中心的點的顏色表示 actor 類型。接近正方形的數字是當前分配給此線程的 actor 數量( 從左側的 0 開始順時針排列到 360 度)。黃色線程是空閒的。
內部的圓環表示 actor;顏色表示類型(在第一個示例中僅有一種類型)。如果 actor 正忙於處理一條消息,它會顯 示在更暗的陰影中(如果使用了非透明的 actor,這會更加明顯)。圓圈 (actor) 之間的線表示消息。任何淺紅色的線都 是在給定刷新周期中發送的新消息(模擬每秒刷新 10 次);其他顏色是緩沖的消息(過去發送過來的,但目前仍未處理) 。緩沖線在接收端有一個小圓圈;該圓圈隨著緩沖消息數量增加而變大。
最右端顯示輸出軌跡;此軌跡類似於前面探討的軌跡,但更詳細一些。
圖像底部是一組較小的圓圈;每個圓圈是在過去定期顯示的主要圓圈的縮小版本。這提供了一種查看消息隨時間變化 的趨勢的輕松方式。如果觀察此歷史,您就會看到消息將迅速積壓,然後逐漸減少。
圖 3 顯示了執行大約 10 秒後的模擬效果。請注意大量的掛起消息,它們是迅速累積起來的。有 34 個 actor,但僅 有 10 個線程,所以一些 actor 需要空閒下來。在此時,所有線程都忙於處理消息。
圖 3. 啟動倒計時模擬效果 (0:15)
圖 4 是執行大約 30 秒後的模擬。掛起消息的數量已大大減少。由於消息到達率更低一些,所以只有部分線程在繁忙 地處理消息。
圖 4. 中期的倒計時模擬效果
圖 5 是執行大約 90 秒後的模擬。現在所有掛起的消息都已處理,因此所有線程都是空閒的。
圖 5. 完成時的倒計時模擬效果
一個 Producer/Consumer 系統中的 actor
接下來,讓我們看一下 Producer/Consumer 模式下的 actor 的演示。Producer/Consumer 是多處理器系統的一種最 常見的同步模式。在下面的 μJavaActors 演示中,生成者 actor 生成要求使用者 actor 創建各種項的請求。使用者會創 建這些項(這需要一定的時間),然後將一條完成消息發送回請求的生成者。
圖 6 顯示了執行大約 30 秒後的模擬 效果。請注意,兩種 actor 類型按顏色區分。生成者 actor 首先顯示在屏幕右下側。生成者在運行時創建使用者,所以隨 後才會顯示後者。工作負載隨時間的流逝而緩慢減少,大部分線程都很忙。請注意,生成者會迅速完成它們的工作,以至於 它們很少顯示為活動狀態。
圖 6. 啟動不久後的 Producer/Consumer 模擬 (2:40)
圖 7 顯示了執行大約 115 秒後的模擬,這接近程序完成的時間。新請求和掛起的消息的數量已經大大減少。在視頻 演示中,您可能注意到,一些 actor 在很短時間內顯示為未填充的圓圈;這些是處理發送給它們自身的消息的 actor。
圖 7. 接近結束時的 Producer/Consumer 模擬效果
ProducerActor
清單 7 顯示了演示中的生成者 actor 的代碼。這裡的 “produceN” 消息已處理。它轉換成為了一條 “produce1” 消息,該 actor 將該消息發送給自己。預期的響應記錄是一個掛起的回復計數,以供以後驗證。
清單 7. 生成者 actor
public class ProducerActor extends AbstractActor { Map<String , Integer> expected = new ConcurrentHashMap<String , Integer>(); @Override protected void loopBody(Message m) { String subject = m.getSubject(); if ("produceN".equals(subject)) { Object[] input = (Object[]) m.getData(); int count = (Integer) input[0]; if (count > 0) { DefaultActorTest.sleeper(1); // this takes some time String type = (String) input[1]; // request the consumers to consume work (i.e., produce) Integer mcount = expected.get(type); if (mcount == null) { mcount = new Integer(0); } mcount += count; expected.put(type, mcount); DefaultMessage dm = new DefaultMessage("produce1", new Object[] { count, type }); getManager().send(dm, this, this); }
在清單 8 中,“produce1” 消息已被處理。如果剩余計數大於 0,它會轉換為一條 “construct” 消 息並發送給使用者。請注意,此邏輯可能已作為對計數值的一個 for 循環來完成,而不是重新發送 “produce1” 消息。 重新發送該消息常常會帶來更出色的線程負載,尤其在循環主體會話占用大量時間的時候。
清單 8. 處理一個生成 者請求
} else if ("produce1".equals(subject)) { Object[] input = (Object[]) m.getData(); int count = (Integer) input[0]; if (count > 0) { sleep(100); // take a little time String type = (String) input[1]; m = new DefaultMessage("construct", type); getManager().send(m, this, getConsumerCategory()); m = new DefaultMessage("produce1", new Object[] { count - 1, type }); getManager().send(m, this, this); }
在清單 9 中,“constructionComplete” 消息(由一個使用者發送)已被處理。它會對掛起的回復計數進行 遞減。如果一切正常,在模擬完成時,所有 actor 和類型值的此計數都將為 0。
清單 9. constructionComplete
} else if ("constructionComplete".equals(subject)) { String type = (String) m.getData(); Integer mcount = expected.get(type); if (mcount != null) { mcount--; expected.put(type, mcount); }
“init” 消息在清單 10 中處理。生成者創建一些使用者 actor,然後向它自己發送多條 produceN 請求。
清單 10. 初始化
} else if ("init".equals(subject)) { // create some consumers; 1 to 3 x consumers per producer for (int i = 0; i < DefaultActorTest.nextInt(3) + 1; i++) { Actor a = getManager().createAndStartActor(ConsumerActor.class, String.format("%s_consumer%02d", getName(), i)); a.setCategory(getConsumerCategory()); if (actorTest != null) { actorTest.getTestActors().put(a.getName(), a); } } // request myself create some work items for (int i = 0; i < DefaultActorTest.nextInt(10) + 1; i++) { m = new DefaultMessage("produceN", new Object[] { DefaultActorTest.nextInt(10) + 1, DefaultActorTest.getItemTypes()[ DefaultActorTest.nextInt(DefaultActorTest.getItemTypes().length)] }); getManager().send(m, this, this); }
清單 11 處理無效的消息:
清單 11. 處理無效的消息
} else { System.out.printf("ProducerActor:%s loopBody unknown subject: %s%n", getName(), subject); } } protected String getConsumerCategory() { return getName() + "_consumer"; } }
ConsumerActor
使用者(consumer) actor 很簡單。它處理 “construct” 消息並向請求者發送回復消 息。使用者 actor 的代碼如清單 12 所示:
清單 12. 使用者 actor
public class ConsumerActor extends AbstractActor { @Override protected void loopBody(Message m) { String subject = m.getSubject(); if ("construct".equals(subject)) { String type = (String) m.getData(); delay(type); // takes ~ 1 to N seconds DefaultMessage dm = new DefaultMessage("constructionComplete", type); getManager().send(dm, this, m.getSource()); } else if ("init".equals(subject)) { // nothing to do } else { System.out.printf("ConsumerActor:%s loopBody unknown subject: %s%n", getName(), subject); } }
清單 13 中處理的生產延遲基於構造的項的類型。從跟蹤軌跡中,您可以回想起支持的項類型為 widget、 framit、frizzle、gothca 和 splat。每個類型需要花不同的時間量來構造。
清單 13. 生產延遲
protected void delay(String type) { int delay = 1; for (int i = 0; i < DefaultActorTest.getItemTypes().length; i++) { if (DefaultActorTest.getItemTypes()[i].equals(type)) { break; } delay++; } DefaultActorTest.sleeper(DefaultActorTest.nextInt(delay) + 1); } }
Producer/Consumer 模式中的 actor
Producer/Consumer 演示表明創建 actor 實現非常簡單。典型的 actor 會解碼收到的消息並處理它們,就像在一個 case 語句中一樣。實際的處理在本示例中微不足道,只是短暫的時間延 遲。在真實應用程序中會更復雜,但不會超過使用標准 Java 同步技術的實現;通常它會簡單得多。
在此演示中, 還應注意的是,復雜且重復性的算法可分解為離散(且常常可重用)的步驟。可為每個步驟分配一個不同的主題名稱,時每 個主題的情形變得非常簡單。當狀態包含在消息參數中時(比如前面演示的倒計時值),許多 actor 會變得無狀態。這樣 的程序非常容易定義和擴展(添加更多 actor 來匹配更多線程),也可以在多線程環境中安全地運行;這類似於在行數樣 式編程中使用不可變的值。
actor 的更多模式
出於特定的用途,Producer/Consumer 演示中的 actor 是硬 編碼的,但這並不是您在編碼 actor 時的惟一選擇。在本節中,您將學習如何在更加通用的模式中使用 actor,首先需要 改寫 Gang of Four Command 模式。
清單 14 中的 actor 實現大部分 Java 開發人員應該熟悉的 Command 模式的 一種變體。在這裡,CommandActor 支持兩種消息:“execute” 和 “executeStatic。”
清單 14. CommandActor
public class CommandActor extends AbstractActor { @Override protected void loopBody(Message m) { String subject = m.getSubject(); if ("execute".equals(subject)) { excuteMethod(m, false); } else if ("executeStatic".equals(subject)) { excuteMethod(m, true); } else if ("init".equals(subject)) { // nothing to do } else { System.out.printf("CommandActor:%s loopBody unknown subject: %s", getName(), subject); } }
清單 15 中的 executeMethod 方法加載了一個參數化的類,在該類或該類的實例上調用一個方法,然後返回 該方法的結果或發生的任何異常。您可以看到這個簡單的 actor 如何用於運行類路徑上具有合適的執行方法的所有服務類 。id 參數由客戶端發送,所以它可以將響應與創建它們的請求進行關聯。回復常常按照與發出時不同的順序返回。
清單 15. 執行一個參數化方法
private void excuteMethod(Message m, boolean fstatic) { Object res = null; Object id = null; try { Object[] params = (Object[]) m.getData(); id = params[0]; String className = (String) params[1]; params = params.length > 2 ? (Object[]) params[2] : null; Class<?> clazz = Class.forName(className); Method method = clazz.getMethod(fstatic ? "executeStatic" : "execute", new Class[] { Object.class }); if (Modifier.isStatic(method.getModifiers()) == fstatic) { Object target = fstatic ? null : clazz.newInstance(); res = method.invoke(target, params); } } catch (Exception e) { res = e; } DefaultMessage dm = new DefaultMessage("executeComplete", new Object[] { id, res }); getManager().send(dm, this, m.getSource()); } }
Event Listener 模式中的 actor
清單 16 中的 DelegatingActor 實現一種基於熟悉的 Java Event Listener(或 Callback)模式的類似的一般方法。它將到達的每條消息映射到每個注冊的監聽器上的一個 onMessage 回調 ,直到某個回調使用(也就是處理)該事件。這種委托方法可顯著減少 actor 系統與它的消息處理器之間的聯系。
清單 16. DelegatingActor
public class DelegatingActor extends AbstractActor { private List<MessageListener> listeners = new LinkedList<MessageListener>(); public void addMessageListener(MessageListener ml) { if (!listeners.contains(ml)) { listeners.add(ml); } } public void removeMessageListener(MessageListener ml) { listeners.remove(ml); } protected void fireMessageListeners(MessageEvent me) { for (MessageListener ml : listeners) { if (me.isConsumed()) { break; } ml.onMessage(me); } } @Override protected void loopBody(Message m) { fireMessageListeners(new MessageEvent(this, m)); } }
DelegatingActor 類(如清單 17 所示)依賴於 MessageEvent 和 MessageListener 類:
清單 17. DelegatingActor
/** Defines a message arrival event. */ public static class MessageEvent extends EventObject { private Message message; public Message getMessage() { return message; } public void setMessage(Message message) { this.message = message; } private boolean consumed; public boolean isConsumed() { return consumed; } public void setConsumed(boolean consumed) { this.consumed = consumed; } public MessageEvent(Object source, Message msg) { super(source); setMessage(msg); } } /** Defines the message arrival call back. */ public interface MessageListener { void onMessage(MessageEvent me); }
DelegatingActor 的一種示例用法如清單 18 所示:
清單 18. DelegatingActor 的示例用法
public static void addDelegate(DelegatingActor da) { MessageListener ml = new Echo("Hello world!"); da.addMessageListener(ml); } public class Echo implements MessageListener { protected String message; public Echo(String message) { this.message = message; } @Override public void onMessage(MessageEvent me) { if ("echo".equals(me.getMessage().getSubject())) { System.out.printf("%s says \"%s\".%n", me.getMessage().getSource(), message); me.setConsumed(true); } } }
Map/Reduce 模式中的 actor
清單 14 到清單 18 中的示例 actor 簡單且一目了然,因為消息僅朝一個方向發送。如果該行為需要反饋(比如當一個 流程只有在處理了所有以前的消息後才能繼續時),情況可能變得更加復雜。例如,請考慮這樣一種 Map/Reduce 實現,其 中的 reduce 階段只有在 map 階段完成後才能開始。
Map/Reduce 用於在處理大量數據的程序上實現並行處理。在 下面的示例中,map 函數接受一個較大的項列表,然後將它分解為分區,發送一條消息來映射每個分區。我選擇在每個映射 請求上遞增一個消息計數,讓分區的映射處理器發送一條會遞減該計數的回復。當計數為 0 時,所有映射已完成且 reduce 階段可以啟動。類似地,reduce 階段對該列表分區(再次實現並行性)並發送消息來 reduce 分區。像 map 階段中 一樣,reduce 也會統計它的消息,所以可以檢測到遞減操作的完成。要處理的值列表和計數在每個消息中作為消息傳輸。
對於本示例,我對許多主題使用了同一種 actor 類型。您也可以使用多種 actor 類型,為每個 actor 使用更少的 主題(最少 1 個)。
圖 8 是執行大約 20 秒後的 Map/Reduce 模擬。這是一個繁忙的處理階段,所以線程都被處 理消息所占用。
圖 8. 啟動不久後的 Map/Reduce (5:28)
使用 MapReduceer 進行映射和縮減
請注意,此實現是可插拔的;它可運行 MapReduceer 接口的任何實現,如清 單 19 所示。
清單 19. MapReduceer
public interface MapReduceer { /** * Map (in place) the elements of an array. * * @param values elements to map * @param start start position in values * @param end end position in values */ void map(Object[] values, int start, int end); /** * Reduce the elements of an array. * * @param values elements to reduce * @param start start position in values * @param end end position in values * @param target place to set reduced value * @param posn position in target to place the value */ void reduce(Object[] values, int start, int end, Object[] target, int posn); }
例如,您可以使用 MapReduceer 計算一組整數的平方和,如清單 20 所示:
清單 20. MapReduceer 計 算
public class SumOfSquaresReducer implements MapReduceer { @Override public void map(Object[] values, int start, int end) { for (int i = start; i <= end; i++) { values[i] = ((BigInteger) values[i]).multiply((BigInteger) values[i]); sleep(200); // fake taking time } } @Override public void reduce(Object[] values, int start, int end, Object[] target, int posn) { BigInteger res = new BigInteger("0"); for (int i = start; i <= end; i++) { res = res.add((BigInteger) values[i]); sleep(100); // fake taking time } target[posn] = res; } }
MapReduceActor
Map/Reduce actor 分解為多個主題,每個主題具有一個簡單的任務。您將在下面的代碼 示例中看到它們每一個。您也可以在視頻演示中查看 Map/Reduce 操作;觀看模擬,然後研究代碼示例,這會讓您非常清楚 地了解如何使用 actor 實現 Map/Reduce。(請注意,以下清單中的主題順序可按任意多種方式分解;我將示例代碼設計為 包含許多次發送,以讓視頻演示更有趣。)
mapReduce 主題(如清單 21 所示)通過對輸入數組分區來啟動 Map/Reduce,它通過發送 createPartition 消息來進行分區。Map 和 Redu測 參數是在一個 MapReduceParameters 實例中 提供的,該實例根據需要進行了克隆和修改,然後傳遞出去。請注意,該操作不需要時間延遲;我添加它們是為了確保將在 用戶界面中看到模擬。
清單 21. mapReduce
@Override protected void loopBody(Message m) { ActorManager manager = getManager(); String subject = m.getSubject(); if ("mapReduce".equals(subject)) { try { MapReduceParameters p = (MapReduceParameters) m.getData(); int index = 0; int count = (p.end - p.start + 1 + partitionSize - 1) / partitionSize; sleep(1000); // split up into partition size chunks while (p.end - p.start + 1 >= partitionSize) { MapReduceParameters xp = new MapReduceParameters(p); xp.end = xp.start + partitionSize - 1; DefaultMessage lm = new DefaultMessage("createPartition", new Object[] { xp, index, count }); manager.send(lm, this, getCategory()); p.start += partitionSize; index++; } if (p.end - p.start + 1 > 0) { DefaultMessage lm = new DefaultMessage("createPartition", new Object[] { p, index, count }); manager.send(lm, this, getCategory()); } } catch (Exception e) { triageException("mapFailed", m, e); } }
createPartition 主題創建了更多 actor,並將請求轉發給一個工作線程,如清單 22 所示。請注意, createMapReduceActor 方法在它將創建的 actor 數量上有一個上限(目前為 25)。
清單 22. createPartition
} else if ("createPartition".equals(subject)) { try { Object[] oa = (Object[]) m.getData(); MapReduceParameters p = (MapReduceParameters) oa[0]; int index = (Integer) oa[1]; int count = (Integer) oa[2]; sleep(500); createMapReduceActor(this); DefaultMessage lm = new DefaultMessage("mapWorker", new Object[] { p, index, count }); manager.send(lm, this, getCategory()); } catch (Exception e) { triageException("createPartitionFailed", m, e); } }
清單 23 中的 mapWorker 主題在其分區上通過提供的 MapReducer 調用 map 操作,然後在回復中表明映射分區 是完整的:
清單 23. mapWorker
} else if ("mapWorker".equals(subject)) { try { Object[] oa = (Object[]) m.getData(); MapReduceParameters p = (MapReduceParameters) oa[0]; int index = (Integer) oa[1]; int count = (Integer) oa[2]; sleep(100); p.mr.map(p.values, p.start, p.end); DefaultMessage rm = new DefaultMessage("mapResponse", new Object[] { p, index, count }); manager.send(rm, this, getCategoryName()); } catch (Exception e) { triageException("mapWorkerFailed", m, e); } }
然後,清單 24 中的 mapResponse 主題會完成 MapReduceParameters 實例(它包含計數)並啟動 Reduce 流程 :
清單 24. mapResponse
} else if ("mapResponse".equals(subject)) { try { Object[] oa = (Object[]) m.getData(); MapReduceParameters p = (MapReduceParameters) oa[0]; int index = (Integer) oa[1]; int count = (Integer) oa[2]; sleep(100); p.complete(); DefaultMessage rm = new DefaultMessage("reduce", new Object[] { p, index, count }); manager.send(rm, this, getCategoryName()); } catch (Exception e) { triageException("mapResponseFailed", m, e); } }
接下來,reduce 消息會將請求轉發給某個工作線程,如清單 25 所示:
清單 25. reduce
} else if ("reduce".equals(subject)) { try { MapReduceParameters p = null; int index = 0, count = 0; Object o = m.getData(); if (o instanceof MapReduceParameters) { p = (MapReduceParameters) o; } else { Object[] oa = (Object[]) o; p = (MapReduceParameters) oa[0]; index = (Integer) oa[1]; count = (Integer) oa[2]; } sleep(100); if (p.end - p.start + 1 > 0) { createMapReduceActor(this); MapReduceParameters xp = new MapReduceParameters(p); DefaultMessage lm = new DefaultMessage("reduceWorker", new Object[] { xp, index, count }); manager.send(lm, this, getCategory()); } } catch (Exception e) { triageException("reduceFailed", m, e); } }
清單 26 中的 reduceWorker 主題在其分區上通過提供的 MapReducer 調用 reduce 操作,然後在回復中表明 Reduce 操作已完成。如果所有 Reduce 操作都已完成,則會在回復中表明 Map/Reduce 操作已完成。
清單 26. reduceWorker
} else if ("reduceWorker".equals(subject)) { try { Object[] oa = (Object[]) m.getData(); MapReduceParameters p = (MapReduceParameters) oa[0]; int index = (Integer) oa[1]; int count = (Integer) oa[2]; sleep(100); if (index >= 0) { p.mr.reduce(p.values, p.start, p.end, p.target, index); DefaultMessage rm = new DefaultMessage("reduceResponse", new Object[] { p, index, count }); manager.send(rm, this, getCategory()); } else { Object[] res = new Object[1]; p.mr.reduce(p.target, 0, count - 1, res, 0); DefaultMessage rm = new DefaultMessage("done", new Object[] { p, res[0] }); manager.send(rm, this, getCategory()); } } catch (Exception e) { triageException("reduceWorkerFailed", m, e); } }
接下來,清單 27 中的 reduceResponse 主題會完成該分區,並測試所有分區是否已完成,然後表明結果:
清單 27. reduceResponse
} else if ("reduceResponse".equals(subject)) { try { Object[] oa = (Object[]) m.getData(); MapReduceParameters p = (MapReduceParameters) oa[0]; int index = (Integer) oa[1]; int count = (Integer) oa[2]; sleep(100); p.complete(); if (p.isSetComplete()) { if (count > 0) { createMapReduceActor(this); MapReduceParameters xp = new MapReduceParameters(p); DefaultMessage lm = new DefaultMessage("reduceWorker", new Object[] { xp, -1, count }); manager.send(lm, this, getCategory()); } } } catch (Exception e) { triageException("mapResponseFailed", m, e); } }
最後,清單 28 中的 done 主題會報告結果:
清單 28. done
} else if ("done".equals (subject)) { try { Object[] oa = (Object[]) m.getData(); MapReduceParameters p = (MapReduceParameters) oa[0]; Object res = oa[1]; sleep(100); System.out.printf("**** mapReduce done with result %s", res); } catch (Exception e) { triageException("mapResponseFailed", m, e); } }
繼續執行循環,init 主題啟動另一個 Map/Reduce 流程,如清單 29 中所示。為每個 Map/Reduce 提供一個不 同的 “集合” 名稱,使多個 Map/Reduce 可同時運行。
清單 29. 初始化另一個 Map/Reduce
} else if ("init".equals(subject)) { try { Object[] params = (Object[]) m.getData(); if (params != null) { Object[] values = (Object[]) params[0]; Object[] targets = (Object[]) params[1]; Class clazz = (Class) params[2]; MapReduceer mr = (MapReduceer) clazz.newInstance(); sleep(2 * 1000); MapReduceParameters p = new MapReduceParameters("mrSet_" + setCount++, values, targets, mr, this); DefaultMessage rm = new DefaultMessage("mapReduce", p); manager.send(rm, this, getCategoryName()); } } catch (Exception e) { triageException("initFailed", m, e); } } else { System.out.printf("**** MapReduceActor:%s loopBody unexpected subject: %s", getName(), subject); } } }
Map/Reduce 主要過程
清單 30 中的 MapReduceActor 實現創建了一些數據值,並在這些數據上運行一個 Map/Reduce。它將分區大小設置為 10。
清單 30. Map/Reduce 主要過程
BigInteger[] values = new BigInteger[1000]; for (int i = 0; i < values.length; i++) { values[i] = new BigInteger(Long.toString((long)rand.nextInt(values.length))); } BigInteger[] targets = new BigInteger[Math.max(1, values.length / 10)]; // start at least 5 actors DefaultActorManager am = new DefaultActorManager(); MapReduceActor.createMapReduceActor(am, 10); MapReduceActor.createMapReduceActor(am, 10); MapReduceActor.createMapReduceActor(am, 10); MapReduceActor.createMapReduceActor(am, 10); MapReduceActor.createMapReduceActor(am, 10); DefaultMessage dm = new DefaultMessage("init", new Object[] { values, targets, SumOfSquaresReducer.class }); am.send(dm, null, MapReduceActor.getCategoryName());
Map/Reduce 是一種最普遍的分而治之設計模式。從最 基本的函數編程算法一直到大規模並行處理(Google 用於構建它自己的 Web 搜索引擎索引的類型),都可以看見到它的身 影。μJavaActors 庫能夠以某種直觀的方式實現這一高級模式,這凸顯了它的強大功能以及潛在的用途。
μJavaActors 庫的內幕
管理器對 actor 說:不要找我;我會去找您。
您已看到如何使用 actor 將一些常見的面向對象模式用於其他用途。現在可以考慮一下 μJavaActors 系統的實現細節,即 AbstractActor 和 DefaultActorManager 類。我將僅討論每個類的關鍵方法;您可以查看 μJavaActors 源代碼 來獲取更多實現細節。
AbstractActor
每個 actor 都知道管理它的 ActorManager。actor 使用該管理器幫助它將消息發送給其他 actor。
在清單 31 中,receive 方法有條件地處理一條消息。如果 testMessage 方法返回 null,那麼將不會使用 任何消息。否則,會從 actor 的消息隊列中刪除消息,並通過調用 loopBody 方法來處理它。每個具體的 actor 子類都必 須提供此方法。無論在哪種情況下,actor 都會通過調用管理器的 awaitMessage 方法來等待更多消息傳來。
清單 31. AbstractActor 實現 DefaultActorManager
public abstract class AbstractActor implements Actor { protected DefaultActorManager manager; @Override public boolean receive() { Message m = testMessage(); boolean res = m != null; if (res) { remove(m); try { loopBody(m); } catch (Exception e) { System.out.printf("loop exception: %s%n", e); } } manager.awaitMessage(this); return res; } abstract protected void loopBody(Message m);
每個 actor 都可以實現 willReceive 方法來控制將接受哪 些消息主題(表明它將放在消息列表中);默認情況下,會接受所有具有非空主題的消息。每個 actor 還可以實現 testMessage 方法來檢查是否有消息可供處理(也就是說,它存在於消息列表中);默認情況下,這一監督工作是通過使用 peekNext 方法來實現的。
清單 32. willReceive()、testMessage() 和 peekNext()
@Override public boolean willReceive(String subject) { return !isEmpty(subject); } protected Message testMessage() { return getMatch(null, false); } protected Message getMatch(String subject, boolean isRegExpr) { Message res = null; synchronized (messages) { res = peekNext(subject, isRegExpr); } return res; }
消息容量
actor 可具有無限 或有限 的消息容量。一般而言,有限的容量更好,因為它可幫助檢測不受 控制的消息發送者。任何客戶端(但通常是 ActorManager)均可向 actor 添加未經篩選的消息。請注意,對 messages 列 表的所有訪問都是異步的。
清單 33. 消息處理
public static final int DEFAULT_MAX_MESSAGES = 100; protected List<DefaultMessage> messages = new LinkedList<DefaultMessage>(); @Override public int getMessageCount() { synchronized (messages) { return messages.size(); } } @Override public int getMaxMessageCount() { return DEFAULT_MAX_MESSAGES; } public void addMessage(Message message) { synchronized (messages) { if (messages.size() < getMaxMessageCount()) { messages.add(message); } else { throw new IllegalStateException("too many messages, cannot add"); } } } @Override public boolean remove(Message message) { synchronized (messages) { return messages.remove(message); } }
消息匹配
客戶端(具體來講是 actor 本身)可檢查一個 actor 是否擁有掛起的消息。這可用於不按發 送順序處理消息,或者為某些主題提供優先級。消息匹配是通過測試消息主題與一個字符串值的同等性來完成的,或者通過 將一個正則表達式與一個參數值匹配來完成的。null 主題匹配任何消息。再次提醒,請注意,對消息列表的所有訪問都是 異步的。
清單 34. peekNext()
@Override public Message peekNext() { return peekNext(null); } @Override public Message peekNext(String subject) { return peekNext(subject, false); } @Override public Message peekNext(String subject, boolean isRegExpr) { long now = new Date().getTime(); Message res = null; Pattern p = subject != null ? (isRegExpr ? Pattern.compile(subject) : null) : null; synchronized (messages) { for (DefaultMessage m : messages) { if (m.getDelayUntil() <= now) { boolean match = subject == null || (isRegExpr ? m.subjectMatches(p) : m.subjectMatches(subject)); if (match) { res = m; break; } } } } return res; }
生命周期方法
每個 actor 都有生命周期方法。每次與某個特定 ActorManager 關聯時,都會調用 activate 和 deactivate 方法。每次與某個特定的 ActorManager 關聯時還會調用 run 方法,它通常通過自行向 actor 發送啟動消息來啟動該 actor。run 消息開始消息處理。
清單 35. 生命周期方法
@Override public void activate() { // defaults to no action } @Override public void deactivate() { // defaults to no action } /** Do startup processing. */ protected abstract void runBody(); @Override public void run() { runBody(); ((DefaultActorManager) getManager()).awaitMessage(this); } }
DefaultActorManager
以下字段包含 actor 管理器的狀態:
actors 包含向管理器注冊的所有 actor。
runnables 包含已創建但尚未調用其 run 方法的 actor。
waiters 包含所有等待消息的 actor。
threads 包含管理器啟動的所有線程。
請注意,LinkedHashMap 的使用至關重要(對等待者列表尤為如此) ;否則,一些 actor 可能會急需線程。
清單 36. DefaultActorManager 類和狀態
public class DefaultActorManager implements ActorManager { public static final int DEFAULT_ACTOR_THREAD_COUNT = 25; protected static DefaultActorManager instance; public static DefaultActorManager getDefaultInstance() { if (instance == null) { instance = new DefaultActorManager(); } return instance; } protected Map<String , AbstractActor> actors = new LinkedHashMap<String , AbstractActor>(); protected Map<String , AbstractActor> runnables = new LinkedHashMap<String , AbstractActor>(); protected Map<String , AbstractActor> waiters = new LinkedHashMap<String , AbstractActor>(); protected List<Thread> threads = new LinkedList<Thread>();
detachActor 方法打破了 actor 和它的管理器之間的關聯:
清單 37. actor 終止
@Override public void detachActor(Actor actor) { synchronized (actors) { actor.deactivate(); ((AbstractActor)actor).setManager(null); String name = actor.getName(); actors.remove(name); runnables.remove(name); waiters.remove(name); } }
發送方法
send 方法家族將一條消息發送給一個或多個 actor。首先需要檢查每條消息,查看 actor 是 否會接受它。對消息進行排隊後,就會使用 notify 喚醒一個線程來處理消息。在發送到某個類別時,只有該類別中的一個 actor(當前具有最少消息的 actor)會實際收到該消息。awaitMessage 方法在 waiters 列表基礎上對 actor 排隊。
清單 38. DefaultActorManager 類處理一個發送操作
@Override public int send(Message message, Actor from, Actor to) { int count = 0; AbstractActor aa = (AbstractActor) to; if (aa != null) { if (aa.willReceive(message.getSubject())) { DefaultMessage xmessage = (DefaultMessage) ((DefaultMessage) message).assignSender(from); aa.addMessage(xmessage); count++; synchronized (actors) { actors.notifyAll(); } } } return count; } @Override public int send(Message message, Actor from, Actor[] to) { int count = 0; for (Actor a : to) { count += send(message, from, a); } return count; } @Override public int send(Message message, Actor from, Collection<Actor> to) { int count = 0; for (Actor a : to) { count += send(message, from, a); } return count; } @Override public int send(Message message, Actor from, String category) { int count = 0; Map<String, Actor> xactors = cloneActors(); List<Actor> catMembers = new LinkedList<Actor>(); for (String key : xactors.keySet()) { Actor to = xactors.get(key); if (category.equals(to.getCategory()) && (to.getMessageCount() < to.getMaxMessageCount())) { catMembers.add(to); } } // find an actor with lowest message count int min = Integer.MAX_VALUE; Actor amin = null; for (Actor a : catMembers) { int mcount = a.getMessageCount(); if (mcount < min) { min = mcount; amin = a; } } if (amin != null) { count += send(message, from, amin); } return count; } @Override public int broadcast(Message message, Actor from) { int count = 0; Map<String, Actor> xactors = cloneActors(); for (String key : xactors.keySet()) { Actor to = xactors.get(key); count += send(message, from, to); } return count; } public void awaitMessage(AbstractActor a) { synchronized (actors) { waiters.put(a.getName(), a); } }
線程池初始化
管理器提供一個低優先級後台線程池,將它分配給 actor,以便處理收到的消息。(請注 意,為保持簡潔,我們省略了選項處理,它包含在提供的源代碼中。)
清單 39. DefaultActorManager 類初始化
protected static int groupCount; @Override public void initialize(Map<String, Object> options) { int count = getThreadCount(options); ThreadGroup tg = new ThreadGroup("ActorManager" + groupCount++); for (int i = 0; i < count; i++) { Thread t = new Thread(tg, new ActorRunnable(), "actor" + i); threads.add(t); t.setDaemon(true); t.setPriority(Math.max(Thread.MIN_PRIORITY, Thread.currentThread().getPriority() - 1)); } running = true; for (Thread t : threads) { t.start(); } }
每個 actor 由清單 40 中的 Runnable 實現分派。只要准備好的 actor(具有掛起的消息的 actor)可用,就 會將它們分派出去;否則,線程會等待(具有可變的超時)消息到來。
清單 40. 通過一個 Runnable 處理消息
public class ActorRunnable implements Runnable { public void run() { int delay = 1; while (running) { try { if (!procesNextActor()) { synchronized (actors) { actors.wait(delay * 1000); } delay = Math.max(5, delay + 1); } else { delay = 1; } } catch (InterruptedException e) { } catch (Exception e) { System.out.printf("procesNextActor exception %s%n", e); } } } }
procesNextActor 方法首先測試是否存在任何新創建的 actor,然後運行其中一個。否則,它會測試一個等待的 actor。如果有任何等待的 actor,則會分派一個 actor 來處理它的下一條消息。最多一次調用處理一條消息。請注意,所 有同步操作都是使用 actors 字段完成的;這減少了發生死鎖的可能性。
清單 41. 選擇和分派下一個 actor
protected boolean procesNextActor() { boolean run = false, wait = false, res = false; AbstractActor a = null; synchronized (actors) { for (String key : runnables.keySet()) { a = runnables.remove(key); break; } } if (a != null) { run = true; a.run(); } else { synchronized (actors) { for (String key : waiters.keySet()) { a = waiters.remove(key); break; } } if (a != null) { // then waiting for responses wait = true; res = a.receive(); } } return run || res; }
終止方法
可以通過調用 terminate 或 terminateAndWait 方法來請求管理器終止處理。terminate 告訴 所有線程盡快停止處理。terminateAndWait 仍會等待線程完成。
清單 42. DefaultActorManager 類終止
@Override public void terminateAndWait() { terminate(); for (Thread t : threads) { try { t.join(); } catch (InterruptedException e) { } } } boolean running; @Override public void terminate() { running = false; for(Thread t: threads) { t.interrupt(); } synchronized (actors) { for (String key : actors.keySet()) { actors.get(key).deactivate(); } } }
創建方法
create 方法家族構造 actor 並將它們與此管理器關聯。create 通過 actor 的類提供,它 必須有一個默認的構造函數。此外,actor 可在創建時或以後啟動。請注意,此實現需要所有 actor 擴展 AbstractActor 。
清單 43. 創建和啟動 actor
@Override public Actor createAndStartActor(Class<? extends Actor> clazz, String name, Map<String, Object> options) { Actor res = createActor(clazz, name, options); startActor(res); return res; } @Override public Actor createActor(Class<? extends Actor> clazz, String name, Map<String, Object> options) { AbstractActor a = null; synchronized (actors) { if (!actors.containsKey(name)) { try { a = (AbstractActor) clazz.newInstance(); a.setName(name); a.setManager(this); } catch (Exception e) { throw e instanceof RuntimeException ? (RuntimeException) e : new RuntimeException( "mapped exception: " + e, e); } } else { throw new IllegalArgumentException("name already in use: " + name); } } return a; } } @Override public void startActor(Actor a) { a.activate(); synchronized (actors) { String name = a.getName(); actors.put(name, (AbstractActor) a); runnables.put(name, (AbstractActor) a); } }
結束語
送君千裡,終有一別!
在本文中,您學習了如何將一個相對簡單的 actor 系統用於各 種常見的 Java 編程場景和模式。μJavaActors 庫具有靈活的、動態的行為,為 Akka 等更加龐大的 actor 庫提供了一個 基於 Java 的替代方案。
從代碼示例和視頻模擬中可以明顯看到,μJavaActors 可跨一個執行線程池高效地分配 actor 消息處理工作。而且,可在用戶界面中迅速確定是否需要更多線程。該界面還容易確定哪些 actor 渴求工作或者是 否有一些 actor 負載過重。
DefaultActorManager(ActorManager 接口的默認實現)可保證沒有 actor 會一次處 理多條消息。因此這會減輕 actor 作者的負擔,他們無需處理任何重新輸入考慮因素。該實現還不需要 actor 同步,只要 :(1) actor 僅使用私有(實例或方法本地的)數據,(2) 消息參數僅由消息發送者編寫,以及 (3) 僅由消息接收者讀取 。
DefaultActorManager 的兩個重要的設計參數是線程與 actor 的比率 以及要使用的線程總數。線程數量至少應 該與計算機上的處理器一樣多,除非一些線程為其他用途而保留。因為線程可能常常空閒(例如,當等待 I/O 時),所以 正確的比率常常是線程是處理器的 2 倍或多倍。一般而言,應該有足夠的 actor(其實是 actor 之間的消息比率)來保持 線程池中大部分時間都很繁忙。(為了獲得最佳的響應,應該有一些保留線程可用;通常平均 75% 到 80% 的活動比率最佳 。)這意味著 actor 通常比線程更多,因為有時 actor 可能沒有任何要處理的掛起消息。當然,您的情況可能有所不同。 執行等待操作(比如等待一個人為響應)的 actor 將需要更多線程。(線程在等待時變為 actor 專用的,無法處理其他消 息。)
DefaultActorManager 很好地利用了 Java 線程,因為在 actor 處理一條消息時,一個線程僅與一個特定的 actor 關聯;否則,它可供其他 actor 自由使用。這允許一個固定大小的線程池為無限數量的 actor 提供服務。結果,需 要為給定的工作負載創建的線程更少。這很重要,因為線程是重量級的對象,常常被主機操作系統限制於相對較少數量的實 例。μJavaActors 庫正是因為這一點而與為每個 actor 分配一個線程的 actor 系統區分開來;如果 actor 沒有消息要處 理,並且可能限制了可存在的 actor 實例數量,這麼做可以讓線程實際空閒下來。
在線程切換方面,μJavaActors 實現有很大不同。如果在消息處理完成時有一條新消息需要處理,則不會發生線程切換;而是會重復一個簡單循環來處理該 新消息。因此,如果等待的消息數量至少與線程一樣多,則沒有線程是空閒線程,因此不需要進行切換。如果存在足夠的處 理器(至少一個線程一個),則可以有效地將每個線程分配給一個處理器,而從不會發生線程切換。如果緩沖的消息不足, 那麼線程將會休眠,但這並不明顯,因為只有在沒有工作掛起時才會出現負載過重的現象。
用於 JVM 的其他 actor 庫
還存在其他用於 JVM 的 actor 解決方案。表 1 簡短介紹了它們與 μJavaActors 庫的對比特征:
表 1. 對比 JVM actor 庫與 μJavaActors
請注意,表 1 中的一些 JVM actor 解決方案添 加了同步發送功能(也就是發送者需要等待回復)。盡管很方便,但這可能導致更低的消息處理公平性和/或對 actor 的更 少的重新輸入調用。μJavaActors 使用了 POJT(純舊 Java 線程)和標准線程顯示器,它是一種更加傳統的實現。其他這 些方法中的一些方法為提供它們自己的線程模型提供了專門支持。μJavaActors 是一個純 Java 庫;要使用它,僅需確保 它的 JAR 位於類路徑上即可,此外,它不需要字節代碼操作或其他特殊操作。
增強 μJavaActors
當然,還 有改進或擴展 μJavaActors 庫的空間。如果您感興趣,我總結了以下可能性:
在一個類別中重新分配掛起的消息 :目前,在發送時會為消息分配 round-robin,而不會在以後重新均衡。
允許基於優先級的 actor 執行:目前,所有 actor 都在具有同等優先級的線程上執行;如果存在具有不同優先級的線程(或線程池)並且可在條件更改後向這些線程分 配 actor,那麼系統可能更加靈活。
允許優先級消息:目前,消息通常按發送順序處理,允許優先級處理將支持更靈活 的處理。
允許 actor 處理來自多個類別的消息:目前,一次僅允許處理一個類別的消息。
可以通過實現優化來減少 線程切換,進而提高潛在的消息處理速率:這樣做的代價將是更高的復雜性。
分布式 actor:目前,actor 必須都在一 個 JVM 中運行;跨 JVM 執行將是一種強大的擴展。
下載