Akka and Storm are both important tools for low-latency, high-throughput computation, but they are not direct competitors. If Akka is the Linux kernel, then Storm is more like a distribution such as Ubuntu. Storm is not a distribution of Akka, however, so it is perhaps more apt to compare Akka to BSD and Storm to Ubuntu.
Akka provides an API and an execution engine. Storm, on top of an API and an execution engine, also provides monitoring metrics, a web UI, cluster management, and a message-delivery guarantee mechanism.
This article focuses on the part where Akka and Storm overlap: the similarities and differences between their APIs and execution engines.
Let's first look at Storm's two main APIs:
public interface ISpout extends Serializable {
    /**
     * Called when a task for this component is initialized within a worker on the cluster.
     * It provides the spout with the environment in which the spout executes.
     *
     * <p>This includes the:</p>
     *
     * @param conf The Storm configuration for this spout. This is the configuration provided to the topology merged in with cluster configuration on this machine.
     * @param context This object can be used to get information about this task's place within the topology, including the task id and component id of this task, input and output information, etc.
     * @param collector The collector is used to emit tuples from this spout. Tuples can be emitted at any time, including the open and close methods. The collector is thread-safe and should be saved as an instance variable of this spout object.
     */
    void open(Map conf, TopologyContext context, SpoutOutputCollector collector);

    /**
     * Called when an ISpout is going to be shutdown. There is no guarentee that close
     * will be called, because the supervisor kill -9's worker processes on the cluster.
     *
     * <p>The one context where close is guaranteed to be called is a topology is
     * killed when running Storm in local mode.</p>
     */
    void close();

    /**
     * Called when a spout has been activated out of a deactivated mode.
     * nextTuple will be called on this spout soon. A spout can become activated
     * after having been deactivated when the topology is manipulated using the
     * `storm` client.
     */
    void activate();

    /**
     * Called when a spout has been deactivated. nextTuple will not be called while
     * a spout is deactivated. The spout may or may not be reactivated in the future.
     */
    void deactivate();

    /**
     * When this method is called, Storm is requesting that the Spout emit tuples to the
     * output collector. This method should be non-blocking, so if the Spout has no tuples
     * to emit, this method should return. nextTuple, ack, and fail are all called in a tight
     * loop in a single thread in the spout task. When there are no tuples to emit, it is courteous
     * to have nextTuple sleep for a short amount of time (like a single millisecond)
     * so as not to waste too much CPU.
     */
    void nextTuple();

    /**
     * Storm has determined that the tuple emitted by this spout with the msgId identifier
     * has been fully processed. Typically, an implementation of this method will take that
     * message off the queue and prevent it from being replayed.
     */
    void ack(Object msgId);

    /**
     * The tuple emitted by this spout with the msgId identifier has failed to be
     * fully processed. Typically, an implementation of this method will put that
     * message back on the queue to be replayed at a later time.
     */
    void fail(Object msgId);
}
as well as
public interface IBasicBolt extends IComponent {
    void prepare(Map stormConf, TopologyContext context);

    /**
     * Process the input tuple and optionally emit new tuples based on the input tuple.
     *
     * All acking is managed for you. Throw a FailedException if you want to fail the tuple.
     */
    void execute(Tuple input, BasicOutputCollector collector);

    void cleanup();
}
and the Actor API in Akka:
trait Actor {

  import Actor._ // to make type Receive known in subclasses without import

  type Receive = Actor.Receive

  /**
   * Stores the context for this actor, including self, and sender.
   * It is implicit to support operations such as `forward`.
   *
   * WARNING: Only valid within the Actor itself, so do not close over it and
   * publish it to other threads!
   *
   * [[akka.actor.ActorContext]] is the Scala API. `getContext` returns a
   * [[akka.actor.UntypedActorContext]], which is the Java API of the actor
   * context.
   */
  implicit val context: ActorContext = {
    val contextStack = ActorCell.contextStack.get
    if ((contextStack.isEmpty) || (contextStack.head eq null))
      throw ActorInitializationException(
        s"You cannot create an instance of [${getClass.getName}] explicitly using the constructor (new). " +
          "You have to use one of the 'actorOf' factory methods to create a new actor. See the documentation.")
    val c = contextStack.head
    ActorCell.contextStack.set(null :: contextStack)
    c
  }

  /**
   * The 'self' field holds the ActorRef for this actor.
   * <p/>
   * Can be used to send messages to itself:
   * <pre>
   * self ! message
   * </pre>
   */
  implicit final val self = context.self //MUST BE A VAL, TRUST ME

  /**
   * The reference sender Actor of the last received message.
   * Is defined if the message was sent from another Actor,
   * else `deadLetters` in [[akka.actor.ActorSystem]].
   *
   * WARNING: Only valid within the Actor itself, so do not close over it and
   * publish it to other threads!
   */
  final def sender(): ActorRef = context.sender()

  /**
   * This defines the initial actor behavior, it must return a partial function
   * with the actor logic.
   */
  //#receive
  def receive: Actor.Receive
  //#receive

  /**
   * INTERNAL API.
   *
   * Can be overridden to intercept calls to this actor's current behavior.
   *
   * @param receive current behavior.
   * @param msg current message.
   */
  protected[akka] def aroundReceive(receive: Actor.Receive, msg: Any): Unit = receive.applyOrElse(msg, unhandled)

  /**
   * Can be overridden to intercept calls to `preStart`. Calls `preStart` by default.
   */
  protected[akka] def aroundPreStart(): Unit = preStart()

  /**
   * Can be overridden to intercept calls to `postStop`. Calls `postStop` by default.
   */
  protected[akka] def aroundPostStop(): Unit = postStop()

  /**
   * Can be overridden to intercept calls to `preRestart`. Calls `preRestart` by default.
   */
  protected[akka] def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit = preRestart(reason, message)

  /**
   * Can be overridden to intercept calls to `postRestart`. Calls `postRestart` by default.
   */
  protected[akka] def aroundPostRestart(reason: Throwable): Unit = postRestart(reason)

  /**
   * User overridable definition the strategy to use for supervising
   * child actors.
   */
  def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.defaultStrategy

  /**
   * User overridable callback.
   * <p/>
   * Is called when an Actor is started.
   * Actors are automatically started asynchronously when created.
   * Empty default implementation.
   */
  @throws(classOf[Exception]) // when changing this you MUST also change UntypedActorDocTest
  //#lifecycle-hooks
  def preStart(): Unit = ()
  //#lifecycle-hooks

  /**
   * User overridable callback.
   * <p/>
   * Is called asynchronously after 'actor.stop()' is invoked.
   * Empty default implementation.
   */
  @throws(classOf[Exception]) // when changing this you MUST also change UntypedActorDocTest
  //#lifecycle-hooks
  def postStop(): Unit = ()
  //#lifecycle-hooks

  /**
   * User overridable callback: '''By default it disposes of all children and then calls `postStop()`.'''
   * @param reason the Throwable that caused the restart to happen
   * @param message optionally the current message the actor processed when failing, if applicable
   * <p/>
   * Is called on a crashed Actor right BEFORE it is restarted to allow clean
   * up of resources before Actor is terminated.
   */
  @throws(classOf[Exception]) // when changing this you MUST also change UntypedActorDocTest
  //#lifecycle-hooks
  def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    context.children foreach { child ⇒
      context.unwatch(child)
      context.stop(child)
    }
    postStop()
  }
  //#lifecycle-hooks

  /**
   * User overridable callback: By default it calls `preStart()`.
   * @param reason the Throwable that caused the restart to happen
   * <p/>
   * Is called right AFTER restart on the newly created Actor to allow reinitialization after an Actor crash.
   */
  @throws(classOf[Exception]) // when changing this you MUST also change UntypedActorDocTest
  //#lifecycle-hooks
  def postRestart(reason: Throwable): Unit = {
    preStart()
  }
  //#lifecycle-hooks

  /**
   * User overridable callback.
   * <p/>
   * Is called when a message isn't handled by the current behavior of the actor
   * by default it fails with either a [[akka.actor.DeathPactException]] (in
   * case of an unhandled [[akka.actor.Terminated]] message) or publishes an [[akka.actor.UnhandledMessage]]
   * to the actor's system's [[akka.event.EventStream]]
   */
  def unhandled(message: Any): Unit = {
    message match {
      case Terminated(dead) ⇒ throw new DeathPactException(dead)
      case _                ⇒ context.system.eventStream.publish(UnhandledMessage(message, sender(), self))
    }
  }
}
Storm's main APIs, one could say, look a lot like Actor. Judging from the timeline, though, Storm and Akka started development at roughly the same time, so it is quite likely that Storm's author was inspired by Erlang's Actor implementation rather than by Akka.
From the way things look today, the author probably set out to write a "plain" Actor implementation in Clojure; since that plain implementation already satisfied Storm's design goals, he never went on to turn Storm into a complete Actor implementation on Clojure.
So, looking only at the APIs, what are the differences between Spout/Bolt and Actor?
At the API level, Storm has two methods that Actor lacks: ack and fail. They exist mainly because Storm targets a much narrower set of use cases than Akka (essentially just statistics and aggregation), so fault tolerance is already built in, giving users in that niche something that works out of the box.
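To make the ack/fail contract concrete, here is a minimal sketch of a spout that replays failed messages. It is written in Scala against what I assume are the backtype.storm packages of that era; ReplayingSpout, its in-memory queues and the output field name are all made up for illustration:

import java.util.{Map => JMap, UUID}
import scala.collection.mutable

import backtype.storm.spout.SpoutOutputCollector
import backtype.storm.task.TopologyContext
import backtype.storm.topology.OutputFieldsDeclarer
import backtype.storm.topology.base.BaseRichSpout
import backtype.storm.tuple.{Fields, Values}

// A spout that remembers each emitted message until Storm acks it,
// and puts it back on a retry queue when Storm reports a failure.
class ReplayingSpout(lines: Vector[String]) extends BaseRichSpout {
  private var collector: SpoutOutputCollector = _
  private var cursor = 0
  private val pending = mutable.Map.empty[AnyRef, String]   // msgId -> payload
  private val retries = mutable.Queue.empty[String]

  override def open(conf: JMap[_, _], context: TopologyContext,
                    out: SpoutOutputCollector): Unit = collector = out

  override def nextTuple(): Unit = {
    val next =
      if (retries.nonEmpty) Some(retries.dequeue())
      else if (cursor < lines.size) { val l = lines(cursor); cursor += 1; Some(l) }
      else None
    next.foreach { payload =>
      val msgId = UUID.randomUUID()             // supplying a msgId is what switches acking on
      pending(msgId) = payload
      collector.emit(new Values(payload), msgId)
    }
  }

  // Storm calls ack once the whole downstream tuple tree has succeeded.
  override def ack(msgId: AnyRef): Unit = pending.remove(msgId)

  // Storm calls fail on timeout or FailedException; schedule a replay.
  override def fail(msgId: AnyRef): Unit =
    pending.remove(msgId).foreach(retries.enqueue(_))

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit =
    declarer.declare(new Fields("line"))
}

If the msgId argument is omitted from emit, the tuple opts out of the ack mechanism and ack/fail are never called for it.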
In addition, Storm's Tuple class carries some context information, again packaged to meet the needs of the target use cases.
context: a Spout's open method also receives a context, but in an Actor the context can be accessed at any time, which shows that Actor encourages using the context far more than Spout does; the data in the context is also updated dynamically.
self: the Actor's reference to itself. You can read this as the Actor model being much friendlier to downstream components sending data back upstream, and even to an actor sending data to itself. In Storm the default assumption is that data flows one way: the downstream component gives no feedback to the upstream one (apart from the system-defined ack and fail).
postRestart: distinguishes an Actor's first start from a restart, which is genuinely useful. Storm probably lacks it because the author did not bother with it (or did not think of it) at first, and later did not want to change the core API.
unhandled: handles messages the actor was not expected to receive, by default publishing them to a system stream. An Actor is inherently open: any external application that knows the Actor's address can send it messages. A Storm component only receives the messages you designed it for, so it has no such need.
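A tiny Akka sketch touching the points above, written against the classic untyped-actor API; Echo, EchoMain and the messages are invented for illustration:

import akka.actor.{Actor, ActorSystem, Props}

// Replying upstream via sender(), an actor messaging itself via self,
// and an overridden unhandled for unexpected messages.
class Echo extends Actor {
  def receive: Receive = {
    case "ping" => sender() ! "pong"   // feedback to whoever sent the message
    case "poke" => self ! "ping"       // an actor can also message itself
    case n: Int => println(s"got the number $n")
  }

  // Anything not matched above lands here instead of the system event stream.
  override def unhandled(message: Any): Unit =
    println(s"did not expect: $message")
}

object EchoMain extends App {
  val system = ActorSystem("demo")
  val echo = system.actorOf(Props[Echo], "echo")
  echo ! 42      // handled by the Int case
  echo ! 3.14    // goes to unhandled
  echo ! "poke"  // self ! "ping" runs with sender = self, so the "pong" reply
                 // comes back to the actor itself and also ends up in unhandled
  // (the system keeps running; system.shutdown() in the Akka 2.3-era API would stop it)
}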
What follows compares Actor with Storm's Task, the differences in their thread-scheduling models, hot code deployment, and the limits Storm's ack mechanism places on asynchronous code.
Component is the collective name for Spout and Bolt, the basic unit in Storm that runs user code. Actors and Components have a lot in common: both respond to messages, both can hold state, and only one thread enters them at a time unless you explicitly start extra threads yourself. The main difference is that an Actor is an extremely lightweight component: you can create tens of thousands of Actors in one program, or put every ten lines of code into its own Actor, and that is perfectly fine. With Storm's Components the situation is entirely different: you should use only a handful of Components, describing the top-level abstractions.
If the APIs are so similar, why can you spin up new Actors freely while Components should be kept to a minimum? The secret lies in Akka's dispatchers. All asynchronous code in an Akka program (Actors, Futures, Runnables, even ParIterable) can be handed to a Dispatcher; apart from the main thread you use to start the ActorSystem, essentially every other thread can be managed by a Dispatcher. Dispatchers are customizable; by default Akka uses a fork-join-executor, which, compared with an ordinary thread pool, is particularly well suited to the Actor model and delivers excellent performance.
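As a rough sketch (the name compute-dispatcher, the pool sizes and the actor count are all made up for illustration, not tuning advice), a custom fork-join dispatcher can be declared in configuration and assigned to actors like this:

import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

class Worker extends Actor {
  def receive: Receive = { case msg => println(s"$self got $msg") } // a tiny piece of work
}

object DispatcherDemo extends App {
  // A custom dispatcher backed by fork-join-executor, defined in HOCON.
  val config = ConfigFactory.parseString("""
    compute-dispatcher {
      type = Dispatcher
      executor = "fork-join-executor"
      fork-join-executor {
        parallelism-min = 2
        parallelism-factor = 2.0
        parallelism-max = 16
      }
      throughput = 100   # messages an actor processes before giving the thread back
    }
  """)

  val system = ActorSystem("demo", config.withFallback(ConfigFactory.load()))

  // Tens of thousands of actors happily share the same small thread pool.
  val workers = (1 to 20000).map { i =>
    system.actorOf(Props[Worker].withDispatcher("compute-dispatcher"), s"worker-$i")
  }
  workers.foreach(_ ! "hello")
}

The throughput setting is what lets a handful of threads multiplex so many actors: each actor occupies a thread only for a batch of messages before returning it to the pool.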
Storm's thread-scheduling model, by comparison, is much more "plain": each Component gets a thread of its own, or several Components take turns sharing one thread. That is exactly why you cannot create too many Components.
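For comparison, here is roughly what the Storm side looks like, reusing the ReplayingSpout sketched earlier; PrintBolt and the component names are invented, and the packages are again assumed to be the backtype.storm ones of that era:

import backtype.storm.Config
import backtype.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer, TopologyBuilder}
import backtype.storm.topology.base.BaseBasicBolt
import backtype.storm.tuple.Tuple

// A trivial bolt, only here so that there is something to wire up.
class PrintBolt extends BaseBasicBolt {
  override def execute(input: Tuple, collector: BasicOutputCollector): Unit =
    println(input.getString(0))
  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = ()
}

object ParallelismDemo extends App {
  val builder = new TopologyBuilder
  // The last argument is the parallelism hint: how many executor threads run the component.
  builder.setSpout("lines", new ReplayingSpout(Vector("a", "b", "c")), 2)
  builder.setBolt("printer", new PrintBolt, 4).shuffleGrouping("lines")

  val conf = new Config
  conf.setNumWorkers(2)   // the six executors above get packed into two worker JVMs
  // A LocalCluster (or StormSubmitter) would take builder.createTopology() from here.
}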
In real-time computation, the need for hot deployment mostly amounts to things like tweaking a ranking algorithm: replacing one algorithm module while everything else stays untouched.
Because Storm can support programming in any language through Thrift, if your algorithm is written in a scripting language such as Python and you want to swap the algorithm without a restart, you only need to replace the .py file in the right place on every machine. The catch is that this confines the program to languages of that kind.
On the Akka side, because the Actor model exposes the same messaging interface within a process and across processes, the actors responsible for the algorithm can be started as a separate process, and when the code changes you simply restart that process. Even though one process in the system has restarted, the system as a whole keeps running without pausing for a moment.
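A rough sketch of that pattern, assuming akka-remote on the classpath and the Akka 2.3-era remoting configuration; the system name scoring, the port and the trivial length-based "algorithm" are all invented:

import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

// The "algorithm" actor: here it trivially replies with the length of the text it receives.
class Scorer extends Actor {
  def receive: Receive = { case text: String => sender() ! text.length }
}

// Started as its own JVM; restarting this process swaps the algorithm while the
// rest of the pipeline keeps running.
object ScorerProcess extends App {
  val config = ConfigFactory.parseString("""
    akka {
      actor.provider = "akka.remote.RemoteActorRefProvider"
      remote.netty.tcp {
        hostname = "127.0.0.1"
        port = 2552
      }
    }
  """)
  val system = ActorSystem("scoring", config.withFallback(ConfigFactory.load()))
  system.actorOf(Props[Scorer], "scorer")
}

// In the main pipeline process the caller only needs the address:
//   val scorer = system.actorSelection("akka.tcp://scoring@127.0.0.1:2552/user/scorer")
//   scorer ! "some document"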
Storm's message-delivery guarantee mechanism is genuinely original: using bitwise XOR it can track the success or failure of data as it is processed, with very little memory and high performance. By default, all the user's code has to do is supply a MessageId and the ack mechanism happily runs, so ordinarily users never need to think about it. The problem with the default interface, though, is that the ack mechanism breaks down as soon as asynchronous code is involved: scheduled tasks, submitted Runnables and the like are invisible to it, so if the asynchronous logic fails, the acker never finds out. How to make Storm's ack mechanism coexist peacefully with asynchronous code is still an open question.
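To illustrate the blind spot, here is a hedged sketch; AsyncBolt and doExpensiveWork are invented, and the packages are again assumed to be the backtype.storm ones of that era:

import java.util.{Map => JMap}
import java.util.concurrent.Executors

import backtype.storm.task.{OutputCollector, TopologyContext}
import backtype.storm.topology.OutputFieldsDeclarer
import backtype.storm.topology.base.BaseRichBolt
import backtype.storm.tuple.Tuple

// The tuple is acked as soon as execute() returns, so a failure inside the
// submitted task never reaches the acker: exactly the blind spot described above.
class AsyncBolt extends BaseRichBolt {
  private val pool = Executors.newFixedThreadPool(4)
  private var collector: OutputCollector = _

  override def prepare(conf: JMap[_, _], context: TopologyContext,
                       out: OutputCollector): Unit = collector = out

  override def execute(input: Tuple): Unit = {
    pool.submit(new Runnable {
      def run(): Unit = doExpensiveWork(input.getString(0)) // if this throws, nobody tells Storm
    })
    collector.ack(input) // acked before the asynchronous work has actually finished
  }

  private def doExpensiveWork(s: String): Unit = () // hypothetical placeholder

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = ()
}

One workaround is to move collector.ack(input) / collector.fail(input) into the Runnable itself (the collector is thread-safe), calling them only after the asynchronous work has succeeded or failed.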
In my view Storm's API is excellent, and its reliability has been proven over years of practice, but its core execution machinery is so plain that it gives off a sense of a hero past his prime. Twitter, Storm's original user, recently announced Heron, a new solution compatible with the Storm interface, though it has not been open-sourced. An open-source project that "re-implemented" Storm on top of Akka would be something to look forward to; so far, gearpump is one such project I have found.