Scala學習筆記--Actor和併發

感謝博主lyrebing  博文地址:http://blog.csdn.net/lyrebing/article/details/20446061java

 

1.  Actor用法node

1.1 Actor的基本使用react

Scala會創建一個線程池共全部Actor來使用。數組

receive模型是Actor從池中取一個線程一直使用;安全

react模型是Actor從池中取一個線程用完給其餘Actor用服務器

 

例1-1 基本運行方式1數據結構

//actor是一個相似線程的實體,它有一個用來接收消息的信箱。
//實現actor的方法是繼承Scala.actors.Actor並完成其act方法
//經過調用actor的start方法來啓動它
class SillyActor extends Actor{
  def act(){
    for(i<- 1 to 5){
      println("運行次數:"+i)
      Thread.sleep(1000);
    }
  }
}

object SimpleActor{
  def main(args:Array[String]):Unit = {
   
    val sactor1 = new SillyActor();
    sactor1.start(); //啓動actor
   }
}

運行結果:多線程

運行次數: 1
運行次數: 2
運行次數: 3
運行次數: 4
運行次數: 5異步

 

例1-2: 基本運行方式2函數

object SimpleActor{
  def main(args:Array[String]):Unit = {
    //另外一種方法:使用Actor中名爲actor工具的方法來建立actor
    //actor在定義後當即啓動,無需在調用start()
    val sactor2 = Actor.actor{
      for(i<-1 to 5){
        println("Another Actor"+i)
        Thread.sleep(1000);
      }
    }
  }
}   

運行結果:

Another Actor1
Another Actor2
Another Actor3
Another Actor4
Another Actor5

 

1.2 發送消息

提示:

發送異步消息,沒有返回值。

!?

發送同步消息,等待返回值。(會阻塞發送消息語句所在的線程)

!!

發送異步消息,返回值是 Future[Any]。

不帶參數。查看 mailbox 中的下一條消息。

 

2.  接收消息

2.1  方式1:接受receive

特色:要反覆處理消息,receive外層用while(..), 不用的話只處理一次。

 

例2-1: 使用receive接收、處理消息 

//經過調用Actor.receive來接收消息
//actor發送消息時,它並不會阻塞,當actor接收消息時,它也不會被打斷。
//發送的消息在接收actor的郵箱中等待處理,知道actor調用receive方法

val sactor3 = Actor.actor{
  var work = true;
  while(true){
    Actor.receive{
      case x :Int =>println("got an Int: "+x)
      case _ =>println("not an Int");
    }
  }
}    

sactor3 ! 12
sactor3 ! 13
sactor3 ! 1.5

 運行結果:

got an Int: 12
got an Int: 13
not an Int

 

2.2  方式2 接受react

特色:

(1) 從不返回;
(2) 要反覆執行消息處理,react外層用loop,不能用while(..);
(3) 經過複用線程,比receive更高效,應儘量使用react;

 

例2-2: 得到並顯示IP網站的地址

ReactActor.java

object ReactActor{
  def main(args:Array[String]):Unit={
    NameResolver.start();
    NameResolver ! ("www.baidu.com",Actor.self)    
    NameResolver ! "msg1";
    NameResolver ! "EXIT"
    NameResolver ! "msg2";//已經結束,不會顯示
  }
}

object NameResolver extends Actor{
  def act(){
    loop{
	    react{
	      case (name:String, actor:Actor)=>
	        println(getIp(name))
	      case "EXIT" =>
	        println("Name resorver exit!");
	        exit;  //跳出loop
	      case msg =>
	        println("Unhandled message" + msg);
	    }
    }
  }
/* //不使用loop的方法 def act(){ react{ case (name:String, actor:Actor)=> println(getIp(name)) //actor ! getIp(name); act(); //再次調用act函數 case "EXIT" => println("Name resorver exit!"); case msg => println("Unhandled message" + msg); act(); } } */ //獲取IP地址 def getIp(name:String):Option[InetAddress]={ try{ Some(InetAddress.getByName(name)) }catch{ case _:UnknownHostException =>None } } }

運行結果:

Some(www.baidu.com/119.75.217.109)
Unhandled messagemsg1
Name resorver exit!

  

3. 良好的Actor風格

3.1 Actor不該阻塞

編寫良好的actor在處理消息時不該阻塞。阻塞的問題是,在actor阻塞時,另外一個actor可能會對其發起另外一個它可以處理的請求。若是actor在首個請求時阻塞了,那麼它將不會留意到第二個請求。最壞的情形是可能帶來死鎖,多個actor都在等待另外一個阻塞的actor的響應。

actor{
    Thread.sleep(time)
    mainActor ! "WAKEUP"
}

這個助手actor的確阻塞了,但因爲它永遠不會受到消息,所以在這種狀況下是能夠的。主actor能夠繼續響應新的請求,下面程序emoteLater方法展現了這種處理方式的用法。它建立一個新的actor來執行sleep以便主actor不阻塞,以確保它向正確的actor發送"Emote"消息,咱們必須當心地在主actor中對self求值而不是在助手actor中。

object SimpleActor2Copy{
  
  def emoteLater(){ //輔助actor,用於休眠
        val mainActor = Actor.self;
        Actor.actor{
          Thread.sleep(1000);
          mainActor ! "Emote"
        }
  }
  
  def main(args:Array[String]):Unit={  
    val sActor2 = Actor.actor{      
      var emoted = 0;
      var con = true;
      emoteLater();  //啓動一次輔助actor
      Actor.loopWhile(con){//Actor.loop用來重複執行一個代碼塊 ,loopWhile能夠加上判斷條件
        Actor.react{
          case "Emote" =>
            println("I am acting "+emoted)
            emoted+=1;
            if(emoted<5){
              emoteLater();
            }else{
              con = false
            }  

          case msg =>
            println("received" + msg)
        }
      }
    }
  }
}

運行結果:

I am acting 0
I am acting 1
I am acting 2
I am acting 3
I am acting 4

因爲這個actor並不在sleep方法中阻塞--它的助手actor會阻塞--他能夠在等待下次表演以前繼續作其餘事。

 

3.2 actor之間只經過消息進行通訊。

actor模型讓咱們寫多線程程序時只用關注各個獨立的單線程程序(actor),他們之間經過消息來通信。例如,若是BadActor中有一個GoodActor的引用:

class BadActor(a:GoodActor) extends Actor {...}

那在BadActor中便可以經過該引用來直接調用GoodActor的方法,也能夠經過「!」來傳遞消息。選擇後者!由於一旦BadActor經過引用讀取GoodActor實例的私有數據,而這些數據可能正被其餘線程改寫值,結果就避免不了「共享數據-鎖」模型中的麻煩事:即必須保證BadActor線程讀取GoodActor的私有數據時,GoodActor線程在這塊成爲「共享數據」的操做上加鎖。GoodActor只要有了共享數據,就必須來加鎖防範競用衝突和死鎖,你又得從actor模型退回到「共享數據-鎖」模型(注:actor對消息是順序處理的,原本不用考慮共享數據)。

 

3.3 採用不可變消息

因爲Scala的actor模型提供了在每一個actor的act 方法中的單線程環境,沒必要擔憂在這個方法的實現中使用的對象是不是線程安全的。
每一個act方法實際上被侷限在一個線程中,在act方法中你能夠盡情使用非同步、可變對象,actor模型被稱做share-nothing的模型,由於數據侷限於一個線程中,而不是被多個線程共享。
有一個例外:用於在actor間發送消息的對象中的數據由多個actor「共享」。這時要關注消息對象是否安全。

保證消息對象線程安全的最好方法就是保證只使用不可變對象做爲消息對象。消息類中只定義val字段,且只能指向不可變對象。定義這種不可變消息類的簡單方法就是使用case class, 並保證其全部的val字段都是不可變的。Scala API中提供了不少不可變對象可用,例如基本類型、String、Tuple、List,不可變Set、不可變Map等。

若是你發現確實須要把一個可變對象obj1發送給其餘actor,也因該是發送一份拷貝對象obj1.clone過去,而不是把obj1直接發過去。例如,數據對象Array是可變且未作同步的,因此Array只應該由一個actor同時存取,若是須要發送數組arr,就發送arr.clone(arr中的元素也應該是不可變對象),或者直接發送一個不可變對象arr.toList更好。

總結:大部分時候使用不可變對象很方便,不可變對象是並行系統的曙光,它們是易使用、低風險的線程安全對象。當你未來要設計一個和並行相關的程序時,不管是否使用actor,都應該儘可能使用不可變的數據結構。

 

3.4 讓消息自說明

對每一種消息建立一個對應的case class,而不是使用上面的tuple數據結構。雖然這種包裝在不少狀況下並不是必須,但該作法能使actor程序易於理解,例如:

// 不易理解,由於傳遞的是個通常的字符串,很難指出那個actor來響應這個消息
lookerUpper ! ("www.scala-lang.org", self)
// 改成以下,則指出只有react能處理LoopupIP的actor來處理:
case class LookupIP(hostname: String, requester: Actor)
lookerUpper ! LookupIP("www.scala-lang.org", self)

 

 

4. 不一樣JVM間的消息訪問

服務器端:

object MyServer{
  def main(args:Array[String]):Unit={

    Actor.actor { // 建立並啓動一個 actor
      // 當前 actor 監聽的端口: 3000
      RemoteActor.alive(3000) 
// 在 3000 端口註冊本 actor,取名爲 server1。 // 第一個參數爲 actor 的標識,它以單引號開頭,是 Scala 中的 Symbol 量, // Symbol 量和字符串類似,但 Symbol 相等是基於字符串比較的。 // self 指代當前 actor (注意此處不能用 this) RemoteActor.register('server1, Actor.self); // 收到消息後的響應 Actor.loop { Actor.react { case msg => println("server1 get: " + msg) } } } } }

客戶端:

 

import scala.actors.Actor
import scala.actors.remote.RemoteActor
import scala.actors.remote.Node

object SimpleActor2Copy{  
  def main(args:Array[String]):Unit={  
    Actor.actor { 
      // 取得一個節點(ip:port 惟一標識一個節點) 
      // Node 是個 case class,因此不須要 new 
      val node = Node("127.0.0.1", 3000)  

      // 取得節點對應的 actor 代理對象 
      val remoteActor = RemoteActor.select(node, 'server1)  

      // 如今 remoteActor 就和普通的 actor 同樣,能夠向它發送消息了! 
      println("-- begin to send message")
      remoteActor ! "ActorClient的消息"       
      println("-- end")
      
    } 
  }
}

 

運行結果:

控制檯輸出客戶端發送的消息  「server1 get: ActorClient的消息」

相關文章
相關標籤/搜索