chuck-lua使用的是單線程模型,依賴於底層高效率的事件回調框架.從前文介紹過的使用示例中能夠看出,基本接口與node.js相似,大量依賴方法回調.node
對於lua這種支持coroutine的語言,使用coroutine來將異步回調轉換成同步接口是很方便的.git
chuck-lua提供了兩種使用coroutine解決異步問題的方式:github
首先來看一個例子,看下如何將異步的redis事件轉換成同步接口:redis
redis.lua服務器
local chuck = require("chuck") local engine = require("distri.engine") local Sche = require("distri.uthread.sche") local LinkQue = require("distri.linkque") local map = {} local client = {} function client:new(c) local o = {} o.__index = client o.__gc = function() print("redis_client gc") end setmetatable(o,o) o.c = c o.pending = LinkQue:New() return o end function client:Close(err) local co while true do co = self.pending:Pop() if co then co = co[1] Sche.WakeUp(co,false,err) else map[self.c] = nil self.c:Close() self.c = nil return end end end function client:Do(cmd) local co = Sche.Running() if not co then return "should call in a coroutine context " end if not self.c then return "client close" end if "ok" == self.c:Execute(cmd,function (_,reply) Sche.WakeUp(co,true,reply) end) then local node = {co} self.pending:Push(node) --[[ 若是succ == true,則reply是操做結果, 不然,reply是傳遞給Close的err值 ]]-- local succ,reply = Sche.Wait() self.pending:Remove(node) if succ then return nil,reply else return reply end end return "error" end local redis = {} function redis.Connect(ip,port,on_error) local err,c = chuck.redis.Connect(engine,ip,port,function (_,err) local c = map[_] if c then on_error(c,err) end end) if c then return err,client:new(c) else return err end end return redis
redis.lua封裝了異步事件接口,向用戶提供了同步的調用方式,惟一的使用約束是redis:Do必須在coroutine上下文中才能被使用.網絡
咱們首先看下如何在第一種方式下使用這個接口:併發
local Distri = require("distri.distri") local Redis = require("distri.redis") local err,client = Redis.Connect("127.0.0.1",6379) if client then local function co_fun(i) local cmd = string.format("hmget chaid:%d chainfo skills",i) local err,reply = client:Do(cmd) if reply then for k,v in pairs(reply) do print(k,v) end end end for i = 1,1000 do Sche.Spawn(co_fun,i) end Distri.Run() end
在這種模式下,每一個redis任務都由一個單獨的coroutine直接執行.框架
接下來再看下如何利用pool和任務隊列完成一樣的效果:異步
local Distri = require("distri.distri") local Redis = require("distri.redis") local Task = require("distri.uthread.task") local err,client = Redis.Connect("127.0.0.1",6379) if client then for i = 1,1000 do Task.New(function () local cmd = string.format("hmget chaid:%d chainfo skills",i) local err,reply = client:Do(cmd) if reply then for k,v in pairs(reply) do print(k,v) end end end) end Distri.Run() end
對於每一個任務,使用Task.New建立一個任務,任務被建立以後會被添加到任務隊列尾部,預先建立的pool中的coroutine將會被喚醒,從隊列中提取任務並執行.socket
最後,再來看一下對於網絡消息,如何利用Task處理任務.
local Task = require("distri.uthread.task") local Distri = require("distri.distri") local Redis = require("distri.redis") local Socket = require("distri.socket") local Packet = require("chuck").packet local clone = Packet.clone local err,client = Redis.Connect("127.0.0.1",6379) if client then local server = Socket.stream.listen("127.0.0.1",8010,function (s,errno) if s then if s:Ok(4096,Socket.stream.rawdecoder,Task.Wrap(function (_,msg,errno) if msg then local cmd = string.format("hmget chaid:%d chainfo skills",1) local err,reply = client:Do(cmd) local result = "" if reply then for k,v in pairs(reply) do result = result .. v .. "\n" end else result = "error\n" end s:Send(Packet.rawpacket(result)) else s:Close() s = nil end end),client) then s:SetRecvTimeout(5000) end end end) if server then Distri.Run() end end
這是一個簡單的echo服務,當用戶鏈接上服務器發送消息,服務器收到消息以後提交一個redis請求,並將結果返回給客戶.
這裏的關鍵點是使用Task.Wrap封裝了事件回調函數.也就是說,對網絡消息的處理也是在coroutine上下中執行的.所以在事件回調中的阻塞並不會致使沒法響應其它到來的併發請求.