Akka 和 μJavaActors入門

     AkkaμJavaActorsμJavaActors均是java的Actor庫,其中Akka提供了叫爲完整的Actor開發框架,較爲龐大,學習成本很高,μJavaActors 是一個輕量級Actor庫,大約僅有1200行代碼,比較適合入門。java

一.Akka Demo 

Akka是一個至關成熟、強大的庫,github上download下的是Akka的源碼,應該使用sbt構建的工程,若是沒有使用sbt經驗,想導出jar還挺不容易的,推薦Akka官網下載akka各個組件的jar去使用,簡單介紹一下helloworld 級別Akka的demo。git

1.Akka的主要組件github

akka-actor.jar : Actor核心組件了,定義了Acotr核心類正則表達式

akka-slf4f.jar : SLF4F Logger的支持,一個打log的組件,不用太關注redis

akka-remote.jar : Actor作遠程調用的jar,相似RFC吧數組

akka-cluster : actor作集羣管理組件框架

akka-camel.jar : 對Apache Camel 集成接口eclipse

scala-library-2.11.8.jar : akka核心應該是Scala寫的,這個組件就是對akka的核心支持ide

Akka還有不少組件,不過對於hello world級的程序簡單瞭解幾個就ok了。工程是基於eclipse的,須要包含下面幾個基礎的組件:學習

4YQ$GDJYW$F]Q~]K`XCL@YU

編寫兩個Actor:

package demo02;

import akka.actor.UntypedActor;
/*
 * UntypedAcotr是無類型Actor的一個抽象類,繼承與核心類Actor
 */
public class Greeter extends UntypedActor {

    public static enum Msg{
        GREET , DONE;
    }
    /**
     * 每一個Actor必須實現OnReceive,當該Actor收到消息調用該方法
     */
    @Override
    public void onReceive(Object msg) throws Throwable {
        if(msg == Msg.GREET){
            System.out.println("Hello world");
            /**
             * 這裏吐槽一下Akka對於發消息的設計,發送消息的設計居然是:
             * receiver.tell(msg , sender)
             * 也許沒理解akka設計的理念,可是正常人設計不該該是:
             *  sender.tell(msg , receiver)
             *  汗……
             */
            getSender().tell(Msg.DONE, getSelf());
        }else{
            unhandled(msg);
        }
        
    }

}
package demo02;

import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;

public class HelloWorld extends UntypedActor {

    @Override
    public void preStart(){
        final ActorRef greeter = getContext().actorOf(Props.create(Greeter.class));
        greeter.tell(Greeter.Msg.GREET, getSelf());
    }
    
    
    @Override
    public void onReceive(Object msg) throws Throwable {
    
        if(msg == Greeter.Msg.DONE){
            getContext().stop(getSelf());
        }else{
            unhandled(msg);
        }
        
    }

    
}

下面是Main方法:

package demo02;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;

public class Main {

    public static void main(String[] args) {
        //ActorSystem 至關於ActorManager,管理各類Acor調度、線程管理等
        ActorSystem system = ActorSystem.create("hello");
        //建立一個HelloWorld 類型的Actor,在Actor啓動前,會調preStart(),此時會想Greeter發消息
        ActorRef actor = system.actorOf(Props.create(HelloWorld.class));
        //添加結束終結Actor,當ActorSystem調Stop時,會向每一個Actor發送Terminated消息    
        system.actorOf(Props.create(Terminator.class, actor), "terminator");
        
    }
    public static class Terminator extends UntypedActor{

        private final LoggingAdapter log = Logging.getLogger(getContext().system(),this);
        private ActorRef actorRef = null;
        
        public Terminator(ActorRef ref){
            System.out.println("Terminator Init !!!");
            actorRef = ref;
            getContext().watch(actorRef);
        }
        
        @Override
        public void onReceive(Object msg) throws Throwable {
              if (msg instanceof Terminated) {
                  log.info("{} has terminated, shutting down system", actorRef.path());
                  getContext().system().terminate();
                } else {
                  unhandled(msg);
                }
            
        }
        
    }
    
}

上面代碼在akka的源碼中sample均可以找到的,從上面看Akka對消息的識別是根據類型處理的,在我這種菜鳥看來,並非很合適,當我消息類型較多時,消息類豈不是要爆炸,固然也能夠作分級Actor,再加一層轉發層解決這個問題哈

二.μJavaActors

    μJavaActors 是一個十分輕量級的Actor庫,實現核心的Actor調度,不涉及複雜的框架,簡單分析一下它的源碼吧

1.Actor核心接口

Actor:定義了一個標準的Actor應該具備行爲

ActorManager:Actor管理器接口,提供線程管理,Actor調度等

Messager : Actor相互間傳遞傳遞的消息接口,固然附帶的接口還有MessageEvent和MessageListener

簡單引用做者對這個概念的描述:

Actor 是一個執行單元,一次處理一條消息。Actor 具備如下關鍵行爲或特徵:

  • 每一個 actor 有一個 name,該名稱在每一個 ActorManager 中必須是唯一的。
  • 每一個 actor 屬於一個 category;類別是一種向一組 actor 中的一個成員發送消息的方式。一個 actor 一次只能屬於一個類別。
  • 只要 ActorManager 能夠提供一個執行 actor 的線程,系統就會調用 receive()。爲了保持最高效率,actor 應該迅速處理消息,而不要進入漫長的等待狀態(好比等待人爲輸入)。
  • willReceive() 容許 actor 過濾潛在的消息主題。
  • peek() 容許該 actor 和其餘 actor 查看是否存在掛起的消息(或許是爲了選擇主題)。
  • remove() 容許該 actor 和其餘 actor 刪除或取消任何還沒有處理的消息。
  • getMessageCount() 容許該 actor 和其餘 actor 獲取掛起的消息數量。
  • getMaxMessageCount() 容許 actor 限制支持的掛起消息數量;此方法可用於預防不受控制地發送。

大部分程序都有許多 actor,這些 actor 經常具備不一樣的類型。actor 可在程序啓動時建立或在程序執行時建立(和銷燬)。本文中的 actor 包 包含一個名爲 AbstractActor 的抽象類,actor 實現基於該類。

 

ActorManager 是一個 actor 管理器。它負責向 actor 分配線程(進而分配處理器)來處理消息。ActorManager 擁有如下關鍵行爲或特徵:

  • createActor() 建立一個 actor 並將它與此管理器相關聯。
  • startActor() 啓動一個 actor。
  • detachActor() 中止一個 actor 並將它與此管理器斷開。
  • send()/broadcast() 將一條消息發送給一個 actor、一組 actor、一個類別中的任何 actor 或全部 actor。

在大部分程序中,只有一個 ActorManager,但若是您但願管理多個線程和/或 actor 池,也能夠有多個 ActorManager。此接口的默認實現是 DefaultActorManager

 

消息 是在 actor 之間發送的消息。Message 是 3 個(可選的)值和一些行爲的容器:

  • source 是發送 actor。
  • subject 是定義消息含義的字符串(也稱爲命令)。
  • data 是消息的任何參數數據;一般是一個映射、列表或數組。參數能夠是要處理和/或其餘 actor 要與之交互的數據。
  • subjectMatches() 檢查消息主題是否與字符串或正則表達式匹配。

μJavaActors 包的默認消息類是 DefaultMessage

 

ActorManager其實只要簡單瀏覽一下μJavaActors源碼就能夠理解Actor設計思路啦,主要分析一下ActorManager中的Actor調度源碼:

public class ActorRunnable implements Runnable {
        public boolean hasThread;
        public AbstractActor actor;

        public void run() {
            // logger.trace("procesNextActor starting");
            int delay = 1;
            while (running) {
                try {
                    if (!procesNextActor()) {
                        // logger.trace("procesNextActor waiting on actor");
                        // sleep(delay * 1000);
                        synchronized (actors) {
                            // TOOD: adjust this delay; possible parameter
                            // we want to minizmize overhead (make bigger);
                            // but it has a big impact on message processing
                            // rate (makesmaller)
                            // actors.wait(delay * 1000);
                            actors.wait(100);
                        }
                        delay = Math.max(5, delay + 1);
                    } else {
                        delay = 1;
                    }
                } catch (InterruptedException e) {
                } catch (Exception e) {
                    logger.error("procesNextActor exception", e);
                }
            }
            // logger.trace("procesNextActor ended");
        }

        protected boolean procesNextActor() {
            boolean run = false, wait = false, res = false;
            actor = null;
            synchronized (actors) {
                for (String key : runnables.keySet()) {
                    actor = runnables.remove(key);
                    break;
                }
            }
            if (actor != null) {
                // first run never started
                run = true;
                actor.setHasThread(true);
                hasThread = true;
                try {
                    actor.run();
                } finally {
                    actor.setHasThread(false);
                    hasThread = false;
                }
            } else {
                synchronized (actors) {
                    for (String key : waiters.keySet()) {
                        actor = waiters.remove(key);
                        break;
                    }
                }
                if (actor != null) {
                    // then waiting for responses
                    wait = true;
                    actor.setHasThread(true);
                    hasThread = true;
                    try {
                        res = actor.receive();
                        if (res) {
                            incDispatchCount();
                        }
                    } finally {
                        actor.setHasThread(false);
                        hasThread = false;
                    }
                }
            }
            // if (!(!run && wait && !res) && a != null) {
            // logger.trace("procesNextActor %b/%b/%b: %s", run, wait, res, a);
            // }
            return run || res;
        }
    }
ActorMgr中有一個線程隊列維護了一些ActorRunnable對象,每一個ActorRunnable對象有都在無線循環調度Actor,這也就簡單使得每一個Actor在不一樣的線程中執行。固然此時會有個問題,若是有一些Actor出現資源競爭會不會出現問題,答案確定是會的。Actor僅僅是抽象了線程調度問題並給出了一下Actor的原則,並不能徹底避免資源競爭現象的出現,只能說準守Actor模式規範,,固然也能夠用redis去作公共內存塊,避免直接的全局資源讀寫。
相關文章
相關標籤/搜索