1 概念java
Scala中的Actor可以實現並行編程的強大功能,它是基於事件模型的併發機制,Scala是運用消息(message)的發送、接收來實現多線程的。使用Scala可以更容易地實現多線程應用的開發。react
2 傳統java併發編程與scala actor編程的區別編程
對於Java,咱們都知道它的多線程實現須要對共享資源(變量、對象等)使用synchronized 關鍵字進行代碼塊同步、對象鎖互斥等等。並且,經常一大塊的try…catch語句塊中加上wait方法、notify方法、notifyAll方法是讓人很頭疼的。緣由就在於Java中多數使用的是可變狀態的對象資源,對這些資源進行共享來實現多線程編程的話,控制好資源競爭與防止對象狀態被意外修改是很是重要的,而對象狀態的不變性也是較難以保證的。 而在Scala中,咱們能夠經過複製不可變狀態的資源(即對象,Scala中一切都是對象,連函數、方法也是)的一個副本,再基於Actor的消息發送、接收機制進行並行編程多線程
3 actor方法執行順序併發
1.首先調用start()方法啓動Actorapp
2.調用start()方法後其act()方法會被執行異步
3.向Actor發送消息async
發送消息的方式ide
!函數 |
發送異步消息,沒有返回值。 |
!? |
發送同步消息,等待返回值。 |
!! |
發送異步消息,返回值是 Future[Any]。 |
例子
添加依賴
<!--scala actor--> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-actors</artifactId> <version>2.10.5</version> </dependency>
1
package main.scala.com import scala.actors.Actor /** * Created by Administrator on 2019/6/4. */ object MyActor1 extends Actor { //重寫act方法 def act(): Unit = { for (i <- 1 to 10) { println("actor-1" + i) Thread.sleep(2000) } } } object MyActor2 extends Actor { //重寫act方法 def act() { for (i <- 1 to 10) { println("actor-2 " + i) Thread.sleep(2000) } } } object ActorTest extends App{ //啓動Actor MyActor1.start() MyActor2.start() }
運行結果
說明:上面分別調用了兩個單例對象的start()方法,他們的act()方法會被執行,相同與在java中開啓了兩個線程,線程的run()方法會被執行
注意:這兩個Actor是並行執行的,act()方法中的for循環執行完成後actor程序就退出了
可能碰見的問題
1 Exception in thread "main" java.lang.NoSuchMethodError: scala.actors.AbstractActor.$init$(Lscala/actors/AbstractActor;)V
解決辦法
使用scala2.12.x的版本運行Actor,會報這種錯誤。
報錯緣由:scala版本不匹配,
解決方法:建立新工程,選擇scala2.10.x的版本
2
解決方案:項目->open module setting->Modules->Dependencies 加上scala sdk的library
2
package main.scala.com import scala.actors.Actor /** * Created by Administrator on 2019/6/4. */ class MyActor extends Actor { override def act(): Unit = { while (true) { receive { case "start" => { println("starting ...") Thread.sleep(5000) println("started") } case "stop" => { println("stopping ...") Thread.sleep(5000) println("stopped ...") } } } } } object MyActor { def main(args: Array[String]) { val actor = new MyActor actor.start() actor ! "start" actor ! "stop" println("消息發送完成!") } }
說明:在act()方法中加入了while (true) 循環,就能夠不停的接收消息
注意:發送start消息和stop的消息是異步的,可是Actor接收到消息執行的過程是同步的按順序執行
3
(react方式會複用線程,比receive更高效)
package main.scala.com import scala.actors.Actor /** * Created by Administrator on 2019/6/4. */ class YourActor extends Actor { override def act(): Unit = { loop { react { case "start" => { println("starting ...") Thread.sleep(5000) println("started") } case "stop" => { println("stopping ...") Thread.sleep(8000) println("stopped ...") } } } } } object YourActor { def main(args: Array[String]) { val actor = new YourActor actor.start() actor ! "start" actor ! "stop" println("消息發送完成!") } }
說明: react 若是要反覆執行消息處理,react外層要用loop,不能用while
4
package main.scala.com import scala.actors.Actor /** * Created by Administrator on 2019/6/4. */ class AppleActor extends Actor { def act(): Unit = { while (true) { receive { case "start" => println("starting ...") case SyncMsg(id, msg) => { println(id + ",sync " + msg) Thread.sleep(5000) sender ! ReplyMsg(3, "finished") } case AsyncMsg(id, msg) => { println(id + ",async " + msg) Thread.sleep(5000) } } } } } object AppleActor { def main(args: Array[String]) { val a = new AppleActor a.start() //異步消息 a ! AsyncMsg(1, "hello actor") println("異步消息發送完成") //同步消息 //val content = a.!?(1000, SyncMsg(2, "hello actor")) //println(content) val reply = a !! SyncMsg(2, "hello actor") println(reply.isSet) //println("123") val c = reply.apply() println(reply.isSet) println(c) } } case class SyncMsg(id: Int, msg: String) case class AsyncMsg(id: Int, msg: String) case class ReplyMsg(id: Int, msg: String)
5 用actor併發編程寫一個單機版的WorldCount,將多個文件做爲輸入,計算完成後將多個任務彙總,獲得最終的結果
package main.scala.com import java.io.File import scala.actors.{Actor, Future} import scala.collection.mutable import scala.io.Source /** * Created by Administrator on 2019/6/4. */ class Task extends Actor { override def act(): Unit = { loop { react { case SubmitTask(fileName) => { val contents = Source.fromFile(new File(fileName)).mkString val arr = contents.split("\r\n") val result = arr.flatMap(_.split(" ")).map((_, 1)).groupBy(_._1).mapValues(_.length) //val result = arr.flatMap(_.split(" ")).map((_, 1)).groupBy(_._1).mapValues(_.foldLeft(0)(_ + _._2)) sender ! ResultTask(result) } case StopTask => { exit() } } } } } object WorkCount { def main(args: Array[String]) { val files = Array("c://words.txt", "c://words.log") val replaySet = new mutable.HashSet[Future[Any]] val resultList = new mutable.ListBuffer[ResultTask] for (f <- files) { val t = new Task val replay = t.start() !! SubmitTask(f) replaySet += replay } while (replaySet.size > 0) { val toCumpute = replaySet.filter(_.isSet) for (r <- toCumpute) { val result = r.apply() resultList += result.asInstanceOf[ResultTask] replaySet.remove(r) } Thread.sleep(100) } val finalResult = resultList.map(_.result).flatten.groupBy(_._1).mapValues(x => x.foldLeft(0)(_ + _._2)) println(finalResult) } } case class SubmitTask(fileName: String) case object StopTask case class ResultTask(result: Map[String, Int])