大數據時代,基於單機的建模很難知足企業不斷增加的數據量級的需求,開發者須要使用分佈式的開發方式,在集羣上進行建模。而單機和分佈式的開發代碼有必定的區別,本文就將爲開發者們介紹,基於TensorFlow進行分佈式開發的兩種方式,幫助開發者在實踐的過程當中,更好地選擇模塊的開發方向。python
基於TensorFlow原生的分佈式開發服務器
分佈式開發會涉及到更新梯度的方式,有同步和異步的兩個方案,同步更新的方式在模型的表現上能更快地進行收斂,而異步更新時,迭代的速度則會更加快。兩種更新方式的圖示以下:app
同步更新流程框架
(圖片來源:TensorFlow:Large-Scale Machine Learning on Heterogeneous Distributed Systems)異步
異步更新流程分佈式
(圖片來源:TensorFlow:Large-Scale Machine Learning on Heterogeneous Distributed Systems)大數據
TensorFlow是基於ps、work 兩種服務器進行分佈式的開發。ps服務器能夠只用於參數的彙總更新,讓各個work進行梯度的計算。spa
基於TensorFlow原生的分佈式開發的具體流程以下:code
首先指定ps 服務器啓動參數 –job_name=ps:server
python distribute.py --ps_hosts=192.168.100.42:2222 --worker_hosts=192.168.100.42:2224,192.168.100.253:2225 --job_name=ps --task_index=0
接着指定work服務器參數(啓動兩個work 節點) –job_name=work2:
python distribute.py --ps_hosts=192.168.100.42:2222 --worker_hosts=192.168.100.42:2224,192.168.100.253:2225 --job_name=worker --task_index=0
python distribute.py --ps_hosts=192.168.100.42:2222 --worker_hosts=192.168.100.42:2224,192.168.100.253:2225 --job_name=worker --task_index=1
以後,上述指定的參數 worker_hosts ps_hosts job_name task_index 都須要在py文件中接受使用:
tf.app.flags.DEFINE_string("worker_hosts", "默認值", "描述說明")
接收參數後,須要分別註冊ps、work,使他們各司其職:
ps_hosts = FLAGS.ps_hosts.split(",") worker_hosts = FLAGS.worker_hosts.split(",") cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts}) server = tf.train.Server(cluster,job_name=FLAGS.job_name,task_index=FLAGS.task_index) issync = FLAGS.issync if FLAGS.job_name == "ps": server.join() elif FLAGS.job_name == "worker": with tf.device(tf.train.replica_device_setter( worker_device="/job:worker/task:%d" % FLAGS.task_index, cluster=cluster)):
繼而更新梯度。
(1)同步更新梯度:
rep_op = tf.train.SyncReplicasOptimizer(optimizer, replicas_to_aggregate=len(worker_hosts), replica_id=FLAGS.task_index, total_num_replicas=len(worker_hosts), use_locking=True) train_op = rep_op.apply_gradients(grads_and_vars,global_step=global_step) init_token_op = rep_op.get_init_tokens_op() chief_queue_runner = rep_op.get_chief_queue_runner()
(2)異步更新梯度:
train_op = optimizer.apply_gradients(grads_and_vars,global_step=global_step)
最後,使用tf.train.Supervisor 進行真的迭代
另外,開發者還要注意,若是是同步更新梯度,則還須要加入以下代碼:
sv.start_queue_runners(sess, [chief_queue_runner])
sess.run(init_token_op)
須要注意的是,上述異步的方式須要自行指定集羣IP和端口,不過,開發者們也能夠藉助TensorFlowOnSpark,使用Yarn進行管理。
基於TensorFlowOnSpark的分佈式開發
做爲個推面向開發者服務的移動APP數據統計分析產品,個數所具備的用戶行爲預測功能模塊,即是基於TensorFlowOnSpark這種分佈式來實現的。基於TensorFlowOnSpark的分佈式開發使其能夠在屏蔽了端口和機器IP的狀況下,也可以作到較好的資源申請和分配。而在多個千萬級應用同時建模的狀況下,集羣也有良好的表現,在sparkUI中也能看到相對應的資源和進程的狀況。最關鍵的是,TensorFlowOnSpark能夠在單機過分到分佈式的狀況下,使代碼方便修改,且容易部署。
基於TensorFlowOnSpark的分佈式開發的具體流程以下:
首先,須要使用spark-submit來提交任務,同時指定spark須要運行的參數(–num-executors 6等)、模型代碼、模型超參等,一樣須要接受外部參數:
parser = argparse.ArgumentParser() parser.add_argument("-i", "--tracks", help="數據集路徑") args = parser.parse_args()
以後,準備好參數和訓練數據(DataFrame),調用模型的API進行啓動。
其中,soft_dist.map_fun是要調起的方法,後面均是模型訓練的參數。
estimator = TFEstimator(soft_dist.map_fun, args) \ .setInputMapping({'tracks': 'tracks', 'label': 'label'}) \ .setModelDir(args.model) \ .setExportDir(args.serving) \ .setClusterSize(args.cluster_size) \ .setNumPS(num_ps) \ .setEpochs(args.epochs) \ .setBatchSize(args.batch_size) \ .setSteps(args.max_steps) model = estimator.fit(df)
接下來是soft_dist定義一個 map_fun(args, ctx)的方法:
def map_fun(args, ctx): ... worker_num = ctx.worker_num # worker數量 job_name = ctx.job_name # job名 task_index = ctx.task_index # 任務索引 if job_name == "ps": # ps節點(主節點) time.sleep((worker_num + 1) * 5) cluster, server = TFNode.start_cluster_server(ctx, 1, args.rdma) num_workers = len(cluster.as_dict()['worker']) if job_name == "ps": server.join() elif job_name == "worker": with tf.device(tf.train.replica_device_setter(worker_device="/job:worker/task:%d" % task_index, cluster=cluster)):
以後,可使用tf.train.MonitoredTrainingSession高級API,進行模型訓練和預測。
總結
基於TensorFlow的分佈式開發大體就是本文中介紹的兩種狀況,第二種方式能夠用於實際的生產環境,穩定性會更高。
在運行結束的時候,開發者們也可經過設置郵件的通知,及時地瞭解到模型運行的狀況。
同時,若是開發者使用SessionRunHook來保存最後輸出的模型,也須要了解到,框架代碼中的一個BUG,即它只能在規定的時間內保存,超出規定時間,即便運行沒有結束,程序也會被強制結束。若是開發者使用的版本是未修復BUG的版本,則要自行處理,放寬運行時間。