近年來隨着Spark的火熱,Spark自己使用的開發語言Scala、用到的分佈式內存文件系統Tachyon(現已改名爲Alluxio)以及基於Actor併發編程模型的Akka都引發了你們的注意。瞭解過Akka或者Actor的人應該知道,這的確是一個很不錯的框架,按照Akka官網的描述——使用Akka使得構建強有力的併發與分佈式應用將更加容易。因爲歷史緣由,不少Web系統在開發分佈式服務時首先會選擇RMI(Remote Method Invoke ,遠程方法調用)、RPC(Remote Procedure Call Protocol,遠程過程調用)或者使用JMS(Java Messaging Service,Java消息服務)。html
可是使用RMI只能使用java語言,並且開發、執行效率都不高;RPC框架雖然能夠經過匹配方法簽名的方式比RMI更靈活,可是其存在調用超時、調用丟失等缺點;JMS方式雖然能夠經過At Least Delivery Once、消息持久化等機制保證消息不會丟失,可是隻能做爲一種跨服務的生產者、消費者編程模型使用。Akka不但處理了以上問題,並且還可使用Actor做爲併發編程模型,減小java多線程編程的阻塞、調度、上下文開銷甚至死鎖等問題。此外,Akka還提供了集羣Sharding、流處理等功能的支持,更易於實現有限狀態自動機等功能。因此有心的開發者勢必會關心如何在最多見的Java系統中使用它,如何與Spring集成?java
本文參考Akka官方使用文檔,根據自身的經驗和理解,提供Akka與Spring集成的方案。本文不說明Spring框架的具體使用,並從Spring已經配置完備的狀況開始敘述。spring
什麼是ActorSystem?根據Akka官網的描述——ActorSystem是一個重量級的結構體,能夠用於分配1到N個線程,因此每一個應用都須要建立一個ActorSystem。一般而言,使用如下代碼來建立ActorSystem。編程
ActorSystem system = ActorSystem.create("Hello");
不過對於接入Spring而言,由IOC(Inversion of Control,控制反轉)方式會更接地氣,你能夠這樣:多線程
<!-- AKKA System Setup --> <bean id="actorSystem" class="akka.actor.ActorSystem" factory-method="create" destroy-method="shutdown" scope="singleton"> <constructor-arg value="helloAkkaSystem"/> </bean>
而後在你須要的地方依賴注入便可。架構
有關Actor編程模型的具體介紹能夠看個人另外一篇博文——《Spark如何使用Akka實現進程、節點通訊的簡明介紹》,裏面有更多的介紹。須要補充的是,在最新的Scala官方網站上已經決定廢棄Scala自身的Actor編程模型,轉而全面擁抱Akka提供的Actor編程模型。併發
咱們能夠經過如下代碼(代碼片斷借用了Akka官網的例子)建立一個簡單的Actor例子。app
Greeter是表明問候者的Actor:框架
public class Greeter extends UntypedActor { public static enum Msg { GREET, DONE; } @Override public void onReceive(Object msg) { if (msg == Msg.GREET) { System.out.println("Hello World!"); getSender().tell(Msg.DONE, getSelf()); } else unhandled(msg); } }
通常狀況下咱們的Actor都須要繼承自UntypedActor,並實現其onReceive方法。onReceive用於接收消息,你能夠在其中實現對消息的匹配並作不一樣的處理。分佈式
HelloWorld是用於向Greeter發送問候消息的訪客:
public class HelloWorld extends UntypedActor { @Override public void preStart() { // create the greeter actor final ActorRef greeter = getContext().actorOf(Props.create(Greeter.class), "greeter"); // tell it to perform the greeting greeter.tell(Greeter.Msg.GREET, getSelf()); } @Override public void onReceive(Object msg) { if (msg == Greeter.Msg.DONE) { // when the greeter is done, stop this actor and with it the application getContext().stop(getSelf()); } else unhandled(msg); } }
有了Actor以後,咱們能夠這樣使用它:
ActorRef a = system.actorOf(Props.create(HelloWorld.class), "helloWorld");
在HelloWorld的preStart實現中,獲取了Greeter的ActorRef(Actor的引用)並向Greeter發送了問候的消息,Greeter收到問候消息後,會先打印Hello World!,而後向HelloWorld回覆完成的消息,HelloWorld得知Greeter完成了向世界問好這個偉大的任務後,就結束了本身的生命。HelloWorld的例子用編程API的方式告訴了咱們如何使用Actor及發送、接收消息。爲了便於描述與Spring的集成,下面再介紹一個例子。
CountingActor(代碼主體借用自Akka官網)是用於計數的Actor,見代碼清單1所示。
代碼清單1
@Named("CountingActor") @Scope("prototype") public class CountingActor extends UntypedActor { public static class Count { } public static class Get { } // the service that will be automatically injected @Resource private CountingService countingService; private int count = 0; @Override public void onReceive(Object message) throws Exception { if (message instanceof Count) { count = countingService.increment(count); } else if (message instanceof Get) { getSender().tell(count, getSelf()); } else { unhandled(message); } } }
public interface CountingService { /** * 計數 * @param count * @return */ int increment(int count); }
@Service("countingService") public class CountingServiceImpl implements CountingService { private static Logger logger = LoggerFactory.getLogger(CountingServiceImpl.class); /* * (non-Javadoc) * * @see com.elong.sentosa.metadata.service.CountingService#increment(int) */ @Override public int increment(int count) { logger.info("increase " + count + "by 1."); return count + 1; } }
akka.actor.ActorInitializationException: You cannot create an instance of [com.elong.metadata.akka.actor.CountingActor] explicitly using the constructor (new). You have to use one of the 'actorOf' factory methods to create a new actor. See the documentation.
若是咱們不能使用@Service或者@Component,也不能使用XML配置的方式使用(與註解一個道理),那麼咱們如何使用CountingActor提供的服務呢?
IndirectActorProducer是Akka提供的Actor生成接口,從其名字咱們知道Akka給咱們指出了另外一條道路——石頭大了繞着走!經過實現IndirectActorProducer接口咱們能夠定製一些Actor的生成方式,與Spring集成能夠這樣實現它,見代碼清單2所示。
代碼清單2
public class SpringActorProducer implements IndirectActorProducer { private final ApplicationContext applicationContext; private final String actorBeanName; private final Object[] args; public SpringActorProducer(ApplicationContext applicationContext, String actorBeanName, Object ... args) { this.applicationContext = applicationContext; this.actorBeanName = actorBeanName; this.args = args; } public Actor produce() { return (Actor) applicationContext.getBean(actorBeanName, args); } public Class<? extends Actor> actorClass() { return (Class<? extends Actor>) applicationContext.getType(actorBeanName); } }
SpringActorProducer的實現主要借鑑了Akka官方文檔,我這裏對其做了一些擴展以便於支持構造器帶有多個參數的狀況。從其實現看到實際是利用了ApplicationContext提供的getBean方式實例化Actor。
這裏還有兩個問題:1、ApplicationContext如何獲取和設置?2、如何使用SpringActorProducer生成Spring須要的Actor實例?
對於第一個問題,咱們能夠經過封裝SpringActorProducer並實現ApplicationContextAware接口的方式獲取ApplicationContext;對於第二個問題,咱們知道Akka中的全部Actor實例都是以Props做爲配置參數開始的,這裏以SpringActorProducer爲代理生成咱們須要的Actor的Props。
SpringExt實現了以上思路,見代碼清單3所示。
代碼清單3
@Component("springExt") public class SpringExt implements Extension, ApplicationContextAware { private ApplicationContext applicationContext; /** * Create a Props for the specified actorBeanName using the * SpringActorProducer class. * * @param actorBeanName * The name of the actor bean to create Props for * @return a Props that will create the named actor bean using Spring */ public Props props(String actorBeanName, Object ... args) { return Props.create(SpringActorProducer.class, applicationContext, actorBeanName, args); } public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } }
通過了以上的鋪墊,如今你可使用建立好的CountingActor了,首先你須要在你的業務類中注入ActorSystem和SpringExt。
@Autowired private ActorSystem actorSystem; @Autowired private SpringExt springExt;
而後咱們使用CountingActor進行計數,代碼以下:
ActorRef counter = actorSystem.actorOf(springExt.props("CountingActor"), "counter"); // Create the "actor-in-a-box" final Inbox inbox = Inbox.create(system); // tell it to count three times inbox.send(counter, new Count()); inbox.send(counter, new Count()); inbox.send(counter, new Count()); // print the result FiniteDuration duration = FiniteDuration.create(3, TimeUnit.SECONDS); Future<Object> result = ask(counter, new Get(), Timeout.durationToTimeout(duration)); try { System.out.println("Got back " + Await.result(result, duration)); } catch (Exception e) { System.err.println("Failed getting result: " + e.getMessage()); throw e; }
輸出結果爲:
Got back 3
本文只是最簡單的Akka集成Spring的例子,Akka的remote、cluster、persistence、router等機制均可以應用。