Scala編程實戰

 

項目概述

需求

目前大多數的分佈式架構底層通訊都是經過RPC實現的,RPC框架很是多,好比前咱們學過的Hadoop項目的RPC通訊框架,可是Hadoop在設計之初就是爲了運行長達數小時的批量而設計的,在某些極端的狀況下,任務提交的延遲很高,因此Hadoop的RPC顯得有些笨重。html

Spark 的RPC是經過Akka類庫實現的,Akka用Scala語言開發,基於Actor併發模型實現,Akka具備高可靠、高性能、可擴展等特色,使用Akka能夠輕鬆實現分佈式RPC功能。java

Akka簡介

友情連接:  Actors介紹:   https://www.iteblog.com/archives/1154.html編程

Akka基於Actor模型,提供了一個用於構建可擴展的(Scalable)、彈性的(Resilient)、快速響應的(Responsive)應用程序的平臺。架構

Actor模型:在計算機科學領域,Actor模型是一個並行計算(Concurrent Computation)模型,它把actor做爲並行計算的基本元素來對待:爲響應一個接收到的消息,一個actor可以本身作出一些決策,如建立更多的actor,或發送更多的消息,或者肯定如何去響應接收到的下一個消息。併發

 

 

Actor是Akka中最核心的概念,它是一個封裝了狀態和行爲的對象,Actor之間能夠經過交換消息的方式進行通訊,每一個Actor都有本身的收件箱(Mailbox)。經過Actor可以簡化鎖及線程管理,能夠很是容易地開發出正確地併發程序和並行系統,Actor具備以下特性:框架

 

  (1)、提供了一種高級抽象,可以簡化在併發(Concurrency)/並行(Parallelism)應用場景下的編程開發dom

(2)、提供了異步非阻塞的、高性能的事件驅動編程模型異步

(3)、超級輕量級事件處理(每GB堆內存幾百萬Actor)tcp

 

項目實現

實戰一:

利用Akkaactor編程模型,實現2個進程間的通訊。分佈式

架構圖

 

重要類介紹

ActorSystem在Akka中,ActorSystem是一個重量級的結構,他須要分配多個線程,因此在實際應用中,ActorSystem一般是一個單例對象,咱們能夠使用這個ActorSystem建立不少Actor。

注意

(1)、ActorSystem是一個進程中的老大,它負責建立和監督actor

(2)、ActorSystem是一個單例對象

(3)、actor負責通訊

 

Actor

在Akka中,Actor負責通訊,在Actor中有一些重要的生命週期方法。

(1)preStart()方法:該方法在Actor對象構造方法執行後執行,整個Actor生命週期中僅執行一次。

(2)receive()方法:該方法在Actor的preStart方法執行完成後執行,用於接收消息,會被反覆執行。

具體代碼

① Master

 

package cn.itcast.rpc
  
  import akka.actor.{Actor, ActorRef, ActorSystem, Props}
  import com.typesafe.config.ConfigFactory
  
  //todo:利用akka的actor模型實現2個進程間的通訊-----Master端  
  class Master  extends Actor{
  //構造代碼塊先被執行
  println("master constructor invoked")
  
  //prestart方法會在構造代碼塊執行後被調用,而且只被調用一次
  override def preStart(): Unit = {
    println("preStart method invoked")
  }
  
  //receive方法會在prestart方法執行後被調用,表示不斷的接受消息
  override def receive: Receive = {
    case "connect" =>{
      println("a client connected")

      //master發送註冊成功信息給worker
      sender ! "success"
    }
  }
}

  object Master{
  def main(args: Array[String]): Unit = {
    //master的ip地址
    val host=args(0)
    //master的port端口
    val port=args(1)
  
    //準備配置文件信息
    val configStr=
      s"""
        |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
        |akka.remote.netty.tcp.hostname = "$host"
        |akka.remote.netty.tcp.port = "$port"
      """.stripMargin
  
    //配置config對象 利用ConfigFactory解析配置文件,獲取配置信息
    val config=ConfigFactory.parseString(configStr)
  
      // 一、建立ActorSystem,它是整個進程中老大,它負責建立和監督actor,它是單例對象
    val masterActorSystem = ActorSystem("masterActorSystem",config)
     // 二、經過ActorSystem來建立master actor
      val masterActor: ActorRef = masterActorSystem.actorOf(Props(new Master),"masterActor")
    // 三、向master actor發送消息
    //masterActor ! "connect"
  }
}

 

 


  

② Worker

 

package cn.itcast.rpc
 
  import akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props}
  import com.typesafe.config.ConfigFactory

  //todo:利用akka中的actor實現2個進程間的通訊-----Worker端
  class Worker  extends Actor{
  println("Worker constructor invoked")
  
  //prestart方法會在構造代碼塊以後被調用,而且只會被調用一次
  override def preStart(): Unit = {
      println("preStart method invoked")

    //獲取master actor的引用
    //ActorContext全局變量,能夠經過在已經存在的actor中,尋找目標actor
    //調用對應actorSelection方法,
    // 方法須要一個path路徑:一、通訊協議、二、master的IP地址、三、master的端口 四、建立master actor老大 五、actor層級
    val master: ActorSelection = context.actorSelection("akka.tcp://masterActorSystem@172.16.43.63:8888/user/masterActor")

    //向master發送消息
    master ! "connect"  
  }
  
  //receive方法會在prestart方法執行後被調用,不斷的接受消息
  override def receive: Receive = {
    case "connect" =>{
      println("a client connected")
    }

    case "success" =>{
      println("註冊成功")
    }
  }
}

  object Worker{
  def main(args: Array[String]): Unit = {
    //定義worker的IP地址
    val host=args(0)
    //定義worker的端口
    val port=args(1)

    //準備配置文件
    val configStr=
      s"""
        |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
        |akka.remote.netty.tcp.hostname = "$host"
        |akka.remote.netty.tcp.port = "$port"
      """.stripMargin

    //經過configFactory來解析配置信息
    val config=ConfigFactory.parseString(configStr)

     // 一、建立ActorSystem,它是整個進程中的老大,它負責建立和監督actor
    val workerActorSystem = ActorSystem("workerActorSystem",config)
    // 二、經過actorSystem來建立 worker actor
    val workerActor: ActorRef = workerActorSystem.actorOf(Props(new Worker),"workerActor")

    //向worker actor發送消息
    workerActor ! "connect"
  }
}

 

③ 運行

  使用idea開發工具,配置參數時,多個參數之間用空格隔開

 

啓動Master

 

啓動Worker

 


  

實戰二

使用Akka實現一個簡易版的spark通訊框架 

架構圖

 

具體代碼

① Master

 

package cn.itcast.spark

  import akka.actor.{Actor, ActorRef, ActorSystem, Props}
  import com.typesafe.config.ConfigFactory
  import scala.collection.mutable
  import scala.collection.mutable.ListBuffer
  import scala.concurrent.duration._

  //todo:利用akka實現簡易版的spark通訊框架-----Master端
  class Master  extends Actor{
  //構造代碼塊先被執行
  println("master constructor invoked")

  //定義一個map集合,用於存放worker信息
  private val workerMap = new mutable.HashMap[String,WorkerInfo]()

  //定義一個list集合,用於存放WorkerInfo信息,方便後期按照worker上的資源進行排序
  private val workerList = new ListBuffer[WorkerInfo]

  //master定時檢查的時間間隔
  val CHECK_OUT_TIME_INTERVAL=15000 //15秒

  //prestart方法會在構造代碼塊執行後被調用,而且只被調用一次
  override def preStart(): Unit = {
    println("preStart method invoked")

      //master定時檢查超時的worker
    //須要手動導入隱式轉換
    import context.dispatcher
    context.system.scheduler.schedule(0 millis,CHECK_OUT_TIME_INTERVAL millis,self,CheckOutTime)
  }

  //receive方法會在prestart方法執行後被調用,表示不斷的接受消息
  override def receive: Receive = {
    //master接受worker的註冊信息
    case RegisterMessage(workerId,memory,cores) =>{
        //判斷當前worker是否已經註冊
      if(!workerMap.contains(workerId)){
        //保存信息到map集合中
        val workerInfo = new WorkerInfo(workerId,memory,cores)
        workerMap.put(workerId,workerInfo)

        //保存workerinfo到list集合中
        workerList +=workerInfo
  
        //master反饋註冊成功給worker
        sender ! RegisteredMessage(s"workerId:$workerId 註冊成功")
      }
    }

      //master接受worker的心跳信息
    case SendHeartBeat(workerId)=>{
      //判斷worker是否已經註冊,master只接受已經註冊過的worker的心跳信息
      if(workerMap.contains(workerId)){
        //獲取workerinfo信息
        val workerInfo: WorkerInfo = workerMap(workerId)

        //獲取當前系統時間
        val lastTime: Long = System.currentTimeMillis()
  
        workerInfo.lastHeartBeatTime=lastTime
      }
    }

    case CheckOutTime=>{
      //過濾出超時的worker 判斷邏輯: 獲取當前系統時間 - worker上一次心跳時間 >master定時檢查的時間間隔
        val outTimeWorkers: ListBuffer[WorkerInfo] = workerList.filter(x => System.currentTimeMillis() -x.lastHeartBeatTime > CHECK_OUT_TIME_INTERVAL)
      //遍歷超時的worker信息,而後移除掉超時的worker
      for(workerInfo <- outTimeWorkers){
        //獲取workerid
        val workerId: String = workerInfo.workerId
        //從map集合中移除掉超時的worker信息
        workerMap.remove(workerId)
        //從list集合中移除掉超時的workerInfo信息
        workerList -= workerInfo
        println("超時的workerId:" +workerId)
      }
      println("活着的worker總數:" + workerList.size)
  
      //master按照worker內存大小進行降序排列
     println(workerList.sortBy(x => x.memory).reverse.toList)
    }
  }
}

  object Master{
  def main(args: Array[String]): Unit = {
    //master的ip地址
    val host=args(0)
    //master的port端口
    val port=args(1)
  
    //準備配置文件信息
    val configStr=
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
      """.stripMargin

    //配置config對象 利用ConfigFactory解析配置文件,獲取配置信息
    val config=ConfigFactory.parseString(configStr)
  
    // 一、建立ActorSystem,它是整個進程中老大,它負責建立和監督actor,它是單例對象
    val masterActorSystem = ActorSystem("masterActorSystem",config)
    // 二、經過ActorSystem來建立master actor
    val masterActor: ActorRef = masterActorSystem.actorOf(Props(new Master),"masterActor")
    // 三、向master actor發送消息
    //masterActor ! "connect"
  }
}

 


② Worker

package cn.itcast.spark

  import java.util.UUID
  import akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props}
  import com.typesafe.config.ConfigFactory
  import scala.concurrent.duration._

  //todo:利用akka實現簡易版的spark通訊框架-----Worker端
  class Worker(val memory:Int,val cores:Int,val masterHost:String,val masterPort:String)  extends Actor{
  println("Worker constructor invoked")

  //定義workerId
  private val workerId: String = UUID.randomUUID().toString

  //定義發送心跳的時間間隔
  val SEND_HEART_HEAT_INTERVAL=10000  //10秒
 
  //定義全局變量
  var master: ActorSelection=_
  
  //prestart方法會在構造代碼塊以後被調用,而且只會被調用一次
  override def preStart(): Unit = {
    println("preStart method invoked")
    //獲取master actor的引用
    //ActorContext全局變量,能夠經過在已經存在的actor中,尋找目標actor
    //調用對應actorSelection方法,
    // 方法須要一個path路徑:一、通訊協議、二、master的IP地址、三、master的端口 四、建立master actor老大 五、actor層級
     master= context.actorSelection(s"akka.tcp://masterActorSystem@$masterHost:$masterPort/user/masterActor")

    //向master發送註冊信息,將信息封裝在樣例類中,主要包含:workerId,memory,cores
    master ! RegisterMessage(workerId,memory,cores)
  }  

  //receive方法會在prestart方法執行後被調用,不斷的接受消息
  override def receive: Receive = {
    //worker接受master的反饋信息
    case RegisteredMessage(message) =>{
      println(message)

      //向master按期的發送心跳
      //worker先本身給本身發送心跳
      //須要手動導入隱式轉換
      import context.dispatcher
      context.system.scheduler.schedule(0 millis,SEND_HEART_HEAT_INTERVAL millis,self,HeartBeat)
    }

      //worker接受心跳
    case HeartBeat =>{
      //這個時候纔是真正向master發送心跳
      master ! SendHeartBeat(workerId)
    }
  }
}
 
  object Worker{
  def main(args: Array[String]): Unit = {
    //定義worker的IP地址
    val host=args(0)
    //定義worker的端口
    val port=args(1) 
    //定義worker的內存
    val memory=args(2).toInt
    //定義worker的核數
    val cores=args(3).toInt
    //定義master的ip地址
    val masterHost=args(4)
    //定義master的端口
    val masterPort=args(5)

    //準備配置文件
    val configStr=
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
      """.stripMargin 

    //經過configFactory來解析配置信息
    val config=ConfigFactory.parseString(configStr)
    // 一、建立ActorSystem,它是整個進程中的老大,它負責建立和監督actor
    val workerActorSystem = ActorSystem("workerActorSystem",config)
    // 二、經過actorSystem來建立 worker actor
    val workerActor: ActorRef = workerActorSystem.actorOf(Props(new Worker(memory,cores,masterHost,masterPort)),"workerActor")

    //向worker actor發送消息
    workerActor ! "connect"
  }
}

 

  

③ WorkerInfo

package cn.itcast.spark

  //封裝worker信息
  class WorkerInfo(val workerId:String,val memory:Int,val cores:Int) {
        //定義一個變量用於存放worker上一次心跳時間
      var lastHeartBeatTime:Long=_

  override def toString: String = {
    s"workerId:$workerId , memory:$memory , cores:$cores"
  }
}

 


④ 
樣例類 

 

package cn.itcast.spark

  trait RemoteMessage  extends Serializable{}

  //worker向master發送註冊信息,因爲不在同一進程中,須要實現序列化
  case class RegisterMessage(val workerId:String,val memory:Int,val cores:Int) extends RemoteMessage

  //master反饋註冊成功信息給worker,因爲不在同一進程中,也須要實現序列化
  case class RegisteredMessage(message:String) extends RemoteMessage

  //worker向worker發送心跳 因爲在同一進程中,不須要實現序列化
  case object HeartBeat

  //worker向master發送心跳,因爲不在同一進程中,須要實現序列化
  case class SendHeartBeat(val workerId:String) extends RemoteMessage

  //master本身向本身發送消息,因爲在同一進程中,不須要實現序列化
  case object CheckOutTime

 

⑤ 運行

配置參數時,多個參數之間用空格隔開

 

 

首先啓動Master_Spark 

 

 啓動work_spark-01

 

 

 

  啓動work_spark-02,而後關閉

 

 

 觀察Master_Spark 輸出

 

相關文章
相關標籤/搜索