利用dubbo打造真正的nodejs端的微服務體系

Java在微服務方面的生態比較完善,國內常見的有:前端

  • 基於consul的spring生態。nest-cloud就是基於consul來搭建微服務。
  • 阿里巴巴的dubbosofa

基本上國內的Java微服務都是使用以上的生態。而nodejs爲了可以與Java微服務互通,目前尚未很是完善的體系,除了nest-cloud的微服務體系。今天,我主要爲了講解如何在nodejs上經過dubbo打造與Java互通的微服務體系。java

咱們的主角是 dubbo.tsnode

若是不是TS寫的,都很差意思拿出來講。git

TCP通信

全部的微服務基本都基於TCP的長鏈接,咱們要解決的問題主要是如下幾個:github

  • TCP數據發送與接收時候的粘包與拆包問題
  • TCP鏈接的心跳檢測以及重連重試機制
  • 數據傳輸時候的序列化與發序列化算法
  • 使用註冊中心的訂閱機制

TCP傳輸具備很是可靠的安全性,不像UDP傳輸那樣會丟包,因此微服務間的通信基本使用TCP去完成,一旦鏈接,通信速度是很是快的。redis

服務的註冊與發現

通常的,在dubbo中,咱們使用 Zookeeper 來實現服務註冊與發現,可是也有生態是使用Redis來實現的。不過今天咱們就講使用ZK的場景下的微服務。算法

其實ZK就是一個雲端的KEY/VALUE存儲器,同時具有了訂閱通知的功能。這裏推薦使用node-zookeeper-client,這個庫已至關穩定,每週下載量也很多,值得放心使用。spring

使用dubbo.ts建立註冊中心的鏈接是很是簡單的:apache

import { Registry, RegistryInitOptions } from 'dubbo.ts';
const registry = new Registry({
  host: '127.0.0.1:2181' // zk地址
} as RegistryInitOptions);
await registry.connect(); // 鏈接
registry.close(); // 斷開
複製代碼

提供服務

縱觀整個NPM,沒有找出服務提供者的庫。通常來講,使用dubbo的nodejs程序,僅僅只是調用java方服務的,沒法提供基於nodejs的微服務的註冊與被調用。也就是說,經過dubbo.ts咱們能夠像java同樣,在註冊中心註冊咱們的微服務,供nodejs或者java來調用。npm

在dubbo.ts中,咱們能夠這樣來玩:

  1. 建立服務提供者對象
import { 
  Provider, 
  ProviderInitOptions, 
  ProviderServiceChunkInitOptions 
} from 'dubbo.ts';

const provider = new Provider({
  application: 'test',
  dubbo_version: '2.0.2',
  port: 8080,
  pid: process.pid,
  registry: registry,
  heartbeat?: 60000,
} as ProviderInitOptions);
複製代碼
  1. 爲其添加微服務接口定義
class CUATOM_SERVICE {
  xxx() {}
  ddd() {}
}
provider.addService(CUATOM_SERVICE, {
  interface: 'xxx',
  version: 'x.x.x',
  group; 'xxxx',
  methods: ['xxx', 'ddd'],
  timeout: 3000
} as ProviderServiceChunkInitOptions);
// ..,.
複製代碼
  1. 啓動服務自動註冊到中心或者卸載服務
await provider.listen();
await provider.close();
複製代碼

若是你有zk的監控平臺,那麼你能夠在平臺上看到微服務已經被註冊上去了。

消費者

消費者就是用來鏈接微服務,經過方法及參數來得到最終數據的。它經過ZK自動發現服務後鏈接服務,當服務被註銷的時候也自動註銷鏈接。當請求服務的時候有以下規則:

  • 若是服務方法超時,將自動重試。
  • 若是重試的時候,微服務提供者是多個,那麼重試的時候將擇優選擇不一樣的相同微服務接口調用。
  • 若是重試時候,微服務提供者只有一個,那麼重試這個接口N次。
  • 若是重試沒有提供者,將報no prividers錯誤。

建立消費者對象:

import { Consumer } from 'dubbo.ts';
const consumer = new Consumer({
  application: 'dist',
  dubbo_version: '2.0.2',
  pid: process.pid,
  registry: registry,
});
await consumer.listen();
await consumer.close();
複製代碼

鏈接微服務獲取數據

const invoker = await consumer.get('com.mifa.stib.service.ProviderService');
const java = require('js-to-java');
type resultData = {
  name: string,
  age: number,
}
const result = await invoker.invoke<resultData>('testRpc', [
  java.combine('com.mifa.stib.common.RpcData', {
    "name":"gxh",
    "age":"18",
  })
])
複製代碼

經過很簡單的調用,咱們就能得到微服務數據。NPM上全部消費者的設計都大同小異,有的還作了熔斷處理。這裏本架構沒有作處理,用戶能夠根據需求自行完成。

架構成AOP模式

使用TS建立相似java的註解很是方便。無非利用Provider.addService的參數作文章,具體用戶能夠自行設計。這裏我給你們看一個我司的最終使用例子(你也能夠參考@nelts/dubbo的註解設計來完成):

import { provide, inject } from 'injection';
import { rpc } from '@nelts/dubbo';
import { RPC_INPUT_SCHEMA, MIN_PROGRAM_TYPE, error } from '@node/com.stib.utils'; // 私有源上的包,參考時候可忽略功能
import WX from './wx';
import * as ioredis from 'ioredis';

@provide('User')
@rpc.interface('com.mifa.stib.service.User')
@rpc.version('1.0.0')
export default class UserService {
  @inject('wx')
  private wx: WX;

  @inject('redis')
  private redis: ioredis.Redis;

  @rpc.method
  @rpc.middleware(OutputConsole)
  login(req: RPC_INPUT_SCHEMA) {
    switch (req.headers.platform) {
      case MIN_PROGRAM_TYPE.WX:
        if (req.data.code) return this.wx.codeSession(req.data.code);
        return this.wx.jsLogin(req.data, req.headers.appName);
      case MIN_PROGRAM_TYPE.WX_SDK: return this.wx.sdkLogin(req.data.code, req.headers.appName);
      default: throw error('不支持的登陸類型');
    }
  }

  @rpc.method
  async status(req: RPC_INPUT_SCHEMA) {
    if (!req.headers.userToken) throw error('401 Not logined', 401);
    const value = await this.redis.get(req.headers.userToken);
    if (!value) throw error('401 Not logined', 401);
    const user = await this.redis.hgetall(value);
    if (!value) throw error('401 Not logined', 401);
    user.sex = Number(user.sex);
    user.id = undefined;
    user.create_time = undefined;
    user.modify_time = undefined;
    user.unionid = undefined;
    return user;
  }
}

async function OutputConsole(ctx, next) {
  console.log('in middleware');
  await next()
}
複製代碼

我推薦使用midway.jsinjection模塊來設計服務間的IOC模型,這樣更有利於開發與維護。

Swagger

通常來講,在java中,swagger是創建在微服務上的,spring的一整套swagger都是純HTTP的。我參考了dubbo的swagger方式,以爲單服務單swagger的模式並不有利於開發者查閱,因此,咱們約定了一種分佈式swagger模式。在duboo.ts已內置。

微服務swagger方法,採用zookeeper自管理方案。經過微服務啓動,收集interface與method信息上報到自定義zookeeper節點來完成數據上報。前端服務,能夠經過讀取這個節點信息來得到具體的接口與方法。

上報格式:

/swagger/{subject}/{interface}/exports/{base64 data}
複製代碼

url參數:

  • subject 總項目命名節點名
  • interface 接口名
  • base64 data 它是一個記錄該接口下方法和參數的數組(最終base64化),見如下參數格式。

base64 data 參數詳解:

type Base64DataType = {
  description?: string, // 該接口的描述
  group: string, // 組名 若是沒有組,請使用字符串`-`
  version: string, // 版本名 若是沒有版本,請使用字符串 `0.0.0`
  methods: [
    {
      name: string, // 方法名
      summary?: string, // 方法描述,摘要
      input: Array<{ $class: string, $schema: JSONSCHEMA; }>, // 入參
      output: JSONSCHEMA // 出參
    },
    // ...
  ]
}
複製代碼

最終將數據base64後再進行encodeURIComponent操做,最後插入zookeeper的節點便可。

在Provider程序中,咱們能夠這樣使用來發布到zookeeper:

import { SwaggerProvider, Provider } from 'dubbo.ts';
const swagger = new SwaggerProvider('subject name', provider as Provider);
await swagger.publish(); // 發佈
await swagger.unPublish(); // 卸載
複製代碼

使用SwaggerConsumer調用分佈式swgger後獲得的數據。

import { SwaggerConsumer, Registry } from 'dubbo.ts';
const swgger = new SwaggerConsumer('subject name', registry as Registry);
const resultTree = await swgger.get();
複製代碼

咱們來看一個基於@nelts/dubbo的實例,在具體微服務的service上,咱們能夠這樣寫

import { provide, inject } from 'injection';
import { rpc } from '@nelts/dubbo';
import { RPC_INPUT_SCHEMA, MIN_PROGRAM_TYPE, error, RpcRequestParameter, RpcResponseParameter } from '@node/com.stib.utils';
import WX from './wx';
import * as ioredis from 'ioredis';
import Relations from './relations';
import { tableName as WxTableName } from '../tables/stib.user.wx';

@provide('User')
@rpc.interface('com.mifa.stib.service.UserService')
@rpc.version('1.0.0')
@rpc.description('用戶中心服務接口')
export default class UserService {
  @inject('wx')
  private wx: WX;

  @inject('redis')
  private redis: ioredis.Redis;

  @inject('relation')
  private rel: Relations;

  @rpc.method
  @rpc.summay('用戶統一登陸')
  @rpc.parameters(RpcRequestParameter({
    type: 'object',
    properties: {
      code: {
        type: 'string'
      }
    }
  }))
  @rpc.response(RpcResponseParameter({ type: 'string' }))
  login(req: RPC_INPUT_SCHEMA) {
    switch (req.headers.platform) {
      case MIN_PROGRAM_TYPE.WX:
        if (req.data.code) return this.wx.codeSession(req.data.code);
        return this.wx.jsLogin(req.data, req.headers.appName);
      case MIN_PROGRAM_TYPE.WX_SDK: return this.wx.sdkLogin(req.data.code, req.headers.appName);
      default: throw error('不支持的登陸類型');
    }
  }

  @rpc.method
  @rpc.parameters(RpcRequestParameter())
  @rpc.summay('獲取當前用戶狀態')
  async status(req: RPC_INPUT_SCHEMA) {
    if (!req.headers.userToken) throw error('401 Not logined', 401);
    const rid = await this.redis.get(req.headers.userToken);
    if (!rid) throw error('401 Not logined', 401);
    const user = await this.getUserDetailInfoByRelationId(Number(rid)).catch(e => Promise.reject(error('401 Not logined', 401)));
    user.sex = Number(user.sex);
    Reflect.deleteProperty(user, 'id');
    Reflect.deleteProperty(user, 'create_time');
    Reflect.deleteProperty(user, 'modify_time');
    Reflect.deleteProperty(user, 'unionid');
    return user;
  }

  @rpc.method
  @rpc.summay('獲取某個用戶詳細信息')
  @rpc.parameters(RpcRequestParameter({
    type: 'object',
    properties: {
      rid: {
        type: 'integer'
      }
    }
  }))
  async getUserDetailInfo(req: RPC_INPUT_SCHEMA) {
    return await this.getUserDetailInfoByRelationId(req.data.rid as number);
  }

  async getUserDetailInfoByRelationId(sid: number) {
    const relations: {
      f: string,
      p: string,
      s: string,
    } = await this.rel.get(sid);
    switch (relations.f) {
      case WxTableName: return await this.wx.getUserinfo(relations.f, Number(relations.s));
    }
  }
}
複製代碼

這種Swagger模式稱爲分佈式swagger,它的優點在於,若是使用同一個zk註冊中心,那麼不管服務部署在那臺服務器,均可以將swagger聚合在一塊兒處理。

最後

不管是java調用nodejs的微服務仍是nodejs調用java的微服務都是很是方便的。本文主要爲了講解如何在nodejs上打造一整套基於dubbo的微服務體系。該體系已在我司內部新項目中使用,很穩定。喜歡的朋友,能夠具體參看 dubbo.ts 的文檔,瞭解更多api的使用。但願dubbo.ts能在實際的業務場景中幫到您。感謝!

相關文章
相關標籤/搜索