官網地址:http://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html html
Spark SQL和DataFrames重要的類有:
pyspark.sql.SQLContext DataFrame和SQL方法的主入口
pyspark.sql.DataFrame 將分佈式數據集分組到指定列名的數據框中
pyspark.sql.Column DataFrame中的列
pyspark.sql.Row DataFrame數據的行
pyspark.sql.HiveContext 訪問Hive數據的主入口
pyspark.sql.GroupedData 由DataFrame.groupBy()建立的聚合方法集
pyspark.sql.DataFrameNaFunctions 處理丟失數據(空數據)的方法
pyspark.sql.DataFrameStatFunctions 統計功能的方法
pyspark.sql.functions DataFrame可用的內置函數
pyspark.sql.types 可用的數據類型列表
pyspark.sql.Window 用於處理窗口函數python
SQLContext能夠用來建立DataFrame、註冊DataFrame爲表、在表上執行SQL、緩存表、讀取parquet文件。sql
參數:● sparkContext - 支持sqlcontext的sparkcontext
● sqlContext - 一個可選的JVM Scala sqlcontext,若設置,咱們不須要在JVM實例化一個新的sqlcontext,而是都調用這個對象。數據庫
注:在1.3中已過期,使用createDataFrame()代替。apache
緩存表到內存中json
從內存緩存刪除全部緩存表。api
從元組/列表RDD或列表或pandas.DataFrame建立DataFrame
當模式是列名的列表時,每一個列的類型會從數據中推斷出來。
當模式沒有時,將嘗試從數據中推斷模式(列名和類型),數據應該是行或命名元組或字典的RDD。
若是模式推理是必要的,samplingRatio用來肯定用於模式推理的行比率。若是沒有samplingratio,將使用第一行。緩存
參數:● data - 行或元組或列表或字典的RDD、list、pandas.DataFrame.
● schema – 一個結構化類型或者列名列表,默認是空。app
samplingRatio – 用於推斷的行的樣本比率。
返回: DataFrame分佈式
>>> l=[('Alice',1)] >>> sqlContext.createDataFrame(l).collect() [Row(_1=u'Alice', _2=1)] >>> sqlContext.createDataFrame(l,['name','age']).collect() [Row(name=u'Alice', age=1)]
>>> d=[{'name':'Alice','age':1}] >>> sqlContext.createDataFrame(d).collect() [Row(age=1, name=u'Alice')]
>>> rdd=sc.parallelize(l) >>> sqlContext.createDataFrame(rdd).collect() [Row(_1=u'Alice', _2=1)] >>> df=sqlContext.createDataFrame(rdd,['name','age']) >>> df.collect() [Row(name=u'Alice', age=1)]
>>> sqlContext.createDataFrame(df.toPandas()).collect() [Row(name=u'Alice', age=1)] >>> sqlContext.createDataFrame(pandas.DataFrame([[1, 2]])).collect() [Row(0=1, 1=2)]
建立基於數據源中的數據的外部表.
返回與外部表關聯的DataFrame
數據源由源和一組選項指定。若是未指定源,那麼將使用由spark.sql.sources.default 配置的默認的數據源配置。
一般,一個模式能夠被提供做爲返回的DataFrame的模式,而後建立外部表。
返回: DataFrame
從目錄中刪除臨時表
>>> sqlContext.registerDataFrameAsTable(df, "table1") >>> sqlContext.dropTempTable("table1")
返回指定鍵的Spark SQL配置屬性值。
若是鍵沒有指定返回默認值。
注:在1.3中已過期,使用createDataFrame()代替。
從一個文本文件中加載數據,這個文件的每一行均爲JSON字符串。
注:在1.4中已過期,使用DataFrameReader.json()代替。
從一個已經存在的RDD中加載數據,這個RDD中的每個元素均爲一個JSON字符串。
若是提供了模式,將給定的模式應用到這個JSON數據集。不然,它根據數據集的採樣比例來肯定模式。
>>> json=sc.parallelize(["""{"name":"jack","addr":{"city":"beijing","mail":"10001"}}""","""{"name":"john","addr":{"city":"shanghai","mail":"10002"}}"""]) >>> df1 = sqlContext.jsonRDD(json) >>> df1.collect() [Row(addr=Row(city=u'beijing', mail=u'10001'), name=u'jack'), Row(addr=Row(city=u'shanghai', mail=u'10002'), name=u'john')]
>>> df2 = sqlContext.jsonRDD(json,df1.schema) >>> df2.collect() [Row(addr=Row(city=u'beijing', mail=u'10001'), name=u'jack'), Row(addr=Row(city=u'shanghai', mail=u'10002'), name=u'john')]
返回數據源中的數據集爲DataFrame.
注:在1.4中已過期,使用DataFrameReader.load()代替。
返回一個新的SQLContext作爲一個新的會話,這個會話有單獨的SQLConf,註冊臨時表和UDFs,但共享sparkcontext和緩存表。
加載Parquet文件,返回結果爲DataFrame
注:在1.4中已過期,使用DataFrameReader.parquet()代替。
建立只有一個名爲id的長類型的列的DataFrame,包含從開始到結束的按照必定步長的獨立元素。
參數:● start - 開始值
● end - 結束值
● step - 增量值(默認:1)
● numPartitions – DataFrame分區數
返回: DataFrame
>>> sqlContext.range(1, 7, 2).collect() [Row(id=1), Row(id=3), Row(id=5)]
若是僅有一個參數,那麼這個參數被做爲結束值。
>>> sqlContext.range(3).collect() [Row(id=0), Row(id=1), Row(id=2)]
返回一個DataFrameReader,可用於讀取數據爲DataFrame。
註冊給定的DataFrame做爲目錄中的臨時表。
臨時表只在當前SQLContext實例有效期間存在。
>>> sqlContext.registerDataFrameAsTable(df, "table1")
註冊python方法(包括lambda方法),做爲UDF,這樣能夠在 SQL statements中使用。
除了名稱和函數自己以外,還能夠選擇性地指定返回類型。當返回類型沒有指定時,默認自動轉換爲字符串。對於任何其餘返回類型,所生成的對象必須與指定的類型匹配。
參數:● name - UDF名稱
● f – python方法
● 返回類型 數據類型對象
>>> sqlContext.registerFunction("stringLengthString", lambda x: len(x)) >>> sqlContext.sql("SELECT stringLengthString('test')").collect() [Row(_c0=u'4')]
>>> from pyspark.sql.types import IntegerType >>> sqlContext.registerFunction("stringLengthInt", lambda x: len(x), IntegerType()) >>> sqlContext.sql("SELECT stringLengthInt('test')").collect() [Row(_c0=4)]
>>> from pyspark.sql.types import IntegerType >>> sqlContext.udf.register("stringLengthInt", lambda x: len(x), IntegerType()) >>> sqlContext.sql("SELECT stringLengthInt('test')").collect() [Row(_c0=4)]
設置給定的Spark SQL配置屬性
返回DataFrame表明給定查詢的結果
參數:● sqlQuery - sql語句
返回: DataFrame
>>> l=[(1,'row1'),(2,'row2'),(3,'row3')] >>> df = sqlContext.createDataFrame(l,['field1','field2']) >>> sqlContext.registerDataFrameAsTable(df, "table1") >>> df2 = sqlContext.sql("SELECT field1 AS f1, field2 as f2 from table1") >>> df2.collect() [Row(f1=1, f2=u'row1'), Row(f1=2, f2=u'row2'), Row(f1=3, f2=u'row3')]
返回指定的表爲DataFrame
返回: DataFrame
>>> l=[(1,'row1'),(2,'row2'),(3,'row3')] >>> df = sqlContext.createDataFrame(l,['field1','field2']) >>> sqlContext.registerDataFrameAsTable(df, "table1") >>> df2 = sqlContext.table("table1") >>> sorted(df.collect()) == sorted(df2.collect()) True
返回數據庫的表名稱列表
參數:dbName – 字符串類型的數據庫名稱.默認爲當前的數據庫。
返回: 字符串類型的表名稱列表
>>> l=[(1,'row1'),(2,'row2'),(3,'row3')] >>> df = sqlContext.createDataFrame(l,['field1','field2']) >>> sqlContext.registerDataFrameAsTable(df, "table1") >>> "table1" in sqlContext.tableNames() True >>> "table1" in sqlContext.tableNames("db") True
返回一個包含表名稱的DataFrame從給定的數據庫。
若是數據庫名沒有指定,將使用當前的數據庫。
返回的DataFrame包含兩列: 表名稱和是否臨時表 (一個Bool類型的列,標識表是否爲臨時表)。
參數:● dbName – 字符串類型的使用的數據庫名
返回: DataFrame
>>> l=[(1,'row1'),(2,'row2'),(3,'row3')] >>> df = sqlContext.createDataFrame(l,['field1','field2']) >>> sqlContext.registerDataFrameAsTable(df, "table1") >>> df2 = sqlContext.tables() >>> df2.filter("tableName = 'table1'").first() Row(tableName=u'table1', isTemporary=True)
返回一個註冊的UDF爲UDFRegistration。
返回: UDFRegistration
從內存的緩存表中移除指定的表。
Hive此處暫略