Ray是UC Berkeley RISELab新推出的高性能分佈式執行框架,它使用了和傳統分佈式計算系統不同的架構和對分佈式計算的抽象方式,具備比Spark更優異的計算性能。html
Ray目前還處於實驗室階段,最新版本爲0.2.2版本。雖然Ray自稱是面向AI應用的分佈式計算框架,可是它的架構具備通用的分佈式計算抽象。本文對Ray進行簡單的介紹,幫助你們更快地瞭解Ray是什麼,若有描述不當的地方,歡迎不吝指正。node
首先來看一下最簡單的Ray程序是如何編寫的。python
# 導入ray,並初始化執行環境 import ray ray.init() # 定義ray remote函數 @ray.remote def hello(): return "Hello world !" # 異步執行remote函數,返回結果id object_id = hello.remote() # 同步獲取計算結果 hello = ray.get(object_id) # 輸出計算結果 print hello
在Ray裏,經過Python註解@ray.remote
定義remote函數。使用此註解聲明的函數都會自帶一個默認的方法remote
,經過此方法發起的函數調用都是以提交分佈式任務的方式異步執行的,函數的返回值是一個對象id,使用ray.get
內置操做能夠同步獲取該id對應的對象。熟悉Java裏的Future機制的話對此應該並不陌生,或許會有人疑惑這和普通的異步函數調用沒什麼大的區別,可是這裏最大的差別是,函數hello是分佈式異步執行的。git
remote函數是Ray分佈式計算抽象中的核心概念,經過它開發者擁有了動態定製計算依賴(任務DAG)的能力。好比:github
@ray.remote def A(): return "A" @ray.remote def B(): return "B" @ray.remote def C(a, b): return "C" a_id = A.remote() b_id = B.remote() c_id = C.remote(a_id, b_id) print ray.get(c_id)
例子代碼中,對函數A、B的調用是徹底並行執行的,可是對函數C的調用依賴於A、B函數的返回結果。Ray能夠保證函數C須要等待A、B函數的結果然正計算出來後纔會執行。若是將函數A、B、C類比爲DAG的節點的話,那麼DAG的邊就是函數C參數對函數A、B計算結果的依賴,自由的函數調用方式容許Ray能夠自由地定製DAG的結構和計算依賴關係。另外,說起一點的是Python的函數能夠定義函數具備多個返回值,這也使得Python的函數更自然具有了DAG節點多入和多出的特色。web
Ray是使用什麼樣的架構對分佈式計算作出如上抽象的呢,一下給出了Ray的系統架構(來自Ray論文,參考文獻1)。redis
做爲分佈式計算系統,Ray仍舊遵循了典型的Master-Slave的設計:Master負責全局協調和狀態維護,Slave執行分佈式計算任務。不過和傳統的分佈式計算系統不一樣的是,Ray使用了混合任務調度的思路。在集羣部署模式下,Ray啓動了如下關鍵組件:數組
須要說明的是,Ray的論文中說起,全局調度器能夠啓動一到多個,而目前Ray的實現文檔裏討論的內容都是基於一個全局調度器的狀況。我猜想多是Ray尚在建設中,一些機制還未完善,後續讀者能夠留意此處的細節變化。瀏覽器
Ray的任務也是經過相似Spark中Driver的概念的方式進行提交的,有所不一樣的是:安全
論文給出的架構圖裏並未畫出Driver的概念,所以我在其基礎上作了一些修改和擴充。
Ray的Driver節點和和Slave節點啓動的組件幾乎相同,不過卻有如下區別:
基於以上架構,咱們簡單討論一下Ray中關鍵的操做和流程。
在PythonShell中,使用ray.init()
能夠在本地啓動ray,包括Driver、HeadNode(Master)和若干Slave。
import ray ray.init()
若是是直連已有的Ray集羣,只須要指定RedisServer的地址便可。
ray.init(redis_address="<redis-address>")
本地啓動Ray獲得的輸出以下:
>>> ray.init() Waiting for redis server at 127.0.0.1:58807 to respond... Waiting for redis server at 127.0.0.1:23148 to respond... Allowing the Plasma store to use up to 13.7439GB of memory. Starting object store with directory /tmp and huge page support disabled Starting local scheduler with 8 CPUs, 0 GPUs ====================================================================== View the web UI at http://localhost:8888/notebooks/ray_ui62614.ipynb?token=7c253b0fd66fe41294d9f2c6739e3f002c1e76f6f59b99f5 ====================================================================== {'object_store_addresses': [ObjectStoreAddress(name='/tmp/plasma_store73540254', manager_name='/tmp/plasma_manager78072648', manager_port=39874)], 'redis_address': '127.0.0.1:58807', 'local_scheduler_socket_names': ['/tmp/scheduler98624129'], 'webui_url': 'http://localhost:8888/notebooks/ray_ui62614.ipynb?token=7c253b0fd66fe41294d9f2c6739e3f002c1e76f6f59b99f5', 'node_ip_address': '127.0.0.1'} >>>
本地啓動Ray時,能夠看到Ray的WebUI的訪問地址。
使用ray.put()
能夠將Python對象存入本地ObjectStore,而且異步返回一個惟一的ObjectID。經過該ID,Ray能夠訪問集羣中任一個節點上的對象(遠程對象經過查閱Master的對象表得到)。
對象一旦存入ObjectStore便不可更改,Ray的remote函數能夠將直接將該對象的ID做爲參數傳入。使用ObjectID做爲remote函數參數,能夠有效地減小函數參數的寫ObjectStore的次數。
@ray.remote def f(x): pass x = "hello" # 對象x往ObjectStore拷貝里10次 [f.remote(x) for _ in range(10)] # 對象x僅往ObjectStore拷貝1次 x_id = ray.put(x) [f.remote(x_id) for _ in range(10)]
使用ray.get()
能夠經過ObjectID獲取ObjectStore內的對象並將之轉換爲Python對象。對於數組類型的對象,Ray使用共享內存機制減小數據的拷貝成本。而對於其它對象則須要將數據從ObjectStore拷貝到進程的堆內存中。
若是調用ray.get()
操做時,對象還沒有建立好,則get操做會阻塞,直到對象建立完成後返回。get操做的關鍵流程以下:
另外,ray.get()
能夠一次性讀取多個對象的數據:
result_ids = [ray.put(i) for i in range(10)] ray.get(result_ids) # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Ray中使用註解@ray.remote
能夠聲明一個remote function。remote函數時Ray的基本任務調度單元,remote函數定義後會當即被序列化存儲到RedisServer中,而且分配了一個惟一的ID,這樣就保證了集羣的全部節點均可以看到這個函數的定義。
不過,這樣對remote函數定義有了一個潛在的要求,即remote函數內若是調用了其它的用戶函數,則必須提早定義,不然remote函數沒法找到對應的函數定義內容。
remote函數內也能夠調用其它的remote函數,Driver和Slave每次調用remote函數時,其實都是向集羣提交了一個計算任務,從這裏也能夠看到Ray的分佈式計算的自由性。
Ray中調用remote函數的關鍵流程以下:
ray.put()
操做存入ObjectStore而後返回ObjectID)、函數返回值對象的ID。@ray.remote
註解有一個參數num_return_vals
用於聲明remote函數的返回值個數,基於此實現remote函數的多返回值機制。
@ray.remote(num_return_vals=2) def f(): return 1, 2 x_id, y_id = f.remote() ray.get(x_id) # 1 ray.get(y_id) # 2
@ray.remote
註解的另外一個參數num_gpus
能夠爲任務指定GPU的資源。使用內置函數ray.get_gpu_ids()
能夠獲取當前任務可使用的GPU信息。
@ray.remote(num_gpus=1) def gpu_method(): return "This function is allowed to use GPUs {}.".format(ray.get_gpu_ids())
ray.wait()
操做支持批量的任務等待,基於此能夠實現一次性獲取多個ObjectID對應的數據。
# 啓動5個remote函數調用任務 results = [f.remote(i) for i in range(5)] # 阻塞等待4個任務完成,超時時間爲2.5s ready_ids, remaining_ids = ray.wait(results, num_returns=4, timeout=2500)
上述例子中,results包含了5個ObjectID,使用ray.wait
操做能夠一直等待有4個任務完成後返回,並將完成的數據對象放在第一個list類型返回值內,未完成的ObjectID放在第二個list返回值內。若是設置了超時時間,那麼在超時時間結束後仍未等到預期的返回值個數,則已超時完成時的返回值爲準。
使用ray.error_info()能夠獲取任務執行時產生的錯誤信息。
>>> import time >>> @ray.remote >>> def f(): >>> time.sleep(5) >>> raise Exception("This task failed!!") >>> f.remote() Remote function __main__.f failed with: Traceback (most recent call last): File "<stdin>", line 4, in f Exception: This task failed!! You can inspect errors by running ray.error_info() If this driver is hanging, start a new one with ray.init(redis_address="127.0.0.1:65452") >>> ray.error_info() [{'type': 'task', 'message': 'Remote function \x1b[31m__main__.f\x1b[39m failed with:\n\nTraceback (most recent call last):\n File "<stdin>", line 4, in f\nException: This task failed!!\n', 'data': '{\'function_id\': "Hm\\xde\\x93\'\\x91\\xce\\x13ld\\xf4O\\xd7\\xce\\xc2\\xe1\\x151\\x1e3", \'function_name\': u\'__main__.f\'}'}]
Ray的remote函數只能處理無狀態的計算需求,有狀態的計算需求須要使用Ray的Actor實現。在Python的class定義前使用@ray.remote
能夠聲明Actor。
@ray.remote class Counter(object): def __init__(self): self.value = 0 def increment(self): self.value += 1 return self.value
使用以下方式建立Actor對象。
a1 = Counter.remote() a2 = Counter.remote()
Ray建立Actor的流程爲:
從流程能夠看出,Actor對象的建立時並行的。
經過調用Actor對象的方法使用Actor。
a1.increment.remote() # ray.get returns 1 a2.increment.remote() # ray.get returns 1
調用Actor對象的方法的流程爲:
爲了保證Actor狀態的一致性,對同一個Actor的方法調用是串行執行的。
若是隻是使用Ray,可使用以下命令直接安裝。
pip intall ray
若是須要編譯Ray的最新源碼進行安裝,按照以下步驟進行(MaxOS):
# 更新編譯依賴包 brew update brew install cmake pkg-config automake autoconf libtool boost wget pip install numpy cloudpickle funcsigs click colorama psutil redis flatbuffers cython --ignore-installed six # 下載源碼編譯安裝 git clone https://github.com/ray-project/ray.git cd ray/python python setup.py install # 測試 python test/runtest.py # 安裝WebUI須要的庫[可選] pip install jupyter ipywidgets bokeh # 編譯Ray文檔[可選] cd ray/doc pip install -r requirements-doc.txt make html open _build/html/index.html
我在MacOS上安裝jupyter時,遇到了Python的setuptools庫沒法升級的狀況,緣由是MacOS的安全性設置問題,可使用以下方式解決:
Command+R
進入Mac保護模式。csrutils disable
關閉系統安全策略。csrutils enable
,再次重啓便可。進入PythonShell,輸入代碼本地啓動Ray:
import ray ray.init()
瀏覽器內打開WebUI界面以下: