Azure Databricks 第二篇:pyspark.sql 簡介

pyspark中的DataFrame等價於Spark SQL中的一個關係表。在pyspark中,DataFrame由Column和Row構成。html

  • pyspark.sql.SparkSession:是DataFrame和SQL函數的主要入口
  • DataFrameReader:讀取數據,返回DataFrame
  • DataFrameWriter:把DataFrame存儲到其餘存儲系統
  • pyspark.sql.DataFrame、pyspark.sql.Column和 pyspark.sql.Row

一,SparkSession類

在操做DataFrame以前,首先須要建立SparkSession,經過SparkSession來操做DataFrame。python

1,建立SparkSessionsql

經過Builder類來建立SparkSession,在Databricks Notebook中,spark是默認建立,表示一個SparkSession對象:apache

spark = SparkSession.builder \
    .master("local") \
    .appName("Word Count") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

函數註釋:json

master(master):用於設置要鏈接的Spark的master URL,例如local表示在本地運行,local[4] 在本地使用4核運行,api

appName(name):爲application設置一個名字app

config(key=Nonevalue=Noneconf=None):設置SparkSession的配置選項,函數

getOrCreate():得到一個已存在的或者建立一個新的SparkSessionfetch

2,從常量數據中建立DataFrameui

從RDD、list或pandas.DataFrame 建立DataFrame:

createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True)

3,從SQL查詢中建立DataFrame

從一個給定的SQL查詢或Table中獲取DataFrame,舉個例子:

df.createOrReplaceTempView("table1")

#use SQL query to fetch data
df2 = spark.sql("SELECT field1 AS f1, field2 as f2 from table1")

#use table to fetch data
df2 = spark.table("table1")

4,屬性

read:該屬性是DataFrameReader 對象,用於讀取數據,返回DataFrame對象

readStream:該屬性是DataStreamReader對象,用於讀取Data Stream,返回 流式的DataFrame對象( streaming DataFrame)

二,DataFrameReader類

從外部存儲系統中讀取數據,返回DataFrame對象,一般使用SparkSession.read來訪問,通用語法是先調用format()函數來指定輸入數據的格式,後調用load()函數從數據源加載數據,並返回DataFrame對象:

df = spark.read.format('json').load('python/test_support/sql/people.json')

對於不一樣的格式,DataFrameReader類有細分的函數來加載數據:

df_csv = spark.read.csv('python/test_support/sql/ages.csv')
df_json = spark.read.json('python/test_support/sql/people.json')
df_txt = spark.read.text('python/test_support/sql/text-test.txt')
df_parquet = spark.read.parquet('python/test_support/sql/parquet_partitioned')

# read a table as a DataFrame
df = spark.read.parquet('python/test_support/sql/parquet_partitioned')
df.createOrReplaceTempView('tmpTable')
spark.read.table('tmpTable')

還能夠經過jdbc,從JDBC URL中構建DataFrame

jdbc(url, table, column=None, lowerBound=None, upperBound=None, numPartitions=None, predicates=None, properties=None)

三,DataFrameWriter類

用於把DataFrame寫入到外部存儲系統中,經過DataFrame.write來訪問。

(df.write.format('parquet')  
    .mode("overwrite")
    .saveAsTable('bucketed_table'))

函數註釋:

  • format(source):指定底層輸出的源的格式
  • mode(saveMode):當數據或表已經存在時,指定數據存儲的行爲,保存的模式有:append、overwrite、error和ignore。
  • saveAsTable(nameformat=Nonemode=NonepartitionBy=None**options):把DataFrame 存儲爲表
  • save(path=Noneformat=Nonemode=NonepartitionBy=None**options):把DataFrame存儲到數據源中

對於不一樣的格式,DataFrameWriter類有細分的函數來加載數據:

df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'))
df.write.json(os.path.join(tempfile.mkdtemp(), 'data'))
df.write.parquet(os.path.join(tempfile.mkdtemp(), 'data'))
df.write.txt(os.path.join(tempfile.mkdtemp(), 'data'))

#wirte data to external database via jdbc
jdbc(url, table, mode=None, properties=None)

四,DataFrame操做

DataFrame等價於Spark SQL中的關係表,

1,常規操做

從parquet 文件中讀取數據,返回一個DataFrame對象:

people = spark.read.parquet("...")

從DataFrame對象返回一列:

ageCol = people.age

從DataFrame對象中row的集合:

people.collect()

從DataFrame對象中刪除列:

people.drop(*cols)

2,建立臨時視圖

能夠建立全局臨時視圖,也能夠建立本地臨時視圖,對於local view,臨時視圖的生命週期和SparkSession相同;對於global view,臨時視圖的生命週期由Spark application決定。

createOrReplaceGlobalTempView(name)
createGlobalTempView(name)
createOrReplaceTempView(name)
createTempView(name)

3,DataFrame數據的查詢

df.filter(df.age > 3)
df.select('name', 'age')

# join
cond = [df.name == df3.name, df.age == df3.age]
df.join(df3, cond, 'outer').select(df.name, df3.age)

#group by 
df.groupBy('name').agg({'age': 'mean'})

五,分組數據

DataFrame.groupBy() 返回的是GroupedData類,能夠對分組數據應用聚合函數、apply函數。

df3.groupBy().max('age', 'height').collect()

請參考官方手冊,再也不贅述。

 

 

參考文檔:

pyspark.sql module

相關文章
相關標籤/搜索