Spark集羣 + Akka + Kafka + Scala 開發(3) : 開發一個Akka + Spark的應用

前言

Spark集羣 + Akka + Kafka + Scala 開發(1) : 配置開發環境中,咱們已經部署好了一個Spark的開發環境。
Spark集羣 + Akka + Kafka + Scala 開發(2) : 開發一個Spark應用中,咱們已經寫好了一個Spark的應用。
本文的目標是寫一個基於akka的scala工程,在一個spark standalone的集羣環境中運行。html

akka是什麼?

akka的做用

akka的名字是action kernel的迴文。根據官方定義:akka用於resilient elastic distributed real-time transaction processing。
我的理解是:
resilient:是指對需求和安全性等方面(來自於外部的)的一種適應力(彈性)。
elastic:是指對資源利用方面的彈性。
所以,akka是一個知足需求彈性、資源分配彈性的分佈式實時事務處理系統。
akka只是一個類庫,一個工具,並無提供一個平臺。java

akka的運行模式和用例

  • akka有兩種運行模式:
    • As a library: 一個使用於web應用,把akka做爲一個普通的jar包放到classpath或者WEB-INF/lib
    • As an application: 也稱爲micro system。
  • akka的用例
    akka的用例不少,能夠參照Examples of use-cases for Akka.

本文中的用例

在本文中,一個Spark + akka的環境裏,akka被用於as an application模式下。
咱們會建立一個akka工程,含有兩個應用:git

  • akka host application
    創建一個actor system, 定義了全部的任務(actors)。等待客戶端的請求。
    部分actor使用了spark的雲計算功能。
    這是一個spark的應用。
  • akka client application
    調用host application上特定的actor。

咱們看出,這裏咱們把akka做爲一個任務處理器,並經過spark來完成任務。github

項目結構和文件說明

說明

這個工程包含了兩個應用。
一個Consumer應用:CusomerApp:實現了經過Spark的Stream+Kafka的技術來實現處理消息的功能。
一個Producer應用:ProducerApp:實現了向Kafka集羣發消息的功能。web

文件結構

AkkaSampleApp    # 項目目錄
|-- build.bat    # build文件    
|-- src
    |-- main
        |-- resources
            |-- application.conf   # Akka Server應用的配置文件
            |-- client.conf        # Akka Client應用的配置文件
        |-- scala
            |-- ClientActor.scala       # Akka Client的Actor:提供了一種調用Server Actor的方式。
            |-- ClientApp.scala         # Akka Client應用
            |-- ProductionReaper.scala  # Akka Shutdown pattern的實現者
            |-- Reaper.scala            # Akka Shutdown pattern的Reaper抽象類
            |-- ServerActor.scala       # Akka Server的Actor,提供一個求1到n的MapReduce計算。使用了Spark。
            |-- ServerApp.scala         # Akka Server應用

構建工程目錄

能夠運行:shell

mkdir AkkaSampleApp
mkdir -p /AkkaSampleApp/src/main/resources
mkdir -p /AkkaSampleApp/src/main/scala

代碼

build.sbt

name := "akka-sample-app"
 
version := "1.0"
 
scalaVersion := "2.11.8"

scalacOptions += "-feature"
scalacOptions += "-deprecation"
scalacOptions += "-language:postfixOps"
 
libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % "2.4.10",
  "com.typesafe.akka" %% "akka-remote" % "2.4.10",
  "org.apache.spark" %% "spark-core" % "2.0.0"
)

resolvers += "Akka Snapshots" at "http://repo.akka.io/snapshots/"

application.conf

akka {
  #loglevel = "DEBUG"
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2552
    }
    #log-sent-messages = on
    #log-received-messages = on
  }
}

cient.conf

akka {
  #loglevel = "DEBUG"
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
    #log-sent-messages = on
    #log-received-messages = on
  }
}

注:port = 0表示這個端口號會自動生成一個。apache

ClientActor.scala

import akka.actor._
import akka.event.Logging

class ClientActor(serverPath: String) extends Actor {
  val log = Logging(context.system, this)
  val serverActor = context.actorSelection(serverPath)

  def receive = {
    case msg: String =>
        log.info(s"ClientActor received message '$msg'")
        serverActor ! 10000L
  }
}

ClientApp.scala

import com.typesafe.config.ConfigFactory
import akka.actor._
import akka.remote.RemoteScope
import akka.util._

import java.util.concurrent.TimeUnit

import scala.concurrent._
import scala.concurrent.duration._

object ClientApp {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("LocalSystem", ConfigFactory.load("client"))
    
    // get the remote actor via the server actor system's address
    val serverAddress = AddressFromURIString("akka.tcp://ServerActorSystem@127.0.0.1:2552")
    val actor = system.actorOf(Props[ServerActor].withDeploy(Deploy(scope = RemoteScope(serverAddress))))

    // invoke the remote actor via a client actor.
    // val remotePath = "akka.tcp://ServerActorSystem@127.0.0.1:2552/user/serverActor"
    // val actor = system.actorOf(Props(classOf[ClientActor], remotePath), "clientActor")

    buildReaper(system, actor)

    // tell
    actor ! 10000L
    
    waitShutdown(system, actor)
  }

  private def buildReaper(system: ActorSystem, actor: ActorRef): Unit = {
    import Reaper._
    val reaper = system.actorOf(Props(classOf[ProductionReaper]))
    
    // Watch the action
    reaper ! WatchMe(actor)
  }

  private def waitShutdown(system: ActorSystem, actor: ActorRef): Unit = {
    // trigger the shutdown operation in ProductionReaper
    system.stop(actor)
    
    // wait to shutdown
    Await.result(system.whenTerminated, 60.seconds)
  }
}

ProductionReaper.scala

當全部的Actor中止後,終止Actor System。安全

class ProductionReaper extends Reaper {
  // Shutdown
  def allSoulsReaped(): Unit = {
    context.system.terminate()
  }
}

Reaper.scala

import akka.actor.{Actor, ActorRef, Terminated}
import scala.collection.mutable.ArrayBuffer

object Reaper {
  // Used by others to register an Actor for watching
  case class WatchMe(ref: ActorRef)
}

abstract class Reaper extends Actor {
  import Reaper._

  // Keep track of what we're watching
  val watched = ArrayBuffer.empty[ActorRef]

  // Derivations need to implement this method.  It's the
  // hook that's called when everything's dead
  def allSoulsReaped(): Unit

  // Watch and check for termination
  final def receive = {
    case WatchMe(ref) =>
      context.watch(ref)
      watched += ref
    case Terminated(ref) =>
      watched -= ref
      if (watched.isEmpty) allSoulsReaped()
  }
}

ServerActor.scala

提供一個求1到n平方和的MapReduce計算。app

import akka.actor.Actor
import akka.actor.Props
import akka.event.Logging

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

class ServerActor extends Actor {
  val log = Logging(context.system, this)

  def receive = {
    case n: Long =>
        squareSum(n)
  }

  private def squareSum(n: Long): Long = {
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)

    val squareSum = sc.parallelize(1L until n).map { i => 
      i * i
    }.reduce(_ + _)

    log.info(s"============== The square sum of $n is $squareSum. ==============")

    squareSum
  }
}

ServerApp.scala

import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import akka.actor.ActorSystem
import akka.actor.Props

object ServerApp {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("ServerActorSystem")
    val actor = system.actorOf(Props[ServerActor], name = "serverActor")
  }
}

構建工程

進入目錄AkkaSampleApp。運行:tcp

sbt package

第一次運行時間會比較長。

測試應用

啓動Spark服務

  • 啓動spark集羣master server
$SPARK_HOME/sbin/start-master.sh

master服務,默認會使用7077這個端口。能夠經過其日誌文件查看實際的端口號。

  • 啓動spark集羣slave server
$SPARK_HOME/sbin/start-slave.sh spark://$(hostname):7077

啓動Akka Server應用

運行:

$SPARK_HOME/bin/spark-submit --master spark://$(hostname):7077 --class ServerApp target/scala-2.11/akka-sample-app_2.11-1.0.jar

若是出現java.lang.NoClassDefFoundError錯誤,
請參照Spark集羣 + Akka + Kafka + Scala 開發(1) : 配置開發環境
確保akka的包在Spark中設置好了。
注:能夠使用Ctrl+C來中斷這個Server應用。

啓動Akka Client應用

新啓動一個終端,運行:

java -classpath ./target/scala-2.11/akka-sample-app_2.11-1.0.jar:$AKKA_HOME/lib/akka/*:$SCALA_HOME/lib/* ClientApp
# or
# $SPARK_HOME/bin/spark-submit --master spark://$(hostname):7077 --class ClientApp target/scala-2.11/akka-sample-app_2.11-1.0.jar

而後:看看Server應用是否開始處理了。

總結

Server應用須要Spark的技術,所以,是在Spark環境中運行。
Clinet應用,能夠是一個普通的Java應用。

下面請看

至此,咱們已經寫好了一個spark集羣+akka+scala的應用。下一步請看:
Spark集羣 + Akka + Kafka + Scala 開發(4) : 開發一個Kafka + Spark的應用

參照

相關文章
相關標籤/搜索