《Spark Python API 官方文檔中文版》 之 pyspark.sql (一)

摘要:在Spark開發中,因爲須要用Python實現,發現API與Scala的略有不一樣,而Python API的中文資料相對不多。每次去查英文版API的說明相對比較慢,仍是中文版比較容易get到所需,因此利用閒暇之餘將官方文檔翻譯爲中文版,並親測Demo的代碼。在此記錄一下,但願對那些對Spark感興趣和從事大數據開發的人員提供有價值的中文資料,對PySpark開發人員的工做和學習有所幫助。

官網地址:http://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html            html

pyspark.sql module

Module Context

Spark SQL和DataFrames重要的類有:
pyspark.sql.SQLContext DataFrame和SQL方法的主入口
pyspark.sql.DataFrame 將分佈式數據集分組到指定列名的數據框中
pyspark.sql.Column DataFrame中的列
pyspark.sql.Row DataFrame數據的行
pyspark.sql.HiveContext 訪問Hive數據的主入口
pyspark.sql.GroupedData 由DataFrame.groupBy()建立的聚合方法集
pyspark.sql.DataFrameNaFunctions 處理丟失數據(空數據)的方法
pyspark.sql.DataFrameStatFunctions 統計功能的方法
pyspark.sql.functions DataFrame可用的內置函數
pyspark.sql.types 可用的數據類型列表
pyspark.sql.Window 用於處理窗口函數python

1.class pyspark.sql.SQLContext(sparkContext, sqlContext=None)

SQLContext能夠用來建立DataFrame、註冊DataFrame爲表、在表上執行SQL、緩存表、讀取parquet文件。sql

參數:●  sparkContext - 支持sqlcontext的sparkcontext
           ●  sqlContext - 一個可選的JVM Scala sqlcontext,若設置,咱們不須要在JVM實例化一個新的sqlcontext,而是都調用這個對象。數據庫

1.1 applySchema(rdd, schema)

注:在1.3中已過期,使用createDataFrame()代替。apache

1.2 cacheTable(tableName)

緩存表到內存中json

1.3 clearCache()

從內存緩存刪除全部緩存表。api

1.4 createDataFrame(data, schema=None, samplingRatio=None)

從元組/列表RDD或列表或pandas.DataFrame建立DataFrame
當模式是列名的列表時,每一個列的類型會從數據中推斷出來。
當模式沒有時,將嘗試從數據中推斷模式(列名和類型),數據應該是行或命名元組或字典的RDD。
若是模式推理是必要的,samplingRatio用來肯定用於模式推理的行比率。若是沒有samplingratio,將使用第一行。緩存

參數:●  data - 行或元組或列表或字典的RDD、list、pandas.DataFrame.
      ● schema – 一個結構化類型或者列名列表,默認是空。app

samplingRatio – 用於推斷的行的樣本比率。
返回: DataFrame分佈式

>>> l=[('Alice',1)]
>>> sqlContext.createDataFrame(l).collect()
[Row(_1=u'Alice', _2=1)]
>>> sqlContext.createDataFrame(l,['name','age']).collect()
[Row(name=u'Alice', age=1)]
>>> d=[{'name':'Alice','age':1}]
>>> sqlContext.createDataFrame(d).collect()
[Row(age=1, name=u'Alice')]
>>> rdd=sc.parallelize(l)
>>> sqlContext.createDataFrame(rdd).collect()
[Row(_1=u'Alice', _2=1)]
>>> df=sqlContext.createDataFrame(rdd,['name','age'])
>>> df.collect()
[Row(name=u'Alice', age=1)]
>>> sqlContext.createDataFrame(df.toPandas()).collect()  
[Row(name=u'Alice', age=1)]
>>> sqlContext.createDataFrame(pandas.DataFrame([[1, 2]])).collect()  
[Row(0=1, 1=2)]

1.5 createExternalTable(tableName, path=None, source=None, schema=None, **options)

建立基於數據源中的數據的外部表.
返回與外部表關聯的DataFrame
數據源由源和一組選項指定。若是未指定源,那麼將使用由spark.sql.sources.default 配置的默認的數據源配置。
一般,一個模式能夠被提供做爲返回的DataFrame的模式,而後建立外部表。
返回: DataFrame

1.6 dropTempTable(tableName)

從目錄中刪除臨時表

>>> sqlContext.registerDataFrameAsTable(df, "table1")
>>> sqlContext.dropTempTable("table1")

1.7 getConf(key, defaultValue)

返回指定鍵的Spark SQL配置屬性值。
若是鍵沒有指定返回默認值。

1.8 inferSchema(rdd, samplingRatio=None)

注:在1.3中已過期,使用createDataFrame()代替。

1.9 jsonFile(path, schema=None, samplingRatio=1.0)

從一個文本文件中加載數據,這個文件的每一行均爲JSON字符串。
注:在1.4中已過期,使用DataFrameReader.json()代替。

1.10 jsonRDD(rdd, schema=None, samplingRatio=1.0)

從一個已經存在的RDD中加載數據,這個RDD中的每個元素均爲一個JSON字符串。
若是提供了模式,將給定的模式應用到這個JSON數據集。不然,它根據數據集的採樣比例來肯定模式。

>>> json=sc.parallelize(["""{"name":"jack","addr":{"city":"beijing","mail":"10001"}}""","""{"name":"john","addr":{"city":"shanghai","mail":"10002"}}"""])
>>> df1 = sqlContext.jsonRDD(json)
>>> df1.collect()
[Row(addr=Row(city=u'beijing', mail=u'10001'), name=u'jack'), Row(addr=Row(city=u'shanghai', mail=u'10002'), name=u'john')]
>>> df2 = sqlContext.jsonRDD(json,df1.schema)
>>> df2.collect()
[Row(addr=Row(city=u'beijing', mail=u'10001'), name=u'jack'), Row(addr=Row(city=u'shanghai', mail=u'10002'), name=u'john')]

1.11 load(path=None, source=None, schema=None, **options)

返回數據源中的數據集爲DataFrame.
注:在1.4中已過期,使用DataFrameReader.load()代替。

1.12 newSession()

返回一個新的SQLContext作爲一個新的會話,這個會話有單獨的SQLConf,註冊臨時表和UDFs,但共享sparkcontext和緩存表。

1.13 parquetFile(*paths)

加載Parquet文件,返回結果爲DataFrame
注:在1.4中已過期,使用DataFrameReader.parquet()代替。

1.14 range(start, end=None, step=1, numPartitions=None)

建立只有一個名爲id的長類型的列的DataFrame,包含從開始到結束的按照必定步長的獨立元素。

參數:●  start - 開始值
      ●  end -  結束值
           ●  step - 增量值(默認:1)            
   ●  numPartitions – DataFrame分區數

返回: DataFrame

>>> sqlContext.range(1, 7, 2).collect()
[Row(id=1), Row(id=3), Row(id=5)]

若是僅有一個參數,那麼這個參數被做爲結束值。

>>> sqlContext.range(3).collect()
[Row(id=0), Row(id=1), Row(id=2)]

1.15 read

返回一個DataFrameReader,可用於讀取數據爲DataFrame。

1.16 registerDataFrameAsTable(df, tableName)

註冊給定的DataFrame做爲目錄中的臨時表。
臨時表只在當前SQLContext實例有效期間存在。

>>> sqlContext.registerDataFrameAsTable(df, "table1")

1.17 registerFunction(name, f, returnType=StringType)

註冊python方法(包括lambda方法),做爲UDF,這樣能夠在 SQL statements中使用。
除了名稱和函數自己以外,還能夠選擇性地指定返回類型。當返回類型沒有指定時,默認自動轉換爲字符串。對於任何其餘返回類型,所生成的對象必須與指定的類型匹配。
參數:●  name - UDF名稱
      ●  f – python方法
      ●  返回類型 數據類型對象

>>> sqlContext.registerFunction("stringLengthString", lambda x: len(x))
>>> sqlContext.sql("SELECT stringLengthString('test')").collect()
[Row(_c0=u'4')]
>>> from pyspark.sql.types import IntegerType
>>> sqlContext.registerFunction("stringLengthInt", lambda x: len(x), IntegerType())
>>> sqlContext.sql("SELECT stringLengthInt('test')").collect()
[Row(_c0=4)]
>>> from pyspark.sql.types import IntegerType
>>> sqlContext.udf.register("stringLengthInt", lambda x: len(x), IntegerType())
>>> sqlContext.sql("SELECT stringLengthInt('test')").collect()
[Row(_c0=4)]

1.18 setConf(key, value)

設置給定的Spark SQL配置屬性

1.19 sql(sqlQuery)

返回DataFrame表明給定查詢的結果
參數:● sqlQuery - sql語句  
返回: DataFrame

>>> l=[(1,'row1'),(2,'row2'),(3,'row3')]
>>> df = sqlContext.createDataFrame(l,['field1','field2'])
>>> sqlContext.registerDataFrameAsTable(df, "table1")
>>> df2 = sqlContext.sql("SELECT field1 AS f1, field2 as f2 from table1")
>>> df2.collect()
[Row(f1=1, f2=u'row1'), Row(f1=2, f2=u'row2'), Row(f1=3, f2=u'row3')]

1.20 table(tableName)

返回指定的表爲DataFrame
返回: DataFrame

>>> l=[(1,'row1'),(2,'row2'),(3,'row3')]
>>> df = sqlContext.createDataFrame(l,['field1','field2'])
>>> sqlContext.registerDataFrameAsTable(df, "table1")
>>> df2 = sqlContext.table("table1")
>>> sorted(df.collect()) == sorted(df2.collect())
True

1.21 tableNames(dbName=None)  

返回數據庫的表名稱列表
參數dbName – 字符串類型的數據庫名稱.默認爲當前的數據庫。
返回: 字符串類型的表名稱列表

>>> l=[(1,'row1'),(2,'row2'),(3,'row3')]
>>> df = sqlContext.createDataFrame(l,['field1','field2'])
>>> sqlContext.registerDataFrameAsTable(df, "table1")
>>> "table1" in sqlContext.tableNames()
True
>>> "table1" in sqlContext.tableNames("db")
True

1.22 tables(dbName=None)

返回一個包含表名稱的DataFrame從給定的數據庫。
若是數據庫名沒有指定,將使用當前的數據庫。
返回的DataFrame包含兩列: 表名稱和是否臨時表 (一個Bool類型的列,標識表是否爲臨時表)。

參數:● dbName – 字符串類型的使用的數據庫名
返回: DataFrame

>>> l=[(1,'row1'),(2,'row2'),(3,'row3')]
>>> df = sqlContext.createDataFrame(l,['field1','field2'])
>>> sqlContext.registerDataFrameAsTable(df, "table1")
>>> df2 = sqlContext.tables()
>>> df2.filter("tableName = 'table1'").first()
Row(tableName=u'table1', isTemporary=True)

1.23 udf

返回一個註冊的UDF爲UDFRegistration。
返回: UDFRegistration

1.24 uncacheTable(tableName)

從內存的緩存表中移除指定的表。

2.class pyspark.sql.HiveContext(sparkContext, hiveContext=None)

Hive此處暫略

相關文章
相關標籤/搜索