寫做不易,未經做者容許禁止以任何形式轉載!
若是以爲文章不錯,歡迎關注、點贊和分享!
持續分享技術博文,關注微信公衆號 👉🏻 前端LeBron
字節跳動校招進行中,校招內推碼: 4FCV6BV 遊戲部門前端團隊可私聊直推javascript
簡介html
負載均衡,含義就是根據必定算法將負載(工做任務)進行平衡,分攤到多個操做單元上運行、執行,常見的爲Web服務器、企業核心應用服務器和其餘主要任務服務器等,從而協同完成工做任務。負載均衡在原有的網絡結構上提供了一種透明且有效的的方法擴展服務器和網絡設備的帶寬、增強網絡數據處理能力、增長吞吐量、提升網絡的可用性和靈活性,同時承受住更大的併發量級。前端
簡單來講就是將大量的併發請求處理轉發給多個後端節點處理,減小工做響應時間。java
四層即OSI七層模型中的傳輸層,有TCP、UDP協議,這兩種協議中包含源IP、目標IP之外,還包含源端口號及目標端口號。四層負載均衡在接收到客戶端請求後,經過修改報文的地址信息(IP + PORT)將流量轉發到應用服務器。node
代理負載均衡ios
七層即OSI七層模型中的應用層,應用層協議較多,經常使用的爲HTTP/HTTPS。七層負載均衡能夠給予這些協議來負載。這些應用層協議中會包含不少有意義的內容。好比同一個Web服務器的負載均衡,除了根據IP + PORT進行負載均衡,還能夠根據七層的URL、Cookie、瀏覽器類別、語言、請求類型來決定。git
四層負載均衡的本質是轉發,七層負載均衡的本質是內容交換和代理。github
四層負載均衡 | 七層負載均衡 | |
---|---|---|
基於 | IP + PORT | URL 或 主機IP |
相似 | 路由器 | 代理服務器 |
複雜度 | 低 | 高 |
性能 | 高,無需解析內容 | 中,需算法識別URL Header、Cookie等 |
安全性 | 低,沒法識別DDoS攻擊 | 高,可防護SYN Flood攻擊 |
擴展功能 | 無 | 內容緩存、圖片防盜鏈等 |
前置數據結構web
interface urlObj{
url:string,
weight:number // 僅在權重輪詢時生效
}
urlDesc: urlObj[]
interface urlCollectObj{
count: number, // 鏈接數
costTime: number, // 響應時間
connection: number, // 實時鏈接數
}
urlCollect: urlCollectObj[]
複製代碼
隨機算法
const Random = (urlDesc) => {
let urlCollect = [];
// 收集url
urlDesc.forEach((val) => {
urlCollect.push(val.url);
});
return () => {
// 生成隨機數下標返回相應URL
const pos = parseInt(Math.random() * urlCollect.length);
return urlCollect[pos];
};
};
module.exports = Random;
複製代碼
權重輪詢算法
const WeiRoundRobin = (urlDesc) => {
let pos = 0,
urlCollect = [],
copyUrlDesc = JSON.parse(JSON.stringify(urlDesc));
// 根據權重收集url
while (copyUrlDesc.length > 0) {
for (let i = 0; i < copyUrlDesc.length; i++) {
urlCollect.push(copyUrlDesc[i].url);
copyUrlDesc[i].weight--;
if (copyUrlDesc[i].weight === 0) {
copyUrlDesc.splice(i, 1);
i--;
}
}
}
// 輪詢獲取URL函數
return () => {
const res = urlCollect[pos++];
if (pos === urlCollect.length) {
pos = 0;
}
return res;
};
};
module.exports = WeiRoundRobin;
複製代碼
源IP / URL Hash
const { Hash } = require("../util");
const IpHash = (urlDesc) => {
let urlCollect = [];
for (const key in urlDesc) {
// 收集url
urlCollect.push(urlDesc[key].url);
}
return (sourceInfo) => {
// 生成Hash十進制數值
const hashInfo = Hash(sourceInfo);
// 取餘爲下標
const urlPos = Math.abs(hashInfo) % urlCollect.length;
// 返回
return urlCollect[urlPos];
};
};
module.exports = IpHash;
複製代碼
一致性Hash
const { Hash } = require("../util");
const ConsistentHash = (urlDesc) => {
let urlHashMap = {},
hashCollect = [];
for (const key in urlDesc) {
// 收集urlHash進數組和生成HashMap
const { url } = urlDesc[key];
const hash = Hash(url);
urlHashMap[hash] = url;
hashCollect.push(hash);
}
// 將hash數組從小到大排序
hashCollect = hashCollect.sort((a, b) => a - b);
return (sourceInfo) => {
// 生成Hash十進制數值
const hashInfo = Hash(sourceInfo);
// 遍歷hash數組找到第一個比源信息hash值大的,並經過hashMap返回url
hashCollect.forEach((val) => {
if (val >= hashInfo) {
return urlHashMap[val];
}
});
// 沒找大則返回最大的
return urlHashMap[hashCollect[hashCollect.length - 1]];
};
};
module.exports = ConsistentHash;
複製代碼
最小鏈接數
const leastConnections = () => {
return (urlCollect) => {
let min = Number.POSITIVE_INFINITY,
url = "";
// 遍歷對象找到最少鏈接數的地址
for (let key in urlCollect) {
const val = urlCollect[key].connection;
if (val < min) {
min = val;
url = key;
}
}
// 返回
return url;
};
};
module.exports = leastConnections;
複製代碼
注:urlCollect爲負載均屬數據統計對象,有如下屬性
最小響應時間
const Fair = () => {
return (urlCollect) => {
let min = Number.POSITIVE_INFINITY,
url = "";
// 找到耗時最少的url
for (const key in urlCollect) {
const urlObj = urlCollect[key];
if (urlObj.costTime < min) {
min = urlObj.costTime;
url = key;
}
}
// 返回
return url;
};
};
module.exports = Fair;
複製代碼
看到這裏是否是感受算法都挺簡單的 🥱
期待一下模塊五的實現吧😏
健康監測即對應用服務器的健康監測,爲防止把請求轉發到異常的應用服務器上,應使用健康監測策略。應對不一樣的業務敏感程度,可相應調整策略和頻率。
PORT XX unreachable
的ICMP報錯信息,反之爲正常。Vrtual IP
在TCP / IP架構下,全部想上網的電腦,不論以何種形式連上網絡,都不須要有一個惟一的IP地址。事實上IP地址是主機硬件物理地址的一種抽象。
簡單來講地址分爲兩種
虛擬IP是一個未分配給真實主機的IP,也就是說對外提供的服務器的主機除了有一個真實IP還有一個虛IP,這兩個IP中的任意一個均可以鏈接到這臺主機。
虛擬IP通常用做達到高可用的目的,好比讓全部項目中的數據庫連接配置都是這個虛擬IP,當主服務器發生故障沒法對外提供服務時,動態將這個虛IP切換到備用服務器。
好比存在主機A(192.168.1.6)和主機B(192.168.1.8)。A做爲對外服務的主服務器,B做爲備份機器,兩臺服務器之間經過HeartBeat通訊。
即主服務器會定時給備份服務器發送數據包,告知主服務器正常,當備份服務器在規定時間內沒有收到主服務器的HeartBeat,會認爲主服務器宕機。
此時備份服務器就升級爲主服務器。
服務器B將本身的ARP緩存發送出去,告知路由器修改路由表,告知虛擬IP地址應該指向192.168.1.8.
這時外接再次訪問虛擬IP的時候,機器B就會變成主服務器,而A降級爲備份服務器。
這樣就完成了主從機器的切換,這一切對外都是無感知、透明的。
想手動實現一下負載均衡器 / 看看源碼的同窗均可以看看 👉🏻 代碼倉庫
編輯config.js後
npm run start
便可啓動均衡器和後端服務節點
const {ALGORITHM, BASE_URL} = require("./constant");
module.exports = {
urlDesc: [
{
url: `${BASE_URL}:${16666}`,
weight: 6,
},
{
url: `${BASE_URL}:${16667}`,
weight: 1,
},
{
url: `${BASE_URL}:${16668}`,
weight: 1,
},
{
url: `${BASE_URL}:${16669}`,
weight: 1,
},
{
url: `${BASE_URL}:${16670}`,
weight: 2,
},
{
url: `${BASE_URL}:${16671}`,
weight: 1,
},
{
url: `${BASE_URL}:${16672}`,
weight: 4,
},
],
port: 8080,
algorithm: ALGORITHM.RANDOM,
workerNum: 5,
balancerNum: 5,
workerFilePath:path.resolve(__dirname, "./worker.js")
}
複製代碼
初始化負載均衡統計對象balanceDataBase
運行均衡器
運行後端服務節點
const {urlDesc, balancerNum} = require("./config")
const cluster = require("cluster");
const path = require("path");
const cpusLen = require("os").cpus().length;
const {DataBase} = require("./util");
const {Worker} = require('worker_threads');
const runWorker = () => {
// 防止監聽端口數 > CPU核數
const urlObjArr = urlDesc.slice(0, cpusLen);
// 初始化建立子線程
for (let i = 0; i < urlObjArr.length; i++) {
createWorkerThread(urlObjArr[i].url);
}
}
const runBalancer = () => {
// 設置子進程執行文件
cluster.setupMaster({exec: path.resolve(__dirname, "./balancer.js")});
// 初始化建立子進程
let max
if (balancerNum) {
max = balancerNum > cpusLen ? cpusLen : balancerNum
} else {
max = 1
}
for (let i = 0; i < max; i++) {
createBalancer();
}
}
// 初始化負載均衡數據統計對象
const balanceDataBase = new DataBase(urlDesc);
// 運行均衡器
runBalancer();
// 運行後端服務節點
runWorker();
複製代碼
建立進程
監聽進程通訊消息
監聽更新響應時間事件並執行更新函數
監聽獲取統計對象事件並返回
監聽異常退出並從新建立,進程守護。
const createBalancer = () => {
// 建立進程
const worker = cluster.fork();
worker.on("message", (msg) => {
// 監聽更新響應時間事件
if (msg.type === "updateCostTime") {
balanceDataBase.updateCostTime(msg.URL, msg.costTime)
}
// 監聽獲取url統計對象事件並返回
if (msg.type === "getUrlCollect") {
worker.send({type: "getUrlCollect", urlCollect: balanceDataBase.urlCollect})
}
});
// 監聽異常退出事件並從新建立進程
worker.on("exit", () => {
createBalancer();
});
}
複製代碼
建立線程
解析須要監聽的端口
向子線程通訊,發送須要監聽的端口
經過線程通訊,監聽子線程事件
監聽異常退出並從新建立,線程守護。
const createWorkerThread = (listenUrl) => {
// 建立線程
const worker = new Worker(path.resolve(__dirname, "./workerThread.js"));
// 獲取監聽端口
const listenPort = listenUrl.split(":")[2];
// 向子線程發送要監聽的端口號
worker.postMessage({type: "port", port: listenPort});
// 接收子線程消息統計進程被訪問次數
worker.on("message", (msg) => {
// 監聽鏈接事件並觸發計數事件
if (msg.type === "connect") {
balanceDataBase.add(msg.port);
}
// 監聽斷開鏈接事件並觸發計數事件
else if (msg.type === "disconnect") {
balanceDataBase.sub(msg.port);
}
});
// 監聽異常退出事件並從新建立進程
worker.on("exit", () => {
createWorkerThread(listenUrl);
});
}
複製代碼
獲取getURL工具函數
監聽請求並代理
注1:LoadBalance函數即經過算法名稱返回不一樣的getURL工具函數,各算法實現見模塊二:常見算法
注2:getSource函數即處理參數並返回,getURL爲上面講到的獲取URL工具函數。
const cpusLen = require("os").cpus().length;
const LoadBalance = require("./algorithm");
const express = require("express");
const axios = require("axios");
const app = express();
const {urlFormat, ipFormat} = require("./util");
const {ALGORITHM, BASE_URL} = require("./constant");
const {urlDesc, algorithm, port} = require("./config");
const run = () => {
// 獲取轉發URL工具函數
const getURL = LoadBalance(urlDesc.slice(0, cpusLen), algorithm);
// 監聽請求並均衡代理
app.get("/", async (req, res) => {
// 獲取須要傳入的參數
const source = await getSource(req);
// 獲取URL
const URL = getURL(source);
// res.redirect(302, URL) 重定向負載均衡
// 記錄請求開始時間
const start = Date.now();
// 代理請求
axios.get(URL).then(async (response) => {
// 獲取負載均衡統計對象並返回
const urlCollect = await getUrlCollect();
// 處理跨域
res.setHeader("Access-Control-Allow-Origin", "*");
response.data.urlCollect = urlCollect;
// 返回數據
res.send(response.data);
// 記錄相應時間並更新
const costTime = Date.now() - start;
process.send({type: "updateCostTime", costTime, URL})
});
});
// 負載均衡服務器開始監聽請求
app.listen(port, () => {
console.log(`Load Balance Server Running at ${BASE_URL}:${port}`);
});
};
run();
const getSource = async (req) => {
switch (algorithm) {
case ALGORITHM.IP_HASH:
return ipFormat(req);
case ALGORITHM.URL_HASH:
return urlFormat(req);
case ALGORITHM.CONSISTENT_HASH:
return urlFormat(req);
case ALGORITHM.LEAST_CONNECTIONS:
return await getUrlCollect();
case ALGORITHM.FAIR:
return await getUrlCollect();
default:
return null;
}
};
複製代碼
// 獲取負載均衡統計對象
const getUrlCollect = () => {
return new Promise((resolve, reject) => {
try {
process.send({type: "getUrlCollect"})
process.on("message", msg => {
if (msg.type === "getUrlCollect") {
resolve(msg.urlCollect)
}
})
} catch (e) {
reject(e)
}
})
}
複製代碼
使用多線程+多進程模型,爲每一個服務節點提供併發能力。
根據配置文件,建立相應數量服務節點。
const cluster = require("cluster");
const cpusLen = require("os").cpus().length;
const {parentPort} = require('worker_threads');
const {workerNum, workerFilePath} = require("./config")
if (cluster.isMaster) {
// 建立工做進程函數
const createWorker = () => {
// 建立進程
const worker = cluster.fork();
// 監聽父線程消息,並轉發給子進程。
parentPort.on("message", msg => {
if (msg.type === "port") {
worker.send({type: "port", port: msg.port})
}
})
// 監聽子進程消息並轉發給父線程
worker.on("message", msg => {
parentPort.postMessage(msg);
})
// 監聽進程異常退出並從新建立
worker.on("exit", () => {
createWorker();
})
}
// 按配置建立進程,但不可大於CPU核數
let max
if (workerNum) {
max = workerNum > cpusLen ? cpusLen : workerNum
} else {
max = 1
}
for (let i = 0; i < max; i++) {
createWorker();
}
} else {
// 後端服務執行文件
require(workerFilePath)
}
複製代碼
var express = require("express");
var app = express();
let port = null;
app.get("/", (req, res) => {
// 觸發鏈接事件
process.send({type: "connect", port});
// 打印信息
console.log("HTTP Version: " + req.httpVersion);
console.log("Connection PORT Is " + port);
const msg = "Hello My PORT is " + port;
// 返回響應
res.send({msg});
// 觸發斷開鏈接事件
process.send({type: "disconnect", port});
});
// 接收主進通訊消息中的端口口並監聽
process.on("message", (msg) => {
if (msg.type === "port") {
port = msg.port;
app.listen(port, () => {
console.log("Worker Listening " + port);
});
}
});
複製代碼
status:任務隊列狀態
urlCollect:數據統計對象(提供給各算法使用 / 展現數據)
add方法
sub方法
updateCostTime方法
class DataBase {
urlCollect = {};
// 初始化
constructor (urlObj) {
urlObj.forEach((val) => {
this.urlCollect[val.url] = {
count: 0,
costTime: 0,
connection: 0,
};
});
}
//增長鏈接數和實時鏈接數
add (port) {
const url = `${BASE_URL}:${port}`;
this.urlCollect[url].count++;
this.urlCollect[url].connection++;
}
// 減小實時鏈接數
sub (port) {
const url = `${BASE_URL}:${port}`;
this.urlCollect[url].connection--;
}
// 更新響應時間
updateCostTime (url, time) {
this.urlCollect[url].costTime = time;
}
}
複製代碼
作了個可視化圖表來看均衡效果(Random)✔️
看起來均衡效果還不錯🧐
想手動實現一下負載均衡器 / 看看源碼的同窗均可以看看 👉🏻 代碼倉庫
經過cluster.isMaster判斷是否爲主進程,主進程不負責任務處理,只負責管理和調度工做子進程。
master主進程啓動了一個TCP服務器,真正監聽端口的只有這個TCP服務器。請求觸發了這個TCP服務器的connection
事件後,經過句柄轉發(IPC)給工做進程處理。
如何選擇工做進程?
爲何不直接用cluster進行負載均衡?
常見的進程間通訊方式
管道通訊
信號量
共享內存
Socket
消息隊列
Node中實現IPC通道是依賴於libuv。Windows下由命名管道實現,*nix系統則採用Domain Socket實現。
表如今應用層上的進程間通訊只有簡單的message事件和send()方法,接口十分簡潔和消息化。
IPC管道是如何創建的?
歡迎留言討論
Node.js非阻塞異步I/O速度快,前端擴展服務端業務?
企業實踐,說明Node仍是可靠的?
Node計算密集型不友好?
Node生態不如其餘成熟的語言
討論