chuck-lua的併發處理

chuck-lua使用的是單線程模型,依賴於底層高效率的事件回調框架.從前文介紹過的使用示例中能夠看出,基本接口與node.js相似,大量依賴方法回調.node

對於lua這種支持coroutine的語言,使用coroutine來將異步回調轉換成同步接口是很方便的.git

chuck-lua提供了兩種使用coroutine解決異步問題的方式:github

  • 直接使用coroutine
  • coroutine pool配合任務隊列

首先來看一個例子,看下如何將異步的redis事件轉換成同步接口:redis

redis.lua服務器

local chuck   = require("chuck")
local engine  = require("distri.engine")
local Sche    = require("distri.uthread.sche")
local LinkQue = require("distri.linkque")

local map = {}

local client = {}

function client:new(c)
  local o = {}
  o.__index = client
  o.__gc    = function() print("redis_client gc") end      
  setmetatable(o,o)
  o.c = c
  o.pending = LinkQue:New()
  return o
end

function client:Close(err)
    local co
    while true do
        co = self.pending:Pop()
        if co then
            co = co[1]
            Sche.WakeUp(co,false,err)
        else
            map[self.c] = nil
            self.c:Close()
            self.c = nil
            return
        end
    end
end

function client:Do(cmd)
    local co = Sche.Running()

    if not co then return "should call in a coroutine context " end
    if not self.c then return "client close" end

    if "ok" == self.c:Execute(cmd,function (_,reply)
        Sche.WakeUp(co,true,reply)
    end) then
        local node = {co}
        self.pending:Push(node)
        --[[
            若是succ == true,則reply是操做結果,
            不然,reply是傳遞給Close的err值
        ]]--
        local succ,reply = Sche.Wait()
        self.pending:Remove(node)
        if succ then
            return nil,reply
        else
            return reply
        end 
    end
    return "error"
end

local redis = {}

function redis.Connect(ip,port,on_error)
    local err,c = chuck.redis.Connect(engine,ip,port,function (_,err)
        local c = map[_]
        if c then
            on_error(c,err)
        end
    end)
    if c then
        return err,client:new(c)
    else
        return err
    end
end

return redis

redis.lua封裝了異步事件接口,向用戶提供了同步的調用方式,惟一的使用約束是redis:Do必須在coroutine上下文中才能被使用.網絡

咱們首先看下如何在第一種方式下使用這個接口:併發

local Distri = require("distri.distri")
local Redis  = require("distri.redis")

local err,client = Redis.Connect("127.0.0.1",6379)

if client then

    local function co_fun(i)
        local cmd = string.format("hmget chaid:%d chainfo skills",i)    
        local err,reply = client:Do(cmd)
        if reply then
            for k,v in pairs(reply) do
                print(k,v)
            end
        end 
    end

    for i = 1,1000 do
        Sche.Spawn(co_fun,i)
    end
    Distri.Run()
end

在這種模式下,每一個redis任務都由一個單獨的coroutine直接執行.框架

接下來再看下如何利用pool和任務隊列完成一樣的效果:異步

local Distri = require("distri.distri")
local Redis  = require("distri.redis")
local Task   = require("distri.uthread.task")

local err,client = Redis.Connect("127.0.0.1",6379)

if client then
    for i = 1,1000 do
        Task.New(function ()
            local cmd = string.format("hmget chaid:%d chainfo skills",i)    
            local err,reply = client:Do(cmd)
            if reply then
                for k,v in pairs(reply) do
                    print(k,v)
                end
            end
        end)
    end
    Distri.Run()
end

對於每一個任務,使用Task.New建立一個任務,任務被建立以後會被添加到任務隊列尾部,預先建立的pool中的coroutine將會被喚醒,從隊列中提取任務並執行.socket

最後,再來看一下對於網絡消息,如何利用Task處理任務.

local Task   = require("distri.uthread.task")
local Distri = require("distri.distri")
local Redis  = require("distri.redis")
local Socket = require("distri.socket")
local Packet = require("chuck").packet
local clone     = Packet.clone

local err,client = Redis.Connect("127.0.0.1",6379)

if client then
    local server = Socket.stream.listen("127.0.0.1",8010,function (s,errno)
        if s then
            if s:Ok(4096,Socket.stream.rawdecoder,Task.Wrap(function (_,msg,errno)
                    if msg then                     
                        local cmd = string.format("hmget chaid:%d chainfo skills",1)    
                        local err,reply = client:Do(cmd)
                        local result = ""
                        if reply then
                            for k,v in pairs(reply) do
                                result = result .. v .. "\n"
                            end
                        else
                            result = "error\n"
                        end                 
                        s:Send(Packet.rawpacket(result))
                    else
                        s:Close()
                        s = nil
                    end
            end),client) then
                s:SetRecvTimeout(5000)
            end
        end     
    end)

    if server then
        Distri.Run()
    end 
end

這是一個簡單的echo服務,當用戶鏈接上服務器發送消息,服務器收到消息以後提交一個redis請求,並將結果返回給客戶.

這裏的關鍵點是使用Task.Wrap封裝了事件回調函數.也就是說,對網絡消息的處理也是在coroutine上下中執行的.所以在事件回調中的阻塞並不會致使沒法響應其它到來的併發請求.

https://github.com/sniperHW/chuck.git

相關文章
相關標籤/搜索