我們已經學習了Hive，它將Hive SQL轉換成MapReduce任務，然後提交到集群上執行，大大簡化了編寫MapReduce程序的複雜性。但由於MapReduce這種計算模型執行效率比較慢，所以Spark SQL應運而生：它將SQL查詢轉換成RDD操作，然後提交到集群執行，執行效率非常快！
<!-- Spark SQL module (DataFrame / SparkSession APIs used below).
     The "_2.11" suffix is the Scala binary version and must match the project's Scala version;
     ${spark.version} is presumably defined in the POM's <properties> section. -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>
/** Demo: build a DataFrame from an RDD of case-class instances with the Spark 1.x `SQLContext` API. */
object Ops3 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Ops3").setMaster("local[3]")
    val sc = new SparkContext(conf)
    try {
      // Spark 1.x style entry point for DataFrames (superseded by SparkSession in Spark 2.x).
      val sqlContext = new SQLContext(sc)
      val rdd1 = sc.parallelize(List(
        Person("admin1", 14, "man"),
        Person("admin2", 16, "man"),
        Person("admin3", 18, "man")))
      // Columns (name, age, sex) are derived by reflection from the Person case class.
      val df1: DataFrame = sqlContext.createDataFrame(rdd1)
      // Prints only the first row.
      df1.show(1)
    } finally {
      // Release the local executors even if the job above throws.
      sc.stop()
    }
  }
}

/** One person record. Declared outside `main` so Spark's reflection-based schema inference can see it. */
final case class Person(name: String, age: Int, sex: String)
// `import org.apache.spark` alone only imports the package; the classes used below must be named explicitly.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}

val conf = new SparkConf().setAppName("Ops3").setMaster("local[3]")
val sc = new SparkContext(conf)
try {
  val sqlContext = new SQLContext(sc)
  val rdd1 = sc.parallelize(List(
    Person("admin1", 14, "man"),
    Person("admin2", 16, "man"),
    Person("admin3", 18, "man")))
  // Brings the implicit conversion that adds toDF() to RDDs of case classes into scope.
  import sqlContext.implicits._
  val df1: DataFrame = rdd1.toDF()
  df1.show()
} finally {
  sc.stop()
}
// Spark 1.x: build a DataFrame from a text file by applying an explicit schema to an RDD[Row].
val conf = new SparkConf().setAppName("Ops3").setMaster("local[3]")
val sc = new SparkContext(conf)
try {
  val sqlContext = new SQLContext(sc)
  // Each input line is expected to look like "name,age,sex".
  val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest/")
  // Field order and types must match the Row values built below.
  val schema = StructType(List(
    StructField("name", StringType),
    StructField("age", IntegerType),
    StructField("sex", StringType)))
  // Data cleaning: trim fields and skip lines with fewer than three fields or a non-integer age,
  // instead of failing the whole job on one malformed record.
  val rowRDD: RDD[Row] = linesRDD.flatMap { line =>
    val fields = line.split(",").map(_.trim)
    val age = if (fields.length >= 3) scala.util.Try(fields(1).toInt).toOption else None
    age.map(a => Row(fields(0), a, fields(2))).toList
  }
  val rowDF: DataFrame = sqlContext.createDataFrame(rowRDD, schema)
  rowDF.show()
} finally {
  sc.stop()
}
// Spark 2.x: register a DataFrame as a temporary view and query it with SQL.
val conf = new SparkConf().setAppName("Ops5").setMaster("local[3]")
val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
try {
  val sc = sparkSession.sparkContext
  // Each input line is expected to look like "name,age,sex".
  val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest/")
  // Data cleaning: trim fields and skip lines with fewer than three fields or a non-integer age.
  val rowRDD: RDD[Row] = linesRDD.flatMap { line =>
    val fields = line.split(",").map(_.trim)
    val age = if (fields.length >= 3) scala.util.Try(fields(1).toInt).toOption else None
    age.map(a => Row(fields(0), a, fields(2))).toList
  }
  val schema = StructType(List(
    StructField("name", StringType),
    StructField("age", IntegerType),
    StructField("sex", StringType)))
  val df: DataFrame = sparkSession.createDataFrame(rowRDD, schema)
  df.createOrReplaceTempView("p1")
  val df2 = sparkSession.sql("select * from p1")
  df2.show()
} finally {
  sparkSession.stop()
}
// Spark 2.x: register a DataFrame as a temporary view and query it with SQL.
val conf = new SparkConf().setAppName("Ops5").setMaster("local[3]")
// SparkSession is the Spark 2.x entry point; it takes over the role of SQLContext from Spark 1.x.
val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
try {
  val sc = sparkSession.sparkContext
  // Each input line is expected to look like "name,age,sex".
  val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest/")
  // Data cleaning: trim fields and skip lines with fewer than three fields or a non-integer age.
  val rowRDD: RDD[Row] = linesRDD.flatMap { line =>
    val fields = line.split(",").map(_.trim)
    val age = if (fields.length >= 3) scala.util.Try(fields(1).toInt).toOption else None
    age.map(a => Row(fields(0), a, fields(2))).toList
  }
  val schema = StructType(List(
    StructField("name", StringType),
    StructField("age", IntegerType),
    StructField("sex", StringType)))
  val df: DataFrame = sparkSession.createDataFrame(rowRDD, schema)
  // createOrReplaceTempView is the Spark 2.x API; Spark 1.x used registerTempTable().
  df.createOrReplaceTempView("p1")
  val df2 = sparkSession.sql("select * from p1")
  df2.show()
} finally {
  sparkSession.stop()
}
DSL（Domain Specific Language，特定領域語言）：在Spark SQL中，指直接呼叫DataFrame的select、where、orderBy等方法來表達查詢，而不必撰寫SQL字串。
// Spark 2.x DSL style: express the query with DataFrame methods instead of an SQL string.
val conf = new SparkConf().setAppName("Ops5").setMaster("local[3]")
val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
try {
  val sc = sparkSession.sparkContext
  // Each input line is expected to look like "name,age,sex".
  val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest/")
  // Data cleaning: trim fields and skip lines with fewer than three fields or a non-integer age.
  val rowRDD: RDD[Row] = linesRDD.flatMap { line =>
    val fields = line.split(",").map(_.trim)
    val age = if (fields.length >= 3) scala.util.Try(fields(1).toInt).toOption else None
    age.map(a => Row(fields(0), a, fields(2))).toList
  }
  val schema = StructType(List(
    StructField("name", StringType),
    StructField("age", IntegerType),
    StructField("sex", StringType)))
  val rowDF: DataFrame = sparkSession.createDataFrame(rowRDD, schema)
  // Needed for the $"col" column syntax.
  import sparkSession.implicits._
  val df: DataFrame = rowDF.select("name", "age").where($"age" > 10).orderBy($"age".desc)
  df.show()
} finally {
  sparkSession.stop()
}
// Spark 2.x: run a DSL query and write the result to HDFS as JSON.
val conf = new SparkConf().setAppName("Ops5").setMaster("local[3]")
val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
try {
  val sc = sparkSession.sparkContext
  // Each input line is expected to look like "name,age,sex".
  val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest")
  // Data cleaning: trim fields and skip lines with fewer than three fields or a non-integer age.
  val rowRDD: RDD[Row] = linesRDD.flatMap { line =>
    val fields = line.split(",").map(_.trim)
    val age = if (fields.length >= 3) scala.util.Try(fields(1).toInt).toOption else None
    age.map(a => Row(fields(0), a, fields(2))).toList
  }
  val schema = StructType(List(
    StructField("name", StringType),
    StructField("age", IntegerType),
    StructField("sex", StringType)))
  val rowDF: DataFrame = sparkSession.createDataFrame(rowRDD, schema)
  import sparkSession.implicits._
  val df: DataFrame = rowDF.select("name", "age").where($"age" > 10).orderBy($"age".desc)
  // No mode is set, so the default SaveMode.ErrorIfExists applies: rerunning fails once the output path exists.
  df.write.json("hdfs://uplooking02:8020/sparktest1")
} finally {
  sparkSession.stop()
}
// Spark 2.x: run a DSL query and append the result to a MySQL table over JDBC.
val conf = new SparkConf().setAppName("Ops5").setMaster("local[3]")
val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
try {
  val sc = sparkSession.sparkContext
  // Each input line is expected to look like "name,age,sex".
  val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest")
  // Data cleaning: trim fields and skip lines with fewer than three fields or a non-integer age.
  val rowRDD: RDD[Row] = linesRDD.flatMap { line =>
    val fields = line.split(",").map(_.trim)
    val age = if (fields.length >= 3) scala.util.Try(fields(1).toInt).toOption else None
    age.map(a => Row(fields(0), a, fields(2))).toList
  }
  val schema = StructType(List(
    StructField("name", StringType),
    StructField("age", IntegerType),
    StructField("sex", StringType)))
  val rowDF: DataFrame = sparkSession.createDataFrame(rowRDD, schema)
  import sparkSession.implicits._
  val df: DataFrame = rowDF.select("name", "age").where($"age" > 10).orderBy($"age".desc)

  val url = "jdbc:mysql://localhost:3306/test"
  // The table is created automatically if it does not exist yet.
  val tbName = "person1"
  val prop = new Properties()
  // NOTE(review): hard-coded credentials; load them from configuration or a secret store for real use.
  prop.setProperty("user", "root")
  prop.setProperty("password", "root")
  // The default SaveMode is ErrorIfExists; Append inserts into an existing table instead of failing.
  df.write.mode(SaveMode.Append).jdbc(url, tbName, prop)
} finally {
  sparkSession.stop()
}
xxx.logmysql
SpringBoot + ECharts