Spark 2.X開發的一個動機是讓它能夠觸及更普遍的受衆,特別是缺少編程技能但可能很是熟悉SQL的數據分析師或業務分析師。所以,Spark 2.X如今比以往更易使用。mysql
在之前的Spark 1.x版本中,主要使用RDD(彈性分佈式數據集),全部的操做都是基於RDD的轉化,而在Spark 2.x中,主要基於DataFrame操做,全部的操做都是基於dataframe進行操做。算法
在本文中,我將重點介紹使用fea spk包如何進行spark的dataframe操做,爲之後進行fea大數據分析作一下鋪墊。使用這種方式的優點在於,能夠利用spark集羣的分佈式原理,對大規模的數據進行分析和處理,步驟以下:sql
一、 建立spk鏈接mongodb
在spark 2.X的操做裏面,使用SparkSession爲Spark集羣提供了惟一的入口點。val spk= SparkSession.builder. master("local") .appName("spark session example") .getOrCreate()編程
而使用fea spk包,須要建立的spk鏈接以下json
spk = @udf df0@sys by spk.open_spark網絡
2. fea spk dataframesession
fea spk操做有2種dataframe,一種是pandas的dataframe,能夠直接在fea裏面運行dump查看。oracle
另一種是spark的dataframe,它可以進行各類各樣的spark算子操做,好比group,agg等。app
spark dataframe須要轉換爲pandas的dataframe才能運行dump命令查看,轉換的原語以下:
pd= @udf df by spk.to_DF #spark dataframe df轉換爲pandas dataframe pd
dump pd
#能夠直接使用dump命令查看
sdf= @udf spk,pd by spk.to_SDF
#將pandas dataframe pd轉換爲spark dataframe sdf,以便進行spark的各類操做。
3. 使用spk鏈接讀取數據
fea spk包支持各類各樣的數據源。如,hive,mongodb,text,avro , json, csv , parquet,mysql,oracle等數據源,下面列舉幾個比較常見的數據源來進行演示。
csv數據源
a.csv文件格式以下:
id,hash
1,ssss
2,333
3,5567
使用以下命令,鏈接讀取數據
df= @udf spk by spk.load_csv with (header,/data/a.csv)
pd= @udf df by spk.to_DF
dump pd
Mysql數據源
Mysql中student_infos表數據以下:
使用以下命令,鏈接讀取數據
df1= @udf spk by spk.load_mysql with (student_infos)
pd= @udf df1 by spk.to_DF
dump pd
4. 使用spk包 來進行groupby,agg操做
d.csv數據以下
df2= @udf spk by spk.load_csv with (header,/data/d.csv)
df3= @udf df2 by spk.group with (name)
#對df2表的name字段進行group操做
df4= @udf df3 by spk.agg with (salary:avg,consumer:sum)
#對group以後的df3表的salary字段求均值,consumer字段進行求和操做
pd= @udf df4 by spk.to_DF
dump pd
5 使用spk包來進行join操做
b.csv數據以下
c.csv數據以下
df5= @udf spk by spk.load_csv with (header,/data/b.csv)
df6= @udf spk by spk.load_csv with (header,/data/c.csv)
df7= @udf df5,df6 by spk.join with (name:name1,inner)
#按照df5表的name字段,df6表的name1字段進行join內鏈接
pd= @udf df7 by spk.to_DF
dump pd
6. 使用spk包給表的一列或者多列重命名
對於上面的df7表,把name命名爲name1,age命名爲age1
df8=@udf df7 by spk.rename with (name:name1,age:age1)
pd=@udf df8 by spk.to_DF
dump pd
7 使用spk包對錶按照某種條件進行過濾
以上面的df6表爲例,統計income字段大於3000
df9= @udf df6 by spk.filter with (income>3000)
pd=@udf df9 by spk.to_DF
8. 使用spk包將表註冊成可以使用SQL語句的表
以上面的df7表爲例進行說明,將表註冊爲employee表
a= @udf df7 by spk.df_table with (employee)
使用SQL語句查詢註冊的表,返回DF
df10= @udf spk by spk.df_sql with (select * from employee where income>2000)
pd=@udf df10 by spk.to_DF
dump pd
9 將表保存爲parquet文件格式
以df10爲例,保存目錄爲hdfs的目錄/user/root/employee.parquet
b=@udf df10 by spk.save_parquet with (employee.parquet)
此外spk還有不少原語,暫時列舉一部分,下面進行spk包機器學習的演示。
使用spk包進行機器學習,真正實現了分佈式機器學習的思想,替代了原始的單機版本的機器學習,大大提升了機器學習的速度和吞吐量。目前spk包支持的機器學習仍是比較完善的,包括邏輯迴歸,決策樹,隨機森林,貝葉斯,神經網絡,Kmeans等算法。
10.使用隨機森林進行分類
m1表的內容以下:
前面4個是特徵,後面label爲標籤,有3種狀況,0,1,2,下面使用隨機森林算法進行模型的訓練。注意,使用spk包進行機器學習,要求表的字段爲double類型,因此要先進行轉換。
m1= @udf m1 by spk.ML_double
md1= @udf m1 by spk.ML_rf with (maxDepth=5, numTrees=10)
md1是訓練出的隨機森林模型
下面進行預測,預測的表爲m2,數據具備4個特徵,不包括標籤列,表格式以下
r1= @udf m2 by spk.ML_predict with (md1@public)
pd=@udf r1 by spk.to_DF
dump pd
prediction這列就是預測的結果
下面對模型進行打分
s1= @udf m1 by spk.ML_score with (md1@public)
dump s1