教你用node從零搭建一套微服務系統(二)

      這一節筆者拿微信公衆號開發爲例,帶你們搭建一套簡單的由api網關發起調用請求到遠端通用微服務的系統。這裏做者默認你們已經搭建好rabbitMQ服務並已經成功啓動而且系統中已經安裝sequelize-auto 以及supervisor。

api-rest(先扔一個git地址,這裏是源碼:https://github.com/burning0xb...
項目主要分爲這幾個部分html

一、項目入口
require('babel-core/register')
require('./server');
用babel實現es6風格的書寫
二、服務入口
const app = new Koa();
// 建立基礎路由
const baseRouter = new BaseRouter();
// 使用session(整合redis)
app.use(session({
  key: 'burning:session',
  store: new RedisStore(),
  maxAge: config.maxAge
}));
// 加載中間件依次爲body體格式化,日誌與跨域
app.use(bodyParser());
app.use(Logger());
app.use(convert(cors()));
// 加載基礎路由中登陸過濾
app.use(async (ctx, next) => {
  if (baseRouter.requireLogin(ctx)) {
    await next();
  }
});
// 加載全部路由
app
  .use(router.routes())
  .use(router.allowedMethods());
// 服務啓動
app.listen(config.port, () => {
  logger.info(`server is running port ${config.port}`);
});
三、定時獲取微信access_token
const rule = new schedule.RecurrenceRule();
rule.minute = [0, 20, 40]; // 任務規則
schedule.scheduleJob(rule, () => {
  getAccessToken().then((res) => {
    console.log(res);
    global.wechatToken = res.access_token;
  })
});
這裏採用node-schedule模塊,實現linux式的crontab任務。
四、路由入口
// 定義路由前綴
const router = koaRouter({
  prefix: '/api'
});
// 初始化rabbitMQ 客戶端後再去加載全部的路由
new Client().then((res) => {
  logger.info('rabbitMQ is ready');
  global.MQ = res.RabbitSend;
}).then(() => {
  for (let _router in routers) {
    if (_router !== '') {
      routers[_router](router, upload);
      console.log(`${_router} 加載成功 ?`);
    }
  }
  const userHandler = new UserHandler(global.MQ);
  rollUserList(userHandler);
});
// 這裏去加載一個新的定時任務(每晚去更新關注用戶的數據)
async function rollUserList(userHandler) {
  schedule.scheduleJob('0 0 1 * * *', async () => {
    const userList = await wechatApi.getUserList();
    if (userList.data) {
      console.log(`關注用戶總數 ${userList.total} 人 開始更新用戶信息`);
      userList.data.openid.map(async (openid) => {
        await util.sleep(1);
        const userInfo = await wechatApi.getUserInfo(openid);
        // 這裏是重點,調用遠端的服務去持久化用戶信息
        userHandler.saveWechatUser(userInfo);
      })
    }
  });
}
五、路由文件(舉一個例子)
import { UserHandler } from '../controller';

function userRouter(router, upload) {
    
  // 給controller綁定MQ對象
  const userHandler = new UserHandler(global.MQ);
  // get請求
  router.get('/user/getUserList', async (ctx, next) => {
    const user = await userHandler.getUserList(ctx);
    // respnose返回
    ctx.body = user;
  })
}

export default userRouter;
六、控制器(舉一個例子)
import BaseHandler from './BaseHandler';

export default class UserHandler extends BaseHandler {
    constructor(server) {
      super();
      // 這裏初始化MQ對象
      this.server = server;
    }

    /**
     * [getUserList description]
     * @param  {[type]}  ctx [description]
     * @return {Promise}     [description]
     */
    async getUserList(ctx) {
      // 獲取解析過的請求體
      const body = ctx.request.body;
      // 構造遠端服務請求體
      const content = {
        class: 'user', // 遠端服務的類名
        func: 'getUserList', // 遠端服務的方法
        content: {} // 消息內容
      };
      // 建立新的遠端服務發送對象
      const server = this.initServer(this.server);
      // 發送並取得返回結果
      const res = await server.send(content);
      return res;
    }

}
BaseHandler中有構造新的發送對象的方法
initServer(server) {
    return server(global.ch, global.ok);
}
七、rabbitMQ
這裏重點說一下消息隊列的客戶端
import amqp from 'amqplib/callback_api';
import RabbitSend from './RabbitSend';
import config from '../config.json';

export default class Client {

  constructor() {
    // 建立mq鏈接,on_connect爲回調函數
    return new Promise((resolve, reject) => {
      amqp.connect('amqp://' + config.rabbitMq_user + ':' + config.rabbitMq_password + '@' + config.rabbitMq_host + ':' + config.rabbitMq_port, this.on_connect.bind(this, resolve));
    }).catch((err) => {
      console.log(err);
    });
  }

  // 最後會返回一個new 對象,也就是說每init一次就會new一次
  init(ch, ok) {
    const server = new RabbitSend(ch, ok)
    return server;
  }

  // 失敗處理函數
  bail(err) {
    console.error(err);
  }

  init_client(resolve, RabbitSend) {
    resolve({
      RabbitSend: RabbitSend
    });
  }

  on_connect(resolve, err, conn) {
    if (err !== null) return this.bail(err);
    // 建立信道
    conn.createChannel((err, ch) => {
      if (err !== null) return this.bail(err);
      
      // 通道建立成功後咱們經過通道對象的assertQueue方法來監聽空隊列,並設置durable持久化爲true。
      ch.assertQueue('', { exclusive: true }, (err, ok) => {
        if (err !== null) return this.bail(err);
        global.ch = ch;
        global.ok = ok;
        this.init_client(resolve, (ch, ok) => { return this.init(ch, ok); });
      });
    });
  }
}
import config from '../config.json';
import uuid from 'node-uuid';

// 這裏就是將要去發送消息的對象
export default class RabbitSend {
  constructor(ch, ok) {
    this.ch = ch;
    this.ok = ok;
    this.ramdom = Date.now();
  }

  mabeAnswer(msg) {
    // 若是返回的消息ID再發送的消息隊列中,就去處理
    if (global.msgQueue.includes(msg.properties.correlationId)) {
      console.log(msg.content.toString());
      const index = global.msgQueue.indexOf(msg.properties.correlationId);
      global.msgQueue.splice(index, 1);
      // resove返回的消息
      global.resolveRabbit[msg.properties.correlationId].resolve({
        finalRes: JSON.parse(msg.content.toString())
      });
      // 從待處理隊列中刪除
      delete global.resolveRabbit[msg.properties.correlationId];
    } else {
      // 若是指定消息的promise對象還存在那麼就移除不然直接輸出沒有對應的MQ
      if (global.resolveRabbit[msg.properties.correlationId]) {
        global.resolveRabbit[msg.properties.correlationId].reject({
          err: 'Unexpected message'
        });
        delete global.resolveRabbit[msg.properties.correlationId];
      } else {
        console.log('未找到對應的MQ');
      }
    }
  }

  // 當控制器去掉用send的時候會觸發到這裏
  send(content, type) {
    console.log(' [x] Requesting is ', content);
    let queue = config.MQ_QUEUE_COMMON;
    // let queue = config.MQ_QUEUE_COMMON_TEST;
    // 根據type去區分要調用的queue,默認爲config.MQ_QUEUE_COMMON
    switch (type) {
      case 'log':
        queue = config.MQ_QUEUE_LOG;
        break;
      case 'pay':
        queue = config.MQ_QUEUE_PAY;
        break;
      default:
        queue = config.MQ_QUEUE_COMMON;
        // queue = config.MQ_QUEUE_COMMON_TEST;
        break;
    }
    // 返回一個帶結果的promise對象
    return new Promise((resolve, reject) => {
      // 這裏去聲明消息的ID
      const correlationId = uuid();
      // 將此ID壓入消息隊列中
      global.msgQueue.push(correlationId);
      // 標識當前的promise對象
      global.resolveRabbit[correlationId] = {
        resolve: resolve,
        reject: reject
      };
      // 建立消費者監聽指定queue,noAck: true不作應答
      this.ch.consume(this.ok.queue, (msg) => {
        // 返回的結果處理函數
        this.mabeAnswer(msg);
      }, { noAck: true });
      // 發送到指定queue,指明應答的queue以及消息ID
      this.ch.sendToQueue(queue, new Buffer(JSON.stringify(content)), {
        replyTo: this.ok.queue,
        correlationId: correlationId
      });
    }).catch((err) => {
      console.log(err);
    });
  }

}
這裏的global能夠用內存模塊去管理,筆者就簡單的存在原始內存對象中,還有一點說明,啓動項目前要修改一下rabbitmq配置,在config.json中:
{
  "port": 8888,
  "rabbitMq_host": "主機IP",
  "rabbitMq_port": "端口",
  "rabbitMq_user": "用戶名",
  "rabbitMq_password": "密碼",
  "MQ_QUEUE_COMMON": "js_server",
  "MQ_QUEUE_COMMON_TEST": "server_test",
  "MQ_QUEUE_LOG": "log",
  "MQ_QUEUE_PAY": "pay",
  "MQ_QUEUE_PAY_TEST": "pay_test",
  "maxAge": 1800000,
  "redis_maxAge": 1800
}

其他的代碼筆者相信有必定js基礎的同窗都能看懂,這裏就不作贅述。下面繼續搭建遠端服務,也就是咱們的消費者。

common-service (git地址:https://github.com/burning0xb...node

這裏的common-service就是咱們所說的微服務,整合了sequelize orm框架,如下是對該服務的解讀。
一、入口文件
import amqp from 'amqplib/callback_api';
import { logger, logger_date } from './src/log4j';
import config from './config';
import route from './route';

logger.info('server started');

function bail(err, conn) {
  logger.error(err);
}

function on_connect(err, conn) {
    if (err !== null)
        return bail(err);

    process.once('SIGINT', () => {
        conn.close();
    });

    var q = config.rabbitMq_queue.logic01

    /*
    測試mq
     */
    // var q = config.rabbitMq_queue.logic02

    // 建立信道
    conn.createChannel((err, ch) => {
        logger_date.info('rabbitMQ createChannel');
        // 監聽指定的queue
        ch.assertQueue(q, {durable: true});
        // 設置公平調度,這裏是指mq不會向一個繁忙的隊列推送超過1條消息。
        ch.prefetch(1);
        // 建立消費者監聽Q,reply爲接收處理函數, noAck: false作出應答
        ch.consume(q, reply, {
            noAck: false
        }, (err) => {
            if (err !== null)
                return bail(err, conn);
            logger.info(' [x] Awaiting RPC requests');
        });
        
        function reply(msg) {
            logger.info('request content is ' + msg.content.toString());
            // 接收的消息內容
            const request = JSON.parse(msg.content.toString());
            // 這裏定義返回函數
            const cb = (response) => {
                ch.sendToQueue(msg.properties.replyTo, new Buffer(JSON.stringify(response)), { correlationId: msg.properties.correlationId });
                ch.ack(msg);
            };
            try {
              // 查找api網關發送的消息中指定的方法
              const func = request.class && request.func ? route[request.class][request.func] : null;
              if (func) {
                // 調用指定方法
                func(cb, request.content);
              } else {
                cb({
                  err: 'method not allowed'
                });
              }
            } catch(err) {
              console.log(err);
              cb({
                code: 500,
                err: 'server error'
              });
            }
        }
    });
}

// 建立鏈接
amqp.connect('amqp://' + config.rabbitMq_user + ':' + config.rabbitMq_password + '@' + config.rabbitMq_host + ':' + config.rabbitMq_port, on_connect);
logger_date.info('rabbitMQ connect success');
logger.warn('don`t kill this process');
二、路由
import { User } from './src/server/user';

const user = new User();

const route = {
  user
};

export default route;
三、遠端服務方法(舉例)
import { AttentionUser } from '../../model';
import dbStorage from '../../config/dbStorage';
import moment from 'moment';
import autobind from 'autobind-decorator' // 綁定this
import { logger } from '../../log4j';

@autobind
export default class User {
  constructor() {
  }

  /**
   * [getUserList 獲取用戶信息]
   * @param  {Function} cb   [description]
   * @param  {[type]}   info [description]
   * @return {Promise}       [description]
   */
  async getUserList(cb, info) {
    logger.warn(`this is moment format ${moment().format('YYYY-MM-DD hh:mm:ss')}`);
    // 用orm模型去分頁查詢數據
    const attentionUser = await AttentionUser.findAndCount({
      limit: 10,
      offset: 0
    });
    cb({ code: '00000', attentionUser });
  }

  /**
   * [unsubscribe 取消關注]
   * @method unsubscribe
   * @param  {Function}  cb   [description]
   * @param  {[type]}    info [description]
   * @return {Promise}        [description]
   */
  async unsubscribe(cb, info) {
    // 開啓事務
    const t = await dbStorage.transaction();
    try {
      const res = await AttentionUser.update({
        IS_DISPLAY: 'N',
        UPDATE_TIME: new Date()
      }, {
        where: {
          OPENID: info.openid
        }
      }, { transaction: t });
      // 提交事務
      t.commit();
      cb({ code: '00000', res })
    } catch (err) {
      // 回滾事務
      t.rollback();
      console.log(err);
      cb({ code: '00001', err: err });
    }
  }

}
三、sequelize
這裏涉及到sequelize這個框架,筆者不在此去詳細講解,請給位移步官網自行查詢 http://docs.sequelizejs.com/m...
要說明的是,在package.json中筆者寫了一個命令行去自動生成orm的model,你們只要修改對應的數據,再 npm run build 就能夠生成實體模型。
"build": "sequelize-auto -o ./entity_model -d 數據庫 -h 主機IP -u 用戶名 -p 端口  -x 密碼 -e mysql -a ./src/model/config.json"
這裏的數據庫配置也須要修改,在src/config/seqCong.json 中
{
  "database": "數據庫",
  "username": "用戶名",
  "password": "密碼",
  "host": "主機",
  "port": 3306
}

     至此,咱們完成了api網關到一個簡單服務的通訊,在從此的課程中會逐步搭建各種微服務,好比支付服務,物流服務,訂單服務,還有其餘一些框架的搭建。若是各位看官對筆者的文章感興趣的話,但願關注下筆者的我的公衆號,你們一塊兒交流探討,若有寫的不對的地方,還但願各位指正,筆者深感榮幸,最後感謝你們的閱讀。mysql

相關文章
相關標籤/搜索