實現 memcached 客戶端:TCP、鏈接池、一致性哈希、自定義協議。

實現 memcached 客戶端:TCP、鏈接池、一致性哈希、自定義協議。

廢話很少說,文本將帶你實現一個簡單的 memcached 客戶端。node

集羣:一致性哈希

memcached 自己並不支持集羣,爲了使用集羣,咱們能夠本身在客戶端實現路由分發,將相同的 key 路由到同一臺 memcached 上去便可。
路由算法有不少,這裏咱們使用一致性哈希算法。git

一致性哈希算法的原理:github

consistent-hash.png

一致性哈希算法已經有開源庫 hashring 實現,基本用法:算法

const HashRing = require('hashring');

// 輸入集羣地址構造 hash ring
const ring = new HashRing(['127.0.0.1:11211', '127.0.0.2:11211']);

// 輸入 key 獲取指定節點
const host = ring.get(key);

TCP 編程

包括 memcached 在內的許多系統對外都是經過 TCP 通訊。在 Node.js 中創建一個 TCP 鏈接並進行數據的收發很簡單:npm

const net = require('net');

const socket = new net.Socket();
socket.connect({
    host: host,                     // 目標主機
    port: port,                     // 目標端口
    // localAddress: localAddress,  // 本地地址
    // localPort: localPort,        // 本地端口
});

socket.setKeepAlive(true);      // 保活

// 鏈接相關
socket.on('connect', () => {
    console.log(`socket connected`);
});
socket.on('error', error => {
    console.log(`socket error: ${error}`);
});
socket.on('close', hadError => {
    console.log(`socket closed, transmission error: ${hadError}`);
});


socket.on('data', data => {
    // 接受數據
});

socket.write(data); // 發送數據

一條鏈接由惟一的五元組肯定,所謂的五元組就是:協議(好比 TCP 或者 UDP)、本地地址、本地端口、遠程地址、遠程端口。
系統正是經過五元組去區分不一樣的鏈接,其中本地地址和本地端口因爲在缺省狀況下會自動生成,經常會被咱們忽視。編程

鏈接池

一次完整的 TCP 通訊過程爲:三次握手,創建鏈接 --> 數據傳遞 --> 揮手,關閉鏈接。socket

咱們都知道握手創建鏈接的過程是很是消耗資源的,而鏈接池就是爲了解決這個問題,鏈接池是一個通用的模型,它包括:async

  • 創建鏈接,將鏈接放入池中。
  • 須要使用鏈接時(進行數據收發),從鏈接池中取出鏈接。
  • 鏈接使用完成後,將鏈接放回到池中。
  • 其它。

能夠看到所謂的鏈接池其實就是在鏈接使用完成後並非當即關閉鏈接,而是讓鏈接保活,等待下一次使用,從而避免反覆創建鏈接的過程。memcached

正如上文所述,鏈接池是一個通用的模型,咱們這裏直接使用開源庫 generic-pool函數

池化 TCP 鏈接示例:

const net = require('net');
const genericPool = require('generic-pool');

// 自定義建立鏈接池的函數
function _buildPool(remote_server) {
    const factory = {
        create: function () {
            return new Promise((resolve, reject) => {
                const host = remote_server.split(':')[0];
                const port = remote_server.split(':')[1];
                const socket = new net.Socket();
                socket.connect({
                    host: host, // 目標主機
                    port: port, // 目標端口
                });
                socket.setKeepAlive(true);
                socket.on('connect', () => {
                    console.log(`socket connected: ${remote_server} , local: ${socket.localAddress}:${socket.localPort}`);
                    resolve(socket);
                });
                socket.on('error', error => {
                    console.log(`socket error: ${remote_server} , ${error}`);
                    reject(error);
                });
                socket.on('close', hadError => {
                    console.log(`socket closed: ${remote_server} , transmission error: ${hadError}`);
                });
            });
        },
        destroy: function (socket) {
            return new Promise((resolve) => {
                socket.destroy();
                resolve();
            });
        },
        validate: function (socket) { // validate socket
            return new Promise((resolve) => {
                if (socket.connecting || socket.destroyed || !socket.readable || !socket.writable) {
                    return resolve(false);
                } else {
                    return resolve(true);
                }
            });
        }
    };
    const pool = genericPool.createPool(factory, {
        max: 10,            // 最大鏈接數
        min: 0,             // 最小鏈接數
        testOnBorrow: true, // 從池中取鏈接時進行 validate 函數驗證
    });
    return pool;
}

// 鏈接池基本使用
const pool = _buildPool('127.0.0.1:11211'); // 構建鏈接池

const s = await pool.acquire();             // 從鏈接池中取鏈接

await pool.release(s);                      // 使用完成後釋放鏈接

對接自定義協議

包括 memcached 在內的許多系統都定義了一套本身的協議用於對外通訊,爲了實現 memcached 客戶端固然就要遵照它的協議內容。

memcached 客戶端協議,咱們實現最簡單的 get 方法:

發送的數據格式:

get <key>\r\n

接受的數據格式:

VALUE <key> <flags> <bytes>\r\n
<data block>\r\n

實現示例:

// 定義一個請求方法並返回響應數據
function _request(command) {
    return new Promise(async (resolve, reject) => {
        try {
            // ...這裏省略了鏈接池構建相關部分
            const s = await pool.acquire(); // 取鏈接

            const bufs = [];
            s.on('data', async buf => { // 監聽 data 事件接受響應數據
                bufs.push(buf);

                const END_BUF = Buffer.from('\r\n'); // 數據接受完成的結束位
                if (END_BUF.equals(buf.slice(-2))) {
                    s.removeAllListeners('data'); // 移除監聽
                    try {
                        await pool.release(s); // 釋放鏈接
                    } catch (error) { }
                    const data = Buffer.concat(bufs).toString();
                    return resolve(data);
                }
            });

            s.write(command);
        } catch (error) {
            return reject(error);
        }
    });
}


// get
function get(key) {
    return new Promise(async (resolve, reject) => {
        try {
            const command = `get ${key}\r\n`;
            const data = await _request(key, command);

            // ...響應數據的處理,注意有省略

            // key not exist
            if (data === 'END\r\n') {
                return resolve(undefined);
            }

            /*
                VALUE <key> <flags> <bytesLength>\r\n
                <data block>\r\n
            */
            const data_arr = data.split('\r\n');
            const response_line = data_arr[0].split(' ');
            const value_flag = response_line[2];
            const value_length = Number(response_line[3]);

            let value = data_arr.slice(1, -2).join('');
            value = unescapeValue(value); // unescape \r\n
            // ...有省略
            return resolve(value);
        } catch (error) {
            return reject(error);
        }
    });
}

以上示例都單獨拿出來了,實際上是在整合在一個 class 中的:

class Memcached {
    constructor(serverLocations, options) {
        this._configs = {
            ...{
                pool: {
                    max: 1,
                    min: 0,
                    idle: 30000,             // 30000 ms.
                },
                timeout: 5000,              // timeout for every command, 5000 ms.
                retries: 5,                 // max retry times for failed request.
                maxWaitingClients: 10000,   // maximum number of queued requests allowed
            }, ...options
        };
        this._hashring = new HashRing(serverLocations);
        this._pools = {}; // 經過 k-v 的形式存儲具體的地址及它的鏈接池
    }

    _buildPool(remote_server) {
        // ...
    }

    _request(key, command) {
        // ...
    }


    // get
    async get(key) {
        // ...
    }

    // ... 其餘方法
}
// 使用實例
const memcached = new Memcached(['127.0.0.1:11211'], {
    pool: {
        max: 10,
        min: 0
    }
});

const key = 'testkey';
const result = await memcached.get(key);

完整的示例能夠看 io-memcached

qrcode_for_gh_9ccbe5e0dfb3_258.jpg

相關文章
相關標籤/搜索