Spark 2.4.0編程指南--spark dataSet action

Spark 2.4.0編程指南--spark dataSet action

更多資源

視頻

<iframe width="800" height="500" src="//player.bilibili.com/player.html?aid=38193405&cid=67137841&page=3" scrolling="no" border="0" frameborder="no" framespacing="0" allowfullscreen="true"> </iframe>html

文檔

前置條件

  • 已安裝好java(選用的是java 1.8.0_191)
  • 已安裝好scala(選用的是scala 2.11.121)
  • 已安裝好hadoop(選用的是Hadoop 3.1.1)
  • 已安裝好spark(選用的是spark 2.4.0)

技能標籤

  • Spark session 建立
  • 在Spark 2.0以後,RDD被數據集(Dataset)取代 ,保留RDD舊api
  • 數據集數據集介紹
  • 讀取本地文件(txt,json),HDFS文件
  • 對txt格式文件數據遍歷(行數據轉成對象)
  • 對json格式文件數據遍歷(直接轉對象)
  • 數據集的action操做
  • collect,collectAsList,count,describe,first,foreach,head,reduce,show,take,takeAsList,toLocalIterator
  • 官網: http://spark.apache.org/docs/2.4.0/sql-getting-started.html

DataSet(數據集)

數據集是分佈式數據集合。數據集是Spark 1.6中添加的一個新接口,它提供了RDD的優點(強類型,使用強大的lambda函數的能力)以及Spark SQL優化執行引擎的優勢。數據集能夠從JVM對象構造,而後使用功能轉換(map,flatMap,filter等)進行操做。數據集API在Scala和Java中可用。 Python沒有對Dataset API的支持。但因爲Python的動態特性,數據集API的許多好處已經可用(即您能夠經過名稱天然地訪問行的字段row.columnName)。 R的狀況相似。

BaseSparkSession

  • 公用獲得SparkSession的方法
def sparkSession(isLocal:Boolean = false): SparkSession = {

    if(isLocal){
      master = "local"
      val spark = SparkSession.builder
        .master(master)
        .appName(appName)
        .getOrCreate()
      //spark.sparkContext.addJar("/opt/n_001_workspaces/bigdata/spark-scala-maven-2.4.0/target/spark-scala-maven-2.4.0-1.0-SNAPSHOT.jar")
      //import spark.implicits._
      spark
    }else{
      val spark = SparkSession.builder
        .master(master)
        .appName(appName)
        .config("spark.eventLog.enabled","true")
        .config("spark.history.fs.logDirectory","hdfs://standalone.com:9000/spark/log/historyEventLog")
        .config("spark.eventLog.dir","hdfs://standalone.com:9000/spark/log/historyEventLog")
        .getOrCreate()
     // spark.sparkContext.addJar("/opt/n_001_workspaces/bigdata/spark-scala-maven-2.4.0/target/spark-scala-maven-2.4.0-1.0-SNAPSHOT.jar")
      //import spark.implicits._
      spark
    }

  }

textFile

  • 讀取本地文件
val spark = sparkSession(true)
    //返回dataFrame
    val df = spark.read.textFile("file:///"+ getProjectPath +"/src/main/resource/data/text/people.txt")
    df.show()

//    +-----------+
//    |      value|
//    +-----------+
//    |Michael, 29|
//    |   Andy, 30|
//    | Justin, 19|
//    |  Think, 30|
//    +-----------+

textFile

  • 讀取HDFS文件
val spark = sparkSession(true)
    //返回dataFrame
    val df = spark.read.textFile("hdfs://standalone.com:9000/home/liuwen/data/people.txt")
    df.show()


//    +-----------+
//    |      value|
//    +-----------+
//    |Michael, 29|
//    |   Andy, 30|
//    | Justin, 19|
//    |  Think, 30|
//    +-----------+

    spark.stop()

text

  • 讀取本地文件
val spark = sparkSession(true)
    //返回dataFrame
    val df = spark.read.text("file:///"+ getProjectPath +"/src/main/resource/data/text/people.txt")
    df.show()

//    +-----------+
//    |      value|
//    +-----------+
//    |Michael, 29|
//    |   Andy, 30|
//    | Justin, 19|
//    |  Think, 30|
//    +-----------+

text

  • 讀取HDFS數據
object Run extends BaseSparkSession{

  def main(args: Array[String]): Unit = {

    val spark = sparkSession(true)
    //返回dataFrame
    val df = spark.read.text("hdfs://standalone.com:9000/home/liuwen/data/people.txt")
    df.show()

//    +-----------+
//    |      value|
//    +-----------+
//    |Michael, 29|
//    |   Andy, 30|
//    | Justin, 19|
//    |  Think, 30|
//    +-----------+

    spark.stop()
  }

}

foreach 遍歷文件內容

  • 對象遍歷
object Run1 extends BaseSparkSession{

  case class Person(name: String, age: Long)


  def main(args: Array[String]): Unit = {

    val spark = sparkSession(true)

    import spark.implicits._
    spark.read.textFile("file:///"+ getProjectPath +"/src/main/resource/data/text/people.txt")
      .map(line => Person(line.split(",")(0),line.split(" ")(1).trim.toLong))
        .foreach( person => println(s"name:${person.name}\t age:${person.age}"))

    spark.stop()

  }
}

first

  • 獲得dataSet的第一個元素
val spark = sparkSession()
    val dataSet = spark.read.textFile("/home/liuwen/data/a.txt")

    println(dataSet.first()) //first裏邊調用的是head()
    spark.stop()

head

  • 獲得dataSet的第一個元素
val spark = sparkSession()
    val dataSet = spark.read.textFile("/home/liuwen/data/a.text")
    println(dataSet.head()) //first裏邊調用的是head()

head n

  • 獲得dataSet的前n個元素
val spark = sparkSession()
    val dataSet = spark.read.textFile("/home/liuwen/data/a.text")
    println(dataSet.head(5)) //first裏邊調用的是head()

count

  • 獲得dataSet 一共有多少行數據
val spark = sparkSession()
    val dataSet = spark.read.textFile("/home/liuwen/data/a.text")
    println(dataSet.count())

collect

  • 收集dataSet中全部行的數據,在本地輸出
val spark = sparkSession()
    val dataSet = spark.read.textFile("/home/liuwen/data/a.txt")
    println(dataSet.collect().mkString("\n"))

collectAsList

  • 收集dataSet中全部的數據,轉成java.util.List對象
val spark = sparkSession(true)

    val dataSet = spark.read.textFile("/home/liuwen/data/a.txt")
    println( dataSet.collectAsList())
    import scala.collection.JavaConversions._
    for( v <- dataSet.collectAsList()) println(v)
    spark.stop()

foreache

  • 遍歷dataSet中的每一行數據
val spark = sparkSession(true)
    val dataSet = spark.read.textFile("/home/liuwen/data/a.txt")
    dataSet.foreach(println(_))

foreache class

  • 以對象形式遍歷dataSet中全部的數據
object Run1 extends BaseSparkSession{

  case class Person(name: String, age: Long)


  def main(args: Array[String]): Unit = {

    val spark = sparkSession(true)

    import spark.implicits._
    spark.read.textFile("file:///"+ getProjectPath +"/src/main/resource/data/text/people.txt")
      .map(line => Person(line.split(",")(0),line.split(" ")(1).trim.toLong))
        .foreach( person => println(s"name:${person.name}\t age:${person.age}"))

    spark.stop()


  }
}

map

  • 遍歷數據集中的每個元素,進行map函數操做
val spark = sparkSession()

    val dataSet = spark.read.textFile("/home/liuwen/data/a.text")
    import spark.implicits._
    val lineWordLength = dataSet.map( line => line.split(" ").size)

    println(lineWordLength.collect().mkString("\n"))

reduce

  • 遍歷dataSet中的元素,每兩兩進行reduce函數操做
val spark = sparkSession()

    val dataSet = spark.read.textFile("/home/liuwen/data/word.text")

    /**
      * 統計全部行單詞個數
      */
    import spark.implicits._
    val lineWordLength = dataSet.map( line => line.split(" ").size)
    val result = lineWordLength.reduce((a,b) => a + b)

    println(result)

show

  • 以表格形式顯示dataSet數據,默認顯示前20行數據
val spark = sparkSession()

    val dataSet = spark.read.textFile("/home/liuwen/data/word.big.cn.text")

 
    val result = dataSet.show()
    println(result)

show n

  • 以表格形式顯示dataSet數據,默認顯示前20行數據
val spark = sparkSession()

    val dataSet = spark.read.textFile("/home/liuwen/data/word.big.cn.text")

    /**
      * 以表格的形式顯示前3行數據
      * numRows是顯示前幾行的數據
      */

    val result = dataSet.show(3)
    println(result)

show truncate

  • 以表格形式顯示dataSet數據,默認顯示前20行數據
  • 參數truncate=false,是不截斷顯示全部數據,true是進截斷
val spark = sparkSession()

    val dataSet = spark.read.textFile("/home/liuwen/data/word.big.text")

    /**
      * 以表格的形式顯示前3行數據
      * numRows是顯示前幾行的數據
      * false 不進行返回行數據截斷
      */

    val result = dataSet.show(10,false)
    println(result)

take

  • take 是至關於head
val spark = sparkSession()

    val dataSet = spark.read.textFile("/home/liuwen/data/word.big.txt")
    val result = dataSet.take(10) //等於head(n)
    println(result.mkString("\n"))

describe

val spark = sparkSession()

    val dataSet = spark.read.json("hdfs://standalone.com:9000/home/liuwen/data/json/people.json")

    dataSet.describe("name","age").show()

//    +-------+-------+------------------+
//    |summary|   name|               age|
//    +-------+-------+------------------+
//    |  count|      3|                 2|
//    |   mean|   null|              24.5|
//    | stddev|   null|7.7781745930520225|
//    |    min|   Andy|                19|
//    |    max|Michael|                30|
//    +-------+-------+------------------+

endjava

相關文章
相關標籤/搜索