gRPC實戰--如何在NodeJS中有效使用gRPC流

gRPC實戰包含一系列文章,包括原創和翻譯。最終會造成一個完整的系列,後續會不斷完善,增長新的內容:node

=============================================================segmentfault

g10.png

本文將說明如何在NodeJS應用程序的GRPC中使用流。服務器

什麼是gRPC中的流

gRPC中的流可幫助咱們在單個RPC調用中發送消息流。併發

g11.png

gRPC 的流式,分爲三種類型:負載均衡

  • server-side streaming RPC:服務器端流式 RPC
  • Client-side streaming RPC:客戶端流式 RPC
  • Bidirectional streaming RPC:雙向流式 RPC

gRPC中的流使用場景

  • 大規模數據包
  • 實時場景

在本文中,咱們將重點關注如下流:ide

  • Server Streaming GRPC:在這種狀況下,客戶端向服務器發出單個請求,服務器將消息流發送回客戶端。
  • Client Streaming GRPC:在這種狀況下,客戶端將消息流發送到服務器。而後,服務器處理流並將單個響應發送回客戶端。

Server Streaming gRPC

如今讓咱們爲服務器流gRPC建立服務器和客戶端代碼。函數

建立 .proto 文件

建立一個名爲proto的文件夾。在該文件夾中建立一個名爲employee.proto的文件。將如下內容複製到employee.proto中:post

syntax = "proto3";

package employee;

service Employee {

  rpc paySalary (EmployeeRequest) returns (stream EmployeeResponse) {}
}


message EmployeeRequest {
  repeated int32 employeeIdList = 1;
}

message EmployeeResponse{
  string message = 1;
}

請參閱個人grpc基礎文章,以瞭解有關.proto文件和協議緩衝區的更多信息。ui

在這裏,咱們建立一個名爲paySalary的rpc,它接受EmployeeRequest做爲請求併發送EmployeeResponse流做爲響應。咱們使用關鍵字流來指示服務器將發送消息流。spa

上面也定義了EmployeeRequestEmployeeResponse。 repeate關鍵字表示將發送數據列表。

在此示例中,paySalary的請求將是員工ID的列表。服務器將經過一條消息流作出響應,告知是否已向員工支付薪水。

爲服務器建立虛擬數據

建立一個名爲data.js的文件,並將如下代碼複製到其中。

//Hardcode some data for employees
let employees = [{
    id: 1,
    email: "abcd@abcd.com",
    firstName: "First1",
    lastName: "Last1"   
},
{
    id: 2,
    email: "xyz@xyz.com",
    firstName: "First2",
    lastName: "Last2"   
},
{
    id: 3,
    email: "temp@temp.com",
    firstName: "First3",
    lastName: "Last3"   
},
];

exports.employees = employees;

咱們將其用做服務器的數據源。

建立Server

建立一個名爲server.js的文件。將如下代碼複製到server.js中

const PROTO_PATH = __dirname + '/proto/employee.proto';

const grpc = require('grpc');
const protoLoader = require('@grpc/proto-loader');


let packageDefinition = protoLoader.loadSync(
  PROTO_PATH,
  {
    keepCase: true,
    longs: String,
    enums: String,
    defaults: true,
    oneofs: true
  });
let employee_proto = grpc.loadPackageDefinition(packageDefinition)

接下來,將如下代碼片斷添加到server.js中

let { paySalary } = require('./pay_salary.js');

function main() {
  let server = new grpc.Server();
  server.addService(employee_proto.Employee.service, 
    { paySalary: paySalary }
  );
  server.bind('0.0.0.0:4500', grpc.ServerCredentials.createInsecure());
  server.start();
}

main();

在上面的腳本中,咱們將啓動GRPC Server並將Employee Service和paySalary實現一塊兒添加到其中。

可是paySalary函數在pay_salary.js文件中定義。

讓咱們建立一個pay_salary.js文件。

將如下腳本添加到pay_salary.js文件中

let { employees } = require('./data.js');
const _ = require('lodash');

function paySalary(call) {
    let employeeIdList = call.request.employeeIdList;
  
    _.each(employeeIdList, function (employeeId) {
      let employee = _.find(employees, { id: employeeId });
      if (employee != null) {
        let responseMessage = "Salary paid for ".concat(
          employee.firstName,
          ", ",
          employee.lastName);
        call.write({ message: responseMessage });
      }
      else{
        call.write({message: "Employee with Id " + employeeId + " not found in record"});
      }
  
    });
    call.end();
  
}
exports.paySalary = paySalary;

paySalary函數將調用做爲輸入。 call.request將包含客戶端發送的請求。

call.request.employeeIdList將包含客戶端發送的員工ID的列表。

而後,咱們遍歷EmployeeId,併爲每一個員工ID進行一些處理。

對於每一個員工ID,咱們最後都調用call.write函數。 call.write將在流中將單個消息寫回到客戶端。

在這種狀況下,對於每位員工,call.write都會發回工資是否已經支付。

處理完全部員工編號後,咱們將調用call.end函數。 call.end指示流已完成。

這是最終的server.js文件

const PROTO_PATH = __dirname + '/proto/employee.proto';

const grpc = require('grpc');
const protoLoader = require('@grpc/proto-loader');


let packageDefinition = protoLoader.loadSync(
  PROTO_PATH,
  {
    keepCase: true,
    longs: String,
    enums: String,
    defaults: true,
    oneofs: true
  });
let employee_proto = grpc.loadPackageDefinition(packageDefinition)

let { paySalary } = require('./pay_salary.js');

function main() {
  let server = new grpc.Server();
  server.addService(employee_proto.Employee.service, 
    { paySalary: paySalary }
  );
  server.bind('0.0.0.0:4500', grpc.ServerCredentials.createInsecure());
  server.start();
}

main();

建立Client

建立一個名爲client_grpc_server_stream.js的文件。將如下代碼複製到文件中。

const PROTO_PATH = __dirname + '/proto/employee.proto';

const grpc = require('grpc');
const protoLoader = require('@grpc/proto-loader');

let packageDefinition = protoLoader.loadSync(
    PROTO_PATH,
    {keepCase: true,
     longs: String,
     enums: String,
     defaults: true,
     oneofs: true
    });
let employee_proto = grpc.loadPackageDefinition(packageDefinition).employee;

接下來,將如下腳本片斷添加到客戶端。

function main() {
  let client = new employee_proto.Employee('localhost:4500',
                                       grpc.credentials.createInsecure());
                                       
  let employeeIdList = [1,10,2];
  let call = client.paySalary({employeeIdList: employeeIdList});

  call.on('data',function(response){
    console.log(response.message);
  });

  call.on('end',function(){
    console.log('All Salaries have been paid');
  });

}

main();

client變量將具備存根,這將有助於咱們在服務器中調用該函數。

employeeIdList是提供給服務器的輸入。

let call = client.paySalary({employeeIdList: employeeIdList}); 腳本調用服務器中的paySalary函數,並將employeeIdList做爲請求傳遞。因爲服務器將要發送消息流,所以調用對象將幫助咱們偵聽流事件。

咱們會偵聽呼叫對象中的「數據」事件,以查看流中來自服務器的任何消息。以下面的腳本所示。

call.on('data',function(response){
    console.log(response.message);
  });

在這裏,只要咱們從服務器收到任何消息,咱們就只打印響應消息。

咱們在調用對象中偵聽「結束」事件,以瞭解服務器流什麼時候結束。以下面的腳本所示。

call.on('end',function(){
    console.log('All Salaries have been paid');
  });

在此流結束時,咱們正在打印「已支付全部薪水」。

這是client_gprc_server_stream.js的完整代碼。

const PROTO_PATH = __dirname + '/proto/employee.proto';

const grpc = require('grpc');
const protoLoader = require('@grpc/proto-loader');

let packageDefinition = protoLoader.loadSync(
    PROTO_PATH,
    {keepCase: true,
     longs: String,
     enums: String,
     defaults: true,
     oneofs: true
    });
let employee_proto = grpc.loadPackageDefinition(packageDefinition).employee;

function main() {
  let client = new employee_proto.Employee('localhost:4500',
                                       grpc.credentials.createInsecure());
                                       
  let employeeIdList = [1,10,2];
  let call = client.paySalary({employeeIdList: employeeIdList});

  call.on('data',function(response){
    console.log(response.message);
  });

  call.on('end',function(){
    console.log('All Salaries have been paid');
  });

}

main();

運行代碼

打開命令提示符,而後使用如下腳本啓動服務器。

node server.js

打開一個新的命令提示符,並使用如下腳本運行客戶端。

node client_grpc_server_stream.js

在運行客戶端時,咱們將得到如下輸出。

Salary paid for First1, Last1
Employee with Id 10 not found in record
Salary paid for First2, Last2
All Salaries have been paid

在這種狀況下,客戶端已向服務器發送了3個Id的1,10,2。服務器一一處理ID,而後將消息流發送給客戶端。流中的全部消息完成後,將顯示消息「已支付全部薪水」。

Client Streaming GRPC

如今,讓咱們爲客戶端流GRPC建立服務器和客戶端代碼。

建立.proto文件

在先前建立的employee.proto文件中,添加如下內容

service Employee {

  rpc paySalary (EmployeeRequest) returns (stream EmployeeResponse) {}

  rpc generateReport (stream ReportEmployeeRequest) returns (ReportEmployeeResponse) {}
}

message ReportEmployeeRequest {
  int32 id = 1;
}

message ReportEmployeeResponse{
  string successfulReports = 1;
  string failedReports = 2;
}

在這裏,咱們添加了一個名爲generateReport的新rpc,它接受ReportEmployeeRequest流做爲請求並返回ReportEmployeeResponse做爲響應。

所以,向rpc輸入的內容是員工ID的流,服務器的響應將是單個響應,其中指出生成了多少報告以及有多少報告失敗。

這是咱們更改後的完整的employee.proto文件:

syntax = "proto3";

package employee;

service Employee {

  rpc paySalary (EmployeeRequest) returns (stream EmployeeResponse) {}

  rpc generateReport (stream ReportEmployeeRequest) returns (ReportEmployeeResponse) {}
}


message EmployeeRequest {
  repeated int32 employeeIdList = 1;
}

message EmployeeResponse{
  string message = 1;
}

message ReportEmployeeRequest {
  int32 id = 1;
}

message ReportEmployeeResponse{
  string successfulReports = 1;
  string failedReports = 2;
}

建立Server

這是添加了新rpc的完整server.js代碼:

const PROTO_PATH = __dirname + '/proto/employee.proto';

const grpc = require('grpc');
const protoLoader = require('@grpc/proto-loader');


let packageDefinition = protoLoader.loadSync(
  PROTO_PATH,
  {
    keepCase: true,
    longs: String,
    enums: String,
    defaults: true,
    oneofs: true
  });
let employee_proto = grpc.loadPackageDefinition(packageDefinition).employee;


let { paySalary } = require('./pay_salary.js');
let { generateReport } = require('./generate_report.js');

function main() {
  let server = new grpc.Server();
  server.addService(employee_proto.Employee.service, 
    { paySalary: paySalary ,
      generateReport: generateReport }
  );
  server.bind('0.0.0.0:4500', grpc.ServerCredentials.createInsecure());
  server.start();
}

main();

在上面的腳本中,咱們能夠看到咱們還向grpc服務器添加了generateReport函數。咱們還能夠看到generateReport函數來自generate_report.js文件。

建立一個名爲generate_report.js的文件。

將如下腳本添加到文件中:

let { employees } = require('./data.js');
const _ = require('lodash');

function generateReport(call, callback){

    let successfulReports = [];
    let failedReports = [];
    call.on('data',function(employeeStream){
        let employeeId = employeeStream.id;
        let employee = _.find(employees, { id: employeeId });
        if (employee != null) {
          successfulReports.push(employee.firstName);
        }
      else{
          failedReports.push(employeeId);
      }

    });
    call.on('end',function(){
        callback(null,{
            successfulReports: successfulReports.join(),
            failedReports: failedReports.join()
        })
    })
}

exports.generateReport = generateReport;

generateReport函數接受兩個輸入,即調用和回調

爲了從客戶端獲取消息流,咱們須要在調用對象中監聽數據事件。這是在如下腳本中完成的。

call.on('data',function(employeeStream){
        let employeeId = employeeStream.id;
        let employee = _.find(employees, { id: employeeId });
        if (employee != null) {
          successfulReports.push(employee.firstName);
        }
      else{
          failedReports.push(employeeId);
      }

    });

來自客戶端的每條消息都會調用data事件。該消息存在於employeeStream變量中。收到消息後,咱們嘗試生成報告,並肯定報告是成功仍是失敗。

調用對象上的結束事件表示客戶端流已結束。如下代碼顯示瞭如何監聽結束事件。

call.on('end',function(){
        callback(null,{
            successfulReports: successfulReports.join(),
            failedReports: failedReports.join()
        })
    })

在這種狀況下,當結束事件發生時,咱們將全部成功和失敗報告組合到一個響應對象中,並使用回調對象將其發送回客戶端。

建立Client

建立一個名爲client_grpc_client_stream.js的文件。將如下腳本添加到其中。

const PROTO_PATH = __dirname + '/proto/employee.proto';

const grpc = require('grpc');
const protoLoader = require('@grpc/proto-loader');
const _ = require('lodash');

let packageDefinition = protoLoader.loadSync(
  PROTO_PATH,
  {
    keepCase: true,
    longs: String,
    enums: String,
    defaults: true,
    oneofs: true
  });
let employee_proto = grpc.loadPackageDefinition(packageDefinition).employee;

上面的腳本具備與服務器代碼相同的功能。

將如下腳本也添加到client_grpc_client_stream.js

function main() {
  let client = new employee_proto.Employee('localhost:4500',
    grpc.credentials.createInsecure());

  let call = client.generateReport(function (error, response) {
    console.log("Reports successfully generated for: ", response.successfulReports);
    console.log("Reports failed since Following Employee Id's do not exist: ", response.failedReports);
  });

  let employeeIdList = [1, 10, 2];
  _.each(employeeIdList, function (employeeId) {
        call.write({ id: employeeId });
  })

  call.end();
}

main();

讓咱們看看上面的腳本在作什麼。

let call = client.generateReport(function (error, response) {
    console.log("Reports successfully generated for: ", response.successfulReports);
    console.log("Reports failed since Following Employee Id's do not exist: ", response.failedReports);
  });

在腳本的這一部分中,咱們正在建立一個調用對象並調用generateReport函數。一樣在generateReport函數內部,咱們指示客戶端一旦收到服務器的響應,應該怎麼作。在這種狀況下,咱們將打印服務器發送回的成功和失敗報告。

let employeeIdList = [1, 10, 2];
  _.each(employeeIdList, function (employeeId) {
        call.write({ id: employeeId });
  })

在腳本的以上部分中,咱們遍歷了員工ID,並將消息流發送到服務器。咱們使用call.write將消息以流的形式發送到服務器。

最後,一旦咱們在流中發送了全部消息,就可使用call.end函數指示流已完成,以下所示:

call.end();

下面給出了client_grpc_client_stream的完整代碼。

const PROTO_PATH = __dirname + '/proto/employee.proto';

const grpc = require('grpc');
const protoLoader = require('@grpc/proto-loader');
const _ = require('lodash');

let packageDefinition = protoLoader.loadSync(
  PROTO_PATH,
  {
    keepCase: true,
    longs: String,
    enums: String,
    defaults: true,
    oneofs: true
  });
let employee_proto = grpc.loadPackageDefinition(packageDefinition).employee;

function main() {
  let client = new employee_proto.Employee('localhost:4500',
    grpc.credentials.createInsecure());

  let call = client.generateReport(function (error, response) {
    console.log("Reports successfully generated for: ", response.successfulReports);
    console.log("Reports failed since Following Employee Id's do not exist: ", response.failedReports);
  });

  let employeeIdList = [1, 10, 2];
  _.each(employeeIdList, function (employeeId) {
        call.write({ id: employeeId });
  })

  call.end();
}

main();

運行代碼

打開命令提示符,而後使用如下腳本啓動服務器。

node server.js

打開一個新的命令提示符,並使用如下腳本運行客戶端。

node client_grpc_server_stream.js

在運行客戶端時,咱們將得到如下輸出。

Reports successfully generated for:  First1,First2
Reports failed since Following Employee Id\'s do not exist:  10

在這種狀況下,客戶端已向服務器發送了3個Id的1,10,2做爲消息流。而後,服務器處理流中的消息,並將單個響應發送回客戶端,以顯示成功的報告數量和失敗的報告數量。

相關文章
相關標籤/搜索