繼Spark以後,UC Berkeley 的新一代AI計算引擎——Ray

導讀

繼 Spark 以後,UC Berkeley AMP 實驗室又推出一重磅高性能AI計算引擎——Ray,號稱支持每秒數百萬次任務調度。那麼它是怎麼作到的呢?在試用以後,簡單總結一下:node

  1. 極簡 Python API 接口:在函數或者類定義時加上 ray.remote 的裝飾器並作一些微小改變,就能將單機代碼變爲分佈式代碼。這意味着不只能夠遠程執行純函數,還能夠遠程註冊一個類(Actor模型),在其中維護大量context(成員變量),並遠程調用其成員方法來改變這些上下文。
  2. 高效數據存儲和傳輸:每一個節點上經過共享內存(多進程訪問無需拷貝)維護了一塊局部的對象存儲,而後利用專門優化過的 Apache Arrow格式來進行不一樣節點間的數據交換。
  3. 動態圖計算模型:這一點得益於前兩點,將遠程調用返回的 future 句柄傳給其餘的遠程函數或者角色方法,即經過遠程函數的嵌套調用構建複雜的計算拓撲,並基於對象存儲的發佈訂閱模式來進行動態觸發執行。
  4. 全局狀態維護:將全局的控制狀態(而非數據)利用 Redis 分片來維護,使得其餘組件能夠方便的進行平滑擴展和錯誤恢復。固然,每一個 redis 分片經過 chain-replica 來避免單點。
  5. 兩層調度架構:分本地調度器和全局調度器;任務請求首先被提交到本地調度器,本地調度器會盡可能在本地執行任務,以減小網絡開銷。在資源約束、數據依賴或者負載情況不符合指望時,會轉給全局調度器來進行全局調度。

固然,還有一些須要優化的地方,好比 Job 級別的封裝(以進行多租戶資源配給),待優化的垃圾回收算法(針對對象存儲,如今只是粗暴的 LRU),多語言支持(最近支持了Java,但不知道好很差用)等等。可是瑕不掩瑜,其架構設計實現思路仍是有不少能夠借鑑的地方。python

動機和需求

開發 Ray 的動機始於強化學習(RL),可是因爲其計算模型強大表達能力,使用毫不限於 RL。這一小節是以描述 RL 系統需求爲契機,引出 Ray 的初始設計方向。可是因爲不大熟悉強化學習,一些名詞可能表達翻譯不許確。若是隻對其架構感興趣,徹底能夠跳過這一節git

RL system example

圖1:一個 RL 系統的例子github

咱們從考慮 RL 系統的基本組件開始,逐漸完善 Ray 的需求。如圖1所示,在一個 RL 系統的的設定中,*智能體(agent)會反覆與環境(environment)進行交互。智能體的目標是學習出一種最大化獎勵(reward)*的策略。*策略(policy)本質上是從環境中狀態到行爲抉擇(action)*的一種映射。至於環境,智能體,狀態,行爲和獎勵值的詳細定義,則是由具體的應用所決定的。redis

爲了學習策略,智能體一般要進行兩步操做:1)策略評估(policy evaluation)和 2)策略優化(policy improvement)。爲了評估一個策略,智能體和環境持續進行交互(通常是仿真的環境)以產生軌跡(trajectories)。軌跡是在當前環境和給定策略下產生的一個二元組(狀態,獎勵值)序列。而後,智能體根據這些軌跡來反饋優化該策略,即,向最大化獎勵值的梯度方向更新策略。圖2展現了智能體用來學習策略一個例子的僞碼。該僞碼經過調用 rollout(environment, policy) 來評估策略,進而產生仿真軌跡。train_policy() 接着會用這些軌跡做爲輸入,調用 policy.update(trajectories) 來優化當前策略。會重複迭代這個過程直到策略收斂。算法

// evaluate policy by interacting with env. (e.g., simulator) 
rollout(policy, environment):
    trajectory = []
    state = environment.initial_state()
    while (not environment.has_terminated()):
        action = policy.compute(state) // Serving
        state, reward = environment.step(action) // Simulation 
        trajectory.append(state, reward)
    return trajectory
    
// improve policy iteratively until it converges 
train_policy(environment):
    policy = initial_policy()
    while (policy has not converged):
        trajectories = [] 
        for i from 1 to k:
            // evaluate policy by generating k rollouts 
            trajectories.append(rollout(policy, environment)) 
            // improve policy
            policy = policy.update(trajectories) // Training 
    return policy
複製代碼

圖2:一段用於學習策略的典型的僞代碼apache

由此看來,針對 RL 應用的計算框架須要高效的支持模型訓練(training),在線預測(serving)平臺仿真(simulation)(如圖1所示)。接下來,咱們簡要說明一下這些工做負載(workloads)。編程

模型訓練通常會涉及到在分佈式的環境中跑隨機梯度降低模型(stochastic gradient descent,SGD)來更新策略。而分佈式 SGD 一般依賴於 allreduce 聚合步驟或參數服務器(parameter server).promise

在線預測 使用已經訓練好的策略並基於當前環境來給出動做決策。預測系統一般要求下降預測延遲,提升決策頻次。爲了支持擴展,最好可以將負載均攤到多節點上來協同進行預測。緩存

最後,大多數現存的 RL 應用使用仿真(simulations) 來對策略進行評估——由於現有的 RL 算法不足以單獨依賴從與物理世界的交互中高效的進行取樣。這些仿真器在複雜度上跨度極大。也許只須要幾毫秒(如模擬國際象棋遊戲中的移動),也許會須要幾分鐘(如爲了一個自動駕駛的車輛模擬真實的環境)。

與模型訓練和在線預測能夠在不一樣系統中進行處理的監督學習相比, RL 中全部三種工做負載都被緊耦合在了單個應用中,而且對不一樣負載間的延遲要求很苛刻。現有的系統中尚未能同時支持三種工做負載的。理論上,能夠將多個專用系統組合到一塊來提供全部能力,但實際上,子系統間的結果傳輸的延遲在 RL 下是不可忍受的。所以,RL 的研究人員和從業者不得不針對每一個需求單獨構建多套一次性的專用系統。

這些現狀要求爲 RL 開發全新的分佈式框架,能夠有效地支持訓練,預測和仿真。尤爲是,這樣的框架應具備如下能力:

支持細粒度,異構的計算。RL 計算的運行持續時間每每從數毫秒(作一個簡單的動做)到數小時(訓練一個複雜的策略)。此外,模型訓練一般須要各類異構的硬件支持(如CPU,GPU或者TPU)。

提供靈活的計算模型。RL 應用同時具備有狀態和無狀態類型的計算。無狀態的計算能夠在系統中的任何節點進行執行,從而能夠方便的進行負載均衡和按需的數據傳輸。所以,無狀態的計算很是適合細粒度的仿真和數據處理,例如從視頻或圖片中提取特徵。相比之下,有狀態的計算適合用來實現參數服務器、在支持 GPU 運算的數據上進行重複迭代或者運行不暴露內部狀態參數的第三方仿真器。

動態的執行能力。RL 應用中的不少模塊要求動態的進行執行,由於他們計算完成的順序並不老是預先肯定(例如:仿真的完成順序),而且,一個計算的運行結果能夠決定是否執行數個未來的計算(如,某個仿真的運行結果將決定咱們是否運行更多的仿真)。

除此以外,咱們提出了兩個額外的要求。首先,爲了高效的利用大型集羣,框架必須支持每秒鐘數百萬次的任務調度。其次,框架不是爲了支持從頭開始實現深度神經網絡或者複雜的仿真器,而是必須和現有的仿真器(OpenAI gym等)和深度學習框架(如TensorFlow,MXNet,Caffe, PyTorch)無縫集成。

語言和計算模型

Ray 實現了動態任務圖計算模型,即,Ray 將應用建模爲一個在運行過程當中動態生成依賴的任務圖。在此模型之上,Ray 提供了角色模型(Actor)和並行任務模型(task-parallel)的編程範式。Ray 對混合計算範式的支持使其有別於與像 CIEL 同樣只提供並行任務抽象和像 OrleansAkka 同樣只提供角色模型抽象的系統。

編程模型

任務模型(Tasks)。一個任務表示一個在無狀態工做進程執行的遠程函數(remote function)。當一個遠程函數被調用的時候,表示任務結果的 future 會當即被返回(也就是說全部的遠程函數調用都是異步的,調用後會當即返回一個任務句柄)。能夠將 Futures傳給 ray.get() 以阻塞的方式獲取結果,也能夠將 Futures 做爲參數傳給其餘遠程函數,以非阻塞、事件觸發的方式進行執行(後者是構造動態拓撲圖的精髓)。Futures 的這兩個特性讓用戶在構造並行任務的同時指定其依賴關係。下表是 Ray 的全部 API(至關簡潔而強大,可是實現起來會有不少坑,畢竟全部裝飾有 ray.remote 的函數或者類及其上下文都要序列化後傳給遠端節點,序列化用的和 PySpark 同樣的 cloudpickle)。

Name Description
futures = f.remote(args) Execute function f remotely. f.remote() can take objects or futures as inputs and returns one or more futures. This is non-blocking.
objects = ray.get(futures) Return the values associated with one or more futures. This is blocking.
ready futures = ray.wait(futures, k, timeout) Return the futures whose corresponding tasks have completed as soon as either k have completed or the timeout expires.
actor = Class.remote(args)
futures = actor.method.remote(args)
Instantiate class Class as a remote actor, and return a handle to it. Call a method on the remote actor and return one or more futures. Both are non-blocking.

表1 Ray API

遠程函數做用於不可變的物體上,而且應該是無狀態的而且沒有反作用的:這些函數的輸出僅取決於他們的輸入(純函數)。這意味着冪等性(idempotence),獲取結果出錯時只須要從新執行該函數便可,從而簡化容錯設計。

角色模型(Actors)。一個角色對象表明一個有狀態的計算過程。每一個角色對象暴露了一組能夠被遠程調用,而且按調用順序依次執行的成員方法(即在同一個角色對象內是串行執行的,以保證角色狀態正確的進行更新)。一個角色方法的執行過程和普通任務同樣,也會在遠端(每一個角色對象會對應一個遠端進程)執行而且當即返回一個 future;但不一樣的是,角色方法會運行在一個*有狀態(stateful)的工做進程上。一個角色對象的句柄(handle)*能夠傳遞給其餘角色對象或者遠程任務,從而使他們可以在該角色對象上調用這些成員函數。

Tasks Actors
細粒度的負載均衡 粗粒度的負載均衡
支持對象的局部性(對象存儲cache) 比較差的局部性支持
微小更新開銷很高 微小更新開銷不大
高效的錯誤處理 檢查點(checkpoint)恢復帶來較高開銷

表2 任務模型 vs. 角色模型的對比

表2 比較了任務模型和角色模型在不一樣維度上的優劣。任務模型利用集羣節點的負載信息依賴數據的位置信息來實現細粒度的負載均衡,即每一個任務能夠被調度到存儲了其所需參數對象的空閒節點上;而且不須要過多的額外開銷,由於不須要設置檢查點和進行中間狀態的恢復。與之相比,角色模型提供了極高效的細粒度的更新支持,由於這些更新做用在內部狀態(即角色成員變量所維護的上下文信息)而非外部對象(好比遠程對象,須要先同步到本地)。後者一般來講須要進行序列化和反序列化(還須要進行網絡傳輸,所以每每很費時間)。例如,角色模型能夠用來實現參數服務器(parameter servers)和基於GPU 的迭代式計算(如訓練)。此外,角色模型能夠用來包裹第三方仿真器(simulators)或者其餘難以序列化的對象(好比某些模型)。

爲了知足異構性和可擴展性,咱們從三個方面加強了 API 的設計。首先,爲了處理長短不一的併發任務,咱們引入了 ray.wait() ,它能夠等待前 k 個結果知足了就返回;而不是像 ray.get() 同樣,必須等待全部結果都知足後才返回。其次,爲了處理對不一樣資源緯度( resource-heterogeneous)需求的任務,咱們讓用戶能夠指定所需資源用量(例如裝飾器:ray.remote(gpu_nums=1)),從而讓調度系統能夠高效的管理資源(即提供一種交互手段,讓調度系統在調度任務時相對不那麼盲目)。最後,爲了提靈活性,咱們容許構造嵌套遠程函數(nested remote functions),意味着在一個遠程函數內能夠調用另外一個遠程函數。這對於得到高擴展性是相當重要的,由於它容許多個進程以分佈式的方式相互調用(這一點是很強大的,經過合理設計函數,可使得能夠並行部分都變成遠程函數,從而提升並行性)。

計算模型

Ray 採用的動態圖計算模型,在該模型中,當輸入可用(即任務依賴的全部輸入對象都被同步到了任務所在節點上)時,遠程函數和角色方法會自動被觸發執行。在這一小節,咱們會詳細描述如何從一個用戶程序(圖3)來構建計算圖(圖4)。該程序使用了表1 的API 實現了圖2 的僞碼。

@ray.remote
def create_policy():
# Initialize the policy randomly. return policy

@ray.remote(num_gpus=1)
class Simulator(object):
  def __init__(self):
  # Initialize the environment. self.env = Environment()
    def rollout(self, policy, num_steps):
      observations = []
      observation = self.env.current_state()
      for _ in range(num_steps):
        action = policy(observation)
        observation = self.env.step(action)
        observations.append(observation)
      return observations

@ray.remote(num_gpus=2)
def update_policy(policy, *rollouts):
  # Update the policy.
  return policy

@ray.remote
def train_policy():
  # Create a policy.
  policy_id = create_policy.remote()
  # Create 10 actors.
  simulators = [Simulator.remote() for _ in range(10)] # Do 100 steps of training.
  for _ in range(100):
      # Perform one rollout on each actor.
      rollout_ids = [s.rollout.remote(policy_id)
                     for s in simulators]
      # Update the policy with the rollouts.
      policy_id =
          update_policy.remote(policy_id, *rollout_ids)
   return ray.get(policy_id)
複製代碼

圖3:在 Ray 中實現圖2邏輯的代碼,注意裝飾器 @ray.remote 會將被註解的方法或類聲明爲遠程函數或者角色對象。調用遠程函數或者角色方法後會當即返回一個 future 句柄,該句柄能夠被傳遞給隨後的遠程函數或者角色方法,以此來表達數據間的依賴關係。每一個角色對象包含一個環境對象 self.env ,這個環境狀態爲全部角色方法所共享。

在不考慮角色對象的狀況下,在一個計算圖中有兩種類型的點:數據對象(data objects)和遠程函數調用(或者說任務)。一樣,也有兩種類型的邊:數據邊(data edges)和控制邊(control edges)。數據邊表達了數據對象任務間的依賴關係。更確切來講,若是數據對象 D 是任務 T 的輸出,咱們就會增長一條從 TD 的邊。相似的,若是 D 是 任務 T 的輸入,咱們就會增長一條 DT 的邊。控制邊表達了因爲遠程函數嵌套調用所形成的計算依賴關係,即,若是任務 T1 調用任務 T2, 咱們就會增長一條 T1T2 的控制邊。

在計算圖中,角色方法調用也被表示成了節點。除了一個關鍵不一樣點外,他們和任務調用間的依賴關係基本同樣。爲了表達同一個角色對象上的連續方法調用所造成的狀態依賴關係,咱們向計算圖添加第三種類型的邊:在同一個角色對象上,若是角色方法 Mj 緊接着 Mi 被調用,咱們就會添加一條 MiMj 的狀態邊(即 Mi 調用後會改變角色對象中的某些狀態,或者說成員變量;而後這些變化後的成員變量會做爲 Mj 調用的隱式輸入;由此,Mi 到 Mj 間造成了某種隱式依賴關係)。這樣一來,做用在同一角色對象上的全部方法調用會造成一條由狀態邊串起來的調用鏈(chain,見圖4)。這條調用鏈表達了同一角色對象上方法被調用的先後相繼的依賴關係。

task graph

圖4該圖與圖3 train_policy.remote() 調用相對應。遠程函數調用和角色方法調用對應圖中的任務(tasks)。該圖中顯示了兩個角色對象A10和A20,每一個角色對象的方法調用(被標記爲 A1i 和 A2i 的兩個任務)之間都有狀態邊(stateful edge)鏈接,表示這些調用間共享可變的角色狀態。從 train_policy 到被其調用的任務間有控制邊鏈接。爲了並行地訓練多種策略,咱們能夠調用 train_policy.remote() 屢次

狀態邊讓咱們將角色對象嵌入到無狀態的任務圖中,由於他們表達出了共享狀態、先後相繼的兩個角色方法調用之間的隱式數據依賴關係。狀態邊的添加還可讓咱們維護譜系圖(lineage),如其餘數據流系統同樣,咱們也會跟蹤數據的譜系關係以在必要的時候進行數據的重建。經過顯式的將狀態邊引入數據譜系圖中,咱們能夠方便的對數據進行重建,無論這些數據是遠程函數產生的仍是角色方法產生的(小節4.2.3中會詳細講)。

架構

Ray 的架構組成包括兩部分:

  1. 實現 API 的應用層,如今包括 Python 和 Java分別實現的版本。
  2. 提供高擴展性和容錯的系統層,用 C++ 寫的,以CPython的形式嵌入包中。

ray architecture

圖5:Ray 的架構包括兩部分:系統層和應用層。前者實現了API和計算模型,後者實現了任務調度和數據管理,以知足性能要求和容錯需求

應用層

應用層包括三種類型的進程:

  • 驅動進程(Driver): 用來執行用戶程序。
  • 工做進程(Worker):用來執行 Driver 或者其餘 Worker 指派的任務(remote functions,就是用戶代碼中裝飾了@ray.remote 的那些函數)的無狀態進程。工做進程在節點啓動時被自動啓動,通常來講會在每一個物理機上啓動與 CPU 一樣數量的 Worker(這裏還有些問題:若是節點是容器的話,獲取的仍然是其所在物理機的 CPU 數)。當一個遠程函數被聲明時,會被註冊到全局,並推送到全部 Worker。每一個 Worker 順序的執行任務,而且不維護本地狀態。
  • 角色進程(Actor):用來執行角色方法的有狀態進程。與 Worker 被自動的啓動不一樣,每一個 Actor 會根據需求(即被調用時)被工做進程或者驅動進程顯示啓動。和 Worker 同樣,Actor 也會順序的執行任務,不一樣的是,下一個任務的執行依賴於前一個任務生成或改變的狀態(即 Actor 的成員變量)。

系統層

系統層包括三個主要組件:全局控制存儲(GCS,global control store),分佈式調度器(distributed scheduler)和分佈式對象存儲(distributed object store)。全部組件均可以進行水平擴展而且支持容錯。

全局控制存儲(GCS)

全局狀態存儲維護着系統全局的控制狀態信息,是咱們系統首創的一個部件。其核心是一個能夠進行發佈訂閱的鍵值對存儲。咱們經過分片(sharding)來應對擴展,每片存儲經過鏈式副本(將全部數據副本組織成鏈表,來保證強一致性,見04年的一篇論文)來提供容錯。提出和設計這樣一個GCS的動機在於使系統可以每秒進行數百萬次的任務建立和調度,而且延遲較低,容錯方便。

對於節點故障的容錯須要一個可以記錄譜系信息(lineage information)的方案。現有的基於譜系的解決方法側重粗粒度(好比 Spark 的 rdd)的並行,所以能夠只利用單個節點(如Master or Driver)存儲譜系信息,而不影響性能。然而,這種設計並不適合像仿真(simulation)同樣的細粒度、動態的做業類型(workload)。所以咱們將譜系信息的存儲與系統其它模塊解耦,使之能夠獨立地動態擴容。

保持低延遲意味着要儘量下降任務調度的開銷。具體來講,一個調度過程包括選擇節點,分派任務,拉取遠端依賴對象等等。不少現有的信息流系統,將其全部對象的位置、大小等信息集中存儲在調度器上,使得上述調度過程耦合在一塊。當調度器不是瓶頸的時候,這是一個很簡單天然的設計。然而,考慮到 Ray 要處理的數據量級和數據粒度,須要將中心調度器從關鍵路徑中移出(不然若是全部調度都要全局調度器經手處理,它確定會成爲瓶頸)。對於像 allreduce 這樣的(傳輸頻繁,對延遲敏感)分佈式訓練很重要的原語來講,每一個對象傳輸時都經手調度器的開銷是不可容忍的。 所以,咱們將對象的元數據存儲在 GCS 中而不是中央調度器裏,從而將任務分派與任務調度徹底解耦。

總的來講,GCS 極大地簡化了 Ray 的總體設計,由於它將全部狀態攬下,從而使得系統中其餘部分都變成無狀態。這不只使得對容錯支持簡化了不少(即,每一個故障節點恢復時只須要從 GCS 中讀取譜系信息就行),也使得分佈式的對象存儲和調度器能夠進行獨立的擴展(由於全部組件能夠經過 GCS 來獲取必要的信息)。還有一個額外的好處,就是能夠更方便的開發調試、監控和可視化工具。

自下而上的分佈式調度系統(Bottom-up Distributed Scheduler)

如前面提到的,Ray 須要支持每秒數百萬次任務調度,這些任務可能只持續短短數毫秒。大部分已知的調度策略都不知足這些需求。常見的集羣計算框架,如 SparkCIELDryad 都實現了一箇中心的調度器。這些調度器具備很好的局部性(局部性原理)的特色,可是每每會有數十毫秒的延遲。像 work stealingSparrowCanary 同樣的的分佈式調度器的確能作到高併發,可是每每不考慮數據的局部性特色,或者假設任務(tasks)屬於不一樣的做業(job),或者假設計算拓撲是提早知道的。

爲了知足上述需求,咱們設計了一個兩層調度架構,包括一個全局調度器(global scheduler)和每一個節點上的本地調度器(local scheduler)。爲了不全局調度器過載,每一個節點(node)上建立的任務會被先提交到本地調度器。本地調度器老是先嚐試將任務在本地執行,除非其所在機器過載(好比任務隊列超過了預約義的閾值)或者不能知足任務任務的資源需求(好比,缺乏 GPU)。若是本地調度器發現不能在本地執行某個任務,會將其轉發給全局調度器。因爲調度系統都傾向於首先在本地調度任務(即在調度結構層級中的葉子節點),咱們將其稱爲自下而上的調度系統(能夠看出,本地調度器只是根據本節點的局部負載信息進行調度,而全局調度器會根據全局負載來分派任務;固然前提是資源約束首先得被知足)。

ray distributed scheduler

圖6 這是調度系統示意圖,任務自下而上被提交:任務首先被驅動進程(Drivers)或者工做進程(Workers)提交到本地調度器,而後在須要的時候會由本地調度器轉給全局調度器進行處理。圖中箭頭的粗細程度表明其請求的繁忙程度。

全局調度器根據每一個節點的負載情況和資源請求約束來決定調度策略。細化一下就是,全局調度器首先肯定全部知足任務資源要求的節點,而後在其中選擇具備最小預估排隊時間(estimated waiting time)的一個,將任務調度過去。在給定的節點上,預估排隊時間是下述兩項時間的和:1)任務在節點上的排隊時間 (任務隊列長度乘上平均執行時間); 2)任務依賴的遠程對象的預估傳輸時間(全部遠程輸入的大小除以平均帶寬)。全局調度器經過心跳獲取到每一個節點的任務排隊狀況和可用資源信息,從 GCS 中獲得任務全部輸入的位置和大小。而後,全局調度器經過移動指數平均(exponential averaging)的方法來計算任務平均執行時間和平均傳輸帶寬。若是全局調度器成爲了系統瓶頸,咱們能夠實例化更多的副原本分攤流量,它們經過 GCS來共享全局狀態信息。如此一來,咱們的調度架構具備極高可擴展性。

任務生命週期

(注:這部分是從代碼中的設計文檔翻譯而來,注意這只是截止到2019.04.21 的設計)

在實現的時候,每一個任務具備如下幾種狀態。任意時刻,任務都會處在這幾種狀態之一。

  • 可放置(Placeable):任務已經準備好被調度到(本地或者遠程)節點上,具體如何調度,前一段已經說明。注意該狀態不表示放置位置已經最終肯定,還可能被再一次被從某處調度出去。
  • 等待角色建立(WaitActorCreation):一個角色方法(task)等待其所在角色實例化完畢。一旦角色被建立,該任務會被轉給運行該角色的遠端機器進行處理。
  • 等待中(Waiting):等待該任務參數需求被知足,即,等待全部遠端參數對象傳送到本地對象存儲中。
  • 準備好(Ready):任務準備好了被運行,也就說全部所需參數已經在本地對象存儲中就位了。
  • 運行中(Running):任務已經被分派,而且正在本地工做進程(worker)或者角色進程(actor)中運行。
  • 被阻塞(Blocked):當前任務因爲其依賴的數據不可用而被阻塞住。如,嵌套調用時,該任務啓動了另外的遠程任務而且等待其完成,以取得結果。
  • 不可行(infeasible):任務的資源要求在任何一臺機器上都得不到知足。
---------------------------------
                                 |                                 |
                                 |     forward                     | forward
                                 |----------------                 |
node with                  ------|                |   arguments    |
resources          forward|      |   resource     |     local      |   actor/worker
joins                     |      v  available     |    -------->   |    available
  ---------------------- Placeable ----------> Waiting           Ready ---------> Running
|                       | |  ^                    ^    <--------   ^               |   ^
|             |---------  |  |                    |    local arg   |               |   |
|             |           |  |                    |     evicted    |        worker |   | worker
|             |     actor |  |                    |                |       blocked |   | unblocked
|   resources |   created |  | actor              | ---------------                |   |
|  infeasible |           |  | created            | actor                          |   |
|             |           |  | (remote)           | created                        v   |
|             |           v  |                    | (local)                              Blocked
|             |     WaitForActorCreation----------
|             v
 ----Infeasible
複製代碼

基於內存的分佈式對象存儲

爲了下降任務的延遲,咱們實現了一個基於內存的分佈式存儲系統以存儲每一個任務(無狀態的計算過程)的輸入和輸出。在每一個節點上,咱們以共享內存(shared memory)的方式實現了對象存儲。這使得同一節點上的不一樣任務以零拷貝的代價進行數據共享。至於數據格式,咱們選擇了 Apache Arrow

若是一個任務的輸入(即函數的參數對象)不在本地,在該任務執行以前,輸入會被拷貝到本地的對象存儲中。同時,任務執行完畢後,會將輸出也寫到本地得對象存儲中。對象拷貝消除了熱數據所形成的潛在的瓶頸,而且經過將任務的數據讀寫都限制在本地內存中以縮短執行時間。這些作法增長了計算密集型工做任務的吞吐量,而不少 AI 應用都是計算密集型的。爲了下降延遲,咱們將用到的對象所有放在內存中,只有在內存不夠的時候才經過 LRU 算法將一些對象擠出內存(從API 能夠看出,每一個節點的內存上限能夠在啓動節點時經過參數指定。此外用 LRU 做爲垃圾回收算法仍是有點粗暴,若是不一樣類型的任務負載跑在同一個 ray 集羣上,可能致使資源的互相爭搶,從而有大量的資源換出而後重建,從而嚴重影響效率)。

和現有的計算框架的集羣(如Spark, Dryad)同樣,對象存儲只接受不可變數據(immutable data)。這種設計避免了對複雜的一致性協議的需求(由於對象數據歷來不須要進行更新),而且簡化了數據的容錯支持。當有節點出現故障時,Ray 經過從新執行對象譜系圖來恢復任意所需對象(也就是說不用整個恢復該宕機節點全部狀態,只須要按需恢復後面計算所需數據,用不到的數據丟了就丟了吧)。在工做開始以前,存放在 GCS 的譜系信息追蹤了全部無狀態的任務和有狀態的角色;咱們利用前者對丟失對象進行重建(結合上一段,若是一個任務有大量的迭代,而且都是遠程執行,會形成大量的中間結果對象,將內存擠爆,從而使得較少使用可是稍後可能使用的全局變量擠出內存,因此 LRU 有點粗暴,據說如今在醞釀基於引用計數的GC)。

爲了簡化實現,咱們的對象存儲不支持分佈式的對象。也就是說,每一個對象必須可以在單節點內存下,而且只存在於單節點中。對於大矩陣、樹狀結構等大對象,能夠在應用層來拆分處理,好比說實現爲一個集合。

實現

Ray 是一個由加州大學伯克利分校開發的一個活躍的開源項目。Ray 深度整合了 Python,你能夠經過 pip install ray 來安裝 ray。整個代碼實現包括大約 40K 行,其中有 72% C++ 實現的系統層和 28% 的 Python 實現的應用層(截止目前,又增長了對 Java 的支持)。GCS 的每一個分片使用了一個 Redis 的 key-val 存儲,而且只設計單個鍵值對操做。GCS 的表經過按任務ID、數據對象集合進行切分來進行平滑擴展。每一片利用鏈式冗餘策略(chained-replcated)來容錯。咱們將本地調度器和全局調度器都實現爲了單線程、事件驅動的進程。本地調度器緩存了本地對象元信息,被阻塞的任務隊列和等待調度的任務隊列。爲了在不一樣節點的對象存儲之間無感知的傳輸超大對象,咱們將大對象切片,利用多條 TCP 鏈接來並行傳。

將全部碎片捏一塊

圖 7 經過一個簡單的 aba,b能夠是標量,向量或者矩陣)而後返回 c 的例子展現了 Ray 端到端的工做流。遠程函數 add() 在初始化 ( ray.init ) 的時候,會自動地被註冊到 GCS 中,進而分發到集羣中的每一個工做進程。(圖7a 的第零步)

圖7a 展現了當一個驅動進程(driver)調用 add.remote(a, b) ,而且 a, b 分別存在節點 N1N2 上時 ,Ray 的每一步操做。驅動進程將任務 add(a, b) 提交到本地調度器(步驟1),而後該任務請求被轉到全局調度器(步驟2)(如前所述,若是本地任務排隊隊列沒有超過設定閾值,該任務也能夠在本地進行執行)。接着,全局調度器開始在 GCS 中查找 add(a, b) 請求中參數 a, b 的位置(步驟3),從而決定將該任務調度到節點 N2 上(由於 N2 上有其中一個參數 b)(步驟4)。N2 節點上的本地調度器收到請求後(發現知足本地調度策略的條件,如知足資源約束,排隊隊列也沒超過閾值,就會在本地開始執行該任務),會檢查本地對象存儲中是否存在任務 add(a, b) 的全部輸入參數(步驟5)。因爲本地對象存儲中沒有對象 a,工做進程會在 GCS 中查找 a 的位置(步驟6)。 這時候發現 a 存儲在 N1 中,因而將其同步到本地的對象存儲中(步驟7)。因爲任務 add() 全部的輸入參數對象都存在了本地存儲中,本地調度器將在本地工做進程中執行 add() (步驟8),並經過共享存儲訪問輸入參數(步驟9)。

ray execute example

圖 7b 展示了在 N1 上執行 ray.get() 和在 N2 上執行 add()後所觸發的逐步的操做。一旦 ray.get(id)被調用,N1 上的用戶驅動進程會在本地對象存儲中查看該 id (即由遠程調用 add() 返回的 future 值,全部 object id 是全局惟一的,GCS 能夠保證這一點)對應的對象 c 是否存在(步驟1)。因爲本地對象存儲中沒有 c , 驅動進程會去 GCS 中查找 c 的位置。在此時,發現 GCS 中並無 c 的存在,由於 c 根本尚未被建立出來。 因而,N1 的對象存儲向 GCS 中的對象表(Object Table)註冊了一個回調函數,以監聽 c 對象被建立事件(步驟2)。與此同時,在節點 N2 上,add() 任務執行完畢,將結果 c 存到其本地對象存儲中(步驟3),同時也將 c 的位置信息添加到 GCS 的對象存儲表中(步驟4)。GCS 監測到 c 的建立,會去觸發以前 N1 的對象存儲註冊的回調函數(步驟5)。接下來,N1 的對象存儲將 cN2 中同步過去(步驟6),從而結束該任務。

ray execute example b

儘管這個例子中涉及了大量的 RPC調用,但對於大部分狀況來講,RPC 的數量會小的多,由於大部分任務會在本地被調度執行,並且 GCS 回覆的對象信息會被本地調度器和全局調度器緩存(可是另外一方面,執行了大量遠程任務以後,本地對象存儲很容易被撐爆)。

名詞對照

workloads:工做負載,即描述任務須要作的工做。

GCS: Global Control Store,全局控制信息存儲。

Object Table:存在於 GCS 中的對象表,記錄了全部對象的位置等信息(objectId -> location)。

Object Store:本地對象存儲,在實現中叫 Plasma,即存儲任務所需對象的實例。

Lineage:血統信息,譜系信息;即計算時的數據變換先後的相繼關係圖。

Node:節點;Ray 集羣中的每一個物理節點。

Driver、Worker:驅動進程和工做進程,物理表現形式都是 Node 上的進程。但前者是用戶側使用 ray.init 時候生成的,隨着 ray.shutdown 會進行銷燬。後者是 ray 在啓動的時在每一個節點啓動的無狀態的駐留工做進程,通常和物理機 CPU 數量相同。

Actor:角色對象,語言層面,就是一個類;物理層面,表現爲某個節點上的一個角色進程,維護了該角色對象內的全部上下文(角色成員變量)。

Actor method:角色方法,語言層面,就是類的成員方法;其全部輸入包括顯式的函數參數和隱式的成員變量。

Remote function:遠程函數,即經過 @ray.remote 註冊到系統的函數。在其被調度時,稱爲一個任務(Task)。

Job,Task:文中用到了很多 Job 和 Task 的概念,可是這兩個概念在 CS 中其實定義比較模糊,不如進程和線程通常明確。Task 在本論文是對一個遠程函數(remote action)或者一個 actor 的遠程方法(remote method)的封裝。而 Job 在當前的實現中並不存在,只是一個邏輯上的概念,其含義爲運行一次用戶側代碼所所涉及到的全部生成的 Task 以及產生的狀態的集合。

Scheduler:paper 中統一用的 scheduler,可是有的是指部分(local scheduler 和 global scheduler),這時我翻譯爲調度器,有時候是指 Ray 中全部調度器構成的總體,這時我翻譯爲調度系統

exponential averaging:我翻譯成了移動指數平均,雖然他沒有寫移動。對於剛過去的前 n 項,以隨着時間漸進指數增加的權重作加權平均。計算時候能夠經過滑動窗口的概念方便的遞推計算。

Future:這個不大好翻譯,大概意思就是對於異步調用中的返回值句柄。相信信息能夠參見維基百科 Future 和 promise

引用

[1] 官方文檔:ray.readthedocs.io/en/latest/

[2] 系統論文:www.usenix.org/system/file…

[3] 系統源碼:github.com/ray-project…

相關文章
相關標籤/搜索