nodejs微服務框架解決方案

前言

seneca是一個nodejs微服務工具集,它賦予系統易於連續構建和更新的能力。下面會逐一和你們一塊兒瞭解相關技術入門以及實踐。html

這裏插入一段硬廣。小子再進行簡單整合以後擼了個vastify框架 ---- 輕量級nodejs微服務框架,有興趣的同窗過目一下,歡迎順手star一波,另外有疑問或者代碼有毛病歡迎在博文下方留言。node

環境

  • 基礎環境
"node": "^10.0.0"
"npm": "^6.0.0"
"pm2": "^2.10.3"
"rabbitmq": "^3.7.5"
"consul": "^1.1.0"
"mongodb": "^3.6"
複製代碼
  • 微服務工程
"bluebird": "^3.5.1"
"koa": "^2.5.1"
"koa-router": "^7.4.0"
"seneca": "^3.4.3"
"seneca-web": "^2.2.0"
"seneca-web-adapter-koa2": "^1.1.0"
"amqplib": "^0.5.2"
"winston": "^2.4.2"
"mongoose": "^5.1.2"
複製代碼

FEATURES

  • 模式匹配作服務間調用:略微不一樣於SpringCloud服務發現(http協議、IP + PORT模式),它使用更加靈活的模式匹配(Patrun模塊)原則去進行微服務間的調用
  • 接入koa2對C端提供RESTFUl API
  • 插件:更靈活編寫小而微的可複用模塊
  • seneca內置日誌輸出
  • 第三方日誌庫比較winston(選用)、bunyan、log4js
  • RabbitMQ消息隊列
  • PM2:node服務部署(服務集羣)、管理與監控
  • PM2:自動化部署
  • PM2集成docker
  • 請求追蹤(重建用戶請求流程)
  • 梳理Consul 服務註冊與發現基本邏輯
  • 框架集成node-consul
  • mongodb持久化存儲
  • 結合seneca與consul的路由服務中間件(可支持多個相同名字服務集羣路由,經過$$version區別)
  • 支持流處理(文件上傳/下載等)
  • jenkins自動化部署
  • nginx負載均衡
  • 持續集成方案
  • redis緩存
  • prisma提供GraphQL接口

模式匹配(Patrun模塊)

index.js(accout-server/src/index.js)nginx

const seneca = require('seneca')()

seneca.use('cmd:login', (msg, done) => {
	const { username, pass } = msg
	if (username === 'asd' && pass === '123') {
		return done(null, { code: 1000 })
	}
	return done(null, { code: 2100 })
})

const Promise = require('bluebird')

const act = Promise.promisify(seneca.act, { context: 'seneca' })

act({
	cmd: 'login',
	username: 'asd',
	pass: '123'
}).then(res => {
	console.log(res)
}).catch(err => {
	console.log(err)
})
複製代碼

執行後git

{ code: 1000 }
{"kind":"notice","notice":"hello seneca k5i8j1cvw96h/1525589223364/10992/3.4.3/-","level":"info","seneca":"k5i8j1cvw96h/1525589223364/10992/3.4.3/-","when":1525589223563}
複製代碼

seneca.add方法,添加一個action pattern到Seneca實例中,它有三個參數:github

  1. pattern: 用於Seneca中JSON的消息匹配模式,對象或格式化字符串
  2. sub_pattern: 子模式,優先級低於主模式(可選)
  3. action: 當匹配成功後的動做函數

seneca.act方法,執行Seneca實例中匹配成功的動做,它也有兩個參數:web

  1. msg: JSON消息
  2. sub_pattern: 子消息,優先級低於主消息(可選)
  3. response: 用於接收服務調用結果

seneca.use方法,爲Seneca實例添加一個插件,它有兩個參數:(此處插件的原理和中間件有一些不一樣)redis

  1. func: 插件執行方法
  2. options: 插件所需options(可選)

核心是利用JSON對象進行模式匹配。這個JSON對象既包含某個微服務所須要調取另外一個微服務的特徵,同時也包含傳參。和Java微服務發現有些相似不過是用模式代替ip+port,目前爲止模式是徹底能夠實現服務發現功能,可是否更加靈活還有待去挖掘。mongodb

所需注意的點docker

  • 各微服務之間模式需經過設計來區分

啓動第一個微服務

index.js(config-server/src/index.js)shell

const seneca = require('seneca')()
const config = {
SUCCESS_NORMAL_RES: {
    code: 1000,
    desc: '服務端正常響應'
}}

seneca.add('$target$:config-server', (msg, done) => {
  return done(null, config)
}).listen(10011)
複製代碼

運行此腳本後可在瀏覽器中輸入http://localhost:10011/act?cmd=config發起請求獲取全局配置信息

OR

const seneca = require('seneca')()
const Promise = require('bluebird')

const act = Promise.promisify(seneca.act, { context: seneca })

seneca.client(10011)
act('$$target:config-server, default$:{msg:404}').then(res => {
  console.log(res)
}).catch(err => {
  console.log(err)
})
複製代碼

對內:多個微服務相互調用(關鍵)

noname-server

const seneca = require('seneca')()
seneca.add('$$target:account-server', (msg, done) => {
	done(null, { seneca: '666' })
})
seneca.listen(10015)
複製代碼

config-server(同上)

call

const seneca = require('seneca')()
const Promise = require('blurebird')

const act = Promise.promisify(seneca.act, { context: seneca })

seneca.client({
	port: '10011',
	pin: '$$target:account-server'
})
seneca.client({
	port: '10015',
	pin: '$$target:noname-server'
})

act('$$target:account-server').then(res => {
	console.log(res)
}).catch(err => {
	console.log(err)
})

act('$$target:noname-server').then(res => {
	console.log(res)
}).catch(err => {
	console.log(err)
})

複製代碼

對外:提供REST服務(關鍵)

集成koa

const seneca = require('seneca')()
const Promise = require('bluebird')
const SenecaWeb = require('seneca-web')
const Koa = require('koa')
const Router = require('koa-router')
const app = new Koa()
const userModule = require('./modules/user.js')

// 初始化用戶模塊
seneca.use(userModule.init)

// 初始化seneca-web插件,並適配koa
seneca.use(SenecaWeb, {
  context: Router(),
  adapter: require('seneca-web-adapter-koa2'),
  routes: [...userModule.routes]
})

// 將routes導出給koa app
seneca.ready(() => {
  app.use(seneca.export('web/context')().routes())
})

app.listen(3333)
複製代碼

user模塊

const $module = 'module:user'
let userCount = 3

const REST_Routes = [
  {
    prefix: '/user',
    pin: `${$module},if:*`,
    map: {
      list: {
        GET: true,
        name: ''
      },
      load: {
        GET: true,
        name: '',
        suffix: '/:id'
      },
      edit: {
        PUT: true,
        name: '',
        suffix: '/:id'
      },
      create: {
        POST: true,
        name: ''
      },
      delete: {
        DELETE: true,
        name: '',
        suffix: '/:id'
      }
    }
  }
]

const db = {
  users: [{
    id: 1,
    name: '甲'
  }, {
    id: 2,
    name: '乙'
  }, {
    id: 3,
    name: '丙'
  }]
}

function user(options) {
  this.add(`${$module},if:list`, (msg, done) => {
    done(null, db.users)
  })
  this.add(`${$module},if:load`, (msg, done) => {
    const { id } = msg.args.params
    done(null, db.users.find(v => Number(id) === v.id))
  })
  this.add(`${$module},if:edit`, (msg, done) => {
    let { id } = msg.args.params
    id = +id
    const { name } = msg.args.body
    const index = db.users.findIndex(v => v.id === id)
    if (index !== -1) {
      db.users.splice(index, 1, {
        id,
        name
      })
      done(null, db.users)
    } else {
      done(null, { success: false })
    }
  })
  this.add(`${$module},if:create`, (msg, done) => {
    const { name } = msg.args.body
    db.users.push({
      id: ++userCount,
      name
    })
    done(null, db.users)
  })
  this.add(`${$module},if:delete`, (msg, done) => {
    let { id } = msg.args.params
    id = +id
    const index = db.users.findIndex(v => v.id === id)
    if (index !== -1) {
      db.users.splice(index, 1)
      done(null, db.users)
    } else {
      done(null, { success: false })
    }
  })
}

module.exports = {
  init: user,
  routes: REST_Routes
}
複製代碼

vscode-restclient(vscode的restclient插件,用於發起RESTFUL請求)

### 1
POST http://localhost:3333/user HTTP/1.1
Content-Type: application/json

{
  "name": "測試添加用戶"
}

### delete
DELETE http://localhost:3333/user/2 HTTP/1.1

### PUT
PUT http://localhost:3333/user/2 HTTP/1.1
Content-Type: application/json

{
  "name": "測試修改用戶信息"
}

### GET
GET http://localhost:3333/user HTTP/1.1

### GET
GET http://localhost:3333/user/3 HTTP/1.1
複製代碼

seneca內置日誌輸出

可在構造函數中傳入配置,log屬性能夠控制日誌級別

例1:傳字符串

require('seneca')({
	// quiet silent any all print standard test
	log: 'all'
})

複製代碼

例2:傳對象

require('seneca')({
	log: {
		// none debug+ info+ warn+
		level: 'debug+'
	},
	// 設置爲true時,seneca日誌功能會encapsulate senecaId,senecaTag,actId等字段後輸出(通常爲兩字符)
	short: true
})
複製代碼

建議例2代碼,由於seneca-web-adapter-koa2插件打印的日誌level爲debug,利於作web接口訪問日誌記錄。

winston日誌模塊

傳送門

Logger.js

const { createLogger, format, transports } = require('winston')
const { combine, timestamp, label, printf } = format

const logger = createLogger({
  level: 'info',
  format: combine(
    label({label: 'microservices'}),
    timestamp(),
    printf(info => {
      return `${info.timestamp} [${info.label}] ${info.level}: ${info.message}`
    })
  ),
  transports: [ new transports.Console() ]
})

// highest to lowest
const levels = {
  error: 0,
  warn: 1,
  info: 2,
  verbose: 3,
  debug: 4,
  silly: 5
}

module.exports = logger
複製代碼

日誌輸出格式

2018-05-17T14:43:28.330Z [microservices] info: 接收到rpc客戶端的調用請求
2018-05-17T14:43:28.331Z [microservices] warn: warn message
2018-05-17T14:43:28.331Z [microservices] error: error message
複製代碼

RabbitMQ消息隊列服務

1. 單任務單consumer,生產者消費者模式

producer.js

// 建立一個amqp對等體
const amqp = require('amqplib/callback_api')

amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    const q = 'taskQueue1'
    const msg = process.argv.slice(2).join(' ') || 'hello world'

    // 爲方式RabbitMQ退出或者崩潰時重啓後丟失隊列信息,這裏配置durable:true(同時在消費者腳本中也要配置durable:true)後,
    ch.assertQueue(q, { durable: true })
    // 這裏配置persistent:true,經過閱讀官方文檔,我理解爲當程序重啓後,會斷點續傳以前未send完成的數據消息。(但此功能並不可靠,由於不會爲全部消息執行同步IO,會緩存在cache並在某個恰當時機write到disk)
    ch.sendToQueue(q, Buffer.from(msg), { persistent: true })
    setTimeout(() => {
      conn.close(); process.exit(0)
    }, 100)
  })
})
複製代碼
// 建立一個amqp對等體
const amqp = require('amqplib/callback_api')

amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    const q = 'taskQueue1'

    // 爲方式RabbitMQ退出或者崩潰時重啓後丟失隊列信息,這裏配置durable:true(同時在消費者腳本中也要定義durable:true)後,
    ch.assertQueue(q, { durable: true })
    ch.prefetch(1)
    console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q)
    ch.consume(q, msg => {
      const secs = msg.content.toString().split('.').length - 1
      console.log(" [x] Received %s", msg.content.toString())
      setTimeout(() => {
        console.log(" [x] Done")
        ch.ack(msg)
      }, secs * 1000)
    })
    // noAck配置(默認爲false)代表consumer是否須要在處理完後反饋ack給producer,若是設置爲true,則RabbitMQ服務若是將任務send至此consumer後不關心任務實際處理結果,send任務後直接標記已完成;不然,RabbiMQ獲得ack反饋後才標記爲已完成,若是一直未收到ack默認會一直等待ack而後標記,另外若是接收到nack或者該consumer進程退出則繼續dispatcher任務
  })
})
複製代碼

檢驗過程

  • 執行rabbitmqctl list_queues查看當前隊列
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
複製代碼
  • node producer.js(rabbitMQ執行過程爲會先建立一個匿名exchange,一個指定queue而後將queue與該匿名exchange綁定)

  • rabbitmqctl list_bindings

Listing bindings for vhost /...
        exchange        taskQueue1      queue   taskQueue1      []
複製代碼
  • rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
taskQueue1      1
複製代碼
  • node consumer.js
Waiting for messages in taskQueue1. To exit press CTRL+C
[x] Received hello world
[x] Done
複製代碼
  • rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
taskQueue1      0
複製代碼

知識點

  • 生產者消費者模式(一個生產者的消息在同一時間只交由一個消費者處理)
  • ACK機制(rabbitmq的確認機制)
  • 建立隊列{durable:true}以及向隊列發送消息{persistent:true}(消息持久化存儲,但不徹底能保證,好比當某消息未從緩存中寫到磁盤中而程序崩潰時則會丟失)
  • Round-robin Dispatch(公平分發)
  • 處理窗口控制(prefetch來控制分發窗口)
  • 異步多任務處理機制(好比一個大任務分解,分而治之)
  • 整個消息流流程(某個生產者進程 -> 匿名exchange -> 經過binding -> 指定queue -> 某一個消費者進程)

2. 單任務多consumer,發佈/訂閱模式(全消息模型)

publisher.js

const amqp = require('amqplib/callback_api')

amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    const ex = 'logs'
    const msg = process.argv.slice(2).join(' ') || 'Hello World!'

    // ex爲exchange名稱(惟一)
    // 模式爲fanout
    // 不對消息持久化存儲
    ch.assertExchange(ex, 'fanout', { durable: false })
    // 第二個參數爲指定某一個binding,如爲空則由RabbitMQ隨機指定
    ch.publish(ex, '', Buffer.from(msg))
    console.log(' [x] Send %s', msg)
  })

  setTimeout(() => {
    conn.close()
    process.exit(0)
  }, 100)
})
複製代碼

subscriber.js

const amqp = require('amqplib/callback_api')

amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    const ex = 'logs'

    // ex -> exchange是發佈/訂閱消息的載體,
    // fanout -> 分發消息的模式,fanout,direct,topic,headers
    // durable設置爲false下降一些可靠性,提升性能,由於不須要磁盤IO持久化存儲消息,另外
    ch.assertExchange(ex, 'fanout', { durable: false })
    // 使用匿名(也就是RabbitMQ自動生成隨機名的queue)隊列
    // exclusive設置爲true,便可以當其寄生的connection被close的時候自動deleted
    ch.assertQueue('', { exclusive: true }, (err, q) => {
      console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q.queue)
      // 綁定隊列到某個exchange載體(監聽某個exchange的消息)
      // 第三個入參爲binding key
      ch.bindQueue(q.queue, ex, '')
      // 消費即訂閱某個exchange的消息並設置處理句柄
      // 由於發佈/訂閱消息的模式就是非可靠性,只有當訂閱者訂閱才能收到相關的消息並且發佈者不關心該消息的訂閱者是誰以及處理結果如何,因此這裏noAck會置爲true
      ch.consume(q.queue, (msg) => {
        console.log(' [x] %s', msg.content.toString())
      }, { noAck: true })
    })
  })
})

複製代碼

檢驗過程

rabbitmqctl stop_app;rabbitmqctl reset;rabbitmqctl start_app(清空以前測試使用的queues、echanges、bindings)

node subscriber.js

[*] Waiting for messages in amq.gen-lgNW51IeEfj9vt1yjMUuaw. To exit press CTRL+C
複製代碼

rabbitmqctl list_exchanges

Listing exchanges for vhost / ...
logs    fanout
複製代碼

rabbitmqctl list_bindings

Listing bindings for vhost /...
        exchange        amq.gen-jDbfwJR8TbSNJT2a2a83Og  queue   amq.gen-jDbfwJR8TbSNJT2a2a83Og  []
logs    exchange        amq.gen-jDbfwJR8TbSNJT2a2a83Og  queue           []
複製代碼

node publisher.js tasks.........

[x] Send tasks......... // publiser.js

[x] tasks......... // subscriber.js
複製代碼

知識點

  • 發佈/訂閱模式(發佈者將消息以一對多的形式發送給訂閱者處理)
  • noAck(此模式下推薦用非Ack機制,由於發佈者每每不須要訂閱者如何處理消息以及其結果)
  • durable:false(此模式下推薦不須要作數據持久化存儲,緣由如上)
  • exchange的工做模式(即路由類型,fanout,direct,topic,headers等,下節會講解到)
  • 整個消息流流程(某個發佈者進程 -> 指定exchange -> 經過binding以及工做模式 -> 某個或多個匿名queue即訂閱者進程)

3. Direct Routing

exchange.js

module.exports = {
  name: 'ex1',
  type: 'direct',
  option: {
    durable: false
  },
  ranks: ['info', 'error', 'warning', 'severity']
}
複製代碼

direct-routing.js

const amqp = require('amqplib/callback_api')
const ex = require('./exchange')

amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {

    ch.assertExchange(ex.name, ex.type, ex.options)
    setTimeout(() => {
      conn.close()
      process.exit(0)
    }, 0)
  })
})
複製代碼

subscriber.js

const amqp = require('amqplib/callback_api')
const ex = require('./exchange')

amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    const ranks = ex.ranks

    ranks.forEach(rank => {
      // 聲明一個非匿名queue
      ch.assertQueue(`${rank}-queue`, { exclusive: false }, (err, q) => {
        ch.bindQueue(q.queue, ex.name, rank)
        ch.consume(q.queue, msg => {

          console.log(" [x] %s: '%s'", msg.fields.routingKey, msg.content.toString());
        }, { noAck: true })
      })
    })
  })
})
複製代碼

publisher.js

const amqp = require('amqplib/callback_api')
const ex = require('./exchange')

amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    const ranks = ex.ranks

    ranks.forEach(rank => {
      ch.publish(ex.name, rank, Buffer.from(`${rank} logs...`))
    })

    setTimeout(() => {
      conn.close()
      process.exit(0)
    }, 0)
  })
})
複製代碼

檢驗過程

rabbitmqctl stop_app;rabbitmqctl reset;rabbitmqctl start_app(清空以前測試使用的queues、echanges、bindings)

node direct-routing.js rabbitmqctl list_exchanges

Listing exchanges for vhost / ...
amq.headers	headers
ex1	direct
amq.fanout	fanout
amq.rabbitmq.trace	topic
amq.topic	topic
	direct
amq.direct	direct
amq.match	headers
複製代碼

node subscriber.js rabbitmqctl list_queues

Timeout: 60.0 seconds ...
Listing queues for vhost / ...
severity-queue	0
error-queue	0
info-queue	0
warning-queue	0

Listing bindings for vhost /...
	exchange	error-queue	queue	error-queue	[]
	exchange	info-queue	queue	info-queue	[]
	exchange	severity-queue	queue	severity-queue	[]
	exchange	warning-queue	queue	warning-queue	[]
ex1	exchange	error-queue	queue	error	[]
ex1	exchange	info-queue	queue	info	[]
ex1	exchange	severity-queue	queue	severity	[]
ex1	exchange	warning-queue	queue	warning	[]
複製代碼

node publisher.js

[x] info: 'info logs...'
 [x] error: 'error logs...'
 [x] severity: 'severity logs...'
 [x] warning: 'warning logs...'
複製代碼

知識點

  • 路由key,用於exchange的direct工做模式下消息的路由
  • 每當assertQueue時,該queue會在以queue名稱看成路由key綁定到匿名exchange
  • 可用於日誌不一樣級別的log處理

4. Topic Routing

exchange.js

module.exports = {
  name: 'ex2',
  type: 'topic',
  option: {
    durable: false
  },
  ranks: ['info', 'error', 'warning', 'severity']
}
複製代碼

topic-routing.js

const amqp = require('amqplib/callback_api')
const exchangeConfig = require('./exchange')

amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    ch.assertExchange(exchangeConfig.name, exchangeConfig.type, exchangeConfig.option)

    setTimeout(() => {
      conn.close()
      process.exit(0)
    }, 0)
  })
})
複製代碼

subscriber.js

const amqp = require('amqplib/callback_api')
const exchangeConfig = require('./exchange')

amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    const args = process.argv.slice(2)
    const keys = (args.length > 0) ? args : ['anonymous.info']

    console.log(' [*] Waiting for logs. To exit press CTRL+C');
    keys.forEach(key => {
      ch.assertQueue('', { exclusive: true }, (err, q) => {
        console.log(` [x] Listen by routingKey ${key}`)
        ch.bindQueue(q.queue, exchangeConfig.name, key)

        ch.consume(q.queue, msg => {
          console.log(" [x] %s:'%s'", msg.fields.routingKey, msg.content.toString());
        }, { noAck: true })
      })
    })
  })
})
複製代碼

publisher.js

const amqp = require('amqplib/callback_api')
const exchangeConfig = require('./exchange')

amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    const args = process.argv.slice(2)
    const key = (args.length > 1) ? args[0] : 'anonymous.info'
    const msg = args.slice(1).join(' ') || 'hello world'

    ch.publish(exchangeConfig.name, key, Buffer.from(msg))

    setTimeout(() => {
      conn.close()
      process.exit(0)
    }, 0)
  })
})
複製代碼

檢驗過程

rabbitmqctl stop_app;rabbitmqctl reset;rabbitmqctl start_app(清空以前測試使用的queues、echanges、bindings)

node topic-routing.js

Listing exchanges for vhost / ...
amq.fanout	fanout
amq.rabbitmq.trace	topic
amq.headers	headers
amq.match	headers
ex2	topic
	direct
amq.topic	topic
amq.direct	direct
複製代碼

node subscriber.js "#.info" "*.error"

[*] Waiting for logs. To exit press CTRL+C
[x] Listen by routingKey #.info
[x] Listen by routingKey *.error
複製代碼
  • node publisher.js "account-server.info" "用戶服務測試"
  • node publisher.js "config-server.info" "配置服務測試"
  • node publisher.js "config-server.error" "配置服務出錯"
[x] account-server.info:'用戶服務測試'
[x] config-server.info:'配置服務測試'
[x] config-server.error:'配置服務出錯'
複製代碼

知識點

  • key最長爲255字節
  • #可匹配0或多個單詞,*可精確匹配1個單詞

5. RPC

rpc_server.js

const amqp = require('amqplib/callback_api')
const logger = require('./Logger')

let connection = null

amqp.connect('amqp://localhost', (err, conn) => {
  connection = conn
  conn.createChannel((err, ch) => {
    const q = 'account_rpc_queue'

    ch.assertQueue(q, { durable: true })
    ch.prefetch(2)

    ch.consume(q, msg => {
      let data = {}
      let primitiveContent = msg.content.toString()
      try {
        data = JSON.parse(primitiveContent)
      } catch (e) {
        logger.error(new Error(e))
      }
      logger.info('接收到rpc客戶端的調用請求')
      if (msg.properties.correlationId === '10abc') {
        logger.info(primitiveContent)
        const uid = Number(data.uid) || -1
        let r = getUserById(uid)
        ch.sendToQueue(msg.properties.replyTo, Buffer.from(JSON.stringify(r)), { persistent: true })
        ch.ack(msg)
      } else {
        logger.info('不匹配的調用請求')
      }
    })
  })
})

function getUserById (uid) {
  let result = ''

  if (uid === +uid && uid > 0) {
    result = {
      state: 1000,
      msg: '成功',
      data: {
        uid: uid,
        name: '小強',
        sex: 1
      }
    }
  } else {
    result = {
      state: 2000,
      msg: '傳參格式錯誤'
    }
  }

  return result
}

process.on('SIGINT', () => {
  logger.warn('SIGINT')
  connection && connection.close()
  process.exit(0)
})
複製代碼

rpc_client.js

const amqp = require('amqplib/callback_api')

amqp.connect('amqp://localhost', (err, conn) => {
  conn.createChannel((err, ch) => {
    const q = 'account_rpc_queue'
    const callback = 'callback_queue'

    ch.assertQueue(callback, { durable: true })
    ch.consume(callback, msg => {
      const result = msg.content.toString()
      console.log(`接收到回調的消息啦!`)
      console.log(result)
      ch.ack(msg)
      setTimeout(() => {
        conn.close()
        process.exit(0)
      }, 0)
    })

    ch.assertQueue(q, { durable: true })
    const msg = {
      uid: 2
    }
    ch.sendToQueue(q, Buffer.from(JSON.stringify(msg)), {
      persistent: true,
      correlationId: '10abc',
      replyTo: 'callback_queue'
    })
  })
})
複製代碼

檢驗過程

node rpc_server.js

rabbitmqctl list_queues

Timeout: 60.0 seconds ...
Listing queues for vhost / ...
account_rpc_queue	0
複製代碼

node rpc_client.js

rpc_client的CLI打印

接收到回調的消息啦!
{"state":1000,"msg":"成功","data":{"uid":2,"name":"小強","sex":1}}
複製代碼

rpc_server的CLI打印

接收到rpc客戶端的調用請求
{ uid: 2 }
複製代碼

PM2:node服務部署(服務集羣)、管理與監控

pm2官網

啓動

pm2 start app.js

  • -w --watch:監聽目錄變化,如變化則自動重啓應用
  • --ignore-file:監聽目錄變化時忽略的文件。如pm2 start rpc_server.js --watch --ignore-watch="rpc_client.js"
  • -n --name:設置應用名字,可用於區分應用
  • -i --instances:設置應用實例個數,0與max相同
  • -f --force: 強制啓動某應用,經常用於有相同應用在運行的狀況
  • -o --output <path>:標準輸出日誌文件的路徑
  • -e --error <path>:錯誤輸出日誌文件的路徑
  • --env <path>:配置環境變量

pm2 start rpc_server.js -w -i max -n s1 --ignore-watch="rpc_client.js" -e ./server_error.log -o ./server_info.log

在cluster-mode,也就是-i max下,日誌文件會自動在後面追加-${index}保證不重複

其餘簡單且經常使用命令

pm2 stop app_name|app_id pm2 restart app_name|app_id pm2 delete app_name|app_id pm2 show app_name|app_id OR pm2 describe app_name|app_id pm2 list pm2 monit pm2 logs app_name|app_id --lines <n> --err

Graceful Stop

pm2 stop app_name|app_id

process.on('SIGINT', () => {
  logger.warn('SIGINT')
  connection && connection.close()
  process.exit(0)
})
複製代碼

當進程結束前,程序會攔截SIGINT信號從而在進程即將被殺掉前去斷開數據庫鏈接等等佔用內存的操做後再執行process.exit()從而優雅的退出進程。(如在1.6s後進程還未結束則繼續發送SIGKILL信號強制進程結束)

Process File

ecosystem.config.js

const appCfg = {
  args: '',
  max_memory_restart: '150M',
  env: {
    NODE_ENV: 'development'
  },
  env_production: {
    NODE_ENV: 'production'
  },
  // source map
  source_map_support: true,
  // 不合並日志輸出,用於集羣服務
  merge_logs: false,
  // 經常使用於啓動應用時異常,超時時間限制
  listen_timeout: 5000,
  // 進程SIGINT命令時間限制,即進程必須在監聽到SIGINT信號後必須在如下設置時間結束進程
  kill_timeout: 2000,
  // 當啓動異常後不嘗試重啓,運維人員嘗試找緣由後重試
  autorestart: false,
  // 不容許以相同腳本啓動進程
  force: false,
  // 在Keymetrics dashboard中執行pull/upgrade操做後執行的命令隊列
  post_update: ['npm install'],
  // 監聽文件變化
  watch: false,
  // 忽略監聽文件變化
  ignore_watch: ['node_modules']
}

function GeneratePM2AppConfig({ name = '', script = '', error_file = '', out_file = '', exec_mode = 'fork', instances = 1, args = "" }) {
  if (name) {
    return Object.assign({
      name,
      script: script || `${name}.js`,
      error_file: error_file || `${name}-err.log`,
      out_file: out_file|| `${name}-out.log`,
      instances,
      exec_mode: instances > 1 ? 'cluster' : 'fork',
      args
    }, appCfg)
  } else {
    return null
  }
}

module.exports = {
  apps: [
    GeneratePM2AppConfig({
      name: 'client',
      script: './rpc_client.js'
    }),

    GeneratePM2AppConfig({
      name: 'server',
      script: './rpc_server.js',
      instances: 1
    })
  ]
}
複製代碼

pm2 start ecosystem.config.js

避坑指南:processFile文件命名建議爲*.config.js格式。不然後果自負。

監控

請移步app.keymetrics.io

PM2:自動化部署

ssh準備

  1. ssh-keygen -t rsa -C 'qingf deployment' -b 4096
  2. 若是有多密鑰、多用戶狀況,建議配置~/.ssh/config文件,格式相似以下
// 用不一樣用戶對不一樣遠程主機發起ssh請求時指定私鑰
Host qingf.me
  User deploy
  IdentityFile ~/.ssh/qf_deployment_rsa
  // 設置爲no可去掉首次登錄(y/n)的選擇
  StrictHostKeyChecking no
// 別名用法
Host deployment
  User deploy
  Hostname qingf.me
  IdentityFile ~/.ssh/qingf_deployment_rsa
  StrictHostKeyChecking no
複製代碼
  1. 將公鑰複製到遠程(通常爲部署服務器)對應用戶目錄,好比/home/deploy/.ssh/authorized_keys文件(authorized_keys文件權限設置爲600)

配置ecosystem.config.js

與上述apps同級增長deploy屬性,以下

deploy: {
    production: {
        'user': 'deploy',
        'host': 'qingf.me',
        'ref': 'remotes/origin/master',
        'repo': 'https://github.com/Cecil0o0/account-server.git',
        'path': '/home/deploy/apps/account-server',
        // 生命週期鉤子,在ssh到遠端以後setup操做以前執行
        'pre-setup': '',
        // 生命週期鉤子,在初始化設置即git pull以後執行
        'post-setup': 'ls -la',
        // 生命週期鉤子,在遠端git fetch origin以前執行
        'pre-setup': '',
        // 生命週期鉤子,在遠端git修改HEAD指針到指定ref以後執行
        'post-deploy': 'npm install && pm2 startOrRestart deploy/ecosystem.config.js --env production',
        // 如下這個環境變量將注入到全部app中
        "env"  : {
          "NODE_ENV": "test"
        }
    }
}
複製代碼

tip:please make git working directory clean first!

此處若是不懂或者有疑問,請查閱Demo

而後前後執行如下兩條命令**(注意config文件路徑)**

  1. pm2 deploy deploy/ecosystem.config.js production setup
  2. pm2 deploy deploy/ecosystem.config.js production

其餘命令

pm2 deploy <configuration_file>

Commands:
    setup                run remote setup commands
    update               update deploy to the latest release
    revert [n]           revert to [n]th last deployment or 1
    curr[ent]            output current release commit
    prev[ious]           output previous release commit
    exec|run <cmd>       execute the given <cmd>
    list                 list previous deploy commits
    [ref]                deploy to [ref], the "ref" setting, or latest tag
複製代碼

推薦shell toolkit

oh my zsh

請求追蹤

如何?

  • 在seneca.add以及seneca.act中使用seneca.fixedargs['tx$']值做爲traceID標識處於某一條請求流程。另外seneca內置log系統會打印此值。

疑問?

seneca內置log系統如何作自定義日誌打印?

舒適提示:請以正常的http請求開始,由於通過測試若是微服務自主發起act,其seneca.fixedargs['tx$']值不一樣。

Consul 服務註冊與發現

Consul是一個分佈式集羣服務註冊發現工具,並具備健康檢查、分級式KV存儲、多數據中心等高級特性。

安裝

  • 可選擇使用預編譯的安裝包
  • 也可選擇克隆源碼後編譯安裝

基礎使用

  • 以開發模式快速啓動服務模式代理並開啓web界面訪問http://localhost:8500

consul agent -dev -ui

  • 編寫服務定義文件
{
  "service": {
	// 服務名,稍後用於query服務
    "name": "account-server",
	// 服務標籤
    "tags": ["account-server"],
	// 服務元信息
    "meta": {
      "meta": "for my service"
    },
	// 服務端口
    "port": 3333,
	// 不容許標籤覆蓋
    "enable_tag_override": false,
	// 腳本檢測作health checks 與-enable-script-checks=true配合使用,有腳本模式、TCP模式、HTTP模式、TTL模式
    "checks": [
      {
        "http": "http://localhost:3333/user",
        "interval": "10s"
      }
    ]
  }
}
複製代碼
  • query定義的account-server服務

curl http://localhost:8500/v1/catalog/service/account-server

[
    {
        "ID": "e66eb1ff-460c-e63f-b4ac-0cb42daed19c",
        "Node": "haojiechen.local",
        "Address": "127.0.0.1",
        "Datacenter": "dc1",
        "TaggedAddresses": {
            "lan": "127.0.0.1",
            "wan": "127.0.0.1"
        },
        "NodeMeta": {
            "consul-network-segment": ""
        },
        "ServiceID": "account-server",
        "ServiceName": "account-server",
        "ServiceTags": [
            "account-server"
        ],
        "ServiceAddress": "",
        "ServiceMeta": {
            "meta": "for my service"
        },
        "ServicePort": 3333,
        "ServiceEnableTagOverride": false,
        "CreateIndex": 6,
        "ModifyIndex": 6
    }
]
複製代碼

生產級別使用(分佈式集羣)

某一個結點啓動一個server模式代理,以下

consul agent -server -bootstrap-expect=1 \
	-data-dir=/tmp/consul -node=agent-one -bind=valid extranet IP \
	-enable-script-checks=true -config-dir=/usr/local/etc/consul.d
複製代碼

查看集羣成員

consul members

Node       Address         Status  Type    Build  Protocol  DC   Segment
agent-one  valid extranet IP:8301  alive   server  1.1.0  2         dc1  <all>
複製代碼

另外一個結點啓動一個client模式代理,以下

consul agent \
	-data-dir=/tmp/consul -node=agent-two -bind=139.129.5.228 \
	-enable-script-checks=true -config-dir=/usr/local/etc/consul.d
複製代碼

查看集羣成員

consul members

Node       Address         Status  Type    Build  Protocol  DC   Segment
agent-two  139.129.5.228:8301  alive   server  1.1.0  2         dc1  <all>
複製代碼

加入Cluster

consul join 139.129.5.228 consul members

Node       Address         Status  Type    Build  Protocol  DC   Segment
agent-one  valid extranet IP:8301  alive   server  1.1.0  2         dc1  <all>
agent-two  139.129.5.228:8301  alive   server  1.1.0  2         dc1  <all>
複製代碼

集成node-consul

config.js

// 服務註冊與發現
// https://github.com/silas/node-consul#catalog-node-services
  'serverR&D': {
    consulServer: {
      type: 'consul',
      host: '127.0.0.1',
      port: 8500,
      secure: false,
      ca: [],
      defaults: {
        token: ''
      },
      promisify: true
    },
    bizService: {
      name: 'defaultName',
      id: 'defaultId',
      address: '127.0.0.1',
      port: 1000,
      tags: [],
      meta: {
        version: '',
        description: '註冊集羣'
      },
      check: {
        http: '',
        // check間隔時間(ex: 15s)
        interval: '10s',
        // check超時時間(ex: 10s)
        timeout: '2s',
        // 處於臨界狀態後自動註銷服務的超時時間
        deregistercriticalserviceafter: '30s',
        // 初始化狀態值爲成功
        status: 'passing',
        // 備註
        notes: '{"version":"111","microservice-port":1115}'
      }
    }
  }
複製代碼

server-register.js

/* * @Author: Cecil * @Last Modified by: Cecil * @Last Modified time: 2018-06-02 11:26:49 * @Description 微服務註冊方法 */
const defaultConf = require('../config')['serverR&D']
const { ObjectDeepSet, isString } = require('../helper/utils')
const Consul = require('consul')
const { generateServiceName, generateCheckHttp } = require('../helper/consul')

// 註冊服務

function register({ consulServer = {}, bizService = {} } = {}) {
  if (!bizService.name && isString(bizService.name)) throw new Error('name is invalid!')
  if (bizService.port !== +bizService.port) throw new Error('port is invalid!')
  if (!bizService.host && isString(bizService.host)) throw new Error('host is invalid!')
  if (!bizService.meta.$$version) throw new Error('meta.$$version is invalid!')
  if (!bizService.meta.$$microservicePort) throw new Error('meta.$$microservicePort is invalid!')
  const consul = Consul(ObjectDeepSet(defaultConf.consulServer, consulServer))
  const service = defaultConf.bizService
  service.name = generateServiceName(bizService.name)
  service.id = service.name
  service.address = bizService.host
  service.port = bizService.port
  service.check.http = generateCheckHttp(bizService.host, bizService.port)
  service.check.notes = JSON.stringify(bizService.meta)

  return new Promise((resolve, reject) => {
    consul.agent.service.list().then(services => {
      // 檢查主機+端口是否已被佔用
      Object.keys(services).some(key => {
        if (services[key].Address === service.address && services[key].Port === service.port) {
          throw new Error(`該服務集羣endpoint[${service.address}, ${service.port}]已被佔用!`)
        }
      })
      // 註冊集羣服務
      consul.agent.service.register(service).then(() => {
        logger.info(`${bizService.name}服務已註冊`)
        resolve(services)
      }).catch(err => {
        console.log(err)
      })
    }).catch(err => {
      throw new Error(err)
    })
  })
}

module.exports = class ServerRegister {
  constructor() {
    this.register = register
  }
}
複製代碼

驗證

保證runtime中存在consul和mongodb服務後,clone該倉庫Demo,cd到工程根目錄下,運行node src便可。

框架集成node-consul

server-register.js

/* * @Author: Cecil * @Last Modified by: Cecil * @Last Modified time: 2018-06-02 13:58:22 * @Description 微服務註冊方法 */
const defaultConf = require('../config')['serverR&D']
const { ObjectDeepSet, isString } = require('../helper/utils')
const Consul = require('consul')
const { generateServiceName, generateCheckHttp } = require('../helper/consul')
const logger = new (require('./logger'))().generateLogger()

// 註冊服務方法定義

function register({ consulServer = {}, bizService = {} } = {}) {
  if (!bizService.name && isString(bizService.name)) throw new Error('name is invalid!')
  if (bizService.port !== +bizService.port) throw new Error('port is invalid!')
  if (!bizService.host && isString(bizService.host)) throw new Error('host is invalid!')
  if (!bizService.meta.$$version) throw new Error('meta.$$version is invalid!')
  if (!bizService.meta.$$microservicePort) throw new Error('meta.$$microservicePort is invalid!')
  const consul = Consul(ObjectDeepSet(defaultConf.consulServer, consulServer))
  const service = defaultConf.bizService
  service.name = generateServiceName(bizService.name)
  service.id = service.name
  service.address = bizService.host
  service.port = bizService.port
  service.check.http = generateCheckHttp(bizService.host, bizService.port)
  service.check.notes = JSON.stringify(bizService.meta)

  return new Promise((resolve, reject) => {
    consul.agent.service.list().then(services => {
      // 檢查主機+端口是否已被佔用
      Object.keys(services).some(key => {
        if (services[key].Address === service.address && services[key].Port === service.port) {
          throw new Error(`該服務集羣endpoint[${service.address}, ${service.port}]已被佔用!`)
        }
      })
      // 註冊集羣服務
      consul.agent.service.register(service).then(() => {
        logger.info(`${bizService.name}服務註冊成功`)
        resolve(services)
      }).catch(err => {
        console.log(err)
      })
    }).catch(err => {
      throw new Error(err)
    })
  })
}

module.exports = class ServerRegister {
  constructor() {
    this.register = register
  }
}
複製代碼

account-server/src/index.js

const vastify = require('vastify')
const version = require('../package.json').version
const microservicePort = 10015
const httpPort = 3333

// 註冊服務
vastify.ServerRegister.register({
  bizService: {
    name: 'account-server',
    host: '127.0.0.1',
    port: httpPort,
    meta: {
      $$version: version,
      $$microservicePort: microservicePort
    }
  }
})
複製代碼

Mongodb持久化存儲

  • 框架使用mongoose作mongoClient,固然你也能夠選用原生nodejs mongoClient。

改造以前的user模塊,偷個懶就不貼代碼了,具體請查看Demo

結合seneca以及consul的路由服務中間件

microRouting.js

/* * @Author: Cecil * @Last Modified by: Cecil * @Last Modified time: 2018-06-02 16:22:02 * @Description 微服務內部路由中間件,暫不支持自定義路由匹配策略 */
 'use strict'

const Consul = require('consul')
const defaultConf = require('../config')
const { ObjectDeepSet, isNumber } = require('../helper/utils')
const { getServiceNameByServiceKey, getServiceIdByServiceKey } = require('../helper/consul')
const logger = new (require('../tools/logger'))().generateLogger()
const { IPV4_REGEX } = require('../helper/regex')

let services = {}
let consul = null

/** * @author Cecil0o0 * @description 同步consul服務中心的全部可用服務以及對應check並組裝成對象以方便取值 */
function syncCheckList () {
  return new Promise((resolve, reject) => {
    consul.agent.service.list().then(allServices => {
      if (Object.keys(allServices).length > 0) {
        services = allServices
        consul.agent.check.list().then(checks => {
          Object.keys(checks).forEach(key => {
            allServices[getServiceIdByServiceKey(key)]['check'] = checks[key]
          })
          resolve(services)
        }).catch(err => {
          throw new Error(err)
        })
      } else {
        const errmsg = '未發現可用服務'
        logger.warn(errmsg)
        reject(errmsg)
      }
    }).catch(err => {
      throw new Error(err)
    })
  })
}

function syncRoutingRule(senecaInstance = {}, services = {}) {
  Object.keys(services).forEach(key => {
    let service = services[key]
    let name = getServiceNameByServiceKey(key)
    let $$addr = service.Address
    let $$microservicePort = ''
    let $$version = ''
    try {
      let base = JSON.parse(service.check.Notes)
      $$microservicePort = base.$$microservicePort
      $$version = base.$$version
    } catch (e) {
      logger.warn(`服務名爲${serviceName}。該服務check.Notes爲非標準JSON格式,程序已忽略。請檢查服務註冊方式(請確保調用ServerRegister的register來註冊服務)`)
    }

    if (IPV4_REGEX.test($$addr) && isNumber($$microservicePort)) {
      if (service.check.Status === 'passing') {
        senecaInstance.client({
          host: $$addr,
          port: $$microservicePort,
          pin: {
            $$version,
            $$target: name
          }
        })
      } else {
        logger.warn(`${$$target}@${$$version || '無'}服務處於critical,所以沒法使用`)
      }
    } else {
      logger.warn(`主機(${$$addr})或微服務端口號(${$$microservicePort})有誤,請檢查`)
    }
  })
}


function startTimeInterval() {
  setInterval(syncCheckList, defaultConf.routing.servicesRefresh)
}

function microRouting(consulServer) {
  var self = this
  consul = Consul(ObjectDeepSet(defaultConf['serverR&D'].consulServer, consulServer))
  syncCheckList().then(services => {
    syncRoutingRule(self, services)
  })
}

module.exports = microRouting
複製代碼

在保證有consul與mongodb的runtime後,請結合這兩個config-serveraccount-server Demo進行測試。

[未完待續....]

相關文章
相關標籤/搜索