Spark整合Ray思路漫談

什麼是Ray

以前花了大概兩到三天把Ray相關的論文,官網文檔看了一遍,同時特地去找了一些中文資料看Ray當前在國內的發展狀況(以及目前國內大部分人對Ray的認知程度)。python

先來簡單介紹下我對Ray的認知。算法

首先基因很重要,因此咱們先須要探查下Ray最初是爲了解決什麼問題而產生的。Ray的論文顯示,它最先是爲了解決加強學習的挑戰而設計的。加強學習的難點在於它是一個須要邊學習,邊作實時作預測的應用場景,這意味會有不一樣類型的tasks同時運行,而且他們之間存在複雜的依賴關係,tasks會在運行時動態產生產生新的tasks,現有的一些計算模型確定是沒辦法解決的。若是Ray只是爲了解決RL事情可能沒有那麼複雜,可是做者但願它不只僅能跑加強學習相關的,但願是一個通用的分佈式機器學習框架,這就意味着Ray必然要進行分層抽象了,也就是至少要分紅系統層應用層sql

系統層面,既然是分佈式的應用,那麼確定須要有一個應用內的resource/task調度和管理。首先是Yarn,K8s等資源調度框架是應用程序級別的的調度,Ray做爲一個爲了解決具體業務問題的應用,應該要跑在他們上面而不是取代他們,而像Spark/Flink雖然也是基於task級別的資源調度框架,可是由於他們在設計的時候是爲了解決一個比較具體的抽象問題,因此係統對task/資源都作了比較高的封裝,通常用戶是面向業務編程,很難直接操控task以及對應的resource。咱們以Spark爲例,用戶定義好了數據處理邏輯,至於如何將這些邏輯分紅多少個Job,Stage,Task,最後佔用多少Resource (CPU,GPU,Memory,Disk)等等,都是由框架自行決定,而用戶沒法染指。這也是我一直詬病Spark的地方。因此Ray在系統層面,是一個通用的以task爲調度級別的,同時能夠針對每一個task控制資源粒度的一個通用的分佈式task執行系統。記住,在Ray裏,你須要明肯定義Task以及Task的依賴,而且爲每一個task指定合適(數量,資源類型)的資源。好比你須要用三個task處理一份數據,那麼你就須要本身啓動三個task,而且指定這些task須要的資源(GPU,CPU)以及數量(能夠是小數或者整數)。而在Spark,Flink裏這是不大可能的。Ray爲了讓咱們作這些事情,默認提供了Python的語言接口,你能夠像使用Numpy那樣去使用Ray。實際上,也已經有基於Ray作Backend的numpy實現了,固然它屬於應用層面的東西了。Ray系統層面很簡單,也是典型的master-worker模式。相似spark的driver-executor模式,不一樣的是,Ray的worker相似yarn的worker,是負責Resource管理的,具體任務它會啓動Python worker去執行你的代碼,而spark的executor雖然也會啓動Python worker執行python代碼,可是對應的executor也執行業務邏輯,和python worker有數據交換和傳輸。docker

應用層面,你能夠基於Ray的系統進行編程,由於Ray默認提供了Python的編程接口,因此你能夠本身實現加強學習庫(RLLib),也能夠整合已有的算法框架,好比tensorflow,讓tensorflow成爲Ray上的一個應用,而且輕鬆實現分佈式。我記得知乎上有人說Ray其實就是一個Python的分佈式RPC框架,這麼說是對的,可是顯然會有誤導,由於這極可能讓人覺得他只是「Python分佈式RPC框架」。編程

如何和Spark協做

根據前面我講述的,咱們是能夠徹底基於Ray實現Spark的大部分API的,只是是Ray backend而非Spark core backend。實際上Ray目前正在作流相關的功能,他們如今要作的就是要兼容Flink的API。雖然官方宣稱Ray是一個新一代的機器學習分佈式框架,可是他徹底能夠cover住當前大數據和AI領域的大部分事情,可是任重道遠,還須要大量的事情。因此對我而言,我看中的是它良好的Python支持,以及系統層面對資源和task的控制,這使得:json

1.咱們能夠輕易的把咱們的單機Python算法庫在Ray裏跑起來(雖然算法自身不是分佈式的),可是咱們能夠很好的利用Ray的資源管理和調度功能,從而解決AI平臺的資源管理問題。服務器

2.Ray官方提供了大量的機器學習算法的實現,以及對當前機器學習框架如Tensorflow,Pytorch的整合,而分佈式能力則比這些庫原生提供的模式更靠譜和易用。畢竟對於這些框架而言,支持他們分佈式運行的那些輔助庫(好比TensorFlow提供parameter servers)至關簡陋。架構

可是,咱們知道,數據處理它自身有一個很大的生態,好比你的用戶畫像數據都在數據湖裏,你須要把這些數據進行很是複雜的計算才能做爲特徵餵給你的機器學習算法。而若是這個時候,你還要面向資源編程(或者使用一個還不夠成熟的上層應用)而不是面向「業務」編程,這就顯得很難受了,好比我就想用SQL處理數據,我只關注處理的業務邏輯,這個當前Ray以及之上的應用顯然仍是作不到如Spark那麼便利的(畢竟Spark就是爲了數據處理而生的),因此最好的方式是,數據的獲取和加工依然是在Spark之上,可是數據準備好了就應該丟給用戶基於Ray寫的代碼處理了。Ray能夠經過Arrow項目讀取HDFS上Spark已經處理好的數據,而後進行訓練,而後將模型保存爲HDFS。固然對於預測,Ray能夠本身消化掉或者丟給其餘系統完成。咱們知道Spark 在整合Python生態方面作出了很是多的努力,好比他和Ray同樣,也提供了python 編程接口,因此spark也較爲容易的整合譬如Tensorflow等框架,可是沒辦法很好的管控資源(好比GPU),並且,spark 的executor 會在他所在的服務器上啓動python worker,而spark通常而言是跑在yarn上的,這就對yarn形成了很大的管理麻煩,並且一般yarn 和hdfs之類的都是在一塊兒的,python環境還有資源(CPU/GPU)除了管理難度大之外,還有一個很大的問題是可能會對yarn的集羣形成比較大的穩定性風險。app

因此最好的模式是按以下步驟開發一個機器學習應用:負載均衡

寫一個python腳本,
在數據處理部分,使用pyspark,
在程序的算法訓練部分,使用ray,
spark 運行在yarn(k8s)上,
ray運行在k8s裏

好處顯而易見:用戶徹底無感知他的應用實際上是跑在兩個集羣裏的,對他來講就是一個普通python腳本。

從架構角度來說,複雜的python環境管理問題均可以丟給ray集羣來完成,spark只要能跑基本的pyspark相關功能便可,數據銜接經過數據湖裏的表(其實就是一堆parquet文件)便可。固然,若是最後結果數據不大,也能夠直接經過client完成pyspark到ray的交互。

Spark和Ray的架構和部署

如今咱們來思考一個比較好的部署模式,架構圖大概相似這樣:

首先,你們能夠理解爲k8s已經解決一切了,咱們spark,ray都跑在K8s上。可是,若是咱們但願一個spark 是實例多進程跑的時候,咱們並不但願是像傳統的那種方式,全部的節點都跑在K8s上,而是將executor部分放到yarn cluster. 在咱們的架構裏,spark driver 是一個應用,咱們能夠啓動多個pod從而得到多個spark driver實例,對外提供負載均衡,roll upgrade/restart 等功能。也就是k8s應該是面向應用的。可是複雜的計算,咱們依然但願留給Yarn,尤爲是還涉及到數據本地性,計算和存儲放到一塊兒(yarn和HDFS一般是在一塊兒的),避免k8s和HDFS有大量數據交換。

由於Yarn對Java/Scala友好,可是對Python並不友好,尤爲是在yarn裏涉及到Python環境問題會很是難搞(主要是Yarn對docker的支持仍是不夠優秀,對GPU支持也很差),而機器學習其實必定重度依賴Python以及很是複雜的本地庫以及Python環境,而且對資源調度也有比較高的依賴,由於算法是很消耗機器資源的,必須也有資源池,因此咱們但願機器學習部分能跑在K8s裏。可是咱們但願整個數據處理和訓練過程是一體的,算法的同窗應該沒法感知到k8s/yarn的區別。爲了達到這個目標,用戶依然使用pyspark來完成計算,而後在pyspark裏使用ray的API作模型訓練和預測,數據處理部分自動在yarn中完成,而模型訓練部分則自動被分發到k8s中完成。而且由於ray自身的優點,算法能夠很好的控制本身須要的資源,好比此次訓練須要多少GPU/CPU/內存,支持全部的算法庫,在作到對算法最少干擾的狀況下,算法的同窗們有最好的資源調度能夠用。

下面展現一段MLSQL代碼片斷展現如何利用上面的架構:

-- python 訓練模型的代碼
set py_train='''
import ray
ray.init()
@ray.remote(num_cpus=2, num_gpus=1)
def f(x):
    return x * x
futures = [f.remote(i) for i in range(4)]
print(ray.get(futures))
''';
load script.`py_train` as py_train;

-- 設置須要的python環境描述
set py_env='''
''';
load script.`py_env` as py_env;

-- 加載hive的表
load hive.`db1.table1` as table1;

-- 對Hive作處理,好比作一些特徵工程
select features,label from table1 as data;

-- 提交Python代碼到Ray裏,此時是運行在k8s裏的
train data as PythonAlg.`/tmp/tf/model`
where scripts="py_train"
and entryPoint="py_train"
and condaFile="py_env"
and  keepVersion="true"
and fitParam.0.fileFormat="json" -- 還能夠是parquet
and `fitParam.0.psNum`="1";

下面是PySpark的示例代碼:

from pyspark.ml.linalg import Vectors, SparseVector
from pyspark.sql import SparkSession
import logging
import ray

from pyspark.sql.types import StructField, StructType, BinaryType, StringType, ArrayType, ByteType
from sklearn.naive_bayes import GaussianNB
import os
from sklearn.externals import joblib
import pickle
import scipy.sparse as sp
from sklearn.svm import SVC
import io
import codecs

os.environ["PYSPARK_PYTHON"] = "/Users/allwefantasy/deepavlovpy3/bin/python3"
logger = logging.getLogger(__name__)

base_dir = "/Users/allwefantasy/CSDNWorkSpace/spark-deep-learning_latest"
spark = SparkSession.builder.master("local[*]").appName("example").getOrCreate()

data = spark.read.format("libsvm").load(base_dir + "/data/mllib/sample_libsvm_data.txt")

## 廣播數據
dataBr = spark.sparkContext.broadcast(data.collect())

## 訓練模型 這部分代碼會在spark executor裏的python worker執行
def train(row):
    import ray
    ray.init()
    train_data_id = ray.put(dataBr.value)
    ## 這個函數的python代碼會在K8s裏的Ray裏執行
    @ray.remote
    def ray_train(x):
        X = []
        y = []
        for i in ray.get(train_data_id):
            X.append(i["features"])
            y.append(i["label"])
        if row["model"] == "SVC":
            gnb = GaussianNB()
            model = gnb.fit(X, y)
            # 爲何還須要encode一下?
            pickled = codecs.encode(pickle.dumps(model), "base64").decode()
            return [row["model"], pickled]
        if row["model"] == "BAYES":
            svc = SVC()
            model = svc.fit(X, y)
            pickled = codecs.encode(pickle.dumps(model), "base64").decode()
            return [row["model"], pickled]

    result = ray_train.remote(row)
    ray.get(result)

##訓練模型 將模型結果保存到HDFS上
rdd = spark.createDataFrame([["SVC"], ["BAYES"]], ["model"]).rdd.map(train)
spark.createDataFrame(rdd, schema=StructType([StructField(name="modelType", dataType=StringType()),
                                              StructField(name="modelBinary", dataType=StringType())])).write. \
    format("parquet"). \
    mode("overwrite").save("/tmp/wow")

這是一個標準的Python程序,只是使用了pyspark/ray的API,咱們就完成了上面全部的工做,同時訓練兩個模型,而且數據處理的工做在spark中,模型訓練的在ray中。

完美結合!最重要的是解決了資源管理的問題!

做者介紹:祝威廉,資深數據架構,11年研發經驗。同時維護和開發多個開源項目。擅長大數據/AI領域的一些思路和工具。現專一於構建集大數據和機器學習於一體的綜合性平臺,下降AI落地成本相關工做上。


本文做者:祝威廉

閱讀原文

本文爲阿里雲內容,未經容許不得轉載。

相關文章
相關標籤/搜索