最近,在 PyCon China 2018 的北京主會場、成都和杭州分會場都分享了咱們最新的工做 Mars,基於矩陣的統一計算框架。本文會以文字的形式對 PyCon 中國上的分享再進行一次闡述。python
聽到 Mars,不少第一次據說的同窗都會靈魂三問:Mars 是什麼,能作什麼,怎麼作的。今天咱們就會從背景,以及一個例子出發,來回答這幾個問題。c++
首先是 scipy 技術棧的全景圖,numpy 是基礎,它提供了多維數組的數據結構,並提供了它上面的各類計算。再往上,重要的有 scipy,主要面向各類科學計算的操做;pandas,其中核心的概念是 DataFrame,他提供對錶類型數據的處理、清洗等功能。往上一層,比較經典的庫,有 scikit-learn,它是最知名的機器學習框架之一。最上面一層,是各類垂直領域的庫,如 astropy 主要面向天文,biopython 面向生物領域等。git
從 scipy 技術棧能夠看出,numpy 是一個核心的地位,大量上層的庫都使用了 numpy 的數據結構和計算。github
咱們真實世界的數據,並不僅是表這種二維類型數據那麼簡單,不少時候,咱們要面對的每每是多維數據,好比咱們常見的圖片處理,首先咱們有圖片的個數,而後有圖片的長寬,以及 RGBA 通道,這就是四維的數據;這樣的例子不勝枚舉。有這樣多維的處理能力,就有處理各類更加複雜,甚至是科學領域的能力;同時,因爲多維數據自己包含二維數據,因此,咱們也所以具有表類型數據的處理能力。數組
另外,若是咱們須要探究數據的內在,光靠對錶數據進行一些統計等操做是絕對不夠的,咱們須要更深層的「數學」 的方法,好比運用矩陣乘法、傅里葉變換等等的能力,來對數據進行更深層次的分析。而 numpy 因爲是數值計算的庫,加上各類上層的庫,咱們認爲它們很適合用來提供這方面的能力。session
那麼,爲何要作 Mars 這個項目呢?咱們不妨從一個例子來看。數據結構
咱們試圖用蒙特卡洛方法來求解 pi,蒙特卡洛方法其實很簡單,就是用隨機數的方法來解決特定的問題。如圖,這裏咱們有個半徑爲1的圓和邊長爲2的正方形,咱們生成不少隨機點的方式,經過右下角的公式,咱們就能夠計算出 pi 的值爲 4 乘以落在圓裏點的個數除以總的點個數。隨機生成的點越多,計算出來的 pi 就越精確。多線程
用純 Python 實現很是簡單,咱們只要遍歷 N 次,生成 x 和 y 點,計算是否是落在圓內便可。運行1千萬個點,須要超過10秒的時間。架構
Cython 是常見加速 Python 代碼的方式,Cython 定義了 Python 語言的超集,把這個語言翻譯到 c/c++,而後再進行編譯來加速執行。這裏,咱們增長了幾個變量的類型,能夠看到比純 Python 提高了 40% 的性能。併發
Cython 如今已經成爲 Python 項目的標配,核心的 Python 三方庫基本都使用 Cython 來加速 Python 代碼的性能。
咱們這個例子裏的數據都是一個類型,咱們能夠想到用專門的數值計算的庫,經過矢量化的方式,能極快加速這個任務的性能。numpy 就是當仁不讓的選擇了,使用 numpy,咱們須要的是面向 array 的思惟方式,咱們應當減小使用循環。這裏先用 numpy.random.uniform
來生成 N*2 的一個二維數組,而後 data ** 2
會對該數組裏的全部數據作平方操做,而後 sum(axis=1)
,會對 axis=1 也就是行方向上求和,這個時候,獲得的是長度爲 N 的 vector,而後咱們用 numpy.sqrt
來對這個 vector 的每個值求開方,<1 會獲得一個布爾值的 vector,即每一個點是否是都是落在圓裏,最後接一個 sum,就能夠求出來總共的點的個數。初次上手 numpy 可能會不太習慣,可是用多了之後,就會發現這種寫法的方便,它實際上是很是符合直覺的。
能夠看到,經過使用 numpy,咱們寫出了更簡單的代碼,可是性能確大幅提高,比純 Python 的寫法性能提高超過 10 倍。
那麼 numpy 的代碼還可以優化麼,答案是確定的,咱們經過一個叫 numexpr 的庫,來將 numpy 的多個操做合併成一個操做執行,來加速 numpy 的執行。
能夠看到,經過 numexpr 優化的代碼,性能比純 Python 代碼提高超過 25 倍。
此時的代碼運行已經至關快了,若是咱們手上有 GPU,那麼咱們能夠利用硬件來加速任務執行。
這裏必需要安利一個庫,叫 cupy,他提供了和 numpy 一致的 API,經過簡單的 import 替換,就能讓 numpy 代碼跑在英偉達的顯卡之上。
這時能夠看到,性能大幅提高超過 270 倍。真的很是誇張了。
爲了讓蒙特卡洛方法計算的結果更加精確,咱們把計算量擴大 1000 倍。會碰到什麼狀況呢?
沒錯,這就是你們不時碰到的,OutOfMemory,內存溢出。更慘的是,在 jupyter 裏,有時候內存溢出致使進程被殺,甚至會致使以前跑的所有結果都丟失。
蒙特卡洛方法仍是比較容易處理的,我把問題分解成 1000 個,每一個求解1千萬數據就行了嘛,寫個循環,作個彙總。但此時,整個計算的時間來到了12分鐘多,太慢了。
此時咱們會發現,整個運行過程當中,其實只有一個 CPU 在幹活,其餘核都在原地吆喝。那麼,咱們怎麼讓 numpy 並行化呢?
首先,numpy 裏有一些操做是能並行的,好比 tensordot 來作矩陣乘法,其餘大部分操做都不能利用多核。那麼,要將 numpy 並行化,咱們能夠:
蒙特卡洛方法算 pi 改寫成多線程和多進程實現仍是很是容易的,咱們寫一個函數來處理1千萬的數據,咱們把這個函數經過 concurrent.futures 的 ThreadPoolExecutor 和 ProcessPoolExecutor 來分別提交函數 1000 遍用多線程和多進程執行便可。能夠看到性能能提高到 2倍和3倍。
可是呢,蒙特卡洛求解 pi 自己就很容易手寫並行,考慮更復雜的狀況。
import numpy as np a = np.random.rand(100000, 100000) (a.dot(a.T) - a).std()
這裏建立了 10萬*10萬的矩陣 a,輸入就有大概 75G,咱們拿 a 矩陣乘 a 的轉置,再減去 a 自己,最終求標準差。這個任務的輸入數據就很難塞進內存,後續的手寫並行就更加困難。
這裏問題就引出來了,咱們須要什麼樣框架呢?
Mars 就是這樣一個框架,它的目標就是解決這幾個問題。目前 Mars 包括了 tensor :分佈式的多維矩陣計算。
100億大小的蒙特卡洛求解 pi的問題規模是 150G,它會致使 OOM。經過 Mars tensor API,只須要將 import numpy as np
替換成 import mars.tensor as mt
,後續的計算徹底一致。不過有一個不一樣,mars tensor 須要經過 execute
觸發執行,這樣作的好處是可以對整個中間過程作儘可能多的優化,好比操做合併等等。不過這種方式對 debug 不太友好,後續咱們會提供 eager mode,來對每一步操做都觸發計算,這樣就和 numpy 代碼徹底一致了。
能夠看到這個計算時間和手寫並行時間至關,峯值內存使用也只是 1個多G,所以能夠看到 Mars tensor 既能充分並行,又能節省內存的使用 。
目前,Mars 實現了 70% 的常見 numpy 接口,完整列表見 這裏。咱們一致在努力提供更多 numpy 和 scipy 的接口,咱們最近剛剛完成了對逆矩陣計算的支持。
Mars tensor 也提供了對 GPU 和稀疏矩陣的支持。eye
是建立單位對角矩陣,它只有對角線上有值爲1,若是用稠密的方式存儲會浪費存儲。不過目前,Mars tensor 還只支持二維稀疏矩陣。
和全部的 dataflow 的框架同樣,Mars 自己也有計算圖的概念,不一樣的是,Mars 包含粗粒度圖和細粒度圖的概念,用戶寫的代碼在客戶端生成粗粒度圖,在提交到服務端後,會有 tile 的過程,將粗粒度圖 tile 成細粒度圖,而後咱們會調度細粒度圖執行。
這裏,用戶寫下的代碼,在內存裏會表達成 Tensor 和 Operand 構成的粗粒度圖。
當用戶調用 execute
方法時,粗粒度的圖會被序列化到服務端,反序列化後,咱們會把這個圖 tile 成細粒度圖。對於輸入 10002000 的矩陣,假設指定每一個維度上的 chunk 大小都是 500,那它會被 tile 成 24 一共 8 個chunk。
後續,咱們會對每一個咱們實現的 operand 也就是算子提供 tile 的操做,將一個粗粒度的圖 tile 成細粒度圖。這時,咱們能夠看到,在單機,若是有8個核,那麼咱們就能夠並行執行整個細粒度圖;另外給定 1/8 大小的內存,咱們就能夠完成整個圖的計算。
不過,咱們在真正執行前,會對整個圖進行 fuse 也就是操做合併的優化,這裏的三個操做真正執行的時候,會被合併成一個算子。針對執行目標的不一樣,咱們會使用 numexpr 和 cupy 的 fuse 支持來分別對 CPU 和 GPU 進行操做合併執行。
上面的例子,都是咱們造出來很容易並行的任務。如咱們先前提到的例子,經過 tile 以後生成的細粒度圖實際上是很是複雜的。真實世界的計算場景,這樣的任務實際上是不少的。
爲了將這些複雜的細粒度圖可以充分調度執行,咱們必需要知足一些基本的準則,才能讓執行足夠高效。
首先,初始節點的分配很是重要。好比上圖,假設咱們有兩個 worker,若是咱們把 1和3 分配到一個 worker,而將 2和4 分配到另外一個 worker,這時當 5 或者 6 被調度的時候,他們就須要觸發遠程數據拉取,這樣執行效率會大打折扣。若是咱們一開始將 1和2 分配到一個 worker,將 3和4 分配到另外一個 worker,這時執行就會很是高效。初始節點的分配對總體的執行影響是很大的,這就須要咱們對整個細粒度的圖有個全局的掌握,咱們才能作到比較好的初始節點分配。
另外,深度優先執行的策略也是至關重要的。假設這時,咱們只有一個 worker,執行完 1和2 後,咱們調度了 3 的話,就會致使 1和2 的內存不能釋放,由於 5 此時尚未被觸發執行。可是,若是咱們執行完 1和2 後,調度了 5 執行,那麼當 5 執行完後,1和2 的內存就能夠釋放,這樣整個執行過程當中的內存就會是最省的。
因此,初始節點分配,以及深度優先執行是兩個最基本的準則,光有這兩點是遠遠不夠的,mars 的整個執行調度中有不少具備挑戰的任務,這也是咱們須要長期優化的對象。
因此,Mars 本質上實際上是一個細粒度的,異構圖的調度系統。咱們把細粒度的算子調度到各個機器上,在真正執行的時候實際上是調用 numpy、cupy、numexpr 等等的庫。咱們充分利用了成熟的、高度優化的單機庫,而不是重複在這些領域造輪子。
在這個過程當中,咱們會遇到一些難點:
咱們的解法是使用 Actor model。Actor模型定義了並行的方式,也就是一切皆 Actor,每一個 Actor 維護一個內部狀態,它們都持有郵箱,Actor 之間經過消息傳遞,消息收到會放在郵箱中,Actor 從郵箱中取消息進行處理,一個 Actor 同時只能處理一個消息。Actor 就是一個最小的並行單元,因爲一個 Actor 同時只能處理一個消息,你徹底不須要擔憂併發的問題,併發應當是 Actor 框架來處理的。而全部 Actor 是否是在同一臺機器上,這在 Actor 模型裏也變得不重要,Actor 在不一樣機器上,只要能完成消息的傳遞就能夠了,這樣 Actor 模型也自然支持分佈式系統。
由於 Actor 是最小的並行單元,咱們在寫代碼的時候,能夠將整個系統分解成不少 Actor,每一個 Actor 是單一職責的,這有點相似面向對象的思想,這樣讓咱們的代碼得以解耦。
另外,Master 解耦成 Actor 以後,咱們可讓這些 Actor 分佈在不一樣的機器上,這樣就讓 Master 再也不成爲單點。同時,咱們讓這些 Actor 根據一致性哈希來進行分配,後續若是有 scheduler 機器掛掉, Actor 能夠根據一致性哈希從新分配並從新建立來達到容錯的目的。
最後,咱們的 actors 是跑在多進程上的,每一個進程裏是不少的協程,這樣,咱們的 worker 也不會受到 GIL 的限制。
像 Scala 或者 Java 這些 JVM 語言 可使用 akka 這個 Actor 框架,對於 Python 來講,咱們並無什麼標準作法,咱們認爲咱們只是須要一個輕量的 Actor 框架就能夠知足咱們使用,咱們不須要 akka 裏面一些高階的功能。所以,咱們開發了 Mars actors,一個輕量的 Actor 框架,咱們 Mars 整個分佈式的 schedulers 和 workers 都在 Mars actors 層之上。
這是咱們 Mars actors 的架構圖,在啓動 Actor pool 的時候,咱們子進程會根據併發啓動若干子進程。主進程上有 socket handler 來接受遠程 socket 鏈接傳遞消息,另外主進程有個 Dispatcher 對象,用來根據消息的目的地來進行分發。咱們全部的 Actor 都在子進程上建立,當 Actor 收到一個消息處理時,咱們會經過協程調用 Actor.on_receive(message)
方法。
一個 Actor 發送消息到另外一個 Actor,分三種狀況。
因爲使用協程做爲子進程內的並行方式,而協程自己在 IO 處理上有很強的性能,因此,咱們的 Actor 框架在 IO 方面也會有很好的性能。
上圖是裸用 Mars actors 來求解蒙特卡洛方法算 pi。這裏定義兩個 Actor,一個 Actor 是 ChunkInside,它接受一個 chunk 的大小,來計算落在圓內點的個數;另一個 Actor 是 PiCaculator,它負責接受總的點個數,來建立 ChunkInside,這個例子就是直接建立 1000 個 ChunkInside,而後經過發送消息來觸發他們計算。create_actor
時指定 address 可讓 Actor 分配在不一樣的機器上。
這裏能夠看到,咱們裸用 Mars actors 的性能是要快過多進程版本的。
這裏咱們總結一下,經過使用 Mars actors,咱們能不受 GIL 限制,編寫分佈式代碼變得很是容易,它讓咱們 IO 變得高效,此外,由於 Actor 解耦,代碼也變得更容易維護。
如今讓咱們看下 Mars 分佈式的完整執行過程。如今有1個 client,3個 scheduler 和 5個worker。用戶建立一個 session,在服務端會建立一個 SessionActor 對象,經過一致性哈希,分配到 scheduler1 上。此時,用戶運行了一個 tensor,首先 SessionActor 會建立一個 GraphActor,它會 tile 粗粒度圖,圖上假設有三個節點,則會建立三個 OperandActor,分別分配到不一樣的 scheduler 上。每一個 OperandActor 會控制 operand 的提交、任務狀態的監督和內存的釋放等操做。此時 1 和 2 的 OperandActor 發現沒有依賴,而且集羣資源充足,那麼他們會把任務提交到相應的 worker 執行,在執行完成後,向 3 通知任務完成,3 發現 1和2 都執行完成後,由於數據在不一樣 worker 執行,決定好執行 worker 後,先觸發數據拉取操做,而後再執行。客戶端這邊,經過輪詢 GraphActor 得知任務完成,則會觸發數據拉取到本地的操做。整個任務就完成了。
咱們對 Mars 分佈式作了兩個 benchmark,第一個是對 36 億數據的每一個元素加一再乘以2,圖中紅色叉是 numpy 的執行時間,能夠看到,咱們比 numpy 有數倍提高,藍色的虛線是理論運行時間,能夠看到咱們真實加速很是接近理論時間加速。第二個 benchmark,咱們增長了數據量,來到 144 億數據,對每一個元素加1乘以2後,再求和,能夠看到單機 numpy 已經不能完成任務了,此時,針對這個任務,咱們也能夠取得不錯的加速比。
Mars 已經在 Github 上源代碼,讓更多同窗來一塊兒參與共建 Mars:https://github.com/mars-project/mars 。
在後續 Mars 的開發計劃上,如上文說,咱們會支持 eager mode,讓每一步觸發執行,提高對性能不敏感的任務開發以及 debug 時的使用體驗;咱們會支持更多 numpy 和 scipy 接口;後續很重要的一個是,咱們會提供 100% 兼容 pandas 的接口,因爲利用了 mars tensor 做爲基礎,咱們也能夠提供 GPU 的支持;咱們會提供兼容 scikit-learn 的機器學習的支持;咱們還會提供在細粒度圖上調度自定義函數和自定義類的功能,加強靈活性;最後,由於咱們客戶端其實並不依賴 Python,任意語言均可以序列化粗粒度圖,因此咱們徹底能夠提供多語言的客戶端版本,不過這點,咱們會視需求而定。
總之,開源對咱們是很重要的,龐大的 scipy 技術棧的並行化,光靠咱們的力量是不夠的,須要你們來一塊兒幫咱們來共建。
最後再 po 一點現場圖片吧,現場觀衆對 Mars 的問題仍是蠻多的。我大體總結下:
本文爲雲棲社區原創內容,未經容許不得轉載。