高性能分佈式執行框架——Ray

Ray是UC Berkeley RISELab新推出的高性能分佈式執行框架,它使用了和傳統分佈式計算系統不同的架構和對分佈式計算的抽象方式,具備比Spark更優異的計算性能。html

Ray目前還處於實驗室階段,最新版本爲0.2.2版本。雖然Ray自稱是面向AI應用的分佈式計算框架,可是它的架構具備通用的分佈式計算抽象。本文對Ray進行簡單的介紹,幫助你們更快地瞭解Ray是什麼,若有描述不當的地方,歡迎不吝指正。node

1、簡單開始

首先來看一下最簡單的Ray程序是如何編寫的。python

# 導入ray,並初始化執行環境
import ray
ray.init()

# 定義ray remote函數
@ray.remote
def hello():
    return "Hello world !"

# 異步執行remote函數,返回結果id
object_id = hello.remote()

# 同步獲取計算結果
hello = ray.get(object_id)

# 輸出計算結果
print hello

在Ray裏,經過Python註解@ray.remote定義remote函數。使用此註解聲明的函數都會自帶一個默認的方法remote,經過此方法發起的函數調用都是以提交分佈式任務的方式異步執行的,函數的返回值是一個對象id,使用ray.get內置操做能夠同步獲取該id對應的對象。熟悉Java裏的Future機制的話對此應該並不陌生,或許會有人疑惑這和普通的異步函數調用沒什麼大的區別,可是這裏最大的差別是,函數hello是分佈式異步執行的。git

remote函數是Ray分佈式計算抽象中的核心概念,經過它開發者擁有了動態定製計算依賴(任務DAG)的能力。好比:github

@ray.remote
def A():
    return "A"

@ray.remote
def B():
    return "B"

@ray.remote
def C(a, b):
    return "C"

a_id = A.remote()
b_id = B.remote()
c_id = C.remote(a_id, b_id)
print ray.get(c_id)

例子代碼中,對函數A、B的調用是徹底並行執行的,可是對函數C的調用依賴於A、B函數的返回結果。Ray能夠保證函數C須要等待A、B函數的結果然正計算出來後纔會執行。若是將函數A、B、C類比爲DAG的節點的話,那麼DAG的邊就是函數C參數對函數A、B計算結果的依賴,自由的函數調用方式容許Ray能夠自由地定製DAG的結構和計算依賴關係。另外,說起一點的是Python的函數能夠定義函數具備多個返回值,這也使得Python的函數更自然具有了DAG節點多入和多出的特色。web

2、系統架構

Ray是使用什麼樣的架構對分佈式計算作出如上抽象的呢,一下給出了Ray的系統架構(來自Ray論文,參考文獻1)。redis

做爲分佈式計算系統,Ray仍舊遵循了典型的Master-Slave的設計:Master負責全局協調和狀態維護,Slave執行分佈式計算任務。不過和傳統的分佈式計算系統不一樣的是,Ray使用了混合任務調度的思路。在集羣部署模式下,Ray啓動了如下關鍵組件:數組

  1. GlobalScheduler:Master上啓動了一個全局調度器,用於接收本地調度器提交的任務,並將任務分發給合適的本地任務調度器執行。
  2. RedisServer:Master上啓動了一到多個RedisServer用於保存分佈式任務的狀態信息(ControlState),包括對象機器的映射、任務描述、任務debug信息等。
  3. LocalScheduler:每一個Slave上啓動了一個本地調度器,用於提交任務到全局調度器,以及分配任務給當前機器的Worker進程。
  4. Worker:每一個Slave上能夠啓動多個Worker進程執行分佈式任務,並將計算結果存儲到ObjectStore。
  5. ObjectStore:每一個Slave上啓動了一個ObjectStore存儲只讀數據對象,Worker能夠經過共享內存的方式訪問這些對象數據,這樣能夠有效地減小內存拷貝和對象序列化成本。ObjectStore底層由Apache Arrow實現。
  6. Plasma:每一個Slave上的ObjectStore都由一個名爲Plasma的對象管理器進行管理,它能夠在Worker訪問本地ObjectStore上不存在的遠程數據對象時,主動拉取其它Slave上的對象數據到當前機器。

須要說明的是,Ray的論文中說起,全局調度器能夠啓動一到多個,而目前Ray的實現文檔裏討論的內容都是基於一個全局調度器的狀況。我猜想多是Ray尚在建設中,一些機制還未完善,後續讀者能夠留意此處的細節變化。瀏覽器

Ray的任務也是經過相似Spark中Driver的概念的方式進行提交的,有所不一樣的是:安全

  1. Spark的Driver提交的是任務DAG,一旦提交則不可更改。
  2. 而Ray提交的是更細粒度的remote function,任務DAG依賴關係由函數依賴關係自由定製。

論文給出的架構圖裏並未畫出Driver的概念,所以我在其基礎上作了一些修改和擴充。

Ray的Driver節點和和Slave節點啓動的組件幾乎相同,不過卻有如下區別:

  1. Driver上的工做進程DriverProcess通常只有一個,即用戶啓動的PythonShell。Slave能夠根據須要建立多個WorkerProcess。
  2. Driver只能提交任務,卻不能接收來自全局調度器分配的任務。Slave能夠提交任務,也能夠接收全局調度器分配的任務。
  3. Driver能夠主動繞過全局調度器給Slave發送Actor調用任務(此處設計是否合理尚不討論)。Slave只能接收全局調度器分配的計算任務。

3、核心操做

基於以上架構,咱們簡單討論一下Ray中關鍵的操做和流程。

1. ray.init()

在PythonShell中,使用ray.init()能夠在本地啓動ray,包括Driver、HeadNode(Master)和若干Slave。

import ray
ray.init()

若是是直連已有的Ray集羣,只須要指定RedisServer的地址便可。

ray.init(redis_address="<redis-address>")

本地啓動Ray獲得的輸出以下:

>>> ray.init()
Waiting for redis server at 127.0.0.1:58807 to respond...
Waiting for redis server at 127.0.0.1:23148 to respond...
Allowing the Plasma store to use up to 13.7439GB of memory.
Starting object store with directory /tmp and huge page support disabled
Starting local scheduler with 8 CPUs, 0 GPUs

======================================================================
View the web UI at http://localhost:8888/notebooks/ray_ui62614.ipynb?token=7c253b0fd66fe41294d9f2c6739e3f002c1e76f6f59b99f5
======================================================================

{'object_store_addresses': [ObjectStoreAddress(name='/tmp/plasma_store73540254', manager_name='/tmp/plasma_manager78072648', manager_port=39874)], 'redis_address': '127.0.0.1:58807', 'local_scheduler_socket_names': ['/tmp/scheduler98624129'], 'webui_url': 'http://localhost:8888/notebooks/ray_ui62614.ipynb?token=7c253b0fd66fe41294d9f2c6739e3f002c1e76f6f59b99f5', 'node_ip_address': '127.0.0.1'}
>>>

本地啓動Ray時,能夠看到Ray的WebUI的訪問地址。

2. ray.put()

使用ray.put()能夠將Python對象存入本地ObjectStore,而且異步返回一個惟一的ObjectID。經過該ID,Ray能夠訪問集羣中任一個節點上的對象(遠程對象經過查閱Master的對象表得到)。

對象一旦存入ObjectStore便不可更改,Ray的remote函數能夠將直接將該對象的ID做爲參數傳入。使用ObjectID做爲remote函數參數,能夠有效地減小函數參數的寫ObjectStore的次數。

@ray.remote
def f(x):
    pass

x = "hello"

# 對象x往ObjectStore拷貝里10次
[f.remote(x) for _ in range(10)]

# 對象x僅往ObjectStore拷貝1次
x_id = ray.put(x)
[f.remote(x_id) for _ in range(10)]

3. ray.get()

使用ray.get()能夠經過ObjectID獲取ObjectStore內的對象並將之轉換爲Python對象。對於數組類型的對象,Ray使用共享內存機制減小數據的拷貝成本。而對於其它對象則須要將數據從ObjectStore拷貝到進程的堆內存中。

若是調用ray.get()操做時,對象還沒有建立好,則get操做會阻塞,直到對象建立完成後返回。get操做的關鍵流程以下:

  1. Driver或者Worker進程首先到ObjectStore內請求ObjectID對應的對象數據。
  2. 若是本地ObjectStore沒有對應的對象數據,本地對象管理器Plasma會檢查Master上的對象表查看對象是否存儲其它節點的ObjectStore。
  3. 若是對象數據在其它節點的ObjectStore內,Plasma會發送網絡請求將對象數據拉到本地ObjectStore。
  4. 若是對象數據尚未建立好,Master會在對象建立完成後通知請求的Plasma讀取。
  5. 若是對象數據已經被全部的ObjectStore移除(被LRU策略刪除),本地調度器會根據任務血緣關係執行對象的從新建立工做。
  6. 一旦對象數據在本地ObjectStore可用,Driver或者Worker進程會經過共享內存的方式直接將對象內存區域映射到本身的進程地址空間中,並反序列化爲Python對象。

另外,ray.get()能夠一次性讀取多個對象的數據:

result_ids = [ray.put(i) for i in range(10)]
ray.get(result_ids)  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

4. @ray.remote

Ray中使用註解@ray.remote能夠聲明一個remote function。remote函數時Ray的基本任務調度單元,remote函數定義後會當即被序列化存儲到RedisServer中,而且分配了一個惟一的ID,這樣就保證了集羣的全部節點均可以看到這個函數的定義。

不過,這樣對remote函數定義有了一個潛在的要求,即remote函數內若是調用了其它的用戶函數,則必須提早定義,不然remote函數沒法找到對應的函數定義內容。

remote函數內也能夠調用其它的remote函數,Driver和Slave每次調用remote函數時,其實都是向集羣提交了一個計算任務,從這裏也能夠看到Ray的分佈式計算的自由性。

Ray中調用remote函數的關鍵流程以下:

  1. 調用remote函數時,首先會建立一個任務對象,它包含了函數的ID、參數的ID或者值(Python的基本對象直接傳值,複雜對象會先經過ray.put()操做存入ObjectStore而後返回ObjectID)、函數返回值對象的ID。
  2. 任務對象被髮送到本地調度器。
  3. 本地調度器決定任務對象是在本地調度仍是發送給全局調度器。若是任務對象的依賴(參數)在本地的ObejctStore已經存在且本地的CPU和GPU計算資源充足,那麼本地調度器將任務分配給本地的WorkerProcess執行。不然,任務對象被髮送給全局調度器並存儲到任務表(TaskTable)中,全局調度器根據當前的任務狀態信息決定將任務發給集羣中的某一個本地調度器。
  4. 本地調度器收到任務對象後(來自本地的任務或者全局調度分配的任務),會將其放入一個任務隊列中,等待計算資源和本地依賴知足後分配給WorkerProcess執行。
  5. Worker收到任務對象後執行該任務,並將函數返回值存入ObjectStore,並更新Master的對象表(ObjectTable)信息。

@ray.remote註解有一個參數num_return_vals用於聲明remote函數的返回值個數,基於此實現remote函數的多返回值機制。

@ray.remote(num_return_vals=2)
def f():
    return 1, 2

x_id, y_id = f.remote()
ray.get(x_id)  # 1
ray.get(y_id)  # 2

@ray.remote註解的另外一個參數num_gpus能夠爲任務指定GPU的資源。使用內置函數ray.get_gpu_ids()能夠獲取當前任務可使用的GPU信息。

@ray.remote(num_gpus=1)
def gpu_method():
    return "This function is allowed to use GPUs {}.".format(ray.get_gpu_ids())

5. ray.wait()

ray.wait()操做支持批量的任務等待,基於此能夠實現一次性獲取多個ObjectID對應的數據。

# 啓動5個remote函數調用任務
results = [f.remote(i) for i in range(5)]
# 阻塞等待4個任務完成,超時時間爲2.5s
ready_ids, remaining_ids = ray.wait(results, num_returns=4, timeout=2500)

上述例子中,results包含了5個ObjectID,使用ray.wait操做能夠一直等待有4個任務完成後返回,並將完成的數據對象放在第一個list類型返回值內,未完成的ObjectID放在第二個list返回值內。若是設置了超時時間,那麼在超時時間結束後仍未等到預期的返回值個數,則已超時完成時的返回值爲準。

6. ray.error_info()

使用ray.error_info()能夠獲取任務執行時產生的錯誤信息。

>>> import time
>>> @ray.remote
>>> def f():
>>>     time.sleep(5)
>>>     raise Exception("This task failed!!")
>>> f.remote()
Remote function __main__.f failed with:

Traceback (most recent call last):
  File "<stdin>", line 4, in f
Exception: This task failed!!


  You can inspect errors by running

      ray.error_info()

  If this driver is hanging, start a new one with

      ray.init(redis_address="127.0.0.1:65452")
>>> ray.error_info()
[{'type': 'task', 'message': 'Remote function \x1b[31m__main__.f\x1b[39m failed with:\n\nTraceback (most recent call last):\n  File "<stdin>", line 4, in f\nException: This task failed!!\n', 'data': '{\'function_id\': "Hm\\xde\\x93\'\\x91\\xce\\x13ld\\xf4O\\xd7\\xce\\xc2\\xe1\\x151\\x1e3", \'function_name\': u\'__main__.f\'}'}]

7. Actor

Ray的remote函數只能處理無狀態的計算需求,有狀態的計算需求須要使用Ray的Actor實現。在Python的class定義前使用@ray.remote能夠聲明Actor。

@ray.remote
class Counter(object):
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value

使用以下方式建立Actor對象。

a1 = Counter.remote()
a2 = Counter.remote()

Ray建立Actor的流程爲:

  1. Master選取一個Slave,並將Actor建立任務分發給它的本地調度器。
  2. 建立Actor對象,並執行它的構造函數。

從流程能夠看出,Actor對象的建立時並行的。

經過調用Actor對象的方法使用Actor。

a1.increment.remote()  # ray.get returns 1
a2.increment.remote()  # ray.get returns 1

調用Actor對象的方法的流程爲:

  1. 首先建立一個任務。
  2. 該任務被Driver直接分配到建立該Actor對應的本地執行器執行,這個操做繞開了全局調度器(Worker是否也可使用Actor直接分配任務尚存疑問)。
  3. 返回Actor方法調用結果的ObjectID。

爲了保證Actor狀態的一致性,對同一個Actor的方法調用是串行執行的。

4、安裝Ray

若是隻是使用Ray,可使用以下命令直接安裝。

pip intall ray

若是須要編譯Ray的最新源碼進行安裝,按照以下步驟進行(MaxOS):

# 更新編譯依賴包
brew update
brew install cmake pkg-config automake autoconf libtool boost wget
pip install numpy cloudpickle funcsigs click colorama psutil redis flatbuffers cython --ignore-installed six
# 下載源碼編譯安裝
git clone https://github.com/ray-project/ray.git
cd ray/python
python setup.py install
# 測試
python test/runtest.py

# 安裝WebUI須要的庫[可選]
pip install jupyter ipywidgets bokeh

# 編譯Ray文檔[可選]
cd ray/doc
pip install -r requirements-doc.txt
make html
open _build/html/index.html

我在MacOS上安裝jupyter時,遇到了Python的setuptools庫沒法升級的狀況,緣由是MacOS的安全性設置問題,可使用以下方式解決:

  1. 重啓電腦,啓動時按住Command+R進入Mac保護模式。
  2. 打開命令行,輸入命令csrutils disable關閉系統安全策略。
  3. 重啓電腦,繼續安裝jupyter。
  4. 安裝完成後,重複如上的方式執行csrutils enable,再次重啓便可。

進入PythonShell,輸入代碼本地啓動Ray:

import ray
ray.init()

瀏覽器內打開WebUI界面以下:

參考資料

  1. Ray論文:Real-Time Machine Learning: The Missing Pieces
  2. Ray開發手冊:http://ray.readthedocs.io/en/latest/index.html
  3. Ray源代碼:https://github.com/ray-project/ray
相關文章
相關標籤/搜索