使用Server Sent Events製做一個代碼在線運行工具

¿

最近嘗試製做了一個可以在線運行代碼的工具: Code-Runner, 踩坑很多, 作一個總結html

涉及到的技術/模塊以下:node

  • 服務端推送技術Server Sent Events, 下文簡稱SSEgit

  • Node.js模塊:github

    • child_process模塊
    • stream模塊
  • Docker幾條簡單的命令docker

    • docker run
    • docker pull
    • docker kill
  • Koa2: 順便用用, 不是核心後端

  • pug: 渲染模板bash

參考服務器

阮一峯: Server-Sent Events 教程socket

優雅的終止docker容器async

1. SSE 用做服務端推送

嚴格地說,HTTP 協議沒法作到服務器主動推送信息。可是,有一種變通方法, 就是服務器向客戶端聲明,接下來要發送的是流信息(streaming)。這時, 客戶端 不會關閉鏈接, 而是會一直等着服務器發過來的新數據流。

1.1 SSE 和 WebSocket 對比

相對於WebSocket, 它有以下的特色:

  • SSE使用HTTP協議; WebSocket是一個獨立的協議

  • SSE使用簡單, 輕量級; WebSocket你確定要引入socket.io, ws之類的庫

  • SSE默認支持斷線重連; WebSocket須要本身實現

  • SSE通常用來傳輸文本; WebSocket默認支持傳送二進制數據

若是僅僅須要服務端推送這個功能的話, 使用SSE的開發成本是最低的, 兼容性以下

SSE-兼容性

1.2 SSE 最簡單接入

假設接口地址爲/sse, 服務端代碼(以Koa2爲例)爲:

const { PassThrough } = require('stream')

router.get('/sse', ctx => {
  const stream = new PassThrough()
  setInterval(() => { stream.write(`: \n\n`) }, 5000)
    
  ctx.set({ 'Content-Type':'text/event-stream' })
  ctx.body = stream
})
複製代碼

客戶端

const eventSource = new EventSource('/sse')
複製代碼

把空行算上, 僅僅 10 行代碼, 就完成了先後端的SSE鏈接, 效果以下

1.3 SSE 事件流格式

事件流僅僅是一個簡單的文本流數據,文本應該使用UTF-8格式的編碼,每條消息後面都有一由一個空行做爲分隔符 以冒號,以冒號開頭的行爲註釋行,會被忽略。

註釋行能夠用來防止鏈接超時,服務器能夠按期發送一條消息註釋行,以保持鏈接不斷

這樣看不夠直接,代碼表述以下:

// 發送註釋行
stream.write(`: \n\n`)

// 發送自定義事件test, 數據爲字符串: this is a test message
stream.write(`event: test\ndata: this is a test message\n\n`)

// 發送自定義事件test1, 數據爲一個對象: { msg: 'this is a test message' }
stream.write(`event: test1\ndata: ${JSON.stringify({ msg: 'this is a test message' })}\n\n`)
複製代碼

客戶端監聽自定義事件:

const eventSource = new EventSource('/sse')

eventSource.addEventListener('test', event => {
  console.log(event.data) // this is a test message
})
複製代碼

沒錯, 就是這麼簡單

2. Code-Runner 具體實現

2.1 總體流程

  1. 客戶端發出GET /sse HTTP/1.1請求
  • 監聽自定義事件sse-connect: 得到一個身份標識id
  • 監聽自定義事件sse-message: 進度消息的推送, 例如鏡像拉取、代碼執行開始、代碼執行結束
  • 監聽自定義事件sse-result: 代碼執行結果的推送
  1. 用戶提交代碼POST /runner HTTP/1.1
  • 拉取鏡像, 例如: docker pull node:latest
  • 將用戶提交代碼寫入文件, 例如: /code/main-1.js
  • 啓動容器, 例如: docker run --rm --name runner-1 -v /code:/code node:latest node /code/main-1.js
  • 根據身份標識id, 將結果寫入對應的流
  • 關閉容器

2.2 SSE的封裝

封裝目標:

  • 能夠根據身份標識id得到對應的流

  • 對發送自定義事件的封裝

  • 對保持鏈接不斷的封裝

  • 維護一個實例表, 便於向對應的流推送消息

const { PassThrough } = require('stream')

const instanceMap = new Map()
let uid = 0

/** * Server Sent Events封裝 */
module.exports = class SSE {
  /** * 構造函數中初始化轉換流、身份標識、執行初始化方法 */
  constructor(options = {}) {
    this.stream = new PassThrough()
    this.uid = ++uid
    this.intervalTime = options.intervalTime || 5000
    this._init()
  }
  /** * 根據uid獲取SSE實例 */
  static getInstance(uid) {
    return instanceMap.get(+uid)
  }
  /** * 根據uid發送自定義事件 */
  static writeStream(uid, event, data) {
    const instance = this.getInstance(uid)

    if (instance) instance.writeStream(event, data)
  }
  /** * 初始化函數中記錄當前實例, 並保持長鏈接 */
  _init() {
    instanceMap.set(this.uid, this)

    this._writeKeepAliveStream()
    const timer = setInterval(() => { this._writeKeepAliveStream() }, this.intervalTime)

    this.stream.on('close', () => {
      clearInterval(timer)
      instanceMap.delete(this.uid)
    })
  }
  /** * 經過發送註釋消息保持長鏈接 */
  _writeKeepAliveStream() {
    this.stream.write(': \n\n')
  }
  /** * 發送自定義事件 */
  writeStream(event, data) {
    const payload = typeof data === 'string' ? data : JSON.stringify(data)

    this.stream.write(`event: ${event}\ndata: ${payload}\n\n`)
  }
}
複製代碼

封裝後, /sse接口代碼簡化爲:

router.get('/sse', ctx => {
    ctx.set({
      'Content-Type':'text/event-stream',
      'Cache-Control':'no-cache',
      'Connection': 'keep-alive'
    })
    const sse = new SSE()
    sse.writeStream('sse-connect', sse.uid)

    ctx.body = sse.stream
  })
複製代碼

2.3 限制容器的使用時長

執行用戶代碼, 須要限制容器的使用時長, 雖然一直有Issue: 給docker run命令增長timeout選項, 可是最佳的中止容器運行的方式仍是docker stop / docker kill

docker stop: 用docker stop命令來停掉容器的時候,docker默認會容許容器中的應用程序有10秒的時間用以終止運行, 若是等待時間達到設定的超時時間,或者默認的10秒,會繼續發送SIGKILL的系統信號強行kill掉進程

docker kill: 默認狀況下,docker kill命令不會給容器中的應用程序有任何gracefully shutdown的機會。 它會直接發出SIGKILL的系統信號,以強行終止容器中程序的運行

所以, 此處採用docker kill更符合需求, 中止容器的代碼以下:

/** * 中止Docker容器 * @description * exec方法中的timeout選項在執行「docker run」命令時無效, 所以採用「docker kill」命令來限制容器使用時長 * 經過docker kill, childProcess exitCode值爲137 * @param {string} containerName 容器名稱 * @param {number} timeout 限制使用時長 * @return {number} timer */
function stopDocker(containerName, timeout) {
  return setTimeout(async () => {
    try {
      await exec(`docker kill ${containerName}`)
    } catch (e) { }
  }, timeout)
}
複製代碼

注: 此處child_process.exectimeout選項並不能中止docker容器

2.4 流的方式得到輸出

child_process.exec方法執行命令返回的結果是buffer/string, 此處咱們須要使用流的方式, 就要使用child_process.spawn方法

容器啓動的部分代碼以下:

/** * 啓動Docker容器並使用流模式獲取輸出 * @param {object} dockerOptions 啓動docker的配置 */
function startDockerBySpawn(dockerOptions) {
  // 得到容器名, 鏡像名, 執行命令, 掛載卷
  const { containerName, imageName, execCommand, volume } = dockerOptions
  // 參數差別
  const commandStr = `docker run --rm --memory=50m --name ${containerName} -v ${volume} ${imageName} ${execCommand}`
  const [command, ...args] = commandStr.split(' ')
  // 啓動容器
  const childProcess = spawn(command, args)
  
  //...
}
複製代碼

容器啓動後, 能夠得到兩個流:

  • childProcess.stdout

  • childProcess.stderr

咱們須要把兩個流的數據組合起來, 而後將其轉換爲SSE數據格式, 最終寫入到目標流targetStream, 代碼以下:

const t = new SSETransform()
const transferStation = new PassThrough()

childProcess.stdout.pipe(transferStation)
childProcess.stderr.pipe(transferStation)

transferStation.pipe(t).pipe(targetStream, { end: false })
複製代碼

自定義的轉換流以下:

const { Transform } = require('stream')
/** * 自定義轉換流 * @description * 將child_process.stdout/stderr的可寫流轉換爲EventStream的格式 */
module.exports = class SSETransform extends Transform {
  constructor(eventName) {
    super()
    this.eventName = eventName || 'sse-result'
  }
  _transform(chunk, encoding, callback) {
    callback(null, `event: ${this.eventName}\ndata: ${JSON.stringify({ result: chunk.toString('utf8') })}\n\n`)
  }
}
複製代碼

一次代碼執行流程得到的數據以下圖所示

至此, 就完成了一個代碼在線運行工具的開發

相關文章
相關標籤/搜索