Spark Data Sources

1. MySQL as a data source

import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

/**
  * MySQL as a data source
  *
  * Schema information:
  * root
  * |-- uid: integer (nullable = false)
  * |-- xueyuan: string (nullable = true)
  * |-- number_one: string (nullable = true)
  */
object JdbcSource {
  def main(args: Array[String]): Unit = {
    //1. Create the SparkSession (the Spark SQL entry point)
    val sparkSession: SparkSession = SparkSession.builder().appName("JdbcSource")
      .master("local[2]").getOrCreate()

    //2. Load the JDBC data source
    val urlData: DataFrame = sparkSession.read.format("jdbc").options(Map(
      "url" -> "jdbc:mysql://localhost:3306/urlcount",
      "driver" -> "com.mysql.jdbc.Driver",
      "dbtable" -> "url_data",
      "user" -> "root",
      "password" -> "root"
    )).load()

    //For testing:
    //urlData.printSchema()
    //urlData.show()

    //3. Filter the data
    val fData: Dataset[Row] = urlData.filter(x => {
      //Keep rows where uid > 2; the uid is read from the Row by position (column 0)
      x.getAs[Int](0) > 2
    })

    fData.show()
    sparkSession.stop()
  }
}

MySQL source data: the url_data table in the urlcount database.
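Spark can also write results back to MySQL through the same JDBC connector. Below is a minimal sketch, assuming the same urlcount database and a hypothetical target table url_data_filtered:

import java.util.Properties

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object JdbcSink {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder().appName("JdbcSink")
      .master("local[2]").getOrCreate()

    //Load the same url_data table as above
    val urlData: DataFrame = sparkSession.read.format("jdbc").options(Map(
      "url" -> "jdbc:mysql://localhost:3306/urlcount",
      "driver" -> "com.mysql.jdbc.Driver",
      "dbtable" -> "url_data",
      "user" -> "root",
      "password" -> "root"
    )).load()

    //Connection properties for the write side
    val props = new Properties()
    props.put("user", "root")
    props.put("password", "root")
    props.put("driver", "com.mysql.jdbc.Driver")

    //Keep uid > 2 and append the rows to the (hypothetical) target table
    urlData.filter(urlData("uid") > 2)
      .write
      .mode(SaveMode.Append)
      .jdbc("jdbc:mysql://localhost:3306/urlcount", "url_data_filtered", props)

    sparkSession.stop()
  }
}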

2. Writing data out in different formats

import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object JdbcSource1 {
  def main(args: Array[String]): Unit = {
    //1. Create the SparkSession (the Spark SQL entry point)
    val sparkSession: SparkSession = SparkSession.builder().appName("JdbcSource")
      .master("local[2]").getOrCreate()

    import sparkSession.implicits._
    //2. Load the JDBC data source
    val urlData: DataFrame = sparkSession.read.format("jdbc").options(Map(
      "url" -> "jdbc:mysql://localhost:3306/urlcount",
      "driver" -> "com.mysql.jdbc.Driver",
      "dbtable" -> "url_data",
      "user" -> "root",
      "password" -> "root"
    )).load()

    //3. Keep rows where uid > 2
    val r = urlData.filter($"uid" > 2)
    val rs: DataFrame = r.select($"xueyuan", $"number_one")

    //val rs: DataFrame = r.select($"xueyuan")

    //Write in text format (the text source expects a single string column)
    //rs.write.text("e:/saveText")

    //Write in JSON format
    //rs.write.json("e:/saveJson")

    //Write in CSV format
    rs.write.csv("e:/saveCsv")

    //rs.write.parquet("e:/savePar")

    rs.show()
    sparkSession.stop()
  }
}
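By default the write fails if the output directory already exists; SaveMode controls that behavior. A minimal sketch against the same rs and e:/saveCsv path as above:

import org.apache.spark.sql.SaveMode

//Overwrite replaces existing output, Append adds files, Ignore skips the write,
//and ErrorIfExists (the default) throws if the path is already there
rs.write.mode(SaveMode.Overwrite).option("header", "true").csv("e:/saveCsv")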

3. JSON as a data source

import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object JsonSource {
  def main(args: Array[String]): Unit = {
    //1. Create the SparkSession
    val sparkSession: SparkSession = SparkSession.builder().appName("JsonSource")
      .master("local[2]").getOrCreate()

    import sparkSession.implicits._
    //2. Read the JSON data source
    val jread: DataFrame = sparkSession.read.json("e:/saveJson")

    //3. Process the data
    val fread: Dataset[Row] = jread.filter($"xueyuan" === "bigdata")

    //4. Trigger an action
    fread.show()

    //5. Release resources
    sparkSession.stop()
  }
}
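Reading JSON without a schema makes Spark scan the files to infer one. Supplying a schema up front skips that pass; a short sketch, assuming the two string columns written out in section 2:

import org.apache.spark.sql.types.{StringType, StructField, StructType}

//Explicit schema for the JSON files: avoids schema inference
val jsonSchema = StructType(Seq(
  StructField("xueyuan", StringType, nullable = true),
  StructField("number_one", StringType, nullable = true)
))
val jread2: DataFrame = sparkSession.read.schema(jsonSchema).json("e:/saveJson")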

4. CSV as a data source

import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object CsvSource {
  def main(args: Array[String]): Unit = {
    //1. Create the SparkSession
    val sparkSession: SparkSession = SparkSession.builder().appName("CsvSource")
      .master("local[2]").getOrCreate()

    import sparkSession.implicits._
    //2. Read the CSV data source
    val cread: DataFrame = sparkSession.read.csv("e:/saveCsv")

    //3. Process the data (a CSV read without a header yields columns _c0, _c1; toDF renames them)
    val rdf = cread.toDF("id", "xueyuan")
    val rs = rdf.filter($"id" <= 3)

    //4. Trigger an action
    rs.show()

    //5. Release resources
    sparkSession.stop()
  }
}
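The CSV written in section 2 has no header row, which is why the columns come back unnamed and are renamed with toDF. If the file did carry a header, the read could be configured with options instead; a sketch, assuming the same e:/saveCsv path:

//header=true takes column names from the first row;
//inferSchema=true makes Spark scan the data to pick column types instead of defaulting to string
val cread2: DataFrame = sparkSession.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("e:/saveCsv")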