package com.vdncloud.las.da.akkajava
import java.util.concurrent.TimeUnit算法
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.routing.RoundRobinRouterapp
import scala.concurrent.duration._異步
/**
*
* Calculate – 發送給 主 actor 來啓動計算。
* Work – 從 主 actor 發送給各 工做 actor,包含工做分配的內容。
* Result – 從 工做 actors 發送給 主 actor,包含工做actor的計算結果。
* PiApproximation – 從 主 actor發送給 監聽器 actor,包含pi的最終計算結果和整個計算耗費的時間。
*
* 發送給actor的消息應該永遠是不可變的,以免共享可變狀態。
* 在scala裏咱們有 ‘case classes’ 來構造完美的消息。
* 如今讓咱們用case class建立3種消息。
* 咱們還爲消息們建立一個通用的基礎trait(定義爲sealed以防止在咱們不可控的地方建立消息):
*/
sealed trait PiMessageide
case object Calculate extends PiMessagethis
case class Work(start: Int, nrOfElements: Int) extends PiMessage.net
case class Result(value: Double) extends PiMessage命令行
case class PiApproximation(pi: Double, duration: Duration) {
override def toString: String = "Pi approximation: \t\t%s\tCalculation time: \t%s".format(pi, duration)
}scala
/**
* 如今建立工做 actor。 方法是混入 Actor trait 並定義其中的 receive 方法.
* receive 方法定義咱們的消息處理器。咱們讓它可以處理 Work 消息,因此添加一個針對這種消息的處理器:
*/
class Worker extends Actor {
/**
* 能夠看到咱們如今建立了一個 Actor 和一個 receive 方法做爲 Work 消息的處理器.
* 在這個處理器中咱們調用calculatePiFor(..) 方法, 將結果包在 Result 消息裏並使用sender異步發送回消息的原始發送者。
* 在Akka裏,sender引用是與消息一塊兒隱式發送的,這樣接收者能夠隨時回覆或將sender引用保存起來以備未來使用。
*
* 如今在咱們的 Worker actor 中惟一缺乏的就是實現 calculatePiFor(..) 方法。
* 雖然在Scala裏咱們能夠有不少方法來實現這個算法,在這個入門指南中咱們選擇了一種命令式的風格,使用了for寫法和一個累加器:
*/
def calculatePiFor(start: Int, nrOfElements: Int): Double = {
var acc = 0.0
for (i ← start until (start + nrOfElements))
acc += 4.0 * (1 - (i % 2) * 2) / (2 * i + 1)
acc
}orm
def receive = {
case Work(start, nrOfElements) ⇒
sender ! Result(calculatePiFor(start, nrOfElements)) // perform the work
}
}
/**
* 如今咱們有了一個路由,能夠在一個單一的抽象中表達全部的工做actor。如今讓咱們建立主actor. 傳遞給它三個整數變量:
*
* 咱們還缺乏 主 actor的消息處理器. 這個處理器須要可以對兩種消息進行響應:
* Calculate – 用來啓動計算過程
* Result – 用來彙總不一樣的計算結果
*
* @param nrOfWorkers – 定義咱們會啓動多少工做actor
* @param nrOfMessages – 定義會有多少整數段發送給工做actor
* @param nrOfElements – 定義發送給工做actor的每一個整數段的大小
* @param replyTo – 用來向外界報告最終的計算結果。
*/
class Master(nrOfWorkers: Int, nrOfMessages: Int, nrOfElements: Int, replyTo: ActorRef)
extends Actor {
var pi: Double = _
var nrOfResults: Int = _
val start: Long = System.currentTimeMillis
/**
* 主actor會稍微複雜一些。
* 在它的構造方法裏咱們建立一個round-robin的路由器來簡化將工做平均地分配給工做actor們的過程,先作這個:
*/
val workerRouter = context.actorOf(
Props[Worker].withRouter(RoundRobinRouter(nrOfWorkers)), name = "workerRouter")
// val workerRouter = context.actorOf(Props[Worker].withRouter(FromConfig()), "workerRouter")
/**
* Calculate 處理器會經過其路由器向全部的工做 actor 發送工做內容.
* Result 處理器從 Result 消息中獲取值並彙總到咱們的 pi 成員變量中.
*
* 咱們還會記錄已經接收的結果數據的數量,它是否與發送出去的任務數量一致 .
* 主 actor 發現計算完成了,會將最終結果發送給監聽者.
* 當整個過程都完成了,它會調用 context.stop(self) 方法來終止本身 和 它所監管的全部actor.
* 在本例中,主actor監管一個actor,咱們的路由器,而路由器監管着全部 nrOfWorkers 個工做actors.
* 全部的actor都會在其監管者的stop方法被調用時自動終止,並會傳遞給全部它監管的子actor。
*/
def receive = {
case Calculate ⇒
for (i ← 0 until nrOfMessages) workerRouter ! Work(i * nrOfElements, nrOfElements)
case Result(value) ⇒
pi += value
nrOfResults += 1
if (nrOfResults == nrOfMessages) {
// Send the result to the listener
val pa = PiApproximation(pi, duration = Duration(System.currentTimeMillis - start, TimeUnit.MILLISECONDS))
replyTo ! pa
// sender ! pa // not use sender
// Stops this actor and all its supervised children
//context.stop(self)
context.system.shutdown()
}
}
}
/**
* 建立計算結果監聽者
* 監聽者很簡單,當它接收到從 Master發來的PiApproximation ,就將結果打印出來並關閉整個 Actor系統。
*/
class Listener extends Actor {
def receive = {
// PiApproximation(pi, duration)
case a: PiApproximation ⇒
println("Listener " + a)
// context.system.terminate()
context.system.shutdown()
}
}
/**
* Created by hdfs on 17-2-21.
* ref:http://blog.csdn.net/beijicy/article/details/50587180
*
*/
object TestAkka {
def main(args: Array[String]): Unit = {
testAkkConcurrent()
}
/**
* 如今只剩下實現啓動和運行計算的執行者了。
* 咱們建立一個調用 Pi的對象, 這裏咱們能夠繼承Scala中的 Apptrait, 這個trait使咱們可以在命令行上直接運行這個應用.
*/
def testAkkConcurrent(): Unit = {
//Pi 對象是咱們的actor和消息的很好的容器。
// 因此咱們把它們都放在這兒。咱們還建立一個 calculate 方法來啓動 主 actor 並等待它結束:
calculate(nrOfWorkers = 4, nrOfElements = 10000, nrOfMessages = 10000)
}
def testFuture(): Unit = {
val s = "Hello"
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent._
val f: Future[String] = Future {
s + " future!"
}
f onSuccess {
case msg => println(msg)
}
println("done future" + s)
}
/**
* calculate 方法建立一個 Actor系統,這是包括全部建立出的actor的 「上下文」。
* 如何在容器中建立actor的例子在calculate方法的 ‘system.actorOf(...)’ 這一行。
* 這裏咱們建立兩個頂級actor. 若是你是在一個actor上下文(i.e. 在一個建立其它actor的actor中),
* 你應該使用 context.actorOf(...). 這在以上的主actor代碼中有所體現。
*
* @param nrOfWorkers
* @param nrOfElements
* @param nrOfMessages
*/
def calculate(nrOfWorkers: Int, nrOfElements: Int, nrOfMessages: Int) {
// Create an Akka system
val system = ActorSystem("PiSystem")
// val system = ActorSystem("MasterApp", ConfigFactory.load.getConfig("multiThread"))
// create the result listener, which will print the result and shutdown the system
val listener = system.actorOf(Props[Listener], name = "listener")
// create the master
val master = system.actorOf(Props(new Master(nrOfWorkers, nrOfMessages, nrOfElements, listener)),
name = "master")
master ! Calculate
/*
system.actorOf(Props(new Actor() {
val master = system.actorOf(Props(new Master(nrOfWorkers, nrOfMessages, nrOfElements, self)),
name = "master")
master ! Calculate
def receive = {
case a: PiApproximation ⇒
println("calculate " + a)
context.system.shutdown()
}
*/
/*
import scala.concurrent.ExecutionContext.Implicits.global
val replyFuture = Future {
master ! Calculate
}
replyFuture onSuccess {
case msg => println("msg:" + msg)
}
*/
// }))
}}