貼一張掛在官網的圖片:https://grpc.io/docs/what-is-... javascript
能夠理解 gRPC 是 RPC(遠程過程調用)框架的一種實現,關於 RPC 的介紹由於並非本次的主題,因此放個連接來幫助你們理解:https://www.zhihu.com/questio... java
我所理解 RPC 整個執行的過程就是 Client
調用方法 -> 序列化請求參數 -> 傳輸數據 -> 反序列化請求參數 -> Server 處理請求 -> 序列化返回數據 -> 傳輸數據 -> Client
接收到方法返回值:node
其主要邏輯會集中在 數據的序列化/反序列化 以及 數據的傳輸上,而這兩項 gRPC 分別選用了 Protocol Buffers 和 HTTP2 來做爲默認選項。c++
gRPC 在 Node.js 的實現上一共有兩個官方版本,一個是基於 c++ addon 的版本,另外一個是純 JS 實現的版本。git
除了上邊提到的兩個 gRPC 的實現,在 Node.js 中還存在一些其餘的模塊用來輔助使用 gRPC。github
此次筆記主要是針對 grpc-node 方式的實現,在 c++ addon 模塊的實現下,並非一個 gRPC 的完整實現,作的事情更多的是一個銜接的工做,經過 JS、c++ 兩層封裝將 c++ 版本的 gRPC 能力暴露出來供用戶使用。 web
之因此選擇它是由於以爲邏輯會較 grpc-js 清晰一些,更適合理解 gRPC 總體的運行邏輯。 c#
在項目倉庫中,兩個目錄下是咱們須要關注的:服務器
ext 中的代碼主要用於調用 c++ 版本 gRPC 的接口,並經過 NAN 提供 c++ addon 模塊。
src 中的代碼則是調用了 ext 編譯後的模塊,並進行一層應用上的封裝。
而做爲使用 gRPC 的用戶就是引用的 src 下的文件了。 併發
咱們先經過官方的 hello world 示例來講明咱們是如何使用 gRPC 的,由於 gRPC 默認的數據序列化方式採用的 protobuf,因此首先咱們須要有一個 proto 文件,而後經過 gRPC 提供的文件來生成對應的代碼,生成出來的文件包含了 proto 中所定義的 service、method、message 等各類結構的定義,並可以讓咱們用比較熟悉的方式去使用。
示例中的 proto 文件:
package helloworld; // The greeting service definition. service Greeter { // Sends a greeting rpc SayHello (HelloRequest) returns (HelloReply) {} } // The request message containing the user's name. message HelloRequest { string name = 1; } // The response message containing the greetings message HelloReply { string message = 1; }
grpc_tools 是用來生成 proto 對應代碼的,這個命令行工具提供了多種語言的生成版本。
在 Node 中,會生成兩個文件,通常命名規則爲 xxx_pb.js
、xxx_grpc_pb.js
,xxx_pb.js
是 proto 中各類 service、method 以及 message 的結構描述及如何使用的接口定義,而 xxx_grpc_pb.js
主要則是針對 xxx_pb.js
的一個整合,按照 proto 文件中定義的結構生成對應的代碼,在用戶使用的時候,使用前者多半用於構造消息結構,使用後者則是方法的調用。
生成後的關鍵代碼(XXX_grpc_pb.js):
const grpc = require('@grpc/grpc'); const helloworld_pb = require('./helloworld_pb.js'); function serialize_helloworld_HelloReply(arg) { if (!(arg instanceof helloworld_pb.HelloReply)) { throw new Error('Expected argument of type helloworld.HelloReply'); } return Buffer.from(arg.serializeBinary()); } function deserialize_helloworld_HelloReply(buffer_arg) { return helloworld_pb.HelloReply.deserializeBinary(new Uint8Array(buffer_arg)); } function serialize_helloworld_HelloRequest(arg) { if (!(arg instanceof helloworld_pb.HelloRequest)) { throw new Error('Expected argument of type helloworld.HelloRequest'); } return Buffer.from(arg.serializeBinary()); } function deserialize_helloworld_HelloRequest(buffer_arg) { return helloworld_pb.HelloRequest.deserializeBinary(new Uint8Array(buffer_arg)); } // The greeting service definition. const GreeterService = exports.GreeterService = { // Sends a greeting sayHello: { path: '/helloworld.Greeter/SayHello', requestStream: false, responseStream: false, requestType: helloworld_pb.HelloRequest, responseType: helloworld_pb.HelloReply, requestSerialize: serialize_helloworld_HelloRequest, requestDeserialize: deserialize_helloworld_HelloRequest, responseSerialize: serialize_helloworld_HelloReply, responseDeserialize: deserialize_helloworld_HelloReply, }, }; exports.GreeterClient = grpc.makeGenericClientConstructor(GreeterService);
最終導出的 sayHello
就是咱們在 proto 文件中定義的 SayHello
方法,因此咱們在做爲 Client
的時候使用,就是很簡單的調用 sayHello
就好了:
const messages = require('./helloworld_pb'); const services = require('./helloworld_grpc_pb'); const grpc = require('grpc'); const client = new services.GreeterClient( target, grpc.credentials.createInsecure() ); const request = new messages.HelloRequest(); request.setName('Niko'); client.sayHello(request, function(err, response) { console.log('Greeting:', response.getMessage()); });
其實真實寫的代碼也就上邊的幾行,實例化了一個 Client
,實例化一個 Message
並構建數據,而後經過 client
調用對應的 method
傳入 message
,就完成了一個 gRPC 請求的發送。
在這個過程當中,咱們直接可見的用到了 grpc-node
的 credentials
以及 makeGenericClientConstructor
,咱們就拿這兩個做爲入口,首先從 makeGenericClientConstructor
來講。
在翻看 index.js 文件中能夠發現, makeGenericClientConstructor
實際上是 client.makeClientConstructor
的一個別名,因此咱們須要去查看 src/client.js 中對應函數的定義,就像函數名同樣,它是用來生成一個 Client 的構造函數的,這個構造函數就是咱們在上邊示例中的 GreeterClient
。
源碼所在位置: https://github.com/grpc/grpc-...
當對照着 xxx_grpc_pb.js
與源碼來看時,會發現調用函數只傳入了一個參數,而函數定義卻存在三個參數,這個實際上是歷史緣由致使的,咱們能夠直接忽略後邊的兩個參數。
精簡後的源碼:
exports.makeClientConstructor = function(methods) { function ServiceClient(address, credentials, options) { Client.call(this, address, credentials, options); } util.inherits(ServiceClient, Client); ServiceClient.prototype.$method_definitions = methods; ServiceClient.prototype.$method_names = {}; Object.keys(methods).forEach(name => { const attrs = methods[name]; if (name.indexOf('$') === 0) { throw new Error('Method names cannot start with $'); } var method_type = common.getMethodType(attrs); var method_func = function() { return requester_funcs[method_type].apply(this, [ attrs.path, attrs.requestSerialize, attrs.responseDeserialize ] .concat([].slice.call(arguments)) ); }; ServiceClient.prototype[name] = method_func; ServiceClient.prototype.$method_names[attrs.path] = name; // Associate all provided attributes with the method Object.assign(ServiceClient.prototype[name], attrs); if (attrs.originalName) { ServiceClient.prototype[attrs.originalName] = ServiceClient.prototype[name]; } }); ServiceClient.service = methods; return ServiceClient; };
methods
參數就是咱們上邊文件中生成的對象,包括服務地址、是否使用 stream、以及 請求/返回值 的類型及對應的序列化/反序列化 方式。
大體的邏輯就是建立一個繼承自 Client
的子類,而後遍歷咱們整個 service
來看裏邊有多少個 method
,並根據 method
不一樣的傳輸類型來區分使用不一樣的函數進行數據的傳輸,最後以 method
爲 key 放到 Client
子類的原型鏈上。
common.getMethodType
就是用來區分 method
到底是什麼類型的請求的,目前 gRPC
一共分了四種類型,雙向 Stream
、兩個單向 Stream
,以及 Unary
模式:
exports.getMethodType = function(method_definition) { if (method_definition.requestStream) { if (method_definition.responseStream) { return constants.methodTypes.BIDI_STREAMING; } else { return constants.methodTypes.CLIENT_STREAMING; } } else { if (method_definition.responseStream) { return constants.methodTypes.SERVER_STREAMING; } else { return constants.methodTypes.UNARY; } } };
在最後幾行有一處判斷 originalName
是否存在的操做,這個是在 proto-loader 中存在的一個邏輯,將 methodName 轉換成純小寫放了進去,單純看註釋的話,這並非一個長期的解決方案: https://github.com/grpc/grpc-...
P.S. proto-loader 是 JS 裏邊一種動態加載 proto 文件的方式,性能比經過 grpc_tools 預生成代碼的方式要低一些。
全部的請求方式,都被放在了一個叫作 requester_funcs
的對象中,源碼中的定義是這樣的:
var requester_funcs = { [methodTypes.UNARY]: Client.prototype.makeUnaryRequest, [methodTypes.CLIENT_STREAMING]: Client.prototype.makeClientStreamRequest, [methodTypes.SERVER_STREAMING]: Client.prototype.makeServerStreamRequest, [methodTypes.BIDI_STREAMING]: Client.prototype.makeBidiStreamRequest };
從這裏就能夠看出,實際上是和咱們 getMethodType
所對應的四種處理方式。
最終,將繼承自 Client
的子類返回,完成了整個函數的執行。
首先咱們須要看看繼承的 Client
構造函數究竟作了什麼事情。
拋開參數類型的檢查,首先是針對攔截器的處理,咱們能夠經過兩種方式來實現攔截器,一個是提供攔截器的具體函數,這個在全部 method
觸發時都會執行,還有一個能夠經過傳入 interceptor_provider
來實現動態的生成攔截器,函數會在初始化 Client
的時候觸發,並要求返回一個新的 interceptor
對象用於執行攔截器的邏輯。
// interceptors 用法 const interceptor = function(options, nextCall) { console.log('trigger') return new InterceptingCall(nextCall(options)); } const client = new services.GreeterClient( target, grpc.credentials.createInsecure(), { interceptors: [interceptor] } ); // interceptor_providers 用法 const interceptor = function(options, nextCall) { console.log('trigger') return new InterceptingCall(nextCall(options)); } const interceptorProvider = (methodDefinition) => { console.log('call interceptorProvider', methodDefinition) return interceptor } const client = new services.GreeterClient( target, grpc.credentials.createInsecure(), { interceptor_providers: [interceptorProvider] } );
P.S. 須要注意的是,若是傳入 interceptor_providers,則會在兩個地方觸發調用,一個是實例化 Client 的時候,還有一個是在 method 真實調用的時候,每次調用都會觸發,因此若是要複用 interceptor,最好在函數以外構建出函數體
可是這樣的攔截器實際上是沒有太多意義的,咱們不可以針對 metadata
、message
來作本身的修改,若是咱們觀察 InterceptingCall
的具體函數簽名,會發現它支持兩個參數的傳入。
function InterceptingCall(next_call, requester) { this.next_call = next_call; this.requester = requester; }
上邊示例只介紹了第一個參數,這個參數預期接受一個對象,對象會提供多個方法,咱們能夠經過console.log(nextCall(options).constructor.prototype)
來查看都有哪些,例如 sendMessage
、start
之類的。
而觀察這些函數的實現,會發現他們都調用了一個 _callNext
。
InterceptingCall.prototype.sendMessage = function(message) { this._callNext('sendMessage', [message]); }; InterceptingCall.prototype.halfClose = function() { this._callNext('halfClose'); }; InterceptingCall.prototype.cancel = function() { this._callNext('cancel'); }; InterceptingCall.prototype._callNext = function(method_name, args, next) { var args_array = args || []; var next_call = next ? next : this._getNextCall(method_name); if (this.requester && this.requester[method_name]) { // Avoid using expensive `apply` calls var num_args = args_array.length; switch (num_args) { case 0: return this.requester[method_name](next_call); case 1: return this.requester[method_name](args_array[0], next_call); case 2: return this.requester[method_name](args_array[0], args_array[1], next_call); } } else { if (next_call === emptyNext) { throw new Error('Interceptor call chain terminated unexpectedly'); } return next_call(args_array[0], args_array[1]); } };
在 _callNext
方法中,咱們就能夠找到 requester
參數到底是有什麼用了,若是 requester
也有實現對應的 method_name
,那麼就會先執行 requester
的方法,隨後將 next_call
對應的方法做爲調用 requester
方法的最後一個參數傳入。
在 grpc-node 中,攔截器的執行順序與傳入順序有關,是一個隊列,先傳入的攔截器先執行,若是傳入了第二個參數,則先執行第二個參數對應的方法,後執行第一個參數對應的方法。
因此若是咱們想作一些額外的事情,好比說針對 metadata
添加一個咱們想要的字段,那麼就能夠這麼來寫攔截器:
var interceptor = function(options, nextCall) { return new InterceptingCall(nextCall(options), { start: function(metadata, listener, next) { next(metadata, { onReceiveMetadata: function (metadata, next) { metadata.set('xxx', 'xxx') next(metadata); }, }); }, }); };
稍微特殊的地方是,start
函數的next
參數被調用時傳入的第二個參數並非一個InterceptingCall
的實例,而是一個InterceptingListener
的實例,二者都有_callNext
的實現,只不過所提供的方法不徹底同樣罷了。
接下來的代碼邏輯主要是用於建立 Channel
,能夠經過傳遞不一樣的參數來覆蓋 Channel
,也能夠用默認的 Channel
,這個 Channel
對應的 gRPC 中其實就是作數據傳輸的那一個模塊,能夠理解爲 HTTP2 最終是在這裏使用的。
通常不多會去覆蓋默認的 Channel
,因此咱們直接去看 grpc-node 裏邊的 Channel
是如何實現的。
Channel
是 c++ 代碼實現的,代碼的位置: https://github.com/grpc/grpc-...
若是有同窗嘗試過混用 grpc-node
和 grpc-js
,那麼你必定有看到過這個報錯:Channel's second argument (credentials) must be a ChannelCredentials
緣由就在於 Channel
實例化過程當中會進行檢查咱們建立 Channel
傳入的 credential
是不是繼承自 grpc 中的 ChannelCredentials
類。
而 grpc-node
和 grpc-js
用的是兩個不一樣的類,因此混用的話可能會出現這個問題。
而後就是根據傳入的 credential
的不一樣來判斷是否要使用加密,而通常經常使用的 grpc.credentials.createInsecure()
其實就是不走加密的意思了,咱們能夠在 https://github.com/grpc/grpc-... 和 https://github.com/grpc/grpc-... 來看到對應的邏輯。
後邊就是調用 c++ 版本的 grpc 來構建對應的 Channel
了,若是有老鐵看過 c++ 版本是如何建立 grpc Client 的,那麼這些代碼就比較熟悉了: https://github.com/grpc/grpc/...
在 grpc-node
中也是調用的一樣的 API 來建立的。
當 Client
被建立出來後,咱們會調用 Client
上的方法(也就是發請求了),這時候就會觸發到上邊提到的 requester_funcs
其中的一個,咱們先從最簡單的 Unary
來講,這種 Client/Server 都是 Unary
請求方式時會觸發的函數。
咱們經過上邊 method_func
中調用方式能夠肯定傳遞了什麼參數進去,有幾個固定的參數 path、request 序列化方式,以及 response 的反序列化方式。
後邊的參數就是由調用時傳入的動態參數了,這些能夠在 makeUnaryRequest
函數定義中看到,分別是 argument
(也就是 request body)、metadata
(能夠理解爲 header,一些元數據)、options
是一個可選的參數(自定義的攔截器是放在這裏的),能夠用於覆蓋 method 的一些描述信息,以及最後的 callback
就是咱們接收到 response 後應該作的操做了。
整個函數的實現,按長度來講,有一半都是在處理參數,而剩下的部分則作了兩件事,一個是實例化了 ClientUnaryCall
對象,另外一個則是處理攔截器相關的邏輯,並啓動攔截器來發送整個請求。
在 makeUnaryRequest
函數中涉及到攔截器的部分有這麼幾塊 resolveInterceptorProviders
、getLastListener
與getInterceptingCall
。
先來看 ClientUnaryCall
作了什麼事情,在源碼中有這樣的一個代碼塊,是使用該對象的場景:
function ClientUnaryCall(call) { EventEmitter.call(this); this.call = call; } var callProperties = { argument: argument, metadata: metadata, call: new ClientUnaryCall(), channel: this.$channel, methodDefinition: method_definition, callOptions: options, callback: callback }; // 以及後續與攔截器產生了一些關聯 var emitter = callProperties.call; // 這行代碼很詭異,看起來是能夠在實例化的時候傳入的,卻選擇了在這裏覆蓋屬性值 emitter.call = intercepting_call; var last_listener = client_interceptors.getLastListener( methodDefinition, emitter, callProperties.callback );
關於 ClientUnaryCall
的定義也很是簡單,實際上是一個繼承自 EventEmitter
的子類,增長了一個 call
屬性的定義,以及兩個方法封裝調用了 call
屬性對應的一些方法。
強烈懷疑 這部分代碼是後期有過調整,由於 ClientUnaryCall
構造函數的實現中是能夠接受一個參數做爲 call
屬性的賦值的,然而在代碼應用中選擇了後續覆蓋 call
屬性,而非直接在實例化的時候傳入進去
resolveInterceptorProviders
是用來處理用戶傳入的攔截器的,這個函數在 Client
的整個生命週期會有兩處調用,一個是在上邊 Client
實例化的過程當中會觸發一次,再有就是每次 method
被調用以前,會從新觸發該函數。 resolveInterceptorProviders
的邏輯很簡單,就是遍歷咱們傳入的 interceptor_provider
並將對應 method 的信息描述傳入並執行,獲得 provider
返回的 interceptor
用做攔截器。
在 Client
實例化過程當中是會遍歷全部的 method
來執行,而在具體的 method
觸發時則只觸發當前 method
相關的 provider
邏輯。
getLastListener
按照註釋中的描述,是爲了得到一個最後會觸發的監聽者,源碼大體是這樣的:
https://github.com/grpc/grpc-...
var listenerGenerators = { [methodTypes.UNARY]: _getUnaryListener, [methodTypes.CLIENT_STREAMING]: _getClientStreamingListener, [methodTypes.SERVER_STREAMING]: _getServerStreamingListener, [methodTypes.BIDI_STREAMING]: _getBidiStreamingListener }; function getLastListener(method_definition, emitter, callback) { if (emitter instanceof Function) { callback = emitter; callback = function() {}; } if (!(callback instanceof Function)) { callback = function() {}; } if (!((emitter instanceof EventEmitter) && (callback instanceof Function))) { throw new Error('Argument mismatch in getLastListener'); } var method_type = common.getMethodType(method_definition); var generator = listenerGenerators[method_type]; return generator(method_definition, emitter, callback); }
一樣也使用了一個枚舉來區分不一樣的方法類型來調用不一樣的函數來生成對應的 listener。
好比這裏用到的 getUnaryListener
,是這樣的一個邏輯:
function _getUnaryListener(method_definition, emitter, callback) { var resultMessage; return { onReceiveMetadata: function (metadata) { emitter.emit('metadata', metadata); }, onReceiveMessage: function (message) { resultMessage = message; }, onReceiveStatus: function (status) { if (status.code !== constants.status.OK) { var error = common.createStatusError(status); callback(error); } else { callback(null, resultMessage); } emitter.emit('status', status); } }; }
代碼也算比較清晰,在不一樣的階段會觸發不一樣的事件,而後再真正返回結果之後,觸發 callback
來告知用戶請求響應。
也就是咱們在示例中調用 sayHello
時傳入的 callback
被調用的地方了。
getInterceptingCall
函數的調用會返回一個實例,經過操做該實例咱們能夠控制請求的開始、數據的發送以及請求的結束。
咱們上邊 getLastListener
返回的對象觸發的時機也是會在這裏能夠找到的。
從源碼上來看會涉及到這麼幾個函數:
var interceptorGenerators = { [methodTypes.UNARY]: _getUnaryInterceptor, [methodTypes.CLIENT_STREAMING]: _getClientStreamingInterceptor, [methodTypes.SERVER_STREAMING]: _getServerStreamingInterceptor, [methodTypes.BIDI_STREAMING]: _getBidiStreamingInterceptor }; function getInterceptingCall(method_definition, options, interceptors, channel, responder) { var last_interceptor = _getLastInterceptor(method_definition, channel, responder); var all_interceptors = interceptors.concat(last_interceptor); return _buildChain(all_interceptors, options); } function _getLastInterceptor(method_definition, channel, responder) { var callback = (responder instanceof Function) ? responder : function() {}; var emitter = (responder instanceof EventEmitter) ? responder : new EventEmitter(); var method_type = common.getMethodType(method_definition); var generator = interceptorGenerators[method_type]; return generator(method_definition, channel, emitter, callback); } function _buildChain(interceptors, options) { var next = function(interceptors) { if (interceptors.length === 0) { return function (options) {}; } var head_interceptor = interceptors[0]; var rest_interceptors = interceptors.slice(1); return function (options) { return head_interceptor(options, next(rest_interceptors)); }; }; var chain = next(interceptors)(options); return new InterceptingCall(chain); }
_getUnaryInterceptor 因爲篇幅較長,直接貼 GitHub 連接了: https://github.com/grpc/grpc-...
大體的邏輯就是咱們經過 method_definition
、channel
等參數來獲取到一個 interceptor
,並將其拼接到原有的 interceptor
後邊,做爲最後執行的攔截器, _buildChain
函數比較簡單,就是實現了一個鏈式調用的函數,用來按順序執行攔截器。
關於 interceptor 如何使用能夠看咱們介紹 interceptor 用法時寫的 demo
主要的邏輯實際上在 _getUnaryInterceptor
中,咱們會建立一個功能全面的 interceptor
,函數會返回一個匿名函數,就是咱們在上邊代碼中看到的調用 generator
的地方了,而在匿名函數的開頭部門,咱們就調用了 getCall
來獲取一個 call
對象,這個 call
對象就是咱們與 gRPC 服務器之間的通道了,請求最終是由 call
對象負責發送的。
getCall
中實際上調用了 channel
對象的 createCall
方法,這部分的邏輯也是在 c++ 中作的了,包含數據的發送之類的邏輯。
這是咱們回到 makeUnaryRequest
函數,再看函數結束的地方調用的那三個方法,第一個 start,將咱們的 metadata(能夠理解爲 header) 發送了過去,而後將真實的信息發送了過去,最後調用關閉方法。
咱們能夠在 _getUnaryInterceptor
中的 start
、sendMessage
以及 halfClose
函數中都有調用 _startBatchIfReady
函數,而這個方法實際上就是調用的 channel
上的 startBatch
方法,再根據調用鏈查找,最終會看處處理邏輯在這裏:https://github.com/grpc/grpc/...
opType 與 代碼中 switch-case 中的對應關係在這裏: https://github.com/grpc/grpc-...
首先在 start
裏邊主要是發送了 metadata
,而且嘗試接受服務端返回過來的 metadata
,並在回調中觸發咱們傳入的 listener
的 onReceiveMetadata
方法。
而後檢查 response 的狀態是否正確,並觸發 listener
的 onReceiveStatus
方法。
接下來是調用 sendMessage
方法,在這裏咱們將消息體進行序列化,併發送,在回調中就會去調用咱們傳入的 callback。
最後在 halfClose
方法中其實就是發送一個指令來設置請求的結束。
整個的流程細化之後大概是這個樣子的:
上邊總體的記錄就是關於 Client 這一側是如何實現的了。
主要涉及到 Client 的構建、發送請求時作的事情、攔截器的做用。
而更深刻的一些邏輯實際上是在 c++ 版本的 gRPC 庫裏所實現,因此本次筆記並無過多的涉及。
文章涉及到的部分示例代碼倉庫地址: https://github.com/Jiasm/grpc...