從零開始寫一個SSR分佈式緩存服務(一)

前言

公司如今全部新項目已經所有使用next.js了,最近在作性能優化,什麼HTTP2GzipCache-ControlService WorkerCloudFlare,一頓操做猛如虎前端

一看戰績十比五,速度提高很多,奈斯!nginx

這就好了嗎,NONONO,做爲一個前端老鳥,固然要有更高的追求git

再想一想還有沒有能夠優化的地方,老衲掐指一算,若是每次訪問都執行renderToHTML渲染頁面,不但增長TTFB時間還浪費CPU,確定要優化,官方服務端緩存的例子:ssr-caching,是基於cacheable-response的,可是咱們如今都是一個站PM2開多個進程,nginx作負載均衡,線上熱更新,一個進程緩存一遍,那豈不是開幾個進程一樣的內容緩存幾回,就像這樣:github

相同的內容重複緩存,浪費大量的服務器內存,而咱們想要的是這樣的:web

那搭一個memcached或redis
搭redis好煩哦,我這麼懶,只能本身動手寫一個了
解決這個問題的思路很簡單,經過客戶端和服務端通訊實現緩存統一管理
廢話很少說,盤他!redis

緩存

咱們先寫一個基本的緩存數據庫

function Cache(options) {
 "use strict";
    
    const defaults = {...};
    const conf = Object.assign({}, defaults, options);
    const timeoutIds = {};
    
    // 存儲接口,好比你要存到文件,數據庫等其餘地方,只須要實現這三個方法
    const localStorage = conf.localStorage || {
        cache: {},
        getItem(key){
            return this.cache[key];
        },
        setItem(key, value){
            this.cache[key] = value;
        },
        removeItem(key){
            delete this.cache[key];
        }
    };
    
    // 存儲接口再包裝一層,處理數據類型之類的操做
    const storage = {
        get: (key, dataType="json")=>{
            let data = localStorage.getItem(key);
            return dataType === "json" ? JSON.parse(data || "null") : data;
        },
        set: (key, value)=>{
            if (typeof value !== "string") {
                value = JSON.stringify(value);
            }
            localStorage.setItem(key, value);
        },
        remove: (key)=>{
            localStorage.removeItem(key);
        }
    };
    
    // 如下是對外公開的方法
    this.set = (key, value, ttl=3600)=>{
        // 緩存是有存活時間的,因此這裏須要把原始數據包裝一下
        storage.set(key, {
            value: value,  // 緩存原始數據
            timestamp: new Date().getTime(),  // 緩存寫入時間,用來計算是否過時
            ttl: ttl,  // 存活時間,單位秒,默認一小時
        });
        /* 緩存過時清理 清除上一個setTimeout,不然同一個key屢次set,第一個setTimeout會提早刪除緩存 應以最後一次爲準 */
        clearTimeout(timeoutIds[key]);
        timeoutIds[key] = setTimeout(()=>{
            this.remove(key);
        }, ttl * 1000);
    };
    
    this.get = (key, dataType="json")=>{
        let result = storage.get(key, dataType) || {};
        // 判斷緩存是否過時
        if (new Date() - result.timestamp > result.ttl * 1000){
            storage.remove(key);
            return null;
        }
        return result.value;
    };
    
    this.remove = (key)=>{
        storage.remove(key);
    };
}
複製代碼

用法就像這樣:npm

const cache = new Cache();

let value = cache.get("your_cache_key");
if (!value){
    value = getValueFunction();
    cache.set("your_cache_key", value, 7200);
}
複製代碼

服務端

接下來要寫一個服務端,該咱們的WebSocket登場了(爲何用WebSocket?由於它全雙工,而且有現成的輪子,並且前端也比較熟悉),WebSocket的框架有不少,咱們來選一個,小公雞點到誰我就選。。。就它了:ws(大名鼎鼎的socket.io內部的WebSocket鏈接就是用它實現的),首先安裝wsjson

npm install ws --save
複製代碼

安裝成功後就能夠基於ws寫咱們的服務端了,緩存

const WebSocket = require("ws");

function Server(options) {
 "use strict";
    
    const defaults = {
        port: 666,                 // 端口
        verifyClient: () => true,  // 返回true表示贊成鏈接,返回false表示拒絕鏈接
    };
    const conf = Object.assign({}, defaults, options);
    const cache = new Cache(conf);
    
    // 發送消息的方法,封裝這個方法的目的是防止異常致使程序崩潰
    function sendTo(client, data) {
        try {
            if (typeof data !== "string"){
                data = JSON.stringify(data);
            }
            client.send(data);
        } catch (e) {
            console.error(e);
        }
    }
    
    // 啓動服務
    this.start = function () {
        return new Promise(resolve => {
            const WebSocketServer = new WebSocket.Server({
                port: conf.port,
                verifyClient: conf.verifyClient,
            });
            WebSocketServer.on("connection", function (client, request) {
                // 服務端收到消息
                client.on("message", function (message) {
                    try {
                        /* 收到消息後,執行對應的操做,並返回消息給客戶端 消息結構:{ id: "...", // 消息id,用來回復消息對應的請求,後面客戶端會講到 data: { action: "get", // 要執行的操做 key: "your_cache_key", value: "your_value", ttl: 3600, } } */
                        let {id, data} = JSON.parse(message);
                        if (data.action === "get"){
                            let result = cache.get(data.key);
                            sendTo(client, {
                                id: id,
                                data: result,
                            });
                        } else if (data.action === "set"){
                            cache.set(data.key, data.value, data.ttl);
                            sendTo(client, {
                                id: id,
                                success: true,
                                message: `set cache ${data.key} success!`
                            });
                        } else if (data.action === "remove"){
                            cache.remove(data.key);
                            sendTo(client, {
                                id: id,
                                success: true,
                                message: `remove cache ${data.key} success!`
                            });
                        }
                    } catch (e) {
                        console.error("incoming message: ", message, "error: ", e);
                    }
                });
                // 用來判斷鏈接是否斷開
                client.isAlive = true;
                // 響應心跳包
                client.on("pong", function () {
                    client.isAlive = true;
                });
            });
            
            // 心跳檢測
            setInterval(function () {
                WebSocketServer.clients.forEach(function (client) {
                    /* 若是這個鏈接的isAlive是false,說明沒有響應心跳,斷開鏈接 雖然WebSocket是長鏈接,但操做系統會按期檢測,把不活動的鏈接斷開 因此須要心跳機制來維持鏈接處於活動狀態不被斷開 */
                    if (client.isAlive === false) {
                        return client.terminate();
                    }
                    client.isAlive = false;
                    // 發送心跳包
                    client.ping(function () {});
                });
            }, 60 * 1000);
            
            // 監聽事件
            WebSocketServer.on("listening", function () {
                console.log(`NODE_ENV: ${process.env.NODE_ENV}`);
                console.log(`WebSocket server is listening at ${conf.port}`);
                resolve();
            });
        });
    };
}
複製代碼

客戶端

OK,服務端完成,接下來寫客戶端,客戶端分兩塊,一個是負責收發消息的Socket,另外一個是負責解析消息返回操做結果的Client,咱們先來寫Socket

這裏有個要注意的地方,WebSocket和HTTP不同,好比客戶端發三條請求消息到服務端,服務端回覆三條消息給客戶端,可是客戶端接收到這三條消息的順序是否和請求順序同樣,那可就不必定了

通常的作法就是給消息加一個id,服務器返回消息帶上對應id,根據id執行對應的callback,就像你去KFC吃飯,點餐下單後會給你一個取餐號,廚師把菜作好之後會叫號取餐

這裏咱們須要生成惟一id,js生成惟一id的方法通常有三種:

  1. 自增數
  2. 時間戳
  3. UUID

自增數:若是網站訪問量較大,天天百萬PV,天天的增量就幾百萬,用不了幾天就炸了
時間戳:假如1秒1000以上的請求,因爲時間戳的最小單位是毫秒,確定會產生多個相同的時間戳
UUID : 重複的機率比你被隕石砸中的機率還要低

固然是用重複率最低的UUID了

function Socket(url, options) {
 "use strict";
    
    const defaults = {
        retry: 5,
        timeout: 30000,
        ws: {                         // ws的配置
            perMessageDeflate: true,  // 啓用數據壓縮
        },
    };
    const conf = Object.assign({}, defaults, options);
    
    // WebSocket Client
    let ws = {};
    // 是否鏈接成功
    let isConnectSuccess = false;
    // 鏈接狀態
    let isConnected = false;
    // 對外公開的鏈接狀態,鏈接狀態不能被外部修改,因此要只讀而且不可重定義
    Object.defineProperties(this, {
        isConnected: {
            configurable: false,
            get: ()=>{
                return isConnected;
            }
        }
    });
    
    // 等待回調的任務隊列
    const queue = {};
    // 消息的惟一id,用來關聯請求響應
    const getUniqueId = function () {
        return uuid();
    };
    // 響應請求的方法
    const response = function (id, data) {
        const callback = queue[id];
        if (callback){
            delete queue[id];
            if (typeof callback === "function"){
                callback(data);
            }
        }
    };

    this.connect = ()=>{
        return new Promise((resolve) => {
            try {
                if (ws.terminate){
                    ws.terminate();
                }
                // 鏈接服務器
                ws = new WebSocket(url, conf.ws);
                
                ws.on("open", ()=>{
                    isConnectSuccess = true;
                    isConnected = true;
                    console.log(`server is connected!`);
                    resolve(true);
                });
                
                ws.on("message", (data)=>{
                    try {
                        let result = JSON.parse(data);
                        response(result.id, result.data);
                    } catch (e) {
                        console.error(e);
                    }
                });
                
                ws.on("error", (e)=>{
                    console.error(e);
                    resolve(false);
                });
                
                ws.on("close", ()=>{
                    isConnected = false;
                    console.log(`connection is closed!`);
                    // 鏈接成功後意外狀況致使的鏈接中斷才須要自動重連
                    if (isConnectSuccess === true){
                        console.log(`reconnecting...`);
                        this.connect(url);
                    }
                });
            } catch (e) {
                console.error(e);
                resolve(false);
            }
        })
    };
    
    this.send = (data, callback)=>{
        const id = getUniqueId();
        queue[id] = callback;
        try {
            if (ws.readyState === WebSocket.OPEN){
                ws.send(JSON.stringify({
                    id: id,
                    data: data,
                }));
                // 若是意外狀況致使服務端沒有返回消息,進行超時處理
                setTimeout(()=>{
                    response(id, null);
                }, conf.timeout);
            } else {
                console.error(`server is disconnected!`);
                response(id, null);
            }
        } catch (e) {
            console.error(e);
            response(id, null);
        }
    };
}
複製代碼

接下來實現客戶端的基本操做就能夠了

function Client(url, options) {
 "use strict";
    
    const defaults = {
        ttl: 3600,
        socket: {
            ws: {},
        },
    };
    const conf = Object.assign({}, defaults, options);
    const socket = new Socket(url, conf.socket);

    Object.defineProperties(this, {
        isConnected: {
            configurable: false,
            get: ()=>{
                return socket.isConnected;
            }
        }
    });
    
    this.connect = async () => {
        try {
            if (socket.readyState !== WebSocket.OPEN){
                await socket.connect(url);
            } else {
                console.log(`server is already connected!`);
            }
            return true;
        } catch (e) {
            return e;
        }
    };
    
    this.get = (key) => {
        return new Promise(resolve => {
            socket.send({
                action: "get",
                key: key,
            }, (result)=>{
                resolve(result.value);
            });
        });
    };
    
    this.set = (key, value, ttl) => {
        return new Promise(resolve => {
            socket.send({
                action: "set",
                key: key,
                value: value,
                ttl: ttl || conf.ttl,
            }, resolve);
        });
    };
}
複製代碼

用法示例

到此,整個緩存服務基本功能就算完成了,具體怎麼使用,下面舉個栗子:

// server
const NextCache = require("next-cache");
const server = new NextCache.Server({
    port: 666,
});
server.start();

// client
const NextCache = require("next-cache");
const cache = new NextCache.Client("ws://localhost:666");
await cache.connect();

let value = await cache.get(`your_cache_key`);
if (!value){
    value = getValueFunction();
    await cache.set("your_cache_key", value, 7200);
}
複製代碼

能夠用了,不錯不錯

你覺得這就完了嗎?

too young too naive

鏈接風暴

基本功能雖然完成了,但在實際應用中,這樣不能把緩存的做用發揮到最大,問題的關鍵在這段代碼:

let value = await cache.get(`your_cache_key`);
if (!value){
    value = getValueFunction();
    await cache.set("your_cache_key", value, 3600);
}
複製代碼

且聽老衲慢慢道來

衆所周知緩存應用的場景大部分是在高訪問量高併發場景下,那麼上面那段代碼在高併發場景會出現什麼問題,假設如今有多個請求併發執行,咱們來分析一下執行過程:

// Step 1: 如今有10個線程幾乎同時進來取緩存,假設如今緩存已過時
let value = await cache.get(`your_cache_key`);

// Step 2: 10個線程拿到的值都是空的
if (!value){

    // Step 3: 10個線程都執行了取值方法
    value = getValueFunction();
    
    // Step 4: 10個線程用取到的值填充緩存
    await cache.set("your_cache_key", value, 3600);
}
複製代碼

緩存沒有更新完以前,進來的線程都會去更新緩存,這樣會大大下降緩存的命中率,致使服務器資源飆升,爲了驗證上面的推測,咱們來模擬高併發場景作個測試:

const server = new Server({
    port: 666
});
await server.start();

const cache = new Client(`ws://localhost:666`);
await cache.connect();

function sleep(ms) {
    return new Promise(resolve => {
        setTimeout(resolve, ms)
    })
}

// 模擬取數據的操做
async function getValue() {
    console.log(`get value`);
    await sleep(1000);
    return `your value`;
}

async function test() {
    // 填充緩存
    await cache.set("cache_key", "your value", 3);
    // 等待緩存過時
    await sleep(5 * 1000);
    // 模擬10次併發
    for (let i=0; i<10; i++){
        (async function () {
            let value = await cache.get("cache_key");
            if (!value){
                value = await getValue();
                cache.set("cache_key", value, 3600);
            }
        })();
    }
}

test();
複製代碼

上面代碼的運行結果:

getValue執行了10次,這就是所謂的鏈接風暴

強人鎖男

如何解決鏈接風暴的問題?

答案:髒數據 +

思路就是,給線程加鎖,更新緩存時,只放一個線程去更新緩存,其餘線程返回舊的髒數據,緩存更新完以後進來的線程,就能拿到新數據啦

既然須要髒數據,那就不能在緩存過時後當即刪除緩存,但也不能一直放着佔內存,因此須要設置一個延遲刪除的時間

function Server(options){
 "use strict";
    
    const defaults = {
        port: process.env.PORT || 666,
        verifyClient: () => true,
        removeDelay: 60,  // 緩存延遲刪除的時間,單位秒
    };
    const conf = Object.assign({}, defaults, options);
    const cache = new Cache(conf);
    ...
}

function Cache(options){
    const defaults = {
        localStorage: null,
        removeDelay: 60,
    };
    const conf = Object.assign({}, defaults, options);
    const timeoutIds = {};
    ...
    this.get = (key, dataType)=>{
        let result = storage.get(key, dataType || "json") || {};
        if (new Date() - result.timestamp > result.ttl * TimeUnit.Second + conf.removeDelay * TimeUnit.Second){
            storage.remove(key);
            return null;
        }
        return result;
    };
    this.set = (key, value, ttl)=>{
        storage.set(key, {
            value: value,
            timestamp: new Date().getTime(),
            ttl: ttl,
        });
        clearTimeout(timeoutIds[key]);
        timeoutIds[key] = setTimeout(()=>{
            storage.remove(key);
        }, ttl * TimeUnit.Second + conf.removeDelay * TimeUnit.Second);
    };
}
複製代碼

而後就是加鎖了

const lock = {};
...
this.get = (key, getValue) => {
    return new Promise((resolve, reject) => {
        socket.send({
            action: "get",
            key: key,
        }, async(result)=>{
            try {
                if (!result.value || (isCacheExpired(result) && typeof getValue === "function" && !lock[key])){
                    // 第一個線程進來加鎖,不讓後面的線程進來
                    lock[key] = true;
                    result = await getValue();
                    if (typeof result !== "object" || "ttl" in result === false){
                        result = {value: result, ttl: conf.ttl};
                    }
                    await this.set(key, result.value, result.ttl);
                    // 釋放鎖
                    delete lock[key];
                }
            } catch (e) {
                // 若是更新緩存出錯,也要釋放鎖,不然會死鎖
                delete lock[key];
                reject(e);
            } finally {
                // 其餘線程沒進if,直接返回髒數據
                resolve(result.value);
            }
        });
    });
};
複製代碼

修改以後,客戶端get用法稍微有點變化:

let value = await cache.get(`cache_key`, ()=>{
    return 123;  // 默認緩存1小時
});
複製代碼
let value = await cache.get(`cache_key`, async()=>{
    return {
        value: 123,
        ttl: 7200  // 緩存2小時
    }
});
複製代碼

怎麼樣,用法是否是比之前更簡單更清晰了,咱們來作一下測試,把以前的test方法改一下:

...
async function test(){
    await cache.set(`cache_key`, `your value`, 3);
    await sleep(5 * 1000);
    for (let i=0; i<10; i++){
        cache.get(`cache_key`, getValue);
    }
}

test();
複製代碼

運行結果:

Congratulations!

一個簡單的SSR緩存服務就大功告成了

完整代碼:next-cache

不過,一個成熟的做品,還要具有安全性,分佈式,高可用性,容災備份等等

下期給你們講安全性和分佈式的實現

相關文章
相關標籤/搜索