pyspark中的DataFrame等價於Spark SQL中的一個關係表。在pyspark中,DataFrame由Column和Row構成。html
在操做DataFrame以前,首先須要建立SparkSession,經過SparkSession來操做DataFrame。python
1,建立SparkSessionsql
經過Builder類來建立SparkSession,在Databricks Notebook中,spark是默認建立,表示一個SparkSession對象:apache
spark = SparkSession.builder \ .master("local") \ .appName("Word Count") \ .config("spark.some.config.option", "some-value") \ .getOrCreate()
函數註釋:json
master(master):用於設置要鏈接的Spark的master URL,例如local表示在本地運行,local[4] 在本地使用4核運行,api
appName(name):爲application設置一個名字app
config(key=None, value=None, conf=None):設置SparkSession的配置選項,函數
getOrCreate():得到一個已存在的或者建立一個新的SparkSessionfetch
2,從常量數據中建立DataFrameui
從RDD、list或pandas.DataFrame 建立DataFrame:
createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True)
3,從SQL查詢中建立DataFrame
從一個給定的SQL查詢或Table中獲取DataFrame,舉個例子:
df.createOrReplaceTempView("table1") #use SQL query to fetch data df2 = spark.sql("SELECT field1 AS f1, field2 as f2 from table1") #use table to fetch data df2 = spark.table("table1")
4,屬性
read:該屬性是DataFrameReader 對象,用於讀取數據,返回DataFrame對象
readStream:該屬性是DataStreamReader對象,用於讀取Data Stream,返回 流式的DataFrame對象( streaming DataFrame)
從外部存儲系統中讀取數據,返回DataFrame對象,一般使用SparkSession.read來訪問,通用語法是先調用format()函數來指定輸入數據的格式,後調用load()函數從數據源加載數據,並返回DataFrame對象:
df = spark.read.format('json').load('python/test_support/sql/people.json')
對於不一樣的格式,DataFrameReader類有細分的函數來加載數據:
df_csv = spark.read.csv('python/test_support/sql/ages.csv') df_json = spark.read.json('python/test_support/sql/people.json') df_txt = spark.read.text('python/test_support/sql/text-test.txt') df_parquet = spark.read.parquet('python/test_support/sql/parquet_partitioned') # read a table as a DataFrame df = spark.read.parquet('python/test_support/sql/parquet_partitioned') df.createOrReplaceTempView('tmpTable') spark.read.table('tmpTable')
還能夠經過jdbc,從JDBC URL中構建DataFrame
jdbc(url, table, column=None, lowerBound=None, upperBound=None, numPartitions=None, predicates=None, properties=None)
用於把DataFrame寫入到外部存儲系統中,經過DataFrame.write來訪問。
(df.write.format('parquet') .mode("overwrite") .saveAsTable('bucketed_table'))
函數註釋:
saveAsTable
(name, format=None, mode=None, partitionBy=None, **options):把DataFrame 存儲爲表save
(path=None, format=None, mode=None, partitionBy=None, **options):把DataFrame存儲到數據源中對於不一樣的格式,DataFrameWriter類有細分的函數來加載數據:
df.write.csv(os.path.join(tempfile.mkdtemp(), 'data')) df.write.json(os.path.join(tempfile.mkdtemp(), 'data')) df.write.parquet(os.path.join(tempfile.mkdtemp(), 'data')) df.write.txt(os.path.join(tempfile.mkdtemp(), 'data')) #wirte data to external database via jdbc jdbc(url, table, mode=None, properties=None)
DataFrame等價於Spark SQL中的關係表,
1,常規操做
從parquet 文件中讀取數據,返回一個DataFrame對象:
people = spark.read.parquet("...")
從DataFrame對象返回一列:
ageCol = people.age
從DataFrame對象中row的集合:
people.collect()
從DataFrame對象中刪除列:
people.drop(*cols)
2,建立臨時視圖
能夠建立全局臨時視圖,也能夠建立本地臨時視圖,對於local view,臨時視圖的生命週期和SparkSession相同;對於global view,臨時視圖的生命週期由Spark application決定。
createOrReplaceGlobalTempView(name)
createGlobalTempView(name)
createOrReplaceTempView(name)
createTempView(name)
3,DataFrame數據的查詢
df.filter(df.age > 3) df.select('name', 'age') # join cond = [df.name == df3.name, df.age == df3.age] df.join(df3, cond, 'outer').select(df.name, df3.age) #group by df.groupBy('name').agg({'age': 'mean'})
DataFrame.groupBy() 返回的是GroupedData類,能夠對分組數據應用聚合函數、apply函數。
df3.groupBy().max('age', 'height').collect()
請參考官方手冊,再也不贅述。
參考文檔: