深度學習須要一個樣本數據處理、模型訓練、模型檢驗、模型部署的完整處理過程,而傳統的深度學習引擎主要完成訓練計算和模型調用的核心功能,在用於規模化的生產級應用時還須要大量的開發工做,運維管理也較爲複雜。html
Apache Spark上的深度學習流水線提供了一個高階的API接口,能夠經過Python支持深度學習的規模伸縮能力。這得益於Spark的集羣計算和分佈式內存架構,能夠快速存取大規模的數據以及調用多個節點上的計算能力。python
深度學習流水線(Deep Learning Pipelines)提供了高級API,經過Python進行深度學習的規模伸縮,運行於Spark計算集羣之上。git
該支持庫來自於Databricks和 Spark的兩大優點:github
目前,TensorFlow和TensorFlow支持下的Keras深度學習引擎已經支持,主要聚焦於:sql
下一步,將爲數據科學家和機器學習專家提供工具,使其能將深度學習模型轉化爲SQL函數,從而能讓更多的用戶羣體所使用。這不是簡單地執行單個模型的分佈式訓練,而是一個活躍的研究領域,咱們將可以爲大多數深度學習的適用場景提供現實可操做的解決方案。apache
對該庫的概覽描述,參見Databricks的博客(blog post),對深度學習流水線進行了介紹。對於該軟件庫服務的多種應用案例,查看下面的快速使用參考部分(Quick user guide)。session
該支持庫還在早期開發階段,還有任何人提出反饋及做出貢獻。架構
開發維護者: Bago Amirbekian, Joseph Bradley, Yogesh Garg, Sue Ann Hong, Tim Hunter, Siddharth Murching, Tomas Nykodym, Lu Wangapp
爲了編譯該項目, 從項目主目錄運行 build/sbt assembly
。這將啓動 Scala unit tests。運維
爲了運行Python的 unit tests, 在e python/
目錄下啓動 run-tests.sh
腳本 (編譯以後)。首先須要設置幾個環境變量。
# Be sure to run build/sbt assembly before running the Python tests sparkdl$ SPARK_HOME=/usr/local/lib/spark-2.3.0-bin-hadoop2.7 PYSPARK_PYTHON=python3 SCALA_VERSION=2.11.8 SPARK_VERSION=2.3.0 ./python/run-tests.sh
爲了使用最新的代碼,Spark 2.3.0 是必須的,建議使用Python 3.6 和 Scala 2.11。查看 travis config 得到一般的測試所用的軟件組合。
每一版本的兼容性要求列在版本( Releases)一節。
提問和參與開發討論,到 DL Pipelines Google group.
提交bug報告或者特性要求,在 Github issues 中建立條目或參與已有的話題。
深度學習流水線(Deep Learning Pipelines)提供了一系列工具,用於使用深度學習進行影像處理。包含的分類以下:
爲了運行下面的例子,獲取Databricks notebook( Databricks docs for Deep Learning Pipelines), 能夠在最新的 Deep Learning Pipelines版本下運行. 這裏是與老版本( 0.1.0, 0.2.0, 0.3.0, 1.0.0.)兼容的一些 Databricks notebooks。
應用深度學習於影像的第一步就是載入影像。Spark和Deep Learning Pipelines包含載入數百萬張圖像到 Spark DataFrame 的實用函數,並且以分佈式方式自動解碼,容許可擴展地操做。
使用Spark's ImageSchema:
from pyspark.ml.image import ImageSchema image_df = ImageSchema.readImages("/data/myimages")
或炸,使用本身的 image library:
from sparkdl.image import imageIO as imageIO image_df = imageIO.readImagesWithCustomFn("/data/myimages",decode_f=<your image library, see imageIO.PIL_decode>)
結果 DataFrame 包含字符串列, "image" 包含 image struct ,其 schema == ImageSchema.
image_df.show()
Why images? 深度學習已經證實了在影像相關的任務處理的強大能力,所以咱們決定對 Spark 加入影像數據的內置支持。最終目的是爲了支持更多的數據類型,如文本和時間序列,創建於社區的具體需求。
Deep Learning Pipelines 提供了實用工具,用於實現影像的transfer learning , 這是利用深度學習的最快方法之一。使用Deep Learning Pipelines,,只須要幾行代碼就能完成。
from pyspark.ml.classification import LogisticRegression from pyspark.ml.evaluation import MulticlassClassificationEvaluator from pyspark.ml import Pipeline from sparkdl import DeepImageFeaturizer featurizer = DeepImageFeaturizer(inputCol="image", outputCol="features", modelName="InceptionV3") lr = LogisticRegression(maxIter=20, regParam=0.05, elasticNetParam=0.3, labelCol="label") p = Pipeline(stages=[featurizer, lr]) model = p.fit(train_images_df) # train_images_df is a dataset of images and labels # Inspect training error df = model.transform(train_images_df.limit(10)).select("image", "probability", "uri", "label") predictionAndLabels = df.select("prediction", "label") evaluator = MulticlassClassificationEvaluator(metricName="accuracy") print("Training set accuracy = " + str(evaluator.evaluate(predictionAndLabels)))
在深度學習中,爲了對於訓練參數的不一樣值獲得最好的結果,一個重要的步驟叫作超參數調優( hyperparameter tuning)。由於Deep Learning Pipelines將深度學習做爲Spark的機器學習流水線的一個步驟,用戶可使用已經整合到Spark MLlib的超參數調優架構。
對於Keras用戶
爲了執行Keras模型上的超參數調優, KerasImageFileEstimator
用於構建一個 Estimator ,而後使用 MLlib的工具來跳有超參數(e.g. CrossValidator)。KerasImageFileEstimator
與image URI columns一塊兒工做 (不是 ImageSchema columns),爲了容許自定義的影像載入和處理函數,這在 keras中常常會用到。
爲了使用 KerasImageFileEstimator
構建 estimator , 咱們須要一個存儲爲文件的 Keras model。這能夠是 Keras 內置的模型或者用戶訓練好的模型。
from keras.applications import InceptionV3 model = InceptionV3(weights="imagenet") model.save('/tmp/model-full.h5')
咱們還須要建立一個影像載入函數,用於從URI讀取影像數據,預處理,而後返回numerical tensor到keras Model的輸入格式。而後,咱們建立KerasImageFileEstimator,接收保存的模型文件。
import PIL.Image import numpy as np from keras.applications.imagenet_utils import preprocess_input from sparkdl.estimators.keras_image_file_estimator import KerasImageFileEstimator def load_image_from_uri(local_uri): img = (PIL.Image.open(local_uri).convert('RGB').resize((299, 299), PIL.Image.ANTIALIAS)) img_arr = np.array(img).astype(np.float32) img_tnsr = preprocess_input(img_arr[np.newaxis, :]) return img_tnsr estimator = KerasImageFileEstimator( inputCol="uri", outputCol="prediction", labelCol="one_hot_label", imageLoader=load_image_from_uri, kerasOptimizer='adam', kerasLoss='categorical_crossentropy', modelFile='/tmp/model-full-tmp.h5' # local file path for model )
咱們使用其進行超參數調優,使用 CrossValidataor
執行grid search來實現。
from pyspark.ml.evaluation import BinaryClassificationEvaluator from pyspark.ml.tuning import CrossValidator, ParamGridBuilder paramGrid = ( ParamGridBuilder() .addGrid(estimator.kerasFitParams, [{"batch_size": 32, "verbose": 0}, {"batch_size": 64, "verbose": 0}]) .build() ) bc = BinaryClassificationEvaluator(rawPredictionCol="prediction", labelCol="label" ) cv = CrossValidator(estimator=estimator, estimatorParamMaps=paramGrid, evaluator=bc, numFolds=2) cvModel = cv.fit(train_df)
Spark DataFrames是應用深度學習模型到大規模數據集的天然選擇。Deep Learning Pipelines 提供了一些列Spark MLlib Transformers,將TensorFlow Graphs和基於TensorFlow的Keras Models擴展到集羣上。 這些Transformers背後由Tensorframes庫支持,在Spark worker節點上高效地處理分佈式模型和數據 。
Deep Learning Pipelines提供了幾種方法應用影像模型並擴展到集羣:
應用通用 image models
已經有不少你們都知道的影像深度學習模型。 若是要作的處理與模型提供的很像(如基於with ImageNet classes的對象識別), 或者處於探索的目的,可使用Transformer DeepImagePredictor
,簡單地指定model的名稱便可。
from pyspark.ml.image import ImageSchema from sparkdl import DeepImagePredictor image_df = ImageSchema.readImages(sample_img_dir) predictor = DeepImagePredictor(inputCol="image", outputCol="predicted_labels", modelName="InceptionV3", decodePredictions=True, topK=10) predictions_df = predictor.transform(image_df)
對於 TensorFlow 用戶
Deep Learning Pipelines提供MLlib Transformer,能夠將給定的TensorFlow Graph應用於包含影像列的DataFrame (影像使用前面描述的方法載入)。這裏是一個很是簡單的例子,演示了 TensorFlow Graph 如何用於 Transformer. 實踐中,TensorFlow Graph將從文件中載入,而後用於調用 TFImageTransformer
。
from pyspark.ml.image import ImageSchema from sparkdl import TFImageTransformer import sparkdl.graph.utils as tfx # strip_and_freeze_until was moved from sparkdl.transformers to sparkdl.graph.utils in 0.2.0 from sparkdl.transformers import utils import tensorflow as tf graph = tf.Graph() with tf.Session(graph=graph) as sess: image_arr = utils.imageInputPlaceholder() resized_images = tf.image.resize_images(image_arr, (299, 299)) # the following step is not necessary for this graph, but can be for graphs with variables, etc frozen_graph = tfx.strip_and_freeze_until([resized_images], graph, sess, return_graph=True) transformer = TFImageTransformer(inputCol="image", outputCol="predictions", graph=frozen_graph, inputTensor=image_arr, outputTensor=resized_images, outputMode="image") image_df = ImageSchema.readImages(sample_img_dir) processed_image_df = transformer.transform(image_df)
對於 Keras 用戶
爲了在Spark中用分佈式的方法應用Keras models,KerasImageFileTransformer
與TensorFlow做爲引擎的 Keras models一塊兒工做。
與TFImageTransformer
的API的不一樣在於,一般Keras workflows有一些很是特殊的載入和重設尺寸等方法,而這些功能一般不是 TensorFlow Graph的一部分。
爲了使用transformer, 咱們首先須要一個存儲在文件中的Keras model。 咱們直接使用Keras內置的 InceptionV3 model,就不用本身來訓練了。
from keras.applications import InceptionV3 model = InceptionV3(weights="imagenet") model.save('/tmp/model-full.h5')
再使用模型來進行預測:
from keras.applications.inception_v3 import preprocess_input from keras.preprocessing.image import img_to_array, load_img import numpy as np import os from pyspark.sql.types import StringType from sparkdl import KerasImageFileTransformer def loadAndPreprocessKerasInceptionV3(uri): # this is a typical way to load and prep images in keras image = img_to_array(load_img(uri, target_size=(299, 299))) # image dimensions for InceptionV3 image = np.expand_dims(image, axis=0) return preprocess_input(image) transformer = KerasImageFileTransformer(inputCol="uri", outputCol="predictions", modelFile='/tmp/model-full-tmp.h5', # local file path for model imageLoader=loadAndPreprocessKerasInceptionV3, outputMode="vector") files = [os.path.abspath(os.path.join(dirpath, f)) for f in os.listdir("/data/myimages") if f.endswith('.jpg')] uri_df = sqlContext.createDataFrame(files, StringType()).toDF("uri") keras_pred_df = transformer.transform(uri_df)
Deep Learning Pipelines 也提供了使用tensor inputs應用模型 (到 2 dimensions), 由通用的 deep learning libraries提供:
對於 TensorFlow 用戶
TFTransformer
應用一個用戶指定的TensorFlow graph到tensor inputs(最多二維)。TensorFlow graph 能夠做爲TensorFlow graph objects (tf.Graph
指定,或者是一個引用 tf.GraphDef
),或者是 checkpoint ,或者是 SavedModel
objects (查看input object class 得到更多細節).。 transform()
函數應用 TensorFlow graph 到輸入 DataFrame 的column of arrays (這裏每個 array 對應於一個 Tensor),而且輸出 column of arrays,對應於每個 graph。
首先,咱們建立一個二維點的樣本數據集, 圍繞兩個不一樣中心點的高斯分佈。
import numpy as np from pyspark.sql.types import Row n_sample = 1000 center_0 = [-1.5, 1.5] center_1 = [1.5, -1.5] def to_row(args): xy, l = args return Row(inputCol = xy, label = l) samples_0 = [np.random.randn(2) + center_0 for _ in range(n_sample//2)] labels_0 = [0 for _ in range(n_sample//2)] samples_1 = [np.random.randn(2) + center_1 for _ in range(n_sample//2)] labels_1 = [1 for _ in range(n_sample//2)] rows = map(to_row, zip(map(lambda x: x.tolist(), samples_0 + samples_1), labels_0 + labels_1)) sdf = spark.createDataFrame(rows)
下一步,編寫一個函數返回tensorflow graph和它的input:
import tensorflow as tf def build_graph(sess, w0): X = tf.placeholder(tf.float32, shape=[None, 2], name="input_tensor") model = tf.sigmoid(tf.matmul(X, w0), name="output_tensor") return model, X
而後,就是使用Tensorflow在單個節點上進行預測而編寫的代碼。
w0 = np.array([[1], [-1]]).astype(np.float32) with tf.Session() as sess: model, X = build_graph(sess, w0) output = sess.run(model, feed_dict = { X : samples_0 + samples_1 })
如今,你可使用下面的 Spark MLlib Transformer應用 model到DataFrame,按照分佈式的方式運行。
from sparkdl import TFTransformer from sparkdl.graph.input import TFInputGraph import sparkdl.graph.utils as tfx graph = tf.Graph() with tf.Session(graph=graph) as session, graph.as_default(): _, _ = build_graph(session, w0) gin = TFInputGraph.fromGraph(session.graph, session, ["input_tensor"], ["output_tensor"]) transformer = TFTransformer( tfInputGraph=gin, inputMapping={'inputCol': tfx.tensor_name("input_tensor")}, outputMapping={tfx.tensor_name("output_tensor"): 'outputCol'}) odf = transformer.transform(sdf)
對於 Keras 用戶
KerasTransformer
應用基於TensorFlow的Keras model到tensor inputs,不超過二維。它從給定的模型文件路徑載入 Keras model,而後應用model到column of arrays (一個array對應於 Tensor),輸出column of arrays。
from sparkdl import KerasTransformer from keras.models import Sequential from keras.layers import Dense import numpy as np # Generate random input data num_features = 10 num_examples = 100 input_data = [{"features" : np.random.randn(num_features).tolist()} for i in range(num_examples)] input_df = sqlContext.createDataFrame(input_data) # Create and save a single-hidden-layer Keras model for binary classification # NOTE: In a typical workflow, we'd train the model before exporting it to disk, # but we skip that step here for brevity model = Sequential() model.add(Dense(units=20, input_shape=[num_features], activation='relu')) model.add(Dense(units=1, activation='sigmoid')) model_path = "/tmp/simple-binary-classification" model.save(model_path) # Create transformer and apply it to our input data transformer = KerasTransformer(inputCol="features", outputCol="predictions", modelFile=model_path) final_df = transformer.transform(input_df)
將模型提高到生產級的方法之一是將其部署到Spark SQL用戶定義函數(UDF,User Defined Function),從而容許任何熟悉SQL的人都能使用。Deep Learning Pipelines提供了一種機制,,能夠將深度學習模型register 爲一個Spark SQL User Defined Function (UDF)。尤爲是,Deep Learning Pipelines 0.2.0添加了Keras models建立爲 SQL UDFs,能夠與影像數據一塊兒工做。
結果UDF獲取 column (格式化爲image struct "SpImage") 而且產生給定的Keras model輸出,好比對於 Inception V3,將產生 real valued score vector over the ImageNet object categories。
咱們能夠爲 Keras model 註冊一個 UDF,能夠用於影像處理,像下面這樣:
from keras.applications import InceptionV3 from sparkdl.udf.keras_image_model import registerKerasImageUDF registerKerasImageUDF("inceptionV3_udf", InceptionV3(weights="imagenet"))
一樣,咱們也能夠從模型文件register一個UDF:
registerKerasImageUDF("my_custom_keras_model_udf", "/tmp/model-full-tmp.h5")
在Keras處理影像的流程中,一般有一些預處理步驟,而後纔將模型應用於影像數據。若是我媽的模型須要預處理,咱們可選提供預處理函數給 UDF registration過程。預處理器經過接收一個文件路徑,返回一個image array,下面是一個簡單的例子:
from keras.applications import InceptionV3 from sparkdl.udf.keras_image_model import registerKerasImageUDF def keras_load_img(fpath): from keras.preprocessing.image import load_img, img_to_array import numpy as np img = load_img(fpath, target_size=(299, 299)) return img_to_array(img).astype(np.uint8) registerKerasImageUDF("inceptionV3_udf_with_preprocessing", InceptionV3(weights="imagenet"), keras_load_img)
一旦 UDF 註冊完畢,就能夠在SQL查詢中使用了。以下所示:
from pyspark.ml.image import ImageSchema image_df = ImageSchema.readImages(sample_img_dir) image_df.registerTempTable("sample_images")
SELECT my_custom_keras_model_udf(image) as predictions from sample_images
DeepImageFeaturizer
和 DeepImagePredictor
使用) 經過 MIT license提供,位於 https://github.com/fchollet/keras/blob/master/LICENSE ,其餘的由代碼或文檔說明。更多信息,請查看 Keras applications page 。