注意:本文中列出的全部代碼只是 Proof Of Concept,基本上都沒有進行錯誤處理。另外對於一些邊際狀況,也可能沒有考慮清楚。因此對於直接複製文中代碼到項目中所形成的一切後果,請自負責任。編程
OK,言歸正題。OpenResty 提供了以 ngx.thread.*
,coroutine.*
和 ngx.semaphore
等一系列協程 API。雖然受限於 Nginx 的請求處理方式,表現力不如通用語言的協程 API 那麼強大。可是開開腦洞,仍是能夠玩出一些花樣來的。
藉助這些 API,讓咱們嘗試模擬下其餘編程平臺裏面的調度方式。promise
Java 裏的 Future 可讓咱們建立一個任務,而後在須要的時候纔去 get 任務的返回值。另外 Future 還有超時功能。
咱們能夠啓用一個協程來完成具體的任務,再加一個定時結束的協程,用於實現超時。app
像這樣:dom
local function task() ngx.sleep(3) ngx.say("Done") end local task_thread = ngx.thread.spawn(task) local timeout_thread = ngx.thread.spawn(function(timeout) ngx.sleep(timeout) error("timeout") end, 2) local ok, res = ngx.thread.wait(task_thread, timeout_thread) if not ok then if res == "timeout" then ngx.thread.kill(task_thread) ngx.say("task cancelled by timeout") return end ngx.say("task failed, result: ", res) end ngx.thread.kill(timeout_thread)
注意一點,在某一協程退出以後,咱們須要 kill 掉另一個協程。由於若是沒有調用 ngx.exit
之類的方法顯式退出的話,一直到全部協程退出爲止,當前階段都不會結束。post
引用文檔裏相關的內容:ui
By default, the corresponding Nginx handler (e.g., rewrite_by_lua handler) will not terminate untillua
both the "entry thread" and all the user "light threads" terminates,spa
a "light thread" (either the "entry thread" or a user "light thread" aborts by calling ngx.exit, ngx.exec, ngx.redirect, or ngx.req.set_uri(uri, true), orcode
the "entry thread" terminates with a Lua error.協程
Promise.race/all 能夠接收多個 Promise,而後打包成一個新的 Promise 返回。引用相關的文檔:
The Promise.race(iterable) method returns a promise that resolves or rejects as soon as one of the promises in the iterable resolves or rejects, with the value or reason from that promise.
The Promise.all(iterable) method returns a promise that resolves when all of the promises in the iterable argument have resolved, or rejects with the reason of the first passed promise that rejects.
這裏 reject 等價於協程運行中拋出 error,而 resolve 相對於協程返回告終果。這兩個 API 對於 reject 的處理是一致的,都是有任一出錯則馬上返回異常結果。對於正常結果,race 會在第一個結果出來時返回,而 all 則會在全部結果都出來後返回。
值得注意的是,Javascript 原生的 Promise 暫時沒有 cancell 的功能。因此即便其中一個 Promise reject 了,其餘 Promise 依然會繼續運行。對此咱們也照搬過來。
Promise.race 的實現:
local function apple() ngx.sleep(0.1) --error("apple lost") return "apple done" end local function banana() ngx.sleep(0.2) return "banana done" end local function carrot() ngx.sleep(0.3) return "carrot done" end local function race(...) local functions = {...} local threads = {} for _, f in ipairs(functions) do local th, err = ngx.thread.spawn(f) if not th then -- Promise.race 沒有實現 cancell 接口, -- 因此我偷下懶,無論已經建立的協程了 return nil, err end table.insert(threads, th) end local ok, res = ngx.thread.wait(unpack(threads)) if not ok then return nil, res end return res end local res, err = race(apple, banana, carrot) ngx.say("res: ", res, " err: ", err) ngx.exit(ngx.OK)
Promise.all 的實現:
local function all(...) local functions = {...} local threads = {} for _, f in ipairs(functions) do local th, err = ngx.thread.spawn(f) if not th then return nil, err end table.insert(threads, th) end local res_group = {} for _ = 1, #threads do local ok, res = ngx.thread.wait(unpack(threads)) if not ok then return nil, res end table.insert(res_group, res) end return res_group end
再進一步,試試模擬下 Go 裏面的 channel。
咱們須要實現以下的語義:
當數據沒有被消費時,生產者會在發送數據以後中斷運行。
當數據沒有被生產時,消費者會在接收數據以前中斷運行。
當存在等待消費者接收數據的生產者時,其餘生產者會在發送數據以前中斷運行。
此次要用到 ngx.semaphore
。
local semaphore = require "ngx.semaphore" local Chan = { new = function(self) local chan_attrs = { _read_sema = semaphore:new(), _write_sema = semaphore:new(), _exclude_sema = semaphore:new(), _buffer = nil, _waiting_thread_num = 0, } return setmetatable(chan_attrs, {__index = self}) end, send = function(self, value, timeout) timeout = timeout or 60 while self._buffer do self._waiting_thread_num = self._waiting_thread_num + 1 self._exclude_sema:wait(timeout) self._waiting_thread_num = self._waiting_thread_num - 1 end self._buffer = value self._read_sema:post() self._write_sema:wait(timeout) end, receive = function(self, timeout) timeout = timeout or 60 self._read_sema:wait(timeout) local value = self._buffer self._buffer = nil self._write_sema:post() if self._waiting_thread_num > 0 then self._exclude_sema:post() end return value end, } local chan = Chan:new() -- 如下是使用方法 local function worker_a(ch) for i = 1, 10 do ngx.sleep(math.random() / 10) ch:send(i, 1) end end local function worker_c(ch) for i = 11, 20 do ngx.sleep(math.random() / 10) ch:send(i, 1) end end local function worker_d(ch) for i = 21, 30 do ngx.sleep(math.random() / 10) ch:send(i, 1) end end local function worker_b(ch) for _ = 1, 20 do ngx.sleep(math.random() / 10) local v = ch:receive(1) ngx.say("recv ", v) end end local function worker_e(ch) for _ = 1, 10 do ngx.sleep(math.random() / 10) local v = ch:receive(1) ngx.say("recv ", v) end end ngx.thread.spawn(worker_a, chan) ngx.thread.spawn(worker_b, chan) ngx.thread.spawn(worker_c, chan) ngx.thread.spawn(worker_d, chan) ngx.thread.spawn(worker_e, chan)
模擬 Buffered channel 也是可行的。
local ok, new_tab = pcall(require, "table.new") if not ok then new_tab = function (_, _) return {} end end local BufferedChan = { new = function(self, buffer_size) if not buffer_size or buffer_size <= 0 then error("Invalid buffer_size " .. (buffer_size or "nil") .. " given") end local chan_attrs = { _read_sema = semaphore:new(), _write_sema = semaphore:new(), _waiting_thread_num = 0, _buffer_size = buffer_size, } chan_attrs._buffer = new_tab(buffer_size, 0) return setmetatable(chan_attrs, {__index = self}) end, send = function (self, value, timeout) timeout = timeout or 60 while #self._buffer >= self._buffer_size do self._waiting_thread_num = self._waiting_thread_num + 1 self._write_sema:wait(timeout) self._waiting_thread_num = self._waiting_thread_num - 1 end table.insert(self._buffer, value) self._read_sema:post() end, receive = function(self, timeout) timeout = timeout or 60 self._read_sema:wait(timeout) local value = table.remove(self._buffer) if self._waiting_thread_num > 0 then self._write_sema:post() end return value end, } local chan = BufferedChan:new(2) -- ...
固然上面的山寨貨仍是有不少問題的。好比它缺乏相當重要的 select 支持,另外也沒有實現 close 相關的特性。