最近嘗試製做了一個可以在線運行代碼的工具: Code-Runner, 踩坑很多, 作一個總結html
涉及到的技術/模塊以下:node
服務端推送技術Server Sent Events
, 下文簡稱SSE
git
Node.js
模塊:github
child_process
模塊stream
模塊Docker
幾條簡單的命令docker
docker run
docker pull
docker kill
Koa2
: 順便用用, 不是核心後端
pug
: 渲染模板bash
參考服務器
阮一峯: Server-Sent Events 教程socket
優雅的終止docker容器async
嚴格地說,HTTP 協議沒法作到服務器主動推送信息。可是,有一種變通方法, 就是服務器向客戶端聲明,接下來要發送的是流信息(streaming)。這時, 客戶端 不會關閉鏈接, 而是會一直等着服務器發過來的新數據流。
相對於WebSocket
, 它有以下的特色:
SSE
使用HTTP
協議; WebSocket
是一個獨立的協議
SSE
使用簡單, 輕量級; WebSocket
你確定要引入socket.io
, ws
之類的庫
SSE
默認支持斷線重連; WebSocket
須要本身實現
SSE
通常用來傳輸文本; WebSocket
默認支持傳送二進制數據
若是僅僅須要服務端推送這個功能的話, 使用SSE
的開發成本是最低的, 兼容性以下
假設接口地址爲/sse
, 服務端代碼(以Koa2
爲例)爲:
const { PassThrough } = require('stream')
router.get('/sse', ctx => {
const stream = new PassThrough()
setInterval(() => { stream.write(`: \n\n`) }, 5000)
ctx.set({ 'Content-Type':'text/event-stream' })
ctx.body = stream
})
複製代碼
客戶端
const eventSource = new EventSource('/sse')
複製代碼
把空行算上, 僅僅 10 行代碼, 就完成了先後端的SSE
鏈接, 效果以下
事件流僅僅是一個簡單的文本流數據,文本應該使用
UTF-8
格式的編碼,每條消息後面都有一由一個空行做爲分隔符 以冒號,以冒號開頭的行爲註釋行,會被忽略。
註釋行能夠用來防止鏈接超時,服務器能夠按期發送一條消息註釋行,以保持鏈接不斷
這樣看不夠直接,代碼表述以下:
// 發送註釋行
stream.write(`: \n\n`)
// 發送自定義事件test, 數據爲字符串: this is a test message
stream.write(`event: test\ndata: this is a test message\n\n`)
// 發送自定義事件test1, 數據爲一個對象: { msg: 'this is a test message' }
stream.write(`event: test1\ndata: ${JSON.stringify({ msg: 'this is a test message' })}\n\n`)
複製代碼
客戶端監聽自定義事件:
const eventSource = new EventSource('/sse')
eventSource.addEventListener('test', event => {
console.log(event.data) // this is a test message
})
複製代碼
沒錯, 就是這麼簡單
GET /sse HTTP/1.1
請求sse-connect
: 得到一個身份標識idsse-message
: 進度消息的推送, 例如鏡像拉取、代碼執行開始、代碼執行結束sse-result
: 代碼執行結果的推送POST /runner HTTP/1.1
docker pull node:latest
/code/main-1.js
docker run --rm --name runner-1 -v /code:/code node:latest node /code/main-1.js
封裝目標:
能夠根據身份標識id
得到對應的流
對發送自定義事件的封裝
對保持鏈接不斷的封裝
維護一個實例表, 便於向對應的流推送消息
const { PassThrough } = require('stream')
const instanceMap = new Map()
let uid = 0
/** * Server Sent Events封裝 */
module.exports = class SSE {
/** * 構造函數中初始化轉換流、身份標識、執行初始化方法 */
constructor(options = {}) {
this.stream = new PassThrough()
this.uid = ++uid
this.intervalTime = options.intervalTime || 5000
this._init()
}
/** * 根據uid獲取SSE實例 */
static getInstance(uid) {
return instanceMap.get(+uid)
}
/** * 根據uid發送自定義事件 */
static writeStream(uid, event, data) {
const instance = this.getInstance(uid)
if (instance) instance.writeStream(event, data)
}
/** * 初始化函數中記錄當前實例, 並保持長鏈接 */
_init() {
instanceMap.set(this.uid, this)
this._writeKeepAliveStream()
const timer = setInterval(() => { this._writeKeepAliveStream() }, this.intervalTime)
this.stream.on('close', () => {
clearInterval(timer)
instanceMap.delete(this.uid)
})
}
/** * 經過發送註釋消息保持長鏈接 */
_writeKeepAliveStream() {
this.stream.write(': \n\n')
}
/** * 發送自定義事件 */
writeStream(event, data) {
const payload = typeof data === 'string' ? data : JSON.stringify(data)
this.stream.write(`event: ${event}\ndata: ${payload}\n\n`)
}
}
複製代碼
封裝後, /sse
接口代碼簡化爲:
router.get('/sse', ctx => {
ctx.set({
'Content-Type':'text/event-stream',
'Cache-Control':'no-cache',
'Connection': 'keep-alive'
})
const sse = new SSE()
sse.writeStream('sse-connect', sse.uid)
ctx.body = sse.stream
})
複製代碼
執行用戶代碼, 須要限制容器的使用時長, 雖然一直有Issue: 給docker run命令增長timeout選項, 可是最佳的中止容器運行的方式仍是docker stop / docker kill
docker stop
: 用docker stop命令來停掉容器的時候,docker默認會容許容器中的應用程序有10秒的時間用以終止運行, 若是等待時間達到設定的超時時間,或者默認的10秒,會繼續發送SIGKILL的系統信號強行kill掉進程
docker kill
: 默認狀況下,docker kill命令不會給容器中的應用程序有任何gracefully shutdown的機會。 它會直接發出SIGKILL的系統信號,以強行終止容器中程序的運行
所以, 此處採用docker kill
更符合需求, 中止容器的代碼以下:
/** * 中止Docker容器 * @description * exec方法中的timeout選項在執行「docker run」命令時無效, 所以採用「docker kill」命令來限制容器使用時長 * 經過docker kill, childProcess exitCode值爲137 * @param {string} containerName 容器名稱 * @param {number} timeout 限制使用時長 * @return {number} timer */
function stopDocker(containerName, timeout) {
return setTimeout(async () => {
try {
await exec(`docker kill ${containerName}`)
} catch (e) { }
}, timeout)
}
複製代碼
注: 此處child_process.exec
的timeout
選項並不能中止docker
容器
child_process.exec
方法執行命令返回的結果是buffer/string
, 此處咱們須要使用流的方式, 就要使用child_process.spawn
方法
容器啓動的部分代碼以下:
/** * 啓動Docker容器並使用流模式獲取輸出 * @param {object} dockerOptions 啓動docker的配置 */
function startDockerBySpawn(dockerOptions) {
// 得到容器名, 鏡像名, 執行命令, 掛載卷
const { containerName, imageName, execCommand, volume } = dockerOptions
// 參數差別
const commandStr = `docker run --rm --memory=50m --name ${containerName} -v ${volume} ${imageName} ${execCommand}`
const [command, ...args] = commandStr.split(' ')
// 啓動容器
const childProcess = spawn(command, args)
//...
}
複製代碼
容器啓動後, 能夠得到兩個流:
childProcess.stdout
childProcess.stderr
咱們須要把兩個流的數據組合起來, 而後將其轉換爲SSE
數據格式, 最終寫入到目標流targetStream
, 代碼以下:
const t = new SSETransform()
const transferStation = new PassThrough()
childProcess.stdout.pipe(transferStation)
childProcess.stderr.pipe(transferStation)
transferStation.pipe(t).pipe(targetStream, { end: false })
複製代碼
自定義的轉換流以下:
const { Transform } = require('stream')
/** * 自定義轉換流 * @description * 將child_process.stdout/stderr的可寫流轉換爲EventStream的格式 */
module.exports = class SSETransform extends Transform {
constructor(eventName) {
super()
this.eventName = eventName || 'sse-result'
}
_transform(chunk, encoding, callback) {
callback(null, `event: ${this.eventName}\ndata: ${JSON.stringify({ result: chunk.toString('utf8') })}\n\n`)
}
}
複製代碼
一次代碼執行流程得到的數據以下圖所示
至此, 就完成了一個代碼在線運行工具的開發