spark-sql基礎

一:Spark SQL的特色
一、支持多種數據源:Hive、RDD、Parquet、JSON、JDBC等。
二、多種性能優化技術:in-memory columnar storage、byte-code generation、cost model動態評估等。
三、組件擴展性:對於SQL的語法解析器、分析器以及優化器,用戶均可以本身從新開發,而且動態擴展。html

 

二:Spark SQL的性能優化技術簡介
一、內存列存儲(in-memory columnar storage)
  內存列存儲意味着,Spark SQL的數據,不是使用Java對象的方式來進行存儲,而是使用面向列的內存存儲的方式來進行存儲。也就是說,每一列,做爲一個數據存儲的單位。從而大大優化了內存使用的效率。採用了內存列存儲以後,減小了對內存的消耗,也就避免了gc大量數據的性能開銷。
二、字節碼生成技術(byte-code generation)
  Spark SQL在其catalyst模塊的expressions中增長了codegen模塊,對於SQL語句中的計算表達式,好比select num + num from t這種的sql,就可使用動態字節碼生成技術來優化其性能。
三、Scala代碼編寫的優化
  對於Scala代碼編寫中,可能會形成較大性能開銷的地方,本身重寫,使用更加複雜的方式,來獲取更好的性能。好比Option樣例類、for循環、map/filter/foreach等高階函數,以及不可變對象,都改爲了用null、while循環等來實現,而且重用可變的對象。java

 

三:spark Sql的通常用法mysql

可參考官方文檔:https://spark.apache.org/docs/latest/sql-programming-guide.htmlgit

1- 建立DataFramegithub

JavaSparkContext sc = ...; 
SQLContext sqlContext = new SQLContext(sc);
DataFrame df = sqlContext.read().json("hdfs://spark1:9000/students.json");
df.show();

 2- DataFrame經常使用用法redis

load:主要用於加載數據,建立出DataFrame;                       DataFrame df = sqlContext.read().load("users.parquet");
save: 主要用於將DataFrame中的數據保存到文件中。
df.select("name", "favorite_color").write().save("namesAndFavColors.parquet");
SaveMode:ErrorIfExists--拋異常,Append--追加,Overwrite--重寫,Ignore--忽略,不作操做
format:手動指定數據類型:parquet也是一種數據類型
DataFrame df = sqlContext.read().format("json").load("people.json");
df.select("name", "age").write().format("parquet").save("namesAndAges.parquet");
show:  顯示數據
collect: 獲取全部數據到數組
collectAsList:獲取全部數據到List
describe(cols: String*):獲取指定字段的統計信息,好比count, mean, stddev, min, max等
first, head, take, takeAsList:獲取若干行記錄
where篩選條件
filter:根據字段進行篩選
select:獲取指定字段值
selectExpr:能夠對指定字段進行特殊處理
col/apply:獲取指定字段
drop:去除指定字段,保留其餘字段
limit:限制行數
orderBy和sort:排序
group by數據分組
distinct數據去重
dropDuplicates:根據指定字段去重
agg方法實現聚合操做
withColumn添加新的一列
join連接
union
intersect方法能夠計算出兩個DataFrame中相同的記錄,
except獲取一個DataFrame中有另外一個DataFrame中沒有的記錄
withColumnRenamed:重命名DataFrame中的指定字段名
explode根據某個字段內容進行分割,而後生成多行,這時可使用explode方法

 示例:sql

DataFrame df = sqlContext.read().json("hdfs://spark1:9000/students.json");
df.show();   //打印全部數據
df.printSchema();    //打印元數據
df.select("name").show();  //查詢某列的數據
df.select(df.col("name"), df.col("age").plus(1)).show();  //查詢某列的數據,並對其進行計算
df.filter(df.col("age").gt(21)).show();    //對某列的值進行過濾
df.groupBy("age").count().show();       //排序

3- spark Sql全部的內置函數express

種類 函數
聚合函數 approxCountDistinct, avg, count, countDistinct, first, last, max, mean, min, sum, sumDistinct
集合函數 array_contains, explode, size, sort_array
日期/時間函數

日期時間轉換
unix_timestamp, from_unixtime, to_date, quarter, day, dayofyear, weekofyear, from_utc_timestamp, to_utc_timestamp
從日期時間中提取字段
year, month, dayofmonth, hour, minute, secondapache

日期/時間計算
datediff, date_add, date_sub, add_months, last_day, next_day, months_between
獲取當前時間等
current_date, current_timestamp, trunc, date_format編程

數學函數 abs, acros, asin, atan, atan2, bin, cbrt, ceil, conv, cos, sosh, exp, expm1, factorial, floor, hex, hypot, log, log10, log1p, log2, pmod, pow, rint, round, shiftLeft, shiftRight, shiftRightUnsigned, signum, sin, sinh, sqrt, tan, tanh, toDegrees, toRadians, unhex
混合函數 array, bitwiseNOT, callUDF, coalesce, crc32, greatest, if, inputFileName, isNaN, isnotnull, isnull, least, lit, md5, monotonicallyIncreasingId, nanvl, negate, not, rand, randn, sha, sha1, sparkPartitionId, struct, when
字符串函數 ascii, base64, concat, concat_ws, decode, encode, format_number, format_string, get_json_object, initcap, instr, length, levenshtein, locate, lower, lpad, ltrim, printf, regexp_extract, regexp_replace, repeat, reverse, rpad, rtrim, soundex, space, split, substring, substring_index, translate, trim, unbase64, upper
窗口函數 cumeDist, denseRank, lag, lead, ntile, percentRank, rank, rowNumber

4- Spark SQL支持兩種方式來將RDD轉換爲DataFrame。
第一種方式,是使用反射來推斷包含了特定數據類型的RDD的元數據。這種基於反射的方式,代碼比較簡潔,當你已經知道你的RDD的元數據時,是一種很是不錯的方式。

JavaRDD<String> lines = sc.textFile("C:\\Users\\zhang\\Desktop\\students.txt");
​JavaRDD<Student> students = lines.map(new Function<String, Student>() {
  ​​private static final long serialVersionUID = 1L;
  ​​@Override
  public Student call(String line) throws Exception {
    ​​​String[] lineSplited = line.split(",");  
​​​    Student stu = new Student();
    ​​​stu.setId(Integer.valueOf(lineSplited[0].trim()));  
    ​​​stu.setName(lineSplited[1]);  
    ​​​stu.setAge(Integer.valueOf(lineSplited[2].trim()));
​​​    return stu;
​​  }
​});
DataFrame studentDF = sqlContext.createDataFrame(students, Student.class); //Stundent類必須實現接口 Serializable

第二種方式,是經過編程接口來建立DataFrame,你能夠在程序運行時動態構建一份元數據,而後將其應用到已經存在的RDD上。這種方式的代碼比較冗長,可是若是在編寫程序時,還不知道RDD的元數據,只有在程序運行時,才能動態得知其元數據,那麼只能經過這種動態構建元數據的方式。

​​import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;


JavaRDD<Row> studentRDD = lines.map(new Function<String, Row>() {
  ​​​private static final long serialVersionUID = 1L;
   @Override
​​​  public Row call(String line) throws Exception {
​​​​    String[] lineSplited = line.split(",");
    return RowFactory.create(
      ​​​​​​Integer.valueOf(lineSplited[0]),
      ​​​​​​lineSplited[1],
      ​​​​​​Integer.valueOf(lineSplited[2]));      
   ​​​}
​​});

List<StructField> structFields = new ArrayList<StructField>(); ​​structFields.add(DataTypes.createStructField("id",DataTypes.IntegerType, true)); ​​structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true)); ​​structFields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true)); ​​StructType structType = DataTypes.createStructType(structFields); ​​// 第三步,使用動態構造的元數據,將RDD轉換爲DataFrame ​​DataFrame studentDF = sqlContext.createDataFrame(studentRDD, structType);// 第一步,建立一個普通的RDDJavaRDD<String> lines = sc.textFile("C://Users//Administrator//Desktop//students.txt");​​// 第二步,動態構造元數據

 可參考:https://www.jianshu.com/p/4df4aa54ad15

 

四:數據源Parquet

1--列式存儲和行式存儲相比有哪些優點呢?

能夠跳過不符合條件的數據,只讀取須要的數據,下降IO數據量。

壓縮編碼能夠下降磁盤存儲空間。因爲同一列的數據類型是同樣的,可使用更高效的壓縮編碼(例如Run Length Encoding和Delta Encoding)進一步節約存儲空間。

只讀取須要的列,支持向量運算,可以獲取更好的掃描性能

 

2--爲何要使用parque:相對於txt文件而言,parquet查詢性能的提高在某些狀況下可能達到 30 倍或更高,存儲的節省可高達 75%。

用法:

        // 讀取Parquet文件中的數據,建立一個DataFrame
        DataFrame userDf = sqlContext.read().parquet("D:\\文檔\\users.parquet");

 

3--自動分區推斷:Spark SQL就會自動根據目錄結構,推斷出分區信息。此外,分區列的數據類型,也是自動被推斷出來的。目前,Spark SQL僅支持自動推斷出數字類型和字符串類型。有時,用戶也許不但願Spark SQL自動推斷分區列的數據類型。此時只要設置一個配置便可, spark.sql.sources.partitionColumnTypeInference.enabled,默認爲true,即自動推斷分區列的類型,設置爲false,即不會自動推斷類型。禁止自動推斷分區列的類型時,全部分區列的類型,就統一默認都是String。

可參考:spark從入門到精通--中華石杉,第78講

 

4-- 合併元數據:用戶能夠在一開始就定義一個簡單的元數據,而後隨着業務須要,逐漸往元數據中添加更多的列。在這種狀況下,用戶可能會建立多個Parquet文件,有着多個不一樣的可是卻互相兼容的元數據。Parquet數據源支持自動推斷出這種狀況,而且進行多個Parquet文件的元數據的合併。

默認狀況下是不進行數據元數據的合併:

一、讀取Parquet文件時,將數據源的選項,mergeSchema,設置爲true
二、使用SQLContext.setConf()方法,將spark.sql.parquet.mergeSchema參數設置爲true

 

五:數據源

1- json:注意的是,這裏使用的JSON文件與傳統意義上的JSON文件是不同的。每行都必須,也只能包含一個,單獨的,自包含的,有效的JSON對象。不能讓一個JSON對象分散在多行。不然會報錯。

2- hive

使用hive中數據:
JavaSparkContext sparkContext = new JavaSparkContext(conf); // 建立HiveContext,注意,這裏,它接收的是SparkContext做爲參數,不是JavaSparkContext HiveContext hiveContext = new HiveContext(sparkContext.sc()); // 判斷是否存在student_infos表,若是存在則刪除 hiveContext.sql("drop table if exists student_infos");
將DataFrame數據保存到hive表中:
goodStudentInfoDF.saveAsTable("good_student_info");

  2- jdbc

public class JDBCDataSource {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("JDBCDataSourceJava").setMaster("local");
        JavaSparkContext sparkContext = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sparkContext);

        // 分別將mysql中兩張表的數據加載爲DataFrame
        Map<String, String> options = new HashMap<String, String>();
        options.put("url", "jdbc:mysql://hadoop-100:3306/mytest");
        options.put("dbtable", "student_infos");
        options.put("user", "root");
        options.put("password", "zhaojun2436");

        DataFrame infoDF = sqlContext.read().options(options).format("jdbc").load();

        options.put("dbtable", "student_scores");
        DataFrame scoreDF = sqlContext.read().options(options).format("jdbc").load();

        // 將兩個DataFrame轉換爲JavaPairRDD,執行join操做
        JavaPairRDD<String, Integer> infoRDD = infoDF.javaRDD().mapToPair(new PairFunction<Row, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Row row) throws Exception {
                return new Tuple2<>(row.getString(0), row.getInt(1));
            }
        });

        JavaPairRDD<String, Integer> scoreRDD = scoreDF.javaRDD().mapToPair(new PairFunction<Row, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Row row) throws Exception {
                return new Tuple2<>(row.getString(0), row.getInt(1));
            }
        });

        JavaPairRDD<String, Tuple2<Integer, Integer>> infoJoinScore = infoRDD.join(scoreRDD);

        // 將JavaPairRDD轉換爲JavaRDD<Row>
        JavaRDD<Row> infoJoinScoreRDD = infoJoinScore.map(new Function<Tuple2<String, Tuple2<Integer, Integer>>, Row>() {
            @Override
            public Row call(Tuple2<String, Tuple2<Integer, Integer>> v1) throws Exception {
                return RowFactory.create(v1._1, v1._2._1, v1._2._2);
            }
        });

        // 過濾出分數大於80分的數據
        JavaRDD<Row> goodStudent = infoJoinScoreRDD.filter(new Function<Row, Boolean>() {
            @Override
            public Boolean call(Row v1) throws Exception {
                if (v1.getInt(2) > 80) {
                    return true;
                }
                return false;
            }
        });

        // 轉換爲DataFrame
        List<StructField> fieldList = new ArrayList<>();
        fieldList.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        fieldList.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
        fieldList.add(DataTypes.createStructField("score", DataTypes.IntegerType, true));

        StructType structType = DataTypes.createStructType(fieldList);

        DataFrame df = sqlContext.createDataFrame(goodStudent, structType);

        Row[] collect = df.collect();
        for(Row row : collect) {
            System.out.println(row);
        }

        // 將DataFrame中的數據保存到mysql表中
        // 這種方式是在企業裏很經常使用的,有多是插入mysql、有多是插入hbase,還有多是插入redis緩存
        goodStudent.foreach(new VoidFunction<Row>() {
            @Override
            public void call(Row row) throws Exception {
                String sql = "insert into good_student_infos values("
                        + "'" + String.valueOf(row.getString(0)) + "',"
                        + Integer.valueOf(String.valueOf(row.get(1))) + ","
                        + Integer.valueOf(String.valueOf(row.get(2))) + ")";

                Class.forName("com.mysql.jdbc.Driver");

                Connection conn = null;
                Statement stmt = null;
                try {
                    conn = DriverManager.getConnection(
                            "jdbc:mysql://hadoop-100:3306/mytest", "root", "zhaojun2436");
                    stmt = conn.createStatement();
                    stmt.executeUpdate(sql);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    if(stmt != null) {
                        stmt.close();
                    }
                    if(conn != null) {
                        conn.close();
                    }
                }
            }
        });
    }
}

 

 擴展閱讀:udf自定義函數,https://zhangslob.github.io/2018/10/29/Spark%E5%AE%9E%E6%88%98%EF%BC%88%E4%BA%8C%EF%BC%89%E5%AD%A6%E4%B9%A0UDF/

相關文章
相關標籤/搜索