初識Spark2.0之Spark SQL

內存計算平臺Spark在今年6月份的時候正式發佈了spark2.0,相比上一版本的spark1.6版本,在內存優化,數據組織,流計算等方面都作出了較大的改變,同時更加註重基於DataFrame數據組織的MLlib,更加註重機器學習整個過程的管道化。java

固然,做爲使用者,特別是須要運用到線上的系統,大部分廠家仍是會繼續選擇已經穩定的spark1.6版本,而且在spark2.0逐漸成熟以後纔會開始考慮系統組件的升級。做爲開發者,仍是有必要先行一步,去了解spark2.0的一些特性和使用,及思考/借鑑一些spark2.0作出某些改進的思路。mysql

首先,爲了調用spark API 來完成咱們的計算,須要先建立一個sparkContext:sql

 String warehouseLocation = System.getProperty("user.dir") + "spark-warehouse";//用戶的當前工做目錄
SparkConf conf = new SparkConf().setAppName("spark sql test")  
                .set("spark.sql.warehouse.dir", warehouseLocation)  
                .setMaster("local[3]");
  SparkSession spark = SparkSession  
                .builder()  
                .config(conf)  
                .getOrCreate();

上述代碼主要有三點:數據庫

    • 使用spark sql時須要指定數據庫的文件地址,這裏使用了一個本地的目錄
    • spark配置,指定spark app的名稱和數據庫地址,master url爲local 3核
    • 使用SparkSession,取代了本來的SQLContext與HiveContext。對於DataFrame API的用戶來講,Spark常見的混亂源頭來自於使用哪一個「context」。如今你可使用SparkSession了,它做爲單個入口能夠兼容二者。注意本來的SQLContext與HiveContext仍然保留,以支持向下兼容。這是spark2.0的一個較大的改變,對用戶更加友好。

下面開始體驗spark sql:json

 //===========================================1 spark SQL===================  
        //數據導入方式  
        Dataset<Row> df = spark.read().json("..\\sparkTestData\\people.json");  
        //查看錶  
        df.show();  
        //查看錶結構  
        df.printSchema();  
        //查看某一列 相似於MySQL: select name from people  
        df.select("name").show();  
        //查看多列並做計算 相似於MySQL: select name ,age+1 from people  
        df.select(col("name"), col("age").plus(1)).show();  
        //設置過濾條件 相似於MySQL:select * from people where age>21  
        df.filter(col("age").gt(21)).show();  
        //作聚合操做 相似於MySQL:select age,count(*) from people group by age  
        df.groupBy("age").count().show();  
        //上述多個條件進行組合 select ta.age,count(*) from (select name,age+1 as "age" from people) as ta where ta.age>21 group by ta.age  
        df.select(col("name"), col("age").plus(1).alias("age")).filter(col("age").gt(21)).groupBy("age").count().show();  
  
        //直接使用spark SQL進行查詢  
        //先註冊爲臨時表  
        df.createOrReplaceTempView("people");  
        Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");  
        sqlDF.show();

主要關注如下幾點:windows

  • 數據來源:spark能夠直接導入json格式的文件數據,people.json是我從spark安裝包下拷貝的測試數據。
  • spark sql:sparkSql語法和用法和mysql有必定的類似性,能夠查看錶、表結構、查詢、聚合等操做。用戶可使用sparkSql的API接口作聚合查詢等操做或者用類SQL語句實現(可是必須將DataSet註冊爲臨時表)
  • DataSet:DataSet是spark2.0i引入的一個新的特性(在spark1.6中屬於alpha版本)。DataSet結合了RDD和DataFrame的優勢, 並帶來的一個新的概念Encoder當序列化數據時,,Encoder產生字節碼與off-heap進行交互,,可以達到按需訪問數據的效果,而不用反序列化整個對象。
咱們能夠爲自定義的對象建立DataSet,首先建立一個JavaBeans:
/** 
     * 一個描述人屬性的JavaBeans 
     * A JavaBean is a Java object that satisfies certain programming conventions: 
 
        The JavaBean class must implement either Serializable or Externalizable 
        The JavaBean class must have a no-arg constructor 
        All JavaBean properties must have public setter and getter methods 
        All JavaBean instance variables should be private 
     */  
    public static class Person implements Serializable {  
        private String name;  
        private int age;  
  
        public String getName() {  
            return name;  
        }  
  
        public void setName(String name) {  
            this.name = name;  
        }  
  
        public int getAge() {  
            return age;  
        }  
  
        public void setAge(int age) {  
            this.age = age;  
        }  
    }

接下來,就能夠爲該類的對象建立DataSet了,並像操做表同樣操做自定義對象的DataSet了:數據結構

   //爲自定義的對象建立Dataset  
        List<Person> personpList = new ArrayList<Person>();  
        Person person1 = new Person();  
        person1.setName("Andy");  
        person1.setAge(32);  
        Person person2 = new Person();  
        person2.setName("Justin");  
        person2.setAge(19);  
        personpList.add(person1);  
        personpList.add(person2);  
        Encoder<Person> personEncoder = Encoders.bean(Person.class);  
        Dataset<Person> javaBeanDS = spark.createDataset(  
                personpList,  
                personEncoder  
        );  
        javaBeanDS.show();

同時,能夠利用Java反射的特性,來從其餘數據集中建立DataSet對象:app

 //spark支持使用java 反射機制推斷表結構  
        //1 首先建立一個存儲person對象的RDD  
        JavaRDD<Person> peopleRDD = spark.read()  
                .textFile("..\\sparkTestData\\people.txt")  
                .javaRDD()  
                .map(new Function<String, Person>() {  
                    public Person call(String line) throws Exception {  
                        String[] parts = line.split(",");  
                        Person person = new Person();  
                        person.setName(parts[0]);  
                        person.setAge(Integer.parseInt(parts[1].trim()));  
                        return person;  
                    }  
                });  
        //2 表結構推斷  
        Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class);  
        peopleDF.createOrReplaceTempView("people");  
  
        //3 定義map 這裏對每一個元素作序列化操做  
        Encoder<String> stringEncoder = Encoders.STRING();  
        Dataset<String> peopleSerDF = peopleDF.map(new MapFunction<Row, String>() {  
            public String call(Row row) throws Exception {  
                return "Name: " + row.getString(1) + " and age is " + String.valueOf(row.getInt(0));  
            }  
        }, stringEncoder);  
        peopleSerDF.show();  
        //==============================================3 從RDD建立Dataset StructType對象的使用  
        JavaRDD<String> peopleRDD2 = spark.sparkContext()  
                .textFile("..\\sparkTestData\\people.txt", 1)  
                .toJavaRDD();  
  
        // 建立一個描述表結構的schema  
        String schemaString = "name age";  
        List<StructField> fields = new ArrayList<StructField>();  
        for (String fieldName : schemaString.split(" ")) {  
            StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);  
            fields.add(field);  
        }  
        StructType schema = DataTypes.createStructType(fields);  
  
        // Convert records of the RDD (people) to Rows  
        JavaRDD<Row> rowRDD = peopleRDD2.map(new Function<String, Row>() {  
            //@Override  
            public Row call(String record) throws Exception {  
                String[] attributes = record.split(",");  
                return RowFactory.create(attributes[0], attributes[1].trim());  
            }  
        });  
  
        // Apply the schema to the RDD  
        Dataset<Row> peopleDataFrame = spark.createDataFrame(rowRDD, schema);  
  
        // Creates a temporary view using the DataFrame  
        peopleDataFrame.createOrReplaceTempView("people");  
        peopleDataFrame.show();

主要關注如下幾點:機器學習

  • RDD:從普通文本文件中解析數據,並建立結構化數據結構的RDD。
  • 表結構推斷的方式建立DataSet:利用Java類反射特性將RDD轉換爲DataSet。
  • 指定表結構的方式建立DataSet:咱們可使用StructType來明肯定義咱們的表結構,完成DataSet的建立
如何將本身的數據/文本導入spark並建立spark的數據對象,對新手來講顯得尤其關鍵,對本身的數據表達好了以後,纔有機會去嘗試spark的其餘API ,完成咱們的目標。通常數據源在通過咱們其餘程序的前處理以後,存儲成行形式的文本/json格式或者自己存儲的hive/mysql數據庫中,spark對這些數據源的調用都是比較方便的。
 
介紹完了spark-sql的數據導入及數據表達後,咱們來完成一個比較簡單的數據統計任務。通常在工做生活中對某些數據按必定的週期進行統計分析是一個比較常見的任務了。下面,咱們就以股票統計的例子爲例。咱們使用spark的窗口統計功能,來對某一公司的股票在2016年6月份的各個星期的均值作統計。
 //在Spark 2.0中,window API內置也支持time windows!Spark SQL中的time windows和Spark Streaming中的time windows很是相似。  
        Dataset<Row> stocksDF = spark.read().option("header","true").  
                option("inferSchema","true").  
                csv("..\\sparkTestData\\stocks.csv");  
  
        //stocksDF.show();  
  
        Dataset<Row> stocks201606 = stocksDF.filter("year(Date)==2016").  
                filter("month(Date)==6");  
        stocks201606.show(100,false);

首先讀入了csv格式的數據文件,同時將2016年6月份的數據過濾出來,並以不截斷的方式輸出前面100條記錄,運行的結果爲:ide

調用window接口作窗口統計:

  //window通常在group by語句中使用。window方法的第一個參數指定了時間所在的列;  
    //第二個參數指定了窗口的持續時間(duration),它的單位能夠是seconds、minutes、hours、days或者weeks。  
        Dataset<Row> tumblingWindowDS = stocks201606.groupBy(window(stocks201606.col("Date"),"1 week")).  
                agg(avg("Close").as("weekly_average"));  
        tumblingWindowDS.show(100,false);  
        tumblingWindowDS.sort("window.start").  
                select("window.start","window.end","weekly_average").  
                show(false);

其運行結果爲:

因爲沒有指定窗口的開始時間,所以統計的開始時間爲2016-05-26,而且不是從0點開始的。一般狀況下,這樣統計就顯得有點不對了,所以咱們須要指定其開始的日期和時間,可是遺憾的是spark並無接口/參數讓咱們明確的指定統計窗口的開始時間。好在提供了另一種方式,指定偏移時間,上述時間(2016-05-26 08:00:00)作一個時間偏移,也能夠獲得咱們想要的開始時間(2016-06-01 00:00:00)。

 //在前面的示例中,咱們使用的是tumbling window。爲了可以指定開始時間,咱們須要使用sliding window(滑動窗口)。  
    //到目前爲止,沒有相關API來建立帶有開始時間的tumbling window,可是咱們能夠經過將窗口時間(window duration)  
    //和滑動時間(slide duration)設置成同樣來建立帶有開始時間的tumbling window。代碼以下:  
        Dataset<Row>  windowWithStartTime = stocks201606.  
                groupBy(window(stocks201606.col("Date"),"1 week","1 week", "136 hour")).  
                agg(avg("Close").as("weekly_average"));  
        //6 days參數就是開始時間的偏移量;前兩個參數分別表明窗口時間和滑動時間,咱們打印出這個窗口的內容:  
        windowWithStartTime.sort("window.start").  
                select("window.start","window.end","weekly_average").  
                show(false);

運行結果爲:

這就獲得了咱們須要的統計結果了。

關於spark2.0的sparkSql部分,基本就介紹這麼多了。

相關文章
相關標籤/搜索