大數據學習——akka自定義RPC

 

 

實現(Scala)

package cn.itcast.akka

import akka.actor.{Actor, ActorSystem, Props}
import akka.actor.Actor.Receive
import com.typesafe.config.ConfigFactory

import scala.collection.mutable

import scala.concurrent.duration._

/**
 * Master actor: keeps track of registered workers and evicts those whose
 * heartbeats have stopped arriving.
 *
 * Handled messages:
 *  - `RegisterWorker`     : records the worker (once per id) and replies with `RegisteredWorker`
 *  - `Heartbeat`          : refreshes the last-heartbeat timestamp of a known worker
 *  - `CheckTimeOutWorker` : periodic self-message that removes timed-out workers
 *
 * @param host host name used to build the master URL sent back to workers
 * @param port port used to build the master URL sent back to workers
 */
class Master(val host: String, val port: Int) extends Actor {

  // Worker ID -> WorkerInfo lookup
  val idToWorker = new mutable.HashMap[String, WorkerInfo]()
  // All currently registered workers
  val workers = new mutable.HashSet[WorkerInfo]()

  // Period of the timeout sweep in milliseconds; also used as the timeout threshold
  val CHECK_INTERVAL = 15000

  // Handle of the periodic sweep, kept so it can be cancelled when this actor stops
  private var checkTask: Option[akka.actor.Cancellable] = None

  override def preStart(): Unit = {
    // The scheduler needs an implicit ExecutionContext
    import context.dispatcher
    checkTask = Some(
      context.system.scheduler.schedule(0.millis, CHECK_INTERVAL.millis, self, CheckTimeOutWorker)
    )
  }

  override def postStop(): Unit = {
    // Without this the scheduler keeps firing at a stopped actor (and a restart
    // would stack a second sweep on top of the first one)
    checkTask.foreach(_.cancel())
    checkTask = None
  }

  override def receive: Receive = {
    // Registration request sent by a Worker
    case RegisterWorker(workerId, cores, memory) =>
      if (!idToWorker.contains(workerId)) {
        val workerInfo = new WorkerInfo(workerId, cores, memory)
        // Count registration as the first sign of life; otherwise a sweep that runs
        // before the first heartbeat would see a zero timestamp and evict the worker
        workerInfo.lastHeartbeatTime = System.currentTimeMillis()
        idToWorker(workerId) = workerInfo
        workers += workerInfo
        // Acknowledge the registration with this master's URL
        sender() ! RegisteredWorker(s"akka.tcp://${Master.MASTER_SYSTEM}@$host:$port/user/${Master.MASTER_NAME}")
      }

    // Heartbeat sent by a Worker; unknown ids are ignored
    case Heartbeat(workerId) =>
      idToWorker.get(workerId).foreach { workerInfo =>
        workerInfo.lastHeartbeatTime = System.currentTimeMillis()
      }

    // Periodic sweep: drop every worker whose last heartbeat is older than CHECK_INTERVAL
    case CheckTimeOutWorker =>
      val currentTime = System.currentTimeMillis()
      // filter returns a new set, so removing from `workers` while iterating it is safe
      val deadWorkers = workers.filter(w => currentTime - w.lastHeartbeatTime > CHECK_INTERVAL)
      deadWorkers.foreach { w =>
        idToWorker -= w.id
        workers -= w
      }
      println("alive worker size : " + workers.size)
  }
}

/**
 * Companion holding the well-known actor-system / actor names and the
 * entry point that boots the master node.
 */
object Master {

  // NOTE: the "Mater" spelling is part of the remote address workers connect to;
  // it is left unchanged so existing workers can still reach the master.
  val MASTER_SYSTEM = "MaterActorSystem"
  val MASTER_NAME = "Master"

  /**
   * Starts the master actor system and blocks until it terminates.
   *
   * @param args optional `host port`; any missing argument falls back to
   *             `127.0.0.1` / `8888`. A non-numeric port raises NumberFormatException.
   */
  def main(args: Array[String]): Unit = {

    val host = args.lift(0).getOrElse("127.0.0.1")
    val port = args.lift(1).map(_.toInt).getOrElse(8888)
    val confStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
       """.stripMargin
    val conf = ConfigFactory.parseString(confStr)
    // The ActorSystem creates and supervises the actors of this process
    val actorSystem = ActorSystem(MASTER_SYSTEM, conf)
    // Create the Master actor under /user/<MASTER_NAME>
    actorSystem.actorOf(Props(new Master(host, port)), MASTER_NAME)
    actorSystem.awaitTermination()

  }
}
package cn.itcast.akka

/** Marker for messages exchanged between Worker and Master; Serializable so they can be sent remotely. */
trait Message extends Serializable

/**
 * Worker -> Master: request to register a worker.
 *
 * @param id     unique worker id
 * @param cores  number of cores the worker reports
 * @param memory amount of memory the worker reports
 */
final case class RegisterWorker(id: String, cores: Int, memory: Int) extends Message

/**
 * Master -> Worker: registration acknowledged.
 *
 * @param masterUrl actor URL of the master that accepted the registration
 */
final case class RegisteredWorker(masterUrl: String) extends Message

/**
 * Worker -> Master: periodic liveness signal.
 *
 * @param id id of the worker sending the heartbeat
 */
final case class Heartbeat(id: String) extends Message

/** Worker internal message: timer tick telling the worker to send a heartbeat. */
case object SendHeartbeat

/** Master internal message: timer tick telling the master to check for timed-out workers. */
case object CheckTimeOutWorker
package cn.itcast.akka

import java.util.UUID
import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

import scala.concurrent.duration._

/**
 * Worker actor: registers itself with the Master on start and, once the
 * registration is acknowledged, sends a heartbeat at a fixed interval.
 *
 * @param cores      number of cores reported to the master
 * @param memory     amount of memory reported to the master
 * @param masterHost host of the master actor system
 * @param masterPort port of the master actor system
 */
class Worker(val cores: Int, val memory: Int, val masterHost: String, val masterPort: Int) extends Actor {

  // Selection pointing at the Master actor; assigned in preStart
  var master: ActorSelection = _
  // Unique id of this worker
  val workerId = UUID.randomUUID().toString
  // URL reported back by the master once registration succeeds
  var masterUrl: String = _

  // Heartbeat period in milliseconds
  val HEARTBEAT_INTERVAL = 10000

  // Handle of the periodic heartbeat, kept so it can be cancelled
  private var heartbeatTask: Option[akka.actor.Cancellable] = None

  // preStart runs after construction and before the first message is processed
  override def preStart(): Unit = {
    // Resolve the master by its remote address
    master = context.actorSelection(s"akka.tcp://${Master.MASTER_SYSTEM}@$masterHost:$masterPort/user/${Master.MASTER_NAME}")
    // Ask the master to register this worker
    master ! RegisterWorker(workerId, cores, memory)
  }

  override def postStop(): Unit = {
    // Stop the timer so it does not keep firing at a stopped actor
    heartbeatTask.foreach(_.cancel())
    heartbeatTask = None
  }

  override def receive: Receive = {
    // Master acknowledged the registration
    case RegisteredWorker(url) =>
      // The scheduler needs an implicit ExecutionContext
      import context.dispatcher
      masterUrl = url
      // Replace any earlier schedule so a repeated acknowledgement cannot
      // result in several concurrent heartbeat timers
      heartbeatTask.foreach(_.cancel())
      heartbeatTask = Some(
        context.system.scheduler.schedule(0.millis, HEARTBEAT_INTERVAL.millis, self, SendHeartbeat)
      )

    // Timer tick: forward a heartbeat to the master
    case SendHeartbeat =>
      master ! Heartbeat(workerId)
  }
}

/** Entry point that boots a worker node and connects it to the master. */
object Worker {

  /**
   * Starts the worker actor system and blocks until it terminates.
   *
   * @param args optional `host port cores memory masterHost masterPort`; any missing
   *             argument falls back to `127.0.0.1 9999 8 1024 127.0.0.1 8888`.
   *             A non-numeric value in a numeric position raises NumberFormatException.
   */
  def main(args: Array[String]): Unit = {

    // Address and resources of this worker
    val host = args.lift(0).getOrElse("127.0.0.1")
    val port = args.lift(1).map(_.toInt).getOrElse(9999)
    val cores = args.lift(2).map(_.toInt).getOrElse(8)
    val memory = args.lift(3).map(_.toInt).getOrElse(1024)

    // Address of the master to register with
    val masterHost = args.lift(4).getOrElse("127.0.0.1")
    val masterPort = args.lift(5).map(_.toInt).getOrElse(8888)

    val confStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
       """.stripMargin
    val conf = ConfigFactory.parseString(confStr)
    // The ActorSystem creates and supervises the actors of this process
    val actorSystem = ActorSystem("WorkerActorSystem", conf)
    // Create the Worker actor under /user/Worker
    actorSystem.actorOf(Props(new Worker(cores, memory, masterHost, masterPort)), "Worker")
    actorSystem.awaitTermination()
  }
}
package cn.itcast.akka


/**
 * Mutable bookkeeping record for one registered worker.
 *
 * @param id     unique worker id
 * @param cores  number of cores the worker reported
 * @param memory amount of memory the worker reported
 */
class WorkerInfo(val id: String, val cores: Int, val memory: Int) {

  // Timestamp (System.currentTimeMillis) of the most recent heartbeat.
  // Starts at creation time rather than 0 so a freshly created record is not
  // treated as having timed out before its first heartbeat arrives.
  var lastHeartbeatTime: Long = System.currentTimeMillis()

}

 

相關文章
相關標籤/搜索