GPU上的隨機森林:比Apache Spark快2000倍

做者|Aaron Richter 編譯|VK 來源|Towards Data Sciencehtml

隨機森林是一種機器學習算法,以其魯棒性、準確性和可擴展性而受到許多數據科學家的信賴。python

該算法經過bootstrap聚合訓練出多棵決策樹,而後經過集成對輸出進行預測。因爲其集成特徵的特色,隨機森林是一種能夠在分佈式計算環境中實現的算法。樹能夠在集羣中跨進程和機器並行訓練,結果比使用單個進程的訓練時間快得多。git

在本文中,咱們探索了使用Apache Spark在CPU機器集羣上實現分佈式隨機森林訓練,並將其與使用NVIDIA RAPIDS和Dask的GPU機器集羣上的訓練性能進行了比較。github

雖然GPU計算傳統上是爲深度學習應用而保留的,但RAPIDS是一個在GPU上執行數據處理和非深度學習ML工做的庫,與在cpu上執行相比,它能夠大大提升性能。算法

咱們使用3億個實例訓練了一個隨機森林模型:Spark在20個節點CPU集羣上耗時37分鐘,而RAPIDS在20個節點GPU集羣上耗時1秒。GPU的速度提升了2000倍以上!sql

實驗概述

咱們使用公共可用的紐約出租車數據集,並訓練一個隨機森林迴歸器,該回歸器可使用與乘客接送相關的屬性來預測出租車的票價金額。以2017年、2018年和2019年的出租車出行量爲訓練集,共計300700143個實例。apache

數據集連接:https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.pagebootstrap

Spark和RAPIDS代碼能夠在Jupyter Notebook中找到。安全

硬件

Spark集羣使用Amazon EMR進行管理,而Dask/RAPIDS集羣則使用Saturn Cloud進行管理。app

兩個集羣都有20個工做節點,具備如下AWS實例類型:

Spark:r5.2xlarge

  • 8個CPU,64 GB RAM

  • 按需價格:0.504美圓/小時

RAPIDS:g4dn.xlarge

  • 4個CPU,16 GB RAM

  • 1個GPU,16 GB GPU RAM(NVIDIA T4)

  • 按需價格:0.526美圓/小時

Saturn Cloud也能夠用NVIDIA特斯拉V100 GPU來啓動Dask集羣,但咱們在這個練習中選擇了g4dn.xlarge,保持與Spark集羣類似的小時成本概況。

Spark

Apache Spark是一個在Scala中構建的開源大數據處理引擎,它有一個Python接口,能夠調用Scala/JVM代碼。

它是Hadoop處理生態系統中的一個重要組成部分,圍繞MapReduce範例構建,而且具備用於數據幀和機器學習的接口。

設置Spark集羣不在本文的討論範圍以內,可是一旦準備好集羣,就能夠在Jupyter Notebook中運行如下命令來初始化Spark:

import findspark
findspark.init()

from pyspark.sql import SparkSession

spark = (SparkSession
        .builder
        .config('spark.executor.memory', '36g')
        .getOrCreate())

findspark包檢測系統上的Spark安裝位置;若是能夠知道Spark包的安裝位置,則可能不須要這樣作。

要得到有性能的Spark代碼,須要設置幾個配置設置,這取決於集羣設置和工做流。在這種狀況下,咱們設置spark.executor.memory以確保咱們不會遇到任何內存溢出或Java堆錯誤。

RAPIDS

NVIDIA RAPIDS是一個開源的Python框架,它在gpu而不是cpu上執行數據科學代碼。相似於在訓練深度學習模型時所看到的,這將爲數據科學工做帶來巨大的性能提高。

RAPIDS有數據幀、ML、圖形分析等接口。RAPIDS使用Dask來處理與具備多個gpu的機器的並行化,以及每一個具備一個或多個gpu的機器集羣。

設置GPU機器可能有點棘手,可是Saturn Cloud已經爲啓動GPU集羣預構建了映像,因此你只需幾分鐘就能夠啓動並運行了!要初始化指向羣集的Dask客戶端,能夠運行如下命令:

from dask.distributed import Client
from dask_saturn import SaturnCluster

cluster = SaturnCluster()
client = Client(cluster)

要本身設置Dask集羣,請參閱此docs頁面:https://docs.dask.org/en/latest/setup.html

數據加載

數據文件託管在一個公共的S3 bucket上,所以咱們能夠直接從那裏讀取csv。S3 bucket的全部文件都在同一個目錄中,因此咱們使用s3fs來選擇咱們想要的文件:

import s3fs
fs = s3fs.S3FileSystem(anon=True)
files = [f"s3://{x}" for x in fs.ls('s3://nyc-tlc/trip data/')
         if 'yellow' in x and ('2019' in x or '2018' in x or '2017' in x)]

cols = ['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime', 'passenger_count', 'trip_distance',
      'RatecodeID', 'store_and_fwd_flag', 'PULocationID', 'DOLocationID', 'payment_type', 'fare_amount',
      'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount']

使用Spark,咱們須要單獨讀取每一個CSV文件,而後將它們組合在一塊兒:

import functools
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql import DataFrame

# 手動指定模式,由於read.csv中的inferSchema很是慢
schema = StructType([
    StructField('VendorID', DoubleType()),
    StructField('tpep_pickup_datetime', TimestampType()),
    ...
    # 參考notebook得到完整對象模式
]) 

def read_csv(path):
    df = spark.read.csv(path,
                        header=True,
                        schema=schema,
                        timestampFormat='yyyy-MM-dd HH:mm:ss',
                       )
    df = df.select(cols)
    return df

dfs = []
for tf in files:
    df = read_csv(tf)
    dfs.append(df)

taxi = functools.reduce(DataFrame.unionAll, dfs)
taxi.count()

使用Dask+RAPIDS,咱們能夠一次性讀取全部CSV文件:

import dask_cudf

taxi = dask_cudf.read_csv(files, 
                          assume_missing=True,
                          parse_dates=[1,2], 
                          usecols=cols, 
                          storage_options={'anon': True})
len(taxi)

特徵工程

咱們將根據時間生成一些特徵,而後保存數據幀。在這兩個框架中,這將執行全部CSV加載和預處理,並將結果存儲在RAM中(在RAPIDS的狀況下是GPU RAM)。咱們將用於訓練的特徵包括:

features = ['pickup_weekday', 'pickup_hour', 'pickup_minute',
            'pickup_week_hour', 'passenger_count', 'VendorID', 
            'RatecodeID', 'store_and_fwd_flag', 'PULocationID', 
            'DOLocationID']

對於Spark,咱們須要將特徵收集到向量類中:

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.pipeline import Pipeline

taxi = taxi.withColumn('pickup_weekday', F.dayofweek(taxi.tpep_pickup_datetime).cast(DoubleType()))

taxi = taxi.withColumn('pickup_hour', F.hour(taxi.tpep_pickup_datetime).cast(DoubleType()))

taxi = taxi.withColumn('pickup_minute', F.minute(taxi.tpep_pickup_datetime).cast(DoubleType()))

taxi = taxi.withColumn('pickup_week_hour', ((taxi.pickup_weekday * 24) + taxi.pickup_hour).cast(DoubleType()))

taxi = taxi.withColumn('store_and_fwd_flag', F.when(taxi.store_and_fwd_flag == 'Y', 1).otherwise(0))

taxi = taxi.withColumn('label', taxi.total_amount)  
taxi = taxi.fillna(-1)

assembler = VectorAssembler(
    inputCols=features,
    outputCol='features',
)

pipeline = Pipeline(stages=[assembler])
assembler_fitted = pipeline.fit(taxi)
X = assembler_fitted.transform(taxi)
X.cache()
X.count()

對於RAPIDS,咱們將全部浮點值轉換爲float32,以便進行GPU計算:

from dask import persist
from dask.distributed import wait

taxi['pickup_weekday'] = taxi.tpep_pickup_datetime.dt.weekday
taxi['pickup_hour'] = taxi.tpep_pickup_datetime.dt.hour
taxi['pickup_minute'] = taxi.tpep_pickup_datetime.dt.minute
taxi['pickup_week_hour'] = (taxi.pickup_weekday * 24) + taxi.pickup_hour
taxi['store_and_fwd_flag'] = (taxi.store_and_fwd_flag == 'Y').astype(float)
taxi = taxi.fillna(-1)

X = taxi[features].astype('float32')
y = taxi['total_amount']
X, y = persist(X, y)
_ = wait([X, y])
len(X)

訓練隨機森林

咱們只須要幾行代碼就能夠訓練隨機森林。

Spark:

from pyspark.ml.regression import RandomForestRegressor
rf = RandomForestRegressor(numTrees=100, maxDepth=10, seed=42)
fitted = rf.fit(X)

RAPIDS:

from cuml.dask.ensemble import RandomForestRegressor
rf = RandomForestRegressor(n_estimators=100, max_depth=10, seed=42)
_ = rf.fit(X, y)

結果

咱們對Spark(CPU)和RAPIDS(GPU)集羣上的300700143個紐約出租車數據實例訓練了一個隨機森林模型。兩個集羣都有20個工做節點,每小時價格大體相同。如下是工做流每一個部分的結果:

Task Spark RAPIDS
Load/rowcount 20.6 seconds 25.5 seconds
Feature engineering 54.3 seconds 23.1 seconds
Random forest 36.9 minutes 1.02 seconds

37分鐘的Spark 與1秒的RAPIDS

GPU勝利!想想,一次擬合你不須要等待37分鐘了,這將加快以後迭代和改進模型的速度。而在CPU上,一旦添加了超參數調優或測試不一樣的模型,迭代都很容易累積到數小時或數天。

你須要看到才能相信嗎?你能夠在這裏找到Notebook,而後本身運行測試:https://github.com/saturncloud/saturn-cloud-examples/tree/main/machine_learning/random_forest

你須要更快的隨機森林嗎

對!你能夠在幾秒鐘內用Saturn Cloud進入Dask/RAPIDS。Saturn處理全部工具基礎設施、安全性和部署方面的難題,讓你當即啓動並運行RAPIDS。點擊這裏在你的AWS賬戶免費試用Saturn:https://manager.aws.saturnenterprise.io/register

原文連接:https://towardsdatascience.com/random-forest-on-gpus-2000x-faster-than-apache-spark-9561f13b00ae

歡迎關注磐創AI博客站: http://panchuang.net/

sklearn機器學習中文官方文檔: http://sklearn123.com/

歡迎關注磐創博客資源彙總站: http://docs.panchuang.net/

相關文章
相關標籤/搜索