學習筆記TF065:TensorFlowOnSpark

Hadoop生態大數據系統分爲YARN、HDFS、MapReduce計算框架。TensorFlow分佈式相當於MapReduce計算框架,Kubernetes相當於YARN調度系統。TensorFlowOnSpark,利用遠程直接內存訪問(Remote Direct Memory Access,RDMA)解決存儲功能和調度,實現深度學習和大數據融合。TensorFlowOnSpark(TFoS),雅虎開源項目。https://github.com/yahoo/TensorFlowOnSpark 。支持ApacheSpark集羣分佈式TensorFlow訓練、預測。TensorFlowOnSpark提供橋接程序,每一個Spark Executor啓動一個對應TensorFlow進程,通過遠程進程通訊(RPC)交互。

TensorFlowOnSpark架構。TensorFlow訓練程序用Spark集羣運行,管理Spark集羣步驟:預留,在Executor執行每一個TensorFlow進程保留一個端口,啓動數據消息監聽器。啓動,在Executor啓動TensorFlow主函數。數據獲取,TensorFlow Readers和QueueRunners機制直接讀取HDFS數據文件,Spark不訪問數據;Feeding,SparkRDD 數據發送TensorFlow節點,數據通過feed_dict機制傳入TensorFlow計算圖。關閉,關閉Executor TensorFlow計算節點、參數服務節點。Spark Driver->Spark Executor->參數服務器->TensorFlow Core->gRPC、RDMA->HDFS數據集。http://yahoohadoop.tumblr.com...

TensorFlowOnSpark MNIST。https://github.com/yahoo/Tens... 。Standalone模式Spark集羣,一臺計算機。安裝 Spark、Hadoop。部署Java 1.8.0 JDK。下載Spark2.1.0版 http://spark.apache.org/downl... 。下載Hadoop2.7.3版 http://hadoop.apache.org/#Dow... 。0.12.1版本支持較好。
修改配置文件,設置環境變量,啓動Hadoop:$HADOOP_HOME/sbin/start-all.sh。檢出TensorFlowOnSpark源代碼:

# Clone the repository together with its submodules.
git clone --recurse-submodules https://github.com/yahoo/TensorFlowOnSpark.git
cd TensorFlowOnSpark
# Initialise and force-update the submodules, then remove untracked and
# ignored files from every submodule working tree.
git submodule init
git submodule update --force
git submodule foreach --recursive git clean -dfx

源代碼打包,提交任務使用:

# Package the TensorFlowOnSpark sources into tfspark.zip, written one level
# above src/. The directory name must match the cloned repository exactly
# (TensorFlowOnSpark); the path is relative to the directory holding the clone.
cd TensorFlowOnSpark/src
zip -r ../tfspark.zip *

設置TensorFlowOnSpark根目錄環境變量:

# Export the repository root as TFoS_HOME.
cd TensorFlowOnSpark
export TFoS_HOME=$(pwd)

啓動Spark主節點(master):

# Start the Spark master. ${SPARK_HOME} expands the variable; $(SPARK_HOME)
# would instead try to run a command named SPARK_HOME.
${SPARK_HOME}/sbin/start-master.sh

配置兩個工作節點(worker)實例,master-spark-URL連接主節點:

# Master URL built from this machine's hostname ($(hostname) is an intentional
# command substitution); 7077 is the port used in the spark:// URL.
export MASTER=spark://$(hostname):7077
export SPARK_WORKER_INSTANCES=2
export CORES_PER_WORKER=1
# Variables are expanded with ${VAR}; $(VAR) would run VAR as a command.
export TOTAL_CORES=$((${CORES_PER_WORKER}*${SPARK_WORKER_INSTANCES}))
# Start the worker instances against the master: -c cores, -m memory.
${SPARK_HOME}/sbin/start-slave.sh -c $CORES_PER_WORKER -m 3G ${MASTER}

提交任務,MNIST zip文件轉換爲HDFS RDD 數據集:

# Submit mnist_data_setup.py to convert the MNIST archive into CSV files under
# examples/mnist/csv. Variables use ${VAR}; $(VAR) is command substitution.
${SPARK_HOME}/bin/spark-submit \
--master ${MASTER} --conf spark.ui.port=4048 --verbose \
${TFoS_HOME}/examples/mnist/mnist_data_setup.py \
--output examples/mnist/csv \
--format csv

查看處理過的數據集:

# List the generated CSV dataset on HDFS. The /user/libinggen prefix is a
# hard-coded user directory; substitute your own HDFS user.
hadoop fs -ls hdfs://localhost:9000/user/libinggen/examples/mnist/csv

查看保存圖片、標記向量:

# List the label files of the training split (same hard-coded HDFS user
# directory as in the path itself).
hadoop fs -ls hdfs://localhost:9000/user/libinggen/examples/mnist/csv/train/labels

把訓練集、測試集分別保存RDD數據。
https://github.com/yahoo/Tens...

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy
import tensorflow as tf
from array import array
from tensorflow.contrib.learn.python.learn.datasets import mnist
def toTFExample(image, label):
  """Encode one image/label pair as a serialized tf.train.Example byte string.

  Both arrays are cast to int64 and stored as Int64List features under the
  keys 'label' and 'image'.
  """
  label_feature = tf.train.Feature(int64_list=tf.train.Int64List(value=label.astype("int64")))
  image_feature = tf.train.Feature(int64_list=tf.train.Int64List(value=image.astype("int64")))
  features = tf.train.Features(feature={'label': label_feature, 'image': image_feature})
  return tf.train.Example(features=features).SerializeToString()
def fromTFExample(bytestr):
  """Parse a serialized TFExample byte string back into a tf.train.Example."""
  parsed = tf.train.Example()
  parsed.ParseFromString(bytestr)
  return parsed
def toCSV(vec):
  """Join the string form of every element of vec with commas."""
  return ','.join(map(str, vec))
def fromCSV(s):
  """Convert a comma-separated string into a list of floats.

  Args:
    s: text such as "1,2.5,3".

  Returns:
    A list of floats; an empty string yields an empty list.

  Raises:
    ValueError: if any field is not a valid float (e.g. an empty field).
  """
  # The emptiness check concerns the whole string, so test it once up front
  # instead of once per field inside the comprehension.
  if not s:
    return []
  return [float(field) for field in s.split(',')]
def writeMNIST(sc, input_images, input_labels, output, format, num_partitions):
  """Load gzipped MNIST image/label files and save them as partitioned RDD output.

  Args:
    sc: SparkContext used to parallelize the arrays.
    input_images: path of the gzipped MNIST image file.
    input_labels: path of the gzipped MNIST label file.
    output: destination directory; "/images" and "/labels" are appended for
      the "pickle" and "csv" formats.
    format: "pickle", "csv" or "csv2"; any other value is written as TFRecords.
    num_partitions: number of partitions for the image and label RDDs.
  """
  # Labels stay as plain class indices only for "csv2"; otherwise one-hot.
  use_one_hot = format != "csv2"

  # Load the MNIST gzip files into memory.
  with open(input_images, 'rb') as image_file:
    images = numpy.array(mnist.extract_images(image_file))
  with open(input_labels, 'rb') as label_file:
    labels = numpy.array(mnist.extract_labels(label_file, one_hot=use_one_hot))

  shape = images.shape
  print("images.shape: {0}".format(shape))          # e.g. 60000 x 28 x 28
  print("labels.shape: {0}".format(labels.shape))   # e.g. 60000 x 10 when one-hot

  # Flatten each image into a single vector and distribute both arrays.
  flat_images = images.reshape(shape[0], shape[1] * shape[2])
  imageRDD = sc.parallelize(flat_images, num_partitions)
  labelRDD = sc.parallelize(labels, num_partitions)

  output_images = output + "/images"
  output_labels = output + "/labels"

  # Save the RDDs in the requested format.
  if format == "pickle":
    imageRDD.saveAsPickleFile(output_images)
    labelRDD.saveAsPickleFile(output_labels)
    return

  if format == "csv":
    imageRDD.map(toCSV).saveAsTextFile(output_images)
    labelRDD.map(toCSV).saveAsTextFile(output_labels)
    return

  if format == "csv2":
    # One text line per example: "<label>|<comma-separated pixels>".
    csv_images = imageRDD.map(toCSV)
    lines = csv_images.zip(labelRDD).map(lambda x: str(x[1]) + "|" + x[0])
    lines.saveAsTextFile(output)
    return

  # Everything else (e.g. "tfr") is written as TFRecords.
  # requires: --jars tensorflow-hadoop-1.0-SNAPSHOT.jar
  paired = imageRDD.zip(labelRDD)
  tfRDD = paired.map(lambda x: (bytearray(toTFExample(x[0], x[1])), None))
  tfRDD.saveAsNewAPIHadoopFile(output, "org.tensorflow.hadoop.io.TFRecordFileOutputFormat",
                              keyClass="org.apache.hadoop.io.BytesWritable",
                              valueClass="org.apache.hadoop.io.NullWritable")
#  Note: this creates TFRecord files w/o requiring a custom Input/Output format
#  else: # format == "tfr":
#    def writeTFRecords(index, iter):
#      output_path = "{0}/part-{1:05d}".format(output, index)
#      writer = tf.python_io.TFRecordWriter(output_path)
#      for example in iter:
#        writer.write(example)
#      return [output_path]
#    tfRDD = imageRDD.zip(labelRDD).map(lambda x: toTFExample(x[0], x[1]))
#    tfRDD.mapPartitionsWithIndex(writeTFRecords).collect()
def readMNIST(sc, output, format):
  """Read back previously written MNIST output and print counts and samples.

  Args:
    sc: SparkContext used to load the files.
    output: directory holding the saved data.
    format: "pickle" or "csv" read output + "/images" and output + "/labels";
      any other value is read as TFRecords from output itself.
  """
  output_images = output + "/images"
  output_labels = output + "/labels"
  imageRDD = None
  labelRDD = None

  if format == "pickle":
    imageRDD = sc.pickleFile(output_images)
    labelRDD = sc.pickleFile(output_labels)
  elif format == "csv":
    imageRDD = sc.textFile(output_images).map(fromCSV)
    labelRDD = sc.textFile(output_labels).map(fromCSV)
  else: # format.startswith("tf"):
    # NOTE(review): "csv2" also lands in this branch and is read as TFRecords.
    # requires: --jars tensorflow-hadoop-1.0-SNAPSHOT.jar
    tfRDD = sc.newAPIHadoopFile(output, "org.tensorflow.hadoop.io.TFRecordFileInputFormat",
                              keyClass="org.apache.hadoop.io.BytesWritable",
                              valueClass="org.apache.hadoop.io.NullWritable")
    # The key arrives as a bytearray. bytes() yields the raw payload on both
    # Python 2 and 3, whereas str() on Python 3 would yield the repr text
    # "bytearray(b'...')", which ParseFromString cannot decode.
    imageRDD = tfRDD.map(lambda x: fromTFExample(bytes(x[0])))

  num_images = imageRDD.count()
  # The TFRecord branch has no separate label RDD; report the image count.
  num_labels = labelRDD.count() if labelRDD is not None else num_images
  samples = imageRDD.take(10)
  print("num_images: ", num_images)
  print("num_labels: ", num_labels)
  print("samples: ", samples)
if __name__ == "__main__":
  # Script entry point: parse the options, then write and/or read the MNIST
  # data through Spark.
  import argparse
  from pyspark.context import SparkContext
  from pyspark.conf import SparkConf

  parser = argparse.ArgumentParser()
  parser.add_argument("-f", "--format", help="output format", choices=["csv","csv2","pickle","tf","tfr"], default="csv")
  parser.add_argument("-n", "--num-partitions", help="Number of output partitions", type=int, default=10)
  parser.add_argument("-o", "--output", help="HDFS directory to save examples in parallelized format", default="mnist_data")
  parser.add_argument("-r", "--read", help="read previously saved examples", action="store_true")
  parser.add_argument("-v", "--verify", help="verify saved examples after writing", action="store_true")

  # These two statements must stay inside the __main__ guard; at column 0 they
  # make the indented lines that follow a syntax error.
  args = parser.parse_args()
  print("args:",args)

  sc = SparkContext(conf=SparkConf().setAppName("mnist_parallelize"))

  if not args.read:
    # Note: these files are inside the mnist.zip file
    writeMNIST(sc, "mnist/train-images-idx3-ubyte.gz", "mnist/train-labels-idx1-ubyte.gz", args.output + "/train", args.format, args.num_partitions)
    writeMNIST(sc, "mnist/t10k-images-idx3-ubyte.gz", "mnist/t10k-labels-idx1-ubyte.gz", args.output + "/test", args.format, args.num_partitions)

  if args.read or args.verify:
    # Only the training split is read back.
    readMNIST(sc, args.output + "/train", args.format)

提交訓練任務,開始訓練,在HDFS生成mnist_model,命令:

# Submit the training job: mnist_spark.py is the driver script and
# mnist_dist.py is shipped alongside it via --py-files. It reads the CSV
# training images/labels and uses mnist_model as the model path.
${SPARK_HOME}/bin/spark-submit \
--master ${MASTER} \
--py-files ${TFoS_HOME}/examples/mnist/spark/mnist_dist.py \
--conf spark.cores.max=${TOTAL_CORES} \
--conf spark.task.cpus=${CORES_PER_WORKER} \
--conf spark.executorEnv.JAVA_HOME="$JAVA_HOME" \
${TFoS_HOME}/examples/mnist/spark/mnist_spark.py \
--cluster_size ${SPARK_WORKER_INSTANCES} \
--images examples/mnist/csv/train/images \
--labels examples/mnist/csv/train/labels \
--format csv \
--mode train \
--model mnist_model

mnist_dist.py 構建TensorFlow 分佈式任務,定義分佈式任務主函數,啓動TensorFlow主函數map_fun,數據獲取方式Feeding。獲取TensorFlow集羣和服務器實例:

# Excerpt from mnist_dist.py: obtain the TensorFlow cluster and server objects
# for this node; args.rdma is passed through as the third argument.
# NOTE(review): the meaning of the second argument (1) is not visible here.
cluster, server = TFNode.start_cluster_server(ctx, 1, args.rdma)

TFNode調用tfspark.zip TFNode.py文件。

mnist_spark.py文件是訓練主程序,TensorFlowOnSpark部署步驟:

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from pyspark.context import SparkContext
from pyspark.conf import SparkConf
import argparse
import os
import numpy
import sys
import tensorflow as tf
import threading
import time
from datetime import datetime
from tensorflowonspark import TFCluster
import mnist_dist
# Build the Spark context up front; the default for --cluster_size is derived
# from its "spark.executor.instances" setting (1 when that setting is absent).
spark_conf = SparkConf().setAppName("mnist_spark")
sc = SparkContext(conf=spark_conf)
executors = sc._conf.get("spark.executor.instances")
num_executors = 1 if executors is None else int(executors)
num_ps = 1

# Command-line options of the training / inference driver.
parser = argparse.ArgumentParser()
parser.add_argument(
    "-b", "--batch_size", help="number of records per batch", type=int, default=100)
parser.add_argument(
    "-e", "--epochs", help="number of epochs", type=int, default=1)
parser.add_argument(
    "-f", "--format", help="example format: (csv|pickle|tfr)",
    choices=["csv","pickle","tfr"], default="csv")
parser.add_argument(
    "-i", "--images", help="HDFS path to MNIST images in parallelized format")
parser.add_argument(
    "-l", "--labels", help="HDFS path to MNIST labels in parallelized format")
parser.add_argument(
    "-m", "--model", help="HDFS path to save/load model during train/inference",
    default="mnist_model")
parser.add_argument(
    "-n", "--cluster_size", help="number of nodes in the cluster", type=int,
    default=num_executors)
parser.add_argument(
    "-o", "--output", help="HDFS path to save test/inference output",
    default="predictions")
parser.add_argument(
    "-r", "--readers", help="number of reader/enqueue threads", type=int, default=1)
parser.add_argument(
    "-s", "--steps", help="maximum number of steps", type=int, default=1000)
parser.add_argument(
    "-tb", "--tensorboard", help="launch tensorboard process", action="store_true")
parser.add_argument(
    "-X", "--mode", help="train|inference", default="train")
# NOTE(review): --rdma takes a value and stores it as a string, so any
# non-empty value (even "False") is truthy; only omitting the flag yields False.
parser.add_argument(
    "-c", "--rdma", help="use rdma connection", default=False)
args = parser.parse_args()

print("args:",args)
print("{0} ===== Start".format(datetime.now().isoformat()))
# Build dataRDD, an RDD of (image, label) pairs, from the format chosen on the
# command line.
if args.format == "tfr":
  images = sc.newAPIHadoopFile(args.images, "org.tensorflow.hadoop.io.TFRecordFileInputFormat",
                              keyClass="org.apache.hadoop.io.BytesWritable",
                              valueClass="org.apache.hadoop.io.NullWritable")
  def toNumpy(bytestr):
    """Decode a serialized tf.train.Example into (image, label) numpy arrays."""
    example = tf.train.Example()
    example.ParseFromString(bytestr)
    features = example.features.feature
    image = numpy.array(features['image'].int64_list.value)
    label = numpy.array(features['label'].int64_list.value)
    return (image, label)
  # The record key arrives as a bytearray. bytes() yields the raw payload on
  # both Python 2 and 3, whereas str() on Python 3 would yield the repr text
  # "bytearray(b'...')", which ParseFromString cannot decode.
  dataRDD = images.map(lambda x: toNumpy(bytes(x[0])))
else:
  if args.format == "csv":
    # One CSV line per example: integer pixels for images, floats for labels.
    images = sc.textFile(args.images).map(lambda ln: [int(x) for x in ln.split(',')])
    labels = sc.textFile(args.labels).map(lambda ln: [float(x) for x in ln.split(',')])
  else: # args.format == "pickle":
    images = sc.pickleFile(args.images)
    labels = sc.pickleFile(args.labels)
  print("zipping images and labels")
  dataRDD = images.zip(labels)
# 1. Reserve a port for each TensorFlow process on the executors and launch the
#    TensorFlow main function (mnist_dist.map_fun) there. TFCluster.run takes
#    map_fun and args itself, so no separate cluster.start(...) call follows;
#    that call belongs to the older reserve()/start() API.
cluster = TFCluster.run(sc, mnist_dist.map_fun, args, args.cluster_size, num_ps, args.tensorboard, TFCluster.InputMode.SPARK)
if args.mode == "train":
  # 2. Training: feed the RDD to the cluster for the requested number of epochs.
  cluster.train(dataRDD, args.epochs)
else:
  # 2. Inference: feed the RDD and save the returned RDD as text.
  labelRDD = cluster.inference(dataRDD)
  labelRDD.saveAsTextFile(args.output)
# 3. Shut down the TensorFlow compute nodes and parameter-server nodes on the executors.
cluster.shutdown()
print("{0} ===== Stop".format(datetime.now().isoformat()))

預測命令:

# Submit the inference job: same driver script as for training, but reading the
# CSV test images/labels with --mode inference, loading the model from
# mnist_model and writing the results to predictions.
${SPARK_HOME}/bin/spark-submit \
--master ${MASTER} \
--py-files ${TFoS_HOME}/examples/mnist/spark/mnist_dist.py \
--conf spark.cores.max=${TOTAL_CORES} \
--conf spark.task.cpus=${CORES_PER_WORKER} \
--conf spark.executorEnv.JAVA_HOME="$JAVA_HOME" \
${TFoS_HOME}/examples/mnist/spark/mnist_spark.py \
--cluster_size ${SPARK_WORKER_INSTANCES} \
--images examples/mnist/csv/test/images \
--labels examples/mnist/csv/test/labels \
--mode inference \
--format csv \
--model mnist_model \
--output predictions

還可以在Amazon EC2運行,以及在Hadoop集羣採用YARN模式運行。

參考資料:
《TensorFlow技術解析與實戰》

歡迎推薦上海機器學習工作機會,我的微信:qingxingfengzi

相關文章
相關標籤/搜索