api-rest(先扔一個git地址,這裏是源碼:https://github.com/burning0xb...)
項目主要分爲這幾個部分html
require('babel-core/register') require('./server');
const app = new Koa(); // 建立基礎路由 const baseRouter = new BaseRouter(); // 使用session(整合redis) app.use(session({ key: 'burning:session', store: new RedisStore(), maxAge: config.maxAge }));
// 加載中間件依次爲body體格式化,日誌與跨域 app.use(bodyParser()); app.use(Logger()); app.use(convert(cors()));
// 加載基礎路由中登陸過濾 app.use(async (ctx, next) => { if (baseRouter.requireLogin(ctx)) { await next(); } });
// 加載全部路由 app .use(router.routes()) .use(router.allowedMethods());
// 服務啓動 app.listen(config.port, () => { logger.info(`server is running port ${config.port}`); });
const rule = new schedule.RecurrenceRule(); rule.minute = [0, 20, 40]; // 任務規則 schedule.scheduleJob(rule, () => { getAccessToken().then((res) => { console.log(res); global.wechatToken = res.access_token; }) });
// 定義路由前綴 const router = koaRouter({ prefix: '/api' });
// 初始化rabbitMQ 客戶端後再去加載全部的路由 new Client().then((res) => { logger.info('rabbitMQ is ready'); global.MQ = res.RabbitSend; }).then(() => { for (let _router in routers) { if (_router !== '') { routers[_router](router, upload); console.log(`${_router} 加載成功 ?`); } } const userHandler = new UserHandler(global.MQ); rollUserList(userHandler); });
// 這裏去加載一個新的定時任務(每晚去更新關注用戶的數據) async function rollUserList(userHandler) { schedule.scheduleJob('0 0 1 * * *', async () => { const userList = await wechatApi.getUserList(); if (userList.data) { console.log(`關注用戶總數 ${userList.total} 人 開始更新用戶信息`); userList.data.openid.map(async (openid) => { await util.sleep(1); const userInfo = await wechatApi.getUserInfo(openid); // 這裏是重點,調用遠端的服務去持久化用戶信息 userHandler.saveWechatUser(userInfo); }) } }); }
import { UserHandler } from '../controller'; function userRouter(router, upload) { // 給controller綁定MQ對象 const userHandler = new UserHandler(global.MQ); // get請求 router.get('/user/getUserList', async (ctx, next) => { const user = await userHandler.getUserList(ctx); // respnose返回 ctx.body = user; }) } export default userRouter;
import BaseHandler from './BaseHandler'; export default class UserHandler extends BaseHandler { constructor(server) { super(); // 這裏初始化MQ對象 this.server = server; } /** * [getUserList description] * @param {[type]} ctx [description] * @return {Promise} [description] */ async getUserList(ctx) { // 獲取解析過的請求體 const body = ctx.request.body; // 構造遠端服務請求體 const content = { class: 'user', // 遠端服務的類名 func: 'getUserList', // 遠端服務的方法 content: {} // 消息內容 }; // 建立新的遠端服務發送對象 const server = this.initServer(this.server); // 發送並取得返回結果 const res = await server.send(content); return res; } }
initServer(server) { return server(global.ch, global.ok); }
import amqp from 'amqplib/callback_api'; import RabbitSend from './RabbitSend'; import config from '../config.json'; export default class Client { constructor() { // 建立mq鏈接,on_connect爲回調函數 return new Promise((resolve, reject) => { amqp.connect('amqp://' + config.rabbitMq_user + ':' + config.rabbitMq_password + '@' + config.rabbitMq_host + ':' + config.rabbitMq_port, this.on_connect.bind(this, resolve)); }).catch((err) => { console.log(err); }); } // 最後會返回一個new 對象,也就是說每init一次就會new一次 init(ch, ok) { const server = new RabbitSend(ch, ok) return server; } // 失敗處理函數 bail(err) { console.error(err); } init_client(resolve, RabbitSend) { resolve({ RabbitSend: RabbitSend }); } on_connect(resolve, err, conn) { if (err !== null) return this.bail(err); // 建立信道 conn.createChannel((err, ch) => { if (err !== null) return this.bail(err); // 通道建立成功後咱們經過通道對象的assertQueue方法來監聽空隊列,並設置durable持久化爲true。 ch.assertQueue('', { exclusive: true }, (err, ok) => { if (err !== null) return this.bail(err); global.ch = ch; global.ok = ok; this.init_client(resolve, (ch, ok) => { return this.init(ch, ok); }); }); }); } }
import config from '../config.json'; import uuid from 'node-uuid'; // 這裏就是將要去發送消息的對象 export default class RabbitSend { constructor(ch, ok) { this.ch = ch; this.ok = ok; this.ramdom = Date.now(); } mabeAnswer(msg) { // 若是返回的消息ID再發送的消息隊列中,就去處理 if (global.msgQueue.includes(msg.properties.correlationId)) { console.log(msg.content.toString()); const index = global.msgQueue.indexOf(msg.properties.correlationId); global.msgQueue.splice(index, 1); // resove返回的消息 global.resolveRabbit[msg.properties.correlationId].resolve({ finalRes: JSON.parse(msg.content.toString()) }); // 從待處理隊列中刪除 delete global.resolveRabbit[msg.properties.correlationId]; } else { // 若是指定消息的promise對象還存在那麼就移除不然直接輸出沒有對應的MQ if (global.resolveRabbit[msg.properties.correlationId]) { global.resolveRabbit[msg.properties.correlationId].reject({ err: 'Unexpected message' }); delete global.resolveRabbit[msg.properties.correlationId]; } else { console.log('未找到對應的MQ'); } } } // 當控制器去掉用send的時候會觸發到這裏 send(content, type) { console.log(' [x] Requesting is ', content); let queue = config.MQ_QUEUE_COMMON; // let queue = config.MQ_QUEUE_COMMON_TEST; // 根據type去區分要調用的queue,默認爲config.MQ_QUEUE_COMMON switch (type) { case 'log': queue = config.MQ_QUEUE_LOG; break; case 'pay': queue = config.MQ_QUEUE_PAY; break; default: queue = config.MQ_QUEUE_COMMON; // queue = config.MQ_QUEUE_COMMON_TEST; break; } // 返回一個帶結果的promise對象 return new Promise((resolve, reject) => { // 這裏去聲明消息的ID const correlationId = uuid(); // 將此ID壓入消息隊列中 global.msgQueue.push(correlationId); // 標識當前的promise對象 global.resolveRabbit[correlationId] = { resolve: resolve, reject: reject }; // 建立消費者監聽指定queue,noAck: true不作應答 this.ch.consume(this.ok.queue, (msg) => { // 返回的結果處理函數 this.mabeAnswer(msg); }, { noAck: true }); // 發送到指定queue,指明應答的queue以及消息ID this.ch.sendToQueue(queue, new Buffer(JSON.stringify(content)), { replyTo: this.ok.queue, correlationId: correlationId }); }).catch((err) => { console.log(err); }); } }
{ "port": 8888, "rabbitMq_host": "主機IP", "rabbitMq_port": "端口", "rabbitMq_user": "用戶名", "rabbitMq_password": "密碼", "MQ_QUEUE_COMMON": "js_server", "MQ_QUEUE_COMMON_TEST": "server_test", "MQ_QUEUE_LOG": "log", "MQ_QUEUE_PAY": "pay", "MQ_QUEUE_PAY_TEST": "pay_test", "maxAge": 1800000, "redis_maxAge": 1800 }
common-service (git地址:https://github.com/burning0xb... )node
import amqp from 'amqplib/callback_api'; import { logger, logger_date } from './src/log4j'; import config from './config'; import route from './route'; logger.info('server started'); function bail(err, conn) { logger.error(err); } function on_connect(err, conn) { if (err !== null) return bail(err); process.once('SIGINT', () => { conn.close(); }); var q = config.rabbitMq_queue.logic01 /* 測試mq */ // var q = config.rabbitMq_queue.logic02 // 建立信道 conn.createChannel((err, ch) => { logger_date.info('rabbitMQ createChannel'); // 監聽指定的queue ch.assertQueue(q, {durable: true}); // 設置公平調度,這裏是指mq不會向一個繁忙的隊列推送超過1條消息。 ch.prefetch(1); // 建立消費者監聽Q,reply爲接收處理函數, noAck: false作出應答 ch.consume(q, reply, { noAck: false }, (err) => { if (err !== null) return bail(err, conn); logger.info(' [x] Awaiting RPC requests'); }); function reply(msg) { logger.info('request content is ' + msg.content.toString()); // 接收的消息內容 const request = JSON.parse(msg.content.toString()); // 這裏定義返回函數 const cb = (response) => { ch.sendToQueue(msg.properties.replyTo, new Buffer(JSON.stringify(response)), { correlationId: msg.properties.correlationId }); ch.ack(msg); }; try { // 查找api網關發送的消息中指定的方法 const func = request.class && request.func ? route[request.class][request.func] : null; if (func) { // 調用指定方法 func(cb, request.content); } else { cb({ err: 'method not allowed' }); } } catch(err) { console.log(err); cb({ code: 500, err: 'server error' }); } } }); } // 建立鏈接 amqp.connect('amqp://' + config.rabbitMq_user + ':' + config.rabbitMq_password + '@' + config.rabbitMq_host + ':' + config.rabbitMq_port, on_connect); logger_date.info('rabbitMQ connect success'); logger.warn('don`t kill this process');
import { User } from './src/server/user'; const user = new User(); const route = { user }; export default route;
import { AttentionUser } from '../../model'; import dbStorage from '../../config/dbStorage'; import moment from 'moment'; import autobind from 'autobind-decorator' // 綁定this import { logger } from '../../log4j'; @autobind export default class User { constructor() { } /** * [getUserList 獲取用戶信息] * @param {Function} cb [description] * @param {[type]} info [description] * @return {Promise} [description] */ async getUserList(cb, info) { logger.warn(`this is moment format ${moment().format('YYYY-MM-DD hh:mm:ss')}`); // 用orm模型去分頁查詢數據 const attentionUser = await AttentionUser.findAndCount({ limit: 10, offset: 0 }); cb({ code: '00000', attentionUser }); } /** * [unsubscribe 取消關注] * @method unsubscribe * @param {Function} cb [description] * @param {[type]} info [description] * @return {Promise} [description] */ async unsubscribe(cb, info) { // 開啓事務 const t = await dbStorage.transaction(); try { const res = await AttentionUser.update({ IS_DISPLAY: 'N', UPDATE_TIME: new Date() }, { where: { OPENID: info.openid } }, { transaction: t }); // 提交事務 t.commit(); cb({ code: '00000', res }) } catch (err) { // 回滾事務 t.rollback(); console.log(err); cb({ code: '00001', err: err }); } } }
"build": "sequelize-auto -o ./entity_model -d 數據庫 -h 主機IP -u 用戶名 -p 端口 -x 密碼 -e mysql -a ./src/model/config.json"
{ "database": "數據庫", "username": "用戶名", "password": "密碼", "host": "主機", "port": 3306 }
至此,咱們完成了api網關到一個簡單服務的通訊,在從此的課程中會逐步搭建各種微服務,好比支付服務,物流服務,訂單服務,還有其餘一些框架的搭建。若是各位看官對筆者的文章感興趣的話,但願關注下筆者的我的公衆號,你們一塊兒交流探討,若有寫的不對的地方,還但願各位指正,筆者深感榮幸,最後感謝你們的閱讀。mysql