大數據學習——actor編程

1 概念java

Scala中的Actor可以實現並行編程的強大功能,它是基於事件模型的併發機制,Scala是運用消息(message)的發送、接收來實現多線程的。使用Scala可以更容易地實現多線程應用的開發。react

 

2 傳統java併發編程與scala actor編程的區別編程

 

 

對於Java,咱們都知道它的多線程實現須要對共享資源(變量、對象等)使用synchronized 關鍵字進行代碼塊同步、對象鎖互斥等等。並且,經常一大塊的try…catch語句塊中加上wait方法、notify方法、notifyAll方法是讓人很頭疼的。緣由就在於Java中多數使用的是可變狀態的對象資源,對這些資源進行共享來實現多線程編程的話,控制好資源競爭與防止對象狀態被意外修改是很是重要的,而對象狀態的不變性也是較難以保證的。 而在Scala中,咱們能夠經過複製不可變狀態的資源(即對象,Scala中一切都是對象,連函數、方法也是)的一個副本,再基於Actor的消息發送、接收機制進行並行編程多線程

 

3 actor方法執行順序併發

1.首先調用start()方法啓動Actorapp

2.調用start()方法後其act()方法會被執行異步

3.向Actor發送消息async

 

發送消息的方式ide

!函數

發送異步消息,沒有返回值。

!?

發送同步消息,等待返回值。

!!

發送異步消息,返回值是 Future[Any]。

 

例子

添加依賴

<!--scala actor--> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-actors</artifactId> <version>2.10.5</version> </dependency>

package main.scala.com

import scala.actors.Actor

/**
  * Created by Administrator on 2019/6/4.
  */
object MyActor1 extends Actor {

  //重寫act方法

  def act(): Unit = {
    for (i <- 1 to 10) {
      println("actor-1" + i)
      Thread.sleep(2000)
    }
  }
}

object MyActor2 extends Actor {
  //重寫act方法
  def act() {
    for (i <- 1 to 10) {
      println("actor-2 " + i)
      Thread.sleep(2000)
    }
  }
}
object ActorTest extends App{
  //啓動Actor
  MyActor1.start()
  MyActor2.start()
}

 

運行結果

說明:上面分別調用了兩個單例對象的start()方法,他們的act()方法會被執行,相同與在java中開啓了兩個線程,線程的run()方法會被執行

注意:這兩個Actor是並行執行的,act()方法中的for循環執行完成後actor程序就退出了

 

 

可能碰見的問題

1 Exception in thread "main" java.lang.NoSuchMethodError: scala.actors.AbstractActor.$init$(Lscala/actors/AbstractActor;)V

解決辦法

使用scala2.12.x的版本運行Actor,會報這種錯誤。

報錯緣由:scala版本不匹配,

解決方法:建立新工程,選擇scala2.10.x的版本

 

 

解決方案:項目->open module setting->Modules->Dependencies  加上scala sdk的library

 

 

 

package main.scala.com

import scala.actors.Actor

/**
  * Created by Administrator on 2019/6/4.
  */
class MyActor extends Actor {

  override def act(): Unit = {
    while (true) {
      receive {
        case "start" => {
          println("starting ...")
          Thread.sleep(5000)
          println("started")
        }
        case "stop" => {
          println("stopping ...")
          Thread.sleep(5000)
          println("stopped ...")
        }
      }
    }
  }
}

object MyActor {
  def main(args: Array[String]) {
    val actor = new MyActor
    actor.start()
    actor ! "start"
    actor ! "stop"
    println("消息發送完成!")
  }
}

說明:在act()方法中加入了while (true) 循環,就能夠不停的接收消息

注意:發送start消息和stop的消息是異步的,可是Actor接收到消息執行的過程是同步的按順序執行

 

 

(react方式會複用線程,比receive更高效)

package main.scala.com

import scala.actors.Actor

/**
  * Created by Administrator on 2019/6/4.
  */
class YourActor extends Actor {

  override def act(): Unit = {
    loop {
      react {
        case "start" => {
          println("starting ...")
          Thread.sleep(5000)
          println("started")
        }
        case "stop" => {
          println("stopping ...")
          Thread.sleep(8000)
          println("stopped ...")
        }
      }
    }
  }
}


object YourActor {
  def main(args: Array[String]) {
    val actor = new YourActor
    actor.start()
    actor ! "start"
    actor ! "stop"
    println("消息發送完成!")
  }
}

說明: react 若是要反覆執行消息處理,react外層要用loop,不能用while

 

 

4

package main.scala.com

import scala.actors.Actor

/**
  * Created by Administrator on 2019/6/4.
  */
class AppleActor extends Actor {

  def act(): Unit = {
    while (true) {
      receive {
        case "start" => println("starting ...")
        case SyncMsg(id, msg) => {
          println(id + ",sync " + msg)
          Thread.sleep(5000)
          sender ! ReplyMsg(3, "finished")
        }
        case AsyncMsg(id, msg) => {
          println(id + ",async " + msg)
          Thread.sleep(5000)
        }
      }
    }
  }
}

object AppleActor {
  def main(args: Array[String]) {
    val a = new AppleActor
    a.start()
    //異步消息
    a ! AsyncMsg(1, "hello actor")
    println("異步消息發送完成")
    //同步消息
    //val content = a.!?(1000, SyncMsg(2, "hello actor"))
    //println(content)
    val reply = a !! SyncMsg(2, "hello actor")
    println(reply.isSet)
    //println("123")
    val c = reply.apply()
    println(reply.isSet)
    println(c)
  }
}

case class SyncMsg(id: Int, msg: String)

case class AsyncMsg(id: Int, msg: String)

case class ReplyMsg(id: Int, msg: String)

 

 

5  用actor併發編程寫一個單機版的WorldCount,將多個文件做爲輸入,計算完成後將多個任務彙總,獲得最終的結果

package main.scala.com

import java.io.File

import scala.actors.{Actor, Future}
import scala.collection.mutable
import scala.io.Source


/**
  * Created by Administrator on 2019/6/4.
  */
class Task extends Actor {

  override def act(): Unit = {
    loop {
      react {
        case SubmitTask(fileName) => {
          val contents = Source.fromFile(new File(fileName)).mkString
          val arr = contents.split("\r\n")
          val result = arr.flatMap(_.split(" ")).map((_, 1)).groupBy(_._1).mapValues(_.length)
          //val result = arr.flatMap(_.split(" ")).map((_, 1)).groupBy(_._1).mapValues(_.foldLeft(0)(_ + _._2))
          sender ! ResultTask(result)
        }
        case StopTask => {
          exit()
        }
      }
    }
  }
}

object WorkCount {
  def main(args: Array[String]) {
    val files = Array("c://words.txt", "c://words.log")

    val replaySet = new mutable.HashSet[Future[Any]]
    val resultList = new mutable.ListBuffer[ResultTask]

    for (f <- files) {
      val t = new Task
      val replay = t.start() !! SubmitTask(f)
      replaySet += replay
    }

    while (replaySet.size > 0) {
      val toCumpute = replaySet.filter(_.isSet)
      for (r <- toCumpute) {
        val result = r.apply()
        resultList += result.asInstanceOf[ResultTask]
        replaySet.remove(r)
      }
      Thread.sleep(100)
    }
    val finalResult = resultList.map(_.result).flatten.groupBy(_._1).mapValues(x => x.foldLeft(0)(_ + _._2))
    println(finalResult)
  }
}

case class SubmitTask(fileName: String)

case object StopTask

case class ResultTask(result: Map[String, Int])
相關文章
相關標籤/搜索