Scala進階之路-併發編程模型Akka入門篇程序員
做者:尹正傑apache
版權聲明:原創做品,謝絕轉載!不然將追究法律責任。編程
一.Akka Actor介紹小程序
1>.Akka介紹多線程
寫併發程序很難。程序員不得不處理線程、鎖和競態條件等等,這個過程很容易出錯,並且會致使程序代碼難以閱讀、測試和維護。Akka 是 JVM 平臺上構建高併發、分佈式和容錯應用的工具包和運行時。Akka 用 Scala 語言寫成,同時提供了 Scala 和 JAVA 的開發接口。架構
2>.Akka 中 中 Actor 模型併發
Akka 處理併發的方法基於 Actor 模型。在基於 Actor 的系統裏,全部的事物都是 Actor,就好像在面向對象設計裏面全部的事物都是對象同樣。可是有一個重要區別,那就是 Actor 模型是做爲一個併發模型設計和架構的,而面向對象模式則不是。Actor 與 Actor 之間只能經過消息通訊。異步
3>.Akaka的特色maven
第一:它是對併發模型進行了更高的抽象;
第二:它是異步、非阻塞、高性能的事件驅動編程模型;
第三:它是輕量級事件處理(1GB 內存可容納百萬級別個 Actor);tcp
4>.爲何 Actor 模型是一種處理併發問題的解決方案?
處理併發問題就是如何保證共享數據的一致性和正確性,爲何會有保持共享數據正確性這個問題呢?無非是咱們的程序是多線程的,多個線程對同一個數據進行修改,若不加同步條件,勢必會形成數據污染。那麼咱們是否是能夠轉換一下思惟,用單線程去處理相應的請求,可是又有人會問了,如果用單線程處理,那系統的性能又如何保證。Actor 模型的出現解決了這個問題,簡化併發編程,提高程序性能。
5>.Maven依賴
1 <?xml version="1.0" encoding="UTF-8"?> 2 <project xmlns="http://maven.apache.org/POM/4.0.0" 3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 4 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 5 <modelVersion>4.0.0</modelVersion> 6 7 <groupId>cn.org.yinzhengjie</groupId> 8 <artifactId>MyActor</artifactId> 9 <version>1.0-SNAPSHOT</version> 10 11 <!-- 定義一下常量 --> 12 <properties> 13 <encoding>UTF-8</encoding> 14 <scala.version>2.11.8</scala.version> 15 <scala.compat.version>2.11</scala.compat.version> 16 <akka.version>2.4.17</akka.version> 17 </properties> 18 19 <dependencies> 20 <!-- 添加scala的依賴 --> 21 <dependency> 22 <groupId>org.scala-lang</groupId> 23 <artifactId>scala-library</artifactId> 24 <version>${scala.version}</version> 25 </dependency> 26 27 <!-- 添加akka的actor依賴 --> 28 <dependency> 29 <groupId>com.typesafe.akka</groupId> 30 <artifactId>akka-actor_${scala.compat.version}</artifactId> 31 <version>${akka.version}</version> 32 </dependency> 33 34 <!-- 多進程之間的Actor通訊 --> 35 <dependency> 36 <groupId>com.typesafe.akka</groupId> 37 <artifactId>akka-remote_${scala.compat.version}</artifactId> 38 <version>${akka.version}</version> 39 </dependency> 40 </dependencies> 41 42 <!-- 指定插件--> 43 <build> 44 <!-- 指定源碼包和測試包的位置 --> 45 <sourceDirectory>src/main/scala</sourceDirectory> 46 <testSourceDirectory>src/test/scala</testSourceDirectory> 47 <plugins> 48 <!-- 指定編譯scala的插件 --> 49 <plugin> 50 <groupId>net.alchim31.maven</groupId> 51 <artifactId>scala-maven-plugin</artifactId> 52 <version>3.2.2</version> 53 <executions> 54 <execution> 55 <goals> 56 <goal>compile</goal> 57 <goal>testCompile</goal> 58 </goals> 59 <configuration> 60 <args> 61 <arg>-dependencyfile</arg> 62 <arg>${project.build.directory}/.scala_dependencies</arg> 63 </args> 64 </configuration> 65 </execution> 66 </executions> 67 </plugin> 68 69 70 </plugins> 71 </build> 72 </project>
自定義默認的源代碼包和測試包的位置,須要手動穿件Source Root目錄喲,以下圖:
二.編寫HelloActor
1 /* 2 @author :yinzhengjie 3 Blog:http://www.cnblogs.com/yinzhengjie/tag/Scala%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/ 4 EMAIL:y1053419035@qq.com 5 */ 6 package cn.org.yinzhengjie.actor 7 8 import akka.actor.{Actor, ActorSystem, Props} 9 10 import scala.io.StdIn 11 12 class HelloActor extends Actor{ 13 // 重寫接受消息的偏函數,其功能是接受消息並處理 14 override def receive: Receive = { 15 case "你好帥" => println("竟說實話,我喜歡你這種人!") 16 case "醜八怪" => println("滾犢子 !") 17 case "stop" => { 18 context.stop(self) // 中止本身的actorRef 19 context.system.terminate() // 關閉ActorSystem,即關閉其內部的線程池(ExcutorService) 20 } 21 } 22 } 23 24 object HelloActor { 25 /** 26 * 建立線程池對象MyFactory,用來建立actor的對象的 27 */ 28 private val MyFactory = ActorSystem("myFactory") //裏面的"myFactory"參數爲線程池的名稱 29 /** 30 * 經過MyFactory.actorOf方法來建立一個actor,注意,Props方法的第一個參數須要傳遞咱們自定義的HelloActor類, 31 * 第二個參數是給actor起個名字 32 */ 33 private val helloActorRef = MyFactory.actorOf(Props[HelloActor], "helloActor") 34 35 def main(args: Array[String]): Unit = { 36 var flag = true 37 while(flag){ 38 /** 39 * 接受用戶輸入的字符串 40 */ 41 print("請輸入您想發送的消息:") 42 val consoleLine:String = StdIn.readLine() 43 /** 44 * 使用helloActorRef來給本身發送消息,helloActorRef有一個叫作感嘆號("!")的方法來發送消息 45 */ 46 helloActorRef ! consoleLine 47 if (consoleLine.equals("stop")){ 48 flag = false 49 println("程序即將結束!") 50 } 51 /** 52 * 爲了避免讓while的運行速度在receive方法之上,咱們可讓他休眠0.1秒 53 */ 54 Thread.sleep(100) 55 } 56 } 57 }
以上代碼執行結果以下:
三.兩個actor通訊案例-模擬下棋對話
1 /* 2 @author :yinzhengjie 3 Blog:http://www.cnblogs.com/yinzhengjie/tag/Scala%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/ 4 EMAIL:y1053419035@qq.com 5 */ 6 package cn.org.yinzhengjie.actor 7 8 import akka.actor.{ActorSystem, Props} 9 10 import akka.actor.{Actor, ActorRef} 11 12 /** 13 * 定義玩家1 14 */ 15 class player1Actor(val p2: ActorRef) extends Actor{ 16 // receive方法是負責處理消息的 17 override def receive: Receive = { 18 case "start" => { 19 println("棋聖:I'm OK !") 20 p2 ! "該你了" 21 } 22 case "將軍" => { 23 println("棋聖:你真猛!") 24 Thread.sleep(1000) 25 p2 ! "該你了" 26 } 27 } 28 } 29 30 31 /** 32 * 定義玩家2 33 */ 34 class player2Actor extends Actor{ 35 36 override def receive: Receive = { 37 case "start" => println("棋仙說:I'm OK !") 38 case "該你了" => { 39 println("棋仙:那必須滴!") 40 Thread.sleep(1000) 41 /** 42 * 注意,這個「sender()」,其實就是對ActorRef的一個引用。它指的是給發送"該你了"的這個對象自己! 43 */ 44 sender() ! "將軍" 45 } 46 } 47 } 48 49 50 object ChineseChess extends App{ 51 // 建立 actorSystem的工廠,用來生產ActorRef對象! 52 private val ChineseChessActorSystem = ActorSystem("Chinese-chess") 53 /** 54 * 經過actorSystem建立ActorRef 55 */ 56 private val p2 = ChineseChessActorSystem.actorOf(Props[player2Actor], "player2") //建立player2Actor對象 57 private val p1 = ChineseChessActorSystem.actorOf(Props(new player1Actor(p2)), "player1") //建立player1Actor對象 58 59 p2 ! "start" 60 p1 ! "start" 61 }
運行以上代碼輸出結果以下:
四.服務端和客戶端交互的小程序
1>.服務端代碼
1 /* 2 @author :yinzhengjie 3 Blog:http://www.cnblogs.com/yinzhengjie/tag/Scala%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/ 4 EMAIL:y1053419035@qq.com 5 */ 6 package cn.org.yinzhengjie.robot 7 8 import akka.actor.{Actor, ActorSystem, Props} 9 import com.typesafe.config.ConfigFactory 10 11 class ServerActor extends Actor{ 12 /** 13 * receive方法是用來處理客戶端發送過來的問題的 14 */ 15 override def receive: Receive = { 16 case "start" => println("天貓系統已啓動...") 17 18 case ClientMessage(msg) => { 19 println(s"收到客戶端消息:$msg") 20 msg match { 21 /** 22 * sender()發送端的代理對象, 發送到客戶端的mailbox中 -> 客戶端的receive 23 */ 24 case "你叫啥" => 25 sender() ! ServerMessage("本寶寶是天貓精靈") 26 case "你是男是女" => 27 sender() ! ServerMessage("本寶寶非男非女") 28 case "你有男票嗎" => 29 sender() ! ServerMessage("本寶寶還小喲") 30 case "stop" => 31 context.stop(self) // 中止本身的actorRef 32 context.system.terminate() // 關閉ActorSystem,即關閉其內部的線程池(ExcutorService) 33 println("天貓系統已中止...") 34 case _ => 35 sender() ! ServerMessage("對不起,主人,我不知道你在說什麼.......") 36 } 37 } 38 } 39 } 40 41 object ServerActor { 42 def main(args: Array[String]): Unit = { 43 //定義服務端的ip和端口 44 val host = "127.0.0.1" 45 val port = 8088 46 /** 47 * 使用ConfigFactory的parseString方法解析字符串,指定服務端IP和端口 48 */ 49 val config = ConfigFactory.parseString( 50 s""" 51 |akka.actor.provider="akka.remote.RemoteActorRefProvider" 52 |akka.remote.netty.tcp.hostname=$host 53 |akka.remote.netty.tcp.port=$port 54 """.stripMargin) 55 /** 56 * 將config對象傳遞給ActorSystem並起名爲"Server",爲了是建立服務端工廠對象(ServerActorSystem)。 57 */ 58 val ServerActorSystem = ActorSystem("Server", config) 59 /** 60 * 經過工廠對象建立服務端的ActorRef 61 */ 62 val serverActorRef = ServerActorSystem.actorOf(Props[ServerActor], "Miao~miao") 63 /** 64 * 到本身的mailbox -》 receive方法 65 */ 66 serverActorRef ! "start" 67 } 68 }
2>.客戶端代碼
1 /* 2 @author :yinzhengjie 3 Blog:http://www.cnblogs.com/yinzhengjie/tag/Scala%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/ 4 EMAIL:y1053419035@qq.com 5 */ 6 package cn.org.yinzhengjie.robot 7 8 import akka.actor.{Actor, ActorSelection, ActorSystem, Props} 9 import com.typesafe.config.ConfigFactory 10 11 import scala.io.StdIn 12 13 class ClientActor(host: String, port: Int) extends Actor{ 14 15 var serverActorRef: ActorSelection = _ // 服務端的代理對象 16 17 // 在receive方法以前調用 18 override def preStart(): Unit = { 19 // akka.tcp://Server@127.0.0.1:8088 20 serverActorRef = context.actorSelection(s"akka.tcp://Server@${host}:${port}/user/Miao~miao") 21 } 22 // mailbox ->receive 23 override def receive: Receive = { // shit 24 case "start" => println("2018天貓精靈爲您服務!") 25 case msg: String => { // shit 26 serverActorRef ! ClientMessage(msg) // 把客戶端輸入的內容發送給 服務端(actorRef)--》服務端的mailbox中 -> 服務端的receive 27 } 28 case ServerMessage(msg) => println(s"收到服務端消息:$msg") 29 } 30 } 31 32 object ClientActor { 33 def main(args: Array[String]): Unit = { 34 35 //指定客戶端的IP和端口 36 val host = "127.0.0.1" 37 val port = 8089 38 39 //指定服務端的IP和端口 40 val serverHost = "127.0.0.1" 41 val serverPort = 8088 42 43 /** 44 * 使用ConfigFactory的parseString方法解析字符串,指定客戶端IP和端口 45 */ 46 val config = ConfigFactory.parseString( 47 s""" 48 |akka.actor.provider="akka.remote.RemoteActorRefProvider" 49 |akka.remote.netty.tcp.hostname=$host 50 |akka.remote.netty.tcp.port=$port 51 """.stripMargin) 52 53 /** 54 * 將config對象傳遞給ActorSystem並起名爲"Server",爲了是建立客戶端工廠對象(clientActorSystem)。 55 */ 56 val clientActorSystem = ActorSystem("client", config) 57 58 // 建立dispatch | mailbox 59 val clientActorRef = clientActorSystem.actorOf(Props(new ClientActor(serverHost, serverPort.toInt)), "Client") 60 61 clientActorRef ! "start" // 本身給本身發送了一條消息 到本身的mailbox => receive 62 63 /** 64 * 接受用戶的輸入信息並傳送給服務端 65 */ 66 while (true) { 67 Thread.sleep(500) 68 /** 69 * StdIn.readLine方法是同步阻塞的 70 */ 71 val question = StdIn.readLine("請問有什麼我能夠幫你的嗎?>>>") 72 clientActorRef ! question 73 if (question.equals("stop")){ 74 Thread.sleep(500) 75 println("程序即將結束") 76 System.exit(0) 77 } 78 } 79 } 80 }
3>.先執行服務端再執行客戶端並輸入相應信息測試結果以下:
客戶端運行結果以下:
服務端運行結果以下: