做爲一名開發人員咱們常常會聽到HTTP協議、TCP/IP協議、UDP協議、Socket、Socket長鏈接、Socket鏈接池
等字眼,然而它們之間的關係、區別及原理並非全部人都能理解清楚,這篇文章就從網絡協議基礎開始到Socket鏈接池,一步一步解釋他們之間的關係。javascript
首先從網絡通訊的分層模型講起:七層模型,亦稱OSI(Open System Interconnection)
模型。自下往上分爲:物理層、數據鏈路層、網絡層、傳輸層、會話層、表示層和應用層。全部有關通訊的都離不開它,下面這張圖片介紹了各層所對應的一些協議和硬件java
經過上圖,我知道IP協議對應於網絡層,TCP、UDP協議對應於傳輸層,而HTTP協議對應於應用層,OSI並無Socket,那什麼是Socket,後面咱們將結合代碼具體詳細介紹。node
關於傳輸層TCP、UDP協議可能咱們平時碰見的會比較多,有人說TCP是安全的,UDP是不安全的,UDP傳輸比TCP快,那爲何呢,咱們先從TCP的鏈接創建的過程開始分析,而後解釋UDP和TCP的區別。linux
咱們知道TCP創建鏈接須要通過三次握手,而斷開鏈接須要通過四次分手,那三次握手和四次分手分別作了什麼和如何進行的。git
第一次握手:創建鏈接。客戶端發送鏈接請求報文段,將SYN位置爲1,Sequence Number爲x;而後,客戶端進入SYN_SEND
狀態,等待服務器的確認;
第二次握手:服務器收到客戶端的SYN報文段,須要對這個SYN報文段進行確認,設置Acknowledgment Number爲x+1(Sequence Number+1);同時,本身本身還要發送SYN請求信息,將SYN位置爲1,Sequence Number爲y;服務器端將上述全部信息放到一個報文段(即SYN+ACK報文段)中,一併發送給客戶端,此時服務器進入SYN_RECV
狀態;
第三次握手:客戶端收到服務器的SYN+ACK
報文段。而後將Acknowledgment Number設置爲y+1,向服務器發送ACK報文段,這個報文段發送完畢之後,客戶端和服務器端都進入ESTABLISHED
狀態,完成TCP三次握手。程序員
完成了三次握手,客戶端和服務器端就能夠開始傳送數據。以上就是TCP三次握手的整體介紹。通訊結束客戶端和服務端就斷開鏈接,須要通過四次分手確認。github
第一次分手:主機1(可使客戶端,也能夠是服務器端),設置Sequence Number和Acknowledgment Number,向主機2發送一個FIN報文段;此時,主機1進入FIN_WAIT_1
狀態;這表示主機1沒有數據要發送給主機2了;
第二次分手:主機2收到了主機1發送的FIN報文段,向主機1回一個ACK報文段,Acknowledgment Number爲Sequence Number加1;主機1進入FIN_WAIT_2
狀態;主機2告訴主機1,我「贊成」你的關閉請求;
第三次分手:主機2向主機1發送FIN報文段,請求關閉鏈接,同時主機2進入LAST_ACK
狀態;
第四次分手:主機1收到主機2發送的FIN報文段,向主機2發送ACK報文段,而後主機1進入TIME_WAIT
狀態;主機2收到主機1的ACK報文段之後,就關閉鏈接;此時,主機1等待2MSL後依然沒有收到回覆,則證實Server端已正常關閉,那好,主機1也能夠關閉鏈接了。shell
能夠看到一次tcp請求的創建及關閉至少進行7次通訊,這還不包過數據的通訊,而UDP不需3次握手和4次分手。數據庫
一、TCP是面向連接的,雖說網絡的不安全不穩定特性決定了多少次握手都不能保證鏈接的可靠性,但TCP的三次握手在最低限度上(實際上也很大程度上保證了)保證了鏈接的可靠性;而UDP不是面向鏈接的,UDP傳送數據前並不與對方創建鏈接,對接收到的數據也不發送確認信號,發送端不知道數據是否會正確接收,固然也不用重發,因此說UDP是無鏈接的、不可靠的一種數據傳輸協議。
二、也正因爲1所說的特色,使得UDP的開銷更小數據傳輸速率更高,由於沒必要進行收發數據的確認,因此UDP的實時性更好。知道了TCP和UDP的區別,就不難理解爲什麼採用TCP傳輸協議的MSN比採用UDP的QQ傳輸文件慢了,但並不能說QQ的通訊是不安全的,由於程序員能夠手動對UDP的數據收發進行驗證,好比發送方對每一個數據包進行編號而後由接收方進行驗證啊什麼的,即便是這樣,UDP由於在底層協議的封裝上沒有采用相似TCP的「三次握手」而實現了TCP所沒法達到的傳輸效率。npm
關於傳輸層咱們會常常聽到一些問題
1.TCP服務器最大併發鏈接數是多少?
關於TCP服務器最大併發鏈接數有一種誤解就是「由於端口號上限爲65535,因此TCP服務器理論上的可承載的最大併發鏈接數也是65535」。首先須要理解一條TCP鏈接的組成部分:客戶端IP、客戶端端口、服務端IP、服務端端口。因此對於TCP服務端進程來講,他能夠同時鏈接的客戶端數量並不受限於可用端口號,理論上一個服務器的一個端口能創建的鏈接數是全球的IP數*每臺機器的端口數
。實際併發鏈接數受限於linux可打開文件數,這個數是能夠配置的,能夠很是大,因此實際上受限於系統性能。經過#ulimit -n
查看服務的最大文件句柄數,經過ulimit -n xxx
修改 xxx是你想要能打開的數量。也能夠經過修改系統參數:
#vi /etc/security/limits.conf * soft nofile 65536 * hard nofile 65536
2.爲何TIME_WAIT
狀態還須要等2MSL
後才能返回到CLOSED
狀態?
這是由於雖然雙方都贊成關閉鏈接了,並且握手的4個報文也都協調和發送完畢,按理能夠直接回到CLOSED狀態(就比如從SYN_SEND
狀態到ESTABLISH
狀態那樣);可是由於咱們必需要假想網絡是不可靠的,你沒法保證你最後發送的ACK報文會必定被對方收到,所以對方處於LAST_ACK
狀態下的Socket
可能會由於超時未收到ACK
報文,而重發FIN
報文,因此這個TIME_WAIT
狀態的做用就是用來重發可能丟失的ACK
報文。
3.TIME_WAIT狀態還須要等2MSL後才能返回到CLOSED狀態會產生什麼問題
通訊雙方創建TCP鏈接後,主動關閉鏈接的一方就會進入TIME_WAIT
狀態,TIME_WAIT
狀態維持時間是兩個MSL時間長度,也就是在1-4分鐘,Windows操做系統就是4分鐘。進入TIME_WAIT
狀態的通常狀況下是客戶端,一個TIME_WAIT
狀態的鏈接就佔用了一個本地端口。一臺機器上端口號數量的上限是65536個,若是在同一臺機器上進行壓力測試模擬上萬的客戶請求,而且循環與服務端進行短鏈接通訊,那麼這臺機器將產生4000個左右的TIME_WAIT
Socket,後續的短鏈接就會產生address already in use : connect
的異常,若是使用Nginx
做爲方向代理也須要考慮TIME_WAIT
狀態,發現系統存在大量TIME_WAIT
狀態的鏈接,經過調整內核參數解決。
vi /etc/sysctl.conf
編輯文件,加入如下內容:
net.ipv4.tcp_syncookies = 1 net.ipv4.tcp_tw_reuse = 1 net.ipv4.tcp_tw_recycle = 1 net.ipv4.tcp_fin_timeout = 30
而後執行 /sbin/sysctl -p
讓參數生效。
net.ipv4.tcp_syncookies = 1 表示開啓SYN Cookies。當出現SYN等待隊列溢出時,啓用cookies來處理,可防範少許SYN攻擊,默認爲0,表示關閉;
net.ipv4.tcp_tw_reuse = 1 表示開啓重用。容許將TIME-WAIT sockets從新用於新的TCP鏈接,默認爲0,表示關閉;
net.ipv4.tcp_tw_recycle = 1 表示開啓TCP鏈接中TIME-WAIT sockets的快速回收,默認爲0,表示關閉。
net.ipv4.tcp_fin_timeout 修改系統默認的TIMEOUT
時間
關於TCP/IP和HTTP協議的關係,網絡有一段比較容易理解的介紹:「咱們在傳輸數據時,能夠只使用(傳輸層)TCP/IP協議,可是那樣的話,若是沒有應用層,便沒法識別數據內容。若是想要使傳輸的數據有意義,則必須使用到應用層協議。應用層協議有不少,好比HTTP、FTP、TELNET等,也能夠本身定義應用層協議。
HTTP協議即超文本傳送協議(Hypertext Transfer Protocol ),是Web聯網的基礎,也是手機聯網經常使用的協議之一,WEB使用HTTP協議做應用層協議,以封裝HTTP文本信息,而後使用TCP/IP作傳輸層協議將它發到網絡上。
因爲HTTP在每次請求結束後都會主動釋放鏈接,所以HTTP鏈接是一種「短鏈接」,要保持客戶端程序的在線狀態,須要不斷地向服務器發起鏈接請求。一般 的作法是即時不須要得到任何數據,客戶端也保持每隔一段固定的時間向服務器發送一次「保持鏈接」的請求,服務器在收到該請求後對客戶端進行回覆,代表知道 客戶端「在線」。若服務器長時間沒法收到客戶端的請求,則認爲客戶端「下線」,若客戶端長時間沒法收到服務器的回覆,則認爲網絡已經斷開。
下面是一個簡單的HTTP Post application/json數據內容的請求:
POST HTTP/1.1 Host: 127.0.0.1:9017 Content-Type: application/json Cache-Control: no-cache {"a":"a"}
如今咱們瞭解到TCP/IP只是一個協議棧,就像操做系統的運行機制同樣,必需要具體實現,同時還要提供對外的操做接口。就像操做系統會提供標準的編程接口,好比Win32編程接口同樣,TCP/IP也必須對外提供編程接口,這就是Socket。如今咱們知道,Socket跟TCP/IP並無必然的聯繫。Socket編程接口在設計的時候,就但願也能適應其餘的網絡協議。因此,Socket的出現只是能夠更方便的使用TCP/IP協議棧而已,其對TCP/IP進行了抽象,造成了幾個最基本的函數接口。好比create,listen,accept,connect,read和write等等。
不一樣語言都有對應的創建Socket服務端和客戶端的庫,下面舉例Nodejs如何建立服務端和客戶端:
服務端:
const net = require('net'); const server = net.createServer(); server.on('connection', (client) => { client.write('Hi!\n'); // 服務端向客戶端輸出信息,使用 write() 方法 client.write('Bye!\n'); //client.end(); // 服務端結束該次會話 }); server.listen(9000);
服務監聽9000端口
下面使用命令行發送http請求和telnet
$ curl http://127.0.0.1:9000 Bye! $telnet 127.0.0.1 9000 Trying 192.168.1.21... Connected to 192.168.1.21. Escape character is '^]'. Hi! Bye! Connection closed by foreign host.
注意到curl只處理了一次報文。
客戶端
const client = new net.Socket(); client.connect(9000, '127.0.0.1', function () { }); client.on('data', (chunk) => { console.log('data', chunk.toString()) //data Hi! //Bye! });
所謂長鏈接,指在一個TCP鏈接上能夠連續發送多個數據包,在TCP鏈接保持期間,若是沒有數據包發送,須要雙方發檢測包以維持此鏈接(心跳包),通常須要本身作在線維持。 短鏈接是指通訊雙方有數據交互時,就創建一個TCP鏈接,數據發送完成後,則斷開此TCP鏈接。好比Http的,只是鏈接、請求、關閉,過程時間較短,服務器如果一段時間內沒有收到請求便可關閉鏈接。其實長鏈接是相對於一般的短鏈接而說的,也就是長時間保持客戶端與服務端的鏈接狀態。
一般的短鏈接操做步驟是:
鏈接→數據傳輸→關閉鏈接;
而長鏈接一般就是:
鏈接→數據傳輸→保持鏈接(心跳)→數據傳輸→保持鏈接(心跳)→……→關閉鏈接;
何時用長鏈接,短鏈接?
長鏈接多用於操做頻繁,點對點的通信,並且鏈接數不能太多狀況,。每一個TCP鏈接都須要三步握手,這須要時間,若是每一個操做都是先鏈接,再操做的話那麼處理 速度會下降不少,因此每一個操做完後都不斷開,次處理時直接發送數據包就OK了,不用創建TCP鏈接。例如:數據庫的鏈接用長鏈接, 若是用短鏈接頻繁的通訊會形成Socket錯誤,並且頻繁的Socket建立也是對資源的浪費。
什麼是心跳包爲何須要:
心跳包就是在客戶端和服務端間定時通知對方本身狀態的一個本身定義的命令字,按照必定的時間間隔發送,相似於心跳,因此叫作心跳包。網絡中的接收和發送數據都是使用Socket進行實現。可是若是此套接字已經斷開(好比一方斷網了),那發送數據和接收數據的時候就必定會有問題。但是如何判斷這個套接字是否還可使用呢?這個就須要在系統中建立心跳機制。其實TCP中已經爲咱們實現了一個叫作心跳的機制。若是你設置了心跳,那TCP就會在必定的時間(好比你設置的是3秒鐘)內發送你設置的次數的心跳(好比說2次),而且此信息不會影響你本身定義的協議。也能夠本身定義,所謂「心跳」就是定時發送一個自定義的結構體(心跳包或心跳幀),讓對方知道本身「在線」,以確保連接的有效性。
實現:
服務端:
const net = require('net'); let clientList = []; const heartbeat = 'HEARTBEAT'; // 定義心跳包內容確保和平時發送的數據不會衝突 const server = net.createServer(); server.on('connection', (client) => { console.log('客戶端創建鏈接:', client.remoteAddress + ':' + client.remotePort); clientList.push(client); client.on('data', (chunk) => { let content = chunk.toString(); if (content === heartbeat) { console.log('收到客戶端發過來的一個心跳包'); } else { console.log('收到客戶端發過來的數據:', content); client.write('服務端的數據:' + content); } }); client.on('end', () => { console.log('收到客戶端end'); clientList.splice(clientList.indexOf(client), 1); }); client.on('error', () => { clientList.splice(clientList.indexOf(client), 1); }) }); server.listen(9000); setInterval(broadcast, 10000); // 定時發送心跳包 function broadcast() { console.log('broadcast heartbeat', clientList.length); let cleanup = [] for (let i=0;i<clientList.length;i+=1) { if (clientList[i].writable) { // 先檢查 sockets 是否可寫 clientList[i].write(heartbeat); } else { console.log('一個無效的客戶端'); cleanup.push(clientList[i]); // 若是不可寫,收集起來銷燬。銷燬以前要 Socket.destroy() 用 API 的方法銷燬。 clientList[i].destroy(); } } //Remove dead Nodes out of write loop to avoid trashing loop index for (let i=0; i<cleanup.length; i+=1) { console.log('刪除無效的客戶端:', cleanup[i].name); clientList.splice(clientList.indexOf(cleanup[i]), 1); } }
服務端輸出結果:
客戶端創建鏈接: ::ffff:127.0.0.1:57125 broadcast heartbeat 1 收到客戶端發過來的數據: Thu, 29 Mar 2018 03:45:15 GMT 收到客戶端發過來的一個心跳包 收到客戶端發過來的數據: Thu, 29 Mar 2018 03:45:20 GMT broadcast heartbeat 1 收到客戶端發過來的數據: Thu, 29 Mar 2018 03:45:25 GMT 收到客戶端發過來的一個心跳包 客戶端創建鏈接: ::ffff:127.0.0.1:57129 收到客戶端發過來的一個心跳包 收到客戶端發過來的數據: Thu, 29 Mar 2018 03:46:00 GMT 收到客戶端發過來的數據: Thu, 29 Mar 2018 03:46:04 GMT broadcast heartbeat 2 收到客戶端發過來的數據: Thu, 29 Mar 2018 03:46:05 GMT 收到客戶端發過來的一個心跳包
客戶端代碼:
const net = require('net'); const heartbeat = 'HEARTBEAT'; const client = new net.Socket(); client.connect(9000, '127.0.0.1', () => {}); client.on('data', (chunk) => { let content = chunk.toString(); if (content === heartbeat) { console.log('收到心跳包:', content); } else { console.log('收到數據:', content); } }); // 定時發送數據 setInterval(() => { console.log('發送數據', new Date().toUTCString()); client.write(new Date().toUTCString()); }, 5000); // 定時發送心跳包 setInterval(function () { client.write(heartbeat); }, 10000);
客戶端輸出結果:
發送數據 Thu, 29 Mar 2018 03:46:04 GMT 收到數據: 服務端的數據:Thu, 29 Mar 2018 03:46:04 GMT 收到心跳包: HEARTBEAT 發送數據 Thu, 29 Mar 2018 03:46:09 GMT 收到數據: 服務端的數據:Thu, 29 Mar 2018 03:46:09 GMT 發送數據 Thu, 29 Mar 2018 03:46:14 GMT 收到數據: 服務端的數據:Thu, 29 Mar 2018 03:46:14 GMT 收到心跳包: HEARTBEAT 發送數據 Thu, 29 Mar 2018 03:46:19 GMT 收到數據: 服務端的數據:Thu, 29 Mar 2018 03:46:19 GMT 發送數據 Thu, 29 Mar 2018 03:46:24 GMT 收到數據: 服務端的數據:Thu, 29 Mar 2018 03:46:24 GMT 收到心跳包: HEARTBEAT
若是想要使傳輸的數據有意義,則必須使用到應用層協議好比Http、Mqtt、Dubbo等。基於TCP協議上自定義本身的應用層的協議須要解決的幾個問題:
下面咱們就一塊兒來定義本身的協議,並編寫服務的和客戶端進行調用:
定義報文頭格式: length:000000000xxxx
; xxxx表明數據的長度,總長度20,舉例子不嚴謹。
數據表的格式: Json
服務端:
const net = require('net'); const server = net.createServer(); let clientList = []; const heartBeat = 'HeartBeat'; // 定義心跳包內容確保和平時發送的數據不會衝突 const getHeader = (num) => { return 'length:' + (Array(13).join(0) + num).slice(-13); } server.on('connection', (client) => { client.name = client.remoteAddress + ':' + client.remotePort // client.write('Hi ' + client.name + '!\n'); console.log('客戶端創建鏈接', client.name); clientList.push(client) let chunks = []; let length = 0; client.on('data', (chunk) => { let content = chunk.toString(); console.log("content:", content, content.length); if (content === heartBeat) { console.log('收到客戶端發過來的一個心跳包'); } else { if (content.indexOf('length:') === 0){ length = parseInt(content.substring(7,20)); console.log('length', length); chunks =[chunk.slice(20, chunk.length)]; } else { chunks.push(chunk); } let heap = Buffer.concat(chunks); console.log('heap.length', heap.length) if (heap.length >= length) { try { console.log('收到數據', JSON.parse(heap.toString())); let data = '服務端的數據數據:' + heap.toString();; let dataBuff = Buffer.from(JSON.stringify(data)); let header = getHeader(dataBuff.length) client.write(header); client.write(dataBuff); } catch (err) { console.log('數據解析失敗'); } } } }) client.on('end', () => { console.log('收到客戶端end'); clientList.splice(clientList.indexOf(client), 1); }); client.on('error', () => { clientList.splice(clientList.indexOf(client), 1); }) }); server.listen(9000); setInterval(broadcast, 10000); // 定時檢查客戶端 併發送心跳包 function broadcast() { console.log('broadcast heartbeat', clientList.length); let cleanup = [] for(var i=0;i<clientList.length;i+=1) { if(clientList[i].writable) { // 先檢查 sockets 是否可寫 // clientList[i].write(heartBeat); // 發送心跳數據 } else { console.log('一個無效的客戶端') cleanup.push(clientList[i]) // 若是不可寫,收集起來銷燬。銷燬以前要 Socket.destroy() 用 API 的方法銷燬。 clientList[i].destroy(); } } // 刪除無效的客戶端 for(i=0; i<cleanup.length; i+=1) { console.log('刪除無效的客戶端:', cleanup[i].name); clientList.splice(clientList.indexOf(cleanup[i]), 1) } }
日誌打印:
客戶端創建鏈接 ::ffff:127.0.0.1:50178 content: length:0000000000031 20 length 31 heap.length 0 content: "Tue, 03 Apr 2018 06:12:37 GMT" 31 heap.length 31 收到數據 Tue, 03 Apr 2018 06:12:37 GMT broadcast heartbeat 1 content: HeartBeat 9 收到客戶端發過來的一個心跳包 content: length:0000000000031"Tue, 03 Apr 2018 06:12:42 GMT" 51 length 31 heap.length 31 收到數據 Tue, 03 Apr 2018 06:12:42 GMT
客戶端
const net = require('net'); const client = new net.Socket(); const heartBeat = 'HeartBeat'; // 定義心跳包內容確保和平時發送的數據不會衝突 const getHeader = (num) => { return 'length:' + (Array(13).join(0) + num).slice(-13); } client.connect(9000, '127.0.0.1', function () {}); let chunks = []; let length = 0; client.on('data', (chunk) => { let content = chunk.toString(); console.log("content:", content, content.length); if (content === heartBeat) { console.log('收到服務端發過來的一個心跳包'); } else { if (content.indexOf('length:') === 0){ length = parseInt(content.substring(7,20)); console.log('length', length); chunks =[chunk.slice(20, chunk.length)]; } else { chunks.push(chunk); } let heap = Buffer.concat(chunks); console.log('heap.length', heap.length) if (heap.length >= length) { try { console.log('收到數據', JSON.parse(heap.toString())); } catch (err) { console.log('數據解析失敗'); } } } }); // 定時發送數據 setInterval(function () { let data = new Date().toUTCString(); let dataBuff = Buffer.from(JSON.stringify(data)); let header =getHeader(dataBuff.length); client.write(header); client.write(dataBuff); }, 5000); // 定時發送心跳包 setInterval(function () { client.write(heartBeat); }, 10000);
日誌打印:
content: length:0000000000060 20 length 60 heap.length 0 content: "服務端的數據數據:\"Tue, 03 Apr 2018 06:12:37 GMT\"" 44 heap.length 60 收到數據 服務端的數據數據:"Tue, 03 Apr 2018 06:12:37 GMT" content: length:0000000000060"服務端的數據數據:\"Tue, 03 Apr 2018 06:12:42 GMT\"" 64 length 60 heap.length 60 收到數據 服務端的數據數據:"Tue, 03 Apr 2018 06:12:42 GMT"
客戶端定時發送自定義協議數據到服務端,先發送頭數據,在發送內容數據,另一個定時器發送心跳數據,服務端判斷是心跳數據,再判斷是否是頭數據,再是內容數據,而後解析後再發送數據給客戶端。從日誌的打印能夠看出客戶端前後write
header
和data
數據,服務端可能在一個data
事件裏面接收到。
這裏能夠看到一個客戶端在同一個時間內處理一個請求能夠很好的工做,可是想象這麼一個場景,若是同一時間內讓同一個客戶端去屢次調用服務端請求,發送屢次頭數據和內容數據,服務端的data事件收到的數據就很難區別哪些數據是哪次請求的,好比兩次頭數據同時到達服務端,服務端就會忽略其中一次,然後面的內容數據也不必定就對應於這個頭的。因此想複用長鏈接並能很好的高併發處理服務端請求,就須要鏈接池這種方式了。
什麼是Socket鏈接池,池的概念能夠聯想到是一種資源的集合,因此Socket鏈接池,就是維護着必定數量Socket長鏈接的集合。它能自動檢測Socket長鏈接的有效性,剔除無效的鏈接,補充鏈接池的長鏈接的數量。從代碼層次上實際上是人爲實現這種功能的類,通常一個鏈接池包含下面幾個屬性:
場景: 一個請求過來,首先去資源池要求獲取一個長鏈接資源,若是空閒隊列裏面有長鏈接,就獲取到這個長鏈接Socket,並把這個Socket移到正在運行的長鏈接隊列。若是空閒隊列裏面沒有,且正在運行的隊列長度小於配置的鏈接池資源的數量,就新建一個長鏈接到正在運行的隊列去,若是正在運行的不下於配置的資源池長度,則這個請求進入到等待隊列去。當一個正在運行的Socket完成了請求,就從正在運行的隊列移到空閒的隊列,並觸發等待請求隊列去獲取空閒資源,若是有等待的狀況。
這裏簡單介紹Nodejs的Socket鏈接池generic-pool模塊的源碼。
主要文件目錄結構
. |————lib ------------------------- 代碼庫 | |————DefaultEvictor.js ---------- | |————Deferred.js ---------------- | |————Deque.js ------------------- | |————DequeIterator.js ----------- | |————DoublyLinkedList.js -------- | |————DoublyLinkedListIterator.js- | |————factoryValidator.js -------- | |————Pool.js -------------------- 鏈接池主要代碼 | |————PoolDefaults.js ------------ | |————PooledResource.js ---------- | |————Queue.js ------------------- 隊列 | |————ResourceLoan.js ------------ | |————ResourceRequest.js --------- | |————utils.js ------------------- 工具 |————test ------------------------- 測試目錄 |————README.md ------------------- 項目描述文件 |————.eslintrc ------------------- eslint靜態檢查配置文件 |————.eslintignore --------------- eslint靜態檢查忽略的文件 |————package.json ----------------- npm包依賴配置
下面介紹庫的使用:
'use strict'; const net = require('net'); const genericPool = require('generic-pool'); function createPool(conifg) { let options = Object.assign({ fifo: true, // 是否優先使用老的資源 priorityRange: 1, // 優先級 testOnBorrow: true, // 是否開啓獲取驗證 // acquireTimeoutMillis: 10 * 1000, // 獲取的超時時間 autostart: true, // 自動初始化和釋放調度啓用 min: 10, // 初始化鏈接池保持的長鏈接最小數量 max: 0, // 最大鏈接池保持的長鏈接數量 evictionRunIntervalMillis: 0, // 資源釋放檢驗間隔檢查 設置了下面幾個參數才起效果 numTestsPerEvictionRun: 3, // 每次釋放資源數量 softIdleTimeoutMillis: -1, // 可用的超過了最小的min 且空閒時間時間 達到釋放 idleTimeoutMillis: 30000 // 強制釋放 // maxWaitingClients: 50 // 最大等待 }, conifg.options); const factory = { create: function () { return new Promise((resolve, reject) => { let socket = new net.Socket(); socket.setKeepAlive(true); socket.connect(conifg.port, conifg.host); // TODO 心跳包的處理邏輯 socket.on('connect', () => { console.log('socket_pool', conifg.host, conifg.port, 'connect' ); resolve(socket); }); socket.on('close', (err) => { // 先end 事件再close事件 console.log('socket_pool', conifg.host, conifg.port, 'close', err); }); socket.on('error', (err) => { console.log('socket_pool', conifg.host, conifg.port, 'error', err); reject(err); }); }); }, //銷燬鏈接 destroy: function (socket) { return new Promise((resolve) => { socket.destroy(); // 不會觸發end 事件 第一次會觸發發close事件 若是有message會觸發error事件 resolve(); }); }, validate: function (socket) { //獲取資源池校驗資源有效性 return new Promise((resolve) => { // console.log('socket.destroyed:', socket.destroyed, 'socket.readable:', socket.readable, 'socket.writable:', socket.writable); if (socket.destroyed || !socket.readable || !socket.writable) { return resolve(false); } else { return resolve(true); } }); } }; const pool = genericPool.createPool(factory, options); pool.on('factoryCreateError', (err) => { // 監聽新建長鏈接出錯 讓請求直接返回錯誤 const clientResourceRequest = pool._waitingClientsQueue.dequeue(); if (clientResourceRequest) { clientResourceRequest.reject(err); } }); return pool; }; let pool = createPool({ port: 9000, host: '127.0.0.1', options: {min: 0, max: 10} });
下面鏈接池的使用,使用的協議是咱們以前自定義的協議。
let pool = createPool({ port: 9000, host: '127.0.0.1', options: {min: 0, max: 10} }); const getHeader = (num) => { return 'length:' + (Array(13).join(0) + num).slice(-13); } const request = async (requestDataBuff) => { let client; try { client = await pool.acquire(); } catch (e) { console.log('acquire socket client failed: ', e); throw e; } let timeout = 10000; return new Promise((resolve, reject) => { let chunks = []; let length = 0; client.setTimeout(timeout); client.removeAllListeners('error'); client.on('error', (err) => { client.removeAllListeners('error'); client.removeAllListeners('data'); client.removeAllListeners('timeout'); pool.destroyed(client); reject(err); }); client.on('timeout', () => { client.removeAllListeners('error'); client.removeAllListeners('data'); client.removeAllListeners('timeout'); // 應該銷燬以防下一個req的data事件監聽才返回數據 pool.destroy(client); // pool.release(client); reject(`socket connect timeout set ${timeout}`); }); let header = getHeader(requestDataBuff.length); client.write(header); client.write(requestDataBuff); client.on('data', (chunk) => { let content = chunk.toString(); console.log('content', content, content.length); // TODO 過濾心跳包 if (content.indexOf('length:') === 0){ length = parseInt(content.substring(7,20)); console.log('length', length); chunks =[chunk.slice(20, chunk.length)]; } else { chunks.push(chunk); } let heap = Buffer.concat(chunks); console.log('heap.length', heap.length); if (heap.length >= length) { pool.release(client); client.removeAllListeners('error'); client.removeAllListeners('data'); client.removeAllListeners('timeout'); try { // console.log('收到數據', JSON.parse(heap.toString())); resolve(JSON.parse(heap.toString())); } catch (err) { reject(err); console.log('數據解析失敗'); } } }); }); } request(Buffer.from(JSON.stringify({a: 'a'}))) .then((data) => { console.log('收到服務的數據',data) }).catch(err => { console.log(err); }); request(Buffer.from(JSON.stringify({b: 'b'}))) .then((data) => { console.log('收到服務的數據',data) }).catch(err => { console.log(err); }); setTimeout(function () { //查看是否會複用Socket 有沒有創建新的鏈接 request(Buffer.from(JSON.stringify({c: 'c'}))) .then((data) => { console.log('收到服務的數據',data) }).catch(err => { console.log(err); }); request(Buffer.from(JSON.stringify({d: 'd'}))) .then((data) => { console.log('收到服務的數據',data) }).catch(err => { console.log(err); }); }, 1000)
日誌打印:
socket_pool 127.0.0.1 9000 connect socket_pool 127.0.0.1 9000 connect content length:0000000000040"服務端的數據數據:{\"a\":\"a\"}" 44 length 40 heap.length 40 收到服務的數據 服務端的數據數據:{"a":"a"} content length:0000000000040"服務端的數據數據:{\"b\":\"b\"}" 44 length 40 heap.length 40 收到服務的數據 服務端的數據數據:{"b":"b"} content length:0000000000040 20 length 40 heap.length 0 content "服務端的數據數據:{\"c\":\"c\"}" 24 heap.length 40 收到服務的數據 服務端的數據數據:{"c":"c"} content length:0000000000040"服務端的數據數據:{\"d\":\"d\"}" 44 length 40 heap.length 40 收到服務的數據 服務端的數據數據:{"d":"d"}
這裏看到前面兩個請求都創建了新的Socket鏈接 socket_pool 127.0.0.1 9000 connect
,定時器結束後從新發起兩個請求就沒有創建新的Socket鏈接了,直接從鏈接池裏面獲取Socket鏈接資源。
發現主要的代碼就位於lib文件夾中的Pool.js
構造函數:lib/Pool.js
/** * Generate an Object pool with a specified `factory` and `config`. * * @param {typeof DefaultEvictor} Evictor * @param {typeof Deque} Deque * @param {typeof PriorityQueue} PriorityQueue * @param {Object} factory * Factory to be used for generating and destroying the items. * @param {Function} factory.create * Should create the item to be acquired, * and call it's first callback argument with the generated item as it's argument. * @param {Function} factory.destroy * Should gently close any resources that the item is using. * Called before the items is destroyed. * @param {Function} factory.validate * Test if a resource is still valid .Should return a promise that resolves to a boolean, true if resource is still valid and false * If it should be removed from pool. * @param {Object} options */ constructor(Evictor, Deque, PriorityQueue, factory, options) { super(); factoryValidator(factory); // 檢驗咱們定義的factory的有效性包含create destroy validate this._config = new PoolOptions(options); // 鏈接池配置 // TODO: fix up this ugly glue-ing this._Promise = this._config.Promise; this._factory = factory; this._draining = false; this._started = false; /** * Holds waiting clients * @type {PriorityQueue} */ this._waitingClientsQueue = new PriorityQueue(this._config.priorityRange); // 請求的對象管管理隊列queue 初始化queue的size 1 { _size: 1, _slots: [ Queue { _list: [Object] } ] } /** * Collection of promises for resource creation calls made by the pool to factory.create * @type {Set} */ this._factoryCreateOperations = new Set(); // 正在建立的長鏈接 /** * Collection of promises for resource destruction calls made by the pool to factory.destroy * @type {Set} */ this._factoryDestroyOperations = new Set(); // 正在銷燬的長鏈接 /** * A queue/stack of pooledResources awaiting acquisition * TODO: replace with LinkedList backed array * @type {Deque} */ this._availableObjects = new Deque(); // 空閒的資源長鏈接 /** * Collection of references for any resource that are undergoing validation before being acquired * @type {Set} */ this._testOnBorrowResources = new Set(); // 正在檢驗有效性的資源 /** * Collection of references for any resource that are undergoing validation before being returned * @type {Set} */ this._testOnReturnResources = new Set(); /** * Collection of promises for any validations currently in process * @type {Set} */ this._validationOperations = new Set();// 正在校驗的中間temp /** * All objects associated with this pool in any state (except destroyed) * @type {Set} */ this._allObjects = new Set(); // 全部的連接資源 是一個 PooledResource對象 /** * Loans keyed by the borrowed resource * @type {Map} */ this._resourceLoans = new Map(); // 被借用的對象的map release的時候用到 /** * Infinitely looping iterator over available object * @type {DequeIterator} */ this._evictionIterator = this._availableObjects.iterator(); // 一個迭代器 this._evictor = new Evictor(); /** * handle for setTimeout for next eviction run * @type {(number|null)} */ this._scheduledEviction = null; // create initial resources (if factory.min > 0) if (this._config.autostart === true) { // 初始化最小的鏈接數量 this.start(); } }
能夠看到包含以前說的空閒的資源隊列,正在請求的資源隊列,正在等待的請求隊列等。
下面查看 Pool.acquire 方法lib/Pool.js
/** * Request a new resource. The callback will be called, * when a new resource is available, passing the resource to the callback. * TODO: should we add a seperate "acquireWithPriority" function * * @param {Number} [priority=0] * Optional. Integer between 0 and (priorityRange - 1). Specifies the priority * of the caller if there are no available resources. Lower numbers mean higher * priority. * * @returns {Promise} */ acquire(priority) { // 空閒資源隊列資源是有優先等級的 if (this._started === false && this._config.autostart === false) { this.start(); // 會在this._allObjects 添加min的鏈接對象數 } if (this._draining) { // 若是是在資源釋放階段就不能再請求資源了 return this._Promise.reject( new Error("pool is draining and cannot accept work") ); } // 若是要設置了等待隊列的長度且要等待 若是超過了就返回資源不可獲取 // TODO: should we defer this check till after this event loop incase "the situation" changes in the meantime if ( this._config.maxWaitingClients !== undefined && this._waitingClientsQueue.length >= this._config.maxWaitingClients ) { return this._Promise.reject( new Error("max waitingClients count exceeded") ); } const resourceRequest = new ResourceRequest( this._config.acquireTimeoutMillis, // 對象裏面的超時配置 表示等待時間 會啓動一個定時 超時了就觸發resourceRequest.promise 的reject觸發 this._Promise ); // console.log(resourceRequest) this._waitingClientsQueue.enqueue(resourceRequest, priority); // 請求進入等待請求隊列 this._dispense(); // 進行資源分發 最終會觸發resourceRequest.promise的resolve(client) return resourceRequest.promise; // 返回的是一個promise對象resolve倒是在其餘地方觸發 }
/** * Attempt to resolve an outstanding resource request using an available resource from * the pool, or creating new ones * * @private */ _dispense() { /** * Local variables for ease of reading/writing * these don't (shouldn't) change across the execution of this fn */ const numWaitingClients = this._waitingClientsQueue.length; // 正在等待的請求的隊列長度 各個優先級的總和 console.log('numWaitingClients', numWaitingClients) // 1 // If there aren't any waiting requests then there is nothing to do // so lets short-circuit if (numWaitingClients < 1) { return; } // max: 10, min: 4 console.log('_potentiallyAllocableResourceCount', this._potentiallyAllocableResourceCount) // 目前潛在空閒可用的鏈接數量 const resourceShortfall = numWaitingClients - this._potentiallyAllocableResourceCount; // 還差幾個可用的 小於零表示不須要 大於0表示須要新建長鏈接的數量 console.log('spareResourceCapacity', this.spareResourceCapacity) // 距離max數量的還有幾個沒有建立 const actualNumberOfResourcesToCreate = Math.min( this.spareResourceCapacity, // -6 resourceShortfall // 這個是 -3 ); // 若是resourceShortfall>0 表示須要新建可是這新建的數量不能超過spareResourceCapacity最多可建立的 console.log('actualNumberOfResourcesToCreate', actualNumberOfResourcesToCreate) // 若是actualNumberOfResourcesToCreate >0 表示須要建立鏈接 for (let i = 0; actualNumberOfResourcesToCreate > i; i++) { this._createResource(); // 新增新的長鏈接 } // If we are doing test-on-borrow see how many more resources need to be moved into test // to help satisfy waitingClients if (this._config.testOnBorrow === true) { // 若是開啓了使用前校驗資源的有效性 // how many available resources do we need to shift into test const desiredNumberOfResourcesToMoveIntoTest = numWaitingClients - this._testOnBorrowResources.size;// 1 const actualNumberOfResourcesToMoveIntoTest = Math.min( this._availableObjects.length, // 3 desiredNumberOfResourcesToMoveIntoTest // 1 ); for (let i = 0; actualNumberOfResourcesToMoveIntoTest > i; i++) { // 須要有效性校驗的數量 至少知足最小的waiting clinet this._testOnBorrow(); // 資源有效校驗後再分發 } } // if we aren't testing-on-borrow then lets try to allocate what we can if (this._config.testOnBorrow === false) { // 若是沒有開啓有效性校驗 就開啓有效資源的分發 const actualNumberOfResourcesToDispatch = Math.min( this._availableObjects.length, numWaitingClients ); for (let i = 0; actualNumberOfResourcesToDispatch > i; i++) { // 開始分發資源 this._dispatchResource(); } } }
/** * Attempt to move an available resource to a waiting client * @return {Boolean} [description] */ _dispatchResource() { if (this._availableObjects.length < 1) { return false; } const pooledResource = this._availableObjects.shift(); // 從能夠資源池裏面取出一個 this._dispatchPooledResourceToNextWaitingClient(pooledResource); // 分發 return false; } /** * Dispatches a pooledResource to the next waiting client (if any) else * puts the PooledResource back on the available list * @param {PooledResource} pooledResource [description] * @return {Boolean} [description] */ _dispatchPooledResourceToNextWaitingClient(pooledResource) { const clientResourceRequest = this._waitingClientsQueue.dequeue(); // 多是undefined 取出一個等待的quene console.log('clientResourceRequest.state', clientResourceRequest.state); if (clientResourceRequest === undefined || clientResourceRequest.state !== Deferred.PENDING) { console.log('沒有等待的') // While we were away either all the waiting clients timed out // or were somehow fulfilled. put our pooledResource back. this._addPooledResourceToAvailableObjects(pooledResource); // 在可用的資源裏面添加一個 // TODO: do need to trigger anything before we leave? return false; } // TODO clientResourceRequest 的state是否須要判斷 若是已是resolve的狀態 已經超時回去了 這個是否有問題 const loan = new ResourceLoan(pooledResource, this._Promise); this._resourceLoans.set(pooledResource.obj, loan); // _resourceLoans 是個map k=>value pooledResource.obj 就是socket自己 pooledResource.allocate(); // 標識資源的狀態是正在被使用 clientResourceRequest.resolve(pooledResource.obj); // acquire方法返回的promise對象的resolve在這裏執行的 return true; }
上面的代碼就按種狀況一直走下到最終獲取到長鏈接的資源,其餘更多代碼你們能夠本身去深刻了解。