1. Introduction
2. User-defined functions
  2.1 Function definition
  2.2 Function registration
  2.3 Example
3. Data sources
  3.1 JSON
  3.2 JDBC
  3.3 Parquet
  3.4 CSV
1. Introduction

In many cases the built-in SQL functions cannot meet our day-to-day development needs, so we have to define functions of our own. At the same time, Spark can draw on a wide range of data sources: JSON, MySQL, and others can all serve as input.
2. User-defined functions

2.1 Function definition

val fun1 = (arg: String) => { arg + "aaa" }
2.2 Function registration

spark.udf.register("addString", fun1)
2.3 Example

package cn.edu360.spark07

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object AutoFun {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("sparkDateSet1")
      .master("local[2]")
      .getOrCreate()
    val lines: Dataset[String] = spark.read.textFile("hdfs://hd1:9000/wordcount/input/")
    import spark.implicits._
    val words: Dataset[String] = lines.flatMap(_.split(" "))
    // register a temporary view so the data can be queried with SQL
    words.createTempView("v_wc")
    // define the function
    val fun1 = (arg: String) => { arg + "aaa" }
    // register the function
    spark.udf.register("addString", fun1)
    val result: DataFrame = spark.sql("select addString(value), count(*) from v_wc group by value")
    result.show()
    spark.stop()
  }
}
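The same function can also be applied through the DataFrame API without any SQL registration, via org.apache.spark.sql.functions.udf. A minimal sketch (not from the original code) that continues from the words dataset above; "value" is the default column name a Dataset[String] exposes:

import org.apache.spark.sql.functions.udf

// wrap the same Scala function as a DataFrame-API UDF
val addString = udf((arg: String) => arg + "aaa")
val appended: DataFrame = words.select(addString($"value").as("appended"))
appended.show()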
3. Data sources

3.1 JSON

import org.apache.spark.sql.{DataFrame, SparkSession}

object JsonDataSource {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("JsonDataSource")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._
    // read JSON data; the documents carry field names, so the DataFrame has a schema
    val jsons: DataFrame = spark.read.json("/Users/zx/Desktop/json")
    val filtered: DataFrame = jsons.where($"age" <= 500)
    filtered.printSchema()
    filtered.show()
    spark.stop()
  }
}
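Writing the result back out is symmetric to reading it. A one-line sketch (the output path is made up for illustration):

// write the filtered rows back out as JSON; the path is hypothetical
filtered.write.json("/Users/zx/Desktop/json_out")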
3.2 JDBC

package cn.edu360.spark07

import org.apache.spark.sql.{DataFrame, SparkSession}

object JDBCSource {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("JdbcDataSource")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._
    // read the "log" table from MySQL over JDBC
    val log: DataFrame = spark.read.format("jdbc").options(
      Map(
        "url" -> "jdbc:mysql://localhost:3306/test?useSSL=true",
        "driver" -> "com.mysql.jdbc.Driver",
        "dbtable" -> "log",
        "user" -> "root",
        "password" -> "qwe123"
      )
    ).load()
    val result: DataFrame = log.select($"id", $"name", $"age")
    result.show()
    spark.stop()
  }
}
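Results can be written back over JDBC as well, through DataFrameWriter.jdbc. A minimal sketch; the target table log_result is hypothetical, and the credentials simply mirror the read side:

import java.util.Properties

val props = new Properties()
props.put("user", "root")
props.put("password", "qwe123")
props.put("driver", "com.mysql.jdbc.Driver")
// append the selected columns into a (hypothetical) log_result table
result.write.mode("append")
  .jdbc("jdbc:mysql://localhost:3306/test?useSSL=true", "log_result", props)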
3.3 Parquet

package cn.edu360.day7

import org.apache.spark.sql.{DataFrame, SparkSession}

/**
  * Created by zx on 2017/9/18.
  */
object ParquetDataSource {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("ParquetDataSource")
      .master("local[*]")
      .getOrCreate()
    // read Parquet data; the schema is stored in the files themselves
    val parquetLine: DataFrame = spark.read.parquet("/Users/zx/Desktop/parquet")
    // equivalent: spark.read.format("parquet").load("/Users/zx/Desktop/pq")
    parquetLine.printSchema()
    // show is an action and triggers execution
    parquetLine.show()
    spark.stop()
  }
}
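Because Parquet keeps the schema with the data, it is also a convenient format for persisting results; a brief sketch with a made-up output path:

// write the DataFrame back out as Parquet; the path is hypothetical
parquetLine.write.mode("overwrite").parquet("/Users/zx/Desktop/parquet_out")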
3.4 CSV

package cn.edu360.day7

import org.apache.spark.sql.{DataFrame, SparkSession}

/**
  * Created by zx on 2017/9/18.
  */
object CsvDataSource {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("CsvDataSource")
      .master("local[*]")
      .getOrCreate()
    // read CSV data; without options every column is a string named _c0, _c1, ...
    val csv: DataFrame = spark.read.csv("/Users/zx/Desktop/csv")
    csv.printSchema()
    // assign meaningful column names
    val pdf: DataFrame = csv.toDF("id", "name", "age")
    pdf.show()
    spark.stop()
  }
}
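If the CSV files start with a header row, the manual toDF renaming can be dropped by letting Spark read the header and infer column types; a minimal sketch using Spark's standard CSV reader options, assuming such files exist at the same path:

// read CSV with a header row and let Spark infer column types
val withHeader: DataFrame = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/Users/zx/Desktop/csv")
withHeader.printSchema()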