Spark Study Notes (9): SparkSQL User-Defined Functions and Data Sources

1. Introduction

2. User-defined functions in SparkSQL

  2.1 Defining a function

  2.2 Registering a function

  2.3 Example

3. Reading data sources in Spark

  3.1 JSON

  3.2 JDBC

  3.3 Parquet

  3.4 CSV

Main Text

1. Introduction

  Quite often the built-in SQL functions cannot cover our day-to-day development needs, so we have to define our own functions. At the same time, Spark can read from a wide range of data sources; JSON files, MySQL tables, and others can all serve as input.

2. User-defined functions in SparkSQL

  2.1 Defining a function

val fun1 = (arg: String) => {
    arg + "aaa"
}

  2.2 Registering a function

spark.udf.register("addString", fun1)

  2.3 Example

package cn.edu360.spark07

import org.apache.spark.sql._

object AutoFun {
    def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession.builder()
            .appName("sparkDateSet1")
            .master("local[2]")
            .getOrCreate()
        val lines: Dataset[String] = spark.read.textFile("hdfs://hd1:9000/wordcount/input/")
        import spark.implicits._
        val words: Dataset[String] = lines.flatMap(_.split(" "))
        // register a temporary view so the data can be queried with SQL
        words.createTempView("v_wc")
        // define the function
        val fun1 = (arg: String) => {
            arg + "aaa"
        }
        // register the function under the name used in the SQL statement
        spark.udf.register("addString", fun1)
        val result: DataFrame = spark.sql("select addString(value), count(*) from v_wc group by value")
        result.show()
        spark.stop()
    }
}
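  The registered name only matters inside the SQL string; in the DataFrame DSL the same logic can be wrapped with org.apache.spark.sql.functions.udf and applied as a column expression. A minimal sketch, reusing the words Dataset and the implicits import from the example above:

import org.apache.spark.sql.functions.udf

// wrap the same lambda as a DSL-usable udf
val addString = udf((s: String) => s + "aaa")
val dslResult: DataFrame = words.groupBy($"value").count()
    .select(addString($"value").as("word"), $"count")
dslResult.show()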

3. Reading data sources in Spark

  3.1 JSON

import org.apache.spark.sql.{DataFrame, SparkSession}

object JsonDataSource {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().appName("JsonDataSource")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // read JSON data; Spark infers the schema (column names and types) from the records
    val jsons: DataFrame = spark.read.json("/Users/zx/Desktop/json")
    val filtered: DataFrame = jsons.where($"age" <= 500)
    filtered.printSchema()
    filtered.show()
    spark.stop()
  }
}
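  When the JSON files are large, schema inference costs an extra pass over the data; passing an explicit schema avoids it. A minimal sketch, reusing the spark session above and assuming the records only carry name and age fields:

import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// assumed columns for illustration; adjust to the actual JSON layout
val jsonSchema = StructType(Seq(
    StructField("name", StringType),
    StructField("age", LongType)
))
// no inference pass: the given schema is applied directly
val typedJsons: DataFrame = spark.read.schema(jsonSchema).json("/Users/zx/Desktop/json")
typedJsons.printSchema()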

  3.2 JDBC

package cn.edu360.spark07

import org.apache.spark.sql.{DataFrame, SparkSession}

object JDBCSource {
    def main(args: Array[String]): Unit = {

        val spark = SparkSession.builder().appName("JdbcDataSource")
            .master("local[*]")
            .getOrCreate()
        import spark.implicits._
        // read the "log" table from MySQL over JDBC
        val log: DataFrame = spark.read.format("jdbc").options(
            Map("url" -> "jdbc:mysql://localhost:3306/test?useSSL=true",
                "driver" -> "com.mysql.jdbc.Driver",
                "dbtable" -> "log",
                "user" -> "root",
                "password" -> "qwe123"
            )
        ).load()
        val result: DataFrame = log.select($"id", $"name", $"age")
        result.show()
        spark.stop()
    }
}
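  The same connector also writes back to MySQL through DataFrameWriter.jdbc. A minimal sketch, reusing the spark session and the result DataFrame above; the target table result_table is a placeholder:

import java.util.Properties
import org.apache.spark.sql.SaveMode

val props = new Properties()
props.put("user", "root")
props.put("password", "qwe123")
props.put("driver", "com.mysql.jdbc.Driver")

result.write
    .mode(SaveMode.Append)  // append rows instead of replacing the table
    .jdbc("jdbc:mysql://localhost:3306/test?useSSL=true", "result_table", props)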

  3.3 Parquet

package cn.edu360.day7

import org.apache.spark.sql.{DataFrame, SparkSession}

/**
  * Created by zx on 2017/9/18.
  */
object ParquetDataSource {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().appName("ParquetDataSource")
      .master("local[*]")
      .getOrCreate()
    // read Parquet data; the schema is stored in the files themselves
    val parquetLine: DataFrame = spark.read.parquet("/Users/zx/Desktop/parquet")
    // equivalent: spark.read.format("parquet").load("/Users/zx/Desktop/pq")
    parquetLine.printSchema()
    // show is an action and triggers the actual read
    parquetLine.show()
    spark.stop()
  }
}
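  Writing a DataFrame back out as Parquet keeps the schema with the data and is a common choice for intermediate results. A minimal sketch, reusing the parquetLine DataFrame above with a hypothetical output path:

parquetLine.write
    .mode("overwrite")  // replace the output directory if it already exists
    .parquet("/Users/zx/Desktop/parquet_out")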

  3.4 CSV

package cn.edu360.day7

import org.apache.spark.sql.{DataFrame, SparkSession}

/**
  * Created by zx on 2017/9/18.
  */
object CsvDataSource {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("CsvDataSource")
      .master("local[*]")
      .getOrCreate()
    // read CSV data; without options the columns are named _c0, _c1, ...
    val csv: DataFrame = spark.read.csv("/Users/zx/Desktop/csv")
    csv.printSchema()
    // give the columns meaningful names
    val pdf: DataFrame = csv.toDF("id", "name", "age")
    pdf.show()
    spark.stop()
  }
}
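  By default spark.read.csv names the columns _c0, _c1, ..., which is why the example above renames them with toDF. If the file has a header row, the header and inferSchema options let Spark pick up column names and types by itself. A minimal sketch, reusing the spark session above and assuming the first line of the file is a header:

val csvWithHeader: DataFrame = spark.read
    .option("header", "true")       // first line holds the column names
    .option("inferSchema", "true")  // extra pass over the data to guess column types
    .csv("/Users/zx/Desktop/csv")
csvWithHeader.printSchema()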