1. MySQL as a data source
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

/**
 * MySQL as a data source
 *
 * Schema:
 * root
 *  |-- uid: integer (nullable = false)
 *  |-- xueyuan: string (nullable = true)
 *  |-- number_one: string (nullable = true)
 */
object JdbcSource {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkSession for Spark SQL
    val sparkSession: SparkSession = SparkSession.builder().appName("JdbcSource")
      .master("local[2]").getOrCreate()

    // 2. Load the JDBC data source
    val urlData: DataFrame = sparkSession.read.format("jdbc").options(Map(
      "url" -> "jdbc:mysql://localhost:3306/urlcount",
      "driver" -> "com.mysql.jdbc.Driver",
      "dbtable" -> "url_data",
      "user" -> "root",
      "password" -> "root"
    )).load()

    // For testing
    //urlData.printSchema()
    //urlData.show()

    // 3. Filter the rows: keep uid > 2. Inside a typed filter the uid value
    //    is read from the Row, here by position with getAs[Int](0)
    val fData: Dataset[Row] = urlData.filter(x => {
      x.getAs[Int](0) > 2
    })

    fData.show()
    sparkSession.stop()
  }
}
MySQL data: the source table is url_data in the urlcount database, with columns uid (int), xueyuan (string), and number_one (string).
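If the filtered result also needs to be written back to MySQL, here is a minimal sketch using DataFrameWriter.jdbc; the target table name filtered_url_data is hypothetical, and the connection settings simply mirror the ones above.

import java.util.Properties

import org.apache.spark.sql.{SaveMode, SparkSession}

object JdbcSink {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder().appName("JdbcSink")
      .master("local[2]").getOrCreate()

    // Read the same url_data table as above
    val urlData = sparkSession.read.format("jdbc").options(Map(
      "url" -> "jdbc:mysql://localhost:3306/urlcount",
      "driver" -> "com.mysql.jdbc.Driver",
      "dbtable" -> "url_data",
      "user" -> "root",
      "password" -> "root"
    )).load()

    // Connection properties for the JDBC writer
    val props = new Properties()
    props.put("user", "root")
    props.put("password", "root")
    props.put("driver", "com.mysql.jdbc.Driver")

    // Write rows with uid > 2 into a new table; "filtered_url_data" is a hypothetical name
    urlData.filter(urlData("uid") > 2)
      .write
      .mode(SaveMode.Overwrite)
      .jdbc("jdbc:mysql://localhost:3306/urlcount", "filtered_url_data", props)

    sparkSession.stop()
  }
}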
2. Writing data out in different formats with Spark
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object JdbcSource1 {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkSession for Spark SQL
    val sparkSession: SparkSession = SparkSession.builder().appName("JdbcSource")
      .master("local[2]").getOrCreate()
    import sparkSession.implicits._

    // 2. Load the JDBC data source
    val urlData: DataFrame = sparkSession.read.format("jdbc").options(Map(
      "url" -> "jdbc:mysql://localhost:3306/urlcount",
      "driver" -> "com.mysql.jdbc.Driver",
      "dbtable" -> "url_data",
      "user" -> "root",
      "password" -> "root"
    )).load()

    // 3. Keep rows with uid > 2, then project the columns to write out
    val r = urlData.filter($"uid" > 2)
    val rs: DataFrame = r.select($"xueyuan", $"number_one")
    //val rs: DataFrame = r.select($"xueyuan")

    // Write as text (requires a single string column, hence the single-column select above)
    //rs.write.text("e:/saveText")
    // Write as JSON
    //rs.write.json("e:/saveJson")
    // Write as CSV
    rs.write.csv("e:/saveCsv")
    // Write as Parquet
    //rs.write.parquet("e:/savePar")

    rs.show()
    sparkSession.stop()
  }
}
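A note on the commented-out text write: the text writer only accepts a single string column, so the two-column rs above would fail there. Below is a minimal sketch (with hypothetical sample data and the same example paths) showing a single-column text write and SaveMode.Overwrite, which replaces an existing output directory instead of failing on re-runs.

import org.apache.spark.sql.{SaveMode, SparkSession}

object WriteModes {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder().appName("WriteModes")
      .master("local[2]").getOrCreate()
    import sparkSession.implicits._

    // Hypothetical sample data with the same two columns as above
    val df = Seq(("bigdata", "10"), ("java", "8"), ("ai", "6"))
      .toDF("xueyuan", "number_one")

    // text() only accepts a single string column, so select one column first
    df.select($"xueyuan").write.mode(SaveMode.Overwrite).text("e:/saveText")

    // SaveMode.Overwrite replaces an existing output directory instead of throwing an error
    df.write.mode(SaveMode.Overwrite).parquet("e:/savePar")

    sparkSession.stop()
  }
}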
3. JSON as a data source
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object JsonSource {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkSession
    val sparkSession: SparkSession = SparkSession.builder().appName("JsonSource")
      .master("local[2]").getOrCreate()
    import sparkSession.implicits._

    // 2. Read the JSON data source
    val jread: DataFrame = sparkSession.read.json("e:/saveJson")

    // 3. Process the data
    val fread: Dataset[Row] = jread.filter($"xueyuan" === "bigdata")

    // 4. Trigger an action
    fread.show()

    // 5. Release resources
    sparkSession.stop()
  }
}
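Reading JSON without a schema makes Spark do an extra pass over the files to infer one. A minimal sketch that supplies the schema explicitly, assuming the two string columns written out in section 2:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object JsonSourceWithSchema {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder().appName("JsonSourceWithSchema")
      .master("local[2]").getOrCreate()
    import sparkSession.implicits._

    // Assumed columns, matching the JSON written in section 2: xueyuan and number_one, both strings
    val schema = StructType(Seq(
      StructField("xueyuan", StringType, nullable = true),
      StructField("number_one", StringType, nullable = true)
    ))

    // Supplying the schema skips the inference pass
    val jread = sparkSession.read.schema(schema).json("e:/saveJson")
    jread.filter($"xueyuan" === "bigdata").show()

    sparkSession.stop()
  }
}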
4. CSV as a data source
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object CsvSource {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkSession
    val sparkSession: SparkSession = SparkSession.builder().appName("CsvSource")
      .master("local[2]").getOrCreate()
    import sparkSession.implicits._

    // 2. Read the CSV data source (columns are unnamed and typed as string by default)
    val cread: DataFrame = sparkSession.read.csv("e:/saveCsv")

    // 3. Process the data: assign names to the unnamed columns, then filter
    val rdf = cread.toDF("id", "xueyuan")
    val rs = rdf.filter($"id" <= 3)

    // 4. Trigger an action
    rs.show()

    // 5. Release resources
    sparkSession.stop()
  }
}
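As a variation, the CSV reader can infer column types via an option, and the columns can be named to match what section 2 actually wrote out (xueyuan, number_one). A minimal sketch:

import org.apache.spark.sql.SparkSession

object CsvSourceWithOptions {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder().appName("CsvSourceWithOptions")
      .master("local[2]").getOrCreate()
    import sparkSession.implicits._

    // "inferSchema" asks Spark to detect column types instead of reading everything as string;
    // the names passed to toDF mirror the columns written in section 2
    val cread = sparkSession.read
      .option("inferSchema", "true")
      .csv("e:/saveCsv")
      .toDF("xueyuan", "number_one")

    cread.filter($"xueyuan" === "bigdata").show()

    sparkSession.stop()
  }
}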