一個用於實現並行執行的 Java actor 庫

即便 Java 6 和 Java 7 中引入併發性更新,Java 語言仍然沒法讓並行編程變得特別容易。Java 線程、synchronized 代碼塊、wait/notify 和java.util.concurrent 包都擁有本身的位置,但面對多核系統的容量壓力,Java 開發人員正在依靠其餘語言中開創的技術。actor 模型就是這樣一項技術,它已在 Erlang、Groovy 和 Scala 中實現。本文爲那些但願體驗 actor 但又要繼續編寫 Java 代碼的開發人員帶來了 μJavaActors 庫。html

用於 JVM 的另外 3 個 actor 庫

請參閱 「表 1:對比 JVM actor 庫」,快速瞭解 3 個用於 JVM 的流行的 actor 庫與 μJavaActors 的對比特徵。java

μJavaActors 庫 是一個緊湊的庫,用於在 Java 平臺上實現基於 actor 的系統(μ 表示希臘字母 Mμ,意指 「微型」)。在本文中,我使用 μJavaActors 探討 actor 在 Producer/Consumer 和 Map/Reduce 等常見設計模式中的工做原理。正則表達式

您隨時能夠 下載 μJavaActors 庫的源代碼算法

Java 平臺上的 actor 併發性

這個名稱有何含義?具備任何其餘名稱的 actor 也適用!

基於 actor 的系統 經過實現一種消息傳遞 模式,使並行處理更容易編碼。在此模式中,系統中的每一個 actor 均可接收消息;執行該消息所表示的操做;而後將消息發送給其餘 actor(包括它們本身)以執行復雜的操做序列。actor 之間的全部消息是異步的,這意味着發送者會在收到任何回覆以前繼續進行處理。所以,一個 actor 可能終生都陷入接收和處理消息的無限循環中。編程

當使用多個 actor 時,獨立的活動可輕鬆分配到多個可並行執行消息的線程上(進而分配在多個處理器上)。通常而言,每一個 actor 都在一個獨立線程上處理消息。一些 actor 系統靜態地向 actor 分配線程;而其餘系統(好比本文中介紹的系統)則會動態地分配它們。設計模式

μJavaActors 簡介

μJavaActors 是 actor 系統的一個簡單的 Java 實現。只有 1,200 行代碼,μJavaActors 雖然很小,但很強大。在下面的練習中,您將學習如何使用 μJavaActors 動態地建立和管理 actor,將消息傳送給它們。數組

μJavaActors 圍繞 3 個核心界面而構建:安全

  • 消息 是在 actor 之間發送的消息。Message 是 3 個(可選的)值和一些行爲的容器:
    • source 是發送 actor。
    • subject 是定義消息含義的字符串(也稱爲命令)。
    • data 是消息的任何參數數據;一般是一個映射、列表或數組。參數能夠是要處理和/或其餘 actor 要與之交互的數據。
    • subjectMatches() 檢查消息主題是否與字符串或正則表達式匹配。
    μJavaActors 包的默認消息類是 DefaultMessage
  • ActorManager 是一個 actor 管理器。它負責向 actor 分配線程(進而分配處理器)來處理消息。ActorManager 擁有如下關鍵行爲或特徵:
    • createActor() 建立一個 actor 並將它與此管理器相關聯。
    • startActor() 啓動一個 actor。
    • detachActor() 中止一個 actor 並將它與此管理器斷開。
    • send()/broadcast() 將一條消息發送給一個 actor、一組 actor、一個類別中的任何 actor 或全部 actor。
    在大部分程序中,只有一個 ActorManager,但若是您但願管理多個線程和/或 actor 池,也能夠有多個 ActorManager。此接口的默認實現是DefaultActorManager
  • Actor 是一個執行單元,一次處理一條消息。Actor 具備如下關鍵行爲或特徵:
    • 每一個 actor 有一個 name,該名稱在每一個 ActorManager 中必須是唯一的。
    • 每一個 actor 屬於一個 category;類別是一種向一組 actor 中的一個成員發送消息的方式。一個 actor 一次只能屬於一個類別。
    • 只要 ActorManager 能夠提供一個執行 actor 的線程,系統就會調用 receive()。爲了保持最高效率,actor 應該迅速處理消息,而不要進入漫長的等待狀態(好比等待人爲輸入)。
    • willReceive() 容許 actor 過濾潛在的消息主題。
    • peek() 容許該 actor 和其餘 actor 查看是否存在掛起的消息(或許是爲了選擇主題)。
    • remove() 容許該 actor 和其餘 actor 刪除或取消任何還沒有處理的消息。
    • getMessageCount() 容許該 actor 和其餘 actor 獲取掛起的消息數量。
    • getMaxMessageCount() 容許 actor 限制支持的掛起消息數量;此方法可用於預防不受控制地發送。
    大部分程序都有許多 actor,這些 actor 經常具備不一樣的類型。actor 可在程序啓動時建立或在程序執行時建立(和銷燬)。本文中的 actor 包 包含一個名爲 AbstractActor 的抽象類,actor 實現基於該類。

圖 1 顯示了 actor 之間的關係。每一個 actor 可向其餘 actor 發送消息。這些消息保存在一個消息隊列(也稱爲郵箱;從概念上講,每一個 actor 有一個隊列,當 ActorManager 看到某個線程可用於處理消息時,就會從隊列中刪除該消息,並將它傳送給在線程下運行的 actor,以便處理該消息。網絡

圖 1. actor 之間的關係

actor 經過線程向其餘 actor 發送消息

 

μJavaActors 的並行執行功能

如今您已可開始使用 μJavaActors 實現並行執行了。首先要建立一組 actor。這些是簡單的 actor,由於它們所作的只是延遲少許時間並將消息發送給其餘 actor。這樣作的效果是建立一個消息風暴,您首先會看到如何建立 actor,而後會看到如何逐步分派它們來處理消息。

有兩種消息類型:

  • initialization (init) 會致使 actor 初始化。僅需爲每一個 actor 發送一次這種類型的消息。
  • repeat 會致使 actor 發送 N-1 條消息,其中 N 是一個傳入的消息參數。

清單 1 中的 TestActor 實現從 AbstractActor 繼承的抽象方法。activate 和 deactivate 方法向 actor 通知它的壽命信息;此示例中不會執行任何其餘操做。runBody 方法是在收到任何消息以前、首次建立 actor 的時候調用的。它一般用於將第一批消息引導至 actor。testMessage方法在 actor 即將收到消息時調用;這裏 actor 可拒絕或接受消息。在本例中,actor 使用繼承的 testMessage 方法測試消息接受狀況;所以接受了全部消息。

清單 1. TestActor
  class TestActor extends AbstractActor {

    @Override
    public void activate() {
      super.activate();
    }

    @Override
    public void deactivate() {
      super.deactivate();
    }

    @Override
    protected void runBody() {
      sleeper(1);  // delay up to 1 second
      DefaultMessage dm = new DefaultMessage("init", 8);
      getManager().send(dm, null, this);
    }

    @Override
    protected Message testMessage() {
      return super.testMessage();
    }

loopBody 方法(如清單 2 中所示)在 actor 收到一條消息時調用。在經過較短延遲來模擬某種通常性處理以後,纔開始處理該消息。若是消息爲 「repeat」,那麼 actor 基於 count 參數開始發送另外 N-1 條消息。這些消息經過調用 actor 管理器的 send 方法發送給一個隨機 actor。

清單 2. loopBody()
    @Override
    protected void loopBody(Message m) {
      sleeper(1);
      String subject = m.getSubject();
      if ("repeat".equals(subject)) {
        int count = (Integer) m.getData();
        if (count > 0) {
          DefaultMessage dm = new DefaultMessage("repeat", count - 1);
          String toName = "actor" + rand.nextInt(TEST_ACTOR_COUNT);
          Actor to = testActors.get(toName);
          getManager().send(dm, this, to);
        }
      }

若是消息爲 「init」,那麼 actor 經過向隨機選擇的 actor 或一個屬於 common 類別的 actor 發送兩組消息,啓動 repeat 消息隊列。一些消息可當即處理(實際上在 actor 準備接收它們且有一個線程可用時便可處理);其餘消息則必須等待幾秒才能運行。這種延遲的消息處理對本示例不是很重要,但它可用於實現對長期運行的流程(好比等待用戶輸入或等待對網絡請求的響應到達)的輪詢。

清單 3. 一個初始化序列
      else if ("init".equals(subject)) {
        int count = (Integer) m.getData();
        count = rand.nextInt(count) + 1;
        for (int i = 0; i < count; i++) {
          DefaultMessage dm = new DefaultMessage("repeat", count);
          String toName = "actor" + rand.nextInt(TEST_ACTOR_COUNT);
          Actor to = testActors.get(toName);
          getManager().send(dm, this, to);
          
          dm = new DefaultMessage("repeat", count);
          dm.setDelayUntil(new Date().getTime() + (rand.nextInt(5) + 1) * 1000);
          getManager().send(dm, this, "common");
        }
      }

不然,代表消息不適合並會報告一個錯誤:

      else {
        System.out.printf("TestActor:%s loopBody unknown subject: %s%n", 
          getName(), subject);
      }
    }
  }

主要程序包含清單 4 中的代碼,它在 common 類別中建立了 2 個 actor,在 default 類別中建立了 5 個 actor,而後啓動它們。而後 main 至多會等待 120 秒(sleeper 等待它的參數值的時間約爲 1000ms),按期顯示進度消息。

清單 4. createActor、startActor
    DefaultActorManager am = DefaultActorManager.getDefaultInstance();
    :
    Map<String, Actor> testActors = new HashMap<String, Actor>();
    for (int i = 0; i < 2; i++) {
        Actor a = am.createActor(TestActor.class, "common" + i);
        a.setCategory("common");
        testActors.put(a.getName(), a);
    }
    for (int i = 0; i < 5; i++) {
        Actor a = am.createActor(TestActor.class, "actor" + i);
        testActors.put(a.getName(), a);
    }
    for (String key : testActors.keySet()) {
       am.startActor(testActors.get(key));
    }    
    for (int i = 120; i > 0; i--) {
        if (i < 10 || i % 10 == 0) {
            System.out.printf("main waiting: %d...%n", i);
        }
        sleeper(1);
    }
    :
    am.terminateAndWait();

跟蹤輸出

要理解剛執行的流程,讓咱們看看來自 actor 的一些跟蹤輸出。(請注意,由於對計數和延遲使用了隨機數,因此每次執行的輸出可能有所不一樣。)在清單 5 中,能夠看到在程序啓動後不久出現的消息。左列(括號中)是執行的線程名稱。在這次運行中,有 25 個線程可用於處理消息。每行的剩餘部分(通過刪減)是跟蹤輸出,顯示了收到的每條消息。請注意,repeat 計數 — 也就是參數數據,它在減小不斷。(另請注意,線程名稱與 actor 的名稱毫無關係,儘管該名稱是以 actor 開頭。)

清單 5. 跟蹤輸出:程序啓動
[main         ] - main waiting: 120...
[actor17      ] - TestActor:actor4 repeat(4)
[actor0       ] - TestActor:actor1 repeat(4)
[actor10      ] - TestActor:common1 repeat(4)
[actor1       ] - TestActor:actor2 repeat(4)
[actor3       ] - TestActor:actor0 init(8)
[actor22      ] - TestActor:actor3 repeat(4)
[actor17      ] - TestActor:actor4 init(7)
[actor20      ] - TestActor:common0 repeat(4)
[actor24      ] - TestActor:actor0 repeat(4)   
[actor0       ] - TestActor:actor1 init(3)
[actor1       ] - TestActor:actor2 repeat(4)   
[actor20      ] - TestActor:common0 repeat(4)   
[actor17      ] - TestActor:actor4 repeat(4)   
[actor17      ] - TestActor:actor4 repeat(3)   
[actor0       ] - TestActor:actor1 repeat(8)   
[actor10      ] - TestActor:common1 repeat(4)   
[actor24      ] - TestActor:actor0 repeat(8)   
[actor0       ] - TestActor:actor1 repeat(8)   
[actor24      ] - TestActor:actor0 repeat(7)   
[actor22      ] - TestActor:actor3 repeat(4)   
[actor1       ] - TestActor:actor2 repeat(3)   
[actor20      ] - TestActor:common0 repeat(4)   
[actor22      ] - TestActor:actor3 init(5)
[actor24      ] - TestActor:actor0 repeat(7)   
[actor10      ] - TestActor:common1 repeat(4)   
[actor17      ] - TestActor:actor4 repeat(8)   
[actor1       ] - TestActor:actor2 repeat(3)   
[actor17      ] - TestActor:actor4 repeat(8)   
[actor0       ] - TestActor:actor1 repeat(8)   
[actor10      ] - TestActor:common1 repeat(4)   
[actor22      ] - TestActor:actor3 repeat(8)   
[actor0       ] - TestActor:actor1 repeat(7)   
[actor1       ] - TestActor:actor2 repeat(3)   
[actor0       ] - TestActor:actor1 repeat(3)   
[actor20      ] - TestActor:common0 repeat(4)   
[actor24      ] - TestActor:actor0 repeat(7)   
[actor24      ] - TestActor:actor0 repeat(6)   
[actor10      ] - TestActor:common1 repeat(8)   
[actor17      ] - TestActor:actor4 repeat(7)

在清單 6 中,能夠看到在程序即將結束時出現的消息,這時 repeat 計數已減少。若是觀察此程序的執行,您將可以看到生成各行的速度在逐漸減慢;這是由於生成的消息數量在逐漸減小。若是等待足夠長時間,發送給 actor 的消息會徹底中止(與清單 6 中所示的 common actor 上發生的同樣)。請注意,消息處理工做合理地分散在可用的線程上,而且沒有任何 actor 被綁定到特定的線程上。

清單 6. 跟蹤輸出:程序結束
[main         ] - main waiting: 20...
[actor0       ] - TestActor:actor4 repeat(0)   
[actor2       ] - TestActor:actor2 repeat(1)   
[actor3       ] - TestActor:actor0 repeat(0)   
[actor17      ] - TestActor:actor4 repeat(0)   
[actor0       ] - TestActor:actor1 repeat(2)   
[actor3       ] - TestActor:actor2 repeat(1)   
[actor14      ] - TestActor:actor1 repeat(2)   
[actor5       ] - TestActor:actor4 repeat(0)   
[actor14      ] - TestActor:actor2 repeat(0)   
[actor21      ] - TestActor:actor1 repeat(0)   
[actor14      ] - TestActor:actor0 repeat(1)   
[actor14      ] - TestActor:actor4 repeat(0)   
[actor5       ] - TestActor:actor2 repeat(1)   
[actor5       ] - TestActor:actor4 repeat(1)   
[actor6       ] - TestActor:actor1 repeat(1)   
[actor5       ] - TestActor:actor3 repeat(0)   
[actor6       ] - TestActor:actor2 repeat(1)   
[actor4       ] - TestActor:actor0 repeat(0)   
[actor5       ] - TestActor:actor4 repeat(1)   
[actor12      ] - TestActor:actor1 repeat(0)   
[actor20      ] - TestActor:actor2 repeat(2)   
[main         ] - main waiting: 10...
[actor7       ] - TestActor:actor4 repeat(2)   
[actor23      ] - TestActor:actor1 repeat(0)   
[actor13      ] - TestActor:actor2 repeat(1)   
[actor8       ] - TestActor:actor0 repeat(0)   
[main         ] - main waiting: 9...
[actor2       ] - TestActor:actor1 repeat(0)   
[main         ] - main waiting: 8...
[actor7       ] - TestActor:actor2 repeat(0)   
[actor13      ] - TestActor:actor1 repeat(0)   
[main         ] - main waiting: 7...
[actor2       ] - TestActor:actor2 repeat(2)   
[main         ] - main waiting: 6...
[main         ] - main waiting: 5...
[actor18      ] - TestActor:actor1 repeat(1)   
[main         ] - main waiting: 4...
[actor15      ] - TestActor:actor2 repeat(0)   
[actor16      ] - TestActor:actor1 repeat(1)   
[main         ] - main waiting: 3...
[main         ] - main waiting: 2...
[main         ] - main waiting: 1...
[actor4       ] - TestActor:actor1 repeat(0)   
[actor6       ] - TestActor:actor2 repeat(0)

模擬屏幕截圖

很難從前面的跟蹤信息中全面瞭解 actor 系統的行爲,很大程度上是由於並非全部跟蹤格式都有用。可使用一個相似 actor 模擬執行的快照圖像,以圖形格式查看相同的信息。每一個圖像顯示一段固定時期以後的模擬狀況。如下視頻演示了一些未被代碼示例和屏幕截圖採集到的 Java actor 流程。能夠在本地或在 土豆 上查看下面的視頻。

 

在 此處 查看腳本。

圖 2 顯示了在運行任何模擬以前模擬的用戶界面。請注意右側顯示的模擬菜單。

圖 2. 運行任何模擬以前的 actor 模擬器

運行任何模擬以前的 actor 模擬器

在 此處 查看此圖的完整版本。

屏幕的頂部區域顯示了一個包含多種變體的模擬菜單;除非另行說明,不然下列模擬將如跟蹤輸出和如下屏幕截圖中所示:

  • 倒計時模擬 (0:15) 建立將一個值倒計時到 0 併發送更多請求的 actor。
  • Producer/Consumer 模擬 (2:40) 在經典的 Producer/Consumer 併發性問題上建立了一個變體。
  • Map/Reduce 模擬 (5:28) 建立對 1000 個整數的平方和的並行執行操做。
  • 病毒掃描 模擬 (6:45) 掃描一個磁盤目錄樹來查找 「.txt」 文件(限制掃描的數量),檢測可疑的內容模式。這個沒有 CPU 限制的模擬未在如下屏幕截圖中顯示,但它 視頻演示的一部分。
  • 全部模擬併發運行,但這僅是在視頻演示中 (8:18)。

該視頻格式顯示了按順序運行的全部這些模擬,每一個模擬之間具備較短的暫停時間。

除了 Start 和 Stop 以外,圖 2 中的屏幕截圖還顯示瞭如下控件和設置。(請注意,Stop 不會中止線程,因此某些操做在中止線程後可能仍在進行。)

  • Redistribute 半隨機地在 actor 圓圈中從新分配 actor(默認順序是建立順序)。這使您可以更容易地經過從新放置 actor 來查看分組到一塊兒的鄰近 actor 之間的消息。它還能夠向 actor 分配新顏色。
  • Add Task 和 Remove Task 在啓動工具中添加或刪除任務(線程)。Remove Task 將僅刪除添加的(而不是原始的)任務。
  • Maximum steps(使用值的 log2)限制模擬的持續時間,僅在模擬啓動以前生效。各個步驟大約持續 1 秒。
  • Show actors as transparent 使用戶更容易看到鄰近 actor 之間的消息。不透明的 actor 經常更容易看到。能夠在模擬運行時更改此設置。
  • Number of threads to use spinner 僅在模擬啓動以前生效。許多模擬的運行速度要比更多線程快得多。

控件下面的顯示區域顯示了當前的線程使用狀況(顯示爲過去的 1 秒中的平均值)。大型的中心區域會顯示模擬。底部區域會顯示模擬歷史。右側區域會顯示完整的模擬軌跡。當運行時,模擬框架按以下方式配置:

  • 控制區域是大約每秒更新一次的儀表顯示:
    • 每秒接受的消息數。
    • 每秒完成的消息數。
    • 每秒接受的消息數與完成的消息數的比率。
      若是活動顯示在右側,那麼到達的消息比正在處理的消息要多一些;最終,消息緩衝區會發生溢出。若是活動顯示在左側,正在處理的消息比到達的消息要多一些;最終,系統會空閒下來。平衡的系統會顯示 0 或者僅較長時間內顯示綠色水平線。
  • 中心區域之上是一個包含綠條的網格;每一個綠條表示一個線程(就像外部圓圈中同樣)。全綠的條帶表示線程正被全面利用,全黃的條帶表示線程徹底空閒。
  • 在中央區域,正方形的外環表示線程(在這些模擬中爲 10 個,在之前的跟蹤軌跡中有 25 個)。綠色線程附加到一個 actor 來執行收到的消息;中心的點的顏色表示 actor 類型。接近正方形的數字是當前分配給此線程的 actor 數量(從左側的 0 開始順時針排列到 360 度)。黃色線程是空閒的。
  • 內部的圓環表示 actor;顏色表示類型(在第一個示例中僅有一種類型)。若是 actor 正忙於處理一條消息,它會顯示在更暗的陰影中(若是使用了非透明的 actor,這會更加明顯)。圓圈 (actor) 之間的線表示消息。任何淺紅色的線都是在給定刷新週期中發送的新消息(模擬每秒刷新 10 次);其餘顏色是緩衝的消息(過去發送過來的,但目前仍未處理)。緩衝線在接收端有一個小圓圈;該圓圈隨着緩衝消息數量增長而變大。
  • 最右端顯示輸出軌跡;此軌跡相似於前面探討的軌跡,但更詳細一些。
  • 圖像底部是一組較小的圓圈;每一個圓圈是在過去按期顯示的主要圓圈的縮小版本。這提供了一種查看消息隨時間變化的趨勢的輕鬆方式。若是觀察此歷史,您就會看到消息將迅速積壓,而後逐漸減小。

圖 3 顯示了執行大約 10 秒後的模擬效果。請注意大量的掛起消息,它們是迅速累積起來的。有 34 個 actor,但僅有 10 個線程,因此一些 actor 須要空閒下來。在此時,全部線程都忙於處理消息。

圖 3. 啓動倒計時模擬效果 (0:15)

執行 10 秒時的倒計時模擬

在 此處 查看全圖。

圖 4 是執行大約 30 秒後的模擬。掛起消息的數量已大大減小。因爲消息到達率更低一些,因此只有部分線程在繁忙地處理消息。

圖 4. 中期的倒計時模擬效果

執行 30 秒時的倒計時模擬

在 此處 查看全圖。

圖 5 是執行大約 90 秒後的模擬。如今全部掛起的消息都已處理,所以全部線程都是空閒的。

圖 5. 完成時的倒計時模擬效果

執行 90 秒時的倒計時模擬

在 此處 查看全圖。

 

一個 Producer/Consumer 系統中的 actor

接下來,讓咱們看一下 Producer/Consumer 模式下的 actor 的演示。Producer/Consumer 是多處理器系統的一種最多見的同步模式。在下面的 μJavaActors 演示中,生成者 actor 生成要求使用者 actor 建立各類項的請求。使用者會建立這些項(這須要必定的時間),而後將一條完成消息發送回請求的生成者。

圖 6 顯示了執行大約 30 秒後的模擬效果。請注意,兩種 actor 類型按顏色區分。生成者 actor 首先顯示在屏幕右下側。生成者在運行時建立使用者,因此隨後纔會顯示後者。工做負載隨時間的流逝而緩慢減小,大部分線程都很忙。請注意,生成者會迅速完成它們的工做,以致於它們不多顯示爲活動狀態。

圖 6. 啓動不久後的 Producer/Consumer 模擬 (2:40)

執行 30 秒時的 Producer/Consumer 模擬

在 此處 查看全圖。

圖 7 顯示了執行大約 115 秒後的模擬,這接近程序完成的時間。新請求和掛起的消息的數量已經大大減小。在視頻演示中,您可能注意到,一些 actor 在很短期內顯示爲未填充的圓圈;這些是處理髮送給它們自身的消息的 actor。

圖 7. 接近結束時的 Producer/Consumer 模擬效果

執行 115 秒時的 Producer/Consumer 模擬

在 此處 查看全圖。

ProducerActor

清單 7 顯示了演示中的生成者 actor 的代碼。這裏的 「produceN」 消息已處理。它轉換成爲了一條 「produce1」 消息,該 actor 將該消息發送給本身。預期的響應記錄是一個掛起的回覆計數,以供之後驗證。

清單 7. 生成者 actor
public class ProducerActor extends AbstractActor {
  Map<String , Integer> expected = new ConcurrentHashMap<String
        , Integer>();

  @Override
  protected void loopBody(Message m) {
    String subject = m.getSubject();
    if ("produceN".equals(subject)) {
      Object[] input = (Object[]) m.getData();
      int count = (Integer) input[0];
      if (count > 0) {
        DefaultActorTest.sleeper(1); // this takes some time
        String type = (String) input[1];
        // request the consumers to consume work (i.e., produce)
        Integer mcount = expected.get(type);
        if (mcount == null) {
          mcount = new Integer(0);
        }
        mcount += count;
        expected.put(type, mcount);

        DefaultMessage dm = new DefaultMessage("produce1", 
          new Object[] { count, type });
        getManager().send(dm, this, this);
      }

在清單 8 中,「produce1」 消息已被處理。若是剩餘計數大於 0,它會轉換爲一條 「construct」 消息併發送給使用者。請注意,此邏輯可能已做爲對計數值的一個 for 循環來完成,而不是從新發送 「produce1」 消息。從新發送該消息經常會帶來更出色的線程負載,尤爲在循環主體會話佔用大量時間的時候。

清單 8. 處理一個生成者請求
    } else if ("produce1".equals(subject)) {
      Object[] input = (Object[]) m.getData();
      int count = (Integer) input[0];
      if (count > 0) {
        sleep(100); // take a little time
        String type = (String) input[1];
        m = new DefaultMessage("construct", type);
        getManager().send(m, this, getConsumerCategory());

        m = new DefaultMessage("produce1", new Object[] { count - 1, type });
        getManager().send(m, this, this);
      }

在清單 9 中,「constructionComplete」 消息(由一個使用者發送)已被處理。它會對掛起的回覆計數進行遞減。若是一切正常,在模擬完成時,全部 actor 和類型值的此計數都將爲 0。

清單 9. constructionComplete
    } else if ("constructionComplete".equals(subject)) {
      String type = (String) m.getData();
      Integer mcount = expected.get(type);
      if (mcount != null) {
        mcount--;
        expected.put(type, mcount);
      }

init」 消息在清單 10 中處理。生成者建立一些使用者 actor,而後向它本身發送多條 produceN 請求。

清單 10. 初始化
    } else if ("init".equals(subject)) {
      // create some consumers; 1 to 3 x consumers per producer
      for (int i = 0; i < DefaultActorTest.nextInt(3) + 1; i++) {
        Actor a = getManager().createAndStartActor(ConsumerActor.class,
            String.format("%s_consumer%02d", getName(), i));
        a.setCategory(getConsumerCategory());
        if (actorTest != null) {
          actorTest.getTestActors().put(a.getName(), a);
        }
      }
      // request myself create some work items
      for (int i = 0; i < DefaultActorTest.nextInt(10) + 1; i++) {
        m = new DefaultMessage("produceN", new Object[] 
             { DefaultActorTest.nextInt(10) + 1,
               DefaultActorTest.getItemTypes()[
                  DefaultActorTest.nextInt(DefaultActorTest.getItemTypes().length)] });
        getManager().send(m, this, this);
      }

清單 11 處理無效的消息:

清單 11. 處理無效的消息
    } else {
      System.out.printf("ProducerActor:%s loopBody unknown subject: %s%n", 
         getName(), subject);
    }
  }

  protected String getConsumerCategory() {
    return getName() + "_consumer";
  }
}

ConsumerActor

使用者(consumer) actor 很簡單。它處理 「construct」 消息並向請求者發送回覆消息。使用者 actor 的代碼如清單 12 所示:

清單 12. 使用者 actor
public class ConsumerActor extends AbstractActor {

  @Override
  protected void loopBody(Message m) {
    String subject = m.getSubject();
    if ("construct".equals(subject)) {
      String type = (String) m.getData();
      delay(type); // takes ~ 1 to N seconds

      DefaultMessage dm = new 
         DefaultMessage("constructionComplete", type);
      getManager().send(dm, this, m.getSource());
    } else if ("init".equals(subject)) {
      // nothing to do
    } else {
      System.out.printf("ConsumerActor:%s loopBody unknown subject: %s%n", 
        getName(), subject);
    }
  }

清單 13 中處理的生產延遲基於構造的項的類型。從跟蹤軌跡中,您能夠回想起支持的項類型爲 widgetframitfrizzlegothca 和splat。每一個類型須要花不一樣的時間量來構造。

清單 13. 生產延遲
  protected void delay(String type) {
    int delay = 1;
    for (int i = 0; i < DefaultActorTest.getItemTypes().length; i++) {
      if (DefaultActorTest.getItemTypes()[i].equals(type)) {
        break;
      }
      delay++;
    }
    DefaultActorTest.sleeper(DefaultActorTest.nextInt(delay) + 1);
  }
}

Producer/Consumer 模式中的 actor

Producer/Consumer 演示代表建立 actor 實現很是簡單。典型的 actor 會解碼收到的消息並處理它們,就像在一個 case 語句中同樣。實際的處理在本示例中微不足道,只是短暫的時間延遲。在真實應用程序中會更復雜,但不會超過使用標準 Java 同步技術的實現;一般它會簡單得多。

在此演示中,還應注意的是,複雜且重複性的算法可分解爲離散(且經常可重用)的步驟。可爲每一個步驟分配一個不一樣的主題名稱,時每一個主題的情形變得很是簡單。當狀態包含在消息參數中時(好比前面演示的倒計時值),許多 actor 會變得無狀態。這樣的程序很是容易定義和擴展(添加更多 actor 來匹配更多線程),也能夠在多線程環境中安全地運行;這相似於在行數樣式編程中使用不可變的值。

 

actor 的更多模式

出於特定的用途,Producer/Consumer 演示中的 actor 是硬編碼的,但這並非您在編碼 actor 時的唯一選擇。在本節中,您將學習如何在更加通用的模式中使用 actor,首先須要改寫 Gang of Four Command 模式

清單 14 中的 actor 實現大部分 Java 開發人員應該熟悉的 Command 模式的一種變體。在這裏,CommandActor 支持兩種消息:「execute」 和 「executeStatic。」

清單 14. CommandActor
public class CommandActor extends AbstractActor {

  @Override
  protected void loopBody(Message m) {
    String subject = m.getSubject();
    if ("execute".equals(subject)) {
      excuteMethod(m, false);
    } else if ("executeStatic".equals(subject)) {
      excuteMethod(m, true);
    } else if ("init".equals(subject)) {
      // nothing to do
    } else {
      System.out.printf("CommandActor:%s loopBody unknown subject: %s",
          getName(), subject);
    }
  }

清單 15 中的 executeMethod 方法加載了一個參數化的類,在該類或該類的實例上調用一個方法,而後返回該方法的結果或發生的任何異常。您能夠看到這個簡單的 actor 如何用於運行類路徑上具備合適的執行方法的全部服務類。id 參數由客戶端發送,因此它能夠將響應與建立它們的請求進行關聯。回覆經常按照與發出時不一樣的順序返回。

清單 15. 執行一個參數化方法
  private void excuteMethod(Message m, boolean fstatic) {
    Object res = null;
    Object id = null;
    try {
      Object[] params = (Object[]) m.getData();
      id = params[0];
      String className = (String) params[1];
      params = params.length > 2 ? (Object[]) params[2] : null;
      Class<?> clazz = Class.forName(className);
      Method method = clazz.getMethod(fstatic ? "executeStatic"
          : "execute", new Class[] { Object.class });
      if (Modifier.isStatic(method.getModifiers()) == fstatic) {
        Object target = fstatic ? null : clazz.newInstance();
        res = method.invoke(target, params);
      }
    } catch (Exception e) {
      res = e;
    }

    DefaultMessage dm = new DefaultMessage("executeComplete", new Object[] {
        id, res });
    getManager().send(dm, this, m.getSource());
  }
}

Event Listener 模式中的 actor

清單 16 中的 DelegatingActor 實現一種基於熟悉的 Java Event Listener(或 Callback)模式的相似的通常方法。它將到達的每條消息映射到每一個註冊的監聽器上的一個 onMessage 回調,直到某個回調使用(也就是處理)該事件。這種委託方法可顯著減小 actor 系統與它的消息處理器之間的聯繫。

清單 16. DelegatingActor
public class DelegatingActor extends AbstractActor {
  private List<MessageListener> listeners = new LinkedList<MessageListener>();

  public void addMessageListener(MessageListener ml) {
    if (!listeners.contains(ml)) {
      listeners.add(ml);
    }
  }

  public void removeMessageListener(MessageListener ml) {
    listeners.remove(ml);
  }

  protected void fireMessageListeners(MessageEvent me) {
    for (MessageListener ml : listeners) {
      if (me.isConsumed()) {
        break;
      }
      ml.onMessage(me);
    }
  }

  @Override
  protected void loopBody(Message m) {
    fireMessageListeners(new MessageEvent(this, m));
  }
}

DelegatingActor 類(如清單 17 所示)依賴於 MessageEvent 和 MessageListener 類:

清單 17. DelegatingActor
/** Defines a message arrival event. */
public static class MessageEvent extends EventObject {
  private Message message;

  public Message getMessage() {
    return message;
  }

  public void setMessage(Message message) {
    this.message = message;
  }

  private boolean consumed;

  public boolean isConsumed() {
    return consumed;
  }

  public void setConsumed(boolean consumed) {
    this.consumed = consumed;
  }

  public MessageEvent(Object source, Message msg) {
    super(source);
    setMessage(msg);
  }
}

/** Defines the message arrival call back. */
public interface MessageListener {
  void onMessage(MessageEvent me);
}

DelegatingActor 的一種示例用法如清單 18 所示:

清單 18. DelegatingActor 的示例用法
public static void addDelegate(DelegatingActor da) {
  MessageListener ml = new Echo("Hello world!");
  da.addMessageListener(ml);
}
	
	
public class Echo implements MessageListener {
  protected String message;

  public Echo(String message) {
    this.message = message;
  }

  @Override
  public void onMessage(MessageEvent me) {
    if ("echo".equals(me.getMessage().getSubject())) {
      System.out.printf("%s says \"%s\".%n", 
         me.getMessage().getSource(), message);
      me.setConsumed(true);
    }
  }
}

Map/Reduce 模式中的 actor

清單 14 到清單 18 中的示例 actor 簡單且一目瞭然,由於消息僅朝一個方向發送。若是該行爲須要反饋(好比當一個流程只有在處理了全部之前的消息後才能繼續時),狀況可能變得更加複雜。例如,請考慮這樣一種 Map/Reduce 實現,其中的 reduce 階段只有在 map 階段完成後才能開始。

Map/Reduce 用於在處理大量數據的程序上實現並行處理。在下面的示例中,map 函數接受一個較大的項列表,而後將它分解爲分區,發送一條消息來映射每一個分區。我選擇在每一個映射請求上遞增一個消息計數,讓分區的映射處理器發送一條會遞減該計數的回覆。當計數爲 0 時,全部映射已完成且 reduce 階段能夠啓動。相似地,reduce 階段對該列表分區(再次實現並行性)併發送消息來 reduce 分區。像 map 階段中同樣,reduce也會統計它的消息,因此能夠檢測到遞減操做的完成。要處理的值列表和計數在每一個消息中做爲消息傳輸。

對於本示例,我對許多主題使用了同一種 actor 類型。您也可使用多種 actor 類型,爲每一個 actor 使用更少的主題(最少 1 個)。

圖 8 是執行大約 20 秒後的 Map/Reduce 模擬。這是一個繁忙的處理階段,因此線程都被處理消息所佔用。

圖 8. 啓動不久後的 Map/Reduce (5:28)

執行 20 秒時的 Map/Reduce 模擬

在 此處 查看全圖。

使用 MapReduceer 進行映射和縮減

請注意,此實現是可插拔的;它可運行 MapReduceer 接口的任何實現,如清單 19 所示。

清單 19. MapReduceer
public interface MapReduceer {
  /**
   * Map (in place) the elements of an array.
   * 
   * @param values elements to map
   * @param start start position in values
   * @param end end position in values
   */
  void map(Object[] values, int start, int end);

  /**
   * Reduce the elements of an array.
   * 
   * @param values elements to reduce
   * @param start start position in values
   * @param end end position in values
   * @param target place to set reduced value
   * @param posn position in target to place the value
   */
  void reduce(Object[] values, int start, int end, Object[] target, int posn);
}

例如,您可使用 MapReduceer 計算一組整數的平方和,如清單 20 所示:

清單 20. MapReduceer 計算
public class SumOfSquaresReducer implements MapReduceer {
  @Override
  public void map(Object[] values, int start, int end) {
    for (int i = start; i <= end; i++) {
      values[i] = ((BigInteger) values[i]).multiply((BigInteger) values[i]);
      sleep(200); // fake taking time
    }
  }

  @Override
  public void reduce(Object[] values, int start, int end, Object[] target, int posn) {
    BigInteger res = new BigInteger("0");
    for (int i = start; i <= end; i++) {
      res = res.add((BigInteger) values[i]);
      sleep(100); // fake taking time
    }
    target[posn] = res;
  }
}

MapReduceActor

Map/Reduce actor 分解爲多個主題,每一個主題具備一個簡單的任務。您將在下面的代碼示例中看到它們每個。您也能夠在視頻演示中查看 Map/Reduce 操做;觀看模擬,而後研究代碼示例,這會讓您很是清楚地瞭解如何使用 actor 實現 Map/Reduce。(請注意,如下清單中的主題順序可按任意多種方式分解;我將示例代碼設計爲包含許屢次發送,以讓視頻演示更有趣。)

mapReduce 主題(如清單 21 所示)經過對輸入數組分區來啓動 Map/Reduce,它經過發送 createPartition 消息來進行分區。Map 和 Redu測 參數是在一個 MapReduceParameters 實例中提供的,該實例根據須要進行了克隆和修改,而後傳遞出去。請注意,該操做不須要時間延遲;我添加它們是爲了確保將在用戶界面中看到模擬。

清單 21. mapReduce
  @Override
  protected void loopBody(Message m) {
    ActorManager manager = getManager();
    String subject = m.getSubject();
    if ("mapReduce".equals(subject)) {
      try {
        MapReduceParameters p = (MapReduceParameters) m.getData();
        int index = 0;
        int count = (p.end - p.start + 1 + partitionSize - 1) / partitionSize;
        sleep(1000);
        // split up into partition size chunks
        while (p.end - p.start + 1 >= partitionSize) {
          MapReduceParameters xp = new MapReduceParameters(p);
          xp.end = xp.start + partitionSize - 1;
          DefaultMessage lm = new DefaultMessage("createPartition", 
            new Object[] { xp, index, count });
          manager.send(lm, this, getCategory());
          p.start += partitionSize;
          index++;
        }
        if (p.end - p.start + 1 > 0) {
          DefaultMessage lm = new DefaultMessage("createPartition", 
            new Object[] { p, index, count });
          manager.send(lm, this, getCategory());
        }
      } catch (Exception e) {
        triageException("mapFailed", m, e);
      }
}

createPartition 主題建立了更多 actor,並將請求轉發給一個工做線程,如清單 22 所示。請注意,createMapReduceActor 方法在它將建立的 actor 數量上有一個上限(目前爲 25)。

清單 22. createPartition
    } else if ("createPartition".equals(subject)) {
      try {
        Object[] oa = (Object[]) m.getData();
        MapReduceParameters p = (MapReduceParameters) oa[0];
        int index = (Integer) oa[1];
        int count = (Integer) oa[2];
        sleep(500);
        createMapReduceActor(this);
        DefaultMessage lm = new DefaultMessage("mapWorker", 
          new Object[] { p, index, count });
        manager.send(lm, this, getCategory());
      } catch (Exception e) {
        triageException("createPartitionFailed", m, e);
      }
}

清單 23 中的 mapWorker 主題在其分區上經過提供的 MapReducer 調用 map 操做,而後在回覆中代表映射分區是完整的:

清單 23. mapWorker
    } else if ("mapWorker".equals(subject)) {
      try {
        Object[] oa = (Object[]) m.getData();
        MapReduceParameters p = (MapReduceParameters) oa[0];
        int index = (Integer) oa[1];
        int count = (Integer) oa[2];
        sleep(100);
        p.mr.map(p.values, p.start, p.end);
        DefaultMessage rm = new DefaultMessage("mapResponse", 
          new Object[] { p, index, count });
        manager.send(rm, this, getCategoryName());
      } catch (Exception e) {
        triageException("mapWorkerFailed", m, e);
      }
}

而後,清單 24 中的 mapResponse 主題會完成 MapReduceParameters 實例(它包含計數)並啓動 Reduce 流程:

清單 24. mapResponse
    } else if ("mapResponse".equals(subject)) {
      try {
        Object[] oa = (Object[]) m.getData();
        MapReduceParameters p = (MapReduceParameters) oa[0];
        int index = (Integer) oa[1];
        int count = (Integer) oa[2];
        sleep(100);
        p.complete();
        DefaultMessage rm = new DefaultMessage("reduce", 
          new Object[] { p, index, count });
        manager.send(rm, this, getCategoryName());
      } catch (Exception e) {
        triageException("mapResponseFailed", m, e);
      }
}

接下來,reduce 消息會將請求轉發給某個工做線程,如清單 25 所示:

清單 25. reduce
    } else if ("reduce".equals(subject)) {
      try {
        MapReduceParameters p = null;
        int index = 0, count = 0;
        Object o = m.getData();
        if (o instanceof MapReduceParameters) {
          p = (MapReduceParameters) o;
        } else {
          Object[] oa = (Object[]) o;
          p = (MapReduceParameters) oa[0];
          index = (Integer) oa[1];
          count = (Integer) oa[2];
        }
        sleep(100);
        if (p.end - p.start + 1 > 0) {
          createMapReduceActor(this);
          MapReduceParameters xp = new MapReduceParameters(p);
          DefaultMessage lm = new DefaultMessage("reduceWorker", 
            new Object[] { xp, index, count });
          manager.send(lm, this, getCategory());
        }
      } catch (Exception e) {
        triageException("reduceFailed", m, e);
      }
}

清單 26 中的 reduceWorker 主題在其分區上經過提供的 MapReducer 調用 reduce 操做,而後在回覆中代表 Reduce 操做已完成。若是全部 Reduce 操做都已完成,則會在回覆中代表 Map/Reduce 操做已完成。

清單 26. reduceWorker
    } else if ("reduceWorker".equals(subject)) {
      try {
        Object[] oa = (Object[]) m.getData();
        MapReduceParameters p = (MapReduceParameters) oa[0];
        int index = (Integer) oa[1];
        int count = (Integer) oa[2];
        sleep(100);
        if (index >= 0) {
          p.mr.reduce(p.values, p.start, p.end, p.target, index);
          DefaultMessage rm = new DefaultMessage("reduceResponse", 
            new Object[] { p, index, count });
          manager.send(rm, this, getCategory());
        } else {
          Object[] res = new Object[1];
          p.mr.reduce(p.target, 0, count - 1, res, 0);
          DefaultMessage rm = new DefaultMessage("done", 
            new Object[] { p, res[0] });
          manager.send(rm, this, getCategory());
        }
      } catch (Exception e) {
        triageException("reduceWorkerFailed", m, e);
      }
}

接下來,清單 27 中的 reduceResponse 主題會完成該分區,並測試全部分區是否已完成,而後代表結果:

清單 27. reduceResponse
    } else if ("reduceResponse".equals(subject)) {
      try {
        Object[] oa = (Object[]) m.getData();
        MapReduceParameters p = (MapReduceParameters) oa[0];
        int index = (Integer) oa[1];
        int count = (Integer) oa[2];
        sleep(100);
        p.complete();
        if (p.isSetComplete()) {
          if (count > 0) {
            createMapReduceActor(this);
            MapReduceParameters xp = new MapReduceParameters(p);
            DefaultMessage lm = new DefaultMessage("reduceWorker", 
              new Object[] { xp, -1, count });
            manager.send(lm, this, getCategory());
          }
        }
      } catch (Exception e) {
        triageException("mapResponseFailed", m, e);
      }
}

最後,清單 28 中的 done 主題會報告結果:

清單 28. done
    } else if ("done".equals(subject)) {
      try {
        Object[] oa = (Object[]) m.getData();
        MapReduceParameters p = (MapReduceParameters) oa[0];
        Object res = oa[1];
        sleep(100);
        System.out.printf("**** mapReduce done with result %s", res);
      } catch (Exception e) {
        triageException("mapResponseFailed", m, e);
      }
}

繼續執行循環,init 主題啓動另外一個 Map/Reduce 流程,如清單 29 中所示。爲每一個 Map/Reduce 提供一個不一樣的 「集合」 名稱,使多個 Map/Reduce 可同時運行。

清單 29. 初始化另外一個 Map/Reduce
    } else if ("init".equals(subject)) {
      try {
        Object[] params = (Object[]) m.getData();
        if (params != null) {
          Object[] values = (Object[]) params[0];
          Object[] targets = (Object[]) params[1];
          Class clazz = (Class) params[2];
          MapReduceer mr = (MapReduceer) clazz.newInstance();
          sleep(2 * 1000);
          MapReduceParameters p = new MapReduceParameters("mrSet_" + setCount++, 
            values, targets, mr, this);
          DefaultMessage rm = new DefaultMessage("mapReduce", p);
          manager.send(rm, this, getCategoryName());
        }
      } catch (Exception e) {
        triageException("initFailed", m, e);
      }
    } else {
      System.out.printf("**** MapReduceActor:%s loopBody unexpected subject: %s", 
        getName(), subject);
    }
  }
}

Map/Reduce 主要過程

清單 30 中的 MapReduceActor 實現建立了一些數據值,並在這些數據上運行一個 Map/Reduce。它將分區大小設置爲 10。

清單 30. Map/Reduce 主要過程
BigInteger[] values = new BigInteger[1000];
for (int i = 0; i < values.length; i++) {
  values[i] = new BigInteger(Long.toString((long)rand.nextInt(values.length)));
}
BigInteger[] targets = new BigInteger[Math.max(1, values.length / 10)];

// start at least 5 actors
DefaultActorManager am = new DefaultActorManager();
MapReduceActor.createMapReduceActor(am, 10);
MapReduceActor.createMapReduceActor(am, 10);
MapReduceActor.createMapReduceActor(am, 10);
MapReduceActor.createMapReduceActor(am, 10);
MapReduceActor.createMapReduceActor(am, 10);
        
DefaultMessage dm = new DefaultMessage("init", new Object[] 
    { values, targets, SumOfSquaresReducer.class });
am.send(dm, null, MapReduceActor.getCategoryName());

Map/Reduce 是一種最廣泛的分而治之設計模式。從最基本的函數編程算法一直到大規模並行處理(Google 用於構建它本身的 Web 搜索引擎索引的類型),均可以看見到它的身影。μJavaActors 庫可以以某種直觀的方式實現這一高級模式,這凸顯了它的強大功能以及潛在的用途。

 

μJavaActors 庫的內幕

管理器對 actor 說:不要找我;我會去找您。

您已看到如何使用 actor 將一些常見的面向對象模式用於其餘用途。如今能夠考慮一下 μJavaActors 系統的實現細節,即 AbstractActor 和DefaultActorManager 類。我將僅討論每一個類的關鍵方法;您能夠查看 μJavaActors 源代碼 來獲取更多實現細節。

AbstractActor

每一個 actor 都知道管理它的 ActorManager。actor 使用該管理器幫助它將消息發送給其餘 actor。

在清單 31 中,receive 方法有條件地處理一條消息。若是 testMessage 方法返回 null,那麼將不會使用任何消息。不然,會從 actor 的消息隊列中刪除消息,並經過調用 loopBody 方法來處理它。每一個具體的 actor 子類都必須提供此方法。不管在哪一種狀況下,actor 都會經過調用管理器的 awaitMessage 方法來等待更多消息傳來。

清單 31. AbstractActor 實現 DefaultActorManager
public abstract class AbstractActor implements Actor {
  protected DefaultActorManager manager;

  @Override
  public boolean receive() {
    Message m = testMessage();
    boolean res = m != null;
    if (res) {
      remove(m);
      try {
        loopBody(m);
      } catch (Exception e) {
        System.out.printf("loop exception: %s%n", e);
      }
    }
    manager.awaitMessage(this);
    return res;
  }

  abstract protected void loopBody(Message m);

每一個 actor 均可以實現 willReceive 方法來控制將接受哪些消息主題(代表它將放在消息列表中);默認狀況下,會接受全部具備非空主題的消息。每一個 actor 還能夠實現 testMessage 方法來檢查是否有消息可供處理(也就是說,它存在於消息列表中);默認狀況下,這一監督工做是經過使用 peekNext 方法來實現的。

清單 32. willReceive()、testMessage() 和 peekNext()
  @Override
  public boolean willReceive(String subject) {
    return !isEmpty(subject); 
  }

  protected Message testMessage() {
    return getMatch(null, false);
  }

  protected Message getMatch(String subject, boolean isRegExpr) {
    Message res = null;
    synchronized (messages) {
      res = peekNext(subject, isRegExpr);
    }
    return res;
  }

消息容量

actor 可具備無限 或有限 的消息容量。通常而言,有限的容量更好,由於它可幫助檢測不受控制的消息發送者。任何客戶端(但一般是ActorManager)都可向 actor 添加未經篩選的消息。請注意,對 messages 列表的全部訪問都是異步的。

清單 33. 消息處理
  public static final int DEFAULT_MAX_MESSAGES = 100;
  protected List<DefaultMessage> messages = new LinkedList<DefaultMessage>();

  @Override
  public int getMessageCount() {
    synchronized (messages) {
      return messages.size();
    }
  }

  @Override
  public int getMaxMessageCount() {
    return DEFAULT_MAX_MESSAGES;
  }

  public void addMessage(Message message) {
    synchronized (messages) {
      if (messages.size() < getMaxMessageCount()) {
        messages.add(message);
      } else {
        throw new IllegalStateException("too many messages, cannot add");
      }
    }
  }

  @Override
  public boolean remove(Message message) {
    synchronized (messages) {
      return messages.remove(message);
    }
  }

消息匹配

客戶端(具體來說是 actor 自己)可檢查一個 actor 是否擁有掛起的消息。這可用於不按發送順序處理消息,或者爲某些主題提供優先級。消息匹配是經過測試消息主題與一個字符串值的同等性來完成的,或者經過將一個正則表達式與一個參數值匹配來完成的。null 主題匹配任何消息。再次提醒,請注意,對消息列表的全部訪問都是異步的。

清單 34. peekNext()
  @Override
  public Message peekNext() {
    return peekNext(null);
  }

  @Override
  public Message peekNext(String subject) {
    return peekNext(subject, false);
  }

  @Override
  public Message peekNext(String subject, boolean isRegExpr) {
    long now = new Date().getTime();
    Message res = null;
    Pattern p = subject != null ? (isRegExpr ? Pattern.compile(subject) : null) : null;
    synchronized (messages) {
      for (DefaultMessage m : messages) {
        if (m.getDelayUntil() <= now) {
          boolean match = subject == null || 
            (isRegExpr ? m.subjectMatches(p) : m.subjectMatches(subject));
          if (match) {
            res = m;
            break;
          }
        }
      }
    }
    return res;
  }

生命週期方法

每一個 actor 都有生命週期方法。每次與某個特定 ActorManager 關聯時,都會調用 activate 和 deactivate 方法。每次與某個特定的ActorManager 關聯時還會調用 run 方法,它一般經過自行向 actor 發送啓動消息來啓動該 actor。run 消息開始消息處理。

清單 35. 生命週期方法
  @Override
  public void activate() {
    // defaults to no action
  }

  @Override
  public void deactivate() {
    // defaults to no action
  }

  /** Do startup processing. */
  protected abstract void runBody();

  @Override
  public void run() {
    runBody();
    ((DefaultActorManager) getManager()).awaitMessage(this);
  }
}

DefaultActorManager

如下字段包含 actor 管理器的狀態:

  • actors 包含向管理器註冊的全部 actor。
  • runnables 包含已建立但還沒有調用其 run 方法的 actor。
  • waiters 包含全部等待消息的 actor。
  • threads 包含管理器啓動的全部線程。

請注意,LinkedHashMap 的使用相當重要(對等待者列表尤其如此);不然,一些 actor 可能會急需線程。

清單 36. DefaultActorManager 類和狀態
public class DefaultActorManager implements ActorManager {

  public static final int DEFAULT_ACTOR_THREAD_COUNT = 25;

  protected static DefaultActorManager instance;
  public static DefaultActorManager getDefaultInstance() {
    if (instance == null) {
      instance = new DefaultActorManager();
    }
    return instance;
  }

  protected Map<String , AbstractActor> actors = 
    new LinkedHashMap<String , AbstractActor>();

  protected Map<String , AbstractActor> runnables = 
    new LinkedHashMap<String , AbstractActor>();

  protected Map<String , AbstractActor> waiters = 
    new LinkedHashMap<String , AbstractActor>();

  protected List<Thread> threads = new LinkedList<Thread>();

detachActor 方法打破了 actor 和它的管理器之間的關聯:

清單 37. actor 終止
  @Override
  public void detachActor(Actor actor) {
    synchronized (actors) {
      actor.deactivate();
      ((AbstractActor)actor).setManager(null);
      String name = actor.getName();
      actors.remove(name);
      runnables.remove(name);
      waiters.remove(name);
    }
  }

發送方法

send 方法家族將一條消息發送給一個或多個 actor。首先須要檢查每條消息,查看 actor 是否會接受它。對消息進行排隊後,就會使用 notify 喚醒一個線程來處理消息。在發送到某個類別時,只有該類別中的一個 actor(當前具備最少消息的 actor)會實際收到該消息。awaitMessage 方法在 waiters 列表基礎上對 actor 排隊。

清單 38. DefaultActorManager 類處理一個髮送操做
  @Override
  public int send(Message message, Actor from, Actor to) {
    int count = 0;
    AbstractActor aa = (AbstractActor) to;
    if (aa != null) {
      if (aa.willReceive(message.getSubject())) {
        DefaultMessage xmessage = (DefaultMessage) 
           ((DefaultMessage) message).assignSender(from);
        aa.addMessage(xmessage);
        count++;
        synchronized (actors) {
          actors.notifyAll();
        }
      }
    }
    return count;
  }

  @Override
  public int send(Message message, Actor from, Actor[] to) {
    int count = 0;
    for (Actor a : to) {
      count += send(message, from, a);
    }
    return count;
  }

  @Override
  public int send(Message message, Actor from, Collection<Actor> to) {
    int count = 0;
    for (Actor a : to) {
      count += send(message, from, a);
    }
    return count;
  }

  @Override
  public int send(Message message, Actor from, String category) {
    int count = 0;
    Map<String, Actor> xactors = cloneActors();
    List<Actor> catMembers = new LinkedList<Actor>();
    for (String key : xactors.keySet()) {
      Actor to = xactors.get(key);
      if (category.equals(to.getCategory()) && 
            (to.getMessageCount() < to.getMaxMessageCount())) {
        catMembers.add(to);
      }
    }
    // find an actor with lowest message count
    int min = Integer.MAX_VALUE;
    Actor amin = null;
    for (Actor a : catMembers) {
      int mcount = a.getMessageCount();
      if (mcount < min) {
        min = mcount;
        amin = a;
      }
    }
    if (amin != null) {
      count += send(message, from, amin);
    }
    return count;
  }

  @Override
  public int broadcast(Message message, Actor from) {
    int count = 0;
    Map<String, Actor> xactors = cloneActors();
    for (String key : xactors.keySet()) {
      Actor to = xactors.get(key);
      count += send(message, from, to);
    }
    return count;
  }

  public void awaitMessage(AbstractActor a) {
    synchronized (actors) {
      waiters.put(a.getName(), a);
    }
  }

線程池初始化

管理器提供一個低優先級後臺線程池,將它分配給 actor,以便處理收到的消息。(請注意,爲保持簡潔,咱們省略了選項處理,它包含在提供的源代碼中。)

清單 39. DefaultActorManager 類初始化
  protected static int groupCount;

  @Override
  public void initialize(Map<String, Object> options) {
    int count = getThreadCount(options);
    ThreadGroup tg = new ThreadGroup("ActorManager" + groupCount++);
    for (int i = 0; i < count; i++) {
      Thread t = new Thread(tg, new ActorRunnable(), "actor" + i);
      threads.add(t);
      t.setDaemon(true);
      t.setPriority(Math.max(Thread.MIN_PRIORITY, 
         Thread.currentThread().getPriority() - 1));
    }
    running = true;
    for (Thread t : threads) {
      t.start();
    }
  }

每一個 actor 由清單 40 中的 Runnable 實現分派。只要準備好的 actor(具備掛起的消息的 actor)可用,就會將它們分派出去;不然,線程會等待(具備可變的超時)消息到來。

清單 40. 經過一個 Runnable 處理消息
  public class ActorRunnable implements Runnable {
    public void run() {
      int delay = 1;
      while (running) {
        try {
          if (!procesNextActor()) {
            synchronized (actors) {
              actors.wait(delay * 1000);
            }
            delay = Math.max(5, delay + 1);
          } else {
            delay = 1;
          }
        } catch (InterruptedException e) {
        } catch (Exception e) {
          System.out.printf("procesNextActor exception %s%n", e);
        }
      }
    }
  }

procesNextActor 方法首先測試是否存在任何新建立的 actor,而後運行其中一個。不然,它會測試一個等待的 actor。若是有任何等待的 actor,則會分派一個 actor 來處理它的下一條消息。最多一次調用處理一條消息。請注意,全部同步操做都是使用 actors 字段完成的;這減小了發生死鎖的可能性。

清單 41. 選擇和分派下一個 actor
  protected boolean procesNextActor() {
    boolean run = false, wait = false, res = false;
    AbstractActor a = null;
    synchronized (actors) {
      for (String key : runnables.keySet()) {
        a = runnables.remove(key);
        break;
      }
    }
    if (a != null) {
      run = true;
      a.run();
    } else {
      synchronized (actors) {
        for (String key : waiters.keySet()) {
          a = waiters.remove(key);
          break;
        }
      }
      if (a != null) {
        // then waiting for responses
        wait = true;
        res = a.receive();
      }
    }
    return run || res;
  }

終止方法

能夠經過調用 terminate 或 terminateAndWait 方法來請求管理器終止處理。terminate 告訴全部線程儘快中止處理。terminateAndWait 仍會等待線程完成。

清單 42. DefaultActorManager 類終止
@Override
  public void terminateAndWait() {
    terminate();
    for (Thread t : threads) {
      try {
        t.join();
      } catch (InterruptedException e) {
      }
    }
  }

  boolean running;

  @Override
  public void terminate() {
    running = false;
    for(Thread t: threads) {
      t.interrupt();
    }
    synchronized (actors) {
      for (String key : actors.keySet()) {
        actors.get(key).deactivate();
      }
    }
  }

建立方法

create 方法家族構造 actor 並將它們與此管理器關聯。create 經過 actor 的類提供,它必須有一個默認的構造函數。此外,actor 可在建立時或之後啓動。請注意,此實現須要全部 actor 擴展 AbstractActor

清單 43. 建立和啓動 actor
@Override
  public Actor createAndStartActor(Class<? extends Actor> clazz, String name, 
        Map<String, Object> options) {
    Actor res = createActor(clazz, name, options);
    startActor(res);
    return res;
  }

  @Override
  public Actor createActor(Class<? extends Actor> clazz, String name, 
       Map<String, Object> options) {
    AbstractActor a = null;
    synchronized (actors) {
      if (!actors.containsKey(name)) {
        try {
          a = (AbstractActor) clazz.newInstance();
          a.setName(name);
          a.setManager(this);
        } catch (Exception e) {
          throw e instanceof RuntimeException ? 
             (RuntimeException) e : new RuntimeException(
              "mapped exception: " + e, e);
        }
      } else {
        throw new IllegalArgumentException("name already in use: " + name);
      }
    }
    return a;
  }
}

  @Override
  public void startActor(Actor a) {
    a.activate();
    synchronized (actors) {
      String name = a.getName();
      actors.put(name, (AbstractActor) a);
      runnables.put(name, (AbstractActor) a);
    }
  }
 

結束語

送君千里,終有一別!

在本文中,您學習瞭如何將一個相對簡單的 actor 系統用於各類常見的 Java 編程場景和模式。μJavaActors 庫具備靈活的、動態的行爲,爲 Akka 等更加龐大的 actor 庫提供了一個基於 Java 的替代方案。

從代碼示例和視頻模擬中能夠明顯看到,μJavaActors 可跨一個執行線程池高效地分配 actor 消息處理工做。並且,可在用戶界面中迅速肯定是否須要更多線程。該界面還容易肯定哪些 actor 渴求工做或者是否有一些 actor 負載太重。

DefaultActorManagerActorManager 接口的默認實現)可保證沒有 actor 會一次處理多條消息。所以這會減輕 actor 做者的負擔,他們無需處理任何從新輸入考慮因素。該實現還不須要 actor 同步,只要:(1) actor 僅使用私有(實例或方法本地的)數據,(2) 消息參數僅由消息發送者編寫,以及 (3) 僅由消息接收者讀取。

DefaultActorManager 的兩個重要的設計參數是線程與 actor 的比率 以及要使用的線程總數。線程數量至少應該與計算機上的處理器同樣多,除非一些線程爲其餘用途而保留。由於線程可能經常空閒(例如,當等待 I/O 時),因此正確的比率經常是線程是處理器的 2 倍或多倍。通常而言,應該有足夠的 actor(實際上是 actor 之間的消息比率)來保持線程池中大部分時間都很繁忙。(爲了得到最佳的響應,應該有一些保留線程可用;一般平均 75% 到 80% 的活動比率最佳。)這意味着 actor 一般比線程更多,由於有時 actor 可能沒有任何要處理的掛起消息。固然,您的狀況可能有所不一樣。執行等待操做(好比等待一我的爲響應)的 actor 將須要更多線程。(線程在等待時變爲 actor 專用的,沒法處理其餘消息。)

DefaultActorManager 很好地利用了 Java 線程,由於在 actor 處理一條消息時,一個線程僅與一個特定的 actor 關聯;不然,它可供其餘 actor 自由使用。這容許一個固定大小的線程池爲無限數量的 actor 提供服務。結果,須要爲給定的工做負載建立的線程更少。這很重要,由於線程是重量級的對象,經常被主機操做系統限制於相對較少數量的實例。μJavaActors 庫正是由於這一點而與爲每一個 actor 分配一個線程的 actor 系統區分開來;若是 actor 沒有消息要處理,而且可能限制了可存在的 actor 實例數量,這麼作可讓線程實際空閒下來。

在線程切換方面,μJavaActors 實現有很大不一樣。若是在消息處理完成時有一條新消息須要處理,則不會發生線程切換;而是會重複一個簡單循環來處理該新消息。所以,若是等待的消息數量至少與線程同樣多,則沒有線程是空閒線程,所以不須要進行切換。若是存在足夠的處理器(至少一個線程一個),則能夠有效地將每一個線程分配給一個處理器,而從不會發生線程切換。若是緩衝的消息不足,那麼線程將會休眠,但這並不明顯,由於只有在沒有工做掛起時纔會出現負載太重的現象。

用於 JVM 的其餘 actor 庫

還存在其餘用於 JVM 的 actor 解決方案。表 1 簡短介紹了它們與 μJavaActors 庫的對比特徵:

表 1. 對比 JVM actor 庫與 μJavaActors
名稱 訪問地址 描述 與 μJavaActors 對比
Kilim http://www.malhar.net/sriram/kilim/ 一個支持基於輕型線程的多生成者、單使用者郵箱模型的 Java 庫。 Kilim 須要字節代碼調整。在 μJavaActors 中,每一個 actor 也是其自身的郵箱,因此不須要獨立的郵箱對象。
Akka http://akka.io/ 嘗試使用函數語言模擬 actor 的模式匹配,通常使用 instanceof 類型檢查(但 μJavaActors 通常使用字符串同等性或正則表達式匹配)。 Akka 功能更多(好比支持分佈式 actor),所以比 μJavaActors 更大且有可能更復雜。
GPars http://gpars.codehaus.org/Actor Groovy Actor 庫。 相似於 μJavaActors,但更適合 Groovy 開發人員。

請注意,表 1 中的一些 JVM actor 解決方案添加了同步發送功能(也就是發送者須要等待回覆)。儘管很方便,但這可能致使更低的消息處理公平性和/或對 actor 的更少的從新輸入調用。μJavaActors 使用了 POJT(純舊 Java 線程)和標準線程顯示器,它是一種更加傳統的實現。其餘這些方法中的一些方法爲提供它們本身的線程模型提供了專門支持。μJavaActors 是一個純 Java 庫;要使用它,僅需確保它的 JAR 位於類路徑上便可,此外,它不須要字節代碼操做或其餘特殊操做。

加強 μJavaActors

固然,還有改進或擴展 μJavaActors 庫的空間。若是您感興趣,我總結了如下可能性:

  • 在一個類別中從新分配掛起的消息:目前,在發送時會爲消息分配 round-robin,而不會在之後從新均衡。
  • 容許基於優先級的 actor 執行:目前,全部 actor 都在具備同等優先級的線程上執行;若是存在具備不一樣優先級的線程(或線程池)而且可在條件更改後向這些線程分配 actor,那麼系統可能更加靈活。
  • 容許優先級消息:目前,消息一般按發送順序處理,容許優先級處理將支持更靈活的處理。
  • 容許 actor 處理來自多個類別的消息:目前,一次僅容許處理一個類別的消息。
  • 能夠經過實現優化來減小線程切換,進而提升潛在的消息處理速率:這樣作的代價將是更高的複雜性。
  • 分佈式 actor:目前,actor 必須都在一個 JVM 中運行;跨 JVM 執行將是一種強大的擴展。
相關文章
相關標籤/搜索