教你用node從零搭建一套微服務系統(四)

      通過以前的三節課程,相信你們已經搭建好了微服務的基礎環境,那麼這節課程,筆者會帶領各位改造以前的代碼,完成爲服務間的通訊。此次採用一個api網關,加上三個微服務節點。

api-rest( git: https://github.com/burning0xb...

/* 首先咱們在server.js中添加一個隊列監聽數組global.readyListener,用來記錄已經consume的queue */
import schedule from 'node-schedule';

global.msgQueue = [];
global.resolveRabbit = {};
global.readyListener = [];
/* RabbitSent.js中修改以下代碼, 防止rabbit重複監聽 */
if (!global.readyListener.includes(queue)) {
   global.readyListener.push(queue);
   this.ch.consume(this.ok.queue, (msg) => {
     this.mabeAnswer(msg);
   }, { noAck: true });
 }
/* 添加訂單類型的queue */
switch (type) {
 case 'order':
   queue = config.MQ_QUEUE_ORDER;
   break;
 case 'pay':
   queue = config.MQ_QUEUE_PAY;
   break;
 default:
   queue = config.MQ_QUEUE_COMMON;
   // queue = config.MQ_QUEUE_COMMON_TEST;
   break;
}
/* 添加獲取用戶列表的路由 */
router.get('/user/getUserList', async (ctx, next) => {
    const user = await userHandler.getUserList(ctx);
    ctx.body = user;
})
/* 添加對應的控制器 */
/* 這裏咱們要注意,咱們在掉這個api的時候同時發起了兩個遠端請求,一個爲原始獲取用戶列表,另外一個爲獲取訂單列表 */
/**
* [getUserList description]
* @param  {[type]}  ctx [description]
* @return {Promise}     [description]
*/
async getUserList(ctx) {
 const body = ctx.request.body;
 const content = {
   class: 'user',
   func: 'getUserList',
   content: {}
 };
 const server = this.initServer(this.server);
 const res1 = await server.send(content);

 const content2 = {
   class: 'common',
   func: 'getOrderList',
   content: {}
 };
 const server2 = this.initServer(this.server);
 const res2 = await server2.send(content2, 'order');

 return { res1, res2 };
}

node-service-common ( git: https://github.com/burning0xb... )

/* 這裏改動比較大,增長了發送消息的方法 */
import amqp from 'amqplib/callback_api';
import { logger, logger_date } from './src/log4j';
import config from './config';
import route from './route';
import { RabbitSend } from './rabbitMQ';
import { Cache } from './util';
import packages from './package.json';

logger.info('server started');

global.msgQueue = [];
global.resolveRabbit = {};
global.readyListener = [];

function bail(err, conn) {
  logger.error(err);
}

// 初始化mq的發送promise
function initServer(ch, ok) {
  const server = new RabbitSend(ch, ok)
  return server;
}

// 聲明監聽queue
function assertQueue(ch, q) {
  return new Promise((resolve, reject) => {
    ch.assertQueue(q, {durable: true}, (err, ok) => {
      if (err !== null) return bail(err);
      global.ch = ch;
      global.ok = ok;
      global.server = initServer;
      resolve();
    });
  });
}

// 發送mq的方法,這裏就簡單的傳送的方法中,後續會提出到基礎類,用繼承的方式實現
function mq() {
  return global.server(global.ch, global.ok);
}

// 刪除已使用的queue,這裏解釋下緣由
// 考慮到分佈式節點,同一個服務可能會啓動多個,這裏用uuid去標記不一樣節點的queue,每次從新啓動的時候刪除上次啓動時rabbitmq-server端保留的queue 避免無用堆積 
function delQueues(ch) {
  Cache.getCache(`${packages.name}-mq`).then((res) => {
    if (res) {
      logger.warn('================ start clear mq queues =================');
      Cache.destroy(`${packages.name}-mq`);
      const queues = res.rabbitmq_queues.queues;
      queues.map((key) => {
        ch.checkQueue(key, (err, ok) => {
          if (ok.queue === key) {
            logger.warn(`================== delete queue ${key} ==================`);
            ch.deleteQueue(key);
          }
        });
      });
    }
  });
}

function on_connect(err, conn) {
    if (err !== null)
        return bail(err);

    process.once('SIGINT', () => {
        conn.close();
    });

    var q = config.rabbitMq_queue.logic01

    /*
    測試mq
     */
    // var q = config.rabbitMq_queue.logic02

    // 壓入本地已監聽隊列中
    global.readyListener.push(q);

    conn.createChannel((err, ch) => {
        logger_date.info('rabbitMQ createChannel');
        delQueues(ch);
        assertQueue(ch, q).then(() => {
          ch.prefetch(1);
          ch.consume(q, reply, {
              noAck: false
          }, (err) => {
              if (err !== null)
                  return bail(err, conn);
              logger.info(' [x] Awaiting RPC requests');
          });

          function reply(msg) {
              logger.info('request content is ' + msg.content.toString());
              const request = JSON.parse(msg.content.toString());
              // 聲明返回消息的queue,以及消息id和返回體
              const cb = (response) => {
                  ch.sendToQueue(msg.properties.replyTo, new Buffer(JSON.stringify(response)), { correlationId: msg.properties.correlationId });
                  ch.ack(msg);
              };
              try {
                const func = request.class && request.func ? route[request.class][request.func] : null;
                if (func) {
                  // 這裏傳入發送對象
                  func(cb, request.content, mq);
                } else {
                  cb({
                    err: 'method not allowed'
                  });
                }
              } catch(err) {
                console.log(err);
                cb({
                  code: 500,
                  err: 'server error'
                });
              }
          }
        });
    });
}

amqp.connect('amqp://' + config.rabbitMq_user + ':' + config.rabbitMq_password + '@' + config.rabbitMq_host + ':' + config.rabbitMq_port, on_connect);
logger_date.info('rabbitMQ connect success');
logger.warn('don`t kill this process');
/* 這裏的send方法與api-rest的有所不一樣 */
send(content, type) {
    console.log(' [x] Requesting is ', content);
    let queue = config.MQ_QUEUE_ORDER;
    switch (type) {
      case 'log':
        queue = config.MQ_QUEUE_ORDER;
        break;
      case 'pay':
        queue = config.MQ_QUEUE_ORDER;
        break;
      default:
        queue = config.MQ_QUEUE_ORDER;
        break;
    }
    return new Promise(async (resolve, reject) => {
      const correlationId = uuid();
      console.log('========= mq loading ==========');
      global.msgQueue.push(correlationId);
      global.resolveRabbit[correlationId] = {
        resolve: resolve,
        reject: reject
      };
      // 避免重複監聽,不重複取redis,下降開銷
      if (!global.readyListener.includes(queue)) {
        global.readyListener.push(queue);
        // 若是是第一次監聽該通道,從redis裏取一下,查看是否存在指定uuid隊列,存在的話直接用,不存在的話去assert一個隊列,不持久化,而後監聽該隊列,一切初始化完成後進行發送
        const _c = await Cache.getCache(`${packages.name}-mq`);
        console.log(_c);
        if (_c && _c.rabbitmq_queues && _c.rabbitmq_queues.queues) {
          const queues = _c.rabbitmq_queues.queues;
          if (queues.includes(`${queue}-${this.pid}`)) {
            console.log(`========= use old mq queue ${queue}-${this.pid} ==========`);
            this.ch.consume(`${queue}-${this.pid}`, (msg) => {
              this.mabeAnswer(msg);
            }, { noAck: true });
          } else {
            queues.push(`${queue}-${this.pid}`);
            console.log(`========= use new mq queue ${queue}-${this.pid} ==========`);
            Cache.setCache(`${packages.name}-mq`, {
              rabbitmq_queues: {
                queues
              },
            });
            this.ch.assertQueue(`${queue}-${this.pid}`, {durable: false}, (err, ok) => {
              if (err) return;
              this.ch.consume(`${queue}-${this.pid}`, (msg) => {
                this.mabeAnswer(msg);
              }, { noAck: true });
            });
          }
        } else {
          console.log('========== 初始化mq隊列 ==========');
          Cache.setCache(`${packages.name}-mq`, {
            rabbitmq_queues: {
              queues: [`${queue}-${this.pid}`]
            },
          });
          this.ch.assertQueue(`${queue}-${this.pid}`, {durable: false}, (err, ok) => {
            if (err) return;
            this.ch.consume(`${queue}-${this.pid}`, (msg) => {
              this.mabeAnswer(msg);
            }, { noAck: true });
          });
        }
      }
      console.log(`============= use queue ${queue}-${this.pid}  ==============`);
      this.ch.sendToQueue(queue, new Buffer(JSON.stringify(content)), {
        replyTo: `${queue}-${this.pid}`,
        correlationId: correlationId
      })
    }).catch((err) => {
      console.log(err);
    });
  }
/* 由api-rest發起的微服務調用會被分發到下面這個方法中 */
async getUserList(cb, info, mq) {
    logger.warn(`this is moment format ${moment().format('YYYY-MM-DD hh:mm:ss')}`);
    const content = {
     class: 'common',
     func: 'getOrderList',
     content: {}
    };
    // 這裏模擬返回用戶列表,而後再次調用遠端的訂單服務,去拉取訂單列表
    const res = await mq().send(content);
    cb({ code: '00000', users: [], order: res });
}

node-service-order ( git: https://github.com/burning0xb... )

/* 該服務在其餘方法上與common服務相似,不作贅述,監聽的queue不一樣 */
module.exports = Object.assign({
  rabbitMq_host: '192.168.41.144',
  rabbitMq_port: '5672',
  rabbitMq_user: 'admin',
  rabbitMq_password: 'wangrui1994',
  // server_host: '106.14.77.183',
  server_host: '127.0.0.1',
  server_port: 8889,
  rabbitMq_queue: {
    logic01: 'jslight-service-order',
    logic02: 'jslight-service-order-test'
  }
});
/* 這裏爲上一步common服務調用的訂單列表服務 */
async getOrderList(cb, info, mq) {
    logger.warn(`this is moment format ${moment().format('YYYY-MM-DD hh:mm:ss')}`);
    const content = {
      class: 'address',
      func: 'getUserAddress',
      content: {}
    };
    // 再次調用遠端帳戶服務中的地址列表,最後將訂單和地址所有返回
    const address = await mq().send(content, 'account');
    cb({ code: '00000', order: [{
      orderId: Date.now(),
      price: 200
    }], address });
}

node-service-account ( git: https://github.com/burning0xb... )

/* 監聽帳戶queue */
module.exports = Object.assign({
  rabbitMq_host: '192.168.41.144',
  rabbitMq_port: '5672',
  rabbitMq_user: 'admin',
  rabbitMq_password: 'wangrui1994',
  // server_host: '106.14.77.183',
  server_host: '127.0.0.1',
  server_port: 8889,
  rabbitMq_queue: {
    logic01: 'jslight-service-account',
    logic02: 'jslight-service-account-test'
  }
});
/* 這裏返回地址列表 */
async getUserAddress(cb, info, mq) {
    logger.warn(`this is moment format ${moment().format('YYYY-MM-DD hh:mm:ss')}`);
    cb({ code: '00000', address: { province: '上海', city: '上海', country: '徐彙區' } });
}

至此,由api發起的調用完成所有返回。下面,咱們來從頭梳理一下流程。

首先api網關發起遠端微服務調用,兩個分支,一個爲調用common服務,另外一個爲調用order服務node

分支一 common服務git


分支二 order服務github

整合後redis

至此微服務間的調用整合完成,咱們來看一下控制檯的輸出json

api-rest
咱們能夠看到兩個遠端調用的返回值
api

node-service-common
咱們能夠看到common服務調用了order服務,而且使用監聽了jslight-service-order-23be5586-2411-450d-9523-e0093401830d隊列,最後獲得了遠端的返回值
數組

node-service-order
訂單服務接收到了api網關和common服務發起的調用,請求account服務,而且獲得了返回
promise

node-service-account
帳戶服務接收到兩個由訂單服務發起的請求,並獲得返回值
async

如下爲postman的最終返回結果分佈式

{
    "res1": {
        "finalRes": {
            "code": "00000",
            "users": [],
            "order": {
                "finalRes": {
                    "code": "00000",
                    "order": [
                        {
                            "orderId": 1512366914453,
                            "price": 200
                        }
                    ],
                    "address": {
                        "finalRes": {
                            "code": "00000",
                            "address": {
                                "province": "上海",
                                "city": "上海",
                                "country": "徐彙區"
                            }
                        }
                    }
                }
            }
        }
    },
    "res2": {
        "finalRes": {
            "code": "00000",
            "order": [
                {
                    "orderId": 1512366914501,
                    "price": 200
                }
            ],
            "address": {
                "finalRes": {
                    "code": "00000",
                    "address": {
                        "province": "上海",
                        "city": "上海",
                        "country": "徐彙區"
                    }
                }
            }
        }
    }
}

好了,本節爲你們演示了微服務間的調用,其中還有不少優化須要去作,筆者但願各位可以本身完成。以上是本篇的全部內容,歡迎各位關注個人我的公衆號,提出您的寶貴意見並相互交流學習。

相關文章
相關標籤/搜索