Reference article: How to remotely submit a Spark job to a YARN cluster from IDEA
There are several ways to run a Spark job:
1. Local mode: write the code in IDEA and run it directly.
2. Standalone mode: package the program into a jar, upload it to the cluster, and submit it with spark-submit.
3. YARN mode (client or cluster deploy mode): same as above, the jar still has to be packaged and submitted to the cluster.
If you only want to test your own code, all of the above are tedious: every code change means repackaging, uploading to the cluster, and running spark-submit again, which wastes a lot of time. Below is how to submit to the YARN cluster remotely from a local IDEA.
Let's go straight to the demo (the code is deliberately simple):
package spark

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object RemoteSubmitApp {
  def main(args: Array[String]) {
    // Set the user that submits the job
    System.setProperty("HADOOP_USER_NAME", "root")
    val conf = new SparkConf()
      .setAppName("WordCount")
      // Submit in yarn-client mode
      .setMaster("yarn")
      // Hostname of the ResourceManager
      .set("yarn.resourcemanager.hostname", "master")
      // Number of executors
      .set("spark.executor.instances", "2")
      // Memory per executor
      .set("spark.executor.memory", "1024M")
      // YARN queue to submit the job to
      .set("spark.yarn.queue", "spark")
      // IP address of the driver (the local machine in yarn-client mode)
      .set("spark.driver.host", "192.168.17.1")
      // Path of the application jar; other dependency jars can be added here, comma separated
      .setJars(List("D:\\develop_soft\\idea_workspace_2018\\sparkdemo\\target\\sparkdemo-1.0-SNAPSHOT.jar"))
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    val scc = new StreamingContext(conf, Seconds(1))
    scc.sparkContext.setLogLevel("WARN")
    //scc.checkpoint("/spark/checkpoint")

    val topic = "jason_flink"
    val topicSet = Set(topic)
    val kafkaParams = Map[String, Object](
      "auto.offset.reset" -> "latest",
      "value.deserializer" -> classOf[StringDeserializer],
      "key.deserializer" -> classOf[StringDeserializer],
      "bootstrap.servers" -> "master:9092,storm1:9092,storm2:9092",
      "group.id" -> "jason_",
      "enable.auto.commit" -> (true: java.lang.Boolean)
    )
    // Create a direct Kafka stream (declared locally here so the example is self-contained)
    val kafkaStreams = KafkaUtils.createDirectStream[String, String](
      scc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams))

    kafkaStreams.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        rdd.foreachPartition(fp => {
          fp.foreach(f => {
            println(f.value().toString)
          })
        })
      }
    })

    scc.start()
    scc.awaitTermination()
  }
}
Then just right-click and run, and look at the logs that get printed:
...
19/08/16 23:17:35 INFO Client:
     client token: N/A
     diagnostics: AM container is launched, waiting for AM container to Register with RM
     ApplicationMaster host: N/A
     ApplicationMaster RPC port: -1
     queue: spark
     start time: 1565997454105
     final status: UNDEFINED
     tracking URL: http://master:8088/proxy/application_1565990507758_0020/
     user: root
19/08/16 23:17:36 INFO Client: Application report for application_1565990507758_0020 (state: ACCEPTED)
19/08/16 23:17:37 INFO Client: Application report for application_1565990507758_0020 (state: ACCEPTED)
19/08/16 23:17:38 INFO Client: Application report for application_1565990507758_0020 (state: ACCEPTED)
19/08/16 23:17:39 INFO Client: Application report for application_1565990507758_0020 (state: ACCEPTED)
19/08/16 23:17:40 INFO YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster registered as NettyRpcEndpointRef(spark-client://YarnAM)
19/08/16 23:17:40 INFO YarnClientSchedulerBackend: Add WebUI Filter. org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter, Map(PROXY_HOSTS -> master, PROXY_URI_BASES -> http://master:8088/proxy/application_1565990507758_0020), /proxy/application_1565990507758_0020
19/08/16 23:17:40 INFO JettyUtils: Adding filter: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
19/08/16 23:17:40 INFO Client: Application report for application_1565990507758_0020 (state: ACCEPTED)
19/08/16 23:17:41 INFO Client: Application report for application_1565990507758_0020 (state: RUNNING)
19/08/16 23:17:41 INFO Client:
     client token: N/A
     diagnostics: N/A
     ApplicationMaster host: 192.168.17.145
     ApplicationMaster RPC port: 0
     queue: spark
     start time: 1565997454105
     final status: UNDEFINED
     tracking URL: http://master:8088/proxy/application_1565990507758_0020/
     user: root
...
We can see that the submission succeeded. Now open the YARN monitoring page to check whether the job is there.
There is one Spark application running; click into it to see the details of the run.
It is running normally. Select the job and look at the logs printed by the executors.
This is exactly the data we wrote to Kafka, so everything looks fine. To stop the job, just click stop in IDEA, which makes testing a lot more convenient.
Problems you may run into along the way:
1. First, put yarn-site.xml, core-site.xml and hdfs-site.xml under the resources directory, because the program needs this configuration at runtime; see the sanity check sketched below.
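If you are not sure whether those files are actually being picked up, a quick check is a small sketch like the one below (it assumes the files sit under src/main/resources, so they end up on the classpath):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.conf.YarnConfiguration

// Minimal sketch: YarnConfiguration reads core-site.xml and yarn-site.xml from the
// classpath, so these values should point at the cluster rather than the Hadoop
// defaults (e.g. fs.defaultFS should not be file:///).
object CheckHadoopConf {
  def main(args: Array[String]): Unit = {
    val conf: Configuration = new YarnConfiguration()
    println("fs.defaultFS = " + conf.get("fs.defaultFS"))
    println("yarn.resourcemanager.hostname = " + conf.get("yarn.resourcemanager.hostname"))
  }
}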
2. Permission problems
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=JasonLee, access=WRITE, inode="/user":root:supergroup:drwxr-xr-x
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:342)
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:251)
    at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:189)
    at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1744)
    at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1728)
    at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkAncestorAccess(FSDirectory.java:1687)
    at org.apache.hadoop.hdfs.server.namenode.FSDirMkdirOp.mkdirs(FSDirMkdirOp.java:60)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:2980)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:1096)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java:652)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:503)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:989)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:868)
    at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:814)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1886)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2603)
This happens because the job is submitted from the local machine, so the user name is JasonLee, which has no permission to write to HDFS. The simplest fix is to set the user to root in the code:
System.setProperty("HADOOP_USER_NAME", "root")
3. Missing environment variable
Exception in thread "main" java.lang.IllegalStateException: Library directory 'D:\develop_soft\idea_workspace_2018\sparkdemo\assembly\target\scala-2.11\jars' does not exist; make sure Spark is built.
    at org.apache.spark.launcher.CommandBuilderUtils.checkState(CommandBuilderUtils.java:248)
    at org.apache.spark.launcher.CommandBuilderUtils.findJarsDir(CommandBuilderUtils.java:347)
    at org.apache.spark.launcher.YarnCommandBuilderUtils$.findJarsDir(YarnCommandBuilderUtils.scala:38)
    at org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:526)
    at org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:814)
    at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:169)
    at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:56)
    at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:173)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:509)
    at org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:839)
    at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:85)
    at spark.RemoteSubmitApp$.main(RemoteSubmitApp.scala:31)
    at spark.RemoteSubmitApp.main(RemoteSubmitApp.scala)
This error appears because the SPARK_HOME environment variable is not configured. Just set SPARK_HOME in the environment variables field of the run configuration in IDEA, as shown in the screenshot below.
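Alternatively, if you would rather not keep a local Spark distribution around, you can point the YARN client at Spark jars that already live on HDFS via spark.yarn.jars. This is only a sketch, under the assumption that you have uploaded everything from the jars directory of your Spark distribution to a path such as hdfs://master:9000/spark/jars/:

import org.apache.spark.SparkConf

// Sketch: with spark.yarn.jars set, the YARN client fetches the Spark runtime jars
// from HDFS instead of looking for them under SPARK_HOME on the local machine.
// The HDFS path is an assumption; change it to wherever you uploaded the jars.
val conf = new SparkConf()
  .setAppName("WordCount")
  .setMaster("yarn")
  .set("spark.yarn.jars", "hdfs://master:9000/spark/jars/*.jar")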
4. Driver IP not set
19/08/17 07:52:45 ERROR ApplicationMaster: Failed to connect to driver at 169.254.42.204:64010, retrying ...
19/08/17 07:52:48 ERROR ApplicationMaster: Failed to connect to driver at 169.254.42.204:64010, retrying ...
19/08/17 07:52:48 ERROR ApplicationMaster: Uncaught exception:
org.apache.spark.SparkException: Failed to connect to driver!
    at org.apache.spark.deploy.yarn.ApplicationMaster.waitForSparkDriver(ApplicationMaster.scala:577)
    at org.apache.spark.deploy.yarn.ApplicationMaster.runExecutorLauncher(ApplicationMaster.scala:433)
    at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:256)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$main$1.apply$mcV$sp(ApplicationMaster.scala:764)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:67)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:66)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:66)
    at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:762)
    at org.apache.spark.deploy.yarn.ExecutorLauncher$.main(ApplicationMaster.scala:785)
    at org.apache.spark.deploy.yarn.ExecutorLauncher.main(ApplicationMaster.scala)
This error happens because spark.driver.host is not set. Since we run in yarn-client mode, the driver is the local machine, so the local IP has to be set; otherwise the ApplicationMaster cannot find the driver.
.set("spark.driver.host","192.168.17.1")
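If you prefer not to hard-code the address, the sketch below picks the first site-local IPv4 address it finds (the assumption is that the adapter facing the cluster has a 192.168.x.x address; the 169.254.x.x address in the error above is link-local and gets skipped). Do check that the chosen adapter is one the virtual machines can actually reach.

import java.net.NetworkInterface
import scala.collection.JavaConverters._

// Sketch: pick the first site-local IPv4 address (e.g. 192.168.17.1) so the
// ApplicationMaster can reach the driver; link-local 169.254.x.x addresses are skipped.
def localSiteAddress(): Option[String] =
  NetworkInterface.getNetworkInterfaces.asScala
    .filter(ni => ni.isUp && !ni.isLoopback)
    .flatMap(_.getInetAddresses.asScala)
    .find(addr => addr.isSiteLocalAddress && addr.getAddress.length == 4)
    .map(_.getHostAddress)

// then, on the SparkConf from the demo above:
//   .set("spark.driver.host", localSiteAddress().getOrElse("192.168.17.1"))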
5. Finally, make sure your machine and the virtual machines are on the same network segment, and turn off the firewall on your machine; otherwise the cluster may fail to connect back to the driver.
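A quick way to rule out basic connectivity problems before digging into Spark itself is a throwaway check like the sketch below (8032 is the default ResourceManager RPC port; adjust it if your cluster uses a different one). Note that this only tests the IDEA-machine-to-cluster direction; the firewall issue above is about the cluster connecting back to the driver, which is exactly why the local firewall has to be open or disabled.

import java.net.{InetSocketAddress, Socket}

// Sketch: try to open a TCP connection to the ResourceManager from the machine running IDEA.
// If this times out, fix the network or firewall before blaming the Spark configuration.
def canReach(host: String, port: Int, timeoutMs: Int = 3000): Boolean = {
  val socket = new Socket()
  try { socket.connect(new InetSocketAddress(host, port), timeoutMs); true }
  catch { case _: Exception => false }
  finally { socket.close() }
}

println(s"ResourceManager reachable: ${canReach("master", 8032)}")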
I submitted in yarn-client mode. My YARN cluster has two queues, so the queue name has to be set when submitting. Many other parameters can also be set in the code, such as the executor memory and count, the driver memory and so on; set them according to your own situation. Of course, the job can be submitted to a standalone cluster in the same way.
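For reference, a sketch of the kind of extra settings mentioned above (the values are placeholders; pick ones that match your cluster and queue setup):

import org.apache.spark.SparkConf

// Sketch: common knobs that can be set from code instead of spark-submit flags.
val conf = new SparkConf()
  .setAppName("WordCount")
  .setMaster("yarn")                      // or "spark://master:7077" for a standalone cluster
  .set("spark.yarn.queue", "spark")       // the YARN queue to submit into
  .set("spark.executor.instances", "2")   // number of executors
  .set("spark.executor.memory", "1g")     // memory per executor
  .set("spark.executor.cores", "2")       // cores per executor
  .set("spark.driver.memory", "1g")       // note: has no effect in yarn-client mode, where the driver JVM is already running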