入門 Node.js Net 模塊構建 TCP 網絡服務

想作一個簡單的 Web API,這個時候就須要搭建一個 Web 服務器,在 ASP.NET 中須要 IIS 來搭建服務器,PHP 中須要藉助 Apache/Nginx 來實現,對於新手在還沒開始以前看到這麼多步驟,也許就要放棄了,可是在 Node.js 中開啓一個 Web 服務器是 So Easy 的,咱們利用 Net、Dgram、HTTP、HTTPS 等模塊經過幾行簡單的代碼就可實現。html

快速導航

面試指南

  • 什麼是 TCP 協議?什麼狀況下又會選擇 TCP 協議呢?參考正文 Interview1
  • TCP 粘包是什麼?該怎麼解決?參考正文 Interview2

網絡模型

大多數同窗對於 HTTP、HTTPS 會很熟悉,一般用於瀏覽器與服務端交互,或者服務端與服務端的交互,另外兩個 Net 與 Dgram 也許會相對陌生,這兩個是基於網絡模型的傳輸層來實現的,分別對應於 TCP、UDP 協議,下面一圖看明白 OSI 七層模型 與 TCP/IP 五層模型之間的關係,中間使用虛線標註了傳輸層,對於上層應用層(HTTP/HTTPS等)也都是基於這一層的 TCP 協議來實現的,因此想使用 Node.js 作服務端開發,Net 模塊也是你必需要掌握的,這也是咱們本篇要講解的重點。node

初識 TCP 協議

Interview1: 有些概念仍是要弄清楚的,什麼是 TCP 協議?什麼狀況下又會選擇 TCP 協議呢?git

TCP 是傳輸控制協議,大多數狀況下咱們都會使用這個協議,由於它是一個更可靠的數據傳輸協議,具備以下三個特色:github

  • 面向連接: 須要對方主機在線,並創建連接。
  • 面向字節流: 你給我一堆字節流的數據,我給你發送出去,可是每次發送多少是我說了算,每次選出一段字節發送的時候,都會帶上一個序號,這個序號就是發送的這段字節中編號最小的字節的編號。
  • 可靠: 保證數據有序的到達對方主機,每發送一個數據就會期待收到對方的回覆,若是在指定時間內收到了對方的回覆,就確認爲數據到達,若是超過必定時間沒收到對方回覆,就認爲對方沒收到,在從新發送一遍。

上面三個特色說到 TCP 是面向連接和可靠的,其一個顯著特徵是在傳輸以前會有一個 3 次握手,實現過程以下所示:面試

在一次 TCP 三次握手的過程當中,客戶端與服務端會分別提供一個套接字來造成一個連接。以後客戶端與服務端經過這個連接來互相發送數據。算法

Net 模塊構建一個 TCP 服務

以上了解了 TCP 的一些概念以後,咱們開始建立一個 TCP 服務端與客戶端實例,這裏咱們須要使用 Node.js 的 Net 模塊,它提供了一些用於底層通訊的接口,該模塊能夠用於建立基於流的 TCP 或 IPC 的服務器(net.createServer())與客戶端(net.createConnection())。api

建立 TCP 服務

可使用 new net.Server 建立一個 TCP 服務端連接,也能夠經過工廠函數 net.createServer() 的方式,createServer() 的內部實現也是內部調用了 Server 構造函數來建立一個 TCP 對象,和 new net.Server 是同樣的,代碼以下所示:瀏覽器

github.com/nodejs/node…性能優化

function createServer(options, connectionListener) {
  return new Server(options, connectionListener);
}
複製代碼

github.com/nodejs/node…bash

function Server(options, connectionListener) {
  if (!(this instanceof Server))
    return new Server(options, connectionListener);

  // Server 類內部仍是繼承了 EventEmitter,這個不在本節範圍
  EventEmitter.call(this);

  ...
複製代碼

TCP 服務事件

在開始代碼以前,先了解下其相關事件,參考官網 nodejs.cn/api/net.htm…,這裏也不會把全部的都介紹,下面介紹一些經常使用的,而且經過代碼示例,進行講解,能夠在這個基礎之上在去參考官網,實踐一些其它的事件或方法。

TCP 服務器事件

  • listening: ,也就是 server.listen();
  • connection: 新連接創建時觸發,也就是每次收到客戶端回調,參數 socket 爲 net.createServer 實例,也能夠寫在 net.createServer(function(socket) {}) 方法裏
  • close:當 server 關閉的時候觸發(server.close())。 若是有鏈接存在,直到全部的鏈接結束纔會觸發這個事件
  • error:捕獲錯誤,例如監聽一個已經存在的端口就會報 Error: listen EADDRINUSE 錯誤

TCP 連接事件方法

  • data: 一端調用 write() 方法發送數據時,另外一端會經過 socket.on('data') 事件接收到,能夠理解爲讀取數據
  • end: 每次 socket 連接會出現一次,例如客戶端發送消息以後執行 Ctrl + C 終端,就會收到
  • error: 監聽 socket 的錯誤信息
  • write:write 是一個方法(socket.write())上面的 data 事件是讀數據,write 方法在這裏就爲寫數據到另外一端,

TCP 服務端代碼實現

const net = require('net');
const HOST = '127.0.0.1';
const PORT = 3000;

// 建立一個 TCP 服務實例
const server = net.createServer();

// 監聽端口
server.listen(PORT, HOST);

server.on('listening', () => {
    console.log(`服務已開啓在 ${HOST}:${PORT}`);
});

server.on('connection', socket => {
    // data 事件就是讀取數據
    socket.on('data', buffer => {
        const msg = buffer.toString();
        console.log(msg);

        // write 方法寫入數據,發回給客戶端
        socket.write(Buffer.from('你好 ' + msg));
    });
})

server.on('close', () => {
    console.log('Server Close!');
});

server.on('error', err => {
    if (err.code === 'EADDRINUSE') {
        console.log('地址正被使用,重試中...');

        setTimeout(() => {
            server.close();
            server.listen(PORT, HOST);
        }, 1000);
    } else {
        console.error('服務器異常:', err);
    }
});
複製代碼

TCP 客戶端代碼實現

const net = require('net');
const client = net.createConnection({
    host: '127.0.0.1',
    port: 3000
});

client.on('connect', () => {
    // 向服務器發送數據
    client.write('Nodejs 技術棧');

    setTimeout(() => {
        client.write('JavaScript ');
        client.write('TypeScript ');
        client.write('Python ');
        client.write('Java ');
        client.write('C ');
        client.write('PHP ');
        client.write('ASP.NET ');
    }, 1000);
})

client.on('data', buffer => {
    console.log(buffer.toString());
});

// 例如監聽一個未開啓的端口就會報 ECONNREFUSED 錯誤
client.on('error', err => {
    console.error('服務器異常:', err);
});

client.on('close', err => {
    console.log('客戶端連接斷開!', err);
});
複製代碼

源碼實現地址

https://github.com/Q-Angelo/project-training/tree/master/nodejs/net/chapter-1-client-server 
複製代碼

客戶端與服務端 Demo 測試

首先啓動服務端,以後在啓動客戶端,客戶端調用三次,打印結果以下所示:

服務端

$ node server.js
服務已開啓在 127.0.0.1:3000
# 第一次
Nodejs 技術棧
JavaScript 
TypeScript Python Java C PHP ASP.NET 
# 第二次
Nodejs 技術棧
JavaScript TypeScript Python Java C PHP ASP.NET 
複製代碼

客戶端

$ node client.js
# 第一次
你好 Nodejs 技術棧
你好 JavaScript 
你好 TypeScript Python Java C PHP ASP.NET 

# 第二次
你好 Nodejs 技術棧
你好 JavaScript TypeScript Python Java C PHP ASP.NET 
複製代碼

在客戶端我使用 client.write() 發送了屢次數據,可是隻有 setTimeout 以外的是正常的,setTimeout 裏面連續發送的彷佛並非每一次一返回,而是會隨機合併返回了,爲何呢?且看下面 TCP 的粘包問題介紹。

TCP 粘包問題

Interview2: TCP 粘包是什麼?該怎麼解決?

上面的例子最後拋出了一個問題,爲何客戶端連續向服務端發送數據,會收到合併返回呢?這也是在 TCP 中常見的粘包問題,客戶端(發送的一端)在發送以前會將短期有多個發送的數據塊緩衝到一塊兒(發送端緩衝區),造成一個大的數據塊一併發送,一樣接收端也有一個接收端緩衝區收到的數據先存放接收端緩衝區,而後程序從這裏讀取部分數據進行消費,這樣作也是爲了減小 I/O 消耗達到性能優化。

問題思考:數據到達緩衝區什麼時間開始發送?

這個取決於 TCP 擁塞控制,是任什麼時候刻內肯定能被髮送出去的字節數的控制因素之一,是阻止發送方至接收方之間的鏈路變得擁塞的手段,參考維基百科:zh.wikipedia.org/wiki/TCP擁塞控…

TCP 粘包解決方案?

  • 方案一:延遲發送
  • 方案二:關閉 Nagle 算法
  • 方案三:封包/拆包

方案一:延遲發送

一種最簡單的方案是設置延遲發送,sleep 休眠一段時間的方式,可是這個方案雖然簡單,同時缺點也顯而易見,傳輸效率大大下降,對於交互頻繁的場景顯然是不適用的,第一次改造以下:

client.on('connect', () => {
    client.setNoDelay(true);
    // 向服務器發送數據
    client.write('Nodejs 技術棧');

    const arr = [
        'JavaScript ',
        'TypeScript ',
        'Python ',
        'Java ',
        'C ',
        'PHP ',
        'ASP.NET '
    ]
    
    for (let i=0; i<arr.length; i++) {
        (function(val, k){
            setTimeout(() => {
                client.write(val);
            }, 1000 * (k+1))
        }(arr[i], i));
    }
})
複製代碼

控制檯執行 node client.js 命令,彷佛一切 ok 了沒有在出現粘包的狀況,可是這種狀況僅使用於交互頻率很低的場景。

$ node client.js
你好 Nodejs 技術棧
你好 JavaScript 
你好 TypeScript 
你好 Python 
你好 Java 
你好 C 
你好 PHP 
你好 ASP.NET 
複製代碼

源碼實現地址

https://github.com/Q-Angelo/project-training/tree/master/nodejs/net/chapter-2-delay
複製代碼

方案二:Nagle 算法

Nagle 算法是一種改善網絡傳輸效率的算法,避免網絡中充斥着大量小的數據塊,它所指望的是儘量發送大的數據塊,所以在每次請求一個數據塊給 TCP 發送時,TCP 並不會當即執行發送,而是等待一小段時間進行發送。

當網絡中充斥着大量小數據塊時,Nagle 算法能將小的數據塊集合起來一塊兒發送減小了網絡擁堵,這個仍是頗有幫助的,但也並非全部場景都須要這樣,例如,REPL 終端交互,當用戶輸入單個字符以獲取響應,因此在 Node.js 中能夠設置 socket.setNoDelay() 方法來關閉 Nagle 算法。

const server = net.createServer();

server.on('connection', socket => {
    socket.setNoDelay(true);
})
複製代碼

關閉 Nagle 算法並不老是有效的,由於其是在服務端完成合並,TCP 接收到數據會先存放於本身的緩衝區中,而後通知應用接收,應用層由於網絡或其它的緣由若不能及時從 TCP 緩衝區中取出數據,也會形成 TCP 緩衝區中存放多段數據塊,就又會造成粘包。

方案三:封包/拆包

前面兩種方案都不是特別理想的,這裏介紹第三種封包/拆包,也是目前業界用的比較多的,這裏使用長度編碼的方式,通訊雙方約定好格式,將消息分爲定長的消息頭(Header)和不定長的消息體(Body),在解析時讀取消息頭獲取到內容佔用的長度,以後讀取到的消息體內容字節數等於字節頭的字節數時,咱們認爲它是一個完整的包。

消息頭序號 (Header) 消息體長度 (Header) 消息體 (Body)
SerialNumber bodyLength body
2(字節) 2(字節) N(字節)

預先知識 Buffer

下面會經過編碼實現,可是在開始以前但願你能瞭解一下 Buffer,可參考我以前寫的 Buffer 文章 Node.js 中的緩衝區(Buffer)到底是什麼?,下面我列出本次須要用到的 Buffer 作下說明,對於不瞭解 Buffer 的同窗是有幫助的。

  • Buffer.alloc(size[, fill[, encoding]]):初始化一個 size 大小的 Buffer 空間,默認填充 0,也能夠指定 fill 進行自定義填充
  • buf.writeInt16BE(value[, offset]):value 爲要寫入的 Buffer 值,offset 爲偏移量從哪一個位置開始寫入
  • buf.writeInt32BE(value[, offset]):參數同 writeInt16BE,不一樣的是 writeInt16BE 表示高位優先寫入一個 16 位整型,而 writeInt32BE 表示高位優先寫入一個 32 位整型
  • buf.readInt16BE([offset]):高位優先讀取 16 位整型,offset 爲讀取以前要跳過的字節數
  • buf.readInt32BE([offset]):高位優先讀取 32 位整型,offset 爲讀取以前要跳過的字節數

編碼/解碼實現

TCP 底層傳輸是基於二進制數據,可是咱們應用層一般是易於表達的字符串、數字等,這裏第一步在編碼的實現中,就須要先將咱們的數據經過 Buffer 轉爲二進制數據,取出的時候一樣也須要解碼操做,一切盡在代碼裏,實現以下:

// transcoder.js

class Transcoder {
    constructor () {
        this.packageHeaderLen = 4; // 包頭長度
        this.serialNumber = 0; // 定義包序號
        this.packageSerialNumberLen = 2; // 包序列號所佔用的字節
    }

    /** * 編碼 * @param { Object } data Buffer 對象數據 * @param { Int } serialNumber 包序號,客戶端編碼時自動生成,服務端解碼以後在編碼時須要傳入解碼的包序列號 */
    encode(data, serialNumber) {
        const body = Buffer.from(data);

        const header = Buffer.alloc(this.packageHeaderLen);
        header.writeInt16BE(serialNumber || this.serialNumber);
        header.writeInt16BE(body.length, this.packageSerialNumberLen); // 跳過包序列號的前兩位

        if (serialNumber === undefined) {
            this.serialNumber++;
        }
       
        return Buffer.concat([header, body]);
    }

    /** * 解碼 * @param { Object } buffer */
    decode(buffer) {
        const header = buffer.slice(0, this.packageHeaderLen); // 獲取包頭
        const body = buffer.slice(this.packageHeaderLen); // 獲取包尾部
    
        return {
            serialNumber: header.readInt16BE(),
            bodyLength: header.readInt16BE(this.packageSerialNumberLen), // 由於編碼階段寫入時跳過了前兩位,解碼一樣也要跳過
            body: body.toString(),
        }
    }

    /** * 獲取包長度兩種狀況: * 1. 若是當前 buffer 長度數據小於包頭,確定不是一個完整的數據包,所以直接返回 0 不作處理(可能數據還未接收完等等) * 2. 不然返回這個完整的數據包長度 * @param {*} buffer */
    getPackageLength(buffer) {
        if (buffer.length < this.packageHeaderLen) {
            return 0;
        }
    
        return this.packageHeaderLen + buffer.readInt16BE(this.packageSerialNumberLen);
    }
}

module.exports = Transcoder;
複製代碼

客戶端改造

const net = require('net');
const Transcoder = require('./transcoder');
const transcoder = new Transcoder();
const client = net.createConnection({
    host: '127.0.0.1',
    port: 3000
});

let overageBuffer=null; // 上一次 Buffer 剩餘數據

client.on('data', buffer => {
    if (overageBuffer) {
        buffer = Buffer.concat([overageBuffer, buffer]);
    }

    let packageLength = 0;

    while (packageLength = transcoder.getPackageLength(buffer)) {
        const package = buffer.slice(0, packageLength); // 取出整個數據包
        buffer = buffer.slice(packageLength); // 刪除已經取出的數據包,這裏採用的方法是把緩衝區(buffer)已取出的包給截取掉

        const result = transcoder.decode(package); // 解碼
        console.log(result);
    }

    overageBuffer=buffer; // 記錄剩餘不完整的包
}).on('error', err => { // 例如監聽一個未開啓的端口就會報 ECONNREFUSED 錯誤
    console.error('服務器異常:', err);
}).on('close', err => {
    console.log('客戶端連接斷開!', err);
});

client.write(transcoder.encode('0 Nodejs 技術棧'));

const arr = [
    '1 JavaScript ',
    '2 TypeScript ',
    '3 Python ',
    '4 Java ',
    '5 C ',
    '6 PHP ',
    '7 ASP.NET '
]

setTimeout(function() {
    for (let i=0; i<arr.length; i++) {
        console.log(arr[i]);

        client.write(transcoder.encode(arr[i]));
    }
}, 1000);
複製代碼

服務端改造

const net = require('net');
const Transcoder = require('./transcoder');
const transcoder = new Transcoder();
const HOST = '127.0.0.1';
const PORT = 3000;
let overageBuffer=null; // 上一次 Buffer 剩餘數據

const server = net.createServer();

server.listen(PORT, HOST);

server.on('listening', () => {
    console.log(`服務已開啓在 ${HOST}:${PORT}`);
}).on('connection', socket => {
    // data 事件就是讀取數據
    socket
        .on('data', buffer => {
            if (overageBuffer) {
                buffer = Buffer.concat([overageBuffer, buffer]);
            }
        
            let packageLength = 0;
        
            while (packageLength = transcoder.getPackageLength(buffer)) {
                const package = buffer.slice(0, packageLength); // 取出整個數據包
                buffer = buffer.slice(packageLength); // 刪除已經取出的數據包,這裏採用的方法是把緩衝區(buffer)已取出的包給截取掉
        
                const result = transcoder.decode(package); // 解碼
                console.log(result);
                socket.write(transcoder.encode(result.body, result.serialNumber));
            }
        
            overageBuffer=buffer; // 記錄剩餘不完整的包
        })
        .on('end', function(){
            console.log('socket end')
        })
        .on('error',function(error){
            console.log('socket error', error);
        });
}).on('close', () => {
    console.log('Server Close!');
}).on('error', err => {
    if (err.code === 'EADDRINUSE') {
        console.log('地址正被使用,重試中...');

        setTimeout(() => {
            server.close();
            server.listen(PORT, HOST);
        }, 1000);
    } else {
        console.error('服務器異常:', err);
    }
});
複製代碼

運行測試

控制檯執行 node server.js 開啓服務端,以後執行 node client.js 開啓客戶端測試,輸出結果以下所示:

$ node client.js
{ serialNumber: 0, bodyLength: 18, body: '0 Nodejs 技術棧' }
1 JavaScript 
2 TypeScript 
3 Python 
4 Java 
5 C 
6 PHP 
7 ASP.NET 
{ serialNumber: 1, bodyLength: 13, body: '1 JavaScript ' }
{ serialNumber: 2, bodyLength: 13, body: '2 TypeScript ' }
{ serialNumber: 3, bodyLength: 9, body: '3 Python ' }
{ serialNumber: 4, bodyLength: 7, body: '4 Java ' }
{ serialNumber: 5, bodyLength: 4, body: '5 C ' }
{ serialNumber: 6, bodyLength: 6, body: '6 PHP ' }
{ serialNumber: 7, bodyLength: 10, body: '7 ASP.NET ' }
複製代碼

以上結果中,setTimeout 函數裏咱們同一時間先發送多條數據,以後一一返回,同時打印了包消息頭定義的包序列號、消息體長度和包消息體,且是一一對應的,上面提的粘包問題也獲得瞭解決。封包/拆包這塊是有點複雜的,以上代碼也已經儘量簡單的介紹了實現思路,下面給出實現代碼地址,能夠作爲參照本身也可使用不一樣的方式去實現

https://github.com/Q-Angelo/project-training/tree/master/nodejs/net/chapter-3-package
複製代碼
做者:五月君
連接:https://github.com/Q-Angelo/Nodejs-Roadmap
來源:Nodejs.js技術棧
複製代碼
相關文章
相關標籤/搜索