Advanced Scala: A Small Example of Spark's Underlying Communication


Author: Yin Zhengjie

Copyright notice: this is an original work. Reproduction is prohibited; violators will be held legally responsible.

1. Overview of the Spark Master-Worker communication process

  1>. The worker registers itself with the master;

  2>. When the master receives the worker's registration, it tells the worker that registration succeeded and sends it a message to launch executors;

  3>. After the worker receives the master's reply, it starts reporting its status to the master periodically;

  4>. When the master receives a worker's heartbeat, it updates that worker's state. Because each heartbeat carries the time at which it was sent, the master checks the difference between the heartbeat time and the current time; if it exceeds 5 minutes, the worker is considered dead, and the master will no longer assign tasks to it when distributing work.

  The communication mechanism between Master and Worker can be summarized by the following message flow:
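In outline (the message names match the MessageProtocol definitions below):

Worker                                             Master
  | -- RegisterWorkerInfo(id, core, ram) -------> |  register this worker
  | <-- RegisteredWorkerInfo --------------------- |  registration succeeded
  | (schedules SendHeartBeat to itself)            |
  | -- HearBeat(id), every 1000 ms --------------> |  update lastHeartBeatTime
  |                                                |  (CheckTimeOutWorker starts a scheduler;
  |                                                |   RemoveTimeOutWorker fires every 5000 ms
  |                                                |   and drops workers silent for > 3000 ms)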

2. Writing the source code

1>. Maven dependencies

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.org.yinzhengjie</groupId>
    <artifactId>MyActor</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- Define some constants -->
    <properties>
        <encoding>UTF-8</encoding>
        <scala.version>2.11.8</scala.version>
        <scala.compat.version>2.11</scala.compat.version>
        <akka.version>2.4.17</akka.version>
    </properties>

    <dependencies>
        <!-- Scala dependency -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <!-- Akka actor dependency -->
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-actor_${scala.compat.version}</artifactId>
            <version>${akka.version}</version>
        </dependency>

        <!-- Actor communication between separate processes (remoting) -->
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-remote_${scala.compat.version}</artifactId>
            <version>${akka.version}</version>
        </dependency>
    </dependencies>

    <!-- Build configuration -->
    <build>
        <!-- Locations of the source and test code -->
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
    </build>
</project>
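Given the sourceDirectory above and the package cn.org.yinzhengjie.spark used below, the project layout looks roughly like this (the file names are an assumption, following the class names):

MyActor/
├── pom.xml
└── src/
    ├── main/scala/cn/org/yinzhengjie/spark/
    │   ├── MessageProtocol.scala
    │   ├── SparkWorker.scala
    │   └── SparkMaster.scala
    └── test/scala/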

2>. MessageProtocol source code

/*
@author :yinzhengjie
Blog:http://www.cnblogs.com/yinzhengjie/tag/Scala%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
EMAIL:y1053419035@qq.com
*/
package cn.org.yinzhengjie.spark

/**
  * worker -> master: messages the worker sends to the master
  */
case class RegisterWorkerInfo(id: String, core: Int, ram: Int)          // the worker registers its info with the master
case class HearBeat(id: String)                                         // the worker sends a heartbeat to the master

/**
  * master -> worker: messages the master sends to the worker
  */
case object RegisteredWorkerInfo                    // the master tells the worker that registration succeeded
case object SendHeartBeat                           // the worker sends this to itself, telling itself to start sending periodic heartbeats to the master
case object CheckTimeOutWorker                      // the master sends this to itself to start a scheduler that periodically checks for timed-out workers
case object RemoveTimeOutWorker                     // the master sends this to itself to remove timed-out workers

/**
  * Class that stores a worker's information
  * @param id          :    each worker's id is immutable and unique!
  * @param core        :    number of CPU cores
  * @param ram         :    amount of RAM
  */
case class WorkerInfo(id: String, core: Int, ram: Int) {
    // time of the last heartbeat; `_` initializes a Long to 0, not null
    var lastHeartBeatTime: Long = _
}

3>. SparkWorker source code

/*
@author :yinzhengjie
Blog:http://www.cnblogs.com/yinzhengjie/tag/Scala%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
EMAIL:y1053419035@qq.com
*/
package cn.org.yinzhengjie.spark

import java.util.UUID

import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._          // import the time units (millis, etc.)

/**
  * The primary constructor takes the master's address
  */
class SparkWorker(masterUrl: String) extends Actor{
    var masterProxy: ActorSelection = _                 // reference to the master (ActorSelection)
    val workId: String = UUID.randomUUID().toString     // this worker's UUID; each worker's id is immutable and unique!
    /**
      * Look up the master's reference in preStart, which runs once before receive starts
      * handling messages, so the lookup only has to happen one time.
      */
    override def preStart(): Unit = {
        masterProxy = context.actorSelection(masterUrl)
    }
    override def receive: Receive = {
        case "started" => { // this worker is ready
            // register this worker's info (id, core, ram) with the master
            masterProxy ! RegisterWorkerInfo(workId, 4, 32 * 1024) // the master will receive this message
        }

        /**
          * Handle the registration-success message sent back by the master
          */
        case RegisteredWorkerInfo => {
            import context.dispatcher // required when using the scheduler; the import brings in the needed implicits
            /**
              *         The worker starts a timer via "context.system.scheduler.schedule" to send
              * heartbeats to the master periodically. It takes four arguments:
              *         1. the initial delay, here 0 milliseconds;
              *         2. the interval, i.e. the timer's period, here 1 second;
              *         3. the recipient; here we send to ourselves via `self`;
              *         4. the message to send.
              *         Note: since the message is sent periodically to ourselves, we must also handle
              * it ourselves, hence the SendHeartBeat case below.
              */
            context.system.scheduler.schedule(0 millis, 1000 millis, self, SendHeartBeat)
        }
        case SendHeartBeat => {
            // send a heartbeat to the master
            println(s"------- $workId sending heartbeat -------")
            masterProxy ! HearBeat(workId) // the master will receive the heartbeat
        }
    }
}


object SparkWorker {
    def main(args: Array[String]): Unit = {
        // validate the arguments
        if(args.length != 4) {
            println(
                """
                  |Usage: <host> <port> <workName> <masterURL>
                """.stripMargin)
            sys.exit() // exit the program
        }
        /**
          * Arguments: host, port, worker name, and the master's URL.
          */
        val host = args(0)
        val port = args(1)
        val workName = args(2)
        val masterURL = args(3)
        /**
          * Use ConfigFactory.parseString to build the config object from the arguments
          */
        val config = ConfigFactory.parseString(
            s"""
              |akka.actor.provider="akka.remote.RemoteActorRefProvider"
              |akka.remote.netty.tcp.hostname=$host
              |akka.remote.netty.tcp.port=$port
            """.stripMargin)
        val actorSystem = ActorSystem("sparkWorker", config)
        /**
          * Create the worker's actorRef
          */
        val workerActorRef = actorSystem.actorOf(Props(new SparkWorker(masterURL)), workName)
        workerActorRef ! "started"      // send ourselves a "started" message indicating the worker is ready
    }
}
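For reference, the masterURL argument is the master actor's remote path: sparkMaster is the master's ActorSystem name, and the trailing segment is the actor name under /user. A minimal sketch of how it is assembled, assuming the values used in the local test below (they must match whatever the master was actually started with):

// Illustrative only: these values are assumptions matching the local test in section 3
val masterHost = "127.0.0.1"
val masterPort = 8888
val masterName = "master"
val masterUrl = s"akka.tcp://sparkMaster@$masterHost:$masterPort//user/$masterName"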

4>. SparkMaster source code

/*
@author :yinzhengjie
Blog:http://www.cnblogs.com/yinzhengjie/tag/Scala%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
EMAIL:y1053419035@qq.com
*/
package cn.org.yinzhengjie.spark

import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._

class SparkMaster extends Actor{

    // map that stores each worker's info, keyed by worker id
    val saveWorkerInfo = collection.mutable.HashMap[String, WorkerInfo]()

//    override def preStart(): Unit = {
//        context.system.scheduler.schedule(0 millis, 6000 millis, self, RemoveTimeOutWorker)
//    }

    override def receive: Receive = {
        /**
          * Handle a worker's registration message
          */
        case RegisterWorkerInfo(wkId, core, ram) => {
            /**
              * Check whether this worker has already been stored; if not, store its info
              * in the HashMap with wkId as the key.
              */
            if (!saveWorkerInfo.contains(wkId)) {
                val workerInfo = new WorkerInfo(wkId, core, ram)
                // initialize the timestamp so a just-registered worker is not immediately treated as timed out
                workerInfo.lastHeartBeatTime = System.currentTimeMillis()
                saveWorkerInfo += ((wkId, workerInfo))
                /**
                  * After storing the worker's registration data, tell the worker
                  * that it registered successfully
                  */
                sender() ! RegisteredWorkerInfo // the worker will receive the success message
            }
        }
        /**
          * When the master receives a worker's heartbeat, update that worker's last heartbeat time
          */
        case HearBeat(wkId) => {
            // use get to guard against heartbeats from a worker that has already been removed
            saveWorkerInfo.get(wkId).foreach { workerInfo =>
                workerInfo.lastHeartBeatTime = System.currentTimeMillis()   // update the heartbeat timestamp
            }
        }
        case CheckTimeOutWorker => {
            import context.dispatcher // required when using the scheduler; the import brings in the needed implicits
            context.system.scheduler.schedule(0 millis, 5000 millis, self, RemoveTimeOutWorker)
        }
        case RemoveTimeOutWorker => {
            /**
              *         Take every value in the HashMap and check whether the gap between now and the
              * last heartbeat exceeds three heartbeat intervals; missing three heartbeats counts as
              * a timeout. The heartbeat interval is 1000 ms, so three intervals is 3000 ms.
              */
            val workerInfos = saveWorkerInfo.values
            val currentTime = System.currentTimeMillis()

            workerInfos
              .filter(workerInfo => currentTime - workerInfo.lastHeartBeatTime > 3000)        // select the timed-out workers
              .foreach(workerInfo => saveWorkerInfo.remove(workerInfo.id))                    // and remove them
            println(s"====== ${saveWorkerInfo.size} workers still alive ======")
        }
    }
}

object SparkMaster {
    def main(args: Array[String]): Unit = {
        // validate the arguments
        if(args.length != 3) {
            println(
                """
                  |Usage: <host> <port> <masterName>
                """.stripMargin)
            sys.exit() // exit the program
        }
        /**
          * Arguments: host, port, and master name
          */
        val host = args(0)
        val port = args(1)
        val masterName = args(2)
        /**
          * Use ConfigFactory.parseString to build the config object from the arguments
          */
        val config = ConfigFactory.parseString(
            s"""
               |akka.actor.provider="akka.remote.RemoteActorRefProvider"
               |akka.remote.netty.tcp.hostname=$host
               |akka.remote.netty.tcp.port=$port
            """.stripMargin)

        val actorSystem = ActorSystem("sparkMaster", config)
        val masterActorRef = actorSystem.actorOf(Props[SparkMaster], masterName)
        /**
          * Send ourselves a message to start a scheduler that periodically checks
          * the HashMap for timed-out workers
          */
        masterActorRef ! CheckTimeOutWorker
    }
}
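The commented-out preStart above points at an alternative design: instead of having main send CheckTimeOutWorker, the master can start the cleanup scheduler itself when the actor starts. A minimal sketch of that variant, placed inside SparkMaster and using the 5000 ms period from the CheckTimeOutWorker handler (the commented-out original used 6000 ms):

// Sketch of the alternative: start the timeout check when the actor starts
override def preStart(): Unit = {
    import context.dispatcher   // the scheduler needs an ExecutionContext
    context.system.scheduler.schedule(0 millis, 5000 millis, self, RemoveTimeOutWorker)
}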

3. Local testing

1>. Start the master

The master's launch arguments:
127.0.0.1 8888 master

2>. Start the workers

Launch arguments for the two workers:
127.0.0.1 6665 worker akka.tcp://sparkMaster@127.0.0.1:8888//user/master
127.0.0.1 6666 worker akka.tcp://sparkMaster@127.0.0.1:8888//user/master

  The master's console output then shows each worker's heartbeats and the number of live workers.

4. Packaging master and worker and deploying to multiple Linux servers for testing

1>. Packaging SparkMaster

  Step 1: modify the Maven configuration as follows:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>cn.org.yinzhengjie</groupId>
    <artifactId>MyActor</artifactId>
    <version>1.0-SNAPSHOT</version>
    <!-- Define some constants -->
    <properties>
        <encoding>UTF-8</encoding>
        <scala.version>2.11.8</scala.version>
        <scala.compat.version>2.11</scala.compat.version>
        <akka.version>2.4.17</akka.version>
    </properties>
    <dependencies>
        <!-- Scala dependency -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- Akka actor dependency -->
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-actor_${scala.compat.version}</artifactId>
            <version>${akka.version}</version>
        </dependency>
        <!-- Actor communication between separate processes (remoting) -->
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-remote_${scala.compat.version}</artifactId>
            <version>${akka.version}</version>
        </dependency>
    </dependencies>
    <!-- Plugins -->
    <build>
        <!-- Locations of the source and test code -->
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <!-- Plugin that compiles the Scala sources -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <!-- Maven packaging (shade) plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <!-- Merge the reference.conf files from the Akka jars; without this the shaded jar cannot load Akka's default config -->
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                                <!-- Specify the main class: cn.org.yinzhengjie.spark.SparkMaster -->
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>cn.org.yinzhengjie.spark.SparkMaster</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

  Step 2: click package to run the build (on the command line, mvn package does the same).

  Step 3: inspect the contents of the packaged jar (for example with jar tf) to confirm the dependencies and the main class were bundled.

2>. Packaging SparkWorker

  Step 1: modify the Maven configuration as follows:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>cn.org.yinzhengjie</groupId>
    <artifactId>MyActor</artifactId>
    <version>1.0-SNAPSHOT</version>
    <!-- Define some constants -->
    <properties>
        <encoding>UTF-8</encoding>
        <scala.version>2.11.8</scala.version>
        <scala.compat.version>2.11</scala.compat.version>
        <akka.version>2.4.17</akka.version>
    </properties>
    <dependencies>
        <!-- Scala dependency -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- Akka actor dependency -->
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-actor_${scala.compat.version}</artifactId>
            <version>${akka.version}</version>
        </dependency>
        <!-- Actor communication between separate processes (remoting) -->
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-remote_${scala.compat.version}</artifactId>
            <version>${akka.version}</version>
        </dependency>
    </dependencies>
    <!-- Plugins -->
    <build>
        <!-- Locations of the source and test code -->
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <!-- Plugin that compiles the Scala sources -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <!-- Maven packaging (shade) plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <!-- Merge the reference.conf files from the Akka jars; without this the shaded jar cannot load Akka's default config -->
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                                <!-- Specify the main class: cn.org.yinzhengjie.spark.SparkWorker -->
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>cn.org.yinzhengjie.spark.SparkWorker</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

  The next two steps are the same as above: rename the packaged file and check its main-class information.

3>. Start three virtual machines, upload master.jar to the master node, and run it

[yinzhengjie@s101 download]$ ll
total 67320
-rw-r--r--. 1 yinzhengjie yinzhengjie 20124547 Jul 31 20:42 master.jar
-rw-r--r--. 1 yinzhengjie yinzhengjie 28678231 Jul 20 21:18 scala-2.11.8.tgz
-rw-r--r--. 1 yinzhengjie yinzhengjie 20124541 Jul 31 21:52 worker.jar
[yinzhengjie@s101 download]$ 
[yinzhengjie@s101 download]$ 
[yinzhengjie@s101 download]$ java -jar master.jar 172.16.30.101 8888 master

 

4>. Upload worker.jar to the other two nodes and run it:

  On node 172.16.30.102:
[yinzhengjie@s102 download]$ ll
total 19656
-rw-r--r--. 1 yinzhengjie yinzhengjie 20124541 Jul 31 22:01 worker.jar
[yinzhengjie@s102 download]$ 
[yinzhengjie@s102 download]$ java -jar worker.jar 172.16.30.102 6665 worker akka.tcp://sparkMaster@172.16.30.101:8888//user/master
[yinzhengjie@s102 download]$ 

  On node 172.16.30.103:
[yinzhengjie@s103 download]$ ll
total 19656
-rw-r--r--. 1 yinzhengjie yinzhengjie 20124541 Jul 31 22:00 worker.jar
[yinzhengjie@s103 download]$ 
[yinzhengjie@s103 download]$ 
[yinzhengjie@s103 download]$ java -jar worker.jar 172.16.30.103 6665 worker akka.tcp://sparkMaster@172.16.30.101:8888//user/master

  The consoles on 172.16.30.102 and 172.16.30.103 show each worker sending heartbeats, while the console on 172.16.30.101 shows the master logging the number of live workers.