【原】Spark之機器學習(Python版)(一)——聚類

 

    kmeans聚類相信你們都已經很熟悉了。在Python裏咱們用kmeans一般調用Sklearn包(固然本身寫也很簡單)。那麼在Spark裏能不能也直接使用sklean包呢?目前來講直接使用有點困難,不過我看到spark-packages裏已經有了,但尚未發佈。不過不要緊,PySpark裏有ml包,除了ml包,還可使用MLlib,這個在後期會寫,也很方便。html

  首先來看一下Spark自帶的例子python

 

 1 from pyspark.mllib.linalg import Vectors  2 from pyspark.ml.clustering import KMeans  3 from pyspark.sql import SQLContext  4 from pyspark.mllib.linalg import Vectors  5 #導入數據
 6 data = [(Vectors.dense([0.0, 0.0]),), (Vectors.dense([1.0, 1.0]),),(Vectors.dense([9.0, 8.0]),), (Vectors.dense([8.0, 9.0]),)]  7 df = sqlContext.createDataFrame(data, ["features"])  8 #kmeans模型
 9 kmeans = KMeans(k=2, seed=1) 10 model = kmeans.fit(df) 11 #簇心數量
12 centers = model.clusterCenters() 13 len(centers) 14 #2
15 #訓練模型
16 transformed = model.transform(df).select("features", "prediction") 17 rows = transformed.collect() 18 rows[0].prediction == rows[1].prediction 19 #True
20 rows[2].prediction == rows[3].prediction 21 # True

 

  這個例子很簡單,導入的數據是四個稠密向量(能夠本身在二維向量裏畫一下),設定了兩個簇心,最後驗證預測的結果是否正確,顯示爲True,證實預測正確。算法中具體的參數能夠參考API中的說明。然而實際生產中咱們的數據集不可能以這樣的方式一條條寫進去,通常是讀取文件,關於怎麼讀取文件,能夠具體看個人這篇博文。這裏咱們採用iris數據集(不要問我爲何又是iris數據集,由於真的太方便了)來給你們講解一下。git

  

  個人數據集是csv格式的,而Spark又不能直接讀取csv格式的數據,這裏咱們有兩個方式,一是我提到的這篇博文裏有寫怎麼讀取csv文件,二是安裝spark-csv包(在這裏下載),github地址在這裏。按照步驟安裝能夠了。這裏友情提示一下你們,github的安裝方法是:
github

 

$SPARK_HOME/bin/spark-shell --packages com.databricks:spark-csv_2.11:1.4.0

  

  若是報錯了,能夠把 --packages 換成 --jars,若是仍是不行,在加一個 common-csv.jars包放到lib下面就能夠了。我由於這個耽誤了很多時間,不過具體問題也得具體分析。算法

  安裝好這個包之後,就能夠讀取數據了sql

  

1 from pyspark.sql import SQLContext 2 sqlContext = SQLContext(sc) 3 data = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('iris.csv') 4 data.show()

  

  讀取數據之後,咱們來看一下數據集:shell

  

 1 +------+------------+-----------+------------+-----------+-------+
 2 |row.id|Sepal.Length|Sepal.Width|Petal.Length|Petal.Width|Species|
 3 +------+------------+-----------+------------+-----------+-------+
 4 |     1|         5.1|        3.5|         1.4|        0.2|      0|
 5 |     2|         4.9|        3.0|         1.4|        0.2|      0|
 6 |     3|         4.7|        3.2|         1.3|        0.2|      0|
 7 |     4|         4.6|        3.1|         1.5|        0.2|      0|
 8 |     5|         5.0|        3.6|         1.4|        0.2|      0|
 9 |     6|         5.4|        3.9|         1.7|        0.4|      0|
10 |     7|         4.6|        3.4|         1.4|        0.3|      0|
11 |     8|         5.0|        3.4|         1.5|        0.2|      0|
12 |     9|         4.4|        2.9|         1.4|        0.2|      0|
13 |    10|         4.9|        3.1|         1.5|        0.1|      0|
14 |    11|         5.4|        3.7|         1.5|        0.2|      0|
15 |    12|         4.8|        3.4|         1.6|        0.2|      0|
16 |    13|         4.8|        3.0|         1.4|        0.1|      0|
17 |    14|         4.3|        3.0|         1.1|        0.1|      0|
18 |    15|         5.8|        4.0|         1.2|        0.2|      0|
19 |    16|         5.7|        4.4|         1.5|        0.4|      0|
20 |    17|         5.4|        3.9|         1.3|        0.4|      0|
21 |    18|         5.1|        3.5|         1.4|        0.3|      0|
22 |    19|         5.7|        3.8|         1.7|        0.3|      0|
23 |    20|         5.1|        3.8|         1.5|        0.3|      0|
24 +------+------------+-----------+------------+-----------+-------+
25 only showing top 20 rows

  

  第二步:提取特徵apache

  咱們在上一步導入的數據中label是String類型的,但在Spark中要變成數值型才能計算,否則就會報錯。能夠利用StringIndexer功能將字符串轉化爲數值型api

 

1 from pyspark.ml.feature import StringIndexer 2 
3 feature = StringIndexer(inputCol="Species", outputCol="targetlabel") 4 target = feature.fit(data).transform(data) 5 target.show()

 

  targetlabel這一列就是Species轉化成數值型的結果機器學習

 

 1 +------+------------+-----------+------------+-----------+-------+-----------+
 2 |row.id|Sepal.Length|Sepal.Width|Petal.Length|Petal.Width|Species|targetlabel|
 3 +------+------------+-----------+------------+-----------+-------+-----------+
 4 |     1|         5.1|        3.5|         1.4|        0.2|      0|        0.0|
 5 |     2|         4.9|        3.0|         1.4|        0.2|      0|        0.0|
 6 |     3|         4.7|        3.2|         1.3|        0.2|      0|        0.0|
 7 |     4|         4.6|        3.1|         1.5|        0.2|      0|        0.0|
 8 |     5|         5.0|        3.6|         1.4|        0.2|      0|        0.0|
 9 |     6|         5.4|        3.9|         1.7|        0.4|      0|        0.0|
10 |     7|         4.6|        3.4|         1.4|        0.3|      0|        0.0|
11 |     8|         5.0|        3.4|         1.5|        0.2|      0|        0.0|
12 |     9|         4.4|        2.9|         1.4|        0.2|      0|        0.0|
13 |    10|         4.9|        3.1|         1.5|        0.1|      0|        0.0|
14 |    11|         5.4|        3.7|         1.5|        0.2|      0|        0.0|
15 |    12|         4.8|        3.4|         1.6|        0.2|      0|        0.0|
16 |    13|         4.8|        3.0|         1.4|        0.1|      0|        0.0|
17 |    14|         4.3|        3.0|         1.1|        0.1|      0|        0.0|
18 |    15|         5.8|        4.0|         1.2|        0.2|      0|        0.0|
19 |    16|         5.7|        4.4|         1.5|        0.4|      0|        0.0|
20 |    17|         5.4|        3.9|         1.3|        0.4|      0|        0.0|
21 |    18|         5.1|        3.5|         1.4|        0.3|      0|        0.0|
22 |    19|         5.7|        3.8|         1.7|        0.3|      0|        0.0|
23 |    20|         5.1|        3.8|         1.5|        0.3|      0|        0.0|
24 +------+------------+-----------+------------+-----------+-------+-----------+
25 only showing top 20 rows

  

  最後一步:模型訓練和驗證

 

 1 from pyspark.sql import Row
 2 from pyspark.ml.clustering import KMeans
 3 from pyspark.mllib.linalg import Vectors
 4  
 5 #把數據格式轉化成稠密向量
 6 def transData(row):
 7     return Row(label=row["targetlabel"],
 8                features=Vectors.dense([row["Sepal.Length"],
 9                    row["Sepal.Width"],
10                    row["Petal.Length"],
11                    row["Petal.Width"]]))
12  
13 #轉化成Dataframe格式
14 transformed = target.map(transData).toDF()
15 kmeans = KMeans(k=3)
16 model = kmeans.fit(transformed) 
17 
18 predict_data = model.transform(transformed)
19 
20 train_err = predict_data.filter(predict_data['label'] != predict_data['prediction']).count() 
21 total = predict_data.count()
22 print traing_err, total, float(train_err)/total

 

  到這一步就結束了。總結一下,用pyspark作機器學習時,數據格式要轉成須要的格式,否則很容易出錯。下週寫pyspark在機器學習中如何作分類。

相關文章
相關標籤/搜索