當 Mars 趕上 RAPIDS:用 GPU 以並行的方式加速數據科學

背景

在數據科學世界,Python 是一個不可忽視的存在,且有愈演愈烈之勢。而其中主要的使用工具,包括 Numpy、Pandas 和 Scikit-learn 等。html

Numpy

Numpy是數值計算的基礎包,內部提供了多維數組(ndarray)這樣一個數據結構,用戶能夠很方便地在任意維度上進行數值計算。git

image

咱們舉一個蒙特卡洛方法求解 Pi 的例子。這背後的原理很是簡單,如今咱們有個半徑爲1的圓和邊長爲2的正方形,他們的中心都在原點。如今咱們生成大量的均勻分佈的點,讓這些點落在正方形內,經過簡單的推導,咱們就能夠知道,Pi 的值 = 落在圓內的點的個數 / 點的總數 * 4。github

這裏要注意,就是隨機生成的點的個數越多,結果越精確。算法

用 Numpy 實現以下:數據庫

import numpy as np

N = 10 ** 7  # 1千萬個點

data = np.random.uniform(-1, 1, size=(N, 2))  # 生成1千萬個x軸和y軸都介於-1和1間的點
inside = (np.sqrt((data ** 2).sum(axis=1)) < 1).sum()  # 計算到原點的距離小於1的點的個數
pi = 4 * inside / N
print('pi: %.5f' % pi)

能夠看到,用 Numpy 來進行數值計算很是簡單,只要寥寥數行代碼,而若是讀者習慣了 Numpy 這種面相數組的思惟方式以後,不管是代碼的可讀性仍是執行效率都會有巨大提高。編程

pandas

pandas是一個強大的數據分析和處理的工具,它其中包含了海量的 API 來幫助用戶在二維數據(DataFrame)上進行分析和處理。api

pandas 中的一個核心數據結構就是 DataFrame,它能夠簡單理解成表數據,但不一樣的是,它在行和列上都包含索引(Index),要注意這裏不一樣於數據庫的索引的概念,它的索引能夠這麼理解:當從行看 DataFrame 時,咱們能夠把 DataFrame 當作行索引到行數據的這麼一個字典,經過行索引,能夠很方便地選中一行數據;列也同理。數組

咱們拿movielens 的數據集做爲簡單的例子,來看 pandas 是如何使用的。這裏咱們用的是 Movielens 20M Dataset.網絡

import pandas as pd

ratings = pd.read_csv('ml-20m/ratings.csv')
ratings.groupby('userId').agg({'rating': ['sum', 'mean', 'max', 'min']})

經過一行簡單的pandas.read_csv就能夠讀取 CSV 數據,接着按 userId 作分組聚合,求 rating 這列在每組的總和、平均、最大、最小值。數據結構

「食用「 pandas 的最佳方式,仍是在 Jupyter notebook 裏,以交互式的方式來分析數據,這種體驗會讓你不禁感嘆:人生苦短,我用 xx(😉)

scikit-learn

scikit-learn是一個 Python 機器學習包,提供了大量機器學習算法,用戶不須要知道算法的細節,只要經過幾個簡單的 high-level 接口就能夠完成機器學習任務。固然如今不少算法都使用深度學習,但 scikit-learn 依然能做爲基礎機器學習庫來串聯整個流程。

咱們以 K-最鄰近算法爲例,來看看用 scikit-learn 如何完成這個任務。

import pandas as pd
from sklearn.neighbors import NearestNeighbors

df = pd.read_csv('data.csv')  # 輸入是 CSV 文件,包含 20萬個向量,每一個向量10個元素
nn = NearestNeighbors(n_neighbors=10)
nn.fit(df)
neighbors = nn.kneighbors(df)

fit接口就是 scikit-learn 裏最經常使用的用來學習的接口。能夠看到整個過程很是簡單易懂。

Mars——Numpy、pandas 和 scikit-learn 的並行和分佈式加速器

Python 數據科學棧很是強大,但它們有以下幾個問題:

  1. 如今是多核時代,這幾個庫裏鮮有操做能利用得上多核的能力。
  2. 隨着深度學習的流行,用來加速數據科學的新的硬件層出不窮,這其中最多見的就是 GPU,在深度學習前序流程中進行數據處理,咱們是否是也能用上 GPU 來加速呢?
  3. 這幾個庫的操做都是命令式的(imperative),和命令式相對應的就是聲明式(declarative)。命令式的更關心 how to do,每個操做都會當即獲得結果,方便對結果進行探索,優勢是很靈活;缺點則是中間過程可能佔用大量內存,不能及時釋放,並且每一個操做之間就被割裂了,沒有辦法作算子融合來提高性能;那相對應的聲明式就恰好相反,它更關心 what to do,它只關心結果是什麼,中間怎麼作並無這麼關心,典型的聲明式像 SQL、TensorFlow 1.x,聲明式能夠等用戶真正須要結果的時候纔去執行,也就是 lazy evaluation,這中間過程就能夠作大量的優化,所以性能上也會有更好的表現,缺點天然也就是命令式的優勢,它不夠靈活,調試起來比較困難。

爲了解決這幾個問題,Mars被咱們開發出來,Mars 在MaxCompute團隊內部誕生,它的主要目標就是讓 Numpy、pandas 和 scikit-learn 等數據科學的庫可以並行和分佈式執行,充分利用多核和新的硬件。

Mars 的開發過程當中,咱們核心關注的幾點包括:

  1. 咱們但願 Mars 足夠簡單,只要會用 Numpy、pandas 或 scikit-learn 就會用 Mars。
  2. 避免重複造輪子,咱們但願能利用到這些庫已有的成果,只須要能讓他們被調度到多核/多機上便可。
  3. 聲明式和命令式兼得,用戶能夠在這二者之間自由選擇,靈活度和性能兼而有之。
  4. 足夠健壯,生產可用,能應付各類 failover 的狀況。

固然這些是咱們的目標,也是咱們一直努力的方向。

Mars tensor:Numpy 的並行和分佈式加速器

上面說過,咱們的目標之一是,只要會用 Numpy 等數據科學包,就會用 Mars。咱們直接來看代碼,仍是以蒙特卡洛爲例。變成 Mars 的代碼是什麼樣子呢?

import mars.tensor as mt

N = 10 ** 10

data = mt.random.uniform(-1, 1, size=(N, 2))
inside = (mt.sqrt((data ** 2).sum(axis=1)) < 1).sum()
pi = (4 * inside / N).execute()
print('pi: %.5f' % pi)

能夠看到,區別就只有兩處:import numpy as np變成import mars.tensor as mt,後續的np.都變成mt.;pi 在打印以前調用了一下.execute()方法。

也就是默認狀況下,Mars 會按照聲明式的方式,代碼自己移植的代價極低,而在真正須要一個數據的時候,經過.execute()去觸發執行。這樣能最大限度得優化性能,以及減小中間過程內存消耗。

這裏,咱們還將數據的規模擴大了 1000 倍,來到了 100 億個點。以前 1/1000 的數據量的時候,在個人筆記本上須要 757ms;而如今數據擴大一千倍,光data就須要 150G 的內存,這用 Numpy 自己根本沒法完成。而使用 Mars,計算時間只須要 3min 44s,而峯值內存只須要 1G 左右。假設咱們認爲內存無限大,Numpy 須要的時間也就是以前的 1000 倍,大概是 12min 多,能夠看到 Mars 充分利用了多核的能力,而且經過聲明式的方式,極大減小了中間內存佔用。

前面說到,咱們試圖讓聲明式和命令式兼得,而使用命令式的風格,只須要在代碼的開始配置一個選項便可。

import mars.tensor as mt
from mars.config import options

options.eager_mode = True  # 打開 eager mode 後,每一次調用都會當即執行,行爲和 Numpy 就徹底一致

N = 10 ** 7

data = mt.random.uniform(-1, 1, size=(N, 2))
inside = (mt.linalg.norm(data, axis=1) < 1).sum()
pi = 4 * inside / N  # 不須要調用 .execute() 了
print('pi: %.5f' % pi.fetch())  # 目前須要 fetch() 來轉成 float 類型,後續咱們會加入自動轉換

Mars DataFrame:pandas 的並行和分佈式加速器

看過怎麼樣輕鬆把 Numpy 代碼遷移到 Mars tensor ,想必讀者也知道怎麼遷移 pandas 代碼了,一樣也只有兩個區別。咱們仍是以 movielens 的代碼爲例。

import mars.dataframe as md

ratings = md.read_csv('ml-20m/ratings.csv')
ratings.groupby('userId').agg({'rating': ['sum', 'mean', 'max', 'min']}).execute()

Mars Learn:scikit-learn 的並行和分佈式加速器

Mars Learn 也同理,這裏就不作過多闡述了。但目前 Mars learn 支持的 scikit-learn 算法還很少,咱們也在努力移植的過程當中,這須要大量的人力和時間,歡迎感興趣的同窗一塊兒參與。

import mars.dataframe as md
from mars.learn.neighbors import NearestNeighbors

df = md.read_csv('data.csv')  # 輸入是 CSV 文件,包含 20萬個向量,每一個向量10個元素
nn = NearestNeighbors(n_neighbors=10)
nn.fit(df)  # 這裏 fit 的時候也會總體觸發執行,所以機器學習的高層接口都是當即執行的
neighbors = nn.kneighbors(df).fetch()  # kneighbors 也已經觸發執行,只須要 fetch 數據

這裏要注意的是,對於機器學習的fitpredict等高層接口,Mars Learn 也會當即觸發執行,以保證語義的正確性。

RAPIDS:GPU 上的數據科學

相信細心的觀衆已經發現,GPU 好像沒有被提到。不要着急,這就要說到RAPIDS

在以前,雖然 CUDA 已經將 GPU 編程的門檻降到至關低的一個程度了,但對於數據科學家們來講,在 GPU 上處理 Numpy、pandas 等能處理的數據無異於天方夜譚。幸運的是,NVIDIA 開源了 RAPIDS 數據科學平臺,它和 Mars 的部分思想高度一致,即便用簡單的 import 替換,就能夠將 Numpy、pandas 和 scikit-learn 的代碼移植到 GPU 上。

image

其中,RAPIDS cuDF 用來加速 pandas,而 RAPIDS cuML 用來加速 scikit-learn。

對於 Numpy 來講,CuPy已經很好地支持用 GPU 來加速了,這樣 RAPIDS 也得以把重心放在數據科學的其餘部分。

CuPy:用 GPU 加速 Numpy

仍是蒙特卡洛求解 Pi。

import cupy as cp
 
N = 10 ** 7

data = cp.random.uniform(-1, 1, size=(N, 2))
inside = (cp.sqrt((data ** 2).sum(axis=1)) < 1).sum()
pi = 4 * inside / N
print('pi: %.5f' % pi)

在個人測試中,它將 CPU 的 757ms,降到只有 36ms,提高超過 20 倍,能夠說效果很是顯著。這正是得益於 GPU 很是適合計算密集型的任務。

RAPIDS cuDF:用 GPU 加速 pandas

import pandas as pd替換成import cudf,GPU 內部如何並行,CUDA 編程這些概念,用戶都再也不須要關心。

import cudf

ratings = cudf.read_csv('ml-20m/ratings.csv')
ratings.groupby('userId').agg({'rating': ['sum', 'mean', 'max', 'min']})

運行時間從 CPU 上的 18s 提高到 GPU 上的 1.66s,提高超過 10 倍。

RAPIDS cuML:用 GPU 加速 scikit-learn

一樣是 k-最鄰近問題。

import cudf
from cuml.neighbors import NearestNeighbors

df = cudf.read_csv('data.csv')
nn = NearestNeighbors(n_neighbors=10)
nn.fit(df)
neighbors = nn.kneighbors(df)

運行時間從 CPU 上 1min52s,提高到 GPU 上 17.8s。

Mars 和 RAPIDS 結合能帶來什麼?

RAPIDS 將 Python 數據科學帶到了 GPU,極大地提高了數據科學的運行效率。它們和 Numpy 等同樣,是命令式的。經過和 Mars 結合,中間過程將會使用更少的內存,這使得數據處理量更大;Mars 也能夠將計算分散到多機多卡,以提高數據規模和計算效率。

在 Mars 裏使用 GPU 也很簡單,只須要在對應函數上指定gpu=True。例如建立 tensor、讀取 CSV 文件等都適用。

import mars.tensor as mt
import mars.dataframe as md

a = mt.random.uniform(-1, 1, size=(1000, 1000), gpu=True)
df = md.read_csv('ml-20m/ratings.csv', gpu=True)

下圖是用 Mars 分別在 Scale up 和 Scale out 兩個維度上加速蒙特卡洛計算 Pi 這個任務。通常來講,咱們要加速一個數據科學任務,能夠有這兩種方式,Scale up 是指可使用更好的硬件,好比用更好的 CPU、更大的內存、使用 GPU 替代 CPU等;Scale out 就是指用更多的機器,用分佈式的方式提高效率。

image

能夠看到在一臺 24 核的機器上,Mars 計算須要 25.8s,而經過分佈式的方式,使用 4 臺 24 核的機器的機器幾乎以線性的時間提高。而經過使用一個 NVIDIA TESLA V100 顯卡,咱們就能將單機的運行時間提高到 3.98s,這已經超越了4臺 CPU 機器的性能。經過再將單卡拓展到多卡,時間進一步下降,但這裏也能夠看到,時間上很難再線性擴展了,這是由於 GPU 的運行速度提高巨大,這個時候網絡、數據拷貝等的開銷就變得明顯。

性能測試

咱們使用了https://github.com/h2oai/db-benchmark的數據集,測試了三個數據規模的 groupby 和 一個數據規模的 join。而咱們主要對比了 pandas 和DASK。DASK 和 Mars 的初衷很相似,也是試圖並行和分佈式化 Python 數據科學,但它們的設計、實現、分佈式都存在較多差別,這個後續咱們再撰文進行詳細對比。

測試機器配置是 500G 內存、96 核、NVIDIA V100 顯卡。Mars 和 DASK 在 GPU 上都使用 RAPIDS 執行計算。

Groupby

數據有三個規模,分別是 500M、5G 和 20G。

查詢也有三組。

查詢一

df = read_csv('data.csv')
df.groupby('id1').agg({'v1': 'sum'})

查詢二

df = read_csv('data.csv')
df.groupby(['id1', 'id2']).agg({'v1': 'sum'})

查詢三

df = read_csv('data.csv')
df.gropuby(['id6']).agg({'v1': 'sum', 'v2': 'sum', 'v3': 'sum'})

數據大小 500M,性能結果

image

數據大小 5G,性能結果

image

數據大小 20G,性能結果

image

數據大小到 20G 時,pandas 在查詢2會內存溢出,得不出結果。

能夠看到,隨着數據增長,Mars 的性能優點會愈發明顯。

得益於 GPU 的計算能力,GPU 運算性能相比於 CPU 都有數倍的提高。若是單純使用 RAPIDS cuDF,因爲顯存大小的限制,數據來到 5G 都難以完成,而因爲 Mars 的聲明式的特色,中間過程對顯存的使用大幅獲得優化,因此整組測試來到 20G 都能輕鬆完成。這正是 Mars + RAPIDS 所能發揮的威力。

Join

測試查詢:

x = read_csv('x.csv')
y = read_csv('y.csv')
x.merge(y, on='id1')

測試數據 x 爲500M,y 包含10行數據。

image

總結

RAPIDS 將 Python 數據科學帶到了 GPU,極大提高了數據分析和處理的效率。Mars 的注意力更多放在並行和分佈式。相信這二者的結合,在將來會有更多的想象空間。

Mars 誕生於MaxCompute團隊,MaxCompute 原名 ODPS,是一種快速、徹底託管的EB級數據倉庫解決方案。Mars 即將經過 MaxCompute 提供服務,購買了 MaxCompute 服務的用戶屆時能夠開箱即用體驗 Mars 服務。敬請期待。

相關文章
相關標籤/搜索