Yesterday I studied and practiced PySpark's RDD; today it's the DataFrame's turn.
# First, initialize Spark and load the data
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Exercise').getOrCreate()

df = spark.read.csv('xxxx.csv', inferSchema=True, header=True)

# df.show(n) prints the first n rows; much like take(n), just laid out as a table
df.show(3)
|               Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|
|2012-01-03 00:00:00|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04 00:00:00|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05 00:00:00|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|

# Check each column's type
df.dtypes
[('Date', 'string'), ('Open', 'float'), ('High', 'float'), ('Low', 'float'),
 ('Close', 'float'), ('Volume', 'float'), ('Adj Close', 'float')]
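If inferSchema guesses the wrong types (the df.dtypes output above does not quite match the printSchema output further down), you can declare the schema yourself. Below is a minimal sketch I'm adding for illustration, assuming the same seven columns as this dataset; the file path stays the placeholder used above, and depending on the CSV's date format you may also need to set the timestampFormat read option.

from pyspark.sql.types import (StructType, StructField, TimestampType,
                               DoubleType, IntegerType)

# Explicit schema for the seven columns (an assumption based on printSchema below)
schema = StructType([
    StructField('Date', TimestampType(), True),
    StructField('Open', DoubleType(), True),
    StructField('High', DoubleType(), True),
    StructField('Low', DoubleType(), True),
    StructField('Close', DoubleType(), True),
    StructField('Volume', IntegerType(), True),
    StructField('Adj Close', DoubleType(), True),
])

# Same read as before, but with the declared schema instead of inferSchema
df_typed = spark.read.csv('xxxx.csv', schema=schema, header=True)
df_typed.printSchema()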
Next, let's operate on the DataFrame:
1 Viewing basic information
# Basic statistics for the DataFrame, much like pandas.describe()
df.describe().show()
+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|              Open|             High|              Low|            Close|           Volume|        Adj Close|
+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|              1258|             1258|             1258|             1258|             1258|             1258|
|   mean| 72.35785375357709|72.83938807631165| 71.9186009594594|72.38844998012726|8222093.481717011|67.23883848728146|
| stddev|  6.76809024470826|6.768186808159218|6.744075756255496|6.756859163732991|  4519780.8431556|6.722609449996857|
|    min|56.389998999999996|        57.060001|        56.299999|        56.419998|          2094900|        50.363689|
|    max|         90.800003|        90.970001|            89.25|        90.470001|         80898100|84.91421600000001|
+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+

# List the columns
df.columns
['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']

# Print the schema, similar to pd.info()
df.printSchema()   # this one gave me a bit of trouble when I ran it
root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)
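The describe() output above is hard to read because of the long decimals. One way to tidy it up (a sketch I'm adding, not from the original post) is format_number from pyspark.sql.functions; the two-decimal choice is arbitrary. Note that describe() returns string columns, so they are cast before formatting.

from pyspark.sql.functions import format_number

summary = df.describe()
summary.select(summary['summary'],
               format_number(summary['Open'].cast('float'), 2).alias('Open'),
               format_number(summary['High'].cast('float'), 2).alias('High'),
               format_number(summary['Low'].cast('float'), 2).alias('Low'),
               format_number(summary['Close'].cast('float'), 2).alias('Close'),
               format_number(summary['Volume'].cast('float'), 0).alias('Volume')
               ).show()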
Creating new columns
# The powerful withColumn
df.withColumn('HV Ratio', df['High'] / df['Volume']).select('HV Ratio').show()
+--------------------+
|            HV Ratio|
+--------------------+
|4.819714653321546E-6|
|6.290848613094555E-6|
|4.669412994783916E-6|
|7.367338463826307E-6|
|8.915604778943901E-6|
|8.644477436914568E-6|
|9.351828421515645E-6|
| 8.29141562102703E-6|
|7.712212102001476E-6|
# That is, df['High'] / df['Volume'] produces a new column named 'HV Ratio'

# Change a column's type with cast (here withColumn overwrites the original column)
df.withColumn('High', df.High.cast('float'))
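Two column operations that often go along with withColumn (a quick sketch added for illustration, not from the original post): withColumnRenamed to rename a column and drop to remove one. Like withColumn, both return a new DataFrame and leave df untouched.

# Rename 'Adj Close' to a name without a space, then drop it again
renamed_df = df.withColumnRenamed('Adj Close', 'AdjClose')
renamed_df.select('AdjClose').show(3)

dropped_df = renamed_df.drop('AdjClose')
print(dropped_df.columns)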
Sorting
# orderBy
df.orderBy(df['High'].desc())
# Sorts in descending order. Note that this returns a whole DataFrame;
# below are some handy functions for pulling out individual values.
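For instance (a sketch I'm adding here, not from the original post), two common ways to turn the sorted or aggregated result into a single Python value, assuming the same df:

# Highest 'High' value as a plain number, via the sorted DataFrame
max_high = df.orderBy(df['High'].desc()).head(1)[0]['High']

# The same value via an aggregation
row = df.agg({'High': 'max'}).collect()[0]
print(max_high, row[0])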
pyspark.sql.functions provides many commonly used functions (so far, they all seem to be used inside select).
from pyspark.sql.functions import date_format, year, month, max, min, avg

# Note: in date_format patterns 'mm' means minutes and 'MM' means month,
# which is why the middle field below is always '00'
df.select(date_format('Date', 'yy-mm-dd')).take(5)
[Row(date_format(Date, yy-mm-dd)='12-00-03'),
 Row(date_format(Date, yy-mm-dd)='12-00-04'),
 Row(date_format(Date, yy-mm-dd)='12-00-05'),
 Row(date_format(Date, yy-mm-dd)='12-00-06'),
 Row(date_format(Date, yy-mm-dd)='12-00-09')]

year_df = df.withColumn('Year', year(df['Date']))
year_df.groupBy('Year').max().select(['Year', 'max(High)']).show()
+----+---------+
|Year|max(High)|
+----+---------+
|2015|    90.97|
|2013|    81.37|
|2014|    88.09|
|2012|     77.6|
|2016|    75.19|
+----+---------+

df.select(avg('Close')).show()
+-----------------+
|       avg(Close)|
+-----------------+
|72.38844998012726|
+-----------------+

# groupBy
month_df = df.withColumn('month', month(df.Date))   # month() pulls the month out of df.Date into its own column
month_df.groupby('month').avg('Close').orderBy('avg(Close)').show()
# group by month, take the average of Close, then sort by that average
+-----+------------------+
|month|        avg(Close)|
+-----+------------------+
|    2| 71.30680438169499|
|    1|  71.4480196131338|
|   10|  71.5785454489968|
|    3| 71.77794376266337|
|   11| 72.11108927207418|
|    9| 72.18411782208611|
|    5| 72.30971685445533|
|    6|  72.4953774506191|
|   12| 72.84792482628012|
|    4| 72.97361900692894|
|    8|  73.0298185521906|
|    7| 74.43971944078106|
+-----+------------------+
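A couple more pyspark.sql.functions that follow the same select pattern (a sketch added for illustration, not from the original post): countDistinct and stddev, with alias to name the result column.

from pyspark.sql.functions import countDistinct, stddev, format_number

# How many distinct Volume values there are
df.select(countDistinct('Volume').alias('distinct volumes')).show()

# Standard deviation of Close, formatted to two decimals
df.select(format_number(stddev('Close'), 2).alias('std of Close')).show()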
Filtering
# filter
df.filter(df.Close < 60).count()   # keep rows where Close < 60, then count them
81
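filter also accepts compound conditions; here is a short sketch (not from the original post) combining them with & and |, each condition in its own parentheses, plus collect() to pull the matching rows out:

# AND: Close below 60 and Open above 60
df.filter((df['Close'] < 60) & (df['Open'] > 60)).count()

# OR works the same way
df.filter((df['Close'] < 60) | (df['Open'] > 62)).count()

# collect() returns the matching rows as a list of Row objects
rows = df.filter(df['Close'] < 60).select('Date', 'Close').collect()
rows[0]['Close']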
To sum up (a short chained example follows this list):
df.show(3): view the first 3 rows
df.withColumn(): operate on columns / derive new ones
df.select(): select columns (pyspark.sql.functions holds many functions that can be used inside select)
df.groupBy(): group rows
df.orderBy(): sort rows
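Putting the pieces above together, a recap sketch on the same df (my own chaining, not from the original post):

from pyspark.sql.functions import year, avg

(df.withColumn('Year', year(df['Date']))      # withColumn: derive a new column
   .groupBy('Year')                           # groupBy: group on it
   .agg(avg('Close').alias('avg_close'))      # an aggregate from pyspark.sql.functions
   .orderBy('avg_close', ascending=False)     # orderBy: sort by the aggregate
   .show(3))                                  # show: print the first 3 rows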
There is a lot more I didn't get to today; see the official documentation for details.