PySpark Learning Notes 2020-01-02

Yesterday I studied and practiced PySpark's RDDs; today it is the DataFrame's turn.

# First, initialize Spark and load the data:
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Exercise').getOrCreate()

df = spark.read.csv('xxxx.csv', inferSchema=True, header=True)
df.show(3)          # df.show(n) prints the first n rows; much like take(), just formatted differently (see the sketch after the output)

|               Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|
+-------------------+------------------+---------+---------+------------------+--------+------------------+
|2012-01-03 00:00:00|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04 00:00:00|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05 00:00:00|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|
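
For comparison, take(n) returns the rows as a list of Row objects instead of printing a table. A minimal sketch with the df loaded above:

rows = df.take(2)        # take(n) returns a list of Row objects rather than pretty-printing
print(rows[0]['Open'])   # fields of a Row can be accessed by name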

# Check the type of each column
df.dtypes

[('Date', 'timestamp'),
 ('Open', 'double'),
 ('High', 'double'),
 ('Low', 'double'),
 ('Close', 'double'),
 ('Volume', 'int'),
 ('Adj Close', 'double')]

Next, some operations on the DataFrame:
1 Viewing basic information

# Summary statistics for the df, much like pandas.describe()
df.describe().show()

|summary|              Open|             High|              Low|            Close|           Volume|        Adj Close|
+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|              1258|             1258|             1258|             1258|             1258|             1258|
|   mean| 72.35785375357709|72.83938807631165| 71.9186009594594|72.38844998012726|8222093.481717011|67.23883848728146|
| stddev|  6.76809024470826|6.768186808159218|6.744075756255496|6.756859163732991|  4519780.8431556|6.722609449996857|
|    min|56.389998999999996|        57.060001|        56.299999|        56.419998|          2094900|        50.363689|
|    max|         90.800003|        90.970001|            89.25|        90.470001|         80898100|84.91421600000001|
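
The long decimals in describe() output are hard to read. pyspark.sql.functions.format_number can tidy them up; a sketch for just the Open column (describe() returns every value as a string, hence the cast; the same pattern applies to the other columns):

from pyspark.sql.functions import format_number
desc = df.describe()
desc.select(desc['summary'],
            format_number(desc['Open'].cast('float'), 2).alias('Open')).show()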

# List the column names
df.columns

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']


# Print the table schema, similar to pandas df.info()
df.printSchema()        # I ran into some small issues when using this function

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)
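
If inferSchema guesses wrong (possibly the source of the issues mentioned above), the schema can be supplied explicitly instead. A minimal sketch, assuming the column types shown in the schema output:

from pyspark.sql.types import (StructType, StructField, TimestampType,
                               DoubleType, IntegerType)

schema = StructType([
    StructField('Date', TimestampType(), True),
    StructField('Open', DoubleType(), True),
    StructField('High', DoubleType(), True),
    StructField('Low', DoubleType(), True),
    StructField('Close', DoubleType(), True),
    StructField('Volume', IntegerType(), True),
    StructField('Adj Close', DoubleType(), True),
])
df = spark.read.csv('xxxx.csv', schema=schema, header=True)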

2 Creating new columns

# The powerful withColumn function
df.withColumn('HV Ratio', df['High'] / df['Volume']).select('HV Ratio').show()

|            HV Ratio|
+--------------------+
|4.819714653321546E-6|
|6.290848613094555E-6|
|4.669412994783916E-6|
|7.367338463826307E-6|
|8.915604778943901E-6|
|8.644477436914568E-6|
|9.351828421515645E-6|
| 8.29141562102703E-6|
|7.712212102001476E-6|

# That is, df['High'] / df['Volume'] produces a new column named 'HV Ratio'
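
A close relative is withColumnRenamed, which renames an existing column instead of computing a new one ('AdjClose' below is just an illustrative name):

df.withColumnRenamed('Adj Close', 'AdjClose').columns
# ['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'AdjClose']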

# Change a column's type with cast (here withColumn overwrites the existing column)
df.withColumn('High', df.High.cast('float'))
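
Note that withColumn returns a new DataFrame rather than modifying df in place, so assign the result if you want to keep the change. A quick check:

df_float = df.withColumn('High', df.High.cast('float'))
print(df_float.dtypes)   # 'High' is now float; the original df is untouched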

3 Sorting

# orderBy
df.orderBy(df['High'].desc())    # sort in descending order; note that this returns the whole DataFrame. Some handy ways to extract a single value are shown below.
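
For example, one way to pull out just the largest High value (head(n) returns a list of Row objects):

top_row = df.orderBy(df['High'].desc()).head(1)[0]
print(top_row['High'])   # the maximum of the High column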

pyspark.sql.functions contains many commonly used functions (so far, all the ones I have seen are used inside select).

from pyspark.sql.functions import date_format, to_date, year, month, max, min, avg
df.select(date_format('Date', 'yy-MM-dd')).take(5)   # note: 'MM' is the month; lowercase 'mm' means minutes

[Row(date_format(Date, yy-MM-dd)='12-01-03'),
 Row(date_format(Date, yy-MM-dd)='12-01-04'),
 Row(date_format(Date, yy-MM-dd)='12-01-05'),
 Row(date_format(Date, yy-MM-dd)='12-01-06'),
 Row(date_format(Date, yy-MM-dd)='12-01-09')]
 
 
year_df = df.withColumn('Year', year(df['Date']))                   
year_df.groupBy('Year').max().select(['Year', 'max(High)']).show()

|Year|max(High)|
+----+---------+
|2015|    90.97|
|2013|    81.37|
|2014|    88.09|
|2012|     77.6|
|2016|    75.19|
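
groupBy().max() aggregates every numeric column at once. agg() computes only the aggregates you ask for and lets you name them; a sketch (max and min here are the pyspark.sql.functions versions imported above, not the built-ins):

year_df.groupBy('Year').agg(
    max('High').alias('max_high'),
    min('Low').alias('min_low')
).show()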


df.select(avg('Close')).show()

+-----------------+
|       avg(Close)|
+-----------------+
|72.38844998012726|
+-----------------+
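
Several of these functions can also go into a single select call (a quick sketch):

df.select(max('High'), min('Low'), avg('Close')).show()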


# The groupBy function
month_df = df.withColumn('month', month(df.Date))  # use the month function to pull the month out of df.Date into its own column
month_df.groupBy('month').avg('Close').orderBy('avg(Close)').show()  # group by month, average Close, then sort by the average
 
|month|       avg(Close)|
+-----+-----------------+
|    2|71.30680438169499|
|    1| 71.4480196131338|
|   10| 71.5785454489968|
|    3|71.77794376266337|
|   11|72.11108927207418|
|    9|72.18411782208611|
|    5|72.30971685445533|
|    6| 72.4953774506191|
|   12|72.84792482628012|
|    4|72.97361900692894|
|    8| 73.0298185521906|
|    7|74.43971944078106|
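
The generated column name avg(Close) is awkward to reference; agg() plus alias() gives it a cleaner name (a sketch):

month_df.groupBy('month').agg(avg('Close').alias('avg_close')).orderBy('avg_close').show()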

4 Filtering

# filter
df.filter(df.Close < 60).count()   # filter rows where Close < 60, then count them

81
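
Conditions can be combined with & and |, with parentheses around each one (the threshold values below are just for illustration):

df.filter((df.Close < 60) & (df.Open > 60)).count()   # rows where both conditions hold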

To summarize:
df.show(3) shows the first 3 rows
df.withColumn() operates on columns
df.select() selects columns (pyspark.sql.functions holds many functions that can be used inside select)
df.groupBy() groups
df.orderBy() sorts
There is plenty more that I did not use today; see the official documentation. A short chained example follows below.
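
As a wrap-up, these methods chain together naturally; a minimal sketch over the same df, reusing the year function imported earlier:

(df.withColumn('Year', year(df['Date']))
   .groupBy('Year')
   .avg('Close')
   .orderBy('Year')
   .show())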
