The Hadoop big-data ecosystem consists of YARN, HDFS, and the MapReduce computation framework. Distributed TensorFlow plays the role of the MapReduce computation framework, while Kubernetes plays the role of the YARN scheduler. TensorFlowOnSpark relies on the existing Spark and HDFS stack for scheduling and storage and uses Remote Direct Memory Access (RDMA) for fast communication, bringing deep learning and big data together. TensorFlowOnSpark (TFoS) is an open-source project from Yahoo: https://github.com/yahoo/Tens... . It supports distributed TensorFlow training and inference on Apache Spark clusters. TensorFlowOnSpark provides a bridging layer: each Spark executor launches a corresponding TensorFlow process, and the two communicate via remote procedure calls (RPC).
TensorFlowOnSpark architecture. A TensorFlow training program runs on the Spark cluster, and the Spark cluster is managed through the following steps. Reservation: reserve a port for the TensorFlow process on each executor and start a listener for data messages. Startup: launch the TensorFlow main function on each executor. Data access: either TensorFlow Readers and QueueRunners read data files directly from HDFS and Spark never touches the data, or, with Feeding, Spark RDD data is sent to the TensorFlow nodes and passed into the TensorFlow graph through the feed_dict mechanism. Shutdown: shut down the TensorFlow worker and parameter-server nodes on the executors. The data path is Spark Driver -> Spark Executor -> parameter server -> TensorFlow Core -> gRPC / RDMA -> HDFS dataset. http://yahoohadoop.tumblr.com... .
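As a rough sketch of how those steps map onto the TFCluster API (the complete training program appears later in this article; sc, map_fun, args, num_executors, num_ps, and dataRDD are placeholders here):

from tensorflowonspark import TFCluster

# Reserve a port on each executor and launch the TensorFlow main function
# (map_fun) there; InputMode.SPARK selects the Feeding data path.
cluster = TFCluster.run(sc, map_fun, args, num_executors, num_ps,
                        args.tensorboard, TFCluster.InputMode.SPARK)

# Feed the Spark RDD into the TensorFlow graph (feed_dict under the hood);
# cluster.inference(dataRDD) would be the prediction counterpart.
cluster.train(dataRDD, args.epochs)

# Shut down the TensorFlow worker and parameter-server nodes on the executors.
cluster.shutdown()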
TensorFlowOnSpark MNIST example: https://github.com/yahoo/Tens... . Here the Spark cluster runs in standalone mode on a single machine. Install Spark and Hadoop and deploy a Java 1.8.0 JDK. Download Spark 2.1.0 from http://spark.apache.org/downl... and Hadoop 2.7.3 from http://hadoop.apache.org/#Dow... . TensorFlow 0.12.1 is the best-supported version.
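A minimal environment setup, assuming hypothetical installation paths (adjust them to wherever the JDK and the two unpacked archives actually live):

export JAVA_HOME=/usr/lib/jvm/jdk1.8.0            # hypothetical JDK 1.8 location
export HADOOP_HOME=$HOME/hadoop-2.7.3
export SPARK_HOME=$HOME/spark-2.1.0-bin-hadoop2.7
export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$SPARK_HOME/bin:$SPARK_HOME/sbin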
Edit the Hadoop configuration files, set the environment variables, and start Hadoop with $HADOOP_HOME/sbin/start-all.sh. Then check out the TensorFlowOnSpark source code:
git clone --recurse-submodules https://github.com/yahoo/TensorFlowOnSpark.git
cd TensorFlowOnSpark
git submodule init
git submodule update --force
git submodule foreach --recursive git clean -dfx
Package the source code into a zip archive, which is used when submitting jobs:
cd TensorFlowOnSpark/src
zip -r ../tfspark.zip *
Set the TensorFlowOnSpark root directory as an environment variable:
cd TensorFlowOnSpark
export TFoS_HOME=$(pwd)
Start the Spark master node:
${SPARK_HOME}/sbin/start-master.sh
Configure two worker instances and connect them to the master via its master-spark-URL:
export MASTER=spark://$(hostname):7077
export SPARK_WORKER_INSTANCES=2
export CORES_PER_WORKER=1
export TOTAL_CORES=$((${CORES_PER_WORKER}*${SPARK_WORKER_INSTANCES}))
${SPARK_HOME}/sbin/start-slave.sh -c $CORES_PER_WORKER -m 3G ${MASTER}
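As a quick sanity check (not part of the official instructions), verify that the master and both workers came up:

jps   # should list one Master process and two Worker processes
# The Spark master web UI at http://localhost:8080 should also show both workers as ALIVE.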
Submit a job that converts the MNIST zip file into an HDFS RDD dataset:
${SPARK_HOME}/bin/spark-submit \
  --master ${MASTER} --conf spark.ui.port=4048 --verbose \
  ${TFoS_HOME}/examples/mnist/mnist_data_setup.py \
  --output examples/mnist/csv \
  --format csv
Inspect the processed dataset:
hadoop fs -ls hdfs://localhost:9000/user/libinggen/examples/mnist/csv
Inspect the saved image and label vectors:
hadoop fs -ls hdfs://localhost:9000/user/libinggen/examples/mnist/csv/train/labels
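To see what the CSV records look like, print the first line of one of the label partitions (the part-00000 file name is an assumption; list the directory first if yours differs):

hadoop fs -cat hdfs://localhost:9000/user/libinggen/examples/mnist/csv/train/labels/part-00000 | head -n 1
# Each row is a 10-element one-hot label vector; image rows are 784 comma-separated pixel values.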
The training set and test set are each saved as RDD data. The conversion script, mnist_data_setup.py, is shown below: https://github.com/yahoo/Tens... .
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy
import tensorflow as tf
from array import array
from tensorflow.contrib.learn.python.learn.datasets import mnist


def toTFExample(image, label):
  """Serializes an image/label as a TFExample byte string"""
  example = tf.train.Example(
    features=tf.train.Features(
      feature={
        'label': tf.train.Feature(int64_list=tf.train.Int64List(value=label.astype("int64"))),
        'image': tf.train.Feature(int64_list=tf.train.Int64List(value=image.astype("int64")))
      }
    )
  )
  return example.SerializeToString()


def fromTFExample(bytestr):
  """Deserializes a TFExample from a byte string"""
  example = tf.train.Example()
  example.ParseFromString(bytestr)
  return example


def toCSV(vec):
  """Converts a vector/array into a CSV string"""
  return ','.join([str(i) for i in vec])


def fromCSV(s):
  """Converts a CSV string to a vector/array"""
  return [float(x) for x in s.split(',') if len(s) > 0]


def writeMNIST(sc, input_images, input_labels, output, format, num_partitions):
  """Writes MNIST image/label vectors into parallelized files on HDFS"""
  # load MNIST gzip into memory
  with open(input_images, 'rb') as f:
    images = numpy.array(mnist.extract_images(f))

  with open(input_labels, 'rb') as f:
    if format == "csv2":
      labels = numpy.array(mnist.extract_labels(f, one_hot=False))
    else:
      labels = numpy.array(mnist.extract_labels(f, one_hot=True))

  shape = images.shape
  print("images.shape: {0}".format(shape))          # 60000 x 28 x 28
  print("labels.shape: {0}".format(labels.shape))   # 60000 x 10

  # create RDDs of vectors
  imageRDD = sc.parallelize(images.reshape(shape[0], shape[1] * shape[2]), num_partitions)
  labelRDD = sc.parallelize(labels, num_partitions)

  output_images = output + "/images"
  output_labels = output + "/labels"

  # save RDDs as specific format
  if format == "pickle":
    imageRDD.saveAsPickleFile(output_images)
    labelRDD.saveAsPickleFile(output_labels)
  elif format == "csv":
    imageRDD.map(toCSV).saveAsTextFile(output_images)
    labelRDD.map(toCSV).saveAsTextFile(output_labels)
  elif format == "csv2":
    imageRDD.map(toCSV).zip(labelRDD).map(lambda x: str(x[1]) + "|" + x[0]).saveAsTextFile(output)
  else:  # format == "tfr":
    tfRDD = imageRDD.zip(labelRDD).map(lambda x: (bytearray(toTFExample(x[0], x[1])), None))
    # requires: --jars tensorflow-hadoop-1.0-SNAPSHOT.jar
    tfRDD.saveAsNewAPIHadoopFile(output, "org.tensorflow.hadoop.io.TFRecordFileOutputFormat",
                                 keyClass="org.apache.hadoop.io.BytesWritable",
                                 valueClass="org.apache.hadoop.io.NullWritable")
#  Note: this creates TFRecord files w/o requiring a custom Input/Output format
#  else: # format == "tfr":
#    def writeTFRecords(index, iter):
#      output_path = "{0}/part-{1:05d}".format(output, index)
#      writer = tf.python_io.TFRecordWriter(output_path)
#      for example in iter:
#        writer.write(example)
#      return [output_path]
#    tfRDD = imageRDD.zip(labelRDD).map(lambda x: toTFExample(x[0], x[1]))
#    tfRDD.mapPartitionsWithIndex(writeTFRecords).collect()


def readMNIST(sc, output, format):
  """Reads/verifies previously created output"""
  output_images = output + "/images"
  output_labels = output + "/labels"
  imageRDD = None
  labelRDD = None

  if format == "pickle":
    imageRDD = sc.pickleFile(output_images)
    labelRDD = sc.pickleFile(output_labels)
  elif format == "csv":
    imageRDD = sc.textFile(output_images).map(fromCSV)
    labelRDD = sc.textFile(output_labels).map(fromCSV)
  else:  # format.startswith("tf"):
    # requires: --jars tensorflow-hadoop-1.0-SNAPSHOT.jar
    tfRDD = sc.newAPIHadoopFile(output, "org.tensorflow.hadoop.io.TFRecordFileInputFormat",
                                keyClass="org.apache.hadoop.io.BytesWritable",
                                valueClass="org.apache.hadoop.io.NullWritable")
    imageRDD = tfRDD.map(lambda x: fromTFExample(str(x[0])))

  num_images = imageRDD.count()
  num_labels = labelRDD.count() if labelRDD is not None else num_images
  samples = imageRDD.take(10)
  print("num_images: ", num_images)
  print("num_labels: ", num_labels)
  print("samples: ", samples)


if __name__ == "__main__":
  import argparse
  from pyspark.context import SparkContext
  from pyspark.conf import SparkConf

  parser = argparse.ArgumentParser()
  parser.add_argument("-f", "--format", help="output format", choices=["csv", "csv2", "pickle", "tf", "tfr"], default="csv")
  parser.add_argument("-n", "--num-partitions", help="Number of output partitions", type=int, default=10)
  parser.add_argument("-o", "--output", help="HDFS directory to save examples in parallelized format", default="mnist_data")
  parser.add_argument("-r", "--read", help="read previously saved examples", action="store_true")
  parser.add_argument("-v", "--verify", help="verify saved examples after writing", action="store_true")
  args = parser.parse_args()
  print("args:", args)

  sc = SparkContext(conf=SparkConf().setAppName("mnist_parallelize"))

  if not args.read:
    # Note: these files are inside the mnist.zip file
    writeMNIST(sc, "mnist/train-images-idx3-ubyte.gz", "mnist/train-labels-idx1-ubyte.gz",
               args.output + "/train", args.format, args.num_partitions)
    writeMNIST(sc, "mnist/t10k-images-idx3-ubyte.gz", "mnist/t10k-labels-idx1-ubyte.gz",
               args.output + "/test", args.format, args.num_partitions)

  if args.read or args.verify:
    readMNIST(sc, args.output + "/train", args.format)
Submit the training job. Training writes mnist_model to HDFS:
${SPARK_HOME}/bin/spark-submit \
  --master ${MASTER} \
  --py-files ${TFoS_HOME}/examples/mnist/spark/mnist_dist.py \
  --conf spark.cores.max=${TOTAL_CORES} \
  --conf spark.task.cpus=${CORES_PER_WORKER} \
  --conf spark.executorEnv.JAVA_HOME="$JAVA_HOME" \
  ${TFoS_HOME}/examples/mnist/spark/mnist_spark.py \
  --cluster_size ${SPARK_WORKER_INSTANCES} \
  --images examples/mnist/csv/train/images \
  --labels examples/mnist/csv/train/labels \
  --format csv \
  --mode train \
  --model mnist_model
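Once the job completes, the checkpoint should be visible under mnist_model in the submitting user's HDFS home directory; the path below follows the earlier examples and is an assumption:

hadoop fs -ls hdfs://localhost:9000/user/libinggen/mnist_model
# Expect TensorFlow checkpoint files such as model.ckpt-*.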
mnist_dist.py builds the distributed TensorFlow job: it defines the main function of the distributed task, map_fun, which is launched on each executor as the TensorFlow main function, and it obtains data via Feeding. It first gets the TensorFlow cluster and server instances:
cluster, server = TFNode.start_cluster_server(ctx, 1, args.rdma)
TFNode here refers to the TFNode.py module packaged in tfspark.zip.
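For orientation, here is a rough skeleton of how that call sits inside map_fun in mnist_dist.py. It is only a sketch: it assumes the TFoS context object exposes job_name and task_index (as in the official example) and elides the model, training loop, and batch-feeding code:

import tensorflow as tf
from tensorflowonspark import TFNode

def map_fun(args, ctx):
  # One TensorFlow process runs per Spark executor; ctx describes its role.
  job_name = ctx.job_name      # "ps" or "worker"
  task_index = ctx.task_index

  # Build the ClusterSpec and tf.train.Server for this process (RDMA optional).
  cluster, server = TFNode.start_cluster_server(ctx, 1, args.rdma)

  if job_name == "ps":
    # Parameter servers only host variables and wait for requests.
    server.join()
  elif job_name == "worker":
    # Place variables on the parameter servers, ops on this worker.
    with tf.device(tf.train.replica_device_setter(
        worker_device="/job:worker/task:%d" % task_index,
        cluster=cluster)):
      x = tf.placeholder(tf.float32, [None, 784])   # MNIST pixels
      y_ = tf.placeholder(tf.float32, [None, 10])   # one-hot labels
      # ... model, loss, optimizer, summaries ...
    # In InputMode.SPARK, batches arrive from the Spark RDD and are passed
    # into the graph through feed_dict (the Feeding path described above).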
mnist_spark.py is the main training program; it walks through the TensorFlowOnSpark deployment steps:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from pyspark.context import SparkContext
from pyspark.conf import SparkConf

import argparse
import os
import numpy
import sys
import tensorflow as tf
import threading
import time
from datetime import datetime

from tensorflowonspark import TFCluster
import mnist_dist

sc = SparkContext(conf=SparkConf().setAppName("mnist_spark"))
executors = sc._conf.get("spark.executor.instances")
num_executors = int(executors) if executors is not None else 1
num_ps = 1

parser = argparse.ArgumentParser()
parser.add_argument("-b", "--batch_size", help="number of records per batch", type=int, default=100)
parser.add_argument("-e", "--epochs", help="number of epochs", type=int, default=1)
parser.add_argument("-f", "--format", help="example format: (csv|pickle|tfr)", choices=["csv", "pickle", "tfr"], default="csv")
parser.add_argument("-i", "--images", help="HDFS path to MNIST images in parallelized format")
parser.add_argument("-l", "--labels", help="HDFS path to MNIST labels in parallelized format")
parser.add_argument("-m", "--model", help="HDFS path to save/load model during train/inference", default="mnist_model")
parser.add_argument("-n", "--cluster_size", help="number of nodes in the cluster", type=int, default=num_executors)
parser.add_argument("-o", "--output", help="HDFS path to save test/inference output", default="predictions")
parser.add_argument("-r", "--readers", help="number of reader/enqueue threads", type=int, default=1)
parser.add_argument("-s", "--steps", help="maximum number of steps", type=int, default=1000)
parser.add_argument("-tb", "--tensorboard", help="launch tensorboard process", action="store_true")
parser.add_argument("-X", "--mode", help="train|inference", default="train")
parser.add_argument("-c", "--rdma", help="use rdma connection", default=False)
args = parser.parse_args()
print("args:", args)

print("{0} ===== Start".format(datetime.now().isoformat()))

if args.format == "tfr":
  images = sc.newAPIHadoopFile(args.images, "org.tensorflow.hadoop.io.TFRecordFileInputFormat",
                               keyClass="org.apache.hadoop.io.BytesWritable",
                               valueClass="org.apache.hadoop.io.NullWritable")

  def toNumpy(bytestr):
    example = tf.train.Example()
    example.ParseFromString(bytestr)
    features = example.features.feature
    image = numpy.array(features['image'].int64_list.value)
    label = numpy.array(features['label'].int64_list.value)
    return (image, label)

  dataRDD = images.map(lambda x: toNumpy(str(x[0])))
else:
  if args.format == "csv":
    images = sc.textFile(args.images).map(lambda ln: [int(x) for x in ln.split(',')])
    labels = sc.textFile(args.labels).map(lambda ln: [float(x) for x in ln.split(',')])
  else:  # args.format == "pickle":
    images = sc.pickleFile(args.images)
    labels = sc.pickleFile(args.labels)
  print("zipping images and labels")
  dataRDD = images.zip(labels)

# 1. Reserve a port for the TensorFlow process on each executor and
# 2. launch the TensorFlow main function (mnist_dist.map_fun) there;
#    TFCluster.run performs both steps, so no separate cluster.start() call is needed.
cluster = TFCluster.run(sc, mnist_dist.map_fun, args, args.cluster_size, num_ps,
                        args.tensorboard, TFCluster.InputMode.SPARK)
if args.mode == "train":
  # 3. Train: feed the Spark RDD into the TensorFlow graph
  cluster.train(dataRDD, args.epochs)
else:
  # 3. Inference
  labelRDD = cluster.inference(dataRDD)
  labelRDD.saveAsTextFile(args.output)
# 4. Shut down the TensorFlow worker and parameter-server nodes on the executors
cluster.shutdown()

print("{0} ===== Stop".format(datetime.now().isoformat()))
The inference command:
${SPARK_HOME}/bin/spark-submit \
  --master ${MASTER} \
  --py-files ${TFoS_HOME}/examples/mnist/spark/mnist_dist.py \
  --conf spark.cores.max=${TOTAL_CORES} \
  --conf spark.task.cpus=${CORES_PER_WORKER} \
  --conf spark.executorEnv.JAVA_HOME="$JAVA_HOME" \
  ${TFoS_HOME}/examples/mnist/spark/mnist_spark.py \
  --cluster_size ${SPARK_WORKER_INSTANCES} \
  --images examples/mnist/csv/test/images \
  --labels examples/mnist/csv/test/labels \
  --mode inference \
  --format csv \
  --model mnist_model \
  --output predictions
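The inference output lands in the predictions directory on HDFS as plain text, one file per partition. To inspect it (the paths and part-file name below are assumptions based on the earlier examples):

hadoop fs -ls hdfs://localhost:9000/user/libinggen/predictions
hadoop fs -cat hdfs://localhost:9000/user/libinggen/predictions/part-00000 | head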
TensorFlowOnSpark can also run on Amazon EC2 and on Hadoop clusters in YARN mode.
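In YARN mode, the submission targets the YARN resource manager rather than the standalone master. The sketch below is illustrative only and assumes the CSV data is already on HDFS; cluster-specific details (such as shipping a Python runtime with --archives) are omitted, so follow the TensorFlowOnSpark wiki for the exact commands:

${SPARK_HOME}/bin/spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --num-executors 4 \
  --executor-memory 4G \
  --py-files ${TFoS_HOME}/tfspark.zip,${TFoS_HOME}/examples/mnist/spark/mnist_dist.py \
  ${TFoS_HOME}/examples/mnist/spark/mnist_spark.py \
  --cluster_size 4 \
  --images examples/mnist/csv/train/images \
  --labels examples/mnist/csv/train/labels \
  --format csv \
  --mode train \
  --model mnist_model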
References:
《TensorFlow技術解析與實戰》
Recommendations for machine learning job opportunities in Shanghai are welcome. My WeChat: qingxingfengzi