Akka和μJavaActorsμJavaActors均是java的Actor庫,其中Akka提供了叫爲完整的Actor開發框架,較爲龐大,學習成本很高,μJavaActors 是一個輕量級Actor庫,大約僅有1200行代碼,比較適合入門。java
Akka是一個至關成熟、強大的庫,github上download下的是Akka的源碼,應該使用sbt構建的工程,若是沒有使用sbt經驗,想導出jar還挺不容易的,推薦Akka官網下載akka各個組件的jar去使用,簡單介紹一下helloworld 級別Akka的demo。git
1.Akka的主要組件github
akka-actor.jar : Actor核心組件了,定義了Acotr核心類正則表達式
akka-slf4f.jar : SLF4F Logger的支持,一個打log的組件,不用太關注
redis
akka-remote.jar : Actor作遠程調用的jar,相似RFC吧
數組
akka-cluster : actor作集羣管理組件
框架
akka-camel.jar : 對Apache Camel 集成接口
eclipse
scala-library-2.11.8.jar : akka核心應該是Scala寫的,這個組件就是對akka的核心支持ide
Akka還有不少組件,不過對於hello world級的程序簡單瞭解幾個就ok了。工程是基於eclipse的,須要包含下面幾個基礎的組件:學習
編寫兩個Actor:
package demo02; import akka.actor.UntypedActor; /* * UntypedAcotr是無類型Actor的一個抽象類,繼承與核心類Actor */ public class Greeter extends UntypedActor { public static enum Msg{ GREET , DONE; } /** * 每一個Actor必須實現OnReceive,當該Actor收到消息調用該方法 */ @Override public void onReceive(Object msg) throws Throwable { if(msg == Msg.GREET){ System.out.println("Hello world"); /** * 這裏吐槽一下Akka對於發消息的設計,發送消息的設計居然是: * receiver.tell(msg , sender) * 也許沒理解akka設計的理念,可是正常人設計不該該是: * sender.tell(msg , receiver) * 汗…… */ getSender().tell(Msg.DONE, getSelf()); }else{ unhandled(msg); } } }
package demo02; import akka.actor.ActorRef; import akka.actor.Props; import akka.actor.UntypedActor; public class HelloWorld extends UntypedActor { @Override public void preStart(){ final ActorRef greeter = getContext().actorOf(Props.create(Greeter.class)); greeter.tell(Greeter.Msg.GREET, getSelf()); } @Override public void onReceive(Object msg) throws Throwable { if(msg == Greeter.Msg.DONE){ getContext().stop(getSelf()); }else{ unhandled(msg); } } }
下面是Main方法:
package demo02; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; import akka.actor.Terminated; import akka.actor.UntypedActor; import akka.event.Logging; import akka.event.LoggingAdapter; public class Main { public static void main(String[] args) { //ActorSystem 至關於ActorManager,管理各類Acor調度、線程管理等 ActorSystem system = ActorSystem.create("hello"); //建立一個HelloWorld 類型的Actor,在Actor啓動前,會調preStart(),此時會想Greeter發消息 ActorRef actor = system.actorOf(Props.create(HelloWorld.class)); //添加結束終結Actor,當ActorSystem調Stop時,會向每一個Actor發送Terminated消息 system.actorOf(Props.create(Terminator.class, actor), "terminator"); } public static class Terminator extends UntypedActor{ private final LoggingAdapter log = Logging.getLogger(getContext().system(),this); private ActorRef actorRef = null; public Terminator(ActorRef ref){ System.out.println("Terminator Init !!!"); actorRef = ref; getContext().watch(actorRef); } @Override public void onReceive(Object msg) throws Throwable { if (msg instanceof Terminated) { log.info("{} has terminated, shutting down system", actorRef.path()); getContext().system().terminate(); } else { unhandled(msg); } } } }
上面代碼在akka的源碼中sample均可以找到的,從上面看Akka對消息的識別是根據類型處理的,在我這種菜鳥看來,並非很合適,當我消息類型較多時,消息類豈不是要爆炸,固然也能夠作分級Actor,再加一層轉發層解決這個問題哈
μJavaActors 是一個十分輕量級的Actor庫,實現核心的Actor調度,不涉及複雜的框架,簡單分析一下它的源碼吧
1.Actor核心接口
Actor:定義了一個標準的Actor應該具備行爲
ActorManager:Actor管理器接口,提供線程管理,Actor調度等
Messager : Actor相互間傳遞傳遞的消息接口,固然附帶的接口還有MessageEvent和MessageListener
簡單引用做者對這個概念的描述:
Actor 是一個執行單元,一次處理一條消息。Actor
具備如下關鍵行爲或特徵:
name
,該名稱在每一個 ActorManager
中必須是唯一的。 category
;類別是一種向一組 actor 中的一個成員發送消息的方式。一個 actor 一次只能屬於一個類別。 ActorManager
能夠提供一個執行 actor 的線程,系統就會調用 receive()
。爲了保持最高效率,actor 應該迅速處理消息,而不要進入漫長的等待狀態(好比等待人爲輸入)。 willReceive()
容許 actor 過濾潛在的消息主題。 peek()
容許該 actor 和其餘 actor 查看是否存在掛起的消息(或許是爲了選擇主題)。 remove()
容許該 actor 和其餘 actor 刪除或取消任何還沒有處理的消息。 getMessageCount()
容許該 actor 和其餘 actor 獲取掛起的消息數量。 getMaxMessageCount()
容許 actor 限制支持的掛起消息數量;此方法可用於預防不受控制地發送。大部分程序都有許多 actor,這些 actor 經常具備不一樣的類型。actor 可在程序啓動時建立或在程序執行時建立(和銷燬)。本文中的 actor 包 包含一個名爲 AbstractActor
的抽象類,actor 實現基於該類。
ActorManager 是一個 actor 管理器。它負責向 actor 分配線程(進而分配處理器)來處理消息。ActorManager
擁有如下關鍵行爲或特徵:
createActor()
建立一個 actor 並將它與此管理器相關聯。 startActor()
啓動一個 actor。 detachActor()
中止一個 actor 並將它與此管理器斷開。 send()/broadcast()
將一條消息發送給一個 actor、一組 actor、一個類別中的任何 actor 或全部 actor。在大部分程序中,只有一個 ActorManager
,但若是您但願管理多個線程和/或 actor 池,也能夠有多個 ActorManager
。此接口的默認實現是 DefaultActorManager
。
消息 是在 actor 之間發送的消息。Message
是 3 個(可選的)值和一些行爲的容器:
source
是發送 actor。 subject
是定義消息含義的字符串(也稱爲命令)。 data
是消息的任何參數數據;一般是一個映射、列表或數組。參數能夠是要處理和/或其餘 actor 要與之交互的數據。 subjectMatches()
檢查消息主題是否與字符串或正則表達式匹配。μJavaActors 包的默認消息類是 DefaultMessage
。
ActorManager其實只要簡單瀏覽一下μJavaActors源碼就能夠理解Actor設計思路啦,主要分析一下ActorManager中的Actor調度源碼:
public class ActorRunnable implements Runnable { public boolean hasThread; public AbstractActor actor; public void run() { // logger.trace("procesNextActor starting"); int delay = 1; while (running) { try { if (!procesNextActor()) { // logger.trace("procesNextActor waiting on actor"); // sleep(delay * 1000); synchronized (actors) { // TOOD: adjust this delay; possible parameter // we want to minizmize overhead (make bigger); // but it has a big impact on message processing // rate (makesmaller) // actors.wait(delay * 1000); actors.wait(100); } delay = Math.max(5, delay + 1); } else { delay = 1; } } catch (InterruptedException e) { } catch (Exception e) { logger.error("procesNextActor exception", e); } } // logger.trace("procesNextActor ended"); } protected boolean procesNextActor() { boolean run = false, wait = false, res = false; actor = null; synchronized (actors) { for (String key : runnables.keySet()) { actor = runnables.remove(key); break; } } if (actor != null) { // first run never started run = true; actor.setHasThread(true); hasThread = true; try { actor.run(); } finally { actor.setHasThread(false); hasThread = false; } } else { synchronized (actors) { for (String key : waiters.keySet()) { actor = waiters.remove(key); break; } } if (actor != null) { // then waiting for responses wait = true; actor.setHasThread(true); hasThread = true; try { res = actor.receive(); if (res) { incDispatchCount(); } } finally { actor.setHasThread(false); hasThread = false; } } } // if (!(!run && wait && !res) && a != null) { // logger.trace("procesNextActor %b/%b/%b: %s", run, wait, res, a); // } return run || res; } }