史上最LOW的PHP鏈接池解決方案

大多數 PHP 程序員歷來沒有使用過鏈接池,主要緣由是按照 PHP 自己的運行機制並不容易實現鏈接池,因而乎 PHP 程序員一方面不得不承受其它程序員的冷嘲熱諷,另外一方面還得面對頻繁短連接致使的性能低下和 TIME_WAIT 等問題。 說到這,我猜必定會有 PHP 程序員跳出來講可使用長鏈接啊,效果是同樣同樣的。好比以 PHP 中最流行的 Redis 模塊 PhpRedis 爲例,便有 pconnect 方法可用,經過它能夠複用以前建立的鏈接,效果和使用鏈接池差很少。惋惜實際狀況是 PHP 中各個模塊的長鏈接方法並很差用,基本上是雞肋同樣的存在,緣由以下:php

  • 首先,按照 PHP 的運行機制,長鏈接在創建以後只能寄居在工做進程之上,也就是說有多少個工做進程,就有多少個長鏈接,打個比方,咱們有 10 臺 PHP 服務器,每臺啓動 1000 個 PHP-FPM 工做進程,它們鏈接同一個 Redis 實例,那麼此 Redis 實例上最多將存在 10000 個長鏈接,數量徹底失控了!
  • 其次,PHP 的長鏈接自己並不健壯。一旦網絡異常致使長鏈接失效,沒有辦法自動關閉從新鏈接,以致於後續請求所有失敗,此時除了重啓服務別無它法!

問題分析到這裏彷佛進入了死衚衕:按常規作法是無法實現了。 別急着,若是問題比較棘手,咱們不妨繞着走。讓咱們把目光聚焦到 Nginx 的身上,其 stream 模塊實現了 TCP/UDP 服務的負載均衡,同時藉助 stream-lua 模塊,咱們就能夠實現可編程的 stream 服務,也就是用 Nginx 實現自定義的 TCP/UDP 服務!固然你能夠本身從頭寫 TCP/UDP 服務,不過站在 Nginx 肩膀上無疑是更省時省力的選擇。 但是 Nginx 和 PHP 鏈接池有什麼關係?且聽我慢慢道來:一般大部分 PHP 是搭配 Nginx 來使用的,並且 PHP 和 Nginx 多半是在同一臺服務器上。有了這個客觀條件,咱們就能夠利用 Nginx 來實現一個鏈接池,在 Nginx 上完成鏈接 Redis 等服務的工做,而後 PHP 經過本地的 Unix Domain Socket 來鏈接 Nginx,如此一來既規避了短連接的種種弊端,也享受到了鏈接池帶來的種種好處。html

PHP Pool

PHP Poolnginx

下面以 Redis 爲例來說解一下實現過程,事先最好對 Redis 交互協議有必定的瞭解,推薦閱讀官方文檔中文翻譯,具體實現能夠參考 lua-resty-redis 庫,雖然它只是一個客戶端庫,可是 Redis 客戶端請求和服務端響應實際上格式是差很少通用的。 首先在 nginx.conf 文件中加入以下配置:git

stream {
    lua_code_cache on;
    lua_check_client_abort on;
    lua_package_path "/path/to/?.lua;;";

    server {
        listen unix:/tmp/redis.sock;

        content_by_lua_block {
             local redis = require "redis"
             pool = redis:new({
                 ip = "...", port = "...", auth = "..."
             })
             pool:run()
        }
    }
}

而後在 lua_package_path 配置的路徑上建立 redis.lua 文件:程序員

local redis = require "resty.redis"

local assert = assert
local print = print
local rawget = rawget
local setmetatable = setmetatable
local tonumber = tonumber
local str_byte = string.byte
local str_gmatch = string.gmatch
local str_sub = string.sub
local str_upper = string.upper

local function parse_request(sock)
    local line, err = sock:receive()

    if not line then
        return nil, err
    end

    local prefix = str_byte(line)

    if prefix == 42 then -- char '*'
        local result = {}

        local num = tonumber(str_sub(line, 2))

        if num <= 0 then
            return nil, "Wrong protocol format"
        end

        for i = 1, num do
            local res, err = parse_request(sock)

            if res == nil then
                return nil, err
            end

            result[i] = res
        end

        return result
    end

    if prefix == 36 then -- char '$'
        local size = tonumber(str_sub(line, 2))

        if size < 0 then
            return nil, "Wrong protocol format"
        end

        local result, err = sock:receive(size)

        if not result then
            return nil, err
        end

        local crlf, err = sock:receive(2)

        if not crlf then
            return nil, err
        end

        return result
    end

    -- inline
    local result = {}

    for res in str_gmatch(line, "%S+") do
        result[#result + 1] = res
    end

    return result
end

local function fetch_response(sock)
    local line, err = sock:receive()

    if not line then
        return nil, err
    end

    local result = {line, "\r\n"}
    local prefix = str_byte(line)

    if prefix == 42 then -- char '*'
        local num = tonumber(str_sub(line, 2))

        if num <= 0 then
            return result
        end

        for i = 1, num do
            local res, err = fetch_response(sock)

            if res == nil then
                return nil, err
            end

            for x = 1, #res do
                result[#result + 1] = res[x]
            end
        end
    elseif prefix == 36 then -- char '$'
        local size = tonumber(str_sub(line, 2))

        if size < 0 then
            return result
        end

        local res, err = sock:receive(size)

        if not res then
            return nil, err
        end

        local crlf, err = sock:receive(2)

        if not crlf then
            return nil, err
        end

        result[#result + 1] = res
        result[#result + 1] = crlf
    end

    return result
end

local function build_data(value)
    local result = {"*", #value, "\r\n"}

    for i = 1, #value do
        local v = value[i]

        result[#result + 1] = "$"
        result[#result + 1] = #v
        result[#result + 1] = "\r\n"
        result[#result + 1] = v
        result[#result + 1] = "\r\n"
    end

    return result
end

local function status_reply(message)
    return "+" .. message .. "\r\n"
end

local function command_args(request)
    local command = request[1]
    command = str_upper(command)

    local args = {}

    for i = 2, #request do
        args[#args + 1] = request[i]
    end

    return command, args
end

local function exit(err)
    ngx.log(ngx.NOTICE, err)

    return ngx.exit(ngx.ERROR)
end

local _M = {}

_M._VERSION = "1.0"

function _M.new(self, config)
    local t = {
        _ip = config.ip or "127.0.0.1",
        _port = config.port or 6379,
        _timeout = config.timeout or 100000,
        _size = config.size or 10,
        _auth = config.auth,
    }

    return setmetatable(t, { __index = _M })
end

function _M.run(self)
    local ip = self._ip
    local port = self._port
    local timeout = self._timeout
    local size = self._size
    local auth = self._auth

    local red = redis:new()
    local ok, err = red:connect(ip, port)

    if not ok then
        return exit(err)
    end

    if auth then
        local times = assert(red:get_reused_times())

        if times == 0 then
            local ok, err = red:auth(auth)

            if not ok then
                return exit(err)
            end
        end
    end

    local database = 0
    local transactional = false

    local upstream_sock = rawget(red, "_sock")
    local downstream_sock = assert(ngx.req.socket(true))

    while true do
        local request, err = parse_request(downstream_sock)

        if not request then
            if err == "client aborted" then
                break
            end

            return exit(err)
        end

        local command, args = command_args(request)

        if command == "QUIT" then
            downstream_sock:send(status_reply("OK"))
            break
        end

        upstream_sock:send(build_data(request))
        local response, err = fetch_response(upstream_sock)

        if not response then
            return exit(err)
        end

        if command == "SELECT" then
            database = tonumber(args[1])
        elseif command == "MULTI" then
            transactional = true
        elseif command == "EXEC" or command == "DISCARD" then
            transactional = false
        end

        downstream_sock:send(response)
    end

    if database ~= 0 then
        red:select(0)
    end

    if transactional then
        red:discard()
    end

    red:set_keepalive(timeout, size)
end

return _M

測試的 PHP 腳本內容以下:github

<?php

// 使用鏈接池
$redis = new Redis();
$redis->connect('/tmp/redis.sock');
$redis->set("foo", bar);
$redis->get("foo");

?>

<?php

// 不使用鏈接池
$redis = new Redis();
$redis->connect('ip', 'port');
$redis->auth('password')
$redis->set("foo", bar);
$redis->get("foo");

?>

推薦在獨立服務器上用 ab 測試,須要注意 Nginx 的 worker_processes 別設置過小,不然併發能力上不來,此外,測試過程當中注意觀察 tw(TIME_WAIT) 數量:redis

shell> ab -k -n 10000 -c 10 http://test/url
shell> watch -n1 'cat /proc/net/sockstat'

經過引入鏈接池,connect 自己就變得很快了,並且由於咱們在鏈接池中統一完成了 auth 受權,因此 PHP 代碼裏不用再執行額外的請求。我在 4 核 8 G 配置的服務器上測試,發現使用鏈接池後,性能提高了 20% 以上,不過要注意的是,若是 redis 操做比較多,那麼使用鏈接池性能提高可能不明顯,這是由於鏈接池自己須要重複解析請求和響應,抵消了部分好處。固然了,鏈接池還能實現不少高級功能,好比咱們能夠在鏈接池裏動態判斷當前請求查詢的 key 是否是 hot key,是就本地緩存起來,直接用緩存響應請求。shell

大概說明一下鏈接池的原理,當咱們 connect 的時候,ngx lua 會優先從鏈接池中獲取鏈接,當咱們 set_keepalive 的時候,ngx lua 會把鏈接放回鏈接池。在一次鏈接裏,用戶可能須要屢次操做 Redis,因而咱們使用了 while true 來循環獲取用戶的屢次操做,不過這樣的話,須要有一個請求結束的標識,以便跳出循環執行 set_keepalive,從而把鏈接放回鏈接池,最簡單的方法無疑是監控客戶端關閉鏈接的事件,對 PHP 來講是很簡單,請求結束時天然會關閉鏈接,若是你但願提早釋放鏈接的話,那麼須要一個標識,語義上 QUIT 是很好的標識,用的話能夠手動發送一個 rawCommand('quit')。編程

相關文章
相關標籤/搜索