本文包括以下內容:html
WebSocket
協議第四章 - 鏈接握手WebSocket
協議第五章 - 數據幀nodejs ws
庫源碼分析 - 鏈接握手過程nodejs ws
庫源碼分析 - 數據幀解析過程參考node
ws - githubgithub
本文對WebSocket
的概念、定義、解釋和用途等基礎知識不會涉及, 稍微偏幹一點, 篇幅較長, markdown大約800行, 閱讀須要耐心web
關於WebSocket
有一句很常見的話: Websocket複用了HTTP的握手通道, 它具體指的是:面試
客戶端經過HTTP請求與WebSocket服務器協商升級協議, 協議升級完成後, 後續的數據交換則遵守WebSocket協議ajax
首先由客戶端換髮起協議升級請求, 根據WebSocket
協議規範, 請求頭必須包含以下的內容api
GET / HTTP/1.1
Host: localhost:8080
Origin: http://127.0.0.1:3000
Connection: Upgrade
Upgrade: websocket
Sec-WebSocket-Version: 13
Sec-WebSocket-Key: w4v7O6xFTi36lq3RNcgctw==
複製代碼
服務器返回的響應頭必須包含以下的內容數組
HTTP/1.1 101 Switching Protocols
Connection:Upgrade
Upgrade: websocket
Sec-WebSocket-Accept: Oy4NRAQ13jhfONC7bP8dTKb4PTU=
複製代碼
HTTP/1.1 101 Switching Protocols
規範提到:瀏覽器
Sec-WebSocket-Key值由一個隨機生成的16字節的隨機數經過base64(見RFC4648的第四章)編碼獲得的
例如, 隨機選擇的16個字節爲:
// 十六進制 數字1~16
0x01 0x02 0x03 0x04 0x05 0x06 0x07 0x08 0x09 0x0a 0x0b 0x0c 0x0d 0x0e 0x0f 0x10
複製代碼
經過base64編碼後值爲: AQIDBAUGBwgJCgsMDQ4PEA==
測試代碼以下:
const list = Array.from({ length: 16 }, (v, index) => ++index)
const key = Buffer.from(list)
console.log(key.toString('base64'))
// AQIDBAUGBwgJCgsMDQ4PEA==
複製代碼
而Sec-WebSocket-Accept
值的計算方式爲:
Sec-Websocket-Key
的值和258EAFA5-E914-47DA-95CA-C5AB0DC85B11
拼接SHA1
計算出摘要, 並轉成base64
字符串此處不須要糾結神奇字符串258EAFA5-E914-47DA-95CA-C5AB0DC85B11
, 它就是一個GUID
, 沒準兒是寫RFC的時候隨機生成的
測試代碼以下:
const crypto = require('crypto')
function hashWebSocketKey (key) {
const GUID = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'
return crypto.createHash('sha1')
.update(key + GUID)
.digest('base64')
}
console.log(hashWebSocketKey('w4v7O6xFTi36lq3RNcgctw=='))
// Oy4NRAQ13jhfONC7bP8dTKb4PTU=
複製代碼
前面簡單提到他的做用爲: 提供基礎的防禦, 減小惡意鏈接, 進一步闡述以下:
Key
能夠避免服務器收到非法的WebSocket
鏈接, 好比http
請求鏈接到websocket
, 此時服務端能夠直接拒絕Key
能夠用來初步確保服務器認識ws
協議, 但也不能排除有的http服務器只處理Sec-WebSocket-Key
, 並不實現ws
協議Key
能夠避免反向代理緩存Sec-Websocket-Key
以及相關header是被禁止的, 這樣能夠避免客戶端發送ajax請求時, 意外請求協議升級最終須要強調的是: Sec-WebSocket-Key/Accept並非用來保證數據的安全性, 由於其計算/轉換公式都是公開的, 並且很是簡單, 最主要的做用是預防一些意外的狀況
WebSocket
通訊的最小單位是幀, 由一個或多個幀組成一條完整的消息, 交換數據的過程當中, 發送端和接收端須要作的事情以下:
數據幀格式做爲核心內容, 一眼看去彷佛難以理解, 但本文做者下死命令了, 必須理解, 沖沖衝
FIN
: 佔1bit
0
表示不是消息的最後一個分片1
表示是消息的最後一個分片RSV1
, RSV2
, RSV3
: 各佔1bit, 通常狀況下全爲0, 與Websocket拓展有關, 若是出現非零的值且沒有采用WebSocket拓展, 鏈接出錯
Opcode
: 佔4bit
%x0
: 表示本次數據傳輸採用了數據分片, 當前數據幀爲其中一個數據分片%x1
: 表示這是一個文本幀%x2
: 表示這是一個二進制幀%x3-7
: 保留的操做代碼, 用於後續定義的非控制幀%x8
: 表示鏈接斷開%x9
: 表示這是一個心跳請求(ping)%xA
: 表示這是一個心跳響應(pong)%xB-F
: 保留的操做代碼, 用於後續定義的非控制幀Mask
: 佔1bit
0
表示不對數據載荷進行掩碼異或操做1
表示對數據載荷進行掩碼異或操做Payload length
: 佔7或7+16或7+64bit
0~125
: 數據長度等於該值126
: 後續的2個字節表明一個16位的無符號整數, 值爲數據的長度127
: 後續的8個字節表明一個64位的無符號整數, 值爲數據的長度Masking-key
: 佔0或4bytes
1
: 攜帶了4字節的Masking-key0
: 沒有Masking-keypayload data
: 載荷數據
我想若是知道byte和bit的區別, 這部分就沒問題- -
WebSocket
的每條消息可能被切分紅多個數據幀, 當接收到一個數據幀時,會根據FIN值來判斷, 是否爲最後一個數據幀
數據幀傳遞示例:
FIN=0, Opcode=0x1
: 發送文本類型, 消息尚未發送完成,還有後續幀FIN=0, Opcode=0x0
: 消息沒有發送完成, 還有後續幀, 接在上一條後面FIN=1, Opcode=0x0
: 消息發送完成, 沒有後續幀, 接在上一條後面組成完整消息雖然以前用的都是socket.io
, 偶然發現了ws
, 使用量居然還挺大, 周下載量是socket.io
的六倍
在NodeJS
中, 每當遇到協商升級請求時, 就會觸發http
模塊的upgrade
事件, 這即是實現WebSocketServer
的切入點, 原生示例代碼以下:
// 建立 HTTP 服務器。
const srv = http.createServer( (req, res) => {
res.writeHead(200, { 'Content-Type': 'text/plain' });
res.end('響應內容');
});
srv.on('upgrade', (req, socket, head) => {
// 特定的處理, 以實現Websocket服務
});
複製代碼
而且, 在通常的使用中, 都是在一個已有的httpServer
基礎上進行拓展, 以實現WebSocket
, 而不是建立一個獨立的WebSocketServer
在一個已有httpServer
的基礎上, ws
使用的實例代碼爲
const http = require('http');
const WebSocket = require('ws');
const server = http.createServer();
const wss = new WebSocket.Server({ server });
server.listen(8080);
複製代碼
已有的httpServer
做爲參數傳給了WebSocket.Server
構造函數, 因此源碼分析的核心切入點爲:
new WebSocket.Server({ server });
複製代碼
經過這個切入點, 就能夠完整復現鏈接握手的過程
由於httpServer
已做爲參數傳遞進來, 所以其構造函數變得十分簡單:
class WebSocketServer extends EventEmitter {
constructor(options, callback) {
super()
// 在提供了http server的基礎上, 代碼能夠簡化爲
if (options.server) {
this._server = options.server
}
// 監聽事件
if (this._server) {
this._removeListeners = addListeners(this._server, {
listening: this.emit.bind(this, 'listening'),
error: this.emit.bind(this, 'error'),
// 核心
upgrade: (req, socket, head) => {
// 下一步切入點
this.handleUpgrade(req, socket, head, (ws) => {
this.emit('connection', ws, req)
})
}
})
}
}
}
// 這是一段很是帶秀的代碼, 在綁定多個事件監聽器的同時返回一個移除多個事件監聽器的函數
function addListeners(server, map) {
for (const event of Object.keys(map)) server.on(event, map[event]);
return function removeListeners() {
for (const event of Object.keys(map)) {
server.removeListener(event, map[event]);
}
};
}
複製代碼
能夠看到, 在構造函數中, 爲httpServer
註冊了upgrade
事件的監聽器, 觸發時, 會執行this.handleUpgrade
函數, 這即是下一步的方向
這個函數主要用來過濾掉不合法的請求, 檢查的內容包括:
Sec-WebSocket-Key
值
Sec-WebSocket-Version
值
WebSocket
請求的路徑
關鍵代碼以下:
const keyRegex = /^[+/0-9A-Za-z]{22}==$/;
handleUpgrade(req, socket, head, cb) {
socket.on('error', socketOnError)
// 獲取sec-websocket-key
const key = req.headers['sec-websocket-key'] !== undefined
? req.headers['sec-websocket-key']
: false
// 獲取sec-websocket-version
const version = +req.headers['sec-websocket-version']
// 獲取協議拓展, 本篇不涉及
const extensions = {};
// 對於不合法的請求, 中斷握手
if (
req.method !== 'GET' ||
req.headers.upgrade.toLowerCase() !== 'websocket' ||
!key ||
!keyRegex.test(key) ||
(version !== 8 && version !== 13) ||
// 該函數是對Websocket請求路徑的判斷, 與option.path相關, 不展開
!this.shouldHandle(req)
) {
return abortHandshake(socket, 400)
}
// 對於合法的請求, 給它升級!
this.completeUpgrade(key, extensions, req, socket, head, cb)
}
複製代碼
對於不合法的請求, 直接400 bad request
了, abortHandshake
以下:
const { STATUS_CODES } = require('http');
function abortHandshake(socket, code, message, headers) {
// net.Socket 也是雙工流,所以它既可讀也可寫
if (socket.writable) {
message = message || STATUS_CODES[code];
headers = {
Connection: 'close',
'Content-type': 'text/html',
'Content-Length': Buffer.byteLength(message),
...headers
};
socket.write(
`HTTP/1.1 ${code} ${STATUS_CODES[code]}\r\n` +
Object.keys(headers)
.map((h) => `${h}: ${headers[h]}`)
.join('\r\n') +
'\r\n\r\n' +
message
);
}
// 移除handleUpgrade中添加的error監聽器
socket.removeListener('error', socketOnError);
// 確保在該 socket 上再也不有 I/O 活動
socket.destroy();
}
複製代碼
若是一切順利, 咱們來到completeUpgrade
函數
這個函數主要用來, 返回正確的響應, 觸發相關的事件, 記錄值等, 代碼比較簡單
const { createHash } = require('crypto');
const { GUID } = require('./constants');
const WebSocket = require('./websocket');
function completeUpgrade(key, extensions, req, socket, head, cb) {
// Destroy the socket if the client has already sent a FIN packet.
if (!socket.readable || !socket.writable) return socket.destroy()
// 生成sec-websocket-accept
const digest = createHash('sha1')
.update(key + GUID)
.digest('base64');
// 組裝Headers
const headers = [
'HTTP/1.1 101 Switching Protocols',
'Upgrade: websocket',
'Connection: Upgrade',
`Sec-WebSocket-Accept: ${digest}`
];
// 建立一個Websocket實例
const ws = new Websocket(null)
this.emit('headers', headers, req);
// 返回響應
socket.write(headers.concat('\r\n').join('\r\n'));
socket.removeListener('error', socketOnError);
// 下一步切入點
ws.setSocket(socket, head, this.options.maxPayload);
// 經過Set記錄處於鏈接狀態的客戶端
if (this.clients) {
this.clients.add(ws);
ws.on('close', () => this.clients.delete(ws));
}
// 觸發connection事件
cb(ws);
}
複製代碼
到這裏, 就完成了整個握手階段, 但還沒涉及到對數據幀的處理
上一章末尾, 啓示下文的代碼爲completeUpgrade
中的:
ws.setSocket(socket, head, this.options.maxPayload);
複製代碼
進入WebSocket
類中的setSocket
方法, 關於數據幀處理代碼主要能夠簡化爲:
Class WebSocket extends EventEmitter {
...
setSocket(socket, head, maxPayload) {
// 實例化一個可寫流, 用於處理數據幀
const receiver = new Receiver(
this._binaryType,
this._extensions,
maxPayload
);
receiver[kWebSocket] = this;
socket.on('data', socketOnData);
}
}
function socketOnData(chunk) {
if (!this[kWebSocket]._receiver.write(chunk)) {
this.pause();
}
}
複製代碼
此處忽略了不少事件處理, 例如error
, end
, close
等, 由於他們與本文目標無關, 對於一些API, 也不作介紹
因此核心切入點爲Receiver
類, 它就是用於處理數據幀的核心
Receiver類繼承自可寫流, 還須要明確兩點基本概念:
stream
全部的流都是EventEmitter
的實例writable._write
方法, 該方法供內部使用const { Writable } = require('stream')
class Recevier extends Writable {
constructor(binaryType, extensions, maxPayload) {
super()
this._binaryType = binaryType || BINARY_TYPES[0]; // nodebuffer
this[kWebSocket] = undefined; // WebSocket實例的引用
this._extensions = extensions || {}; // WebSocket協議拓展
this._maxPayload = maxPayload | 0; // 100 * 1024 * 1024
this._bufferedBytes = 0; // 記錄buffer長度
this._buffers = []; // 記錄buffer數據
this._compressed = false; // 是否壓縮
this._payloadLength = 0; // 數據幀 PayloadLength
this._mask = undefined; // 數據幀Mask Key
this._fragmented = 0; // 數據幀是否分片
this._masked = false; // 數據幀 Mask
this._fin = false; // 數據幀 FIN
this._opcode = 0; // 數據幀 Opcode
this._totalPayloadLength = 0; // 載荷總長度
this._messageLength = 0; // 載荷總長度, 與this._compressed有關
this._fragments = []; // 載荷分片記錄數組
this._state = GET_INFO; // 標誌位, 用於startLoop函數
this._loop = false; // 標誌位, 用於startLoop函數
}
_write(chunk, encoding, cb) {
if (this._opcode === 0x08 && this._state == GET_INFO) return cb();
this._bufferedBytes += chunk.length;
this._buffers.push(chunk);
this.startLoop(cb);
}
}
複製代碼
能夠看到, 每當收到新的數據幀, 就會將其記錄在_buffers
數組中, 並當即開始解析流程startLoop
startLoop(cb) {
let err;
this._loop = true;
do {
switch (this._state) {
case GET_INFO:
err = this.getInfo();
break;
case GET_PAYLOAD_LENGTH_16:
err = this.getPayloadLength16();
break;
case GET_PAYLOAD_LENGTH_64:
err = this.getPayloadLength64();
break;
case GET_MASK:
this.getMask();
break;
case GET_DATA:
err = this.getData(cb);
break;
default:
// `INFLATING`
this._loop = false;
return;
}
} while (this._loop);
cb(err);
}
複製代碼
解析流程很簡單:
getInfo
首先解析FIN
, RSV
, OPCODE
, MASK
, PAYLOAD LENGTH
等數據
由於payload length
分爲三種狀況(具體後面敘述, 此處只列出分支):
0~125: 調用haveLength
方法
126: 先觸發getPayloadLength16
方法, 再調用haveLength
方法
127: 先出法getPayloadLength64
方法, 再調用haveLength
方法
haveLength
方法中, 若是存在掩碼(mask), 先調用getMask
方法, 再調用getData
方法
總體流程和狀態經過this._loop
和this._state
控制, 比較直觀
按理說第一步應該分析getInfo
方法, 不過裏面涉及到了consume
方法, 這個函數提供了一種簡潔的方式消費已獲取的Buffer, 這個函數接受一個參數n
, 表明須要消費的字節數, 最後返回消費的字節
假如須要得到數據幀的第一個字節的數據(包含了 FIN + RSV + OPCODE), 只須要經過this.consume(1)
便可
記錄值this._buffers
是一個buffer數組, 最開始, 裏面存放完整的數據幀, 隨着消費的進行, 數據則會逐漸變小, 那麼每次消費存在三種可能:
消費的字節數剛好等於一個chunk
的字節數
消費的字節數小於一個chunk
的字節數
消費的字節數大於一個chunk
的字節數
對於第一種狀況, 只須要移出 + 返回便可
if (n === this._buffers[0].length) return this._buffers.shift()
複製代碼
對於第二種狀況, 只須要裁剪 + 返回便可
if (n < this._buffers[0].length) {
const buf = this._buffers[0]
this._buffers[0] = buf.slice(n)
return buf.slice(0, n)
}
複製代碼
對於第三種狀況, 會稍微複雜一點, 首先咱們要申請一個大小爲須要消費字節數的buffer空間, 用於存儲返回的buffer
// buffer空間是否初始化並不重要, 由於最終他都會被所有覆蓋
const dst = Buffer.allocUnsafe(n)
複製代碼
在這種狀況中, 能夠保證他的長度大於第一個chunk, 但不能肯定在消費一個chunk以後, 是否還大於第一個chunk(消費以後索引前移), 所以須要循環
// do...while能夠避免一次無心義判斷, 首先執行一次循環體, 再判斷條件
do {
const buf = this._buffers[0]
// 若是長度大於第一個chunk, 移除 + 複製便可
if (n >= buf.length) {
this._buffers.shift().copy(dst, dst.length - n);
}
// 若是長度小於一個chunk, 裁剪 + 複製便可
else {
// buf.copy這個api就本身複習一下嗷
buf.copy(dst, dst.length - n, 0, n);
this._buffers[0] = buf.slice(n);
}
n -= buf.length;
} while (n > 0)
複製代碼
一個最小的數據幀必須包含以下的數據:
FIN (1 bit) + RSV (3 bit) + OPCODE (4 bit) + MASK (1 bit) + PAYLOADLENGTH (7 bit)
複製代碼
最少2個字節, 所以少於兩個字節的數據幀是錯誤的, 簡化的getInfo
以下
getInfo() {
if (this._bufferedBytes < 2) {
this._loop = false
return
}
const buf = this.consume(2)
// 只保留了數據幀中的幾個關鍵數據
this._fin = (buf[0] & 0x80) === 0x80
this._opcode = buf[0] & 0x0f
this._payloadLength = buf[1] & 0x7f
this._masked = (buf[1] & 0x80) === 0x80
// 對應Payload Length的三種狀況
if (this._payloadLength === 126) this._state = GET_PAYLOAD_LENGTH_16
else if (this._payloadLength === 127) this._state = GET_PAYLOAD_LENGTH_64
else return this.haveLength()
}
複製代碼
此處的核心就是按位於運算符&
的含義, 先以FIN
爲例, FIN
在數據幀中處於第一個bit
// FIN的值用[]指代, X表明第一個字節中的後續bit
[]xxxxxxx
// 十六進制數0x80表明二進制
10000000
// 二者按位與, 結果與後面7個bit無關
[]0000000
// 所以, 只須要比較[]0000000 和 10000000是否相等便可, 簡化即獲得
this._fin = (buf[0] & 0x80) === 0x80
複製代碼
OPCODE
和PAYLOAD LENGTH
同理
// OPCODE處於第一個字節的後四位, 與0000 1111按位與便可
xxxx[][][][] & 0000 1111 (也就是0x0f)
// PAYLOAD LENGTH處於第二個字節的後七爲, 與0111 1111按位於便可
x[][][][][][][][] & 0111 1111 (也就是0x7f)
複製代碼
三種狀況以下:
0-125
: 載荷實際長度就是0-125之間的某個數
126
: 載荷實際長度爲隨後2個字節表明的一個16位的無符號整數的數值
127
: 載荷實際長度爲隨後8個字節表明的一個64位的無符號整數的數值
可能聽起來比較繞, 看代碼, 以126
分支爲例:
getPayloadLength16() {
if (this._bufferedBytes < 2) {
this._loop = false;
return;
}
this._payloadLength = this.consume(2).readUInt16BE(0);
return this.haveLength();
}
複製代碼
能夠看到, 處理長度的核心爲readUInt16BE(0)
, 這便涉及到大小端了:
大端(Big endian)認爲第一個字節是最高位字節, 和咱們對十進制數字大小的認知類似
小端(Little endian)認爲第一個字節是最低位字節
那麼, 規範中提到的隨後2個字節表明的一個16位的無符號整數的數值, 天然指的是大端了
大端 vs 小端對比:
// 假設後面兩個字節二進制值爲
1111 1111 0000 0001
// 轉爲十六進制爲
0xff 0x01
// 大端輸出 65281
console.log(Buffer.from([0xff, 0x01]).readUInt16BE(0).toString(10))
// 小端輸出 511
console.log(Buffer.from([0xff, 0x01]).readUInt16LE(0).toString(10))
複製代碼
除此以外, 7 + 64
的模式還有一點額外的處理, 代碼以下:
getPayloadLength64() {
if (this._bufferedBytes < 8) {
this._loop = false;
return;
}
const buf = this.consume(8);
const num = buf.readUInt32BE(0);
//
// The maximum safe integer in JavaScript is 2^53 - 1. An error is returned
// if payload length is greater than this number.
//
if (num > Math.pow(2, 53 - 32) - 1) {
this._loop = false;
return error(
RangeError,
'Unsupported WebSocket frame: payload length > 2^53 - 1',
false,
1009
);
}
this._payloadLength = num * Math.pow(2, 32) + buf.readUInt32BE(4);
return this.haveLength();
}
複製代碼
在得到載荷以前, 若是getInfo
中mask
爲1, 須要進行getMask
操做, 獲取Mask Key
(一共四個字節)
getMask() {
if (this._bufferedBytes < 4) {
this._loop = false;
return;
}
this._mask = this.consume(4);
this._state = GET_DATA;
}
複製代碼
getData
源碼簡化爲以下
getData(cb) {
// data爲 Buffer.alloc(0)
let data = EMPTY_BUFFER;
// 消費payload
data = this.consume(this._payloadLength)
// 若是有mask, 根據mask key進行解碼, 此處不展開
if (this._masked) unmask(data, this._mask)
// 將其記錄進分片數組
this._fragments.push(data)
// 若是該數據幀表示: 鏈接斷開, 心跳請求, 心跳響應
if (this._opcode > 0x07) return this.controlMessage(data)
// 若是該數據幀表示: 數據分片、文本幀、二進制幀
return this.dataMessage()
}
複製代碼
接着分析dataMessage()
函數, 它用於將多個幀的數據合併, 簡化以後也比較簡單
dataMessage() {
if (this._fin) {
const messageLength = this._messageLength
const fragments = this._fragments
const buf = concat(fragments, messageLength)
this.emit('message', buf.toString())
}
}
// 簡明易懂哦, 不解釋啦
function concat(list, totalLength) {
if (list.length === 0) return EMPTY_BUFFER;
if (list.length === 1) return list[0];
const target = Buffer.allocUnsafe(totalLength);
let offset = 0;
for (let i = 0; i < list.length; i++) {
const buf = list[i];
buf.copy(target, offset);
offset += buf.length;
}
return target;
}
複製代碼
本文篇幅較長且並非面試題那種小塊的知識點, 閱讀急需耐心, 已儘可能避免貼大段代碼, 能看到這裏我都想給你打錢了
經過本篇分析, 完整的介紹以及復現了WebSocket
中的兩個關鍵階段:
鏈接握手階段
數據交換階段
我的認爲最關鍵即是: 涉及到了對Node.js的buffer模塊以及stream模塊的使用, 這也是收穫最大的一部分