windows下Idea結合maven開發spark和本地調試

本人的開發環境:
1.虛擬機centos 6.5
2.jdk 1.8
3.spark2.2.0
4.scala 2.11.8
5.maven 3.5.2
    在開發和搭環境時必須注意版本兼容的問題,否則會出現不少莫名其妙的問題
 
1.啓動master進程
./sbin/start-master.sh
 
2.啓動worker進程
./bin/spark-class org.apache.spark.deploy.worker.Worker spark://hadoop000:7077
【注意,spark://hadoop000:7077,是在啓動master進程後,經過localhost:8080登錄到spark WebUI上查看的。】
 
第一第二點是運行環境的前提條件,下面是開發環境。
 
1.idea結合maven開發spark,下面以NetWorldCount爲例子
package com.spark
 
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
 
/**
  * Spark Streaming處理Socket數據
  * 測試: nc
  */
object NetworkWordCount {
 
  def main(args: Array[String]): Unit = {
 
    val sparkConf = new SparkConf().setMaster("local").setAppName("NetworkWordCount")
 
    /**
      * 建立StreamingContext須要兩個參數:SparkConf和batch interval
      */
    val ssc = new StreamingContext(sparkConf, Seconds(5))
 
    val lines = ssc.socketTextStream("localhost", 6789)
 
    val result = lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
 
    result.print()
 
    ssc.start()
    ssc.awaitTermination()
  }
}

 

2.生成jar包apache

 

 

 3.上傳jar包windows

 

 

4.提交任務前先啓動監聽端口,在終端輸入如下命令
nc -lk 6789
 
 
5.提交任務
./spark-submit  --master local[2] --class com.spark.NetworkWordCount --name NetworkWordCount  /home/hadoop/tmp/spark.jar

 

運行程序,出現下面的錯誤:centos

a.local這裏出錯。緣由簡單來講,local模式下只開啓一條線程,reciver佔用一條線程後,沒有資源用來計算處理數據了。
解決辦法:local--->local[2]
 
b.缺乏com.fasterxml.jackson.scala這個方法
解決辦法:
1.查看這個類的版本:view--->maven project---> ---> .而後在pom.xml增長對應的dependency
<dependency>
  <groupId>com.fasterxml.jackson.module</groupId>
  <artifactId>jackson-module-scala_2.11</artifactId>
  <version>2.6.5</version>
</dependency>
從新reimport,再次運行。出現如下錯誤

 

 

 去maven reposition查找對應的依賴: socket

 

 

在這裏,使用1.3.0版本的。 maven

 

在pom.xml添加如下的 dependency
<dependency>
  <groupId>net.jpountz.lz4</groupId>
  <artifactId>lz4</artifactId>
  <version>1.3.0</version>
</dependency>
 
從新reimport,再次運行。此次程序正常運行。
 
輸入數據:

 

接受數據:ide

 

至此,windows下,idea結合maven開發spark+調試過程 完整跑了一遍。
下面分析 
 val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") 

 在本地調試中,輸入源除了 fileStream外,必須local[n], n >= 2 。
 在spark中,輸入源除了  fileStream ,其餘的都繼承自 ReceiverInputDStream ,所以其餘都須要至少兩條線程(針對local模式)以上來供程序使用。
def fileStream[
  K: ClassTag,
  V: ClassTag,
  F <: NewInputFormat[K, V]: ClassTag
] (directory: String): InputDStream[(K, V)] = {
  new FileInputDStream[K, V, F](this, directory)
}
 
例如本例子中使用的  socketTextStream
def socketTextStream(
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
  socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}
相關文章
相關標籤/搜索