關於Spark SQL (Structured Query Language),首先會想到一個問題:Apache Hive vs Apache Spark SQL – 13 Amazing Differenceshtml
Hive has been known to be the component of Big data ecosystem where legacy mappers and reducers are needed to process data from HDFS whereas Spark SQL is known to be the component of Apache Spark API which has made processing on Big data ecosystem a lot easier and real-time. java
Ref: https://www.bilibili.com/video/av27076260/?p=9 mysql
Ref: [MySQL] 01- Basic sql sql
小比較:數據庫
Hive: SQL --> map/reduce,Hive on Spark 就是把map/reduce直接換爲Spark。apache
Spark SQL: SQL integrated in Spark;可直接讀取Hive的保存的文件,兼容Hive。編程
架構圖:json
轉化過程:網絡
Catalyst's general tree transformation framework數據結構
(一)SQL Parser 轉化爲 Abstract Syntax Tree。
(二)邏輯最佳化,只保留須要的部分:
Parquet格式時,將字符串透過字典編碼壓縮成整數,縮小資料量;
RDBMS時,將篩選推到資料源端。
(三)可執行的物理計劃,併產生 JVM bytecode。
智能選擇 「Broadcast Join」 or "Shuffle Join" 來減小網絡流量
低階優化,減小比較消耗的物件。
優化示例:
def add_demographics(events): u = sqlCtx.table("users") events.join(u, events.user_id) == u.user_id).withColumn("city", zipToCity(df.zip)) events = add_demographics(sqlCtx.load("/data/events", "parquet")) training_data = events.where(events.city == "New York").select(events.timestamp).collect()
抽象語法樹:
Dataframe做爲新的數據結構,能夠操做更爲細粒度。(能看到RDD內部的結構化信息)
Spark SQL編程使用的是:SparkSession 接口,相似以前的SparkContext 的地位。
「關係型數據庫」 與 "機器學習" 的結合。
PySpark交互環境下,會自動生成 SparkSession 和 SparkContext。
from pyspark import SparkContext, SparkConf from pyspark.sql import SparkSession spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()
/usr/local/spark/examples/src/main/resources/ 文件夾下有實例文件以供實驗。
df = spark.read.text("people.txt") df = spark.read.format("text").load("people.txt") # 等價的方式
df = spark.read.json("people.json") df = spark.read.parquet("people.parquet")
df.show()
從HDFS中讀取文件。
from pyspark.sql import SQLContext
sc = SparkContext() sqlcontext = SQLContext(sc)
#format後面爲告訴程序讀取csv格式,load後面爲hdfs地址,hdfs後面跟着hadoop的名字,而後文件目錄(這塊有點懵,若是報錯,跟着報錯查修) data = sqlcontext.read.format("com.databricks.spark.csv").\ options(header="true",inferschema="true").\ load("hdfs://cdsw1.richstone.com/test1/5min.csv") data.show(5) result = data.groupBy("type").count().orderBy("count") result.show()
注意,最後 newpeople.json 是目錄,有點意思。
peopleDF = spark.read.format("json").load("file:///usr/local/spark/examples/src/main/resources/people.json") peopleDF.select("name", "age").write.format("json").save("file:///usr/local/spark/mycode/sparksql/newpeople.json")
除了如上的json,還支持 text, parquet 格式 的文件。
df.printSchema() # 模式信息
df.select(df["name",df["age"]+1]).show() # age列的value都加1
df.filter(df["age"]>20).show()
df.groupBy("age").count().show()
df.sort(df["age"]).desc()).show() df.sort(df["age"]).desc().df["name"].asc()).show() # 輕鬆實現 "二次排序"
使用反射來推斷包含特定對象類型的RDD的模式(schema)。適用於寫spark程序的同時,已經知道了模式,使用反射可使得代碼簡潔。
結合樣本的名字,經過反射讀取,做爲列的名字。
這個RDD能夠隱式轉化爲一個SchemaRDD,而後註冊爲一個表。表能夠在後續的sql語句中使用。
第一步、轉化爲Row形式的RDD格式
from pyspark.sql import Row people = spark.sparkContext.textFile("..."). \ ... map(lambda line: line.split(",")). \ ... map(lambda p: Row(name=p[0], age=int(p[1])))
這裏轉換爲了Row對象。
第二步、RDD格式 轉換爲 DataFrame
這裏只是經過DataFrame的sql比較方便的查詢了下數據而已。
注意理解:people是rdd,通過一次轉變df後,又變回rdd,完成一次 「反射」 過程。
schemaPeople = spark.createDataFrame(people)
# 必須註冊爲臨時表才能供下面的查詢使用 schemaPeople.createOrReplaceTempView("people") personDF = spark.sql("select name, age from people where age > 20")
# 再轉化回RDD形式,而df的一行也就是p,其中包含name, age兩個元素 personsRDD = personsDF.rdd.map(lambda p: "Name: " + p.name + "," + "Age: " + str(p.age)) personsRDD.foreach(print)
Output:
Name: Michael,Age: 29
Name: Andy,Age: 30
當沒法提早獲知數據結構時。
Jeff: 數據文件中沒有說明「數據類型」,只是全都是字符串。而「反射」方案中的rdd是含有類型的。
from pyspark.sql.types import * from pyspark.sql import Row # 生成"表頭" schemaString = "name age" fields = [ StructField(field_name, StringType(), True) for field_name in schemaString.split(" ") ] schema = StructType(fields) # 生成"記錄" lines = spark.sparkContext.textFile("file:/// ... people.txt") parts = lines.map(lambda x: x.split(",")) people = parts.map(lambda p: Row(p[0], p[1].strip()))
#--------------------------------------------------------------
# 拼接「表頭」和「記錄」 schemaPeople = spark.createDataFrame(people, schema)
schemaPeople.createOrReplaceTempView("people")
results = spark.sql("SELECT name,age FROM people")
results.show()
啓動數據庫。
service mysql start
mysql -u root -p
建立數據。
create database spark; use spark; create table student (id int(4), name char(20), gender char(4), age int(4)); insert into student values(1, 'Xueqian', 'F', 23) insert into student values(2, 'Weiliang', 'M', 24) select * from student
Spark調用MySQL,需安裝JDBC驅動程序:mysql-connector-java-5.1.40.tar.gz
作個查詢,測試一下鏈接。
>>> use spark; >>> select * from student;
轉變爲Row的形式
rowRDD = studentRDD.map(lambda p: Row(int(p[0].strip()), p[1].strip(), p[2].strip(), int(p[3].strip())))
studentDF = spark.createDataFrame(rowRDD, schema)
DataFrame形式的數據 寫入 數據庫。
prop = {} prop['user'] = 'root' prop['password'] = '123456' prop['driver'] = "com.mysql.jdbc.Driver"
# 構建好參數後 studentDF.write.jdbc("jdbc:mysql://localhost:3306/spark",'student','append', prop)
End.