隨機森林是經常使用的機器學習算法,既能夠用於分類問題,也可用於迴歸問題。本文對scikit-learn、Spark MLlib、DolphinDB、xgboost四個平臺的隨機森林算法實現進行對比測試。評價指標包括內存佔用、運行速度和分類準確性。本次測試使用模擬生成的數據做爲輸入進行二分類訓練,並用生成的模型對模擬數據進行預測。html
本次測試使用的各平臺版本以下:node
scikit-learn:Python 3.7.1,scikit-learn 0.20.2python
Spark MLlib:Spark 2.0.2,Hadoop 2.7.2git
DolphinDB:0.82github
xgboost:Python package,0.81算法
CPU:Intel(R) Xeon(R) CPU E5-2650 v4 2.20GHz(共24核48線程)sql
RAM:512GB數據庫
操做系統:CentOS Linux release 7.5.1804apache
在各平臺上進行測試時,都會把數據加載到內存中再進行計算,所以隨機森林算法的性能與磁盤無關。bootstrap
本次測試使用DolphinDB腳本產生模擬數據,並導出爲CSV文件。訓練集平均分紅兩類,每一個類別的特徵列分別服從兩個中心不一樣,標準差相同,且兩兩獨立的多元正態分佈N(0, 1)和N(2/sqrt(20), 1)。訓練集中沒有空值。
假設訓練集的大小爲n行p列。本次測試中n的取值爲10,000、100,000、1,000,000,p的取值爲50。
因爲測試集和訓練集獨立同分布,測試集的大小對模型準確性評估沒有顯著影響。本次測試對於全部不一樣大小的訓練集都採用1000行的模擬數據做爲測試集。
產生模擬數據的DolphinDB腳本見附錄1。
在各個平臺中都採用如下參數進行隨機森林模型訓練:
在測試xgboost時,嘗試了參數nthread(表示運行時的併發線程數)的不一樣取值。但當該參數取值爲本次測試環境的線程數(48)時,性能並不理想。進一步觀察到,在線程數小於10時,性能與取值成正相關。在線程數大於10小於24時,不一樣取值的性能差別不明顯,此後,線程數增長時性能反而降低。該現象在xgboost社區中也有人討論過。所以,本次測試在xgboost中最終使用的線程數爲24。
測試腳本見附錄2~5。
當樹的數量爲500,最大深度爲10時,測試結果以下表所示:
當樹的數量爲500,最大深度爲30時,測試結果以下表所示:
從準確率上看,Python scikit-learn、Spark MLlib和DolphinDB的準確率比較相近,略高於xgboost的實現;從性能上看,從高到低依次爲DolphinDB、Python scikit-learn、xgboost、Spark MLlib。
在本次測試中,Python scikit-learn的實現使用了全部CPU核。
Spark MLlib的實現沒有充分使用全部CPU核,內存佔用最高,當數據量爲10,000時,CPU峯值佔用率約8%,當數據量爲100,000時,CPU峯值佔用率約爲25%,當數據量爲1,000,000時,它會由於內存不足而中斷執行。
DolphinDB database 的實現使用了全部CPU核,而且它是全部實現中速度最快的,但內存佔用是scikit-learn的2-7倍,是xgboost的3-9倍。DolphinDB的隨機森林算法實現提供了numJobs參數,能夠經過調整該參數來下降並行度,從而減小內存佔用。詳情請參考DolphinDB用戶手冊。
xgboost經常使用於boosted trees的訓練,也能進行隨機森林算法。它是算法迭代次數爲1時的特例。xgboost實際上在24線程左右時性能最高,其對CPU線程的利用率不如Python和DolphinDB,速度也不及二者。其優點在於內存佔用最少。另外,xgboost的具體實現也和其餘平臺的實現有所差別。例如,沒有bootstrap這一過程,對數據使用無放回抽樣而不是有放回抽樣。這能夠解釋爲什麼它的準確率略低於其它平臺。
Python scikit-learn的隨機森林算法實如今性能、內存開銷和準確率上的表現比較均衡,Spark MLlib的實如今性能和內存開銷上的表現遠遠不如其餘平臺。DolphinDB的隨機森林算法實現性能最優,而且DolphinDB的隨機森林算法和數據庫是無縫集成的,用戶能夠直接對數據庫中的數據進行訓練和預測,而且提供了numJobs參數,實現內存和速度之間的平衡。而xgboost的隨機森林只是迭代次數爲1時的特例,具體實現和其餘平臺差別較大,最佳的應用場景爲boosted tree。
1. 模擬生成數據的DolphinDB腳本
def genNormVec(cls, a, stdev, n) { return norm(cls * a, stdev, n)}def genNormData(dataSize, colSize, clsNum, scale, stdev) { t = table(dataSize:0, `cls join ("col" + string(0..(colSize-1))), INT join take(DOUBLE,colSize)) classStat = groupby(count,1..dataSize, rand(clsNum, dataSize)) for(row in classStat){ cls = row.groupingKey classSize = row.count cols = [take(cls, classSize)] for (i in 0:colSize) cols.append!(genNormVec(cls, scale, stdev, classSize)) tmp = table(dataSize:0, `cls join ("col" + string(0..(colSize-1))), INT join take(DOUBLE,colSize)) insert into t values (cols) cols = NULL tmp = NULL } return t}colSize = 50clsNum = 2t1m = genNormData(10000, colSize, clsNum, 2 / sqrt(20), 1.0)saveText(t1m, "t10k.csv")t10m = genNormData(100000, colSize, clsNum, 2 / sqrt(20), 1.0)saveText(t10m, "t100k.csv")t100m = genNormData(1000000, colSize, clsNum, 2 / sqrt(20), 1.0)saveText(t100m, "t1m.csv")t1000 = genNormData(1000, colSize, clsNum, 2 / sqrt(20), 1.0)saveText(t1000, "t1000.csv")
2. Python scikit-learn的訓練和預測腳本
import pandas as pdimport numpy as npfrom sklearn.ensemble import RandomForestClassifier, RandomForestRegressorfrom time import * test_df = pd.read_csv("t1000.csv")def evaluate(path, model_name, num_trees=500, depth=30, num_jobs=1): df = pd.read_csv(path) y = df.values[:,0] x = df.values[:,1:] test_y = test_df.values[:,0] test_x = test_df.values[:,1:] rf = RandomForestClassifier(n_estimators=num_trees, max_depth=depth, n_jobs=num_jobs) start = time() rf.fit(x, y) end = time() elapsed = end - start print("Time to train model %s: %.9f seconds" % (model_name, elapsed)) acc = np.mean(test_y == rf.predict(test_x)) print("Model %s accuracy: %.3f" % (model_name, acc)) evaluate("t10k.csv", "10k", 500, 10, 48) # choose your own parameter
3. Spark MLlib的訓練和預測代碼(Scala實現)
import org.apache.spark.mllib.tree.configuration.FeatureType.Continuousimport org.apache.spark.mllib.tree.model.{DecisionTreeModel, Node}object Rf { def main(args: Array[String]) = { evaluate("/t100k.csv", 500, 10) // choose your own parameter } def processCsv(row: Row) = { val label = row.getString(0).toDouble val featureArray = (for (i <- 1 to (row.size-1)) yield row.getString(i).toDouble).toArray val features = Vectors.dense(featureArray) LabeledPoint(label, features) } def evaluate(path: String, numTrees: Int, maxDepth: Int) = { val spark = SparkSession.builder.appName("Rf").getOrCreate() import spark.implicits._ val numClasses = 2 val categoricalFeaturesInfo = Map[Int, Int]() val featureSubsetStrategy = "sqrt" val impurity = "gini" val maxBins = 32 val d_test = spark.read.format("CSV").option("header","true").load("/t1000.csv").map(processCsv).rdd d_test.cache() println("Loading table (1M * 50)") val d_train = spark.read.format("CSV").option("header","true").load(path).map(processCsv).rdd d_train.cache() println("Training table (1M * 50)") val now = System.nanoTime val model = RandomForest.trainClassifier(d_train, numClasses, categoricalFeaturesInfo, numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins) println(( System.nanoTime - now )/1e9) val scoreAndLabels = d_test.map { point => val score = model.trees.map(tree => softPredict2(tree, point.features)).sum if (score * 2 > model.numTrees) (1.0, point.label) else (0.0, point.label) } val metrics = new MulticlassMetrics(scoreAndLabels) println(metrics.accuracy) } def softPredict(node: Node, features: Vector): Double = { if (node.isLeaf) { //if (node.predict.predict == 1.0) node.predict.prob else 1.0 - node.predict.prob node.predict.predict } else { if (node.split.get.featureType == Continuous) { if (features(node.split.get.feature) <= node.split.get.threshold) { softPredict(node.leftNode.get, features) } else { softPredict(node.rightNode.get, features) } } else { if (node.split.get.categories.contains(features(node.split.get.feature))) { softPredict(node.leftNode.get, features) } else { softPredict(node.rightNode.get, features) } } } } def softPredict2(dt: DecisionTreeModel, features: Vector): Double = { softPredict(dt.topNode, features) } }
4. DolphinDB的訓練和預測腳本
def createInMemorySEQTable(t, seqSize) { db = database("", SEQ, seqSize) dataSize = t.size() ts = () for (i in 0:seqSize) { ts.append!(t[(i * (dataSize/seqSize)):((i+1)*(dataSize/seqSize))]) } return db.createPartitionedTable(ts, `tb) }def accuracy(v1, v2) { return (v1 == v2).sum() \ v2.size() }def evaluateUnparitioned(filePath, numTrees, maxDepth, numJobs) { test = loadText("t1000.csv") t = loadText(filePath); clsNum = 2; colSize = 50 timer res = randomForestClassifier(sqlDS(<select * from t>), `cls, `col + string(0..(colSize-1)), clsNum, sqrt(colSize).int(), numTrees, 32, maxDepth, 0.0, numJobs) print("Unpartitioned table accuracy = " + accuracy(res.predict(test), test.cls).string()) } evaluateUnpartitioned("t10k.csv", 500, 10, 48) // choose your own parameter
5. xgboost的訓練和預測腳本
import pandas as pdimport numpy as npimport xgboost as xgbfrom time import *def load_csv(path): df = pd.read_csv(path) target = df['cls'] df = df.drop(['cls'], axis=1) return xgb.DMatrix(df.values, label=target.values) dtest = load_csv('/hdd/hdd1/twonormData/t1000.csv')def evaluate(path, num_trees, max_depth, num_jobs): dtrain = load_csv(path) param = {'num_parallel_tree':num_trees, 'max_depth':max_depth, 'objective':'binary:logistic', 'nthread':num_jobs, 'colsample_bylevel':1/np.sqrt(50)} start = time() model = xgb.train(param, dtrain, 1) end = time() elapsed = end - start print("Time to train model: %.9f seconds" % elapsed) prediction = model.predict(dtest) > 0.5 print("Accuracy = %.3f" % np.mean(prediction == dtest.get_label())) evaluate('t10k.csv', 500, 10, 24) // choose your own parameter