seneca是一個nodejs微服務工具集,它賦予系統易於連續構建和更新的能力。下面會逐一和你們一塊兒瞭解相關技術入門以及實踐。html
這裏插入一段硬廣。小子再進行簡單整合以後擼了個vastify框架 ---- 輕量級nodejs微服務框架,有興趣的同窗過目一下,歡迎順手star一波,另外有疑問或者代碼有毛病歡迎在博文下方留言。node
"node": "^10.0.0"
"npm": "^6.0.0"
"pm2": "^2.10.3"
"rabbitmq": "^3.7.5"
"consul": "^1.1.0"
"mongodb": "^3.6"
複製代碼
"bluebird": "^3.5.1"
"koa": "^2.5.1"
"koa-router": "^7.4.0"
"seneca": "^3.4.3"
"seneca-web": "^2.2.0"
"seneca-web-adapter-koa2": "^1.1.0"
"amqplib": "^0.5.2"
"winston": "^2.4.2"
"mongoose": "^5.1.2"
複製代碼
index.js(accout-server/src/index.js)nginx
const seneca = require('seneca')()
seneca.use('cmd:login', (msg, done) => {
const { username, pass } = msg
if (username === 'asd' && pass === '123') {
return done(null, { code: 1000 })
}
return done(null, { code: 2100 })
})
const Promise = require('bluebird')
const act = Promise.promisify(seneca.act, { context: 'seneca' })
act({
cmd: 'login',
username: 'asd',
pass: '123'
}).then(res => {
console.log(res)
}).catch(err => {
console.log(err)
})
複製代碼
執行後git
{ code: 1000 }
{"kind":"notice","notice":"hello seneca k5i8j1cvw96h/1525589223364/10992/3.4.3/-","level":"info","seneca":"k5i8j1cvw96h/1525589223364/10992/3.4.3/-","when":1525589223563}
複製代碼
seneca.add
方法,添加一個action pattern到Seneca實例中,它有三個參數:github
pattern
: 用於Seneca中JSON的消息匹配模式,對象或格式化字符串sub_pattern
: 子模式,優先級低於主模式(可選)action
: 當匹配成功後的動做函數seneca.act
方法,執行Seneca實例中匹配成功的動做,它也有兩個參數:web
msg
: JSON消息sub_pattern
: 子消息,優先級低於主消息(可選)response
: 用於接收服務調用結果seneca.use
方法,爲Seneca實例添加一個插件,它有兩個參數:(此處插件的原理和中間件有一些不一樣)redis
func
: 插件執行方法options
: 插件所需options(可選)核心是利用JSON
對象進行模式匹配。這個JSON對象既包含某個微服務所須要調取另外一個微服務的特徵,同時也包含傳參。和Java微服務發現有些相似不過是用模式代替ip+port,目前爲止模式是徹底能夠實現服務發現功能,可是否更加靈活還有待去挖掘。mongodb
所需注意的點docker
index.js(config-server/src/index.js)shell
const seneca = require('seneca')()
const config = {
SUCCESS_NORMAL_RES: {
code: 1000,
desc: '服務端正常響應'
}}
seneca.add('$target$:config-server', (msg, done) => {
return done(null, config)
}).listen(10011)
複製代碼
運行此腳本後可在瀏覽器中輸入http://localhost:10011/act?cmd=config
發起請求獲取全局配置信息
OR
const seneca = require('seneca')()
const Promise = require('bluebird')
const act = Promise.promisify(seneca.act, { context: seneca })
seneca.client(10011)
act('$$target:config-server, default$:{msg:404}').then(res => {
console.log(res)
}).catch(err => {
console.log(err)
})
複製代碼
noname-server
const seneca = require('seneca')()
seneca.add('$$target:account-server', (msg, done) => {
done(null, { seneca: '666' })
})
seneca.listen(10015)
複製代碼
config-server(同上)
call
const seneca = require('seneca')()
const Promise = require('blurebird')
const act = Promise.promisify(seneca.act, { context: seneca })
seneca.client({
port: '10011',
pin: '$$target:account-server'
})
seneca.client({
port: '10015',
pin: '$$target:noname-server'
})
act('$$target:account-server').then(res => {
console.log(res)
}).catch(err => {
console.log(err)
})
act('$$target:noname-server').then(res => {
console.log(res)
}).catch(err => {
console.log(err)
})
複製代碼
集成koa
const seneca = require('seneca')()
const Promise = require('bluebird')
const SenecaWeb = require('seneca-web')
const Koa = require('koa')
const Router = require('koa-router')
const app = new Koa()
const userModule = require('./modules/user.js')
// 初始化用戶模塊
seneca.use(userModule.init)
// 初始化seneca-web插件,並適配koa
seneca.use(SenecaWeb, {
context: Router(),
adapter: require('seneca-web-adapter-koa2'),
routes: [...userModule.routes]
})
// 將routes導出給koa app
seneca.ready(() => {
app.use(seneca.export('web/context')().routes())
})
app.listen(3333)
複製代碼
user模塊
const $module = 'module:user'
let userCount = 3
const REST_Routes = [
{
prefix: '/user',
pin: `${$module},if:*`,
map: {
list: {
GET: true,
name: ''
},
load: {
GET: true,
name: '',
suffix: '/:id'
},
edit: {
PUT: true,
name: '',
suffix: '/:id'
},
create: {
POST: true,
name: ''
},
delete: {
DELETE: true,
name: '',
suffix: '/:id'
}
}
}
]
const db = {
users: [{
id: 1,
name: '甲'
}, {
id: 2,
name: '乙'
}, {
id: 3,
name: '丙'
}]
}
function user(options) {
this.add(`${$module},if:list`, (msg, done) => {
done(null, db.users)
})
this.add(`${$module},if:load`, (msg, done) => {
const { id } = msg.args.params
done(null, db.users.find(v => Number(id) === v.id))
})
this.add(`${$module},if:edit`, (msg, done) => {
let { id } = msg.args.params
id = +id
const { name } = msg.args.body
const index = db.users.findIndex(v => v.id === id)
if (index !== -1) {
db.users.splice(index, 1, {
id,
name
})
done(null, db.users)
} else {
done(null, { success: false })
}
})
this.add(`${$module},if:create`, (msg, done) => {
const { name } = msg.args.body
db.users.push({
id: ++userCount,
name
})
done(null, db.users)
})
this.add(`${$module},if:delete`, (msg, done) => {
let { id } = msg.args.params
id = +id
const index = db.users.findIndex(v => v.id === id)
if (index !== -1) {
db.users.splice(index, 1)
done(null, db.users)
} else {
done(null, { success: false })
}
})
}
module.exports = {
init: user,
routes: REST_Routes
}
複製代碼
vscode-restclient(vscode的restclient插件,用於發起RESTFUL請求)
### 1
POST http://localhost:3333/user HTTP/1.1
Content-Type: application/json
{
"name": "測試添加用戶"
}
### delete
DELETE http://localhost:3333/user/2 HTTP/1.1
### PUT
PUT http://localhost:3333/user/2 HTTP/1.1
Content-Type: application/json
{
"name": "測試修改用戶信息"
}
### GET
GET http://localhost:3333/user HTTP/1.1
### GET
GET http://localhost:3333/user/3 HTTP/1.1
複製代碼
可在構造函數中傳入配置,log屬性能夠控制日誌級別
例1:傳字符串
require('seneca')({
// quiet silent any all print standard test
log: 'all'
})
複製代碼
例2:傳對象
require('seneca')({
log: {
// none debug+ info+ warn+
level: 'debug+'
},
// 設置爲true時,seneca日誌功能會encapsulate senecaId,senecaTag,actId等字段後輸出(通常爲兩字符)
short: true
})
複製代碼
建議例2代碼,由於seneca-web-adapter-koa2插件打印的日誌level爲debug,利於作web接口訪問日誌記錄。
Logger.js
const { createLogger, format, transports } = require('winston')
const { combine, timestamp, label, printf } = format
const logger = createLogger({
level: 'info',
format: combine(
label({label: 'microservices'}),
timestamp(),
printf(info => {
return `${info.timestamp} [${info.label}] ${info.level}: ${info.message}`
})
),
transports: [ new transports.Console() ]
})
// highest to lowest
const levels = {
error: 0,
warn: 1,
info: 2,
verbose: 3,
debug: 4,
silly: 5
}
module.exports = logger
複製代碼
日誌輸出格式
2018-05-17T14:43:28.330Z [microservices] info: 接收到rpc客戶端的調用請求
2018-05-17T14:43:28.331Z [microservices] warn: warn message
2018-05-17T14:43:28.331Z [microservices] error: error message
複製代碼
producer.js
// 建立一個amqp對等體
const amqp = require('amqplib/callback_api')
amqp.connect('amqp://localhost', (err, conn) => {
conn.createChannel((err, ch) => {
const q = 'taskQueue1'
const msg = process.argv.slice(2).join(' ') || 'hello world'
// 爲方式RabbitMQ退出或者崩潰時重啓後丟失隊列信息,這裏配置durable:true(同時在消費者腳本中也要配置durable:true)後,
ch.assertQueue(q, { durable: true })
// 這裏配置persistent:true,經過閱讀官方文檔,我理解爲當程序重啓後,會斷點續傳以前未send完成的數據消息。(但此功能並不可靠,由於不會爲全部消息執行同步IO,會緩存在cache並在某個恰當時機write到disk)
ch.sendToQueue(q, Buffer.from(msg), { persistent: true })
setTimeout(() => {
conn.close(); process.exit(0)
}, 100)
})
})
複製代碼
// 建立一個amqp對等體
const amqp = require('amqplib/callback_api')
amqp.connect('amqp://localhost', (err, conn) => {
conn.createChannel((err, ch) => {
const q = 'taskQueue1'
// 爲方式RabbitMQ退出或者崩潰時重啓後丟失隊列信息,這裏配置durable:true(同時在消費者腳本中也要定義durable:true)後,
ch.assertQueue(q, { durable: true })
ch.prefetch(1)
console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q)
ch.consume(q, msg => {
const secs = msg.content.toString().split('.').length - 1
console.log(" [x] Received %s", msg.content.toString())
setTimeout(() => {
console.log(" [x] Done")
ch.ack(msg)
}, secs * 1000)
})
// noAck配置(默認爲false)代表consumer是否須要在處理完後反饋ack給producer,若是設置爲true,則RabbitMQ服務若是將任務send至此consumer後不關心任務實際處理結果,send任務後直接標記已完成;不然,RabbiMQ獲得ack反饋後才標記爲已完成,若是一直未收到ack默認會一直等待ack而後標記,另外若是接收到nack或者該consumer進程退出則繼續dispatcher任務
})
})
複製代碼
檢驗過程
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
複製代碼
node producer.js(rabbitMQ執行過程爲會先建立一個匿名exchange,一個指定queue而後將queue與該匿名exchange綁定)
rabbitmqctl list_bindings
Listing bindings for vhost /...
exchange taskQueue1 queue taskQueue1 []
複製代碼
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
taskQueue1 1
複製代碼
Waiting for messages in taskQueue1. To exit press CTRL+C
[x] Received hello world
[x] Done
複製代碼
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
taskQueue1 0
複製代碼
知識點
publisher.js
const amqp = require('amqplib/callback_api')
amqp.connect('amqp://localhost', (err, conn) => {
conn.createChannel((err, ch) => {
const ex = 'logs'
const msg = process.argv.slice(2).join(' ') || 'Hello World!'
// ex爲exchange名稱(惟一)
// 模式爲fanout
// 不對消息持久化存儲
ch.assertExchange(ex, 'fanout', { durable: false })
// 第二個參數爲指定某一個binding,如爲空則由RabbitMQ隨機指定
ch.publish(ex, '', Buffer.from(msg))
console.log(' [x] Send %s', msg)
})
setTimeout(() => {
conn.close()
process.exit(0)
}, 100)
})
複製代碼
subscriber.js
const amqp = require('amqplib/callback_api')
amqp.connect('amqp://localhost', (err, conn) => {
conn.createChannel((err, ch) => {
const ex = 'logs'
// ex -> exchange是發佈/訂閱消息的載體,
// fanout -> 分發消息的模式,fanout,direct,topic,headers
// durable設置爲false下降一些可靠性,提升性能,由於不須要磁盤IO持久化存儲消息,另外
ch.assertExchange(ex, 'fanout', { durable: false })
// 使用匿名(也就是RabbitMQ自動生成隨機名的queue)隊列
// exclusive設置爲true,便可以當其寄生的connection被close的時候自動deleted
ch.assertQueue('', { exclusive: true }, (err, q) => {
console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q.queue)
// 綁定隊列到某個exchange載體(監聽某個exchange的消息)
// 第三個入參爲binding key
ch.bindQueue(q.queue, ex, '')
// 消費即訂閱某個exchange的消息並設置處理句柄
// 由於發佈/訂閱消息的模式就是非可靠性,只有當訂閱者訂閱才能收到相關的消息並且發佈者不關心該消息的訂閱者是誰以及處理結果如何,因此這裏noAck會置爲true
ch.consume(q.queue, (msg) => {
console.log(' [x] %s', msg.content.toString())
}, { noAck: true })
})
})
})
複製代碼
檢驗過程
rabbitmqctl stop_app;rabbitmqctl reset;rabbitmqctl start_app
(清空以前測試使用的queues、echanges、bindings)
node subscriber.js
[*] Waiting for messages in amq.gen-lgNW51IeEfj9vt1yjMUuaw. To exit press CTRL+C
複製代碼
rabbitmqctl list_exchanges
Listing exchanges for vhost / ...
logs fanout
複製代碼
rabbitmqctl list_bindings
Listing bindings for vhost /...
exchange amq.gen-jDbfwJR8TbSNJT2a2a83Og queue amq.gen-jDbfwJR8TbSNJT2a2a83Og []
logs exchange amq.gen-jDbfwJR8TbSNJT2a2a83Og queue []
複製代碼
node publisher.js tasks.........
[x] Send tasks......... // publiser.js
[x] tasks......... // subscriber.js
複製代碼
知識點
exchange.js
module.exports = {
name: 'ex1',
type: 'direct',
option: {
durable: false
},
ranks: ['info', 'error', 'warning', 'severity']
}
複製代碼
direct-routing.js
const amqp = require('amqplib/callback_api')
const ex = require('./exchange')
amqp.connect('amqp://localhost', (err, conn) => {
conn.createChannel((err, ch) => {
ch.assertExchange(ex.name, ex.type, ex.options)
setTimeout(() => {
conn.close()
process.exit(0)
}, 0)
})
})
複製代碼
subscriber.js
const amqp = require('amqplib/callback_api')
const ex = require('./exchange')
amqp.connect('amqp://localhost', (err, conn) => {
conn.createChannel((err, ch) => {
const ranks = ex.ranks
ranks.forEach(rank => {
// 聲明一個非匿名queue
ch.assertQueue(`${rank}-queue`, { exclusive: false }, (err, q) => {
ch.bindQueue(q.queue, ex.name, rank)
ch.consume(q.queue, msg => {
console.log(" [x] %s: '%s'", msg.fields.routingKey, msg.content.toString());
}, { noAck: true })
})
})
})
})
複製代碼
publisher.js
const amqp = require('amqplib/callback_api')
const ex = require('./exchange')
amqp.connect('amqp://localhost', (err, conn) => {
conn.createChannel((err, ch) => {
const ranks = ex.ranks
ranks.forEach(rank => {
ch.publish(ex.name, rank, Buffer.from(`${rank} logs...`))
})
setTimeout(() => {
conn.close()
process.exit(0)
}, 0)
})
})
複製代碼
檢驗過程
rabbitmqctl stop_app;rabbitmqctl reset;rabbitmqctl start_app
(清空以前測試使用的queues、echanges、bindings)
node direct-routing.js
rabbitmqctl list_exchanges
Listing exchanges for vhost / ...
amq.headers headers
ex1 direct
amq.fanout fanout
amq.rabbitmq.trace topic
amq.topic topic
direct
amq.direct direct
amq.match headers
複製代碼
node subscriber.js
rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
severity-queue 0
error-queue 0
info-queue 0
warning-queue 0
Listing bindings for vhost /...
exchange error-queue queue error-queue []
exchange info-queue queue info-queue []
exchange severity-queue queue severity-queue []
exchange warning-queue queue warning-queue []
ex1 exchange error-queue queue error []
ex1 exchange info-queue queue info []
ex1 exchange severity-queue queue severity []
ex1 exchange warning-queue queue warning []
複製代碼
node publisher.js
[x] info: 'info logs...'
[x] error: 'error logs...'
[x] severity: 'severity logs...'
[x] warning: 'warning logs...'
複製代碼
知識點
exchange.js
module.exports = {
name: 'ex2',
type: 'topic',
option: {
durable: false
},
ranks: ['info', 'error', 'warning', 'severity']
}
複製代碼
topic-routing.js
const amqp = require('amqplib/callback_api')
const exchangeConfig = require('./exchange')
amqp.connect('amqp://localhost', (err, conn) => {
conn.createChannel((err, ch) => {
ch.assertExchange(exchangeConfig.name, exchangeConfig.type, exchangeConfig.option)
setTimeout(() => {
conn.close()
process.exit(0)
}, 0)
})
})
複製代碼
subscriber.js
const amqp = require('amqplib/callback_api')
const exchangeConfig = require('./exchange')
amqp.connect('amqp://localhost', (err, conn) => {
conn.createChannel((err, ch) => {
const args = process.argv.slice(2)
const keys = (args.length > 0) ? args : ['anonymous.info']
console.log(' [*] Waiting for logs. To exit press CTRL+C');
keys.forEach(key => {
ch.assertQueue('', { exclusive: true }, (err, q) => {
console.log(` [x] Listen by routingKey ${key}`)
ch.bindQueue(q.queue, exchangeConfig.name, key)
ch.consume(q.queue, msg => {
console.log(" [x] %s:'%s'", msg.fields.routingKey, msg.content.toString());
}, { noAck: true })
})
})
})
})
複製代碼
publisher.js
const amqp = require('amqplib/callback_api')
const exchangeConfig = require('./exchange')
amqp.connect('amqp://localhost', (err, conn) => {
conn.createChannel((err, ch) => {
const args = process.argv.slice(2)
const key = (args.length > 1) ? args[0] : 'anonymous.info'
const msg = args.slice(1).join(' ') || 'hello world'
ch.publish(exchangeConfig.name, key, Buffer.from(msg))
setTimeout(() => {
conn.close()
process.exit(0)
}, 0)
})
})
複製代碼
檢驗過程
rabbitmqctl stop_app;rabbitmqctl reset;rabbitmqctl start_app
(清空以前測試使用的queues、echanges、bindings)
node topic-routing.js
Listing exchanges for vhost / ...
amq.fanout fanout
amq.rabbitmq.trace topic
amq.headers headers
amq.match headers
ex2 topic
direct
amq.topic topic
amq.direct direct
複製代碼
node subscriber.js "#.info" "*.error"
[*] Waiting for logs. To exit press CTRL+C
[x] Listen by routingKey #.info
[x] Listen by routingKey *.error
複製代碼
[x] account-server.info:'用戶服務測試'
[x] config-server.info:'配置服務測試'
[x] config-server.error:'配置服務出錯'
複製代碼
知識點
#
可匹配0或多個單詞,*
可精確匹配1個單詞rpc_server.js
const amqp = require('amqplib/callback_api')
const logger = require('./Logger')
let connection = null
amqp.connect('amqp://localhost', (err, conn) => {
connection = conn
conn.createChannel((err, ch) => {
const q = 'account_rpc_queue'
ch.assertQueue(q, { durable: true })
ch.prefetch(2)
ch.consume(q, msg => {
let data = {}
let primitiveContent = msg.content.toString()
try {
data = JSON.parse(primitiveContent)
} catch (e) {
logger.error(new Error(e))
}
logger.info('接收到rpc客戶端的調用請求')
if (msg.properties.correlationId === '10abc') {
logger.info(primitiveContent)
const uid = Number(data.uid) || -1
let r = getUserById(uid)
ch.sendToQueue(msg.properties.replyTo, Buffer.from(JSON.stringify(r)), { persistent: true })
ch.ack(msg)
} else {
logger.info('不匹配的調用請求')
}
})
})
})
function getUserById (uid) {
let result = ''
if (uid === +uid && uid > 0) {
result = {
state: 1000,
msg: '成功',
data: {
uid: uid,
name: '小強',
sex: 1
}
}
} else {
result = {
state: 2000,
msg: '傳參格式錯誤'
}
}
return result
}
process.on('SIGINT', () => {
logger.warn('SIGINT')
connection && connection.close()
process.exit(0)
})
複製代碼
rpc_client.js
const amqp = require('amqplib/callback_api')
amqp.connect('amqp://localhost', (err, conn) => {
conn.createChannel((err, ch) => {
const q = 'account_rpc_queue'
const callback = 'callback_queue'
ch.assertQueue(callback, { durable: true })
ch.consume(callback, msg => {
const result = msg.content.toString()
console.log(`接收到回調的消息啦!`)
console.log(result)
ch.ack(msg)
setTimeout(() => {
conn.close()
process.exit(0)
}, 0)
})
ch.assertQueue(q, { durable: true })
const msg = {
uid: 2
}
ch.sendToQueue(q, Buffer.from(JSON.stringify(msg)), {
persistent: true,
correlationId: '10abc',
replyTo: 'callback_queue'
})
})
})
複製代碼
檢驗過程
node rpc_server.js
rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
account_rpc_queue 0
複製代碼
node rpc_client.js
rpc_client的CLI打印
接收到回調的消息啦!
{"state":1000,"msg":"成功","data":{"uid":2,"name":"小強","sex":1}}
複製代碼
rpc_server的CLI打印
接收到rpc客戶端的調用請求
{ uid: 2 }
複製代碼
pm2 start app.js
-w --watch
:監聽目錄變化,如變化則自動重啓應用--ignore-file
:監聽目錄變化時忽略的文件。如pm2 start rpc_server.js --watch --ignore-watch="rpc_client.js"
-n --name
:設置應用名字,可用於區分應用-i --instances
:設置應用實例個數,0與max相同-f --force
: 強制啓動某應用,經常用於有相同應用在運行的狀況-o --output <path>
:標準輸出日誌文件的路徑-e --error <path>
:錯誤輸出日誌文件的路徑--env <path>
:配置環境變量如pm2 start rpc_server.js -w -i max -n s1 --ignore-watch="rpc_client.js" -e ./server_error.log -o ./server_info.log
在cluster-mode,也就是-i max下,日誌文件會自動在後面追加-${index}保證不重複
pm2 stop app_name|app_id
pm2 restart app_name|app_id
pm2 delete app_name|app_id
pm2 show app_name|app_id
OR pm2 describe app_name|app_id
pm2 list
pm2 monit
pm2 logs app_name|app_id --lines <n> --err
pm2 stop app_name|app_id
process.on('SIGINT', () => {
logger.warn('SIGINT')
connection && connection.close()
process.exit(0)
})
複製代碼
當進程結束前,程序會攔截SIGINT
信號從而在進程即將被殺掉前去斷開數據庫鏈接等等佔用內存的操做後再執行process.exit()從而優雅的退出進程。(如在1.6s後進程還未結束則繼續發送SIGKILL
信號強制進程結束)
ecosystem.config.js
const appCfg = {
args: '',
max_memory_restart: '150M',
env: {
NODE_ENV: 'development'
},
env_production: {
NODE_ENV: 'production'
},
// source map
source_map_support: true,
// 不合並日志輸出,用於集羣服務
merge_logs: false,
// 經常使用於啓動應用時異常,超時時間限制
listen_timeout: 5000,
// 進程SIGINT命令時間限制,即進程必須在監聽到SIGINT信號後必須在如下設置時間結束進程
kill_timeout: 2000,
// 當啓動異常後不嘗試重啓,運維人員嘗試找緣由後重試
autorestart: false,
// 不容許以相同腳本啓動進程
force: false,
// 在Keymetrics dashboard中執行pull/upgrade操做後執行的命令隊列
post_update: ['npm install'],
// 監聽文件變化
watch: false,
// 忽略監聽文件變化
ignore_watch: ['node_modules']
}
function GeneratePM2AppConfig({ name = '', script = '', error_file = '', out_file = '', exec_mode = 'fork', instances = 1, args = "" }) {
if (name) {
return Object.assign({
name,
script: script || `${name}.js`,
error_file: error_file || `${name}-err.log`,
out_file: out_file|| `${name}-out.log`,
instances,
exec_mode: instances > 1 ? 'cluster' : 'fork',
args
}, appCfg)
} else {
return null
}
}
module.exports = {
apps: [
GeneratePM2AppConfig({
name: 'client',
script: './rpc_client.js'
}),
GeneratePM2AppConfig({
name: 'server',
script: './rpc_server.js',
instances: 1
})
]
}
複製代碼
pm2 start ecosystem.config.js
避坑指南:processFile文件命名建議爲*.config.js格式。不然後果自負。
// 用不一樣用戶對不一樣遠程主機發起ssh請求時指定私鑰
Host qingf.me
User deploy
IdentityFile ~/.ssh/qf_deployment_rsa
// 設置爲no可去掉首次登錄(y/n)的選擇
StrictHostKeyChecking no
// 別名用法
Host deployment
User deploy
Hostname qingf.me
IdentityFile ~/.ssh/qingf_deployment_rsa
StrictHostKeyChecking no
複製代碼
與上述apps同級增長deploy屬性,以下
deploy: {
production: {
'user': 'deploy',
'host': 'qingf.me',
'ref': 'remotes/origin/master',
'repo': 'https://github.com/Cecil0o0/account-server.git',
'path': '/home/deploy/apps/account-server',
// 生命週期鉤子,在ssh到遠端以後setup操做以前執行
'pre-setup': '',
// 生命週期鉤子,在初始化設置即git pull以後執行
'post-setup': 'ls -la',
// 生命週期鉤子,在遠端git fetch origin以前執行
'pre-setup': '',
// 生命週期鉤子,在遠端git修改HEAD指針到指定ref以後執行
'post-deploy': 'npm install && pm2 startOrRestart deploy/ecosystem.config.js --env production',
// 如下這個環境變量將注入到全部app中
"env" : {
"NODE_ENV": "test"
}
}
}
複製代碼
tip:please make git working directory clean first!
而後前後執行如下兩條命令**(注意config文件路徑)**
pm2 deploy <configuration_file>
Commands:
setup run remote setup commands
update update deploy to the latest release
revert [n] revert to [n]th last deployment or 1
curr[ent] output current release commit
prev[ious] output previous release commit
exec|run <cmd> execute the given <cmd>
list list previous deploy commits
[ref] deploy to [ref], the "ref" setting, or latest tag
複製代碼
seneca內置log系統如何作自定義日誌打印?
舒適提示:請以正常的http請求開始,由於通過測試若是微服務自主發起act,其seneca.fixedargs['tx$']值不一樣。
Consul是一個分佈式集羣服務註冊發現工具,並具備健康檢查、分級式KV存儲、多數據中心等高級特性。
consul agent -dev -ui
{
"service": {
// 服務名,稍後用於query服務
"name": "account-server",
// 服務標籤
"tags": ["account-server"],
// 服務元信息
"meta": {
"meta": "for my service"
},
// 服務端口
"port": 3333,
// 不容許標籤覆蓋
"enable_tag_override": false,
// 腳本檢測作health checks 與-enable-script-checks=true配合使用,有腳本模式、TCP模式、HTTP模式、TTL模式
"checks": [
{
"http": "http://localhost:3333/user",
"interval": "10s"
}
]
}
}
複製代碼
curl http://localhost:8500/v1/catalog/service/account-server
[
{
"ID": "e66eb1ff-460c-e63f-b4ac-0cb42daed19c",
"Node": "haojiechen.local",
"Address": "127.0.0.1",
"Datacenter": "dc1",
"TaggedAddresses": {
"lan": "127.0.0.1",
"wan": "127.0.0.1"
},
"NodeMeta": {
"consul-network-segment": ""
},
"ServiceID": "account-server",
"ServiceName": "account-server",
"ServiceTags": [
"account-server"
],
"ServiceAddress": "",
"ServiceMeta": {
"meta": "for my service"
},
"ServicePort": 3333,
"ServiceEnableTagOverride": false,
"CreateIndex": 6,
"ModifyIndex": 6
}
]
複製代碼
某一個結點啓動一個server模式代理,以下
consul agent -server -bootstrap-expect=1 \
-data-dir=/tmp/consul -node=agent-one -bind=valid extranet IP \
-enable-script-checks=true -config-dir=/usr/local/etc/consul.d
複製代碼
查看集羣成員
consul members
Node Address Status Type Build Protocol DC Segment
agent-one valid extranet IP:8301 alive server 1.1.0 2 dc1 <all>
複製代碼
另外一個結點啓動一個client模式代理,以下
consul agent \
-data-dir=/tmp/consul -node=agent-two -bind=139.129.5.228 \
-enable-script-checks=true -config-dir=/usr/local/etc/consul.d
複製代碼
查看集羣成員
consul members
Node Address Status Type Build Protocol DC Segment
agent-two 139.129.5.228:8301 alive server 1.1.0 2 dc1 <all>
複製代碼
加入Cluster
consul join 139.129.5.228
consul members
Node Address Status Type Build Protocol DC Segment
agent-one valid extranet IP:8301 alive server 1.1.0 2 dc1 <all>
agent-two 139.129.5.228:8301 alive server 1.1.0 2 dc1 <all>
複製代碼
config.js
// 服務註冊與發現
// https://github.com/silas/node-consul#catalog-node-services
'serverR&D': {
consulServer: {
type: 'consul',
host: '127.0.0.1',
port: 8500,
secure: false,
ca: [],
defaults: {
token: ''
},
promisify: true
},
bizService: {
name: 'defaultName',
id: 'defaultId',
address: '127.0.0.1',
port: 1000,
tags: [],
meta: {
version: '',
description: '註冊集羣'
},
check: {
http: '',
// check間隔時間(ex: 15s)
interval: '10s',
// check超時時間(ex: 10s)
timeout: '2s',
// 處於臨界狀態後自動註銷服務的超時時間
deregistercriticalserviceafter: '30s',
// 初始化狀態值爲成功
status: 'passing',
// 備註
notes: '{"version":"111","microservice-port":1115}'
}
}
}
複製代碼
server-register.js
/* * @Author: Cecil * @Last Modified by: Cecil * @Last Modified time: 2018-06-02 11:26:49 * @Description 微服務註冊方法 */
const defaultConf = require('../config')['serverR&D']
const { ObjectDeepSet, isString } = require('../helper/utils')
const Consul = require('consul')
const { generateServiceName, generateCheckHttp } = require('../helper/consul')
// 註冊服務
function register({ consulServer = {}, bizService = {} } = {}) {
if (!bizService.name && isString(bizService.name)) throw new Error('name is invalid!')
if (bizService.port !== +bizService.port) throw new Error('port is invalid!')
if (!bizService.host && isString(bizService.host)) throw new Error('host is invalid!')
if (!bizService.meta.$$version) throw new Error('meta.$$version is invalid!')
if (!bizService.meta.$$microservicePort) throw new Error('meta.$$microservicePort is invalid!')
const consul = Consul(ObjectDeepSet(defaultConf.consulServer, consulServer))
const service = defaultConf.bizService
service.name = generateServiceName(bizService.name)
service.id = service.name
service.address = bizService.host
service.port = bizService.port
service.check.http = generateCheckHttp(bizService.host, bizService.port)
service.check.notes = JSON.stringify(bizService.meta)
return new Promise((resolve, reject) => {
consul.agent.service.list().then(services => {
// 檢查主機+端口是否已被佔用
Object.keys(services).some(key => {
if (services[key].Address === service.address && services[key].Port === service.port) {
throw new Error(`該服務集羣endpoint[${service.address}, ${service.port}]已被佔用!`)
}
})
// 註冊集羣服務
consul.agent.service.register(service).then(() => {
logger.info(`${bizService.name}服務已註冊`)
resolve(services)
}).catch(err => {
console.log(err)
})
}).catch(err => {
throw new Error(err)
})
})
}
module.exports = class ServerRegister {
constructor() {
this.register = register
}
}
複製代碼
保證runtime中存在consul和mongodb服務後,clone該倉庫Demo,cd到工程根目錄下,運行node src便可。
server-register.js
/* * @Author: Cecil * @Last Modified by: Cecil * @Last Modified time: 2018-06-02 13:58:22 * @Description 微服務註冊方法 */
const defaultConf = require('../config')['serverR&D']
const { ObjectDeepSet, isString } = require('../helper/utils')
const Consul = require('consul')
const { generateServiceName, generateCheckHttp } = require('../helper/consul')
const logger = new (require('./logger'))().generateLogger()
// 註冊服務方法定義
function register({ consulServer = {}, bizService = {} } = {}) {
if (!bizService.name && isString(bizService.name)) throw new Error('name is invalid!')
if (bizService.port !== +bizService.port) throw new Error('port is invalid!')
if (!bizService.host && isString(bizService.host)) throw new Error('host is invalid!')
if (!bizService.meta.$$version) throw new Error('meta.$$version is invalid!')
if (!bizService.meta.$$microservicePort) throw new Error('meta.$$microservicePort is invalid!')
const consul = Consul(ObjectDeepSet(defaultConf.consulServer, consulServer))
const service = defaultConf.bizService
service.name = generateServiceName(bizService.name)
service.id = service.name
service.address = bizService.host
service.port = bizService.port
service.check.http = generateCheckHttp(bizService.host, bizService.port)
service.check.notes = JSON.stringify(bizService.meta)
return new Promise((resolve, reject) => {
consul.agent.service.list().then(services => {
// 檢查主機+端口是否已被佔用
Object.keys(services).some(key => {
if (services[key].Address === service.address && services[key].Port === service.port) {
throw new Error(`該服務集羣endpoint[${service.address}, ${service.port}]已被佔用!`)
}
})
// 註冊集羣服務
consul.agent.service.register(service).then(() => {
logger.info(`${bizService.name}服務註冊成功`)
resolve(services)
}).catch(err => {
console.log(err)
})
}).catch(err => {
throw new Error(err)
})
})
}
module.exports = class ServerRegister {
constructor() {
this.register = register
}
}
複製代碼
account-server/src/index.js
const vastify = require('vastify')
const version = require('../package.json').version
const microservicePort = 10015
const httpPort = 3333
// 註冊服務
vastify.ServerRegister.register({
bizService: {
name: 'account-server',
host: '127.0.0.1',
port: httpPort,
meta: {
$$version: version,
$$microservicePort: microservicePort
}
}
})
複製代碼
改造以前的user模塊,偷個懶就不貼代碼了,具體請查看Demo
microRouting.js
/* * @Author: Cecil * @Last Modified by: Cecil * @Last Modified time: 2018-06-02 16:22:02 * @Description 微服務內部路由中間件,暫不支持自定義路由匹配策略 */
'use strict'
const Consul = require('consul')
const defaultConf = require('../config')
const { ObjectDeepSet, isNumber } = require('../helper/utils')
const { getServiceNameByServiceKey, getServiceIdByServiceKey } = require('../helper/consul')
const logger = new (require('../tools/logger'))().generateLogger()
const { IPV4_REGEX } = require('../helper/regex')
let services = {}
let consul = null
/** * @author Cecil0o0 * @description 同步consul服務中心的全部可用服務以及對應check並組裝成對象以方便取值 */
function syncCheckList () {
return new Promise((resolve, reject) => {
consul.agent.service.list().then(allServices => {
if (Object.keys(allServices).length > 0) {
services = allServices
consul.agent.check.list().then(checks => {
Object.keys(checks).forEach(key => {
allServices[getServiceIdByServiceKey(key)]['check'] = checks[key]
})
resolve(services)
}).catch(err => {
throw new Error(err)
})
} else {
const errmsg = '未發現可用服務'
logger.warn(errmsg)
reject(errmsg)
}
}).catch(err => {
throw new Error(err)
})
})
}
function syncRoutingRule(senecaInstance = {}, services = {}) {
Object.keys(services).forEach(key => {
let service = services[key]
let name = getServiceNameByServiceKey(key)
let $$addr = service.Address
let $$microservicePort = ''
let $$version = ''
try {
let base = JSON.parse(service.check.Notes)
$$microservicePort = base.$$microservicePort
$$version = base.$$version
} catch (e) {
logger.warn(`服務名爲${serviceName}。該服務check.Notes爲非標準JSON格式,程序已忽略。請檢查服務註冊方式(請確保調用ServerRegister的register來註冊服務)`)
}
if (IPV4_REGEX.test($$addr) && isNumber($$microservicePort)) {
if (service.check.Status === 'passing') {
senecaInstance.client({
host: $$addr,
port: $$microservicePort,
pin: {
$$version,
$$target: name
}
})
} else {
logger.warn(`${$$target}@${$$version || '無'}服務處於critical,所以沒法使用`)
}
} else {
logger.warn(`主機(${$$addr})或微服務端口號(${$$microservicePort})有誤,請檢查`)
}
})
}
function startTimeInterval() {
setInterval(syncCheckList, defaultConf.routing.servicesRefresh)
}
function microRouting(consulServer) {
var self = this
consul = Consul(ObjectDeepSet(defaultConf['serverR&D'].consulServer, consulServer))
syncCheckList().then(services => {
syncRoutingRule(self, services)
})
}
module.exports = microRouting
複製代碼
在保證有consul與mongodb的runtime後,請結合這兩個config-server,account-server Demo進行測試。
[未完待續....]