Spring與Akka的集成

概述

       近年來隨着Spark的火熱,Spark自己使用的開發語言Scala、用到的分佈式內存文件系統Tachyon(現已改名爲Alluxio)以及基於Actor併發編程模型的Akka都引發了你們的注意。瞭解過Akka或者Actor的人應該知道,這的確是一個很不錯的框架,按照Akka官網的描述——使用Akka使得構建強有力的併發與分佈式應用將更加容易。因爲歷史緣由,不少Web系統在開發分佈式服務時首先會選擇RMI(Remote Method Invoke ,遠程方法調用)、RPC(Remote Procedure Call Protocol,遠程過程調用)或者使用JMS(Java Messaging Service,Java消息服務)。html

       可是使用RMI只能使用java語言,並且開發、執行效率都不高;RPC框架雖然能夠經過匹配方法簽名的方式比RMI更靈活,可是其存在調用超時、調用丟失等缺點;JMS方式雖然能夠經過At Least Delivery Once、消息持久化等機制保證消息不會丟失,可是隻能做爲一種跨服務的生產者、消費者編程模型使用。Akka不但處理了以上問題,並且還可使用Actor做爲併發編程模型,減小java多線程編程的阻塞、調度、上下文開銷甚至死鎖等問題。此外,Akka還提供了集羣Sharding、流處理等功能的支持,更易於實現有限狀態自動機等功能。因此有心的開發者勢必會關心如何在最多見的Java系統中使用它,如何與Spring集成?java

       本文參考Akka官方使用文檔,根據自身的經驗和理解,提供Akka與Spring集成的方案。本文不說明Spring框架的具體使用,並從Spring已經配置完備的狀況開始敘述。spring

Actor系統——ActorSystem

       什麼是ActorSystem?根據Akka官網的描述——ActorSystem是一個重量級的結構體,能夠用於分配1到N個線程,因此每一個應用都須要建立一個ActorSystem。一般而言,使用如下代碼來建立ActorSystem。編程

ActorSystem system = ActorSystem.create("Hello");

不過對於接入Spring而言,由IOC(Inversion of Control,控制反轉)方式會更接地氣,你能夠這樣:多線程

    <!-- AKKA System Setup -->
    <bean id="actorSystem" class="akka.actor.ActorSystem" factory-method="create" destroy-method="shutdown" scope="singleton">
        <constructor-arg value="helloAkkaSystem"/>
    </bean>

而後在你須要的地方依賴注入便可。架構

Actor編程模型

       有關Actor編程模型的具體介紹能夠看個人另外一篇博文——《Spark如何使用Akka實現進程、節點通訊的簡明介紹》,裏面有更多的介紹。須要補充的是,在最新的Scala官方網站上已經決定廢棄Scala自身的Actor編程模型,轉而全面擁抱Akka提供的Actor編程模型。併發

       咱們能夠經過如下代碼(代碼片斷借用了Akka官網的例子)建立一個簡單的Actor例子。app

       Greeter是表明問候者的Actor:框架

public class Greeter extends UntypedActor {

  public static enum Msg {
    GREET, DONE;
  }

  @Override
  public void onReceive(Object msg) {
    if (msg == Msg.GREET) {
      System.out.println("Hello World!");
      getSender().tell(Msg.DONE, getSelf());
    } else
      unhandled(msg);
  }

}

通常狀況下咱們的Actor都須要繼承自UntypedActor,並實現其onReceive方法。onReceive用於接收消息,你能夠在其中實現對消息的匹配並作不一樣的處理。分佈式

HelloWorld是用於向Greeter發送問候消息的訪客:

public class HelloWorld extends UntypedActor {

  @Override
  public void preStart() {
    // create the greeter actor
    final ActorRef greeter = getContext().actorOf(Props.create(Greeter.class), "greeter");
    // tell it to perform the greeting
    greeter.tell(Greeter.Msg.GREET, getSelf());
  }

  @Override
  public void onReceive(Object msg) {
    if (msg == Greeter.Msg.DONE) {
      // when the greeter is done, stop this actor and with it the application
      getContext().stop(getSelf());
    } else
      unhandled(msg);
  }
}

有了Actor以後,咱們能夠這樣使用它:

ActorRef a = system.actorOf(Props.create(HelloWorld.class), "helloWorld");

       在HelloWorld的preStart實現中,獲取了Greeter的ActorRef(Actor的引用)並向Greeter發送了問候的消息,Greeter收到問候消息後,會先打印Hello World!,而後向HelloWorld回覆完成的消息,HelloWorld得知Greeter完成了向世界問好這個偉大的任務後,就結束了本身的生命。HelloWorld的例子用編程API的方式告訴了咱們如何使用Actor及發送、接收消息。爲了便於描述與Spring的集成,下面再介紹一個例子。

       CountingActor(代碼主體借用自Akka官網)是用於計數的Actor,見代碼清單1所示。

代碼清單1

@Named("CountingActor")
@Scope("prototype")
public class CountingActor extends UntypedActor {

    public static class Count {
    }

    public static class Get {
    }

    // the service that will be automatically injected
    @Resource
    private CountingService countingService;

    private int count = 0;

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof Count) {
            count = countingService.increment(count);
        } else if (message instanceof Get) {
            getSender().tell(count, getSelf());
        } else {
            unhandled(message);
        }
    }
}
CountingActor用於接收Count消息進行計數,接收Get消息回覆給發送者當前的計數值。CountingService是用於計數的接口,其定義以下:
public interface CountingService {
    
    /**
     * 計數
     * @param count
     * @return
     */
    int increment(int count);

}
CountingService的具體實現是CountingServiceImpl,其實現以下:
@Service("countingService")
public class CountingServiceImpl implements CountingService {

    private static Logger logger = LoggerFactory.getLogger(CountingServiceImpl.class);

    /*
     * (non-Javadoc)
     * 
     * @see com.elong.sentosa.metadata.service.CountingService#increment(int)
     */
    @Override
    public int increment(int count) {
        logger.info("increase " + count + "by 1.");
        return count + 1;
    }

}
CountingActor經過註解方式注入了CountingService,CountingActor的計數實際是由CountingService完成。
        細心的同窗可能發現了CountingActor使用了註解Named,這裏爲何沒有使用@Service或者@Component等註解呢?因爲Akka的Actor在初始化的時候必須使用System或者Context的工廠方法actorOf建立新的Actor實例,不能使用構造器來初始化,而使用Spring的Service或者Component註解,會致使使用構造器初始化Actor,因此會拋出如下異常:
akka.actor.ActorInitializationException: You cannot create an instance of [com.elong.metadata.akka.actor.CountingActor] explicitly using the constructor (new). You have to use one of the 'actorOf' factory methods to create a new actor. See the documentation.

若是咱們不能使用@Service或者@Component,也不能使用XML配置的方式使用(與註解一個道理),那麼咱們如何使用CountingActor提供的服務呢?

 

IndirectActorProducer接口

        IndirectActorProducer是Akka提供的Actor生成接口,從其名字咱們知道Akka給咱們指出了另外一條道路——石頭大了繞着走!經過實現IndirectActorProducer接口咱們能夠定製一些Actor的生成方式,與Spring集成能夠這樣實現它,見代碼清單2所示。

代碼清單2

public class SpringActorProducer implements IndirectActorProducer {
    private final ApplicationContext applicationContext;
    private final String actorBeanName;
    private final Object[] args;

    public SpringActorProducer(ApplicationContext applicationContext, String actorBeanName, Object ... args) {
        this.applicationContext = applicationContext;
        this.actorBeanName = actorBeanName;
        this.args = args;
    }

    public Actor produce() {
        return (Actor) applicationContext.getBean(actorBeanName, args);
    }

    public Class<? extends Actor> actorClass() {
        return (Class<? extends Actor>) applicationContext.getType(actorBeanName);
    }
}

SpringActorProducer的實現主要借鑑了Akka官方文檔,我這裏對其做了一些擴展以便於支持構造器帶有多個參數的狀況。從其實現看到實際是利用了ApplicationContext提供的getBean方式實例化Actor。
       這裏還有兩個問題:1、ApplicationContext如何獲取和設置?2、如何使用SpringActorProducer生成Spring須要的Actor實例?

       對於第一個問題,咱們能夠經過封裝SpringActorProducer並實現ApplicationContextAware接口的方式獲取ApplicationContext;對於第二個問題,咱們知道Akka中的全部Actor實例都是以Props做爲配置參數開始的,這裏以SpringActorProducer爲代理生成咱們須要的Actor的Props。

       SpringExt實現了以上思路,見代碼清單3所示。

代碼清單3

@Component("springExt")
public class SpringExt implements Extension, ApplicationContextAware {

    private ApplicationContext applicationContext;

    /**
     * Create a Props for the specified actorBeanName using the
     * SpringActorProducer class.
     *
     * @param actorBeanName
     *            The name of the actor bean to create Props for
     * @return a Props that will create the named actor bean using Spring
     */
    public Props props(String actorBeanName, Object ... args) {
        return Props.create(SpringActorProducer.class, applicationContext, actorBeanName, args);
    }

    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}

應用例子

        通過了以上的鋪墊,如今你可使用建立好的CountingActor了,首先你須要在你的業務類中注入ActorSystem和SpringExt。

        @Autowired
    private ActorSystem actorSystem;

    @Autowired
    private SpringExt springExt;

而後咱們使用CountingActor進行計數,代碼以下:

    ActorRef counter = actorSystem.actorOf(springExt.props("CountingActor"), "counter");

    // Create the "actor-in-a-box"
        final Inbox inbox = Inbox.create(system);
        
    // tell it to count three times
        inbox.send(counter, new Count());
        inbox.send(counter, new Count());
        inbox.send(counter, new Count());

    // print the result
    FiniteDuration duration = FiniteDuration.create(3, TimeUnit.SECONDS);
    Future<Object> result = ask(counter, new Get(), Timeout.durationToTimeout(duration));
    try {
        System.out.println("Got back " + Await.result(result, duration));
    } catch (Exception e) {
        System.err.println("Failed getting result: " + e.getMessage());
        throw e;
    }

輸出結果爲:

Got back 3

總結

       本文只是最簡單的Akka集成Spring的例子,Akka的remote、cluster、persistence、router等機制均可以應用。

後記:通過近一年的準備,《Spark內核設計的藝術 架構設計與實現》一書現已出版發行,圖書如圖:
 
售賣連接以下:
相關文章
相關標籤/搜索