Spark SQL是Spark框架的重要組成部分, 主要用於結構化數據處理和對Spark數據執行類SQL的查詢。html
DataFrame是一個分佈式的,按照命名列的形式組織的數據集合。 一張SQL數據表能夠映射爲一個DataFrame對象,DataFrame是Spark SQL中的主要數據結構。python
SqlContext實例是DataFrame和Spark SQL的操做入口, pyspark交互環境中已初始化了一個sqlContext實例, 在提交任務腳本時須要使用一個SparkContext來初始化:sql
from pyspark.sql import SQLContext sqlContext = SqlContext(sparkContext)
本文測試環境爲Spark 2.1.0, Python API.數據庫
SqlContext.createDataFrame
方法能夠從python的list中建立DataFrame:apache
>>> data = [('a', 1, 18), ('b', 2, 22), ('c', 3, 20)] >>> df = sqlContext.createDataFrame(data) >>> df.collect() [Row(_1=u'a', _2=1, _3=18), Row(_1=u'b', _2=2, _3=22), Row(_1=u'c', _2=3, _3=20)]
list中的每一項成爲DataFrame中的一行, 每一列的名字默認爲_1
, _2
, _3
.api
一樣能夠使用RDD來建立:緩存
>>> data = [('a', 1, 18), ('b', 2, 22), ('c', 3, 20)] >>> rdd = sc.parallelize(data) >>> df = sqlContext.createDataFrame(rdd) >>> df.collect() [Row(_1=u'a', _2=1, _3=18), Row(_1=u'b', _2=2, _3=22), Row(_1=u'c', _2=3, _3=20)]
或者採用更簡單的方法:數據結構
>>> df = rdd.toDF() >>> >>> df.collect() [Row(_1=u'a', _2=1, _3=18), Row(_1=u'b', _2=2, _3=22), Row(_1=u'c', _2=3, _3=20)]
createFrame的第二個參數爲可選參數schema用於定義每一列的名稱和類型:框架
>>> data = [('a', 1, 18), ('b', 2, 22), ('c', 3, 20)] >>> df = sqlContext.createDataFrame(data, ['name', 'id', 'age']) >>> df.collect() [Row(name=u'a', id=1, age=18), Row(name=u'b', id=2, age=22), Row(name=u'c', id=3, age=20)]
一樣能夠使用元素爲dict的列表建立DataFrame實例:分佈式
>>> data = [ ... {'name':'a', 'id':1, 'age': 18}, ... {'name':'b', 'id':2, 'age': 22}, ... {'name':'c', 'id':3, 'age': 20}] >>> df = sqlContext.createDataFrame(data) >>> df.collect() [Row(name=u'a', id=1, age=18), Row(name=u'b', id=2, age=22), Row(name=u'c', id=3, age=20)]
不過Spark官方推薦使用Row對象來代替dict:
>>> from pyspark.sql import Row >>> User = Row('name', 'id', 'age') >>> row1 = User('a', 1, 18) >>> row2 = User('b', 2, 22) >>> row3 = User('b', 3, 20) >>> data = [row1, row2, row3] >>> df = sqlContext.createDataFrame(data) >>> df.collect() [Row(name=u'a', id=1, age=18), Row(name=u'b', id=2, age=22), Row(name=u'c', id=3, age=20)]
schema參數也能夠使用pyspark中定義的字段類型:
>>> from pyspark.sql.types import StructType, StructField >>> from pyspark.sql.types import StringType, IntegerType >>> schema = StructType([ ... StructField("name", StringType(), True), # name, type, nullable ... StructField("id", IntegerType(), True), ... StructField("age", IntegerType(), True)]) >>> data = [('a', 1, 18), ('b', 2, 22), ('c', 3, 20)] >>> df = sqlContext.createDataFrame(data, schema) >>> df.collect() [Row(name=u'a', id=1, age=18), Row(name=u'b', id=2, age=22), Row(name=u'c', id=3, age=20)]
更多關於createDataFrame方法的信息能夠參考官方文檔
SqlContext.read
是一個pyspark.sql.DataFrameReader
對象, 它能夠用於根據外部數據源建立DataFrame, 包括讀取文件和使用jdbc讀取數據庫。
詳情能夠參考官方文檔
DataFrame提供了一些經常使用操做的實現, 能夠使用這些接口查看或修改DataFrame:
df.collect()
: 以Row列表的方式顯示df中的全部數據df.show()
: 以可視化表格的方式打印df中的全部數據df.count()
: 顯示df中數據的行數df.describe()
返回一個新的DataFrame對象包含對df中數值列的統計數據df.cache()
: 以MEMORY_ONLY_SER
方式進行持久化df.persist(level)
: 以指定的方式進行持久化df.unpersist()
: 刪除緩存DataFrame的一些屬性能夠用於查看它的結構信息:
df.columns
: 返回各列名稱的列表
df.schema
: 以StructType對象的形式返回df的表結構
df.dtypes
: 以列表的形式返回每列的名稱和類型。
[('name', 'string'), ('id', 'int')]
df.rdd
將DataFrame對象轉換爲rdd
DataFrame支持使用Map和Reduce操做:
df.map(func)
: 等同於df.rdd.map(func)
df.reduce(func)
: 等同於 df.rdd.reduce(func)
DataFrame的結構能夠進行一些修改:
df.drop(col)
: 返回一個刪除指定列後的DataFrame對象:>>> df.drop('age') DataFrame[age:int, id: int]
>>>df.drop(df.name) DataFrame[age:int, id: int]
一樣能夠查詢DataFrame中特定的記錄:
df.take(index)
: 以列表的形式返回df的前n條記錄, 下標從1開始
df.first()
: 返回df中的第一個Row對象
df.filter(cond)
: 返回只包含知足條件記錄的新DataFrame對象
>>> df.filter(df.age>=20).collect() [Row(name=u'b', id=2, age=22), Row(name=u'c', id=3, age=20)]
df.select(col)
: 返回只包含指定列的新DataFrame對象:>>> df.select('*').collect() [Row(name=u'a', id=1, age=18), Row(name=u'b', id=2, age=22), Row(name=u'c', id=3, age=20)] >>> df.select(df.id, df.age-1).collect() [Row(id=1, (age - 1)=17), Row(id=2, (age - 1)=21), Row(id=3, (age - 1)=19)]
df.join(other, on=None, how=None)
將df和other兩個DataFrame對象鏈接爲一個DataFrame對象.
'inner'
, 'outer'
, 'left_outer'
, 'right_outer'
, 'leftsemi'
, 默認爲'inner'
>>> df.collect() [Row(name=u'a', id=1, age=18), Row(name=u'b', id=2, age=22), Row(name=u'c', id=3, age=20)] >>> df2.collect() [Row(id=1, nation=u'cn'), Row(id=2, nation=u'us'), Row(id=4, nation=u'uk')] >>> df.join(df2, 'id').collect() [Row(id=1, name=u'a', age=18, nation=u'cn'), Row(id=2, name=u'b', age=22, nation=u'us')]
df.limit(num)
: 返回一個新的DataFrame對象, 其記錄數不超過num, 多餘的記錄將被刪除.
df.distinct()
: 返回一個新的去除重複行後的DataFrame對象
更多信息能夠參考官方文檔