Scala進階之路-併發編程模型Akka入門篇

               Scala進階之路-併發編程模型Akka入門篇程序員

                                  做者:尹正傑apache

版權聲明:原創做品,謝絕轉載!不然將追究法律責任。編程

 

 

一.Akka Actor介紹小程序

1>.Akka介紹多線程

   寫併發程序很難。程序員不得不處理線程、鎖和競態條件等等,這個過程很容易出錯,並且會致使程序代碼難以閱讀、測試和維護。Akka 是 JVM 平臺上構建高併發、分佈式和容錯應用的工具包和運行時。Akka 用 Scala 語言寫成,同時提供了 Scala 和 JAVA 的開發接口。架構

 

2>.Akka 中 中 Actor  模型併發

  Akka 處理併發的方法基於 Actor 模型。在基於 Actor 的系統裏,全部的事物都是 Actor,就好像在面向對象設計裏面全部的事物都是對象同樣。可是有一個重要區別,那就是 Actor 模型是做爲一個併發模型設計和架構的,而面向對象模式則不是。Actor 與 Actor 之間只能經過消息通訊。異步

 

3>.Akaka的特色maven

  第一:它是對併發模型進行了更高的抽象;
  第二:它是異步、非阻塞、高性能的事件驅動編程模型;
  第三:它是輕量級事件處理(1GB 內存可容納百萬級別個 Actor);tcp

4>.爲何 Actor 模型是一種處理併發問題的解決方案?

  處理併發問題就是如何保證共享數據的一致性和正確性,爲何會有保持共享數據正確性這個問題呢?無非是咱們的程序是多線程的,多個線程對同一個數據進行修改,若不加同步條件,勢必會形成數據污染。那麼咱們是否是能夠轉換一下思惟,用單線程去處理相應的請求,可是又有人會問了,如果用單線程處理,那系統的性能又如何保證。Actor 模型的出現解決了這個問題,簡化併發編程,提高程序性能。

 

5>.Maven依賴

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <project xmlns="http://maven.apache.org/POM/4.0.0"
 3          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 4          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 5     <modelVersion>4.0.0</modelVersion>
 6 
 7     <groupId>cn.org.yinzhengjie</groupId>
 8     <artifactId>MyActor</artifactId>
 9     <version>1.0-SNAPSHOT</version>
10 
11     <!-- 定義一下常量 -->
12     <properties>
13         <encoding>UTF-8</encoding>
14         <scala.version>2.11.8</scala.version>
15         <scala.compat.version>2.11</scala.compat.version>
16         <akka.version>2.4.17</akka.version>
17     </properties>
18 
19     <dependencies>
20         <!-- 添加scala的依賴 -->
21         <dependency>
22             <groupId>org.scala-lang</groupId>
23             <artifactId>scala-library</artifactId>
24             <version>${scala.version}</version>
25         </dependency>
26 
27         <!-- 添加akka的actor依賴 -->
28         <dependency>
29             <groupId>com.typesafe.akka</groupId>
30             <artifactId>akka-actor_${scala.compat.version}</artifactId>
31             <version>${akka.version}</version>
32         </dependency>
33 
34         <!-- 多進程之間的Actor通訊 -->
35         <dependency>
36             <groupId>com.typesafe.akka</groupId>
37             <artifactId>akka-remote_${scala.compat.version}</artifactId>
38             <version>${akka.version}</version>
39         </dependency>
40     </dependencies>
41 
42     <!-- 指定插件-->
43     <build>
44         <!-- 指定源碼包和測試包的位置 -->
45         <sourceDirectory>src/main/scala</sourceDirectory>
46         <testSourceDirectory>src/test/scala</testSourceDirectory>
47         <plugins>
48             <!-- 指定編譯scala的插件 -->
49             <plugin>
50                 <groupId>net.alchim31.maven</groupId>
51                 <artifactId>scala-maven-plugin</artifactId>
52                 <version>3.2.2</version>
53                 <executions>
54                     <execution>
55                         <goals>
56                             <goal>compile</goal>
57                             <goal>testCompile</goal>
58                         </goals>
59                         <configuration>
60                             <args>
61                                 <arg>-dependencyfile</arg>
62                                 <arg>${project.build.directory}/.scala_dependencies</arg>
63                             </args>
64                         </configuration>
65                     </execution>
66                 </executions>
67             </plugin>
68 
69 
70         </plugins>
71     </build>
72 </project>

 

  自定義默認的源代碼包和測試包的位置,須要手動穿件Source Root目錄喲,以下圖:

 

 

二.編寫HelloActor

 1 /*
 2 @author :yinzhengjie
 3 Blog:http://www.cnblogs.com/yinzhengjie/tag/Scala%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
 4 EMAIL:y1053419035@qq.com
 5 */
 6 package cn.org.yinzhengjie.actor
 7 
 8 import akka.actor.{Actor, ActorSystem, Props}
 9 
10 import scala.io.StdIn
11 
12 class HelloActor extends Actor{
13     // 重寫接受消息的偏函數,其功能是接受消息並處理
14     override def receive: Receive = {
15         case "你好帥" => println("竟說實話,我喜歡你這種人!")
16         case "醜八怪" => println("滾犢子 !")
17         case "stop" => {
18             context.stop(self) // 中止本身的actorRef
19             context.system.terminate() // 關閉ActorSystem,即關閉其內部的線程池(ExcutorService)
20         }
21     }
22 }
23 
24 object HelloActor {
25     /**
26       * 建立線程池對象MyFactory,用來建立actor的對象的
27       */
28     private val MyFactory = ActorSystem("myFactory")    //裏面的"myFactory"參數爲線程池的名稱
29     /**
30       *     經過MyFactory.actorOf方法來建立一個actor,注意,Props方法的第一個參數須要傳遞咱們自定義的HelloActor類,
31       * 第二個參數是給actor起個名字
32       */
33     private val helloActorRef = MyFactory.actorOf(Props[HelloActor], "helloActor")
34 
35     def main(args: Array[String]): Unit = {
36         var flag = true
37         while(flag){
38             /**
39               * 接受用戶輸入的字符串
40               */
41             print("請輸入您想發送的消息:")
42             val consoleLine:String = StdIn.readLine()
43             /**
44               * 使用helloActorRef來給本身發送消息,helloActorRef有一個叫作感嘆號("!")的方法來發送消息
45               */
46             helloActorRef ! consoleLine
47             if (consoleLine.equals("stop")){
48                 flag = false
49                 println("程序即將結束!")
50             }
51             /**
52               * 爲了避免讓while的運行速度在receive方法之上,咱們可讓他休眠0.1秒
53               */
54             Thread.sleep(100)
55         }
56     }
57 }

  以上代碼執行結果以下:

 

三.兩個actor通訊案例-模擬下棋對話

 1 /*
 2 @author :yinzhengjie
 3 Blog:http://www.cnblogs.com/yinzhengjie/tag/Scala%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
 4 EMAIL:y1053419035@qq.com
 5 */
 6 package cn.org.yinzhengjie.actor
 7 
 8 import akka.actor.{ActorSystem, Props}
 9 
10 import akka.actor.{Actor, ActorRef}
11 
12 /**
13   * 定義玩家1
14   */
15 class player1Actor(val p2: ActorRef) extends Actor{
16     // receive方法是負責處理消息的
17     override def receive: Receive = {
18         case "start" => {
19             println("棋聖:I'm OK !")
20             p2 ! "該你了"
21         }
22         case "將軍" => {
23             println("棋聖:你真猛!")
24             Thread.sleep(1000)
25             p2 ! "該你了"
26         }
27     }
28 }
29 
30 
31 /**
32   * 定義玩家2
33   */
34 class player2Actor extends Actor{
35 
36     override def receive: Receive = {
37         case "start" => println("棋仙說:I'm OK !")
38         case "該你了" => {
39             println("棋仙:那必須滴!")
40             Thread.sleep(1000)
41             /**
42               * 注意,這個「sender()」,其實就是對ActorRef的一個引用。它指的是給發送"該你了"的這個對象自己!
43               */
44             sender() ! "將軍"
45         }
46     }
47 }
48 
49 
50 object ChineseChess extends App{
51     // 建立 actorSystem的工廠,用來生產ActorRef對象!
52     private val ChineseChessActorSystem = ActorSystem("Chinese-chess")
53     /**
54       * 經過actorSystem建立ActorRef
55       */
56     private val p2 = ChineseChessActorSystem.actorOf(Props[player2Actor], "player2")            //建立player2Actor對象
57     private val p1 = ChineseChessActorSystem.actorOf(Props(new player1Actor(p2)), "player1")   //建立player1Actor對象
58 
59     p2 ! "start"
60     p1 ! "start"
61 }

  運行以上代碼輸出結果以下:

 

四.服務端和客戶端交互的小程序

1>.服務端代碼

 1 /*
 2 @author :yinzhengjie
 3 Blog:http://www.cnblogs.com/yinzhengjie/tag/Scala%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
 4 EMAIL:y1053419035@qq.com
 5 */
 6 package cn.org.yinzhengjie.robot
 7 
 8 import akka.actor.{Actor, ActorSystem, Props}
 9 import com.typesafe.config.ConfigFactory
10 
11 class ServerActor extends Actor{
12     /**
13       * receive方法是用來處理客戶端發送過來的問題的
14       */
15     override def receive: Receive = {
16         case "start" => println("天貓系統已啓動...")
17 
18         case ClientMessage(msg) => {
19             println(s"收到客戶端消息:$msg")
20             msg match {
21                 /**
22                   * sender()發送端的代理對象, 發送到客戶端的mailbox中 -> 客戶端的receive
23                    */
24                 case "你叫啥" =>
25                     sender() ! ServerMessage("本寶寶是天貓精靈")
26                 case "你是男是女" =>
27                     sender() ! ServerMessage("本寶寶非男非女")
28                 case "你有男票嗎" =>
29                     sender() ! ServerMessage("本寶寶還小喲")
30                 case "stop" =>
31                     context.stop(self) // 中止本身的actorRef
32                     context.system.terminate() // 關閉ActorSystem,即關閉其內部的線程池(ExcutorService)
33                     println("天貓系統已中止...")
34                 case _ =>
35                     sender() ! ServerMessage("對不起,主人,我不知道你在說什麼.......")
36             }
37         }
38     }
39 }
40 
41 object ServerActor {
42     def main(args: Array[String]): Unit = {
43         //定義服務端的ip和端口
44         val host = "127.0.0.1"
45         val port = 8088
46         /**
47           * 使用ConfigFactory的parseString方法解析字符串,指定服務端IP和端口
48           */
49         val config = ConfigFactory.parseString(
50             s"""
51                |akka.actor.provider="akka.remote.RemoteActorRefProvider"
52                |akka.remote.netty.tcp.hostname=$host
53                |akka.remote.netty.tcp.port=$port
54         """.stripMargin)
55         /**
56           * 將config對象傳遞給ActorSystem並起名爲"Server",爲了是建立服務端工廠對象(ServerActorSystem)。
57           */
58         val ServerActorSystem = ActorSystem("Server", config)
59         /**
60           * 經過工廠對象建立服務端的ActorRef
61           */
62         val serverActorRef = ServerActorSystem.actorOf(Props[ServerActor], "Miao~miao")
63         /**
64           * 到本身的mailbox -》 receive方法
65           */
66         serverActorRef ! "start"
67     }
68 }

2>.客戶端代碼

 1 /*
 2 @author :yinzhengjie
 3 Blog:http://www.cnblogs.com/yinzhengjie/tag/Scala%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
 4 EMAIL:y1053419035@qq.com
 5 */
 6 package cn.org.yinzhengjie.robot
 7 
 8 import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
 9 import com.typesafe.config.ConfigFactory
10 
11 import scala.io.StdIn
12 
13 class ClientActor(host: String, port: Int) extends Actor{
14 
15     var serverActorRef: ActorSelection = _ // 服務端的代理對象
16 
17     // 在receive方法以前調用
18     override def preStart(): Unit = {
19         // akka.tcp://Server@127.0.0.1:8088
20         serverActorRef = context.actorSelection(s"akka.tcp://Server@${host}:${port}/user/Miao~miao")
21     }
22     // mailbox ->receive
23     override def receive: Receive = { // shit
24         case "start" => println("2018天貓精靈爲您服務!")
25         case msg: String => { // shit
26             serverActorRef ! ClientMessage(msg) // 把客戶端輸入的內容發送給 服務端(actorRef)--》服務端的mailbox中 -> 服務端的receive
27         }
28         case ServerMessage(msg) => println(s"收到服務端消息:$msg")
29     }
30 }
31 
32 object ClientActor  {
33     def main(args: Array[String]): Unit = {
34 
35         //指定客戶端的IP和端口
36         val host = "127.0.0.1"
37         val port  = 8089
38 
39         //指定服務端的IP和端口
40         val serverHost = "127.0.0.1"
41         val serverPort = 8088
42 
43         /**
44           * 使用ConfigFactory的parseString方法解析字符串,指定客戶端IP和端口
45           */
46         val config = ConfigFactory.parseString(
47             s"""
48                |akka.actor.provider="akka.remote.RemoteActorRefProvider"
49                |akka.remote.netty.tcp.hostname=$host
50                |akka.remote.netty.tcp.port=$port
51         """.stripMargin)
52 
53         /**
54           * 將config對象傳遞給ActorSystem並起名爲"Server",爲了是建立客戶端工廠對象(clientActorSystem)。
55           */
56         val clientActorSystem = ActorSystem("client", config)
57 
58         // 建立dispatch | mailbox
59         val clientActorRef = clientActorSystem.actorOf(Props(new ClientActor(serverHost, serverPort.toInt)), "Client")
60 
61         clientActorRef ! "start" // 本身給本身發送了一條消息 到本身的mailbox => receive
62 
63         /**
64           * 接受用戶的輸入信息並傳送給服務端
65           */
66         while (true) {
67             Thread.sleep(500)
68             /**
69               * StdIn.readLine方法是同步阻塞的
70               */
71             val question = StdIn.readLine("請問有什麼我能夠幫你的嗎?>>>")
72             clientActorRef ! question
73             if (question.equals("stop")){
74                 Thread.sleep(500)
75                 println("程序即將結束")
76                 System.exit(0)
77             }
78         }
79     }
80 }

3>.先執行服務端再執行客戶端並輸入相應信息測試結果以下:

  客戶端運行結果以下:

  服務端運行結果以下:

相關文章
相關標籤/搜索