TensorFlow中的通訊機制——Rendezvous(二)gRPC傳輸

背景

[做者:DeepLearningStack,阿里巴巴算法工程師,開源TensorFlow Contributor]html

本篇是TensorFlow通訊機制系列的第二篇文章,主要梳理使用gRPC網絡傳輸部分模塊的結構和源碼。若是讀者對TensorFlow中Rendezvous部分的基本結構和原理還不是很是瞭解,那麼建議先從這篇文章開始閱讀。TensorFlow在最初被開源時還只是個單機的異構訓練框架,在迭代到0.8版本開始正式支持多機分佈式訓練。與其餘分佈式訓練框架不一樣,Google選用了開源項目gRPC做爲TensorFlow的跨機通訊協議做爲支持。gRPC的編程和使用實際上是相對複雜的,TensorFlow爲了能讓gRPC的調用更加平滑,在調用鏈封裝和抽象上面作了較多工做,甚至有些工做例如建立和管理gRPC channel涉及到了GrpcSession模塊。從我的角度來看,利用gRPC進行Tensor通訊的過程已經足夠豐富,因此咱們只針對gRPC傳輸Tensor過程進行梳理,至於涉及到gRPC管理方面的內容會在另外一篇介紹分佈式Session建立和管理的文章中集中梳理。算法

跨進程通訊過程

根據以前寫博客的經驗,直接介紹類圖結構和源碼部分可能會讓人懵圈,仍是先從邏輯上把通訊過程梳理清楚更能作到深刻淺出。其實對於不是很是瞭解分佈式系統或大規模併發系統的讀者而言,TensorFlow中通訊過程是有些「彆扭」的。那麼有的讀者可能會以爲詫異,跨進程通訊過程不就是一方作Send,另外一方作Recv嗎?這是一個理所固然的過程,爲何會「彆扭」呢?是的,整個過程依然是一方作Send,另外一方作Recv。而它的「彆扭」之處就在於——真正的通訊過程由Recv方觸發,而不是Send方!這就是理解TensorFlow中使用gRPC傳輸Tensor過程的最關鍵點。編程

前一篇文章分析過在本地傳輸的場景下Tensor通訊的大致過程,從機制和邏輯上來講,跨進程傳輸過程和本地傳輸沒有很大的差別:TensorFlow使用Rendezvous通訊Tensor,藉助一個相似Table的數據結構做爲傳輸的中轉,而且Send方和Recv方依靠ParsedKey這一惟一傳輸標識符,跨進程通訊也是如此。若是讀者對這部份內容不瞭解,能夠參考這篇文章。設計模式

Send方——將Ready的Tensor掛入本地Table

和本地傳輸場景下的Send過程相同,本地Tensor處於Ready狀態後就被放掛了本地Worker的Table中,至此Send過程就所有完成了。因此Send過程徹底沒有涉及到任何跨網絡傳輸的內容,而且Send過程是非阻塞的。性能優化

Recv方——向Send方主動發出請求,觸發通訊過程

Recv方是Tensor的接收方,它的處理過程是:將所須要的Tensor對應的ParsedKey拼出後,主動向Send方主動發出Request,Send方在接收到Request後當即在本地Table中查找方所須要的Tensor,找到後將Tensor封裝成Response發送回Recv方。在這個過程當中,Recv方能夠認爲是Client,Send方能夠認爲是Server,經過發送Request和Response來完成Tensor的傳輸。網絡

結構設計解析

建議讀者在閱讀本節時適當翻開TensorFlow C++部分源碼,但只須要理解結構關係便可(好比類之間的繼承、組合、依賴關係),暫時不要閱讀類的實現內容。由於RemoteRendezvous部分涉及到的類結構很是多,直接陷入細節的閱讀會深陷其中不能自拔,甚至弄得一頭霧水十分疲憊。在梳理結構時一邊參照下文中的類圖結構,一邊從設計模式和架構的角度嘗試去理解每一個模塊的司職是理解本篇細節的關鍵。先理解宏觀結構看懂架子,再去深刻理解實現細節嘗試去優化是讀任何代碼的正確順序。session

任何場景下,通訊過程幾乎都是能夠經過簡單的圖將功能描述清楚的。可是不能否認的是,任何涉及到分佈式通訊的系統在架構上都會對通訊層作相對複雜的封裝。一方面是由於通訊雖然功能簡單,但其實現自己具備相對較高的複雜性(你們能夠嘗試閱讀gRPC源碼感覺下底層軟件的複雜度)。另外一方面,應用層也須要與通訊底層經過抽象儘可能實現較好的解耦,這樣也方便將應用層模塊被其餘團隊擴展編寫。下面咱們一塊兒來探究TensorFlow中涉及到跨進程通訊的Rendezvous系列。數據結構

兩層抽象繼承關係——RemoteRendezvous與BaseRemoteRendezvous

前一篇在介紹本地傳輸時咱們熟悉了Rendezvous模塊中與本地傳輸相關的類,例如LocalRendezvousImpl,IntraProcessRendezvous和SimpleRendezvous。對應地,跨進程傳輸也有不一樣的Rendezvous,從根源上來講,它們也繼承於Rendezvous接口,而且不一樣的傳輸協議也有各自的Rendezvous。在這裏,咱們再次將前文中展現的整體類結構圖展現出來,此次咱們將涉及到遠程傳輸的類用特殊顏色標出,以下圖所示。多線程

綜合來看,從Rendezvous的繼承結構來看,涉及到跨進程傳輸的Rendezvous有層:架構

1. RemoteRendezvous:只增長了一個Initialize方法,並標記爲純虛函數。這是由於跨進程Rendezvous須要藉助Session作一些初始化工做,因此TensorFlow中全部涉及到跨進程通訊的Rendezvous都須要重寫Initialize函數,使用前也必須強制調用該函數。

2. 各類具體協議Rendezvous的基類——BaseRemoteRendezvous:既然全部涉及跨進程通訊的Rendezvous都須要提供各自協議下實現的Initialize函數,那麼沒有比在RemoteRendezvous和真正特化的Rendezvous之間再添加一層繼承關係更合適的作法了。事實上TensorFlow在此處也是這麼設計的,這個承上啓下的類就是BaseRemoteRendezvous。它還提供了公共的Send和Recv方法,這可讓繼承它的特化Rendezvous盡最大可能作到代碼複用。

BaseRecvTensorCall是通訊的實體抽象,後面分析時會有更深的體會,在這裏先有個印象便可。

開始特化——各類各樣的RemoteRendezvous

TensorFlow目標是通用可擴展,因此被設計成容許底層支持多種通訊協議的結構。事實上到目前爲止,算上contrib目錄的內容(contrib目錄是廣大TensorFlow貢獻者添加的內容),TensorFlow已經支持包括gRPC,RDMA(Remote Direct Memroy Access),GDR(GPU Dirrect)和MPI四種通訊協議,所以包含了四種對應的Rendezvous,他們分別是RpcRemoteRendezvous,RDMARemoteRendezvous,GdrRemoteRendezvous和MPIRemoteRendezvous。每種通訊協議各有其特色,有時候其可用性也取決於硬件和軟件條件(好比RDMA須要支持RDMA協議的網卡,一般跑在Infiniband和RoCE網絡上,若是沒有硬件支持,那麼RDMA將沒法使用,GDR也是這個道理)。從代碼中能夠看出,實現每種具體的RemoteRendezvous都有必定的複雜性,因此很難想象在沒有封裝抽象和代碼複用的結構裏如何實現這些內容。在本篇咱們關注RpcRemoteRendezvous,它是gRPC協議實現的RemoteRendezvous。 

使人熟悉的管理器模式——RendezvousMgr

爲了更好地管理RemoteRendezvous,TensorFlow設計了相應的管理器——RendezvousMgr相關類,併爲每種具體的RemoteRendevzous作了特化。熟悉設計模式的讀者都知道,管理器是一種經典的設計模式,它能使管理職責的變化獨立於類自己。RendezvousMgr主要負責RemoteRendezvous的建立和銷燬,它也定義了兩個本地版本的Recv接口。有的讀者可能會問,管理器爲何還容許作Recv?而且只能作本地的Recv?我我的判斷添加這兩個接口純粹是爲了方便某些地方的使用。至於RendezvousMgr的建立時機和RemoteRendezvous的初始化過程並非本篇解析的範疇,由於這涉及到分佈式場景下建立Server的較長鏈路,這部份內容會在之後的博客中詳細解析。下面是RendezvousMgr相關的類圖結構,咱們能夠看到其接口類中已經定義了Recv接口。

RpcRemoteRendezvous通訊過程與源碼解析

上一小節中對RemoteRendezvous相關類結構和類間的關係作了解析,旨在從架構層面幫助讀者理解各個類的職能。雖然涉及到的內容比較多,可是總體的結構和邏輯仍是很是清晰的。若是讀者嘗試經過閱讀源碼輔助理解上述內容以後仍然感受有些眼花繚亂,沒有關係,咱們在這裏暫時作一個簡單地梳理,將重點內容梳理到如下幾條。

1.  本地Rendezvous和RemoteRendezvous共同繼承了同一個接口;

2. RemoteRendezvous須要支持不一樣的通訊協議,所以派生了各類各樣的實現類;

3. RemoteRendezvous的使用較爲複雜,爲此引入了管理器模式——RendezvousMgr,它負責RemoteRendezvous的建立和銷燬,並添加了兩個額外的Recv接口方便某些場景直接調用;

4. RemoteRendezvous作了兩層繼承結構只是爲了添加一個Initialize方法。

本篇咱們梳理使用gRPC協議的部分,從上文中梳理的結構中不難看出,這部分涉及到的類並很少。

1. Rendezvous相關類——RemoteRendezvous,BaseRemoteRendezvous,RpcRemoteRendezvous;

2. 管理器——BaseRendezvousMgr,RpcRendezvousMgr

3. 其餘類——BaseRecvTensorCall,RpcRecvTensorCall和DefferedCall

畢竟是涉及到了gRPC協議自己的使用,因此有必要在梳理源碼以前從宏觀上對gRPC的工做流程作一個簡單地梳理。

gRPC編程中的代理模式——Stub與Service

在此咱們假設同窗們對gRPC的原理和使用有一些基本的瞭解,好比須要使用Protobuf預先定義Service接口,而且區分Stub和Service等。對此不瞭解的同窗仍是建議先認真閱讀一下gRPC的使用文檔和範例,下面這段文字只對gRPC作一個很是簡單的描述。

在一次RPC調用中,客戶端須要調用服務端的服務,而後將處理結果返回給客戶端。而gRPC作到了「讓客戶端調用遠端函數時就像調用本地函數同樣」的體驗,這得益於一種經典的設計模式——代理模式。負責爲客戶端代理的節點(gRPC中稱之爲Stub)會將請求和參數傳到服務端,並由Service進行實際的處理,而後將結果返回給Stub,最終返回到客戶端中。咱們甚至能夠認爲負責代理的Stub就是客戶端,由於它的職責就是與遠端交互並取得結果。另外,爲了可以讓傳輸量儘量少,也爲了可以讓傳輸不受客戶端和服務端具體的類型限制,gRPC在作跨網絡傳輸前將消息統一序列化成Protobuf格式。下圖是從gRPC官網教程中摘出的工做原理圖。

Send過程

由於Send過程並不涉及跨進程傳輸,只是將Ready的Tensor掛入本地Table之中,因此它和LocalRendezvousImpl的Send徹底相同。不只如此,TensorFlow中的任何RemoteRendezvous的Send過程都要遵循這樣的原理,基於代碼複用的考慮,將這部份內容都被抽象到了公共基類BaseRemoteRendezvous的Send函數裏是一個很好的設計。事實上,BaseRemoteRendezvous的Send過程就是調用了LocalRendezvousImpl的Send過程,因此LocalRendezvousImpl必需要做爲BaseRemoteRendezvous的成員之一。下面的代碼展現了這一過程。

 1 Status BaseRemoteRendezvous::Send(const Rendezvous::ParsedKey& parsed,
 2                                   const Rendezvous::Args& args,
 3                                   const Tensor& val, const bool is_dead) {
 4   VLOG(1) << "BaseRemoteRendezvous Send " << this << " " << parsed.FullKey();
 5   {
 6     mutex_lock l(mu_);
 7     if (!status_.ok()) return status_;
 8     DCHECK(is_initialized_locked());
 9     if (!IsLocalDevice(session_->worker_name, parsed.src_device)) {
10       return errors::InvalidArgument(
11           "Invalid rendezvous key (src): ", parsed.FullKey(), " @ ",
12           session_->worker_name);
13     }
14   }
15   // Buffers "val" and "device_context" in local_.
16   return local_->Send(parsed, args, val, is_dead);
17 }

Recv過程

Recv過程就很是複雜了,由於每種RemoteRendezvous都涉及到不一樣的通訊協議以及管理方式,因此Recv函數是真正須要繼承重寫的模塊。在看RpcRemoteRendezvous具體的實現以前,咱們必須先將gRPC定義服務的接口部分梳理清楚。

gRPC的服務定義接口文件

在TensorFlow的core/protobuf文件中,咱們須要研究一下worker_service.proto文件,這個文件中定義了若干RPC Service接口。

雖然它定義了不少RPC服務接口,可是咱們只須要關注和Tensor接收相關的接口定義便可。準確地說,目前咱們必需要知道的是下面這個Service定義。

  // See worker.proto for details.
  rpc RecvTensor(RecvTensorRequest) returns (RecvTensorResponse) {
    // RecvTensor Method
  }

顯然,這是一個讓服務端處理「接收Tensor」的服務(注意是讓服務端處理名爲「接收Tensor」的服務,而不是讓服務端去接收Tensor。由於客戶端有接收Tensor的需求,但須要服務端發送Tensor,爲客戶端發送Tensor的服務被稱之爲「接收Tensor」),按照註釋提示,咱們能夠在worker.proto中找到RecvTensorRequest和RecvTensorResponse的數據結構,這部分結構讀者能夠本身查閱,很是容易理解。在編譯時,擴展的Protobuf編譯器會對worker_service.proto中的rpc接口生成C++服務接口代碼和Stub代碼(畢竟Stub代碼比較純粹而且和業務邏輯無關,它只是一個向對應Service端發送處理請求的過程),TensorFlow只須要對具體的Service提供實現便可。

與gRPC生成的代碼聯繫起來

gRPC會爲worker_service.proto中每個rpc服務生成C++接口代碼,爲了區分多個rpc服務,特地爲每一個服務生成了特殊的名字。好比RecvTensor服務的名字就是/tensorflow.WorkerService/RecvTensor。爲了避免直接使用冗長的字符串,TensorFlow爲worker_service.proto中的每一個服務都作了enumeration的映射,這部分代碼在tensorflow/core/distributed_runtime/grpc_worker_service_impl.h和同名實現文件中。

 1 // Names of worker methods.
 2 enum class GrpcWorkerMethod {
 3   kGetStatus,
 4   kCreateWorkerSession,
 5   kDeleteWorkerSession,
 6   kRegisterGraph,
 7   kDeregisterGraph,
 8   kRunGraph,
 9   kCleanupGraph,
10   kCleanupAll,
11   kRecvTensor,
12   kRecvBuf,
13   kLogging,
14   kTracing,
15   kCompleteGroup,
16   kCompleteInstance,
17   kGetStepSequence,
18 };

下面是從enumeration類型映射到具體字符串的函數。

 1 const char* GrpcWorkerMethodName(GrpcWorkerMethod id) {
 2   switch (id) {
 3     case GrpcWorkerMethod::kGetStatus:
 4       return "/tensorflow.WorkerService/GetStatus";
 5     case GrpcWorkerMethod::kCreateWorkerSession:
 6       return "/tensorflow.WorkerService/CreateWorkerSession";
 7     case GrpcWorkerMethod::kDeleteWorkerSession:
 8       return "/tensorflow.WorkerService/DeleteWorkerSession";
 9     case GrpcWorkerMethod::kRegisterGraph:
10       return "/tensorflow.WorkerService/RegisterGraph";
11     case GrpcWorkerMethod::kDeregisterGraph:
12       return "/tensorflow.WorkerService/DeregisterGraph";
13     case GrpcWorkerMethod::kRunGraph:
14       return "/tensorflow.WorkerService/RunGraph";
15     case GrpcWorkerMethod::kCleanupGraph:
16       return "/tensorflow.WorkerService/CleanupGraph";
17     case GrpcWorkerMethod::kCleanupAll:
18       return "/tensorflow.WorkerService/CleanupAll";
19     case GrpcWorkerMethod::kRecvTensor:
20       return "/tensorflow.WorkerService/RecvTensor";
21     case GrpcWorkerMethod::kRecvBuf:
22       return "/tensorflow.WorkerService/RecvBuf";
23     case GrpcWorkerMethod::kLogging:
24       return "/tensorflow.WorkerService/Logging";
25     case GrpcWorkerMethod::kTracing:
26       return "/tensorflow.WorkerService/Tracing";
27     case GrpcWorkerMethod::kCompleteGroup:
28       return "/tensorflow.WorkerService/CompleteGroup";
29     case GrpcWorkerMethod::kCompleteInstance:
30       return "/tensorflow.WorkerService/CompleteInstance";
31     case GrpcWorkerMethod::kGetStepSequence:
32       return "/tensorflow.WorkerService/GetStepSequence";
33   }
34   // Shouldn't be reached.
35   LOG(FATAL) << "Invalid id: this line shouldn't be reached.";
36   return "invalid id";
37 }

另外,還須要爲每一個RPC服務註冊爲異步服務,這須要使用gRPC自帶的AddMethod接口和MarkMethodAsync接口,以下所示。

1 WorkerService::AsyncService::AsyncService() {
2   for (int i = 0; i < kGrpcNumWorkerMethods; ++i) {
3     AddMethod(new ::grpc::internal::RpcServiceMethod(
4         GrpcWorkerMethodName(static_cast<GrpcWorkerMethod>(i)),
5         ::grpc::internal::RpcMethod::NORMAL_RPC, nullptr));
6     ::grpc::Service::MarkMethodAsync(i);
7   }
8 }

好了,接下來就是解析源碼中具體的交互過程了。其實TensorFlow在框架層面對gRPC的使用了一些Best Practice,好比異步處理請求的架構和多線程輪詢Completion Queue等。將這些連在一塊兒梳理須要更多的篇幅,一次性展現大量的內容也不利於閱讀,因此咱們只對發送和接收過程作一個梳理。

Client端的調用鏈

從BaseRemoteRendeezvous的RecvAsync出發,逐漸深刻調用鏈底層。時序圖是分析調用鏈的最好工具,下面給出了Client端到Stub的調用過程,這裏面涉及到了幾個新的類。

1. RpcRecvTensorCall:這是一次gRPC調用的抽象,繼承了BaseRecvTensorCall這個抽象基類,它封裝了複雜的後續調用鏈。

2. GrpcRemoteWorker:它也是client端的內容,只不過它是Remote端的代理。

3. RpcState:這是真正封裝了一次RPC調用及狀態的類,它會直接對Stub以及GenericClientAsyncResponseReader進行管理,好比向服務端發送異步請求並等待結果等。

Client端是一個虛擬角色,它能夠是調用RpcRemoteRendezvous的任何一個模塊。咱們能夠看到,RpcRemoteRendezvous的一次RecvRemoteAsync過程很是長,而且Stub的調用時異步的。這裏的代碼確實有些多,因此咱們只展現一下關鍵代碼段,可是建議讀者打開源碼仔細閱讀每一個調用鏈。

下面是RecvRemoteAsync的代碼段,主要作了RpcRecvTensorCall的初始化,註冊以及啓動工做。

 1 void RpcRemoteRendezvous::RecvFromRemoteAsync(
 2     const Rendezvous::ParsedKey& parsed, const Rendezvous::Args& recv_args,
 3     DoneCallback done) {
 4   CHECK(is_initialized());
 5   Status s;
 6 
 7   // Prepare a RecvTensor call that can handle being aborted.
 8   RpcRecvTensorCall* call = get_call_freelist()->New();
 9 
10   // key.src_device identifies a remote device.
11   if (!DeviceNameUtils::SplitDeviceName(parsed.src_device, &call->src_worker_,
12                                         &call->src_rel_device_)) {
13     s = errors::Internal(parsed.src_device,
14                          " is invalid remote source device.");
15   }
16   WorkerSession* sess = session();
17   WorkerInterface* rwi = sess->worker_cache->CreateWorker(call->src_worker_);
18   if (s.ok() && rwi == nullptr) {
19     s = errors::Internal("No worker known as ", call->src_worker_);
20   }
21 
22   Device* dst_device;
23   if (s.ok()) {
24     s = sess->device_mgr()->LookupDevice(parsed.dst_device, &dst_device);
25   }
26   if (!s.ok()) {
27     if (rwi != nullptr) {
28       sess->worker_cache->ReleaseWorker(call->src_worker_, rwi);
29     }
30     get_call_freelist()->Release(call, sess->worker_cache.get());
31     done(s, Args(), recv_args, Tensor{}, false);
32     return;
33   }
34 
35   call->Init(rwi, step_id_, parsed.FullKey(), recv_args.alloc_attrs, dst_device,
36              recv_args, std::move(done));
37 
38   // Record "call" in active_ so that it can be aborted cleanly.
39   RegisterCall(call);
40 
41   // RendezvousMgr already aborted, shouldn't send RPC call any more
42   if (!call->status().ok()) {
43     call->done()(call->status(), Args(), Args(), Tensor(), false);
44     session()->worker_cache->ReleaseWorker(call->src_worker_, call->wi_);
45     call->wi_ = nullptr;
46     get_call_freelist()->Release(call, session()->worker_cache.get());
47     return;
48   }
49 
50   // Start "call".
51   Ref();
52   call->Start([this, call]() {
53     // Removes "call" from active_. Prevent StartAbort().
54     DeregisterCall(call);
55     // If StartAbort was called prior to DeregisterCall, then the
56     // current status should be bad.
57     Status s = call->status();
58     call->done()(s, Args(), call->recv_args(), call->tensor(), call->is_dead());
59     session()->worker_cache->ReleaseWorker(call->src_worker_, call->wi_);
60     call->wi_ = nullptr;
61     get_call_freelist()->Release(call, session()->worker_cache.get());
62     Unref();
63   });
64 }

下面是GrpcRemoteWorker調用RPCState的過程,最後的IssueRequest即開始建立RPCState並觸發stub的調用。

void RecvTensorAsync(CallOptions* call_opts, const RecvTensorRequest* request,
                       TensorResponse* response, StatusCallback done) override {
    VLOG(1) << "RecvTensorAsync req: " << request->DebugString();
    int64 start_usec = Env::Default()->NowMicros();
    // Type-specialized logging for this method.
    bool logging_active = logger_->LoggingActive() || VLOG_IS_ON(2);
    StatusCallback wrapper_done;
    const StatusCallback* cb_to_use;
    if (!logging_active) {
      cb_to_use = &done;  // No additional work to do, so just use done directly
    } else {
      wrapper_done = [this, request, response, done, start_usec](Status s) {
        if (logger_->LoggingActive()) {
          int64 end_usec = Env::Default()->NowMicros();
          int64 step_id = request->step_id();
          int64 bytes = response->tensor().TotalBytes();
          int64 send_start_usec = start_usec;
          // If a send start time was reported by the other side, use
          // that instead.  Maybe we should mark the display if we're using
          // our local time instead of the remote start time?
          if (response->metadata().send_start_micros()) {
            // send_start_micros is the timestamp taken when the
            // remote machine began to send the RecvTensor response.
            // Due to clock skew between source and dest machines, it
            // is possible that send_start_micros can be larger than
            // end_usec or less than start_usec.
            //
            // To respect causality, we enforce the invariants that
            // the RecvTensor response can not have been sent before
            // the RecvTensor request, and must have been sent before
            // it was received.
            send_start_usec = std::max(
                start_usec,
                static_cast<int64>(response->metadata().send_start_micros()));
            send_start_usec = std::min(send_start_usec, end_usec - 1);
          }
          const string& key = request->rendezvous_key();
          std::vector<string> key_parts = str_util::Split(key, ';');
          if (key_parts.size() != 5) {
            LOG(WARNING) << "Bad key: " << key;
          } else {
            logger_->RecordRecvTensor(step_id, send_start_usec, end_usec,
                                      key_parts[3],  // tensor name
                                      key_parts[0],  // src_device
                                      key_parts[2],  // dst_device
                                      bytes);
          }
        }
        VLOG(2) << "done callback, req: " << request->DebugString()
                << " response " << response->metadata().DebugString();
        done(s);
      };
      cb_to_use = &wrapper_done;
    }

    IssueRequest(request, response, recvtensor_, *cb_to_use, call_opts);
  }

最後展現一下Stub的觸發位置,這個函數在RPCState類中,而且在建立RPCState對象時當即被調用。

 1 void StartCall() {
 2     context_.reset(new ::grpc::ClientContext());
 3     context_->set_fail_fast(fail_fast_);
 4 
 5     if (timeout_in_ms_ > 0) {
 6       context_->set_deadline(
 7           gpr_time_from_millis(timeout_in_ms_, GPR_TIMESPAN));
 8     }
 9     if (call_opts_) {
10       call_opts_->SetCancelCallback([this]() { context_->TryCancel(); });
11     }
12 
13     VLOG(2) << "Starting call: " << method_;
14 
15     call_ = std::move(
16         stub_->PrepareUnaryCall(context_.get(), method_, request_buf_, cq_));
17     call_->StartCall();
18     call_->Finish(&response_buf_, &status_, this);
19   }

Server端負責查找Tensor的Service

若是咱們把異步處理請求的架構和多線程輪詢Completion Queue的Best Practice去除,那麼Service端其實並不複雜,調用鏈相對Client端短了不少,下面的時序圖展現了自Server端接收請求後的調用過程,這裏面也涉及到了幾個新的類。

1. GrpcWorkerServiceThread:這是服務端處理請求的線程類。

2. GrpcWorker:這是真正負責處理請求的Worker,是GrpcRemoteWorker的服務端版本;

3. WorkerCall:這是服務端處理一次gRPC請求和響應的類,抽象爲WorkerCall,其實這也是個別名,真實的名稱較長;

4. ServerAsyncResponseWriter:這是gRPC爲用戶端提供的Response writer,是承載響應的實體。

5. Utils:這其實不是一個類,而是多個工具的組合,爲了在時序圖表達方便,統稱爲Utils。

能夠看出,服務端接收到請求後,會調用RecvLocalAsync在本地將客戶端所須要的Tensor查找出來,而後拷貝到CPU上,最後利用gRPC發送回客戶端。一樣,咱們展現關鍵代碼段。

下面是GrpcWorker調用RendezvousMgr的RecvLocalAsync爲客戶端尋找真正Tensor的過程。回調函數中可以看出,在找到對應Tensor後,須要將Tensor作Encode,而後拷貝到CPU端。

 1  env_->rendezvous_mgr->RecvLocalAsync(
 2       step_id, parsed,
 3       [opts, response, done, src_dev, request](
 4           const Status& status, const Rendezvous::Args& send_args,
 5           const Rendezvous::Args& recv_args, const Tensor& val,
 6           const bool is_dead) {
 7         opts->ClearCancelCallback();
 8         if (status.ok()) {
 9           // DMA can only be used for Tensors that do not fall into
10           // the following three odd edge cases: 1) a zero-size
11           // buffer, 2) a dead tensor which has an uninit value, and
12           // 3) the tensor has the on_host allocation attribute,
13           // i.e. it's in CPU RAM *independent of its assigned
14           // device type*.
15           const bool on_host = send_args.alloc_attrs.on_host();
16           {
17             // Non-DMA cases.
18             if (src_dev->tensorflow_gpu_device_info() && (!on_host)) {
19               DeviceContext* send_dev_context = send_args.device_context;
20               AllocatorAttributes alloc_attrs;
21               alloc_attrs.set_gpu_compatible(true);
22               alloc_attrs.set_on_host(true);
23               Allocator* alloc = src_dev->GetAllocator(alloc_attrs);
24               Tensor* copy = new Tensor(alloc, val.dtype(), val.shape());
25               CHECK(send_dev_context)
26                   << "send dev name: " << src_dev->name()
27                   << " gpu_info: " << src_dev->tensorflow_gpu_device_info();
28               // "val" is on an accelerator device. Uses the device_context to
29               // fill the copy on host.
30               StatusCallback copy_ready = [response, done, copy,
31                                            is_dead](const Status& s) {
32                 // The value is now ready to be returned on the wire.
33                 grpc::EncodeTensorToByteBuffer(is_dead, *copy, response);
34                 done(s);
35                 delete copy;
36               };
37 
38               send_dev_context->CopyDeviceTensorToCPU(
39                   &val, request->rendezvous_key(), src_dev, copy, copy_ready);
40             } else {
41               grpc::EncodeTensorToByteBuffer(is_dead, val, response);
42               done(Status::OK());
43             }
44           }
45         } else {
46           //  !s.ok()
47           done(status);
48         }
49       });

至此,咱們的Rendezvous之gRPC傳輸之旅就圓滿結束了,在閱讀本篇時仍是但願讀者可以在理解結構設計後,對照C++源碼仔細閱讀反覆推敲裏面的每個細節,這樣纔能有更深的理解。

一個須要思考的問題——gRPC傳輸Tensor很低效?

是的,確實很低效。爲何?從設計哲學上說,gRPC自己設計並不適合深度學習訓練場景。從細節上來講它有如下幾個缺陷:

1. gRPC發送Tensor前,接收Tensor後必需要作序列化,在Tensor很大的時候這是一個很是討厭的overhead,發送接收延遲過大;

2. 序列化根本沒有對數據作任何壓縮,這是由於Tensor都是稠密的,因此序列化沒有意義;

3. 不能支持RDMA和GPU Direct。雖然這依賴於硬件,可是gRPC在軟件層面也並無作這些適配。

因此大部分人使用TensorFlow分佈式時都會對性能有很大的抱怨,這裏面很大的緣由和gRPC有關。若是你使用NCCL或者MPI,那麼你會獲得不同的性能。

總結

本篇文章篇幅較長,是Rendezvous機制系列的第二篇,主要梳理了涉及到gRPC傳輸的模塊架構設計和源碼細節,而且詳細梳理了通訊過程。理解TensorFlow跨機傳輸的關鍵在於理解一個事實:真正的通訊過程由Recv方觸發,而不是Send方!Send依然將Ready的Tensor掛入本地Table中,而Recv會向Send端發送gRPC請求查詢所須要的Tensor,而後返回所須要的結果,這個過程雖然有些彆扭,但邏輯上並不稀奇。從結構設計上來講,RemoteRendezvous沿用了Rendezvous接口,而且徹底複用了LocalRendezvousImpl的Send代碼,而Recv因爲涉及到具體的通訊細節和管理機制,則各有各的不一樣。另外,RemoteRendezvous相對LocalRendezvous複雜不少,須要管理器進行管理。最後一大部分是Send和Recv的源碼細節展現,由於不管是客戶端仍是服務端,其調用鏈都比較長,因此以時序圖的形式展現各個類之間的調用關係和協做關係較爲清晰,具體每一個調用的細節建議讀者結合源碼逐一分析,並連同本篇文章一塊兒理解較爲深入。最後,咱們總結了gRPC傳輸Tensor的明顯缺陷,固然這也是爲性能優化開闢了新的空間。

相關文章
相關標籤/搜索