In Spark Cluster + Akka + Kafka + Scala Development (1): Configuring the Development Environment, we set up a Spark development environment.
In Spark Cluster + Akka + Kafka + Scala Development (2): Developing a Spark Application, we wrote a Spark application.
The goal of this article is to write an akka-based Scala project and run it on a Spark standalone cluster.
The name akka is a palindrome of "action kernel". By the official definition, akka is for "resilient elastic distributed real-time transaction processing". My understanding:

- resilient: adaptability (a kind of elasticity) toward external factors such as demand and security.
- elastic: elasticity in how resources are used.

So akka is a distributed real-time transaction processing system that is elastic in both demand and resource allocation.
akka is only a library, a toolkit; it does not provide a platform. As a library it can be embedded in a web application (its jars placed under WEB-INF/lib) or run as a standalone program. In this article's Spark + akka environment, akka is used in the "as an application" mode.
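Since akka is just a library, embedding an actor system in a plain JVM program is all it takes to use it. A minimal sketch (the actor and names here are illustrative, not part of this article's project):

```scala
import akka.actor.{Actor, ActorSystem, Props}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example actor, for illustration only.
class EchoActor extends Actor {
  def receive = { case msg => println(s"received: $msg") }
}

object EmbeddedAkkaSketch {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("EmbeddedSystem")      // created inside our own main()
    system.actorOf(Props[EchoActor], "echo") ! "hello"
    system.terminate()                              // request shutdown
    Await.result(system.whenTerminated, 10.seconds) // block until it is down
  }
}
```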
We will create an akka project that contains two applications:

- A Server application, ServerApp: starts a ServerActor that carries out a sum-of-squares computation through Spark.
- A Client application, ClientApp: deploys (or looks up) the server actor remotely and sends it a computation request.

In other words, akka acts here as the task processor, and the tasks themselves are completed via Spark.
The project structure:

```
AkkaSampleApp                      # project directory
|-- build.sbt                      # build file
|-- src
    |-- main
        |-- resources
        |   |-- application.conf   # config file for the Akka Server application
        |   |-- client.conf        # config file for the Akka Client application
        |-- scala
            |-- ClientActor.scala      # Akka Client actor: one way of invoking the server actor
            |-- ClientApp.scala        # Akka Client application
            |-- ProductionReaper.scala # concrete implementation of the Akka shutdown pattern
            |-- Reaper.scala           # abstract Reaper class of the Akka shutdown pattern
            |-- ServerActor.scala      # Akka Server actor: a MapReduce-style sum-of-squares over 1..n, using Spark
            |-- ServerApp.scala        # Akka Server application
```
You can create it by running:

```bash
mkdir AkkaSampleApp
mkdir -p AkkaSampleApp/src/main/resources
mkdir -p AkkaSampleApp/src/main/scala
```
Create build.sbt in the project root:

```scala
name := "akka-sample-app"

version := "1.0"

scalaVersion := "2.11.8"

scalacOptions += "-feature"
scalacOptions += "-deprecation"
scalacOptions += "-language:postfixOps"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % "2.4.10",
  "com.typesafe.akka" %% "akka-remote" % "2.4.10",
  "org.apache.spark" %% "spark-core" % "2.0.0"
)

resolvers += "Akka Snapshots" at "http://repo.akka.io/snapshots/"
```
Create src/main/resources/application.conf, used by the Server application:

```
akka {
  #loglevel = "DEBUG"
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2552
    }
    #log-sent-messages = on
    #log-received-messages = on
  }
}
```
Create src/main/resources/client.conf, used by the Client application:

```
akka {
  #loglevel = "DEBUG"
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
    #log-sent-messages = on
    #log-received-messages = on
  }
}
```
Note: `port = 0` means an available port number is assigned automatically.
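If the client ever needs a fixed port instead, one hedged option (2553 is just an example value) is to layer a parsed snippet over client.conf at runtime:

```scala
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

object FixedPortClientSketch {
  def main(args: Array[String]): Unit = {
    // Typesafe Config resolves keys through the fallback chain, so this
    // snippet overrides only akka.remote.netty.tcp.port.
    val config = ConfigFactory
      .parseString("akka.remote.netty.tcp.port = 2553")
      .withFallback(ConfigFactory.load("client"))
    val system = ActorSystem("LocalSystem", config)
    println("LocalSystem started with a fixed remoting port")
    system.terminate()
  }
}
```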
Create src/main/scala/ClientActor.scala:

```scala
import akka.actor._
import akka.event.Logging

class ClientActor(serverPath: String) extends Actor {
  val log = Logging(context.system, this)
  val serverActor = context.actorSelection(serverPath)

  def receive = {
    case msg: String =>
      log.info(s"ClientActor received message '$msg'")
      serverActor ! 10000L
  }
}
```
Create src/main/scala/ClientApp.scala:

```scala
import com.typesafe.config.ConfigFactory
import akka.actor._
import akka.remote.RemoteScope
import scala.concurrent._
import scala.concurrent.duration._

object ClientApp {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("LocalSystem", ConfigFactory.load("client"))

    // deploy the server actor remotely via the server actor system's address
    val serverAddress = AddressFromURIString("akka.tcp://ServerActorSystem@127.0.0.1:2552")
    val actor = system.actorOf(Props[ServerActor].withDeploy(Deploy(scope = RemoteScope(serverAddress))))

    // alternatively, invoke the remote actor via a client actor:
    // val remotePath = "akka.tcp://ServerActorSystem@127.0.0.1:2552/user/serverActor"
    // val actor = system.actorOf(Props(classOf[ClientActor], remotePath), "clientActor")

    buildReaper(system, actor)

    // tell
    actor ! 10000L

    waitShutdown(system, actor)
  }

  private def buildReaper(system: ActorSystem, actor: ActorRef): Unit = {
    import Reaper._
    val reaper = system.actorOf(Props(classOf[ProductionReaper]))
    // watch the actor
    reaper ! WatchMe(actor)
  }

  private def waitShutdown(system: ActorSystem, actor: ActorRef): Unit = {
    // trigger the shutdown operation in ProductionReaper
    system.stop(actor)
    // wait for the system to shut down
    Await.result(system.whenTerminated, 60.seconds)
  }
}
```
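As an aside, the remote deployment that ClientApp performs in code through RemoteScope can also be declared in configuration. A hedged sketch (the actor name must match the deployment path /serverActor):

```scala
import com.typesafe.config.ConfigFactory
import akka.actor.{ActorSystem, Props}

object ConfigDeployClientSketch {
  def main(args: Array[String]): Unit = {
    // The deployment section tells akka to create /serverActor on the remote node.
    val config = ConfigFactory.parseString(
      """akka.actor.deployment {
        |  /serverActor {
        |    remote = "akka.tcp://ServerActorSystem@127.0.0.1:2552"
        |  }
        |}""".stripMargin).withFallback(ConfigFactory.load("client"))
    val system = ActorSystem("LocalSystem", config)
    // Because of the config above, this actor is instantiated remotely.
    val actor = system.actorOf(Props[ServerActor], "serverActor")
    actor ! 10000L
  }
}
```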
Create src/main/scala/ProductionReaper.scala. When all watched actors have stopped, it terminates the actor system:

```scala
class ProductionReaper extends Reaper {
  // shut down the whole actor system once every watched actor is gone
  def allSoulsReaped(): Unit = {
    context.system.terminate()
  }
}
```
Create src/main/scala/Reaper.scala:

```scala
import akka.actor.{Actor, ActorRef, Terminated}
import scala.collection.mutable.ArrayBuffer

object Reaper {
  // used by others to register an Actor for watching
  case class WatchMe(ref: ActorRef)
}

abstract class Reaper extends Actor {
  import Reaper._

  // keep track of what we're watching
  val watched = ArrayBuffer.empty[ActorRef]

  // derivations need to implement this method: it's the
  // hook that's called when everything's dead
  def allSoulsReaped(): Unit

  // watch and check for termination
  final def receive = {
    case WatchMe(ref) =>
      context.watch(ref)
      watched += ref
    case Terminated(ref) =>
      watched -= ref
      if (watched.isEmpty) allSoulsReaped()
  }
}
```
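A tiny self-contained demo of the shutdown pattern (hypothetical, separate from the project's ClientApp): the reaper watches two throwaway actors and terminates the system once both have stopped.

```scala
import akka.actor.{Actor, ActorSystem, PoisonPill, Props}

// hypothetical worker actor, for this demo only
class DemoWorker extends Actor {
  def receive = { case msg => println(s"worker got: $msg") }
}

object ReaperDemo {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("ReaperDemo")
    val reaper = system.actorOf(Props[ProductionReaper])
    val a = system.actorOf(Props[DemoWorker])
    val b = system.actorOf(Props[DemoWorker])
    reaper ! Reaper.WatchMe(a)
    reaper ! Reaper.WatchMe(b)
    a ! PoisonPill // once both watched actors stop,
    b ! PoisonPill // allSoulsReaped() terminates the system
  }
}
```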
Create src/main/scala/ServerActor.scala. It provides a MapReduce-style computation of the sum of squares of 1 to n:

```scala
import akka.actor.Actor
import akka.event.Logging
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

class ServerActor extends Actor {
  val log = Logging(context.system, this)

  def receive = {
    case n: Long =>
      squareSum(n)
  }

  private def squareSum(n: Long): Long = {
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    try {
      // distribute 1..n, square each element, then reduce by summing
      val squareSum = sc.parallelize(1L to n).map { i => i * i }.reduce(_ + _)
      log.info(s"============== The square sum of $n is $squareSum. ==============")
      squareSum
    } finally {
      sc.stop() // release the context so a later request can create a new one
    }
  }
}
```
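The value the server logs can be sanity-checked against the closed form: the sum of i² for i = 1..n equals n(n+1)(2n+1)/6. A small hypothetical helper (not part of the project):

```scala
object SquareSumCheck {
  // closed form for the sum of squares of 1..n: n(n+1)(2n+1)/6
  def expected(n: Long): Long = n * (n + 1) * (2 * n + 1) / 6

  def main(args: Array[String]): Unit = {
    // For the 10000L message the client sends, this prints 333383335000,
    // which should match the value ServerActor logs.
    println(s"expected square sum of 1..10000 = ${expected(10000L)}")
  }
}
```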
Create src/main/scala/ServerApp.scala. The ActorSystem is created without an explicit config, so it picks up application.conf by default:

```scala
import akka.actor.ActorSystem
import akka.actor.Props

object ServerApp {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("ServerActorSystem")
    val actor = system.actorOf(Props[ServerActor], name = "serverActor")
  }
}
```
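Although main returns right away, the process keeps running because akka's default dispatcher threads are non-daemon. A hedged variant that makes the server's lifetime explicit by blocking until termination:

```scala
import akka.actor.{ActorSystem, Props}
import scala.concurrent.Await
import scala.concurrent.duration.Duration

// hypothetical variant of ServerApp, not required by the project
object ServerAppBlocking {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("ServerActorSystem")
    system.actorOf(Props[ServerActor], name = "serverActor")
    // block the main thread until the actor system is terminated
    Await.result(system.whenTerminated, Duration.Inf)
  }
}
```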
Build: enter the AkkaSampleApp directory and run:

```bash
sbt package
```

The first run takes quite a while, as sbt needs to download all dependencies.
Start the Spark master service:

```bash
$SPARK_HOME/sbin/start-master.sh
```

The master service uses port 7077 by default; the actual port number can be found in its log file.
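Alternatively, instead of passing the master through spark-submit's --master flag (as done below), the master URL can be set in code. A hedged sketch, with host and port as assumptions to be checked against the master's log:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object InCodeMasterSketch {
  def main(args: Array[String]): Unit = {
    // point the SparkContext at the standalone master directly
    val conf = new SparkConf()
      .setAppName("Simple Application")
      .setMaster("spark://127.0.0.1:7077") // assumed host:port
    val sc = new SparkContext(conf)
    println(s"connected, default parallelism = ${sc.defaultParallelism}")
    sc.stop()
  }
}
```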
Start a Spark slave (worker) service:

```bash
$SPARK_HOME/sbin/start-slave.sh spark://$(hostname):7077
```
Run the Server application:

```bash
$SPARK_HOME/bin/spark-submit --master spark://$(hostname):7077 --class ServerApp target/scala-2.11/akka-sample-app_2.11-1.0.jar
```
If a java.lang.NoClassDefFoundError occurs, refer to Spark Cluster + Akka + Kafka + Scala Development (1): Configuring the Development Environment and make sure the akka jars are set up properly in Spark.

Note: the Server application can be interrupted with Ctrl+C.
Open a new terminal and run:

```bash
java -classpath ./target/scala-2.11/akka-sample-app_2.11-1.0.jar:$AKKA_HOME/lib/akka/*:$SCALA_HOME/lib/* ClientApp
# or
# $SPARK_HOME/bin/spark-submit --master spark://$(hostname):7077 --class ClientApp target/scala-2.11/akka-sample-app_2.11-1.0.jar
```
Then check whether the Server application has started processing. The Server application relies on Spark, so it must run in the Spark environment; the Client application can be an ordinary Java application.
At this point, we have written a Spark cluster + akka + Scala application. For the next step, see:
Spark Cluster + Akka + Kafka + Scala Development (4): Developing a Kafka + Spark Application