基於lua-resty-redis的redis鏈接池

基於lua-resty-redis的redis鏈接池 [輪]

@author     karminski <code.karminski@outlook.com>
@version    161028:3
@link       http://blog.eth6.org/src/wheel/redis_connection_pool_with_lua_nginx_module.html

這幾天用oprensty寫了一些東西, 在用lua-resty-redis鏈接redis的時候須要一個鏈接池, 本來想着這東西也沒多難因而就手動擼了一個, 寫完了接入到系統在測試的時候發現不妙了. 不但redis鏈接巨慢, 並且失敗率也很高. RTFM以後終於寫出了一個穩定版本.html

模塊分爲這幾個部分:

-- Pseudocode
<code #1>
redis_factory = function(redis_config)
    h               = redis_config
    h.redis         = lua-resty-redis
    h.cosocket_pool = cosocket_pool config
    h.commands      = lua-resty-redis proxy commands name
    h.connect       = lua-resty-redis connect warp
    h.spawn_client  = function(): spawn redis-proxy-client -><code #2>

    self            = {}
    self.pool       = storage redis instance name
    self.construct  = function(): do your own construct 
    self.spawn      = function(): call h.spawn_client() by name and storage spawned instance into ngx.ctx
    self.destruct   = function(): close and put into cosocket connection pool 
end

<code #2>
spawn_client instance, aka redis-proxy-client = {
    name            = redis instance name
    redis_instance  = lua-resty-redis instance
    connect         = h.connect
    connect_info    = h.name
    construct       = function(): proxy lua-resty-redis all method into self
    ... (proxy function from lua-resty-redis)
}

原型部分:

  • h變量用來存儲配置.nginx

  • h.connect()函數封裝了lua-resty-redis的鏈接方法.git

  • h.spawn_client()方法用來生成包裝lua-resty-redis的redis-proxy-client.github

  • redis-proxy-client將lua-resty-redis內部的方法所有包裝爲本身內部的方法, 方法名稱從h.commands指定.redis

redis-proxy-client中包含整個h變量的鏈接方法和鏈接參數, 該proxy構造過程將全部的proxy方法中均插入對lua-resty-redis產生的實例進行檢測並從新鏈接的邏輯, 並且只在代理方法被調用時進行檢測, 極大地縮短了redis實例初始化和使用之間的時間差, 同時又能克服與redis之間因爲網絡問題或設置問題致使的鏈接中斷.網絡

當redis_factory實例化後,返回的table包含如下幾個方法:

  • self.construct()是預留的構造函數.併發

  • self.pool變量用來存儲已經實例化的redis實例的名稱.app

  • redis-proxy-client.redis_instance, 真正的實例化的redis保存在redis-proxy-client.redis_instance,而redis-proxy-client則在redis_factory:spawn()過程當中被保存在ngx.ctx中(必須將redis實例放置在ngx.ctx,不然會引發競爭致使命令請求失敗).dom

  • self.destruct()用來銷燬鏈接池中的全部redis實例, 其內部調用set_keepalive()後會當即將redis鏈接置爲關閉狀態. 並將redis鏈接放入ngx_lua cosocket鏈接池.socket

模塊詳細實現以下:

鏈接池代碼:

--[[

    redis_factory.lua
    Redis factory method. 
    You can also find it at https://gist.github.com/karminski/33fa9149d2f95ff5d802


    @version    151019:5
    @author     karminski 
    @license    MIT

    @changelogs 
                151019:5 CLEAN test code.
                151016:4 REFACTORY spawn logic.
                151012:3 REWRITE redis proxy.
                151009:2 ADD connection mode feature.
                150922:1 INIT commit.

]]--

local redis_factory = function(h)
    
    local h           = h

    h.redis           = require('resty.redis')
    h.cosocket_pool   = {max_idel = 10000, size = 200}

    h.commands        = {
        "append",            "auth",              "bgrewriteaof",
        "bgsave",            "bitcount",          "bitop",
        "blpop",             "brpop",
        "brpoplpush",        "client",            "config",
        "dbsize",
        "debug",             "decr",              "decrby",
        "del",               "discard",           "dump",
        "echo",
        "eval",              "exec",              "exists",
        "expire",            "expireat",          "flushall",
        "flushdb",           "get",               "getbit",
        "getrange",          "getset",            "hdel",
        "hexists",           "hget",              "hgetall",
        "hincrby",           "hincrbyfloat",      "hkeys",
        "hlen",
        "hmget",             "hmset",             "hscan",
        "hset",
        "hsetnx",            "hvals",             "incr",
        "incrby",            "incrbyfloat",       "info",
        "keys",
        "lastsave",          "lindex",            "linsert",
        "llen",              "lpop",              "lpush",
        "lpushx",            "lrange",            "lrem",
        "lset",              "ltrim",             "mget",
        "migrate",
        "monitor",           "move",              "mset",
        "msetnx",            "multi",             "object",
        "persist",           "pexpire",           "pexpireat",
        "ping",              "psetex",            "psubscribe",
        "pttl",
        "publish",           "punsubscribe",      "pubsub",
        "quit",
        "randomkey",         "rename",            "renamenx",
        "restore",
        "rpop",              "rpoplpush",         "rpush",
        "rpushx",            "sadd",              "save",
        "scan",              "scard",             "script",
        "sdiff",             "sdiffstore",
        "select",            "set",               "setbit",
        "setex",             "setnx",             "setrange",
        "shutdown",          "sinter",            "sinterstore",
        "sismember",         "slaveof",           "slowlog",
        "smembers",          "smove",             "sort",
        "spop",              "srandmember",       "srem",
        "sscan",
        "strlen",            "subscribe",         "sunion",
        "sunionstore",       "sync",              "time",
        "ttl",
        "type",              "unsubscribe",       "unwatch",
        "watch",             "zadd",              "zcard",
        "zcount",            "zincrby",           "zinterstore",
        "zrange",            "zrangebyscore",     "zrank",
        "zrem",              "zremrangebyrank",   "zremrangebyscore",
        "zrevrange",         "zrevrangebyscore",  "zrevrank",
        "zscan",
        "zscore",            "zunionstore",       "evalsha",
        -- resty redis private command
        "set_keepalive",     "init_pipeline",     "commit_pipeline",      
        "array_to_hash",     "add_commands",      "get_reused_times",
    }

    -- connect
    -- @param table connect_info, e.g { host="127.0.0.1", port=6379, pass="", timeout=1000, database=0}
    -- @return boolean result
    -- @return userdata redis_instance
    h.connect = function(connect_info)
        local redis_instance = h.redis:new()
        redis_instance:set_timeout(connect_info.timeout)
        if not redis_instance:connect(connect_info.host, connect_info.port) then 
            return false, nil
        end
        if connect_info.pass ~= '' then
            redis_instance:auth(connect_info.pass)
        end
        redis_instance:select(connect_info.database)
        return true, redis_instance
    end

    -- spawn_client
    -- @param table h, include config info
    -- @param string name, redis config name
    -- @return table redis_client
    h.spawn_client = function(h, name)

        local self = {}
        
        self.name           = ""
        self.redis_instance = nil
        self.connect        = nil
        self.connect_info   = {
            host = "",   port = 0,    pass = "", 
            timeout = 0, database = 0
        }

        -- construct
        self.construct = function(_, h, name)
            -- set info
            self.name         = name
            self.connect      = h.connect
            self.connect_info = h[name]
            -- gen redis proxy client
            for _, v in pairs(h.commands) do
                self[v] = function(self, ...)
                    -- instance test and reconnect  
                    if (type(self.redis_instance) == 'userdata: NULL' or type(self.redis_instance) == 'nil') then
                        local ok
                        ok, self.redis_instance = self.connect(self.connect_info)
                        if not ok then return false end
                    end
                    -- get data
                    return self.redis_instance[v](self.redis_instance, ...)
                end
            end
            return true
        end

        -- do construct
        self:construct(h, name) 

        return self
    end     



    local self = {}

    self.pool  = {} -- redis client name pool

    -- construct
    -- you can put your own construct code here.
    self.construct = function()
        return
    end

    -- spawn
    -- @param string name, redis database serial name
    -- @return boolean result
    -- @return userdata redis
    self.spawn = function(_, name)
        if self.pool[name] == nil then
            ngx.ctx[name] = h.spawn_client(h, name) 
            self.pool[name] = true
            return true, ngx.ctx[name]
        else
            return true, ngx.ctx[name]
        end
    end

    -- destruct
    -- @return boolean allok, set_keepalive result
    self.destruct = function()
        local allok = true
        for name, _ in pairs(self.pool) do
            local ok, msg = ngx.ctx[name].redis_instance:set_keepalive(
                h.cosocket_pool.max_idel, h.cosocket_pool.size
            )
            if not ok then allok = false end 
        end
        return allok
    end

    -- do construct
    self.construct() 
        
    return self
end


return redis_factory

使用方法:

package.path  = '/home/www/bin_lua/?.lua;;./?.lua;' .. package.path

-- config example
local config = {
    redis_a = { -- your connection name 
        host = '127.0.0.1',
        port = 6379,
        pass = '',
        timeout = 200, -- watch out this value
        database = 0,
    },
    redis_b = {
        host = '127.0.0.1',
        port = 6379,
        pass = '',
        timeout = 200,
        database = 0,
    },
}

local redis_factory = require('redis_factory')(config) -- import config when construct
local ok, redis_a = redis_factory:spawn('redis_a')
local ok, redis_b = redis_factory:spawn('redis_b')
local ok = redis_a:set('test', "aaaaaaaaaaa")
if not ok then ngx.say("failed") end
local ok = redis_b:set('test', "bbbbbbbbbbb")
if not ok then ngx.say("failed") end

redis_factory:destruct() -- important, better call this method on your main function return

ngx.say("end")

注意事項:

  • 必須打開lua_code_cache,不打開的狀況,性能不只是打開狀況的一半如下,並且持續併發請求的時候會 形成平均響應時間的持續上升,最終拖垮整個服務.

  • 建議按需求設置timeout和max_idel以及size,其中timeout是鏈接池最爲致命的參數,建議該值不小一次請求的平均時間,若是timeout太小,則會形成"lua tcp socket read timed out"和"attempt to send data on a closed socket"錯誤,形成這種錯誤的緣由是timeout太小,鏈接被redis過早釋放,致使cosocket鏈接池沒法重複利用鏈接.例如:

    2015/10/19 15:03:16 [error] 9117#0: *2673 lua tcp socket read timed out, client: 10.121.95.83, server: bin_lua, request: "GET /test HTTP/1.1", host: "bin_lua"
    
    2015/10/19 15:03:16 [error] 9117#0: *2673 attempt to send data on a closed socket: u:00000000402FAFC8, c:0000000000000000, ft:0 eof:0, client: 127.0.0.1, server: bin_lua, request: "GET /test HTTP/1.1", host: "bin_lua"
  • lua-resty-redis的實例應該存放於ngx.ctx全局變量中(單個請求生命週期的全局), 若是存放在本地變量中, 會形成競爭引起的請求錯誤等故障, 例如:

    2015/10/13 15:30:32 [error] 1347#0: *841234 lua entry thread aborted: runtime error: /home/www/bin_lua/redis_factory.lua:188: bad request
  • 這一點lua-resty-redis做者也在文檔中有詳細的說明: (引用自https://github.com/openresty/...)

    Limitations
    
    This library cannot be used in code contexts like init_by_lua, set_by_lua, log_by_lua, and header_filter_by_lua where the ngx_lua cosocket API is not available.
    
    The resty.redis object instance cannot be stored in a Lua variable at the Lua module level, because it will then be shared by all the concurrent requests handled by the same nginx worker process (see http://wiki.nginx.org/HttpLuaModule#Data_Sharing_within_an_Nginx_Worker ) and result in bad race conditions when concurrent requests are trying to use the same resty.redis instance (you would see the "bad request" or "socket busy" error to be returned from the method calls). You should always initiate resty.redis objects in function local variables or in the ngx.ctx table. These places all have their own data copies for each request.

以上

參考:

https://github.com/openresty/...

相關文章
相關標籤/搜索