gRPC實戰包含一系列文章,包括原創和翻譯。最終會造成一個完整的系列,後續會不斷完善,增長新的內容:node
=============================================================segmentfault
本文將說明如何在NodeJS應用程序的GRPC中使用流。服務器
gRPC中的流可幫助咱們在單個RPC調用中發送消息流。併發
gRPC 的流式,分爲三種類型:負載均衡
在本文中,咱們將重點關注如下流:ide
如今讓咱們爲服務器流gRPC建立服務器和客戶端代碼。函數
建立一個名爲proto的文件夾。在該文件夾中建立一個名爲employee.proto的文件。將如下內容複製到employee.proto中:post
syntax = "proto3"; package employee; service Employee { rpc paySalary (EmployeeRequest) returns (stream EmployeeResponse) {} } message EmployeeRequest { repeated int32 employeeIdList = 1; } message EmployeeResponse{ string message = 1; }
請參閱個人grpc基礎文章,以瞭解有關.proto文件和協議緩衝區的更多信息。ui
在這裏,咱們建立一個名爲paySalary的rpc,它接受EmployeeRequest做爲請求併發送EmployeeResponse流做爲響應。咱們使用關鍵字流來指示服務器將發送消息流。spa
上面也定義了EmployeeRequest和EmployeeResponse。 repeate關鍵字表示將發送數據列表。
在此示例中,paySalary的請求將是員工ID的列表。服務器將經過一條消息流作出響應,告知是否已向員工支付薪水。
建立一個名爲data.js的文件,並將如下代碼複製到其中。
//Hardcode some data for employees let employees = [{ id: 1, email: "abcd@abcd.com", firstName: "First1", lastName: "Last1" }, { id: 2, email: "xyz@xyz.com", firstName: "First2", lastName: "Last2" }, { id: 3, email: "temp@temp.com", firstName: "First3", lastName: "Last3" }, ]; exports.employees = employees;
咱們將其用做服務器的數據源。
建立一個名爲server.js的文件。將如下代碼複製到server.js中
const PROTO_PATH = __dirname + '/proto/employee.proto'; const grpc = require('grpc'); const protoLoader = require('@grpc/proto-loader'); let packageDefinition = protoLoader.loadSync( PROTO_PATH, { keepCase: true, longs: String, enums: String, defaults: true, oneofs: true }); let employee_proto = grpc.loadPackageDefinition(packageDefinition)
接下來,將如下代碼片斷添加到server.js中
let { paySalary } = require('./pay_salary.js'); function main() { let server = new grpc.Server(); server.addService(employee_proto.Employee.service, { paySalary: paySalary } ); server.bind('0.0.0.0:4500', grpc.ServerCredentials.createInsecure()); server.start(); } main();
在上面的腳本中,咱們將啓動GRPC Server並將Employee Service和paySalary實現一塊兒添加到其中。
可是paySalary函數在pay_salary.js文件中定義。
讓咱們建立一個pay_salary.js文件。
將如下腳本添加到pay_salary.js文件中
let { employees } = require('./data.js'); const _ = require('lodash'); function paySalary(call) { let employeeIdList = call.request.employeeIdList; _.each(employeeIdList, function (employeeId) { let employee = _.find(employees, { id: employeeId }); if (employee != null) { let responseMessage = "Salary paid for ".concat( employee.firstName, ", ", employee.lastName); call.write({ message: responseMessage }); } else{ call.write({message: "Employee with Id " + employeeId + " not found in record"}); } }); call.end(); } exports.paySalary = paySalary;
paySalary函數將調用做爲輸入。 call.request將包含客戶端發送的請求。
call.request.employeeIdList將包含客戶端發送的員工ID的列表。
而後,咱們遍歷EmployeeId,併爲每一個員工ID進行一些處理。
對於每一個員工ID,咱們最後都調用call.write函數。 call.write將在流中將單個消息寫回到客戶端。
在這種狀況下,對於每位員工,call.write都會發回工資是否已經支付。
處理完全部員工編號後,咱們將調用call.end函數。 call.end指示流已完成。
這是最終的server.js文件
const PROTO_PATH = __dirname + '/proto/employee.proto'; const grpc = require('grpc'); const protoLoader = require('@grpc/proto-loader'); let packageDefinition = protoLoader.loadSync( PROTO_PATH, { keepCase: true, longs: String, enums: String, defaults: true, oneofs: true }); let employee_proto = grpc.loadPackageDefinition(packageDefinition) let { paySalary } = require('./pay_salary.js'); function main() { let server = new grpc.Server(); server.addService(employee_proto.Employee.service, { paySalary: paySalary } ); server.bind('0.0.0.0:4500', grpc.ServerCredentials.createInsecure()); server.start(); } main();
建立一個名爲client_grpc_server_stream.js的文件。將如下代碼複製到文件中。
const PROTO_PATH = __dirname + '/proto/employee.proto'; const grpc = require('grpc'); const protoLoader = require('@grpc/proto-loader'); let packageDefinition = protoLoader.loadSync( PROTO_PATH, {keepCase: true, longs: String, enums: String, defaults: true, oneofs: true }); let employee_proto = grpc.loadPackageDefinition(packageDefinition).employee;
接下來,將如下腳本片斷添加到客戶端。
function main() { let client = new employee_proto.Employee('localhost:4500', grpc.credentials.createInsecure()); let employeeIdList = [1,10,2]; let call = client.paySalary({employeeIdList: employeeIdList}); call.on('data',function(response){ console.log(response.message); }); call.on('end',function(){ console.log('All Salaries have been paid'); }); } main();
client變量將具備存根,這將有助於咱們在服務器中調用該函數。
employeeIdList是提供給服務器的輸入。
let call = client.paySalary({employeeIdList: employeeIdList});
腳本調用服務器中的paySalary函數,並將employeeIdList做爲請求傳遞。因爲服務器將要發送消息流,所以調用對象將幫助咱們偵聽流事件。
咱們會偵聽呼叫對象中的「數據」事件,以查看流中來自服務器的任何消息。以下面的腳本所示。
call.on('data',function(response){ console.log(response.message); });
在這裏,只要咱們從服務器收到任何消息,咱們就只打印響應消息。
咱們在調用對象中偵聽「結束」事件,以瞭解服務器流什麼時候結束。以下面的腳本所示。
call.on('end',function(){ console.log('All Salaries have been paid'); });
在此流結束時,咱們正在打印「已支付全部薪水」。
這是client_gprc_server_stream.js的完整代碼。
const PROTO_PATH = __dirname + '/proto/employee.proto'; const grpc = require('grpc'); const protoLoader = require('@grpc/proto-loader'); let packageDefinition = protoLoader.loadSync( PROTO_PATH, {keepCase: true, longs: String, enums: String, defaults: true, oneofs: true }); let employee_proto = grpc.loadPackageDefinition(packageDefinition).employee; function main() { let client = new employee_proto.Employee('localhost:4500', grpc.credentials.createInsecure()); let employeeIdList = [1,10,2]; let call = client.paySalary({employeeIdList: employeeIdList}); call.on('data',function(response){ console.log(response.message); }); call.on('end',function(){ console.log('All Salaries have been paid'); }); } main();
打開命令提示符,而後使用如下腳本啓動服務器。
node server.js
打開一個新的命令提示符,並使用如下腳本運行客戶端。
node client_grpc_server_stream.js
在運行客戶端時,咱們將得到如下輸出。
Salary paid for First1, Last1 Employee with Id 10 not found in record Salary paid for First2, Last2 All Salaries have been paid
在這種狀況下,客戶端已向服務器發送了3個Id的1,10,2。服務器一一處理ID,而後將消息流發送給客戶端。流中的全部消息完成後,將顯示消息「已支付全部薪水」。
如今,讓咱們爲客戶端流GRPC建立服務器和客戶端代碼。
在先前建立的employee.proto文件中,添加如下內容
service Employee { rpc paySalary (EmployeeRequest) returns (stream EmployeeResponse) {} rpc generateReport (stream ReportEmployeeRequest) returns (ReportEmployeeResponse) {} } message ReportEmployeeRequest { int32 id = 1; } message ReportEmployeeResponse{ string successfulReports = 1; string failedReports = 2; }
在這裏,咱們添加了一個名爲generateReport的新rpc,它接受ReportEmployeeRequest流做爲請求並返回ReportEmployeeResponse做爲響應。
所以,向rpc輸入的內容是員工ID的流,服務器的響應將是單個響應,其中指出生成了多少報告以及有多少報告失敗。
這是咱們更改後的完整的employee.proto文件:
syntax = "proto3"; package employee; service Employee { rpc paySalary (EmployeeRequest) returns (stream EmployeeResponse) {} rpc generateReport (stream ReportEmployeeRequest) returns (ReportEmployeeResponse) {} } message EmployeeRequest { repeated int32 employeeIdList = 1; } message EmployeeResponse{ string message = 1; } message ReportEmployeeRequest { int32 id = 1; } message ReportEmployeeResponse{ string successfulReports = 1; string failedReports = 2; }
這是添加了新rpc的完整server.js代碼:
const PROTO_PATH = __dirname + '/proto/employee.proto'; const grpc = require('grpc'); const protoLoader = require('@grpc/proto-loader'); let packageDefinition = protoLoader.loadSync( PROTO_PATH, { keepCase: true, longs: String, enums: String, defaults: true, oneofs: true }); let employee_proto = grpc.loadPackageDefinition(packageDefinition).employee; let { paySalary } = require('./pay_salary.js'); let { generateReport } = require('./generate_report.js'); function main() { let server = new grpc.Server(); server.addService(employee_proto.Employee.service, { paySalary: paySalary , generateReport: generateReport } ); server.bind('0.0.0.0:4500', grpc.ServerCredentials.createInsecure()); server.start(); } main();
在上面的腳本中,咱們能夠看到咱們還向grpc服務器添加了generateReport函數。咱們還能夠看到generateReport函數來自generate_report.js文件。
建立一個名爲generate_report.js的文件。
將如下腳本添加到文件中:
let { employees } = require('./data.js'); const _ = require('lodash'); function generateReport(call, callback){ let successfulReports = []; let failedReports = []; call.on('data',function(employeeStream){ let employeeId = employeeStream.id; let employee = _.find(employees, { id: employeeId }); if (employee != null) { successfulReports.push(employee.firstName); } else{ failedReports.push(employeeId); } }); call.on('end',function(){ callback(null,{ successfulReports: successfulReports.join(), failedReports: failedReports.join() }) }) } exports.generateReport = generateReport;
generateReport函數接受兩個輸入,即調用和回調
爲了從客戶端獲取消息流,咱們須要在調用對象中監聽數據事件。這是在如下腳本中完成的。
call.on('data',function(employeeStream){ let employeeId = employeeStream.id; let employee = _.find(employees, { id: employeeId }); if (employee != null) { successfulReports.push(employee.firstName); } else{ failedReports.push(employeeId); } });
來自客戶端的每條消息都會調用data事件。該消息存在於employeeStream變量中。收到消息後,咱們嘗試生成報告,並肯定報告是成功仍是失敗。
調用對象上的結束事件表示客戶端流已結束。如下代碼顯示瞭如何監聽結束事件。
call.on('end',function(){ callback(null,{ successfulReports: successfulReports.join(), failedReports: failedReports.join() }) })
在這種狀況下,當結束事件發生時,咱們將全部成功和失敗報告組合到一個響應對象中,並使用回調對象將其發送回客戶端。
建立一個名爲client_grpc_client_stream.js的文件。將如下腳本添加到其中。
const PROTO_PATH = __dirname + '/proto/employee.proto'; const grpc = require('grpc'); const protoLoader = require('@grpc/proto-loader'); const _ = require('lodash'); let packageDefinition = protoLoader.loadSync( PROTO_PATH, { keepCase: true, longs: String, enums: String, defaults: true, oneofs: true }); let employee_proto = grpc.loadPackageDefinition(packageDefinition).employee;
上面的腳本具備與服務器代碼相同的功能。
將如下腳本也添加到client_grpc_client_stream.js。
function main() { let client = new employee_proto.Employee('localhost:4500', grpc.credentials.createInsecure()); let call = client.generateReport(function (error, response) { console.log("Reports successfully generated for: ", response.successfulReports); console.log("Reports failed since Following Employee Id's do not exist: ", response.failedReports); }); let employeeIdList = [1, 10, 2]; _.each(employeeIdList, function (employeeId) { call.write({ id: employeeId }); }) call.end(); } main();
讓咱們看看上面的腳本在作什麼。
let call = client.generateReport(function (error, response) { console.log("Reports successfully generated for: ", response.successfulReports); console.log("Reports failed since Following Employee Id's do not exist: ", response.failedReports); });
在腳本的這一部分中,咱們正在建立一個調用對象並調用generateReport函數。一樣在generateReport函數內部,咱們指示客戶端一旦收到服務器的響應,應該怎麼作。在這種狀況下,咱們將打印服務器發送回的成功和失敗報告。
let employeeIdList = [1, 10, 2]; _.each(employeeIdList, function (employeeId) { call.write({ id: employeeId }); })
在腳本的以上部分中,咱們遍歷了員工ID,並將消息流發送到服務器。咱們使用call.write將消息以流的形式發送到服務器。
最後,一旦咱們在流中發送了全部消息,就可使用call.end函數指示流已完成,以下所示:
call.end();
下面給出了client_grpc_client_stream的完整代碼。
const PROTO_PATH = __dirname + '/proto/employee.proto'; const grpc = require('grpc'); const protoLoader = require('@grpc/proto-loader'); const _ = require('lodash'); let packageDefinition = protoLoader.loadSync( PROTO_PATH, { keepCase: true, longs: String, enums: String, defaults: true, oneofs: true }); let employee_proto = grpc.loadPackageDefinition(packageDefinition).employee; function main() { let client = new employee_proto.Employee('localhost:4500', grpc.credentials.createInsecure()); let call = client.generateReport(function (error, response) { console.log("Reports successfully generated for: ", response.successfulReports); console.log("Reports failed since Following Employee Id's do not exist: ", response.failedReports); }); let employeeIdList = [1, 10, 2]; _.each(employeeIdList, function (employeeId) { call.write({ id: employeeId }); }) call.end(); } main();
打開命令提示符,而後使用如下腳本啓動服務器。
node server.js
打開一個新的命令提示符,並使用如下腳本運行客戶端。
node client_grpc_server_stream.js
在運行客戶端時,咱們將得到如下輸出。
Reports successfully generated for: First1,First2 Reports failed since Following Employee Id\'s do not exist: 10
在這種狀況下,客戶端已向服務器發送了3個Id的1,10,2做爲消息流。而後,服務器處理流中的消息,並將單個響應發送回客戶端,以顯示成功的報告數量和失敗的報告數量。