/* 首先咱們在server.js中添加一個隊列監聽數組global.readyListener,用來記錄已經consume的queue */ import schedule from 'node-schedule'; global.msgQueue = []; global.resolveRabbit = {}; global.readyListener = [];
/* RabbitSent.js中修改以下代碼, 防止rabbit重複監聽 */ if (!global.readyListener.includes(queue)) { global.readyListener.push(queue); this.ch.consume(this.ok.queue, (msg) => { this.mabeAnswer(msg); }, { noAck: true }); }
/* 添加訂單類型的queue */ switch (type) { case 'order': queue = config.MQ_QUEUE_ORDER; break; case 'pay': queue = config.MQ_QUEUE_PAY; break; default: queue = config.MQ_QUEUE_COMMON; // queue = config.MQ_QUEUE_COMMON_TEST; break; }
/* 添加獲取用戶列表的路由 */ router.get('/user/getUserList', async (ctx, next) => { const user = await userHandler.getUserList(ctx); ctx.body = user; })
/* 添加對應的控制器 */ /* 這裏咱們要注意,咱們在掉這個api的時候同時發起了兩個遠端請求,一個爲原始獲取用戶列表,另外一個爲獲取訂單列表 */ /** * [getUserList description] * @param {[type]} ctx [description] * @return {Promise} [description] */ async getUserList(ctx) { const body = ctx.request.body; const content = { class: 'user', func: 'getUserList', content: {} }; const server = this.initServer(this.server); const res1 = await server.send(content); const content2 = { class: 'common', func: 'getOrderList', content: {} }; const server2 = this.initServer(this.server); const res2 = await server2.send(content2, 'order'); return { res1, res2 }; }
/* 這裏改動比較大,增長了發送消息的方法 */ import amqp from 'amqplib/callback_api'; import { logger, logger_date } from './src/log4j'; import config from './config'; import route from './route'; import { RabbitSend } from './rabbitMQ'; import { Cache } from './util'; import packages from './package.json'; logger.info('server started'); global.msgQueue = []; global.resolveRabbit = {}; global.readyListener = []; function bail(err, conn) { logger.error(err); } // 初始化mq的發送promise function initServer(ch, ok) { const server = new RabbitSend(ch, ok) return server; } // 聲明監聽queue function assertQueue(ch, q) { return new Promise((resolve, reject) => { ch.assertQueue(q, {durable: true}, (err, ok) => { if (err !== null) return bail(err); global.ch = ch; global.ok = ok; global.server = initServer; resolve(); }); }); } // 發送mq的方法,這裏就簡單的傳送的方法中,後續會提出到基礎類,用繼承的方式實現 function mq() { return global.server(global.ch, global.ok); } // 刪除已使用的queue,這裏解釋下緣由 // 考慮到分佈式節點,同一個服務可能會啓動多個,這裏用uuid去標記不一樣節點的queue,每次從新啓動的時候刪除上次啓動時rabbitmq-server端保留的queue 避免無用堆積 function delQueues(ch) { Cache.getCache(`${packages.name}-mq`).then((res) => { if (res) { logger.warn('================ start clear mq queues ================='); Cache.destroy(`${packages.name}-mq`); const queues = res.rabbitmq_queues.queues; queues.map((key) => { ch.checkQueue(key, (err, ok) => { if (ok.queue === key) { logger.warn(`================== delete queue ${key} ==================`); ch.deleteQueue(key); } }); }); } }); } function on_connect(err, conn) { if (err !== null) return bail(err); process.once('SIGINT', () => { conn.close(); }); var q = config.rabbitMq_queue.logic01 /* 測試mq */ // var q = config.rabbitMq_queue.logic02 // 壓入本地已監聽隊列中 global.readyListener.push(q); conn.createChannel((err, ch) => { logger_date.info('rabbitMQ createChannel'); delQueues(ch); assertQueue(ch, q).then(() => { ch.prefetch(1); ch.consume(q, reply, { noAck: false }, (err) => { if (err !== null) return bail(err, conn); logger.info(' [x] Awaiting RPC requests'); }); function reply(msg) { logger.info('request content is ' + msg.content.toString()); const request = JSON.parse(msg.content.toString()); // 聲明返回消息的queue,以及消息id和返回體 const cb = (response) => { ch.sendToQueue(msg.properties.replyTo, new Buffer(JSON.stringify(response)), { correlationId: msg.properties.correlationId }); ch.ack(msg); }; try { const func = request.class && request.func ? route[request.class][request.func] : null; if (func) { // 這裏傳入發送對象 func(cb, request.content, mq); } else { cb({ err: 'method not allowed' }); } } catch(err) { console.log(err); cb({ code: 500, err: 'server error' }); } } }); }); } amqp.connect('amqp://' + config.rabbitMq_user + ':' + config.rabbitMq_password + '@' + config.rabbitMq_host + ':' + config.rabbitMq_port, on_connect); logger_date.info('rabbitMQ connect success'); logger.warn('don`t kill this process');
/* 這裏的send方法與api-rest的有所不一樣 */ send(content, type) { console.log(' [x] Requesting is ', content); let queue = config.MQ_QUEUE_ORDER; switch (type) { case 'log': queue = config.MQ_QUEUE_ORDER; break; case 'pay': queue = config.MQ_QUEUE_ORDER; break; default: queue = config.MQ_QUEUE_ORDER; break; } return new Promise(async (resolve, reject) => { const correlationId = uuid(); console.log('========= mq loading =========='); global.msgQueue.push(correlationId); global.resolveRabbit[correlationId] = { resolve: resolve, reject: reject }; // 避免重複監聽,不重複取redis,下降開銷 if (!global.readyListener.includes(queue)) { global.readyListener.push(queue); // 若是是第一次監聽該通道,從redis裏取一下,查看是否存在指定uuid隊列,存在的話直接用,不存在的話去assert一個隊列,不持久化,而後監聽該隊列,一切初始化完成後進行發送 const _c = await Cache.getCache(`${packages.name}-mq`); console.log(_c); if (_c && _c.rabbitmq_queues && _c.rabbitmq_queues.queues) { const queues = _c.rabbitmq_queues.queues; if (queues.includes(`${queue}-${this.pid}`)) { console.log(`========= use old mq queue ${queue}-${this.pid} ==========`); this.ch.consume(`${queue}-${this.pid}`, (msg) => { this.mabeAnswer(msg); }, { noAck: true }); } else { queues.push(`${queue}-${this.pid}`); console.log(`========= use new mq queue ${queue}-${this.pid} ==========`); Cache.setCache(`${packages.name}-mq`, { rabbitmq_queues: { queues }, }); this.ch.assertQueue(`${queue}-${this.pid}`, {durable: false}, (err, ok) => { if (err) return; this.ch.consume(`${queue}-${this.pid}`, (msg) => { this.mabeAnswer(msg); }, { noAck: true }); }); } } else { console.log('========== 初始化mq隊列 =========='); Cache.setCache(`${packages.name}-mq`, { rabbitmq_queues: { queues: [`${queue}-${this.pid}`] }, }); this.ch.assertQueue(`${queue}-${this.pid}`, {durable: false}, (err, ok) => { if (err) return; this.ch.consume(`${queue}-${this.pid}`, (msg) => { this.mabeAnswer(msg); }, { noAck: true }); }); } } console.log(`============= use queue ${queue}-${this.pid} ==============`); this.ch.sendToQueue(queue, new Buffer(JSON.stringify(content)), { replyTo: `${queue}-${this.pid}`, correlationId: correlationId }) }).catch((err) => { console.log(err); }); }
/* 由api-rest發起的微服務調用會被分發到下面這個方法中 */ async getUserList(cb, info, mq) { logger.warn(`this is moment format ${moment().format('YYYY-MM-DD hh:mm:ss')}`); const content = { class: 'common', func: 'getOrderList', content: {} }; // 這裏模擬返回用戶列表,而後再次調用遠端的訂單服務,去拉取訂單列表 const res = await mq().send(content); cb({ code: '00000', users: [], order: res }); }
/* 該服務在其餘方法上與common服務相似,不作贅述,監聽的queue不一樣 */ module.exports = Object.assign({ rabbitMq_host: '192.168.41.144', rabbitMq_port: '5672', rabbitMq_user: 'admin', rabbitMq_password: 'wangrui1994', // server_host: '106.14.77.183', server_host: '127.0.0.1', server_port: 8889, rabbitMq_queue: { logic01: 'jslight-service-order', logic02: 'jslight-service-order-test' } });
/* 這裏爲上一步common服務調用的訂單列表服務 */ async getOrderList(cb, info, mq) { logger.warn(`this is moment format ${moment().format('YYYY-MM-DD hh:mm:ss')}`); const content = { class: 'address', func: 'getUserAddress', content: {} }; // 再次調用遠端帳戶服務中的地址列表,最後將訂單和地址所有返回 const address = await mq().send(content, 'account'); cb({ code: '00000', order: [{ orderId: Date.now(), price: 200 }], address }); }
/* 監聽帳戶queue */ module.exports = Object.assign({ rabbitMq_host: '192.168.41.144', rabbitMq_port: '5672', rabbitMq_user: 'admin', rabbitMq_password: 'wangrui1994', // server_host: '106.14.77.183', server_host: '127.0.0.1', server_port: 8889, rabbitMq_queue: { logic01: 'jslight-service-account', logic02: 'jslight-service-account-test' } });
/* 這裏返回地址列表 */ async getUserAddress(cb, info, mq) { logger.warn(`this is moment format ${moment().format('YYYY-MM-DD hh:mm:ss')}`); cb({ code: '00000', address: { province: '上海', city: '上海', country: '徐彙區' } }); }
首先api網關發起遠端微服務調用,兩個分支,一個爲調用common服務,另外一個爲調用order服務node
分支一 common服務git
分支二 order服務github
整合後redis
至此微服務間的調用整合完成,咱們來看一下控制檯的輸出json
api-rest
咱們能夠看到兩個遠端調用的返回值api
node-service-common
咱們能夠看到common服務調用了order服務,而且使用監聽了jslight-service-order-23be5586-2411-450d-9523-e0093401830d隊列,最後獲得了遠端的返回值數組
node-service-order
訂單服務接收到了api網關和common服務發起的調用,請求account服務,而且獲得了返回promise
node-service-account
帳戶服務接收到兩個由訂單服務發起的請求,並獲得返回值async
如下爲postman的最終返回結果分佈式
{ "res1": { "finalRes": { "code": "00000", "users": [], "order": { "finalRes": { "code": "00000", "order": [ { "orderId": 1512366914453, "price": 200 } ], "address": { "finalRes": { "code": "00000", "address": { "province": "上海", "city": "上海", "country": "徐彙區" } } } } } } }, "res2": { "finalRes": { "code": "00000", "order": [ { "orderId": 1512366914501, "price": 200 } ], "address": { "finalRes": { "code": "00000", "address": { "province": "上海", "city": "上海", "country": "徐彙區" } } } } } }
好了,本節爲你們演示了微服務間的調用,其中還有不少優化須要去作,筆者但願各位可以本身完成。以上是本篇的全部內容,歡迎各位關注個人我的公衆號,提出您的寶貴意見並相互交流學習。