Scala併發編程

scala支持Java的多線程模型, 也繼承了多線程固有的資源競爭和死鎖問題.html

做爲一種函數式編程語言, scala的actor消息模型提供了一種更便捷更安全的併發編程方案.編程

線程模型

scala的線程模型來自於Java. 首先咱們要拓展一個Runable或Callable, 並重寫run方法安全

trait Runnable {
  def run(): Unit
}

Callable與Runable相似,可是有一個返回值:多線程

trait Callable[V] {
  def call(): V
}

Thread須要一個Runable實例做爲參數來建立:併發

scala> val thread = new Thread(new Runnable {
     |   def run() {
     |     println("hello world")
     |   }
     | })
thread: Thread = Thread[Thread-2,5,main]

scala> thread.start()
hello world

線程同步

synchronized是JVM中最簡單的使用互斥鎖的方式:負載均衡

class User {
  var name: String = "";
  def setName(nameArg :String) {
    this.synchronized {
      this.name = nameArg;
    }
  }
}

當線程開始執行obj.synchronized塊中的代碼前, 它將嘗試得到對象obj的鎖, 若獲取失敗則線程進入阻塞狀態.異步

當某個線程得到了對象的鎖後, 其它線程就沒法訪問或修改該對象. 當obj.synchronized塊中的代碼執行完成時, 線程會解除鎖, 另外一個線程就能夠加鎖並訪問對象了.編程語言

Future模型

scala提供了Promise-Future-Callback異步模型:函數式編程

  • Future 表示一個尚未完成的任務的結果, Future對象能夠在任務完成前訪問函數

  • Promise 表示一個尚未執行的任務, 能夠經過Promise標記任務的狀態

  • Callback 回調用於在任務完成或其它狀況下執行的操做

Future

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object FutureDemo extends App {

  val f = Future {
    println("working on future task")
    Thread.sleep(100)
    1+1
  }
  
  println("waiting for future task complete")
  val result = Await.result(f, 1 second)
  println(result)
}

執行異步任務須要上下文, ExecutionContext.Implicits.global是使用當前的全局上下文做爲隱式上下文.

引入.duration._容許咱們使用1 second, 200 milli, 2 minute這樣的時間間隔字面值.

上述示例中Await.result使用阻塞的方式等待Future任務完成, 若Future超時未完成則拋出TimeoutException異常.

屢次運行上述示例就會發現, 兩條提示輸出順序是不肯定的. 這是由於Future中的代碼是在獨立線程中執行的.

更好的方式是採用回調的方式來處理Future結果:

import scala.concurrent.{Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

object FutureDemo2 extends App {

  val f = Future {
    1 + 2
  }

  
  f.onComplete{
    case Success(value) => println(value)
    case Failure(e) => e.printStackTrace
  }

}

或者定義onSuccessonFailure兩個回調.

import scala.concurrent.{Future}
import scala.concurrent.ExecutionContext.Implicits.global

object FutureDemo2 extends App {

  val f = Future {
    1 + 2
  }

  
  f.onSuccess {
    case value => println(value)
  }

  f.onFailure {
    case e => e.printStackTrace
  }

}

Actor模型

Actor是一個基於消息機制的併發模型, 自Scala 2.11以後Akka Actor已成爲Scala事實上的Actor標準.

akka不是scala的默認包, 這裏咱們使用SBT來管理外部包依賴. 關於sbt的使用能夠參見做者的另外一篇博文Scala構建工具SBT.

build.sbt中添加下列代碼, 引入akka依賴.

scalaVersion := "2.12.1"

resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"

libraryDependencies +=
  "com.typesafe.akka" %% "akka-actor" % "2.4.17"

更多關於引入akka的內容能夠參見akka官網.

import akka.actor.Actor
import akka.actor.ActorSystem
import akka.actor.Props

class HelloActor extends Actor {
  def receive() = {
    case "hello" => println("Hi, I am an actor.");
    case _       => println("?");
  }
}

object Main extends App {
  val system = ActorSystem("HelloSystem");
  val helloActor = system.actorOf(Props[HelloActor], name = "helloactor");
  helloActor ! "hello";
  helloActor ! "bye";
  system.shutdown();
}

自定義類繼承Actor並重寫receive方法處理不一樣類型的消息. 這裏使用String類進行模式匹配, 使用case class進行模式匹配能夠傳遞更多信息.

Actor須要ActorSystem的事件循環提供支持, 初始化一個ActorSystem後事件循環開始運行.最後必須執行system.shutdown();不然scala程序會一直運行下去.

!是用於發送消息的操做符, helloActor ! "hello";將消息"hello"發送給了helloActor.

receive方法的返回值類型是PartialFunction[Any, Unit]. 全部發送給Actor的消息都將被receive返回的偏函數處理.

偏函數的返回值類型爲Unit, 也就是說處理消息時必須依賴反作用而不能有返回值; 偏函數的參數類型爲Any, 也就是說全部消息在傳入的時候都會發生類型丟失.

非類型化的消息便於設計消息轉發, 負載均衡和代理Actor等機制, 且由於基於模式匹配的消息處理, 非類型化並不會產生問題.

基於事件循環的非阻塞機制已經被廣爲使用, 這裏簡單說明Actor與線程的問題.Actor並不是與線程一一對應, 一個線程能夠爲多個Actor服務. ActorSystem會根據實際狀況選擇線程數.

相關文章
相關標籤/搜索