一:Spark SQL的特色
一、支持多種數據源:Hive、RDD、Parquet、JSON、JDBC等。
二、多種性能優化技術:in-memory columnar storage、byte-code generation、cost model動態評估等。
三、組件擴展性:對於SQL的語法解析器、分析器以及優化器,用戶均可以本身從新開發,而且動態擴展。html
二:Spark SQL的性能優化技術簡介
一、內存列存儲(in-memory columnar storage)
內存列存儲意味着,Spark SQL的數據,不是使用Java對象的方式來進行存儲,而是使用面向列的內存存儲的方式來進行存儲。也就是說,每一列,做爲一個數據存儲的單位。從而大大優化了內存使用的效率。採用了內存列存儲以後,減小了對內存的消耗,也就避免了gc大量數據的性能開銷。
二、字節碼生成技術(byte-code generation)
Spark SQL在其catalyst模塊的expressions中增長了codegen模塊,對於SQL語句中的計算表達式,好比select num + num from t這種的sql,就可使用動態字節碼生成技術來優化其性能。
三、Scala代碼編寫的優化
對於Scala代碼編寫中,可能會形成較大性能開銷的地方,本身重寫,使用更加複雜的方式,來獲取更好的性能。好比Option樣例類、for循環、map/filter/foreach等高階函數,以及不可變對象,都改爲了用null、while循環等來實現,而且重用可變的對象。java
三:spark Sql的通常用法mysql
可參考官方文檔:https://spark.apache.org/docs/latest/sql-programming-guide.htmlgit
1- 建立DataFramegithub
JavaSparkContext sc = ...; SQLContext sqlContext = new SQLContext(sc); DataFrame df = sqlContext.read().json("hdfs://spark1:9000/students.json"); df.show();
2- DataFrame經常使用用法redis
load:主要用於加載數據,建立出DataFrame; DataFrame df = sqlContext.read().load("users.parquet"); save: 主要用於將DataFrame中的數據保存到文件中。 df.select("name", "favorite_color").write().save("namesAndFavColors.parquet"); SaveMode:ErrorIfExists--拋異常,Append--追加,Overwrite--重寫,Ignore--忽略,不作操做 format:手動指定數據類型:parquet也是一種數據類型 DataFrame df = sqlContext.read().format("json").load("people.json"); df.select("name", "age").write().format("parquet").save("namesAndAges.parquet"); show: 顯示數據 collect: 獲取全部數據到數組 collectAsList:獲取全部數據到List describe(cols: String*):獲取指定字段的統計信息,好比count, mean, stddev, min, max等 first, head, take, takeAsList:獲取若干行記錄 where篩選條件 filter:根據字段進行篩選 select:獲取指定字段值 selectExpr:能夠對指定字段進行特殊處理 col/apply:獲取指定字段 drop:去除指定字段,保留其餘字段 limit:限制行數 orderBy和sort:排序 group by數據分組 distinct數據去重 dropDuplicates:根據指定字段去重 agg方法實現聚合操做 withColumn添加新的一列 join連接 union intersect方法能夠計算出兩個DataFrame中相同的記錄, except獲取一個DataFrame中有另外一個DataFrame中沒有的記錄 withColumnRenamed:重命名DataFrame中的指定字段名 explode根據某個字段內容進行分割,而後生成多行,這時可使用explode方法
示例:sql
DataFrame df = sqlContext.read().json("hdfs://spark1:9000/students.json"); df.show(); //打印全部數據 df.printSchema(); //打印元數據 df.select("name").show(); //查詢某列的數據 df.select(df.col("name"), df.col("age").plus(1)).show(); //查詢某列的數據,並對其進行計算 df.filter(df.col("age").gt(21)).show(); //對某列的值進行過濾 df.groupBy("age").count().show(); //排序
3- spark Sql全部的內置函數express
種類 | 函數 |
聚合函數 | approxCountDistinct, avg, count, countDistinct, first, last, max, mean, min, sum, sumDistinct |
集合函數 | array_contains, explode, size, sort_array |
日期/時間函數 | 日期時間轉換 日期/時間計算 |
數學函數 | abs, acros, asin, atan, atan2, bin, cbrt, ceil, conv, cos, sosh, exp, expm1, factorial, floor, hex, hypot, log, log10, log1p, log2, pmod, pow, rint, round, shiftLeft, shiftRight, shiftRightUnsigned, signum, sin, sinh, sqrt, tan, tanh, toDegrees, toRadians, unhex |
混合函數 | array, bitwiseNOT, callUDF, coalesce, crc32, greatest, if, inputFileName, isNaN, isnotnull, isnull, least, lit, md5, monotonicallyIncreasingId, nanvl, negate, not, rand, randn, sha, sha1, sparkPartitionId, struct, when |
字符串函數 | ascii, base64, concat, concat_ws, decode, encode, format_number, format_string, get_json_object, initcap, instr, length, levenshtein, locate, lower, lpad, ltrim, printf, regexp_extract, regexp_replace, repeat, reverse, rpad, rtrim, soundex, space, split, substring, substring_index, translate, trim, unbase64, upper |
窗口函數 | cumeDist, denseRank, lag, lead, ntile, percentRank, rank, rowNumber |
4- Spark SQL支持兩種方式來將RDD轉換爲DataFrame。
第一種方式,是使用反射來推斷包含了特定數據類型的RDD的元數據。這種基於反射的方式,代碼比較簡潔,當你已經知道你的RDD的元數據時,是一種很是不錯的方式。
JavaRDD<String> lines = sc.textFile("C:\\Users\\zhang\\Desktop\\students.txt");
JavaRDD<Student> students = lines.map(new Function<String, Student>() {
private static final long serialVersionUID = 1L;
@Override
public Student call(String line) throws Exception {
String[] lineSplited = line.split(",");
Student stu = new Student();
stu.setId(Integer.valueOf(lineSplited[0].trim()));
stu.setName(lineSplited[1]);
stu.setAge(Integer.valueOf(lineSplited[2].trim()));
return stu;
}
});
DataFrame studentDF = sqlContext.createDataFrame(students, Student.class); //Stundent類必須實現接口 Serializable
第二種方式,是經過編程接口來建立DataFrame,你能夠在程序運行時動態構建一份元數據,而後將其應用到已經存在的RDD上。這種方式的代碼比較冗長,可是若是在編寫程序時,還不知道RDD的元數據,只有在程序運行時,才能動態得知其元數據,那麼只能經過這種動態構建元數據的方式。
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
JavaRDD<Row> studentRDD = lines.map(new Function<String, Row>() {
private static final long serialVersionUID = 1L;
@Override
public Row call(String line) throws Exception {
String[] lineSplited = line.split(",");
return RowFactory.create(
Integer.valueOf(lineSplited[0]),
lineSplited[1],
Integer.valueOf(lineSplited[2]));
}
});
List<StructField> structFields = new ArrayList<StructField>(); structFields.add(DataTypes.createStructField("id",DataTypes.IntegerType, true)); structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true)); structFields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true)); StructType structType = DataTypes.createStructType(structFields); // 第三步,使用動態構造的元數據,將RDD轉換爲DataFrame DataFrame studentDF = sqlContext.createDataFrame(studentRDD, structType);// 第一步,建立一個普通的RDDJavaRDD<String> lines = sc.textFile("C://Users//Administrator//Desktop//students.txt");// 第二步,動態構造元數據
可參考:https://www.jianshu.com/p/4df4aa54ad15
四:數據源Parquet
1--列式存儲和行式存儲相比有哪些優點呢?
能夠跳過不符合條件的數據,只讀取須要的數據,下降IO數據量。
壓縮編碼能夠下降磁盤存儲空間。因爲同一列的數據類型是同樣的,可使用更高效的壓縮編碼(例如Run Length Encoding和Delta Encoding)進一步節約存儲空間。
只讀取須要的列,支持向量運算,可以獲取更好的掃描性能
2--爲何要使用parque:相對於txt文件而言,parquet查詢性能的提高在某些狀況下可能達到 30 倍或更高,存儲的節省可高達 75%。
用法:
// 讀取Parquet文件中的數據,建立一個DataFrame DataFrame userDf = sqlContext.read().parquet("D:\\文檔\\users.parquet");
3--自動分區推斷:Spark SQL就會自動根據目錄結構,推斷出分區信息。此外,分區列的數據類型,也是自動被推斷出來的。目前,Spark SQL僅支持自動推斷出數字類型和字符串類型。有時,用戶也許不但願Spark SQL自動推斷分區列的數據類型。此時只要設置一個配置便可, spark.sql.sources.partitionColumnTypeInference.enabled,默認爲true,即自動推斷分區列的類型,設置爲false,即不會自動推斷類型。禁止自動推斷分區列的類型時,全部分區列的類型,就統一默認都是String。
可參考:spark從入門到精通--中華石杉,第78講
4-- 合併元數據:用戶能夠在一開始就定義一個簡單的元數據,而後隨着業務須要,逐漸往元數據中添加更多的列。在這種狀況下,用戶可能會建立多個Parquet文件,有着多個不一樣的可是卻互相兼容的元數據。Parquet數據源支持自動推斷出這種狀況,而且進行多個Parquet文件的元數據的合併。
默認狀況下是不進行數據元數據的合併:
一、讀取Parquet文件時,將數據源的選項,mergeSchema,設置爲true 二、使用SQLContext.setConf()方法,將spark.sql.parquet.mergeSchema參數設置爲true
五:數據源
1- json:注意的是,這裏使用的JSON文件與傳統意義上的JSON文件是不同的。每行都必須,也只能包含一個,單獨的,自包含的,有效的JSON對象。不能讓一個JSON對象分散在多行。不然會報錯。
2- hive
使用hive中數據:
JavaSparkContext sparkContext = new JavaSparkContext(conf);
// 建立HiveContext,注意,這裏,它接收的是SparkContext做爲參數,不是JavaSparkContext
HiveContext hiveContext = new HiveContext(sparkContext.sc());
// 判斷是否存在student_infos表,若是存在則刪除
hiveContext.sql("drop table if exists student_infos");
將DataFrame數據保存到hive表中:
goodStudentInfoDF.saveAsTable("good_student_info");
2- jdbc
public class JDBCDataSource { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("JDBCDataSourceJava").setMaster("local"); JavaSparkContext sparkContext = new JavaSparkContext(conf); SQLContext sqlContext = new SQLContext(sparkContext); // 分別將mysql中兩張表的數據加載爲DataFrame Map<String, String> options = new HashMap<String, String>(); options.put("url", "jdbc:mysql://hadoop-100:3306/mytest"); options.put("dbtable", "student_infos"); options.put("user", "root"); options.put("password", "zhaojun2436"); DataFrame infoDF = sqlContext.read().options(options).format("jdbc").load(); options.put("dbtable", "student_scores"); DataFrame scoreDF = sqlContext.read().options(options).format("jdbc").load(); // 將兩個DataFrame轉換爲JavaPairRDD,執行join操做 JavaPairRDD<String, Integer> infoRDD = infoDF.javaRDD().mapToPair(new PairFunction<Row, String, Integer>() { @Override public Tuple2<String, Integer> call(Row row) throws Exception { return new Tuple2<>(row.getString(0), row.getInt(1)); } }); JavaPairRDD<String, Integer> scoreRDD = scoreDF.javaRDD().mapToPair(new PairFunction<Row, String, Integer>() { @Override public Tuple2<String, Integer> call(Row row) throws Exception { return new Tuple2<>(row.getString(0), row.getInt(1)); } }); JavaPairRDD<String, Tuple2<Integer, Integer>> infoJoinScore = infoRDD.join(scoreRDD); // 將JavaPairRDD轉換爲JavaRDD<Row> JavaRDD<Row> infoJoinScoreRDD = infoJoinScore.map(new Function<Tuple2<String, Tuple2<Integer, Integer>>, Row>() { @Override public Row call(Tuple2<String, Tuple2<Integer, Integer>> v1) throws Exception { return RowFactory.create(v1._1, v1._2._1, v1._2._2); } }); // 過濾出分數大於80分的數據 JavaRDD<Row> goodStudent = infoJoinScoreRDD.filter(new Function<Row, Boolean>() { @Override public Boolean call(Row v1) throws Exception { if (v1.getInt(2) > 80) { return true; } return false; } }); // 轉換爲DataFrame List<StructField> fieldList = new ArrayList<>(); fieldList.add(DataTypes.createStructField("name", DataTypes.StringType, true)); fieldList.add(DataTypes.createStructField("age", DataTypes.IntegerType, true)); fieldList.add(DataTypes.createStructField("score", DataTypes.IntegerType, true)); StructType structType = DataTypes.createStructType(fieldList); DataFrame df = sqlContext.createDataFrame(goodStudent, structType); Row[] collect = df.collect(); for(Row row : collect) { System.out.println(row); } // 將DataFrame中的數據保存到mysql表中 // 這種方式是在企業裏很經常使用的,有多是插入mysql、有多是插入hbase,還有多是插入redis緩存 goodStudent.foreach(new VoidFunction<Row>() { @Override public void call(Row row) throws Exception { String sql = "insert into good_student_infos values(" + "'" + String.valueOf(row.getString(0)) + "'," + Integer.valueOf(String.valueOf(row.get(1))) + "," + Integer.valueOf(String.valueOf(row.get(2))) + ")"; Class.forName("com.mysql.jdbc.Driver"); Connection conn = null; Statement stmt = null; try { conn = DriverManager.getConnection( "jdbc:mysql://hadoop-100:3306/mytest", "root", "zhaojun2436"); stmt = conn.createStatement(); stmt.executeUpdate(sql); } catch (Exception e) { e.printStackTrace(); } finally { if(stmt != null) { stmt.close(); } if(conn != null) { conn.close(); } } } }); } }
擴展閱讀:udf自定義函數,https://zhangslob.github.io/2018/10/29/Spark%E5%AE%9E%E6%88%98%EF%BC%88%E4%BA%8C%EF%BC%89%E5%AD%A6%E4%B9%A0UDF/