Spark上的深度學習流水線

Spark上的深度學習流水線

深度學習須要一個樣本數據處理、模型訓練、模型檢驗、模型部署的完整處理過程,而傳統的深度學習引擎主要完成訓練計算和模型調用的核心功能,在用於規模化的生產級應用時還須要大量的開發工做,運維管理也較爲複雜。html

Deep Learning Pipeline

Apache Spark上的深度學習流水線提供了一個高階的API接口,能夠經過Python支持深度學習的規模伸縮能力。這得益於Spark的集羣計算和分佈式內存架構,能夠快速存取大規模的數據以及調用多個節點上的計算能力。python

概覽

深度學習流水線(Deep Learning Pipelines)提供了高級API,經過Python進行深度學習的規模伸縮,運行於Spark計算集羣之上。git

該支持庫來自於Databricks和 Spark的兩大優點:github

  1. 在Spark的指導原則和Spark MLlib的支持下,提供了易於使用的API,只需數行代碼便可實現深度學習能力。
  2. 使用Spark的強大的分佈式引擎使深度學習在處理海量數據集時實現規模伸縮。

目前,TensorFlow和TensorFlow支持下的Keras深度學習引擎已經支持,主要聚焦於:sql

  • 大規模的推理/評分。
  • 影像數據的轉移學習(transfer learning)和超參數(hyperparameter )調優。

下一步,將爲數據科學家和機器學習專家提供工具,使其能將深度學習模型轉化爲SQL函數,從而能讓更多的用戶羣體所使用。這不是簡單地執行單個模型的分佈式訓練,而是一個活躍的研究領域,咱們將可以爲大多數深度學習的適用場景提供現實可操做的解決方案。apache

對該庫的概覽描述,參見Databricks的博客(blog post),對深度學習流水線進行了介紹。對於該軟件庫服務的多種應用案例,查看下面的快速使用參考部分(Quick user guide)。session

該支持庫還在早期開發階段,還有任何人提出反饋及做出貢獻。架構

開發維護者: Bago Amirbekian, Joseph Bradley, Yogesh Garg, Sue Ann Hong, Tim Hunter, Siddharth Murching, Tomas Nykodym, Lu Wangapp

構建和運行單元測試

爲了編譯該項目, 從項目主目錄運行 build/sbt assembly 。這將啓動 Scala unit tests。運維

爲了運行Python的 unit tests, 在e python/ 目錄下啓動 run-tests.sh 腳本 (編譯以後)。首先須要設置幾個環境變量。

# Be sure to run build/sbt assembly before running the Python tests
sparkdl$ SPARK_HOME=/usr/local/lib/spark-2.3.0-bin-hadoop2.7 PYSPARK_PYTHON=python3 SCALA_VERSION=2.11.8 SPARK_VERSION=2.3.0 ./python/run-tests.sh

Spark 版本兼容性

爲了使用最新的代碼,Spark 2.3.0 是必須的,建議使用Python 3.6 和 Scala 2.11。查看 travis config 得到一般的測試所用的軟件組合。

每一版本的兼容性要求列在版本( Releases)一節。

支持

提問和參與開發討論,到 DL Pipelines Google group.

提交bug報告或者特性要求,在 Github issues 中建立條目或參與已有的話題。

發行版本

  • 1.0.0 版本: 要求Spark 2.3.0. Python 3.6 & Scala 2.11 爲建議。要求TensorFlow 1.6.0。
    1. 使用Spark 2.3.0的影像定義,新的定義使用 BGR channel ordering for 3-channel images,instead of the RGB ordering used in this project before the change.
    2. Persistence for DeepImageFeaturizer (both Python and Scala).

快速使用指南

深度學習流水線(Deep Learning Pipelines)提供了一系列工具,用於使用深度學習進行影像處理。包含的分類以下:

爲了運行下面的例子,獲取Databricks notebook( Databricks docs for Deep Learning Pipelines), 能夠在最新的 Deep Learning Pipelines版本下運行. 這裏是與老版本( 0.1.0, 0.2.0, 0.3.0, 1.0.0.)兼容的一些 Databricks notebooks。

與Sparkd的 images 對象一塊兒使用

應用深度學習於影像的第一步就是載入影像。Spark和Deep Learning Pipelines包含載入數百萬張圖像到 Spark DataFrame 的實用函數,並且以分佈式方式自動解碼,容許可擴展地操做。

使用Spark's ImageSchema:

from pyspark.ml.image import ImageSchema
image_df = ImageSchema.readImages("/data/myimages")

或炸,使用本身的 image library:

from sparkdl.image import imageIO as imageIO
image_df = imageIO.readImagesWithCustomFn("/data/myimages",decode_f=<your image library, see imageIO.PIL_decode>)

結果 DataFrame 包含字符串列, "image" 包含 image struct ,其 schema == ImageSchema.

image_df.show()

Why images? 深度學習已經證實了在影像相關的任務處理的強大能力,所以咱們決定對 Spark 加入影像數據的內置支持。最終目的是爲了支持更多的數據類型,如文本和時間序列,創建於社區的具體需求。

遷移學習(Transfer learning)

Deep Learning Pipelines 提供了實用工具,用於實現影像的transfer learning , 這是利用深度學習的最快方法之一。使用Deep Learning Pipelines,,只須要幾行代碼就能完成。

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from sparkdl import DeepImageFeaturizer

featurizer = DeepImageFeaturizer(inputCol="image", outputCol="features", modelName="InceptionV3")
lr = LogisticRegression(maxIter=20, regParam=0.05, elasticNetParam=0.3, labelCol="label")
p = Pipeline(stages=[featurizer, lr])

model = p.fit(train_images_df)    # train_images_df is a dataset of images and labels

# Inspect training error
df = model.transform(train_images_df.limit(10)).select("image", "probability",  "uri", "label")
predictionAndLabels = df.select("prediction", "label")
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print("Training set accuracy = " + str(evaluator.evaluate(predictionAndLabels)))

分佈式超參數調試

在深度學習中,爲了對於訓練參數的不一樣值獲得最好的結果,一個重要的步驟叫作超參數調優( hyperparameter tuning)。由於Deep Learning Pipelines將深度學習做爲Spark的機器學習流水線的一個步驟,用戶可使用已經整合到Spark MLlib的超參數調優架構。

對於Keras用戶

爲了執行Keras模型上的超參數調優, KerasImageFileEstimator 用於構建一個 Estimator ,而後使用 MLlib的工具來跳有超參數(e.g. CrossValidator)。KerasImageFileEstimator 與image URI columns一塊兒工做 (不是 ImageSchema columns),爲了容許自定義的影像載入和處理函數,這在 keras中常常會用到。

爲了使用 KerasImageFileEstimator 構建 estimator , 咱們須要一個存儲爲文件的 Keras model。這能夠是  Keras 內置的模型或者用戶訓練好的模型。

from keras.applications import InceptionV3

model = InceptionV3(weights="imagenet")
model.save('/tmp/model-full.h5')

咱們還須要建立一個影像載入函數,用於從URI讀取影像數據,預處理,而後返回numerical tensor到keras Model的輸入格式。而後,咱們建立KerasImageFileEstimator,接收保存的模型文件。

import PIL.Image
import numpy as np
from keras.applications.imagenet_utils import preprocess_input
from sparkdl.estimators.keras_image_file_estimator import KerasImageFileEstimator

def load_image_from_uri(local_uri):
  img = (PIL.Image.open(local_uri).convert('RGB').resize((299, 299), PIL.Image.ANTIALIAS))
  img_arr = np.array(img).astype(np.float32)
  img_tnsr = preprocess_input(img_arr[np.newaxis, :])
  return img_tnsr

estimator = KerasImageFileEstimator( inputCol="uri",
                                     outputCol="prediction",
                                     labelCol="one_hot_label",
                                     imageLoader=load_image_from_uri,
                                     kerasOptimizer='adam',
                                     kerasLoss='categorical_crossentropy',
                                     modelFile='/tmp/model-full-tmp.h5' # local file path for model
                                   )

咱們使用其進行超參數調優,使用 CrossValidataor執行grid search來實現。

from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

paramGrid = (
  ParamGridBuilder()
  .addGrid(estimator.kerasFitParams, [{"batch_size": 32, "verbose": 0},
                                      {"batch_size": 64, "verbose": 0}])
  .build()
)
bc = BinaryClassificationEvaluator(rawPredictionCol="prediction", labelCol="label" )
cv = CrossValidator(estimator=estimator, estimatorParamMaps=paramGrid, evaluator=bc, numFolds=2)

cvModel = cv.fit(train_df)

深度學習模型擴容

Spark DataFrames是應用深度學習模型到大規模數據集的天然選擇。Deep Learning Pipelines 提供了一些列Spark MLlib Transformers,將TensorFlow Graphs和基於TensorFlow的Keras Models擴展到集羣上。 這些Transformers背後由Tensorframes庫支持,在Spark worker節點上高效地處理分佈式模型和數據 。

應用deep learning models於影像並擴展

Deep Learning Pipelines提供了幾種方法應用影像模型並擴展到集羣:

  • 通用images models能夠直接處理,不須要TensorFlow或Keras的代碼處理。
  • TensorFlow graphs 用於處理 images。
  • Keras models 用於處理 images。

應用通用 image models

已經有不少你們都知道的影像深度學習模型。 若是要作的處理與模型提供的很像(如基於with ImageNet classes的對象識別), 或者處於探索的目的,可使用Transformer DeepImagePredictor ,簡單地指定model的名稱便可。

from pyspark.ml.image import ImageSchema
from sparkdl import DeepImagePredictor

image_df = ImageSchema.readImages(sample_img_dir)

predictor = DeepImagePredictor(inputCol="image", outputCol="predicted_labels", modelName="InceptionV3", decodePredictions=True, topK=10)
predictions_df = predictor.transform(image_df)

對於 TensorFlow 用戶

Deep Learning Pipelines提供MLlib Transformer,能夠將給定的TensorFlow Graph應用於包含影像列的DataFrame (影像使用前面描述的方法載入)。這裏是一個很是簡單的例子,演示了 TensorFlow Graph 如何用於 Transformer. 實踐中,TensorFlow Graph將從文件中載入,而後用於調用 TFImageTransformer

from pyspark.ml.image import ImageSchema
from sparkdl import TFImageTransformer
import sparkdl.graph.utils as tfx  # strip_and_freeze_until was moved from sparkdl.transformers to sparkdl.graph.utils in 0.2.0
from sparkdl.transformers import utils
import tensorflow as tf

graph = tf.Graph()
with tf.Session(graph=graph) as sess:
    image_arr = utils.imageInputPlaceholder()
    resized_images = tf.image.resize_images(image_arr, (299, 299))
    # the following step is not necessary for this graph, but can be for graphs with variables, etc
    frozen_graph = tfx.strip_and_freeze_until([resized_images], graph, sess, return_graph=True)

transformer = TFImageTransformer(inputCol="image", outputCol="predictions", graph=frozen_graph,
                                 inputTensor=image_arr, outputTensor=resized_images,
                                 outputMode="image")

image_df = ImageSchema.readImages(sample_img_dir)
processed_image_df = transformer.transform(image_df)

對於 Keras 用戶

爲了在Spark中用分佈式的方法應用Keras models,KerasImageFileTransformer 與TensorFlow做爲引擎的 Keras models一塊兒工做。

  • 內部建立一個 DataFrame,包含影像列,載入用戶指定的影像和處理函數,輸入到包含有影像列的 DataFrame。
  • 載入 Keras model,從給定的文件路徑讀入。
  • 應用model到image DataFrame。

TFImageTransformer 的API的不一樣在於,一般Keras workflows有一些很是特殊的載入和重設尺寸等方法,而這些功能一般不是 TensorFlow Graph的一部分。

爲了使用transformer, 咱們首先須要一個存儲在文件中的Keras model。 咱們直接使用Keras內置的 InceptionV3 model,就不用本身來訓練了。

from keras.applications import InceptionV3

model = InceptionV3(weights="imagenet")
model.save('/tmp/model-full.h5')

再使用模型來進行預測:

from keras.applications.inception_v3 import preprocess_input
from keras.preprocessing.image import img_to_array, load_img
import numpy as np
import os
from pyspark.sql.types import StringType
from sparkdl import KerasImageFileTransformer

def loadAndPreprocessKerasInceptionV3(uri):
  # this is a typical way to load and prep images in keras
  image = img_to_array(load_img(uri, target_size=(299, 299)))  # image dimensions for InceptionV3
  image = np.expand_dims(image, axis=0)
  return preprocess_input(image)

transformer = KerasImageFileTransformer(inputCol="uri", outputCol="predictions",
                                        modelFile='/tmp/model-full-tmp.h5',  # local file path for model
                                        imageLoader=loadAndPreprocessKerasInceptionV3,
                                        outputMode="vector")

files = [os.path.abspath(os.path.join(dirpath, f)) for f in os.listdir("/data/myimages") if f.endswith('.jpg')]
uri_df = sqlContext.createDataFrame(files, StringType()).toDF("uri")

keras_pred_df = transformer.transform(uri_df)

應用 deep learning models 到 tensors 並擴容

Deep Learning Pipelines 也提供了使用tensor inputs應用模型 (到 2 dimensions), 由通用的 deep learning libraries提供:

  • TensorFlow graphs
  • Keras models

對於 TensorFlow 用戶

TFTransformer 應用一個用戶指定的TensorFlow graph到tensor inputs(最多二維)。TensorFlow graph 能夠做爲TensorFlow graph objects (tf.Graph 指定,或者是一個引用 tf.GraphDef),或者是 checkpoint ,或者是 SavedModel objects (查看input object class 得到更多細節).。 transform() 函數應用 TensorFlow graph 到輸入 DataFrame 的column of arrays (這裏每個 array 對應於一個 Tensor),而且輸出 column of arrays,對應於每個 graph。

首先,咱們建立一個二維點的樣本數據集, 圍繞兩個不一樣中心點的高斯分佈。

import numpy as np
from pyspark.sql.types import Row

n_sample = 1000
center_0 = [-1.5, 1.5]
center_1 = [1.5, -1.5]

def to_row(args):
  xy, l = args
  return Row(inputCol = xy, label = l)

samples_0 = [np.random.randn(2) + center_0 for _ in range(n_sample//2)]
labels_0 = [0 for _ in range(n_sample//2)]
samples_1 = [np.random.randn(2) + center_1 for _ in range(n_sample//2)]
labels_1 = [1 for _ in range(n_sample//2)]

rows = map(to_row, zip(map(lambda x: x.tolist(), samples_0 + samples_1), labels_0 + labels_1))
sdf = spark.createDataFrame(rows)

下一步,編寫一個函數返回tensorflow graph和它的input:

import tensorflow as tf

def build_graph(sess, w0):
  X = tf.placeholder(tf.float32, shape=[None, 2], name="input_tensor")
  model = tf.sigmoid(tf.matmul(X, w0), name="output_tensor")
  return model, X

而後,就是使用Tensorflow在單個節點上進行預測而編寫的代碼。

w0 = np.array([[1], [-1]]).astype(np.float32)
with tf.Session() as sess:
  model, X = build_graph(sess, w0)
  output = sess.run(model, feed_dict = {
    X : samples_0 + samples_1
  })

如今,你可使用下面的 Spark MLlib Transformer應用 model到DataFrame,按照分佈式的方式運行。

from sparkdl import TFTransformer
from sparkdl.graph.input import TFInputGraph
import sparkdl.graph.utils as tfx

graph = tf.Graph()
with tf.Session(graph=graph) as session, graph.as_default():
    _, _ = build_graph(session, w0)
    gin = TFInputGraph.fromGraph(session.graph, session,
                                 ["input_tensor"], ["output_tensor"])

transformer = TFTransformer(
    tfInputGraph=gin,
    inputMapping={'inputCol': tfx.tensor_name("input_tensor")},
    outputMapping={tfx.tensor_name("output_tensor"): 'outputCol'})

odf = transformer.transform(sdf)

對於 Keras 用戶

KerasTransformer 應用基於TensorFlow的Keras model到tensor inputs,不超過二維。它從給定的模型文件路徑載入 Keras model,而後應用model到column of arrays (一個array對應於 Tensor),輸出column of arrays。

from sparkdl import KerasTransformer
from keras.models import Sequential
from keras.layers import Dense
import numpy as np

# Generate random input data
num_features = 10
num_examples = 100
input_data = [{"features" : np.random.randn(num_features).tolist()} for i in range(num_examples)]
input_df = sqlContext.createDataFrame(input_data)

# Create and save a single-hidden-layer Keras model for binary classification
# NOTE: In a typical workflow, we'd train the model before exporting it to disk,
# but we skip that step here for brevity
model = Sequential()
model.add(Dense(units=20, input_shape=[num_features], activation='relu'))
model.add(Dense(units=1, activation='sigmoid'))
model_path = "/tmp/simple-binary-classification"
model.save(model_path)

# Create transformer and apply it to our input data
transformer = KerasTransformer(inputCol="features", outputCol="predictions", modelFile=model_path)
final_df = transformer.transform(input_df)

部署 models 到 SQL 函數

將模型提高到生產級的方法之一是將其部署到Spark SQL用戶定義函數(UDF,User Defined Function),從而容許任何熟悉SQL的人都能使用。Deep Learning Pipelines提供了一種機制,,能夠將深度學習模型register 爲一個Spark SQL User Defined Function (UDF)。尤爲是,Deep Learning Pipelines 0.2.0添加了Keras models建立爲 SQL UDFs,能夠與影像數據一塊兒工做。

結果UDF獲取 column (格式化爲image struct "SpImage") 而且產生給定的Keras model輸出,好比對於 Inception V3,將產生 real valued score vector over the ImageNet object categories。

咱們能夠爲 Keras model 註冊一個 UDF,能夠用於影像處理,像下面這樣:

from keras.applications import InceptionV3
from sparkdl.udf.keras_image_model import registerKerasImageUDF

registerKerasImageUDF("inceptionV3_udf", InceptionV3(weights="imagenet"))

一樣,咱們也能夠從模型文件register一個UDF:

registerKerasImageUDF("my_custom_keras_model_udf", "/tmp/model-full-tmp.h5")

在Keras處理影像的流程中,一般有一些預處理步驟,而後纔將模型應用於影像數據。若是我媽的模型須要預處理,咱們可選提供預處理函數給 UDF registration過程。預處理器經過接收一個文件路徑,返回一個image array,下面是一個簡單的例子:

from keras.applications import InceptionV3
from sparkdl.udf.keras_image_model import registerKerasImageUDF

def keras_load_img(fpath):
    from keras.preprocessing.image import load_img, img_to_array
    import numpy as np
    img = load_img(fpath, target_size=(299, 299))
    return img_to_array(img).astype(np.uint8)

registerKerasImageUDF("inceptionV3_udf_with_preprocessing", InceptionV3(weights="imagenet"), keras_load_img)

一旦 UDF 註冊完畢,就能夠在SQL查詢中使用了。以下所示:

from pyspark.ml.image import ImageSchema

image_df = ImageSchema.readImages(sample_img_dir)
image_df.registerTempTable("sample_images")
SELECT my_custom_keras_model_udf(image) as predictions from sample_images

許可與受權

相關文章
相關標籤/搜索