[譯] 在 Python 中,如何運用 Dask 數據進行並行數據分析

多維度思惟。來源:Pixabayhtml

有時你經過 Python’s Pandas 打開一個大的數據集,而後試着去獲取一些度量標準,但這時整個過程可能會忽然中止。 若是你使用 Pandas 處理大數據,可能一個簡單的序列平均值都須要你等待一分鐘,咱們甚至不會去調用 apply。這還只是百萬級別的行數!當你的數據達到億級別時,你最好使用 Spark 或者其餘方式。前端

我在不久以前發現了這個工具:不須要更好的基礎架構或轉換語言,就能夠在 Python 中加速數據分析的方法。可是若是數據集太大,它的最終優化結果會必定的限制,可是它仍然比常規的 Pandas 擴展性好,可能也更符合你的問題場景 —— 尤爲是不進行大量的重寫索引時。python

什麼是 Dask?

Dask 是一個開源項目,爲你提供 NumPy 數組、Pandas Dataframes 以及常規 list 的抽象,容許你使用多核處理器並行運行它們的操做。android

如下是來自本教程的摘錄:ios

Dask 提供了更高級別的 Array、Bag、和 DataFrame 集合,它們模仿 NumPy、list 和 Pandas,但容許在不適合主內存的數據集上並行操做。對於大型數據集,Dask 的高級集合能夠取代 NumPy 和 Pandas。git

這聽上去很好!爲了這篇文章,我特地試用了 Dask Dataframs,並在其上運行了幾個基準測試。github

閱讀文檔

我首先閱讀了官方文檔,看看在 Dask 的文檔中的精確推薦,而不是常規 Dataframse。如下是官方文檔的部份內容:npm

  • 操做大型數據集,即便這些數據不適用於內存
  • 使用盡量多的內核來加速長時間計算
  • 在大的數據集上,經過標準的 Pandas 操做,如集羣、鏈接、還有時間序列計算,來對計算作分佈式處理。

接下來,它列出了一些很快的場景,但前提是你在使用 Dask 數據:後端

  • 算術運算(對序列進行乘或加)
  • 經常使用聚合(均值、最小值、最大值、和等)
  • 調用 apply(只要它是索引,而非 groupby(‘y’),其中 y 並不是索引)
  • 調用 value_counts()、drop_duplicates() 或 corr()
  • Locisin 和逐行選擇進行過濾

若是發現它有用,只對數據過濾進行一次小瀏覽就行。數組

#經過引用僅,返回 x >5 的行(根據起初的 df 寫入更改)
df2 = df.loc[df['x'] > 5]
#經過引用,僅返回x 爲 0、一、二、3 或 4 的行
df3 = df.x.isin(range(4))
#經過只讀引用,僅返回 x > 5 的行(不能被寫)
df4 = df[df['x']>5]
複製代碼

如何使用 Dask Dataframes

Dask Dataframes 具備與 Pandas Dataframes 類似的 API,只有聚合和 apply 是延遲計算,你須要經過調用 compute 方法來計算。爲了生成一個 Dask Dataframe,你能夠像在 Pandas 中那樣簡單調用 read_csv 方法,或只調用給定的一個 Pandas Dataframe df

dd = ddf.from_pandas(df, npartitions=N)
複製代碼

ddf 是你使用 DASK Dataframes 導入的名稱,而 nparitions 是一個參數,它告訴 Dataframe 你指望如何對它進行分區。

StackOverflow,建議將 Dataframe 劃分到你計算機內核數目相同的分區中,或是這個數字的幾倍,由於每一個分區都會運行在不一樣的線程上,若是有太多線程,它們之間將變得過於昂貴。

開始:進行基準測試!

我開發了一個 Jupyter 筆記來嘗試使用這個框架,而且發佈在 Github 上,這樣你能夠查看具體信息甚至是親自運行它。

我運行的基準測試能夠在 GitHub 上獲取,這裏列舉了主要內容:

def get_big_mean():
    return dfn.salary.mean().compute()
def get_big_mean_old():
    return df3.salary.mean()

def get_big_max():
    return dfn.salary.max().compute()
def get_big_max_old():
    return df3.salary.max()

def get_big_sum():
    return dfn.salary.sum().compute()
def get_big_sum_old():
    return df3.salary.sum()

def filter_df():
    df = dfn[dfn['salary']>5000]
def filter_df_old():
    df = df3[df3['salary']>5000]
複製代碼

這是一個有着 2500 萬行的常規 df3,內容是使用來自上一篇文章中的腳本生成的(從列表中隨機抽取的列名是 name、surname 以及 salary)。我使用了 50 行數據集,並將其鏈接了 50 萬次,由於我只對它運行所需時間感興趣,對於分析 Per se 卻不感興趣。

dfn 是基於 df3 的 Dask Dataframe。

第一批次的結果:不太樂觀

首先,我嘗試用 3 個分區進行測試,由於我只有 4 個內核,因此不想過分使用個人 PC。我用 Dask 的結果不是很理想,並且還必須等待很長時間才能獲取結果,我擔憂這多是由於我作的分區太少了:

204.313940048 seconds for get_big_mean
39.7543280125 seconds for get_big_mean_old

131.600986004 seconds for get_big_max
43.7621600628 seconds for get_big_max_old

120.027213097 seconds for get_big_sum
7.49701309204 seconds for get_big_sum_old

0.581165790558 seconds for filter_df
226.700095892 seconds for filter_df_old
複製代碼

你能夠看到,當我是用 Dask 時,大多數操做的速度都要慢得多。這給我了一個提示,那就是我可能不得不使用更多的分區。生成延遲評估所花費的數量也是能夠忽略不計的(在某些狀況下不到半秒),若是我重用它們,就不會隨着時間的推移而攤銷。

我還使用了 apply 方法測試它:

def f(x):
    return (13*x+5)%7

def apply_random_old():
    df3['random']= df3['salary'].apply(f)
    
def apply_random():
    dfn['random']= dfn['salary'].apply(f).compute()
複製代碼

結果並沒有差異:

369.541605949 seconds for apply_random
157.643756866 seconds for apply_random_old
複製代碼

所以,通常狀況下,儘管過濾器的速度要快得多,但大多數操做的速度仍然是原來的兩倍。我擔憂的是,也許我也應該調用 compute 這個函數,因此把這個結果做爲對比。

更多分區:驚人的速度

再這樣使人沮喪的結果以後,我認爲多是我尚未使用足夠的分區。這樣作的要點是並行運行,或許是我須要更多的並行化?所以我對 8 個分區進行了相同的測試,下面是我獲得的結果(我忽略了非並行 dataframe,由於它們基本是相同的):

3.08352184296 seconds for get_big_mean
1.3314101696 seconds for get_big_max
1.21639800072 seconds for get_big_sum
0.228978157043 seconds for filter_df

112.135010004 seconds for apply_random
50.2007009983 seconds for value_count_test
複製代碼

沒錯,大多數操做的運行速度是常規 Dataframe 的 10 倍以上,apply 得到了更快的速度!我還在 salary 序列上運行了 value_count 方法。對於上下文,請記住,當我在常規的 Dataframe 上運行這個測試時,我等待了 10 分鐘以後,我不得不中止這個過程,這一次只花了 50 秒! 基本上,我只是用錯了工具,並且很是快。比普通的 Dataframes 快得多。

結論

考慮到我在一臺很是舊的 4 核 PC 上,一分鐘內運行 2.5 億行內容,我以爲它會在實際應用中有着舉足輕重的地位。所以我建議,下次你處理本地或從單個 AWS 實例中處理數據集時,能夠考慮使用這個框架,它真的很是高效。

我但願你以爲這盤文章有用或者有趣!編寫他所花費的時間超過個人預期,由於一些基準測試花費的時間太長了。記得告訴我在閱讀以前你是否瞭解過 Dask,或者你是否在工做或項目中使用過它。另外,若是有其餘更棒的功能,記得告訴我,我並無檢測我是否作錯了什麼內容!你的回饋和評論是我寫做的重要緣由之一,由於咱們都在從中成長。

若是你喜歡這篇文章,能夠繼續支持我。能夠繼續支持個人寫做。同時你還能夠在我這裏瞭解更多 Python 教程、提示和技巧!

若是發現譯文存在錯誤或其餘須要改進的地方,歡迎到 掘金翻譯計劃 對譯文進行修改並 PR,也可得到相應獎勵積分。文章開頭的 本文永久連接 即爲本文在 GitHub 上的 MarkDown 連接。


掘金翻譯計劃 是一個翻譯優質互聯網技術文章的社區,文章來源爲 掘金 上的英文分享文章。內容覆蓋 AndroidiOS前端後端區塊鏈產品設計人工智能等領域,想要查看更多優質譯文請持續關注 掘金翻譯計劃官方微博知乎專欄

相關文章
相關標籤/搜索