若是問 mapreduce 和 spark 什麼關係,或者說有什麼共同屬性,你可能會回答他們都是大數據處理引擎。若是問 spark 與 tensorflow 呢,就可能有點迷糊,這倆關注的領域不太同樣啊。可是再問 spark 與 MPI 呢?這個就更遠了。雖然這樣問多少有些不嚴謹,可是它們都有共同的一部分,這就是咱們今天談論的一個話題,一個比較大的話題:分佈式計算框架。python
不論是 mapreduce,仍是 spark 亦或 tensorflow,它們都是利用分佈式的能力,運行某些計算,解決一些特定的問題。從這個 level 講,它們都定義了一種「分佈式計算模型」,即提出了一種計算的方法,經過這種計算方法,就可以解決大量數據的分佈式計算問題。它們的區別在於提出的分佈式計算模型不一樣。Mapreduce 正如其名,是一個很基本的 map-reduce 式的計算模型(好像沒說同樣)。Spark 定義了一套 RDD 模型,本質上是一系列的 map/reduce 組成的一個 DAG 圖。Tensorflow 的計算模型也是一張圖,可是 tensorflow 的圖比起 spark 來,顯得更「複雜」一點。你須要爲圖中的每一個節點和邊做出定義。根據這些定義,能夠指導 tensorflow 如何計算這張圖。Tensorflow 的這種具體化的定義使它比較適合處理特定類型的的計算,對 tensorflow 來說就是神經網絡。而 spark 的 RDD 模型使它比較適合那種沒有相互關聯的的數據並行任務。那麼有沒有一種通用的、簡單的、性能還高的分佈式計算模型?我覺着挺難。通用每每意味着性能不能針對具體情形做出優化。而爲專門任務寫的分佈式任務又作不到通用,固然也作不到簡單。ios
插一句題外話,分佈式計算模型有一塊伴隨的內容,就是調度。雖然不怎麼受關注,但這是分佈式計算引擎必備的東西。mapreduce 的調度是 yarn,spark 的調度有本身內嵌的調度器,tensorflow 也同樣。MPI 呢?它的調度就是幾乎沒有調度,一切假設集羣有資源,靠 ssh 把全部任務拉起來。調度實際上應當分爲資源調度器和任務調度器。前者用於向一些資源管理者申請一些硬件資源,後者用於將計算圖中的任務下發到這些遠程資源進行計算,其實也就是所謂的兩階段調度。近年來有一些 TensorflowOnSpark 之類的項目。這類項目的本質其實是用 spark 的資源調度,加上 tensorflow 的計算模型。api
當咱們寫完一個單機程序,而面臨數據量上的問題的時候,一個天然的想法就是,我能不能讓它運行在分佈式的環境中?若是可以不加改動或稍加改動就能讓它分佈式化,那就太好了。固然現實是比較殘酷的。一般狀況下,對於一個通常性的程序,用戶須要本身手動編寫它的分佈式版本,利用好比 MPI 之類的框架,本身控制數據的分發、彙總,本身對任務的失敗作容災(一般沒有容災)。若是要處理的目標是剛好是對一批數據進行批量化處理,那麼 能夠用 mapreduce 或者 spark 預約義的 api。對於這一類任務,計算框架已經幫咱們把業務以外的部分(腳手架代碼)作好了。一樣的,若是咱們的任務是訓練一個神經網絡,那麼用 tensorflow pytorch 之類的框架就行了。這段話的意思是,若是你要處理的問題已經有了對應框架,那麼拿來用就行了。可是若是沒有呢?除了本身實現以外有沒有什麼別的辦法呢?服務器
今天注意到一個項目,Ray,聲稱你只須要稍微修改一下你的代碼,就能讓它變爲分佈式的(實際上這個項目早就發佈了,只是一直沒有刻意關注它)。固然這個代碼僅侷限於 python,好比下面這個例子,網絡
| **Basic Python** | **Distributed with Ray** | +------------------------------------------------+----------------------------------------------------+ | | | | # Execute f serially. | # Execute f in parallel. | | | | | | @ray.remote | | def f(): | def f(): | | time.sleep(1) | time.sleep(1) | | return 1 | return 1 | | | | | | | | | ray.init() | | results = [f() for i in range(4)] | results = ray.get([f.remote() for i in range(4)]) | +------------------------------------------------+----------------------------------------------------+
這麼簡單?這樣筆者想到了 openmp(注意不是 openmpi)。來看看,多線程
#include<iostream> #include"omp.h" using namespace std; void main() { #pragma omp parallel for for(int i = 0; i < 10; ++i) { cout << "Test" << endl; } system("pause"); }
把頭文件導入,添加一行預處理指令就能夠了,這段代碼立馬變爲並行執行。固然 openmp 不是分佈式,只是藉助編譯器將代碼中須要並行化的部分編譯爲多線程運行,自己仍是一個進程,所以其並行度收到 CPU 線程數量所限。若是 CPU 是雙線程,那隻能 2 倍加速。在一些服務器上,CPU 能夠是單核 32 線程,天然可以享受到 32 倍加速(被並行化的部分)。不過這些都不重要,在用戶看來,Ray 的這個作法和 openmp 是否是有幾分類似之處?你不須要作過多的代碼改動,就能將代碼變爲分佈式執行(固然 openmp 要更絕一點,由於對於不支持 openmp 的編譯器它就是一行註釋而已)。架構
那麼 Ray 是怎麼作到這一點的呢?其實 Ray 的作法提及來也比較簡單,就是定義了一些 API,相似於 MPI 中的定義的通訊原語。使用的時候,將這些 API 「注入」到代碼合適的位置,那麼代碼就變成了用戶代碼夾雜着一些 Ray 框架層的 API 調用,整個代碼實際上就造成了一張計算圖。接下來的事情就是等待 Ray 把這張計算圖完成返回就行了。Ray 的論文給了個例子:app
@ray.remote def create_policy(): # Initialize the policy randomly. return policy @ray.remote(num_gpus=1) class Simulator(object): def __init__(self): # Initialize the environment. self.env = Environment() def rollout(self, policy, num_steps): observations = [] observation = self.env.current_state() for _ in range(num_steps): action = policy(observation) observation = self.env.step(action) observations.append(observation) return observations @ray.remote(num_gpus=2) def update_policy(policy, *rollouts): # Update the policy. return policy @ray.remote def train_policy(): # Create a policy. policy_id = create_policy.remote() # Create 10 actors. simulators = [Simulator.remote() for _ in range(10)] # Do 100 steps of training. for _ in range(100): # Perform one rollout on each actor. rollout_ids = [s.rollout.remote(policy_id) for s in simulators] # Update the policy with the rollouts. policy_id = update_policy.remote(policy_id, *rollout_ids) return ray.get(policy_id)
生成的計算圖爲框架
因此,用戶要作的事情,就是在本身的代碼里加入適當的 Ray API 調用,而後本身的代碼就實際上變成了一張分佈式計算圖了。做爲對比,咱們再來看看 tensorflow 對圖的定義,dom
import tensorflow as tf # 建立數據流圖:y = W * x + b,其中W和b爲存儲節點,x爲數據節點。 x = tf.placeholder(tf.float32) W = tf.Variable(1.0) b = tf.Variable(1.0) y = W * x + b with tf.Session() as sess: tf.global_variables_initializer().run() # Operation.run fetch = y.eval(feed_dict={x: 3.0}) # Tensor.eval print(fetch) # fetch = 1.0 * 3.0 + 1.0 ''' 輸出: 4.0 '''
能夠看出,tensorflow 中是本身須要本身顯式的、明確的定義出圖的節點,placeholder Variable 等等(這些都是圖節點的具體類型),而 Ray 中圖是以一種隱式的方式定義的。我認爲後者是一種更天然的方式,站在開發者的角度看問題,而前者更像是爲了使用 tensorflow 把本身代碼邏輯去適配這個輪子。
那麼 ray 是否是就咱們要尋找的那個即通用、又簡單、還靈活的分佈式計算框架呢?因爲筆者沒有太多的 ray 的使用經驗,這個問題不太好說。從官方介紹來看,有限的幾個 API 確實是足夠簡單的。僅靠這幾個 API 能不能達成通用且靈活的目的還很差講。本質上來講,Tensorflow 對圖的定義也足夠 General,可是它並非一個通用的分佈式計算框架。因爲某些問題不在於框架,而在於問題自己的分佈式化就存在困難,因此試圖尋求一種通用分佈式計算框架解決單機問題多是個僞命題。
話扯遠了。假設 ray 可以讓咱們以一種比較容易的方式分佈式地執行程序,那麼會怎麼樣呢?前不久 Databricks 開源了一個新項目,Koalas,試圖以 RDD 的框架並行化 pandas。因爲 pandas 的場景是數據分析,和 spark 面對的場景相似,二者的底層存儲結構、概念也是很類似的,所以用 RDD 來分佈式化 pandas 也是可行的。我想,若是 ray 足夠簡單好用,在 pandas 里加一些 ray 的 api 調用花費的時間精力可能會遠遠小於開發一套 koalas。可是在 pandas 里加 ray 就把 pandas 綁定到了 ray 上,即使單機也是這樣,由於 ray 作不到像 openmp 那樣若是支持,很好,不支持也不影響代碼運行。
囉嗦這麼多,其實就想從這麼多引擎的細節中跳出來,思考一下到底什麼是分佈式計算框架,每種框架又是設計的,解決什麼問題,有什麼優缺點。最後拿大佬的一個觀點結束本文。David Patterson 在演講 「New Golden Age For Computer Architecture」 中提到,通用硬件愈來愈逼近極限,要想要達到更高的效率,咱們須要設計面向領域的架構(Domain Specific Architectures)。這是一個計算架構層出不窮的時代,每種架構都是爲了解決其面對的領域問題出現的,必然包含對其問題的特殊優化。通用性不是用戶解決問題的出發點,而更多的是框架設計者的「一廂情願」,用戶關注的永遠是領域問題。從這個意義上講,面向領域的計算架構應該纔是正確的方向。
原文連接 本文爲雲棲社區原創內容,未經容許不得轉載。