由於有它,Spark集羣的交互操做變得更簡單

Spark 2.X開發的一個動機是讓它能夠觸及更普遍的受衆,特別是缺少編程技能但可能很是熟悉SQL的數據分析師或業務分析師。所以,Spark 2.X如今比以往更易使用。mysql

在之前的Spark 1.x版本中,主要使用RDD(彈性分佈式數據集),全部的操做都是基於RDD的轉化,而在Spark 2.x中,主要基於DataFrame操做,全部的操做都是基於dataframe進行操做。算法

在本文中,我將重點介紹使用fea spk包如何進行spark的dataframe操做,爲之後進行fea大數據分析作一下鋪墊。使用這種方式的優點在於,能夠利用spark集羣的分佈式原理,對大規模的數據進行分析和處理,步驟以下:sql

一、 建立spk鏈接mongodb

在spark 2.X的操做裏面,使用SparkSession爲Spark集羣提供了惟一的入口點。val spk= SparkSession.builder.  master("local")  .appName("spark session example")  .getOrCreate()編程

而使用fea spk包,須要建立的spk鏈接以下json

spk = @udf df0@sys by spk.open_spark網絡

2. fea spk dataframesession

fea spk操做有2種dataframe,一種是pandas的dataframe,能夠直接在fea裏面運行dump查看。oracle

另一種是spark的dataframe,它可以進行各類各樣的spark算子操做,好比group,agg等。app

spark dataframe須要轉換爲pandas的dataframe才能運行dump命令查看,轉換的原語以下:

pd= @udf df by spk.to_DF  #spark dataframe df轉換爲pandas dataframe pd

dump pd   

#能夠直接使用dump命令查看

sdf= @udf spk,pd by spk.to_SDF 

#將pandas dataframe pd轉換爲spark dataframe sdf,以便進行spark的各類操做。

3. 使用spk鏈接讀取數據

fea spk包支持各類各樣的數據源。如,hive,mongodb,text,avro , json, csv , parquet,mysql,oracle等數據源,下面列舉幾個比較常見的數據源來進行演示。

  • csv數據源

a.csv文件格式以下:

id,hash

1,ssss

2,333

3,5567

使用以下命令,鏈接讀取數據

df= @udf spk by spk.load_csv with (header,/data/a.csv)

pd= @udf df by spk.to_DF

dump pd

  • Mysql數據源

Mysql中student_infos表數據以下:

使用以下命令,鏈接讀取數據

df1= @udf spk by spk.load_mysql with (student_infos)

pd= @udf df1 by spk.to_DF

dump pd

4. 使用spk包 來進行groupby,agg操做

d.csv數據以下

df2= @udf spk by spk.load_csv with (header,/data/d.csv)

df3= @udf df2 by spk.group with (name) 

#對df2表的name字段進行group操做

df4= @udf df3 by spk.agg with (salary:avg,consumer:sum)

#對group以後的df3表的salary字段求均值,consumer字段進行求和操做

pd= @udf df4 by spk.to_DF

dump pd

5 使用spk包來進行join操做

b.csv數據以下

c.csv數據以下

df5= @udf spk by spk.load_csv with (header,/data/b.csv)

df6= @udf spk by spk.load_csv with (header,/data/c.csv)

df7= @udf df5,df6 by spk.join with (name:name1,inner)

#按照df5表的name字段,df6表的name1字段進行join內鏈接

pd= @udf df7 by spk.to_DF

dump pd

6. 使用spk包給表的一列或者多列重命名

對於上面的df7表,把name命名爲name1,age命名爲age1

df8=@udf df7 by spk.rename with (name:name1,age:age1)

pd=@udf df8 by spk.to_DF

dump pd

7 使用spk包對錶按照某種條件進行過濾

以上面的df6表爲例,統計income字段大於3000

df9= @udf df6 by spk.filter with (income>3000)
pd=@udf df9 by spk.to_DF

8. 使用spk包將表註冊成可以使用SQL語句的表

以上面的df7表爲例進行說明,將表註冊爲employee表

a= @udf df7 by spk.df_table with (employee)

使用SQL語句查詢註冊的表,返回DF

df10= @udf spk by spk.df_sql with (select * from employee where income>2000)
pd=@udf df10 by spk.to_DF

dump pd

9 將表保存爲parquet文件格式

以df10爲例,保存目錄爲hdfs的目錄/user/root/employee.parquet

b=@udf df10 by spk.save_parquet with (employee.parquet)

 此外spk還有不少原語,暫時列舉一部分,下面進行spk包機器學習的演示。

使用spk包進行機器學習,真正實現了分佈式機器學習的思想,替代了原始的單機版本的機器學習,大大提升了機器學習的速度和吞吐量。目前spk包支持的機器學習仍是比較完善的,包括邏輯迴歸,決策樹,隨機森林,貝葉斯,神經網絡,Kmeans等算法。

10.使用隨機森林進行分類

m1表的內容以下:

前面4個是特徵,後面label爲標籤,有3種狀況,0,1,2,下面使用隨機森林算法進行模型的訓練。注意,使用spk包進行機器學習,要求表的字段爲double類型,因此要先進行轉換。

m1= @udf m1 by spk.ML_double

md1= @udf m1 by spk.ML_rf with (maxDepth=5, numTrees=10)

md1是訓練出的隨機森林模型

下面進行預測,預測的表爲m2,數據具備4個特徵,不包括標籤列,表格式以下

r1= @udf m2 by spk.ML_predict with (md1@public)
pd=@udf r1 by spk.to_DF

dump pd

prediction這列就是預測的結果

下面對模型進行打分

s1= @udf m1 by spk.ML_score with (md1@public)

dump s1

相關文章
相關標籤/搜索