Scala的actor提供了一種基於事件的輕量級線程。只要使用scala.actors.Actor伴生對象的actor方法,就能夠建立一個actor。它接受一個函數值/閉包作參數,一建立好就開始運行。用!()方法給actor發消息,用receive()方法從actor接收消息。receive()也能夠閉包爲參數,一般用模式匹配處理接收到的消息。react
Scala的actor提供了一種基於事件的輕量級線程。只要使用scala.actors.Actor伴生對象的actor方法,就能夠建立一個actor。它接受一個函數值/閉包作參數,一建立好就開始運行。用!()方法給actor發消息,用receive()方法從actor接收消息。receive()也能夠閉包爲參數,一般用模式匹配處理接收到的消息。編程
咱們看個例子,假定咱們須要斷定一個給定的數是不是徹底數(徹底數是一個正整數,其因子之和是該數的兩倍):安全
非併發編程的實現:閉包
def sumOfFactors(number:Int) = { (0/:(1 to number)){(sum, i) => if(number%i == 0) sum+i else sum } } def isPerfect(candidate:Int) = 2*candidate == sumOfFactors(candidate) println("6 is perfect? " + isPerfect(6)) println("33550336 is perfect? " + isPerfect(33550336)) println("33550337 is perfect? " + isPerfect(33550337))
併發編程的實現,將從1到candidate數這個範圍內的數劃分紅多個區間,把每一個區間內求和的任務分配給單獨的進程。併發
import scala.actors.Actor._ class FasterPerfectNumberFinder { def sumOfFactorsInRange(lower:Int, upper:Int, number:Int) = { (0/:(lower to upper)){(sum, i) => if(number%i == 0) sum+i else sum } } def isPerfectConcurrent(candidate:Int) = { val RANGE = 1000000 val numberOfPartitions = (candidate.toDouble/RANGE).ceil.toInt val caller = self for(i<-0 until numberOfPartitions){ val lower = i*RANGE + 1 val upper = candidate min(i+1)*RANGE actor { caller ! sumOfFactorsInRange(lower,upper,candidate) } } val sum = (0 /: (0 until numberOfPartitions)){ (partialSum, i) => receive { case sumInRange:Int => partialSum + sumInRange } } 2 * candidate == sum } println("6 is perfect? " + isPerfectConcurrent(6)) println("33550336 is perfect? " + isPerfectConcurrent(33550336)) println("33550337 is perfect? " + isPerfectConcurrent(33550337)) } object FasterPerfectNumberFinder extends App{ new FasterPerfectNumberFinder() }
程序運行結果以下:less
6 is perfect? true 33550336 is perfect? true 33550337 is perfect? false
比較兩種方法用時的程序以下:異步
import scala.actors.Actor._ class FindPerfectNumberOverRange { //普通實現 def sumOfFactors(number:Int) = { (0/:(1 to number)){(sum, i) => if(number%i == 0) sum+i else sum } } def isPerfect(candidate:Int) = 2*candidate == sumOfFactors(candidate) //併發實現 def sumOfFactorsInRange(lower:Int, upper:Int, number:Int) = { (0/:(lower to upper)){(sum, i) => if(number%i == 0) sum+i else sum } } def isPerfectConcurrent(candidate:Int) = { val RANGE = 1000000 val numberOfPartitions = (candidate.toDouble/RANGE).ceil.toInt val caller = self for(i<-0 until numberOfPartitions){ val lower = i*RANGE + 1 val upper = candidate min(i+1)*RANGE actor { caller ! sumOfFactorsInRange(lower,upper,candidate) } } val sum = (0 /: (0 until numberOfPartitions)){ (partialSum, i) => receive { case sumInRange:Int => partialSum + sumInRange } } 2 * candidate == sum } //比較時間花費 def countPerfectNumbersInRange(start:Int, end:Int, isPerfectFinder:Int => Boolean)={ val startTime = System.nanoTime() val numberOfPerfectNumbers = (0 /: (start to end)){(count, candidate) => if(isPerfectFinder(candidate)) count + 1 else count } val endTime = System.nanoTime() println("Found " + numberOfPerfectNumbers + " perfect numbers in given range, took " + (endTime-startTime)/1000000000.0 + " secs") } } object FindPerfectNumberOverRange extends App{ val fpn = new FindPerfectNumberOverRange() val startNumber = 33550300 val endNumber = 33550400 fpn.countPerfectNumbersInRange(startNumber, endNumber, fpn.isPerfect) fpn.countPerfectNumbersInRange(startNumber, endNumber, fpn.isPerfectConcurrent) }
程序運行結果以下:ide
Found 1 perfect numbers in given range, took 53.505288657 secs Found 1 perfect numbers in given range, took 35.739131734 secs
下面看一下消息是如何從一個actor傳到另外一個actor。函數
import scala.actors.Actor._ class MessagePassing { var startTime : Long = 0 val caller = self val engrossedActor = actor { println("Number of messages received so far? " + mailboxSize) caller ! "send" Thread.sleep(3000) println("Number of messages received while I was busy? " + mailboxSize) receive { case msg => val receivedTime = System.currentTimeMillis() - startTime println("Received message " + msg + "after " + receivedTime + " ms") } caller ! "received" } receive { case _ =>} println("Sending Message ") startTime = System.currentTimeMillis() engrossedActor ! "hello buddy" val endTime = System.currentTimeMillis() - startTime printf("Took less than %dms to send message\n", endTime) receive { case _ => } } object MessagePassing extends App { new MessagePassing() }
程序運行結果以下:oop
Number of messages received so far? 0 Sending Message Took less than 0ms to send message Number of messages received while I was busy? 0 Received message hello buddyafter 2997 ms
從輸出能夠看出,發送不阻塞,接收不中斷。在actor調用receive()方法接收以前,消息會一直等在那裏。
異步地發送和接收消息是一項好的實踐——能夠最大限度的利用併發。不過,若是對同步的發送消息和接收響應有興趣,能夠用!?()方法。在接收發消息的目標actor給出響應以前,她會一直阻塞在那裏。這會引發潛在的死鎖。一個已經失敗的actor會致使其餘actor的失敗,而後就輪到應用失敗了。因此,即使要用這個方法,至少要用有超時參數的變體,像這樣:
package com.cn.gao import scala.actors._ import Actor._ class AskFortune { val fortuneTeller = actor { for(i <- 1 to 4) { Thread.sleep(1000); receive { case _ => sender ! "your day will rock! "+ i //case _ => reply("your day will rock! " + i) // same as above } } } println(fortuneTeller !? (2000, "what's ahead")) println(fortuneTeller !? (500, "what's ahead")) val aPrinter = actor { receive { case msg => println("Ah, fortune message for you-"+ msg)} } fortuneTeller.send("What's up", aPrinter) fortuneTeller ! "How's my future?" Thread.sleep(3000) receive{ case msg : String => println("Received "+ msg)} println("Let's get that lost message") receive { case !(channel,msg) => println("Received belated message "+ msg)} } object AskFortune extends App{ new AskFortune() }
在超時以前,若是actor發送回消息,!?()方法就會返回結果。不然,它會返回None,因此,這個方法的返回類型是Option[Any]。在上面的代碼中,sender所引用的是最近一個發送消息的actor。程序運行結果以下:
Some(your day will rock! 1) None Ah, fortune message for you-your day will rock! 3 Received your day will rock! 4 Let's get that lost message Received belated message your day will rock! 2
若是想在actor啓動時進行顯式控制,但願在actor裏存入更多信息,能夠建立一個對象,混入Actor trait。這是對的——Scala的Actor只是個trait,能夠在任何喜歡的地方混入它。下面是個例子:
AnsweringService.scala
package com.cn.gao import scala.actors._ import Actor._ class AnsweringService(val folks:String*) extends Actor { def act(){ while(true){ receive{ case(caller: Actor, name:String, msg:String) => caller ! ( if(folks.contains(name)) String.format("Hey it's %s got message %s", name, msg) else String.format("Hey there's no one with the name %s here",name) ) case "ping" => println("ping!") case "quit" => println("existing actor") exit } } } } object AnsweringService extends App{ val answeringService1 = new AnsweringService("Sara", "Kara", "John") answeringService1 ! (self, "Sara", "In town") answeringService1 ! (self, "Kara", "Go shopping?") answeringService1.start() answeringService1 ! (self, "John", "Bug fixed?") answeringService1 ! (self, "Bill", "What's up") for(i <- 1 to 4) { receive { case msg => println(msg)}} answeringService1 ! "ping" answeringService1 ! "quit" answeringService1 ! "ping" Thread.sleep(2000) println("The last ping was not processed") }
程序運行結果以下:
Hey it's Sara got message In town Hey it's Kara got message Go shopping? Hey it's John got message Bug fixed? Hey there's no one with the name Bill here ping! existing actor The last ping was not processed
開始,咱們給actor發送了一些元組消息。這些消息不會當即獲得處理,由於actor尚未啓動。它們會進入隊列,等待後續處理。而後調用start()方法,再發送一些消息。只要調用了start()方法,就會有一個單獨的線程調用actor的act()方法。這時,曾經發出去的全部消息都開始進行處理。而後,咱們循環接收對方發出的四條消息的應答。
調用exit()方法能夠中止actor。不過這個方法只是拋出異常,試圖終止當前線程的執行,因此,在act()方法裏調用挺不錯。
若是對顯式啓動actor並不真的那麼關注,那麼可使用actor()方法。在actor間傳遞數據,能夠用!()和receive()方法。下面從一個使用actor方法的例子開始,而後重構,使其併發。
這個方法isPrime()告訴咱們給定的數是否是素數。爲了達到說明的目的,我在方法里加了一些打印語句:
package com.cn.gao import scala.actors._ import Actor._ class PrimeTeller { def isPrime(number: Int) = { println("Going to find if " + number + " is prime") var result = true if(number == 2 || number == 3) result = true for(i <- 2 to Math.sqrt(number.toDouble).floor.toInt;if result){ if(number % i == 0) result = false } println("done finding if " + number + " is prime") result } }
調用上面這段代碼的話,接收到應答以前,就會阻塞在那裏。以下所示,這裏把調用這個方法的職責委託給一個actor。這個actor會肯定一個數是不是素數,而後,用一個異步響應發回給調用者。
package com.cn.gao import scala.actors._ import Actor._ object PrimeTeller extends App { def isPrime(number: Int) = { println("Going to find if " + number + " is prime") var result = true if(number == 2 || number == 3) result = true for(i <- 2 to Math.sqrt(number.toDouble).floor.toInt;if result){ if(number % i == 0) result = false } println("done finding if " + number + " is prime") result } val primeTeller = actor{ var continue = true while(continue){ receive { case (caller: Actor, number:Int) => caller ! (number, isPrime(number)) case "quit" => continue = false } } } primeTeller ! (self, 2) primeTeller ! (self, 131) primeTeller ! (self, 132) for(i<- 1 to 3){ receive { case (number, result) => println(number + "is prime? " + result) } } primeTeller ! "quit" }
primeTeller是一個引用,它指向了用actor()方法建立的一個匿名actor。它會不斷循環,直到接收到「quit」消息。除了退出消息,它還能接收一個包含caller和number的元組。收到這個消息時,它會判斷給定的數是不是素數,而後,給caller發回一個消息。
程序運行結果以下:
Going to find if 2 is prime done finding if 2 is prime Going to find if 131 is prime 2is prime? true done finding if 131 is prime Going to find if 132 is prime 131is prime? true done finding if 132 is prime 132is prime? false
上面的代碼處理了接收到的每一個數字;從輸出能夠看到這一點。在actor忙於判斷一個數是不是素數時,若是又接收到多個請求,它們就會進入隊列。所以,即使是將執行委託給了actor,它依然是順序的。
讓這個例子並行至關容易,在PrimeTeller actor的第6行,不要去調用isPrime(),而是把這個職責委託給另外一個actor,讓它給調用者回復應答,程序以下:
package com.cn.gao import scala.actors._ import Actor._ object PrimeTeller extends App { def isPrime(number: Int) = { println("Going to find if " + number + " is prime") var result = true if(number == 2 || number == 3) result = true for(i <- 2 to Math.sqrt(number.toDouble).floor.toInt;if result){ if(number % i == 0) result = false } println("done finding if " + number + " is prime") result } val primeTeller = actor{ var continue = true while(continue){ receive { // case (caller: Actor, number:Int) => caller ! (number, // isPrime(number)) case (caller: Actor, number:Int) => actor {caller ! (number, isPrime(number))} case "quit" => continue = false } } } primeTeller ! (self, 2) primeTeller ! (self, 131) primeTeller ! (self, 132) for(i<- 1 to 3){ receive { case (number, result) => println(number + "is prime? " + result) } } primeTeller ! "quit" }
再次運行上面的代碼,咱們會看到,多個請求併發地執行了,以下所示:
Going to find if 2 is prime Going to find if 131 is prime Going to find if 132 is prime done finding if 132 is prime done finding if 2 is prime done finding if 131 is prime 132is prime? false 131is prime? true 2is prime? true
receive()接收一個函數值/閉包,返回一個處理消息的應答。下面是個從receive()方法接收結果的例子:
package com.cn.gao import scala.actors.Actor._ object Receive extends App { val caller = self val accumulator = actor { var sum = 0 var continue = true while(continue) { sum += receive { case number:Int => number case "quit" => continue = false 0 } } caller ! sum } accumulator ! 1 accumulator ! 7 accumulator ! 8 accumulator ! "quit" receive{case result => println("Total is " + result)} }
accumulator接收數字,對傳給它的數字求和。完成以後,它會發回一個消息,帶有求和的結果。上面代碼的輸出以下:
Total is 16
調用receive()方法會形成程序阻塞,直到實際接收到應答爲止。若是預期的actor應答一直沒有發過來就麻煩了。這會讓咱們一直等下去。用receiveWithin()方法修正這一點,它會接收一個timeout參數,以下:
package com.cn.gao import scala.actors._ import scala.actors.Actor._ object ReceiveWithin extends App { val caller = self val accumulator = actor { var sum = 0 var continue = true while(continue) { sum += receiveWithin(1000) { case number:Int => number case TIMEOUT => println("Time out! Will return result now") continue = false 0 } } caller ! sum } accumulator ! 1 accumulator ! 7 accumulator ! 8 receiveWithin(2000) { case result => println("Total is " + result) } }
在給定的超時期限內,若是什麼都沒有收到,receiveWithin()方法會收到一個TIMEOUT消息。若是不對其進行模式匹配,就會拋出異常。在上面的代碼裏,接收到TIMEOUT消息當作了完成值累加的信號。輸出以下:
Time out! Will return result now Total is 16
咱們應該傾向於使用receiveWithin()方法而非receive()方法,避免產生活性等待問題。
recevie()和receiveWithin()方法把函數值看成偏應用函數,調用代碼塊以前,會檢查它是否處理消息。因此,若是接收到一個非預期的消息,就會悄悄地忽略它。固然,若是想把忽略的消息顯示出來,能夠提供一個case_=>...語句。下面這個例子展現了忽略的無效消息:
package com.cn.gao import scala.actors._ import Actor._ object MessageIgnore extends App{ val expectStringOrInteger = actor { for(i <- 1 to 4) { receiveWithin(1000) { case str: String =>println("You said " + str) case num: Int => println("You gave " + num) case TIMEOUT => println("Time out!") } } } expectStringOrInteger ! "only constant is change" expectStringOrInteger ! 1024 expectStringOrInteger ! 22.22 expectStringOrInteger ! (self, 1024) receiveWithin(3000){case _ => } }
在代碼最後,放了一個receiveWithin()的調用。由於主線程退出時,程序就退出了,這個語句保證程序還活動着,給actor一個應答的機會。從輸出中能夠看出,actor處理了前兩個發送給它的消息,忽略了後兩個,由於它們沒有匹配上預期的消息模式。程序最終會超時,由於沒有再接收到任何能夠匹配的消息。輸出結果以下:
You said only constant is change You gave 1024 Time out! Time out!
在每一個actor裏,調用receive()的時候實際上會要求有一個單獨的線程。這個線程會一直持有,直到這個actor結束。也就是說,即使是在等待消息到達,程序也會持有這些線程,每一個actor一個,這絕對是一種資源浪費。Scala不得不持有這些線程的緣由在於,控制流的執行過程當中有一些具體狀態。若是在調用序列裏沒有須要保持和返回的狀態,Scala幾乎就能夠從線程池裏獲取任意線程執行消息處理——這偏偏就是使用react()所作的事情。react()不一樣於receive(),它並不返回任何結果。實際上,它並不從調用中返回。
若是處理了react()的當前消息後,還要處理更多的消息,就要在消息處理的末尾調用其餘方法。Scala會把這個調用執行交給線程池裏的任意線程。看一個這種行爲的例子:
package com.cn.gao import scala.actors.Actor._ import scala.actors._ object React extends App { def info(msg:String) = println(msg + " received by " + Thread.currentThread()) def receiveMessage(id:Int) { for(i <- 1 to 2) { receiveWithin(20000) { case msg:String => info("receive: " + id + msg) case TIMEOUT => } } } def reactMessage(id:Int){ react { case msg:String => info("react: " + id + msg) reactMessage(id) } } val actors = Array ( actor {info("react: 1 actor created"); reactMessage(1)}, actor {info("react: 2 actor created"); reactMessage(2)}, actor {info("receive: 3 actor created"); receiveMessage(3)}, actor {info("receive: 4 actor created"); receiveMessage(4)} ) Thread.sleep(1000) for(i <- 0 to 3){actors(i) ! " hello"; Thread.sleep(2000)} Thread.sleep(2000) for(i <- 0 to 3){actors(i) ! " hello"; Thread.sleep(2000)} }
上面的代碼輸出結果以下:
react: 1 actor created received by Thread[ForkJoinPool-1-worker-5,5,main] react: 2 actor created received by Thread[ForkJoinPool-1-worker-3,5,main] receive: 3 actor created received by Thread[ForkJoinPool-1-worker-1,5,main] receive: 4 actor created received by Thread[ForkJoinPool-1-worker-7,5,main] react: 1 hello received by Thread[ForkJoinPool-1-worker-3,5,main] react: 2 hello received by Thread[ForkJoinPool-1-worker-3,5,main] receive: 3 hello received by Thread[ForkJoinPool-1-worker-1,5,main] receive: 4 hello received by Thread[ForkJoinPool-1-worker-7,5,main] react: 1 hello received by Thread[ForkJoinPool-1-worker-5,5,main] react: 2 hello received by Thread[ForkJoinPool-1-worker-5,5,main] receive: 3 hello received by Thread[ForkJoinPool-1-worker-1,5,main] receive: 4 hello received by Thread[ForkJoinPool-1-worker-7,5,main]
使用receiveWithin()方法的actor具備線程關聯性(thread affinity);他們會持續的使用分配給他們的同一個線程。從上面的輸出中就能夠看出。
另外一方面,使用react()的actor能夠自由的交換彼此的線程,能夠由任何可用的線程處理。
換句話說,使用react()的actor不具備線程關聯性,它們會放棄本身的線程,用一個新的線程(或許是同一個)進行後續的消息處理。這種作法對資源更爲友善,特別是在消息處理至關快的狀況下。因此,咱們鼓勵使用react()來代替receive()。
相似於receiveWithin(),若是在超時時段裏,沒有接到任何消息,reactWithin()就會超時——在這種狀況下,若是處理case TIMEOUT,能夠採起任何想採起的行動,也能夠從方法裏退出。下面是一個使用reactWithin()的例子,嘗試一下以前使用receiveWithin()實現累加器的例子,此次用reactWithin()方法:
package com.cn.gao import scala.actors._ import scala.actors.Actor._ object ReactWithin extends App { val caller = self def accumulate(sum:Int) { reactWithin(500){ case number:Int => accumulate(sum + number) case TIMEOUT => println("Timed out! Will send result now") caller ! sum } println("This will not be called...") } val accumulator = actor {accumulate(0)} accumulator ! 1 accumulator ! 7 accumulator ! 8 receiveWithin(10000) { case result => println("Total is " + result) } }
上面的代碼輸出以下:
Timed out! Will send result now Total is 16
同使用receiveWithin()的方案比起來,這個方案更加優雅,等待接收消息時,它並不持有任何線程。
關於react()和reactWithin(),最後要記住的一點是,由於這兩個方法並非真的從調用裏返回(記住,Scala內部經過讓這些方法拋出異常來處理這個問題),放在這些方法後的任何代碼都不會執行(好比在accumulate()方法末尾加上打印語句)。因此,在調用這兩個方法以後,不要寫任何東西。
有兩件事情阻礙咱們充分使用react()和reactWithin()。第一個是遞歸調用。若是有多個case語句,典型狀況下,要在每一個case裏面重複調用。第二,彷佛沒有什麼好的方式跳出方法。第一個顧慮的答案是單例對象Actor的loop()方法。第二個的答案是loopWhile()方法。
相比於在reactWithin()裏遞歸的調用方法,能夠在loop()調用裏放一個對reactWithin()的調用。執行loop()方法的線程遇到reactWithin()的調用時,會放棄控制。消息到達時,任意的線程均可以繼續執行適當的case語句。case語句執行完畢,線程會繼續回到loop()塊的頂部。這會一直繼續下去。loopWhile()方法是相似的,可是隻有提供的參數是有效的,它纔會繼續循環下去。由於loopWhile()負責處理循環,因此,能夠把局部狀態放到循環以外,在reactWithin()方法裏訪問它。這樣的話,就給了咱們一個一箭雙鵰的選擇,既能夠像receiveWithin()那樣處理狀態,又能夠像reactWithin()那樣利用來自線程池的線程。下面看一個在loopWhile()裏使用reactWithin()的例子。
package com.cn.gao import scala.actors._ import Actor._ object Loop extends App { val caller = self val accumulator = actor { var continue = true var sum = 0 loopWhile(continue){ reactWithin(500){ case number:Int => sum += number case TIMEOUT => continue = false caller ! sum } } } accumulator ! 1 accumulator ! 7 accumulator ! 8 receiveWithin(1000){case result => println("Total is " + result)} }
上面的代碼沒有任何遞歸調用——這是由loopWhile()處理的。在退出消息處理的地方,只需簡單的設置標記,由它處理退出循環,進而退出actor執行。代碼輸出以下:
Total is 16
咱們已經見識到了,使用receive時,每一個actor是怎樣運行在本身的線程裏,react又如何讓actor共享來自線程池的線程。不過,有時咱們會想要更強的控制力。好比,結束一個長期運行的任務以後,須要更新UI,這時須要在一個單獨的線程裏運行任務,而後,在主線程裏更新UI。(由於UI組件時常不是線程安全的。)經過使用SingleThreadScheduler,可讓Scala在主線程裏運行actor。咱們用個例子看看如何作到這點:
package com.cn.gao import scala.actors._ import scala.actors.scheduler._ import Actor._ object InMainThread { def main(args:Array[String]){ if (args.length > 0 && args(0)== "Single") { println("Command-line argument Single found") Scheduler.impl = new SingleThreadedScheduler() } println("Main running in " + Thread.currentThread()) actor {println("Actor1 running in " + Thread.currentThread())} actor {println("Actor2 running in " + Thread.currentThread())} receiveWithin(3000){case _ => } } }
上面的代碼裏,建立了兩個actor。若是不傳任何命令行參數,兩個actor的代碼和主腳本的代碼會運行在各自的線程裏,輸出以下:
Main running in Thread[main,5,main] Actor1 running in Thread[ForkJoinPool-1-worker-5,5,main] Actor2 running in Thread[ForkJoinPool-1-worker-5,5,main]
另外一方面,若是像scala InMainThread.scala Single 這樣運行以前的代碼,會獲得不一樣的結果:
Command-line argument Single found Main running in Thread[main,5,main] Actor1 running in Thread[main,5,main] Actor2 running in Thread[main,5,main]
不管actor什麼時候啓動,Scala都會讓單例對象Scheduler去運行它。經過是設置Scheduler的impl,就能夠控制整個應用的actor調度策略。
上面的方式影響深遠,它讓咱們能夠控制全部的actor的調度。不過,也許咱們想要讓一些線程運行在主線程中,而其它actor運行在各自線程裏。經過繼承Actor trait,改寫scheduler()方法,就能夠作到這一點。默認狀況下,這個方法爲要調度的actor返回單例對象Scheduler。改寫這個方法就能夠控制調度單獨的actor的方式,以下所示:
package com.cn.gao import scala.actors._ import scala.actors.scheduler._ import Actor._ object InMainThreadSelective extends App { trait SingleThreadActor extends Actor { override protected def scheduler() = new SingleThreadedScheduler() } class MyActor1 extends Actor { def act() = println("Actor1 running in " + Thread.currentThread()) } class MyActor2 extends SingleThreadActor { def act() = println("Actor2 running in " + Thread.currentThread()) } println("Main running in " + Thread.currentThread()) new MyActor1().start() new MyActor2().start() actor{println("Actor 3 running in " + Thread.currentThread())} receiveWithin(5000){case _ => } }
上面的代碼建立了三個actor,其中,兩個繼承自Actor trait,一個使用了更爲常規的actor()方法。經過改寫protected方法scheduler,就能夠控制MyActor2的線程。運行上述代碼時,使用actor()和MyActor1建立的actor運行於本身的線程。而使用MyActor2建立的actor則運行於主線程,以下所示:
Main running in Thread[main,5,main] Actor2 running in Thread[main,5,main] Actor1 running in Thread[ForkJoinPool-1-worker-5,5,main] Actor 3 running in Thread[ForkJoinPool-1-worker-3,5,main]