寫這個系列是由於最近公司在搞技術分享,學習Spark,個人任務是講PySpark的應用,由於我主要用Python,結合Spark,就講PySpark了。然而我在學習的過程當中發現,PySpark很雞肋(至少如今我以爲我不會拿PySpark作開發)。爲何呢?緣由以下:html
1.PySpark支持的算法太少了。咱們看一下PySpark支持的算法:(參考官方文檔)python
前面兩個pyspark.sql和pyspark.streaming是對sql和streaming的支持。主要是讀取數據,和streaming處理這種方式(固然這是spark的優點,要是這也不支持真是見鬼了)。pyspark.ml和pyspark.mllib分別是ml的api和mllib的api,ml的算法真心少啊,並且支持的功能頗有限,譬如Lr(邏輯迴歸)和GBT目前只支持二分類,不支持多分類。mllib相對好點,支持的算法也多點,雖然昨天發的博文講mlllib的時候說過有的算法不支持分佈式,因此纔會有限,可是我在想,若是我須要用到A算法,而Ml和Mllib的包裏面都沒有,這樣是否是意味着要本身開發分佈式算法呢?代價有點大誒,感受寫這個的時間不如多找找有用的特徵,而後上LR,這樣效果說不定更好。由於目前尚未在實際中用過,因此以上只是個人想法。下面把ml和mllib的全部api列出來,這樣看的更清楚。算法
圖一 pyspark.ml的apisql
圖二 pyspark.mllib的apiapache
從上面兩張圖能夠看到,mllib的功能比ml強大的不是一點半點啊,那ml這個包的存在還有什麼意義呢?不懂(若是有了解的歡迎留言)。雖然有這麼多疑問,可是我仍是跟你們講了,用的數據依然是iris(其實我真心想換個數據集啊 == ,下次換)。上代碼:編程
1 from pyspark.sql import SQLContext 2 sqlContext = SQLContext(sc) 3 df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('iris.csv') 4 # Displays the content of the DataFrame to stdout 5 df.show() 6 7 8 from pyspark.ml.feature import StringIndexer 9 indexer = StringIndexer(inputCol="Species", outputCol="labelindex") 10 indexed = indexer.fit(df).transform(df) 11 indexed.show() 12 13 from pyspark.sql import Row 14 from pyspark.mllib.linalg import Vectors 15 from pyspark.ml.classification import NaiveBayes 16 17 # Load and parse the data 18 def parseRow(row): 19 return Row(label=row["labelindex"], 20 features=Vectors.dense([row["Sepal.Length"], 21 row["Sepal.Width"], 22 row["Petal.Length"], 23 row["Petal.Width"]])) 24 25 ## Must convert to dataframe after mapping 26 parsedData = indexed.map(parseRow).toDF() 27 labeled = StringIndexer(inputCol="label", outputCol="labelpoint") 28 data = labeled.fit(parsedData).transform(parsedData) 29 data.show() 30 31 ## 訓練模型 32 #Naive Bayes 33 nb = NaiveBayes(smoothing=1.0, modelType="multinomial") 34 model_NB = nb.fit(data) 35 predict_data= model_NB.transform(data) 36 traing_err = predict_data.filter(predict_data['label'] != predict_data['prediction']).count() 37 total = predict_data.count() 38 nb_scores = float(traing_err)/total 39 print traing_err, total, nb_scores 40 #7 150 0.0466666666667 41 42 43 #Logistic Regression########################################################### 44 # Logistic regression. Currently, this class only supports binary classification. 45 from pyspark.ml.classification import LogisticRegression 46 lr = LogisticRegression(maxIter=5, regParam=0.01) 47 model_lr = lr.fit(data) 48 predict_data= model_lr.transform(data) 49 traing_err = predict_data.filter(predict_data['label'] != predict_data['prediction']).count() 50 total = predict_data.count() 51 lr_scores = float(traing_err)/total 52 print traing_err, total, float(traing_err)/total 53 54 55 #Decision Tree 56 from pyspark.ml.classification import DecisionTreeClassifier 57 dt = DecisionTreeClassifier(maxDepth=2,labelCol = 'labelpoint') 58 model_DT= dt.fit(data) 59 predict_data= model_DT.transform(data) 60 traing_err = predict_data.filter(predict_data['label'] != predict_data['prediction']).count() 61 total = predict_data.count() 62 dt_scores = float(traing_err)/total 63 print traing_err, total, float(traing_err)/total 64 65 66 #GBT########################################################### 67 ## GBT. Currently, this class only supports binary classification. 68 from pyspark.ml.classification import GBTClassifier 69 gbt = GBTClassifier(maxIter=5, maxDepth=2,labelCol="labelpoint") 70 model_gbt = gbt.fit(data) 71 predict_data= model_gbt.transform(data) 72 traing_err = predict_data.filter(predict_data['label'] != predict_data['prediction']).count() 73 total = predict_data.count() 74 dt_scores = float(traing_err)/total 75 print traing_err, total, float(traing_err)/total 76 77 78 #Random Forest 79 from pyspark.ml.classification import RandomForestClassifier 80 rf = RandomForestClassifier(numTrees=3, maxDepth=2, labelCol="labelpoint", seed=42) 81 model_rf= rf.fit(data) 82 predict_data= model_rf.transform(data) 83 traing_err = predict_data.filter(predict_data['label'] != predict_data['prediction']).count() 84 total = predict_data.count() 85 dt_scores = float(traing_err)/total 86 print traing_err, total, float(traing_err)/total 87 88 #MultilayerPerceptronClassifier########################################################### 89 # Classifier trainer based on the Multilayer Perceptron. Each layer has sigmoid activation function, output layer has softmax. 90 # Number of inputs has to be equal to the size of feature vectors. Number of outputs has to be equal to the total number of labels. 91 from pyspark.ml.classification import MultilayerPerceptronClassifier 92 mlp = MultilayerPerceptronClassifier(maxIter=100, layers=[150, 5, 150], blockSize=1, seed=11) 93 model_mlp= mlp.fit(parsedData) 94 predict_data= model_mlp.transform(parsedData) 95 traing_err = predict_data.filter(predict_data['label'] != predict_data['prediction']).count() 96 total = predict_data.count() 97 dt_scores = float(traing_err)/total 98 print traing_err, total, float(traing_err)/total
由於數據集和上次講pyspark聚類應用的數據是同樣的,就不一步步的展現了,可是我這個程序裏只有NaiveBayes的效果還行,0.94的正確率,其餘的像DecisionTree等,效果然心差,可能參數還須要調。先掌握怎麼用再來調參,官方文檔裏關於參數的解釋也很是詳細,能夠看看。下一次講迴歸,我決定不僅寫pyspark.ml的應用了,由於實在是圖樣圖naive,想弄清楚pyspark的機器學習算法是怎麼運行的,跟普通的算法運行有什麼區別,優點等,再寫個pyspark.mllib,看相同的算法在ml和mllib的包裏運行效果有什麼差別,若是有,是爲何,去看源碼怎麼寫的。此外,我真的想弄清楚這貨在實際生產中到底有用嗎,畢竟仍是要落實生產的,我以前想,若是python的sklearn可以在spark上應用就行了,後來在databricks裏面找到了一個包好像是準備把sklearn弄到spark上來,固然算法確定要從新寫,不過尚未發佈,期待發布的時候。此外,我在知乎上也看到過有人提問說「spark上能用skearn嗎?」(大概是這意思,應該很好搜),裏面有個回答好像說能夠,不過不是直接用(等我找到了把連接放出來)。其實換一種想法,不用spark也行,直接用mapreduce編程序,可是mapreduce慢啊(此處不嚴謹,由於並無測試過二者的性能差別,待補充),在我使用spark的短暫時間內,我我的認爲spark的優點在於數據處理快,它不須要像mapreduce同樣把數據切分紅這麼多塊計算而後再reduce合併,而是直接將數據導入的時候就指定分區,運行機制不一樣,尤爲是spark streaming的功能,仍是很快的,因此這是spark的優點(鄙人拙見,若有錯誤歡迎指出)。而spark的劣勢也比較明顯,由於它對設備的要求過高了(吃內存啊能不高嗎!),這也是它快的緣由,你把數據都放在內存裏,取的時間比放在磁盤裏固然要快,不過實際上在存儲數據或者輸出結果的時候仍是會選擇(memory+disk)的方式,保險嘛。前段時間看的alluxio也是佔了內存的優點。恩,說了不少廢話。下週爭取研究的深一點,否則在公司裏講都沒人聽 = =。api