隨便讀讀skynet開源項目RILLSERVER

讀RILL SERVERnode


由於源碼是前段時間下載的,最近纔拿出來分析,今天發現已經更新了,好比刪除了module中訂閱那些代碼。可是並不影響整體的思路。
他加入了behavior3 、 pl 、FSM,DDZ等等有空在分析。
-----session

有幾個維度能夠分析。app

  1. 從啓動到、消息運轉的流程
  2. 從skynet的關鍵函數

啓動,從service中的main啓動。而後逐個啓動service,
service啓動的時候,加載了faci下的全部文件,這個是一個服務的各項基本功能,
包括,如何從skynet接受消息,並如何分發到幾個不一樣實體上,dispatch forward 或者event 和 watch等。less

從一次登陸消息來講,首先來到gateway,而後從faci中的dispatch,根據cmd,轉發到forward的具體函數。處理完再發回。函數

1 首先進入service下的main,好比開啓了 login
傳入了2個參數ui

local p = skynet.newservice("login", "login", i)

2 繼續將參數帶入到下個faci.servicelua

local name, id = ...
local s = require "faci.service"
s.init(name, id)

3 這個是一個公共模塊。spa

而後進入faci.service,
把參數保存下來debug

function service.init(name, id)
    env.name = name or "nameless server"
    env.id = tonumber(id) or 0
end

11 而後咱們看看,這個文件加載的東西,包括dispatch event module 等等。
這些模塊都是每一個服務的一部分(基礎部分)。一個一個看。code

首先是dispatch,終於,咱們看到了他跟skynet接洽的部分,也就是,將"client"消息讓
函數:client_dispatch 處理。這個函數是系統跟 skynet的入口處。

還有一個就是,"lua"消息,由lua_dispatch處理。

skynet.dispatch("lua", lua_dispatch)

skynet.register_protocol{
    name = "client",
    id = skynet.PTYPE_CLIENT,
    unpack = skynet.unpack,
    dispatch = client_dispatch, 
}

12 delicious,美味啊,來看看做者是怎麼寫客戶端傳入的消息的。
hmm。。彷佛多了一個check?先無論他。
get_queue_id(cmd) 獲得 CMD的對應編號;
爲每一個fd,創建了一個等待隊列
該隊列下,對應的命令也初始化下。env.waiting_queue[fd][queue_id] = {}

function client_dispatch(session, source, fd, cmd, check, msg)
    local queue_id = get_queue_id(cmd) -- get_queue_id(cmd) 獲得 CMD的對應編號;
    if not queue_id then    -- 若是指令不須要排隊則直接執行
        client_dispatch_help(cmd, check, msg, fd, source)
        return
    end
    if not env.waiting_queue[fd] then
        env.waiting_queue[fd] = {} --爲每一個fd,創建了一個等待隊列
    end
    if not env.waiting_queue[fd][queue_id] then
        env.waiting_queue[fd][queue_id] = {} --該隊列下,對應的命令也初始化下
    end
    local queues = env.waiting_queue[fd][queue_id]
    if #queues  > 0 then    -- 若是有未完成的任務則插入後直接返回。
        table.insert(queues, {cmd, check, msg, fd, source})
        return
    end
    -- 若是沒有則直接進入後執行
    table.insert(queues, {cmd, check, msg, fd, source})
    for i = 1, 100 do
        local queue = table.remove(queues) 
        if not queue then
            return
        end
        client_dispatch_help(table.unpack(queue))
    end
    if #queues > 0 then -- 若是執行到這,也就是隊列超過100個則拋棄該FD的該指令隊列
        log.error("%s queue is full, queue_id: %d", fd, queue_id)
    end
    env.waiting_queue[fd][queue_id] = nil
end

13 繼續看指令的拆分和執行,拆分CMD中的

local function client_dispatch_help(cmd, check, msg, fd, source)
    msg._cmd = cmd
    msg._check = check
    --TODO check校驗
    local cmdlist = string.split(cmd, ".")
    local isok, ret
    --派發到本服
    isok, ret = local_dispatch(cmdlist[1], cmdlist[2], fd, msg, source)
    --派發到遠端
    if not isok then
        isok, ret = romote_dispatch(cmdlist[1], cmdlist[2], fd, msg, source)
    end
    
    if ret then
        skynet.send(source, "lua", "send", fd, ret)
    end
end

14 local_dispatch中cmd1應該是模塊名,好比login/game,cmd2,是具體的指令,好比是msgLogin這種。
注意,本地是調用forward的上的名爲cmd2方法。
遠程的話須要額外知道,player.romote[cmd1],也就是玩家的遠程服務的地址。

function local_dispatch(cmd1, cmd2, fd, msg, source)
    local module = env.module[cmd1]
    if type(module) ~= "table" then
        log.info("local_dispatch module is not table, cmd = %s.%s, msg = %s", cmd1, cmd2, tool.dump(msg))
        return false
    end
    
    local forward = module.forward
    if type(forward) ~= "table" then
        log.info("local_dispatch forward is not table, cmd = %s.%s, msg = %s", cmd1, cmd2, tool.dump(msg))
        return false
    end
    
    local cb = forward[cmd2]
    if type(cb) ~= "function" then
        log.info("local_dispatch cb is not function, cmd = %s.%s, str = %s", cmd1, cmd2, tool.dump(msg))
        return false
    end
    --開始分發
    local v = get_v(fd)
    local isok, ret = xpcall(cb, traceback, v, msg, source)
    if not isok then
        log.error("local_dispatch handle msg error, cmd = %s, msg = %s, err=%s", cmd1, tool.dump(msg), ret)
        return true  --報錯的狀況也表示分發到位
    end

    return true, ret
end

15 在看內部的消息轉發 lua_dispatch,可是接受3種保留類型,

其餘的消息都經過dispatch來作,跟上面經過forward相似,各自處理一類消息。
(相似CMD和 request)
watch 和 sys 消息類型不太明白

local function lua_dispatch(session, addr, cmd, ...)

    local cmdlist = string.split(cmd, ".")
    local cmd1 = cmdlist[1]
    local cmd2 = cmdlist[2]
    --forward分發
    if cmd1 == "client_forward" and not cmd2 then
        local isok, msg = local_dispatch(...)
        skynet.retpack(isok, msg)
        return true
    elseif cmd1 == "watch" and not cmd2 then
        local isok, msg, acm = watch(...)
        skynet.retpack(isok, msg, acm)
        return true
    elseif cmd1 == "sys" then
        local isok, msg = sys_dispatch(cmd2, ...)
        skynet.retpack(isok, msg)
        return true
    end

    --模塊
    local module = env.module[cmd1]
    if type(module) ~= "table" then
        log.info("lua_dispatch module is not table, cmd = %s.%s", cmd1, cmd2)
        skynet.ret()
        return false
    end
    
    local dispatch = module.dispatch
    if type(dispatch) ~= "table" then
        log.info("lua_dispatch dispatch is not table, cmd = %s.%s", cmd1, cmd2)
        skynet.ret()
        return false
    end
    
    local cb = dispatch[cmd2]
    if type(cb) ~= "function" then
        log.info("lua_dispatch cb is not function, cmd = %s.%s", cmd1, cmd2)
        skynet.ret()
        return false
    end
    
    --分發
    local function skyret(ok, ...)
        if not ok then
            skynet.ret()
        else
            skynet.retpack(...)
        end
    end
    local ret = {xpcall(cb, traceback, ...)}
    local isok = ret[1]
    if not isok then
        log.info("lua_dispatch cb call fail, cmd = %s.%s, err = %s", cmd1, cmd2, ret)
        skynet.ret()
        return false
    end
    
    skyret(table.unpack(ret))
end

16 一個 watch 的例子,供其餘模塊查看該模塊內部的狀況

function module.watch(acm)
    --統計在線人數
    local logined = 0       --成功登錄
    for i, v in pairs(env.players) do
        logined = logined + 1
    end
    local ret = {logined = logined}
    --總統計
    acm.logined = acm.logined and acm.logined + logined or logined

    return ret, acm
end

17 sys的命令經過dispatch執行,熱更新reload和stop。

18 再看看事件機制。假設有2個節點A 和B
A要訂閱B,就調用module中的subscribe_event,

提供如下細信息:要訂閱的事件,要訂閱的節點,目標服務

function M.subscribe_event(event, nodename, service, key)
    local local_service = skynet.self()
    if nodename == local_nodename then
        return skynet.call(service, "sys.subscribe_event", event, local_nodename, local_service, key)
    end

    return cluster.call(nodename, service, "sys.subscribe_event", event, local_nodename, local_service, key)
end

此時,B收到這個訂閱消息,這樣處理

local sys = {
}

function sys.subscribe_event(event, nodename, service, key)
    faci._subscribe_event(event, nodename, service, key)
    return true
end
--上在DISPATH文件,下在MODULE中
function M._subscribe_event(event, nodename, service, key)
    if not env.events[event] then
        env.events[event] = {} 
    end
    if not env.events[event][nodename] then
        env.events[event][nodename] = {} 
    end
    if not env.events[event][nodename][service] then
        env.events[event][nodename][service] = {}
    end
    if not env.events[event][nodename][service][key] then
        env.events[event][nodename][service][key] = {}
    end

    env.events[event][nodename][service][key] = true
end

事件的觸發

local event_cache = {}
function M.fire_event(name, ...)
    --獲取列表
    local cache = event_cache[name]
    if not cache then
        event_cache[name] = {}
        for i, v in pairs(env.module) do
            if type(v.event[name]) == "function" then
                table.insert(event_cache[name], v.event[name])
            end
        end
    end
    cache = event_cache[name]
    --執行
    for _, fun in ipairs(cache) do
        log.info("fire event %s", name)
        xpcall(fun,  function(err) 
            log.error(tostring(err))
            log.error(debug.traceback())
        end, ...)
    end
    --遠程事件
    local events = env.events[name]
    if not events then
        return
    end
    for nodename, nodes in pairs(events) do
        for service, keys in pairs(nodes) do
            if nodename == localname then
                skynet.send(service, name, keys, ...)
            else
                cluster.send(nodename, service, name, keys, ...)
            end
        end
    end
end

4 servive中,而後進入start

skynet.start(function()
    init()
    if env.init then
        env.init()
    end
end)

5 看看init,將傳遞的name命名到該服務,設置他的全局變量。
初始化模塊的相關東西。
啓動了事件,喚醒和開始?

local function init()
    --名字和編號
    local name = env.name
    local id = env.id
    if not name then
        return
    end
    --命名
    local idstr = env.id > 0 and tostring(env.id) or ""
    local name = string.format("%s%s", name, idstr)
    skynet.name(name, skynet.self())
    --設置
    log.set_name(name)
    --全局變量
    _G["env"] = env
    _G["log"] = log
    --模塊
    module.init_modules()
    module.fire_event("awake")
    module.fire_event("start")
    log.debug("start ok "..name.."...")
end

6 看看這個module.init_modules()

function M.init_modules()
    require_modules()
end

7 繼續跟蹤,牛皮,加載了根目錄+mod+文件名(login)下的全部lua,直接require
在項目中,是2個文件login_forward.lua 和 login_mode_test.lua

local function require_modules()
    local path = skynet.getenv("app_root").."mod/"..env.name
    lfstool.attrdir(path, function(file)
    local file = string.match(file, ".*mod/(.*)%.lua")
        if file then
            log.info(string.format("%s%d require file:%s", env.name, env.id, file))
            require(file)
        end
    end)
end

8 回去繼續看 module.fire_event("awake")
從上下文理解,這個應該是啓動call event ,awake 不是解僱。
這裏沒太明白,大體意思應該是,凡是註冊過該節點的awake的都會調用,包括本地和遠程。

local event_cache = {}
function M.fire_event(name, ...)
    --獲取列表
    local cache = event_cache[name]
    if not cache then
        event_cache[name] = {}
        for i, v in pairs(env.module) do
            if type(v.event[name]) == "function" then
                table.insert(event_cache[name], v.event[name])
            end
        end
    end
    cache = event_cache[name]
    --執行
    for _, fun in ipairs(cache) do
        log.info("fire event %s", name)
        xpcall(fun,  function(err) 
            log.error(tostring(err))
            log.error(debug.traceback())
        end, ...)
    end
    --遠程事件
    local events = env.events[name]
    if not events then
        return
    end
    for nodename, nodes in pairs(events) do
        for service, keys in pairs(nodes) do
            if nodename == localname then
                skynet.send(service, name, keys, ...)
            else
                cluster.send(nodename, service, name, keys, ...)
            end
        end
    end
end

9 再回去看加載的那些,7.
這2個東西應該比較重要,一個是模塊,一個是全局變量。
先看看這個函數get_module

local module, static = faci.get_module("Login")

10 在文件module中,這裏給出了每一個模塊的4個基本動做
dispatch , forward , event, watch
dispatch是本文件處理的,這個是skynet必備的。其餘三個多是做者本身加的?

local module = {}
function M.get_module(name)
    --模塊處理函數
    env.module[name] = env.module[name] or {
        dispatch = {},
        forward = {},
        event = {},
        watch = nil,
    }
    --模塊全局變量
    env.static[name] = env.static[name] or {
    }
    return env.module[name], env.static[name]
end
相關文章
相關標籤/搜索