Remotely Submitting Spark Jobs to a YARN Cluster

Reference article: how to remotely submit a Spark job to a YARN cluster from IDEA

There are several modes for running a Spark job:

1. Local mode: write the code in IDEA and run it directly.

2. Standalone mode: package the program as a jar, upload it to the cluster, and run it with spark-submit.

3. YARN mode (yarn-client or yarn-cluster): like standalone mode, this also requires packaging a jar and submitting it to the cluster.

If you are only testing your own code, all of the above are fairly cumbersome: every time you change the code you have to rebuild the jar, upload it to the cluster, and submit it with spark-submit, which wastes a lot of time. Below is how to submit to the YARN cluster remotely, straight from the local IDEA.

Let's go straight to the demo below (the code is deliberately simple):

package spark

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.SparkConf
import spark.wordcount.kafkaStreams

object RemoteSubmitApp {
  def main(args: Array[String]) {
    // Set the user that submits the job
    System.setProperty("HADOOP_USER_NAME", "root")
    val conf = new SparkConf()
      .setAppName("WordCount")
      // Submit in yarn-client mode
      .setMaster("yarn")
      // Set the ResourceManager hostname
      .set("yarn.resourcemanager.hostname", "master")
      // Set the number of executors
      .set("spark.executor.instances", "2")
      // Set the executor memory
      .set("spark.executor.memory", "1024M")
      // Set the YARN queue to submit the job to
      .set("spark.yarn.queue", "spark")
      // Set the driver host (the IP of the local machine)
      .set("spark.driver.host", "192.168.17.1")
      // Set the path of the application jar; other dependency jars can be added here, comma-separated
      .setJars(List("D:\\develop_soft\\idea_workspace_2018\\sparkdemo\\target\\sparkdemo-1.0-SNAPSHOT.jar"
    ))
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val scc = new StreamingContext(conf, Seconds(1))
    scc.sparkContext.setLogLevel("WARN")
    //scc.checkpoint("/spark/checkpoint")
    val topic = "jason_flink"
    val topicSet = Set(topic)
    val kafkaParams = Map[String, Object](
      "auto.offset.reset" -> "latest",
      "value.deserializer" -> classOf[StringDeserializer]
      , "key.deserializer" -> classOf[StringDeserializer]
      , "bootstrap.servers" -> "master:9092,storm1:9092,storm2:9092"
      , "group.id" -> "jason_"
      , "enable.auto.commit" -> (true: java.lang.Boolean)
    )
    // kafkaStreams is a var imported from the spark.wordcount object (see the imports above)
    kafkaStreams = KafkaUtils.createDirectStream[String, String](
      scc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams))
    kafkaStreams.foreachRDD(rdd=> {
      if (!rdd.isEmpty()) {
        rdd.foreachPartition(fp=> {
          fp.foreach(f=> {
            println(f.value().toString)
          })
        })
      }
    })
    scc.start()
    scc.awaitTermination()
  }
}

Then we just right-click and run it, and look at the log output:

...
19/08/16 23:17:35 INFO Client:
     client token: N/A
     diagnostics: AM container is launched, waiting for AM container to Register with RM
     ApplicationMaster host: N/A
     ApplicationMaster RPC port: -1
     queue: spark
     start time: 1565997454105
     final status: UNDEFINED
     tracking URL: http://master:8088/proxy/application_1565990507758_0020/
     user: root
19/08/16 23:17:36 INFO Client: Application report for application_1565990507758_0020 (state: ACCEPTED)
19/08/16 23:17:37 INFO Client: Application report for application_1565990507758_0020 (state: ACCEPTED)
19/08/16 23:17:38 INFO Client: Application report for application_1565990507758_0020 (state: ACCEPTED)
19/08/16 23:17:39 INFO Client: Application report for application_1565990507758_0020 (state: ACCEPTED)
19/08/16 23:17:40 INFO YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster registered as NettyRpcEndpointRef(spark-client://YarnAM)
19/08/16 23:17:40 INFO YarnClientSchedulerBackend: Add WebUI Filter. org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter, Map(PROXY_HOSTS -> master, PROXY_URI_BASES -> http://master:8088/proxy/application_1565990507758_0020), /proxy/application_1565990507758_0020
19/08/16 23:17:40 INFO JettyUtils: Adding filter: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
19/08/16 23:17:40 INFO Client: Application report for application_1565990507758_0020 (state: ACCEPTED)
19/08/16 23:17:41 INFO Client: Application report for application_1565990507758_0020 (state: RUNNING)
19/08/16 23:17:41 INFO Client:
     client token: N/A
     diagnostics: N/A
     ApplicationMaster host: 192.168.17.145
     ApplicationMaster RPC port: 0
     queue: spark
     start time: 1565997454105
     final status: UNDEFINED
     tracking URL: http://master:8088/proxy/application_1565990507758_0020/
     user: root
 ...

We can see the submission succeeded. Now open the YARN monitoring page to check whether the job is there.

 

We can see a Spark application running. Click into it to look at the details of how it is running.

 

It is running normally. Select a job and look at the logs printed by the executors.

This is exactly the data we wrote to Kafka, so everything looks fine. To stop the job, just click stop in IDEA, which makes testing much more convenient.
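For completeness, here is a minimal sketch of how such test data could be written to the jason_flink topic. It reuses the broker list from the demo above; the object name and the message contents are just placeholders, not part of the original project.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

object TestDataProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    // Same broker list as in the streaming demo above
    props.put("bootstrap.servers", "master:9092,storm1:9092,storm2:9092")
    props.put("key.serializer", classOf[StringSerializer].getName)
    props.put("value.serializer", classOf[StringSerializer].getName)

    val producer = new KafkaProducer[String, String](props)
    // Send a few placeholder messages to the topic the streaming job subscribes to
    (1 to 10).foreach { i =>
      producer.send(new ProducerRecord[String, String]("jason_flink", s"test message $i"))
    }
    producer.flush()
    producer.close()
  }
}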

Problems you may run into while doing this:

1. First, put yarn-site.xml, core-site.xml and hdfs-site.xml under the resources directory, because the program needs this configuration at runtime (a quick classpath check is sketched below).
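If you are unsure whether the three files actually end up on the classpath, a small sanity check like the following can be run locally before submitting. The HadoopConfCheck object is a hypothetical helper, not part of the demo; the file names are the standard Hadoop ones.

object HadoopConfCheck {
  def main(args: Array[String]): Unit = {
    // Each file should resolve to a URL inside the resources directory (or the packaged jar)
    Seq("yarn-site.xml", "core-site.xml", "hdfs-site.xml").foreach { name =>
      val url = getClass.getClassLoader.getResource(name)
      println(s"$name -> ${Option(url).getOrElse("NOT FOUND on classpath")}")
    }
  }
}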

2. Permission issues

Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException):
 Permission denied: user=JasonLee, access=WRITE, inode="/user":root:supergroup:drwxr-xr-x
  at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:342)
  at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:251)
  at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:189)
  at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1744)
  at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1728)
  at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkAncestorAccess(FSDirectory.java:1687)
  at org.apache.hadoop.hdfs.server.namenode.FSDirMkdirOp.mkdirs(FSDirMkdirOp.java:60)
  at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:2980)
  at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:1096)
  at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java:652)
  at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
  at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:503)
  at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:989)
  at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:868)
  at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:814)
  at java.security.AccessController.doPrivileged(Native Method)
  at javax.security.auth.Subject.doAs(Subject.java:422)
  at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1886)
  at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2603)

This happens because the job is submitted from the local machine, so the user name is JasonLee, which has no permission to write to HDFS. The simplest fix is to set the user to root in the code:

System.setProperty("HADOOP_USER_NAME", "root")

3. Missing environment variable

Exception in thread "main" java.lang.IllegalStateException: Library directory 'D:\develop_soft\idea_workspace_2018\sparkdemo\assembly\target\scala-2.11\jars' does not exist; make sure Spark is built.
  at org.apache.spark.launcher.CommandBuilderUtils.checkState(CommandBuilderUtils.java:248)
  at org.apache.spark.launcher.CommandBuilderUtils.findJarsDir(CommandBuilderUtils.java:347)
  at org.apache.spark.launcher.YarnCommandBuilderUtils$.findJarsDir(YarnCommandBuilderUtils.scala:38)
  at org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:526)
  at org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:814)
  at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:169)
  at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:56)
  at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:173)
  at org.apache.spark.SparkContext.<init>(SparkContext.scala:509)
  at org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:839)
  at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:85)
  at spark.RemoteSubmitApp$.main(RemoteSubmitApp.scala:31)
  at spark.RemoteSubmitApp.main(RemoteSubmitApp.scala)

This error occurs because the SPARK_HOME environment variable is not configured. Simply set SPARK_HOME under environment variables in the run configuration in IDEA, as shown in the figure below; an alternative that avoids needing a local SPARK_HOME at all is sketched after that.
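If you would rather not depend on a local Spark installation, Spark's spark.yarn.jars (or spark.yarn.archive) setting lets the YARN client pick up the Spark libraries from HDFS instead. The sketch below assumes the Spark jars have already been uploaded to HDFS; the object name, the HDFS path and the namenode address are assumptions, so adjust them to your cluster.

import org.apache.spark.SparkConf

object YarnJarsConfSketch {
  // Assumes the Spark jars were uploaded beforehand, e.g. with
  //   hdfs dfs -mkdir -p /spark/jars && hdfs dfs -put $SPARK_HOME/jars/* /spark/jars/
  // (the /spark/jars path and hdfs://master:9000 address are assumptions)
  def build(): SparkConf = new SparkConf()
    .setAppName("WordCount")
    .setMaster("yarn")
    // With spark.yarn.jars (or spark.yarn.archive) set, the YARN client no longer
    // needs a local SPARK_HOME to locate the Spark libraries
    .set("spark.yarn.jars", "hdfs://master:9000/spark/jars/*.jar")
}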

4. Driver IP not set

19/08/17 07:52:45 ERROR ApplicationMaster: Failed to connect to driver at 169.254.42.204:64010, retrying ...
19/08/17 07:52:48 ERROR ApplicationMaster: Failed to connect to driver at 169.254.42.204:64010, retrying ...
19/08/17 07:52:48 ERROR ApplicationMaster: Uncaught exception:
org.apache.spark.SparkException: Failed to connect to driver!
  at org.apache.spark.deploy.yarn.ApplicationMaster.waitForSparkDriver(ApplicationMaster.scala:577)
  at org.apache.spark.deploy.yarn.ApplicationMaster.runExecutorLauncher(ApplicationMaster.scala:433)
  at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:256)
  at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$main$1.apply$mcV$sp(ApplicationMaster.scala:764)
  at org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:67)
  at org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:66)
  at java.security.AccessController.doPrivileged(Native Method)
  at javax.security.auth.Subject.doAs(Subject.java:422)
  at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
  at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:66)
  at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:762)
  at org.apache.spark.deploy.yarn.ExecutorLauncher$.main(ApplicationMaster.scala:785)
  at org.apache.spark.deploy.yarn.ExecutorLauncher.main(ApplicationMaster.scala)

This error occurs because the driver host is not set. Since we run in yarn-client mode, the driver is our local machine, so we have to set the local IP; otherwise the ApplicationMaster cannot find the driver.

.set("spark.driver.host","192.168.17.1")

5. Also make sure that your computer and the virtual machines are on the same network segment, and turn off the firewall on your computer; otherwise you may not be able to connect.

I submitted in yarn-client mode. My YARN cluster is split into two queues, so the queue name has to be set when submitting. Many other parameters can also be set in the code, such as executor memory and count, driver memory and so on; set them according to your own situation (see the sketch below). Of course, the job can also be submitted to a standalone cluster in the same way.
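As a sketch of what those settings can look like: the configuration keys below are standard SparkConf settings, but the values and the standalone master URL are only examples/assumptions, not recommendations.

import org.apache.spark.SparkConf

object TunedConfSketch {
  def build(): SparkConf = new SparkConf()
    .setAppName("WordCount")
    .setMaster("yarn")                     // or e.g. "spark://master:7077" for a standalone cluster (URL is an assumption)
    .set("spark.yarn.queue", "spark")      // YARN queue to submit to
    .set("spark.executor.instances", "2")  // number of executors
    .set("spark.executor.memory", "1024m") // memory per executor
    .set("spark.executor.cores", "2")      // cores per executor
    .set("spark.driver.memory", "1g")      // driver memory
}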
