Playing with OpenResty's Coroutine APIs

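Note: all the code in this article is only a proof of concept, and it does essentially no error handling. Some edge cases may not have been thought through either. If you copy the code straight into a project, you are responsible for whatever happens.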

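OK, back to the topic. OpenResty provides a set of coroutine APIs, including ngx.thread.*, coroutine.* and ngx.semaphore. They are constrained by the way Nginx processes requests, so they are less expressive than the coroutine APIs of general-purpose languages. With a bit of imagination, though, you can still pull off some neat tricks.
With these APIs, let's try to emulate the scheduling styles found on other programming platforms.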

Emulating Java's Future

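In Java, a Future lets you create a task and call get for its return value only when you actually need it. A Future also supports a timeout.
We can spawn one light thread to do the actual work, plus a second one that finishes after a fixed delay to implement the timeout.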

Like this:

local function task()
    ngx.sleep(3)
    ngx.say("Done")
end

local task_thread = ngx.thread.spawn(task)
local timeout_thread = ngx.thread.spawn(function(timeout)
    ngx.sleep(timeout)
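    -- level 0 keeps Lua from prefixing "file:line:" to the message,
    -- so the res == "timeout" check below can actually match
    error("timeout", 0)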
end, 2)
local ok, res = ngx.thread.wait(task_thread, timeout_thread)
if not ok then
    if res == "timeout" then
        ngx.thread.kill(task_thread)
        ngx.say("task cancelled by timeout")
        return
    end
    ngx.say("task failed, result: ", res)
end
ngx.thread.kill(timeout_thread)

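Note that once one of the threads exits, we need to kill the other one. Unless we explicitly exit with something like ngx.exit, the current phase will not end until every thread has exited.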

Here is the relevant passage from the documentation:

By default, the corresponding Nginx handler (e.g., rewrite_by_lua handler) will not terminate until

  1. both the "entry thread" and all the user "light threads" terminates,

  2. a "light thread" (either the "entry thread" or a user "light thread") aborts by calling ngx.exit, ngx.exec, ngx.redirect, or ngx.req.set_uri(uri, true), or

  3. the "entry thread" terminates with a Lua error.
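OpenResty code cannot run outside Nginx, so here is a plain-Lua model of the Future-with-timeout pattern built only on the standard coroutine library. `Sched`, `spawn`, `sleep`, `kill`, `wait_any` and `with_timeout` are hypothetical names standing in for ngx.thread.spawn, ngx.sleep, ngx.thread.kill and ngx.thread.wait. Time is a virtual clock rather than real sleeping, and OpenResty's real scheduler is certainly more involved. It is only a sketch of the "first thread to finish wins, cancel the other" idea.

```lua
-- Plain-Lua, virtual-clock model of light threads (hypothetical API).
local unpack = table.unpack or unpack

local Sched = {}
Sched.__index = Sched

function Sched.new()
  return setmetatable({ now = 0, threads = {} }, Sched)
end

-- Stand-in for ngx.thread.spawn: f runs as a coroutine, runnable right away.
function Sched:spawn(f, ...)
  local args = { n = select("#", ...), ... }
  local th = {
    co = coroutine.create(function() return f(unpack(args, 1, args.n)) end),
    wake = self.now,
  }
  self.threads[#self.threads + 1] = th
  return th
end

-- Stand-in for ngx.sleep: hand control back along with the delay in seconds.
function Sched.sleep(seconds)
  coroutine.yield(seconds)
end

-- Stand-in for ngx.thread.kill: the thread is simply never resumed again.
function Sched:kill(th)
  th.killed = true
end

-- Stand-in for ngx.thread.wait: run threads in wake-time order until one
-- of the targets finishes, then return its ok flag and first result.
function Sched:wait_any(...)
  local targets = { ... }
  while true do
    for _, t in ipairs(targets) do
      if t.done then return t.ok, t.value end
    end
    local nxt
    for _, t in ipairs(self.threads) do
      if not t.done and not t.killed and (not nxt or t.wake < nxt.wake) then
        nxt = t
      end
    end
    if not nxt then return nil, "no runnable thread" end
    self.now = nxt.wake
    local ok, v = coroutine.resume(nxt.co)
    if coroutine.status(nxt.co) == "dead" then
      nxt.done, nxt.ok, nxt.value = true, ok, v
    else
      nxt.wake = self.now + v  -- v is the delay passed to Sched.sleep
    end
  end
end

-- The Future-with-timeout pattern, on top of the model.
local function with_timeout(sched, f, timeout)
  local task_th = sched:spawn(f)
  local timer_th = sched:spawn(function()
    Sched.sleep(timeout)
    error("timeout", 0)
  end)
  local ok, res = sched:wait_any(task_th, timer_th)
  -- whichever finished first, make sure the other one never runs again
  sched:kill(task_th)
  sched:kill(timer_th)
  return ok, res
end

local function task()
  Sched.sleep(3)
  return "Done"
end
```

With a 2-second timeout the timer thread finishes first and `with_timeout(Sched.new(), task, 2)` returns `false, "timeout"`. With 5 seconds the task wins and the call returns `true, "Done"`.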

Emulating JavaScript's Promise.race/all

Promise.race/all take several Promises and bundle them into a new Promise that they return. Quoting the relevant documentation:

The Promise.race(iterable) method returns a promise that resolves or rejects as soon as one of the promises in the iterable resolves or rejects, with the value or reason from that promise.

The Promise.all(iterable) method returns a promise that resolves when all of the promises in the iterable argument have resolved, or rejects with the reason of the first passed promise that rejects.

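Here, reject corresponds to a coroutine throwing an error while it runs, and resolve corresponds to a coroutine returning a result. The two APIs treat rejection identically: as soon as any one of them fails, they immediately return that error. For successful results, race returns as soon as the first result is in, whereas all returns only once every result is in.
Note that JavaScript's native Promise currently has no cancel feature, so even after one Promise rejects, the others keep running. We copy that behavior as-is.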
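The settling rules above can be pinned down without any threads at all. The sketch below describes each task's outcome as a hypothetical record `{ t = finish_time, ok = resolved, v = value_or_reason }` and computes what race and all would settle with. The function names `race_outcome` and `all_outcome` are made up for illustration; nothing here runs concurrently.

```lua
-- Pure model of the settle rules; each outcome is a hypothetical record:
-- t = when the task settles, ok = resolved (true) or rejected (false),
-- v = the resolved value or the rejection reason.
local function race_outcome(outcomes)
  -- race settles with whichever task settles first, resolved or not
  local first
  for _, o in ipairs(outcomes) do
    if not first or o.t < first.t then first = o end
  end
  return first.ok, first.v
end

local function all_outcome(outcomes)
  -- all rejects with the earliest rejection, if there is one
  local first_reject
  for _, o in ipairs(outcomes) do
    if not o.ok and (not first_reject or o.t < first_reject.t) then
      first_reject = o
    end
  end
  if first_reject then return false, first_reject.v end
  -- otherwise it resolves with every value, in input order
  local values = {}
  for i, o in ipairs(outcomes) do values[i] = o.v end
  return true, values
end

-- Same timings as the apple/banana/carrot example in this article.
local fruits = {
  { t = 0.1, ok = true, v = "apple done" },
  { t = 0.2, ok = true, v = "banana done" },
  { t = 0.3, ok = true, v = "carrot done" },
}
```

`all_outcome` lists values in input order, which is what Promise.all does, no matter which task finished first.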

Implementation of Promise.race:

local function apple()
    ngx.sleep(0.1)
    --error("apple lost")  -- uncomment to make race() reject
    return "apple done"
end

local function banana()
    ngx.sleep(0.2)
    return "banana done"
end

local function carrot()
    ngx.sleep(0.3)
    return "carrot done"
end

local function race(...)
    local functions = {...}
    local threads = {}
    for _, f in ipairs(functions) do
        local th, err = ngx.thread.spawn(f)
        if not th then
            -- Promise.race has no cancel interface either,
            -- so I lazily leave the already-spawned threads alone
            return nil, err
        end
        table.insert(threads, th)
    end
    local ok, res = ngx.thread.wait(unpack(threads))
    if not ok then
        return nil, res
    end
    return res
end

local res, err = race(apple, banana, carrot)
ngx.say("res: ", res, " err: ", err)
ngx.exit(ngx.OK)

Implementation of Promise.all:

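local function all(...)
    local functions = {...}
    local threads = {}
    for i, f in ipairs(functions) do
        -- wrap f so the thread also reports which input it belongs to
        local th, err = ngx.thread.spawn(function()
            return i, f()
        end)
        if not th then
            return nil, err
        end
        threads[i] = th
    end
    local res_group = {}
    local pending = {unpack(threads)}
    while #pending > 0 do
        -- returns as soon as any one of the pending threads finishes
        local ok, idx, res = ngx.thread.wait(unpack(pending))
        if not ok then
            -- on failure the second value is the error (first rejection wins)
            return nil, idx
        end
        -- keep input order, like Promise.all, rather than completion order
        res_group[idx] = res
        -- drop the finished thread so it is not passed to wait() again
        for j, th in ipairs(pending) do
            if th == threads[idx] then
                table.remove(pending, j)
                break
            end
        end
    end
    return res_group
end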

Emulating Go's channel (partial implementation)

Going one step further, let's try emulating Go's channel.
We need the following semantics:

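  1. While the data has not been consumed, the producer blocks after sending it.

  2. While no data has been produced, the consumer blocks before receiving.

  3. While a producer is waiting for a consumer to take its data, other producers block before sending.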
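The three rules can be checked in plain Lua before involving ngx.semaphore. This sketch uses standard coroutines plus a hypothetical round-robin `run` loop standing in for the Nginx scheduler, and models blocking as yielding until a condition holds. Each send gets its own slot table, so a receiver's acknowledgement is seen only by the sender it belongs to. `new_chan`, `send`, `recv` and `run` are illustrative names, not OpenResty APIs.

```lua
-- Plain-Lua model of an unbuffered channel (hypothetical names, no OpenResty).
local function new_chan()
  return { slot = nil }  -- slot belongs to the sender currently handing off
end

local function send(ch, v)
  -- rule 3: another sender is mid-handoff, so wait before sending
  while ch.slot do coroutine.yield() end
  local slot = { value = v, taken = false }
  ch.slot = slot
  -- rule 1: after sending, block until a receiver takes the value
  while not slot.taken do coroutine.yield() end
end

local function recv(ch)
  -- rule 2: block before receiving until a sender has put a value
  while not ch.slot do coroutine.yield() end
  local slot = ch.slot
  ch.slot = nil
  slot.taken = true  -- lets exactly the owning sender return
  return slot.value
end

-- Resume every live coroutine in turn until all have finished; the round
-- limit turns a deadlock (e.g. a send nobody receives) into an error.
local function run(...)
  local cos = {}
  for i, f in ipairs({ ... }) do cos[i] = coroutine.create(f) end
  for _ = 1, 10000 do
    local alive = 0
    for _, co in ipairs(cos) do
      if coroutine.status(co) ~= "dead" then
        local ok, err = coroutine.resume(co)
        if not ok then error(err, 0) end
        if coroutine.status(co) ~= "dead" then alive = alive + 1 end
      end
    end
    if alive == 0 then return end
  end
  error("deadlock: coroutines still blocked after 10000 rounds", 0)
end

local ch, log = new_chan(), {}
run(
  function() for i = 1, 3 do send(ch, i); log[#log + 1] = "sent " .. i end end,
  function() for _ = 1, 3 do log[#log + 1] = "recv " .. recv(ch) end end
)
```

Every "recv i" entry in `log` comes before the matching "sent i", which is rule 1: the producer only continues after the consumer has taken the value.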

This time we need ngx.semaphore:

local semaphore = require "ngx.semaphore"

local Chan = {
    new = function(self)
        local chan_attrs = {
            _read_sema = semaphore.new(),
            _write_sema = semaphore.new(),
            _exclude_sema = semaphore.new(),
            _buffer = nil,
            _waiting_thread_num = 0,
        }
        return setmetatable(chan_attrs, {__index = self})
    end,
    send = function(self, value, timeout)
        timeout = timeout or 60
        while self._buffer do
            self._waiting_thread_num = self._waiting_thread_num + 1
            self._exclude_sema:wait(timeout)
            self._waiting_thread_num = self._waiting_thread_num - 1
        end
        self._buffer = value
        self._read_sema:post()
        self._write_sema:wait(timeout)
    end,
    receive = function(self, timeout)
        timeout = timeout or 60
        self._read_sema:wait(timeout)
        local value = self._buffer
        self._buffer = nil
        self._write_sema:post()
        if self._waiting_thread_num > 0 then
            self._exclude_sema:post()
        end
        return value
    end,
}

local chan = Chan:new()

-- Usage example below
local function worker_a(ch)
    for i = 1, 10 do
        ngx.sleep(math.random() / 10)
        ch:send(i, 1)
    end
end

local function worker_c(ch)
    for i = 11, 20 do
        ngx.sleep(math.random() / 10)
        ch:send(i, 1)
    end
end

local function worker_d(ch)
    for i = 21, 30 do
        ngx.sleep(math.random() / 10)
        ch:send(i, 1)
    end
end


local function worker_b(ch)
    for _ = 1, 20 do
        ngx.sleep(math.random() / 10)
        local v = ch:receive(1)
        ngx.say("recv ", v)
    end
end

local function worker_e(ch)
    for _ = 1, 10 do
        ngx.sleep(math.random() / 10)
        local v = ch:receive(1)
        ngx.say("recv ", v)
    end
end

ngx.thread.spawn(worker_a, chan)
ngx.thread.spawn(worker_b, chan)
ngx.thread.spawn(worker_c, chan)
ngx.thread.spawn(worker_d, chan)
ngx.thread.spawn(worker_e, chan)

A buffered channel can be emulated as well:

local ok, new_tab = pcall(require, "table.new")
if not ok then
    new_tab = function (_, _) return {} end
end


local BufferedChan = {
    new = function(self, buffer_size)
        if not buffer_size or buffer_size <= 0 then
            error("Invalid buffer_size " .. (buffer_size or "nil") .. " given")
        end
        local chan_attrs = {
            _read_sema = semaphore.new(),
            _write_sema = semaphore.new(),
            _waiting_thread_num = 0,
            _buffer_size = buffer_size,
        }
        chan_attrs._buffer = new_tab(buffer_size, 0)
        return setmetatable(chan_attrs, {__index = self})
    end,
    send = function (self, value, timeout)
        timeout = timeout or 60
        while #self._buffer >= self._buffer_size do
            self._waiting_thread_num = self._waiting_thread_num + 1
            self._write_sema:wait(timeout)
            self._waiting_thread_num = self._waiting_thread_num - 1
        end
        table.insert(self._buffer, value)
        self._read_sema:post()
    end,
    receive = function(self, timeout)
        timeout = timeout or 60
        self._read_sema:wait(timeout)
        -- take the oldest value first (FIFO), as a Go channel does
        local value = table.remove(self._buffer, 1)
        if self._waiting_thread_num > 0 then
            self._write_sema:post()
        end
        return value
    end,
}

local chan = BufferedChan:new(2)
-- ...
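A buffered channel has to hand values out in the order they were sent. With a plain Lua array, taking from the front with table.remove(t, 1) shifts every remaining element down by one. A fixed-capacity ring buffer avoids that shifting. The sketch below is a hypothetical `Ring` type in plain Lua that could serve as the storage behind a buffered channel, with blocking still left to the semaphores; it is one possible design, not part of the original code.

```lua
-- Hypothetical fixed-capacity FIFO ring buffer (plain Lua, no OpenResty).
local Ring = {}
Ring.__index = Ring

function Ring.new(capacity)
  assert(type(capacity) == "number" and capacity >= 1 and capacity % 1 == 0,
         "capacity must be a positive integer")
  return setmetatable({ items = {}, head = 1, count = 0, cap = capacity }, Ring)
end

function Ring:is_full() return self.count >= self.cap end
function Ring:is_empty() return self.count == 0 end

-- Append at the tail; a channel would block the sender instead of asserting.
function Ring:push(v)
  assert(not self:is_full(), "ring is full")
  local tail = (self.head + self.count - 1) % self.cap + 1
  self.items[tail] = v
  self.count = self.count + 1
end

-- Take the oldest item from the head (FIFO); no elements are shifted.
function Ring:pop()
  assert(not self:is_empty(), "ring is empty")
  local v = self.items[self.head]
  self.items[self.head] = nil
  self.head = self.head % self.cap + 1
  self.count = self.count - 1
  return v
end
```

Because `count` tracks occupancy separately from the stored values, the buffer also copes with nil elements, which the `#` length operator on a plain table does not.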

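Of course, these knockoffs still have plenty of problems. For example, they lack the all-important select support, and nothing related to close is implemented.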
