漫談分佈式計算框架

若是問 mapreduce 和 spark 什麼關係,或者說有什麼共同屬性,你可能會回答他們都是大數據處理引擎。若是問 spark 與 tensorflow 呢,就可能有點迷糊,這倆關注的領域不太同樣啊。可是再問 spark 與 MPI 呢?這個就更遠了。雖然這樣問多少有些不嚴謹,可是它們都有共同的一部分,這就是咱們今天談論的一個話題,一個比較大的話題:分佈式計算框架。python

不論是 mapreduce,仍是 spark 亦或 tensorflow,它們都是利用分佈式的能力,運行某些計算,解決一些特定的問題。從這個 level 講,它們都定義了一種「分佈式計算模型」,即提出了一種計算的方法,經過這種計算方法,就可以解決大量數據的分佈式計算問題。它們的區別在於提出的分佈式計算模型不一樣。Mapreduce 正如其名,是一個很基本的 map-reduce 式的計算模型(好像沒說同樣)。Spark 定義了一套 RDD 模型,本質上是一系列的 map/reduce 組成的一個 DAG 圖。Tensorflow 的計算模型也是一張圖,可是 tensorflow 的圖比起 spark 來,顯得更「複雜」一點。你須要爲圖中的每一個節點和邊做出定義。根據這些定義,能夠指導 tensorflow 如何計算這張圖。Tensorflow 的這種具體化的定義使它比較適合處理特定類型的的計算,對 tensorflow 來說就是神經網絡。而 spark 的 RDD 模型使它比較適合那種沒有相互關聯的的數據並行任務。那麼有沒有一種通用的、簡單的、性能還高的分佈式計算模型?我覺着挺難。通用每每意味着性能不能針對具體情形做出優化。而爲專門任務寫的分佈式任務又作不到通用,固然也作不到簡單。ios

插一句題外話,分佈式計算模型有一塊伴隨的內容,就是調度。雖然不怎麼受關注,但這是分佈式計算引擎必備的東西。mapreduce 的調度是 yarn,spark 的調度有本身內嵌的調度器,tensorflow 也同樣。MPI 呢?它的調度就是幾乎沒有調度,一切假設集羣有資源,靠 ssh 把全部任務拉起來。調度實際上應當分爲資源調度器和任務調度器。前者用於向一些資源管理者申請一些硬件資源,後者用於將計算圖中的任務下發到這些遠程資源進行計算,其實也就是所謂的兩階段調度。近年來有一些 TensorflowOnSpark 之類的項目。這類項目的本質其實是用 spark 的資源調度,加上 tensorflow 的計算模型。api

當咱們寫完一個單機程序,而面臨數據量上的問題的時候,一個天然的想法就是,我能不能讓它運行在分佈式的環境中?若是可以不加改動或稍加改動就能讓它分佈式化,那就太好了。固然現實是比較殘酷的。一般狀況下,對於一個通常性的程序,用戶須要本身手動編寫它的分佈式版本,利用好比 MPI 之類的框架,本身控制數據的分發、彙總,本身對任務的失敗作容災(一般沒有容災)。若是要處理的目標是剛好是對一批數據進行批量化處理,那麼 能夠用 mapreduce 或者 spark 預約義的 api。對於這一類任務,計算框架已經幫咱們把業務以外的部分(腳手架代碼)作好了。一樣的,若是咱們的任務是訓練一個神經網絡,那麼用 tensorflow pytorch 之類的框架就行了。這段話的意思是,若是你要處理的問題已經有了對應框架,那麼拿來用就行了。可是若是沒有呢?除了本身實現以外有沒有什麼別的辦法呢?服務器

今天注意到一個項目,Ray,聲稱你只須要稍微修改一下你的代碼,就能讓它變爲分佈式的(實際上這個項目早就發佈了,只是一直沒有刻意關注它)。固然這個代碼僅侷限於 python,好比下面這個例子,網絡

| **Basic Python**                               | **Distributed with Ray**                           |
+------------------------------------------------+----------------------------------------------------+
|                                                |                                                    |
|  # Execute f serially.                         |  # Execute f in parallel.                          |
|                                                |                                                    |
|                                                |  @ray.remote                                       |
|  def f():                                      |  def f():                                          |
|      time.sleep(1)                             |      time.sleep(1)                                 |
|      return 1                                  |      return 1                                      |
|                                                |                                                    |
|                                                |                                                    |
|                                                |  ray.init()                                        |
|  results = [f() for i in range(4)]             |  results = ray.get([f.remote() for i in range(4)]) |
+------------------------------------------------+----------------------------------------------------+

這麼簡單?這樣筆者想到了 openmp(注意不是 openmpi)。來看看,多線程

#include<iostream>
#include"omp.h"

using namespace std;

void main() {
#pragma omp parallel for
    for(int i = 0; i < 10; ++i) {
        cout << "Test" << endl;
    }
    system("pause");
}

把頭文件導入,添加一行預處理指令就能夠了,這段代碼立馬變爲並行執行。固然 openmp 不是分佈式,只是藉助編譯器將代碼中須要並行化的部分編譯爲多線程運行,自己仍是一個進程,所以其並行度收到 CPU 線程數量所限。若是 CPU 是雙線程,那隻能 2 倍加速。在一些服務器上,CPU 能夠是單核 32 線程,天然可以享受到 32 倍加速(被並行化的部分)。不過這些都不重要,在用戶看來,Ray 的這個作法和 openmp 是否是有幾分類似之處?你不須要作過多的代碼改動,就能將代碼變爲分佈式執行(固然 openmp 要更絕一點,由於對於不支持 openmp 的編譯器它就是一行註釋而已)。架構

那麼 Ray 是怎麼作到這一點的呢?其實 Ray 的作法提及來也比較簡單,就是定義了一些 API,相似於 MPI 中的定義的通訊原語。使用的時候,將這些 API 「注入」到代碼合適的位置,那麼代碼就變成了用戶代碼夾雜着一些 Ray 框架層的 API 調用,整個代碼實際上就造成了一張計算圖。接下來的事情就是等待 Ray 把這張計算圖完成返回就行了。Ray 的論文給了個例子:app

@ray.remote
def create_policy():
    # Initialize the policy randomly.
    return policy
@ray.remote(num_gpus=1)
class Simulator(object):
    def __init__(self):
        # Initialize the environment.
        self.env = Environment()
    def rollout(self, policy, num_steps):
        observations = []
        observation = self.env.current_state()
        for _ in range(num_steps):
            action = policy(observation)
            observation = self.env.step(action)
            observations.append(observation)
        return observations
@ray.remote(num_gpus=2)
def update_policy(policy, *rollouts):
    # Update the policy.
    return policy
@ray.remote
def train_policy():
    # Create a policy.
    policy_id = create_policy.remote()
    # Create 10 actors.
    simulators = [Simulator.remote() for _ in range(10)]
    # Do 100 steps of training.
    for _ in range(100):
        # Perform one rollout on each actor.
        rollout_ids = [s.rollout.remote(policy_id)
        for s in simulators]
        # Update the policy with the rollouts.
        policy_id = update_policy.remote(policy_id, *rollout_ids)
    return ray.get(policy_id)

生成的計算圖爲框架

因此,用戶要作的事情,就是在本身的代碼里加入適當的 Ray API 調用,而後本身的代碼就實際上變成了一張分佈式計算圖了。做爲對比,咱們再來看看 tensorflow 對圖的定義,dom

import tensorflow as tf
# 建立數據流圖:y = W * x + b,其中W和b爲存儲節點,x爲數據節點。
x = tf.placeholder(tf.float32)
W = tf.Variable(1.0)
b = tf.Variable(1.0)
y = W * x + b
with tf.Session() as sess:
    tf.global_variables_initializer().run() # Operation.run
    fetch = y.eval(feed_dict={x: 3.0})      # Tensor.eval
    print(fetch)                            # fetch = 1.0 * 3.0 + 1.0
'''
輸出:
4.0
'''

能夠看出,tensorflow 中是本身須要本身顯式的、明確的定義出圖的節點,placeholder Variable 等等(這些都是圖節點的具體類型),而 Ray 中圖是以一種隱式的方式定義的。我認爲後者是一種更天然的方式,站在開發者的角度看問題,而前者更像是爲了使用 tensorflow 把本身代碼邏輯去適配這個輪子。

那麼 ray 是否是就咱們要尋找的那個即通用、又簡單、還靈活的分佈式計算框架呢?因爲筆者沒有太多的 ray 的使用經驗,這個問題不太好說。從官方介紹來看,有限的幾個 API 確實是足夠簡單的。僅靠這幾個 API 能不能達成通用且靈活的目的還很差講。本質上來講,Tensorflow 對圖的定義也足夠 General,可是它並非一個通用的分佈式計算框架。因爲某些問題不在於框架,而在於問題自己的分佈式化就存在困難,因此試圖尋求一種通用分佈式計算框架解決單機問題多是個僞命題。

話扯遠了。假設 ray 可以讓咱們以一種比較容易的方式分佈式地執行程序,那麼會怎麼樣呢?前不久 Databricks 開源了一個新項目,Koalas,試圖以 RDD 的框架並行化 pandas。因爲 pandas 的場景是數據分析,和 spark 面對的場景相似,二者的底層存儲結構、概念也是很類似的,所以用 RDD 來分佈式化 pandas 也是可行的。我想,若是 ray 足夠簡單好用,在 pandas 里加一些 ray 的 api 調用花費的時間精力可能會遠遠小於開發一套 koalas。可是在 pandas 里加 ray 就把 pandas 綁定到了 ray 上,即使單機也是這樣,由於 ray 作不到像 openmp 那樣若是支持,很好,不支持也不影響代碼運行。

囉嗦這麼多,其實就想從這麼多引擎的細節中跳出來,思考一下到底什麼是分佈式計算框架,每種框架又是設計的,解決什麼問題,有什麼優缺點。最後拿大佬的一個觀點結束本文。David Patterson 在演講 「New Golden Age For Computer Architecture」 中提到,通用硬件愈來愈逼近極限,要想要達到更高的效率,咱們須要設計面向領域的架構(Domain Specific Architectures)。這是一個計算架構層出不窮的時代,每種架構都是爲了解決其面對的領域問題出現的,必然包含對其問題的特殊優化。通用性不是用戶解決問題的出發點,而更多的是框架設計者的「一廂情願」,用戶關注的永遠是領域問題。從這個意義上講,面向領域的計算架構應該纔是正確的方向。


原文連接 本文爲雲棲社區原創內容,未經容許不得轉載。

相關文章
相關標籤/搜索