Advanced Scala: A Small Case Study of Spark's Underlying Communication
Author: Yin Zhengjie
Copyright notice: this is an original work; reproduction is prohibited, and violations will be pursued legally.
I. Overview of the Spark Master-Worker communication process
1>. A Worker registers itself with the Master;
2>. On receiving a Worker's registration, the Master replies that registration succeeded and sends the Worker a message to launch its executors;
3>. Once registered, the Worker reports its state to the Master at regular intervals;
4>. On receiving a Worker's heartbeat, the Master updates that Worker's state. Each heartbeat carries the time it was sent, so the Master compares that timestamp against the current time; if the difference exceeds 5 minutes, the Master considers the Worker dead and no longer assigns tasks to it.
In short, the exchange between Master and Worker consists of four kinds of traffic: registration, a registration acknowledgement, periodic heartbeats, and the Master's internal timeout sweep, sketched below.
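At the heart of step 4>. is a simple liveness rule: a worker is considered dead once the gap between the current time and its last heartbeat exceeds a threshold. As a warm-up, here is a minimal, self-contained sketch of that check (the names here are illustrative only; the real implementation follows in section II):

    object LivenessCheck {
      final case class Entry(id: String, lastHeartBeatTime: Long)

      // Keep only the workers whose last heartbeat falls within the timeout window.
      def pruneDead(workers: Map[String, Entry], timeoutMs: Long): Map[String, Entry] = {
        val now = System.currentTimeMillis()
        workers.filter { case (_, w) => now - w.lastHeartBeatTime <= timeoutMs }
      }

      def main(args: Array[String]): Unit = {
        val now = System.currentTimeMillis()
        val workers = Map(
          "alive" -> Entry("alive", now - 1000),  // heartbeat 1 second ago: kept
          "dead"  -> Entry("dead",  now - 10000)  // heartbeat 10 seconds ago: pruned
        )
        println(pruneDead(workers, timeoutMs = 3000).keys) // prints Set(alive)
      }
    }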
II. Writing the source code
1>. Maven dependencies
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.org.yinzhengjie</groupId>
    <artifactId>MyActor</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- Define some constants -->
    <properties>
        <encoding>UTF-8</encoding>
        <scala.version>2.11.8</scala.version>
        <scala.compat.version>2.11</scala.compat.version>
        <akka.version>2.4.17</akka.version>
    </properties>

    <dependencies>
        <!-- Scala standard library -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <!-- Akka actor dependency -->
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-actor_${scala.compat.version}</artifactId>
            <version>${akka.version}</version>
        </dependency>

        <!-- Actor communication between processes -->
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-remote_${scala.compat.version}</artifactId>
            <version>${akka.version}</version>
        </dependency>
    </dependencies>

    <!-- Plugin configuration -->
    <build>
        <!-- Locations of the source and test trees -->
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
    </build>
</project>
2>. MessageProtocol source code
/*
 @author : yinzhengjie
 Blog: http://www.cnblogs.com/yinzhengjie/tag/Scala%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
 EMAIL: y1053419035@qq.com
*/
package cn.org.yinzhengjie.spark

/**
  * worker -> master: messages the worker sends to the master
  */
case class RegisterWorkerInfo(id: String, core: Int, ram: Int) // the worker registers itself with the master
case class HearBeat(id: String)                                // the worker sends a heartbeat to the master

/**
  * master -> worker: messages the master sends to the worker
  */
case object RegisteredWorkerInfo // the master tells the worker that registration succeeded
case object SendHeartBeat        // the worker sends this to itself to trigger a periodic heartbeat to the master
case object CheckTimeOutWorker   // the master sends this to itself to start a scheduler that periodically removes timed-out workers
case object RemoveTimeOutWorker  // the master sends this to itself to remove the timed-out workers

/**
  * Class that stores a worker's information
  * @param id   : each worker's id is immutable and unique
  * @param core : number of cores on the machine
  * @param ram  : amount of memory
  */
case class WorkerInfo(id: String, core: Int, ram: Int) {
  // Time of the last heartbeat; the `_` initializer gives a Long the default 0, not null.
  var lastHeartBeatTime: Long = _
}
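One detail worth calling out: because Long is a value type, `var lastHeartBeatTime: Long = _` starts at the numeric default 0 rather than null. A quick demonstration (assuming it sits in the same cn.org.yinzhengjie.spark package):

    object WorkerInfoDemo {
      def main(args: Array[String]): Unit = {
        val info = WorkerInfo("worker-1", 4, 32 * 1024)
        println(info.lastHeartBeatTime) // prints 0, the default for an uninitialized Long field
      }
    }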
3>. SparkWorker source code
/*
 @author : yinzhengjie
 Blog: http://www.cnblogs.com/yinzhengjie/tag/Scala%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
 EMAIL: y1053419035@qq.com
*/
package cn.org.yinzhengjie.spark

import java.util.UUID

import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._ // import the time units

/**
  * The primary constructor takes the master's address.
  */
class SparkWorker(masterUrl: String) extends Actor {
  var masterProxy: ActorSelection = _             // reference to the master (an ActorSelection)
  val workId: String = UUID.randomUUID().toString // the worker's UUID; each worker's id is immutable and unique

  /**
    * preStart runs once, before receive starts processing messages, so we look up
    * the master reference here exactly once.
    */
  override def preStart(): Unit = {
    masterProxy = context.actorSelection(masterUrl)
  }

  override def receive: Receive = {
    case "started" => { // this worker is ready
      // register with the master: id, core, ram
      masterProxy ! RegisterWorkerInfo(workId, 4, 32 * 1024) // the master receives this message
    }

    /**
      * Handle the registration-success message sent back by the master.
      */
    case RegisteredWorkerInfo => {
      import context.dispatcher // the scheduler needs an implicit ExecutionContext, so import the dispatcher
      /**
        * Start a timer via context.system.scheduler.schedule that periodically sends
        * heartbeats to the master. It takes four parameters:
        *   1. the initial delay, here 0 milliseconds;
        *   2. the interval between runs, here 1 second;
        *   3. the recipient, here ourselves via `self`;
        *   4. the message to send.
        * Because the message is sent to ourselves, we must also handle it: that is
        * the SendHeartBeat case below.
        */
      context.system.scheduler.schedule(0 millis, 1000 millis, self, SendHeartBeat)
    }

    case SendHeartBeat => {
      // send a heartbeat to the master
      println(s"------- $workId sending heartbeat -------")
      masterProxy ! HearBeat(workId) // the master receives the heartbeat
    }
  }
}

object SparkWorker {
  def main(args: Array[String]): Unit = {
    // validate the arguments
    if (args.length != 4) {
      println(
        """
          |Usage: <host> <port> <workName> <masterURL>
        """.stripMargin)
      sys.exit() // exit the program
    }
    /**
      * Arguments: host, port, worker name, and the master's URL.
      */
    val host = args(0)
    val port = args(1)
    val workName = args(2)
    val masterURL = args(3)
    /**
      * Use ConfigFactory.parseString to build the config object from the arguments.
      */
    val config = ConfigFactory.parseString(
      s"""
         |akka.actor.provider="akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname=$host
         |akka.remote.netty.tcp.port=$port
      """.stripMargin)
    val actorSystem = ActorSystem("sparkWorker", config)
    /**
      * Create the worker's ActorRef.
      */
    val workerActorRef = actorSystem.actorOf(Props(new SparkWorker(masterURL)), workName)
    workerActorRef ! "started" // send ourselves a "started" message to signal readiness
  }
}
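The masterUrl the worker passes to context.actorSelection is the master's remote actor path, which Akka composes from the remote ActorSystem's name ("sparkMaster" in SparkMaster below), its host and port, and the top-level actor's name (the <masterName> launch argument). A hypothetical helper, not part of the original code, that assembles the exact URL used in the local test in section III (the doubled slash before user mirrors the arguments used throughout this post):

    // Hypothetical helper: builds the worker's masterUrl argument from its parts.
    object MasterUrl {
      def apply(host: String, port: Int, masterName: String): String =
        s"akka.tcp://sparkMaster@$host:$port//user/$masterName"

      def main(args: Array[String]): Unit =
        println(MasterUrl("127.0.0.1", 8888, "master"))
        // prints akka.tcp://sparkMaster@127.0.0.1:8888//user/master
    }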
4>. SparkMaster source code
/*
 @author : yinzhengjie
 Blog: http://www.cnblogs.com/yinzhengjie/tag/Scala%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
 EMAIL: y1053419035@qq.com
*/
package cn.org.yinzhengjie.spark

import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._

class SparkMaster extends Actor {

  // map that stores each worker's information, keyed by worker id
  val saveWorkerInfo = collection.mutable.HashMap[String, WorkerInfo]()

  // Alternatively, the timeout scheduler could be started here instead of via CheckTimeOutWorker:
  // override def preStart(): Unit = {
  //   context.system.scheduler.schedule(0 millis, 6000 millis, self, RemoveTimeOutWorker)
  // }

  override def receive: Receive = {
    /**
      * Handle a worker's registration message.
      */
    case RegisterWorkerInfo(wkId, core, ram) => {
      /**
        * Before storing, check whether this worker is already registered; if not,
        * store its information in the HashMap keyed by wkId.
        */
      if (!saveWorkerInfo.contains(wkId)) {
        val workerInfo = WorkerInfo(wkId, core, ram)
        saveWorkerInfo += ((wkId, workerInfo))
        /**
          * After storing the registration data, tell the worker that it has
          * registered successfully.
          */
        sender() ! RegisteredWorkerInfo // the worker receives the success message
      }
    }
    /**
      * On receiving a worker's heartbeat, update that worker's last heartbeat time.
      */
    case HearBeat(wkId) => {
      // The contains check guards against heartbeats from a worker that has
      // already been removed as timed out but is still sending.
      if (saveWorkerInfo.contains(wkId)) {
        val workerInfo = saveWorkerInfo(wkId)
        workerInfo.lastHeartBeatTime = System.currentTimeMillis() // update the heartbeat time
      }
    }
    case CheckTimeOutWorker => {
      import context.dispatcher // the scheduler needs an implicit ExecutionContext, so import the dispatcher
      context.system.scheduler.schedule(0 millis, 5000 millis, self, RemoveTimeOutWorker)
    }
    case RemoveTimeOutWorker => {
      /**
        * Take every value out of the HashMap and check whether the gap between the
        * current time and the last heartbeat exceeds three heartbeat intervals;
        * three missed heartbeats count as a timeout. Each interval is 1000 ms, so
        * the threshold is 3000 ms.
        */
      val workerInfos = saveWorkerInfo.values
      val currentTime = System.currentTimeMillis()

      workerInfos
        .filter(workerInfo => currentTime - workerInfo.lastHeartBeatTime > 3000) // keep only the timed-out workers
        .foreach(workerInfo => saveWorkerInfo.remove(workerInfo.id))             // remove them from the map
      println(s"====== ${saveWorkerInfo.size} workers still alive ======")
    }
  }
}

object SparkMaster {
  def main(args: Array[String]): Unit = {
    // validate the arguments
    if (args.length != 3) {
      println(
        """
          |Usage: <host> <port> <masterName>
        """.stripMargin)
      sys.exit() // exit the program
    }
    /**
      * Arguments: host, port, and master name.
      */
    val host = args(0)
    val port = args(1)
    val masterName = args(2)
    /**
      * Use ConfigFactory.parseString to build the config object from the arguments.
      */
    val config = ConfigFactory.parseString(
      s"""
         |akka.actor.provider="akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname=$host
         |akka.remote.netty.tcp.port=$port
      """.stripMargin)

    val actorSystem = ActorSystem("sparkMaster", config)
    val masterActorRef = actorSystem.actorOf(Props[SparkMaster], masterName)
    /**
      * Send ourselves a message that starts a scheduler to periodically check the
      * HashMap for timed-out workers.
      */
    masterActorRef ! CheckTimeOutWorker
  }
}
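With both classes in place, the pair can be smoke-tested inside a single JVM before packaging (a convenience sketch, not part of the original post): the two main methods create separate ActorSystems bound to different ports, so the worker reaches the master over local TCP exactly as in the two-process test below.

    // Hypothetical single-JVM smoke test, reusing the arguments from section III.
    object LocalSmokeTest {
      def main(args: Array[String]): Unit = {
        SparkMaster.main(Array("127.0.0.1", "8888", "master"))
        SparkWorker.main(Array("127.0.0.1", "6665", "worker",
          "akka.tcp://sparkMaster@127.0.0.1:8888//user/master"))
      }
    }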
III. Local testing
1>. Start the master
Its launch arguments are as follows:
127.0.0.1 8888 master
2>. Start the workers
Both workers run on the same host, so each needs its own port. Their launch arguments are as follows:

127.0.0.1 6665 worker akka.tcp://sparkMaster@127.0.0.1:8888//user/master
127.0.0.1 6666 worker akka.tcp://sparkMaster@127.0.0.1:8888//user/master
Each worker then prints a heartbeat line every second, and the master's console prints the count of surviving workers on every check cycle.
IV. Packaging master and worker and deploying to multiple Linux servers for testing
1>. Packaging SparkMaster
Step 1: modify the Maven configuration as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>cn.org.yinzhengjie</groupId>
    <artifactId>MyActor</artifactId>
    <version>1.0-SNAPSHOT</version>
    <!-- Define some constants -->
    <properties>
        <encoding>UTF-8</encoding>
        <scala.version>2.11.8</scala.version>
        <scala.compat.version>2.11</scala.compat.version>
        <akka.version>2.4.17</akka.version>
    </properties>
    <dependencies>
        <!-- Scala standard library -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- Akka actor dependency -->
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-actor_${scala.compat.version}</artifactId>
            <version>${akka.version}</version>
        </dependency>
        <!-- Actor communication between processes -->
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-remote_${scala.compat.version}</artifactId>
            <version>${akka.version}</version>
        </dependency>
    </dependencies>
    <!-- Plugin configuration -->
    <build>
        <!-- Locations of the source and test trees -->
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <!-- Plugin that compiles the Scala sources -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <!-- Maven packaging plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                                <!-- Specify the main class: cn.org.yinzhengjie.spark.SparkMaster -->
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>cn.org.yinzhengjie.spark.SparkMaster</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
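The AppendingTransformer above is what keeps Akka working after shading: akka-actor and akka-remote each ship their own reference.conf of default settings, and the fat jar must concatenate the two files rather than let one overwrite the other. A hypothetical check, not part of the original post, that would fail with a ConfigException.Missing if the files had not been merged:

    import com.typesafe.config.ConfigFactory

    object CheckMergedConfig {
      def main(args: Array[String]): Unit = {
        val conf = ConfigFactory.load() // loads the merged reference.conf from the shaded jar
        println(conf.getString("akka.actor.provider"))        // default supplied by akka-actor
        println(conf.getString("akka.remote.netty.tcp.port")) // default supplied by akka-remote
      }
    }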
Step 2: click package to build the jar.
Step 3: inspect the internal structure of the packaged jar.
2>. Packaging SparkWorker
Step 1: the Maven configuration is identical to the master's above, except that the ManifestResourceTransformer names SparkWorker as the main class:

<!-- Specify the main class: cn.org.yinzhengjie.spark.SparkWorker -->
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
    <mainClass>cn.org.yinzhengjie.spark.SparkWorker</mainClass>
</transformer>
The next two steps are the same as for the master: rename the packaged jar and verify the main class recorded in its manifest.
3>. Start three virtual machines, upload master.jar to the master node, and run it
[yinzhengjie@s101 download]$ ll
total 67320
-rw-r--r--. 1 yinzhengjie yinzhengjie 20124547 Jul 31 20:42 master.jar
-rw-r--r--. 1 yinzhengjie yinzhengjie 28678231 Jul 20 21:18 scala-2.11.8.tgz
-rw-r--r--. 1 yinzhengjie yinzhengjie 20124541 Jul 31 21:52 worker.jar
[yinzhengjie@s101 download]$ java -jar master.jar 172.16.30.101 8888 master
4>. Upload worker.jar to the other two nodes and run it, as follows:
On node 172.16.30.102:

[yinzhengjie@s102 download]$ ll
total 19656
-rw-r--r--. 1 yinzhengjie yinzhengjie 20124541 Jul 31 22:01 worker.jar
[yinzhengjie@s102 download]$ java -jar worker.jar 172.16.30.102 6665 worker akka.tcp://sparkMaster@172.16.30.101:8888//user/master

On node 172.16.30.103:

[yinzhengjie@s103 download]$ ll
total 19656
-rw-r--r--. 1 yinzhengjie yinzhengjie 20124541 Jul 31 22:00 worker.jar
[yinzhengjie@s103 download]$ java -jar worker.jar 172.16.30.103 6665 worker akka.tcp://sparkMaster@172.16.30.101:8888//user/master
Both workers then print their periodic heartbeat messages, and the master at 172.16.30.101 prints the surviving-worker count, confirming that the two remote workers registered successfully.