讀RILL SERVERnode
由於源碼是前段時間下載的,最近纔拿出來分析,今天發現已經更新了,好比刪除了module中訂閱那些代碼。可是並不影響整體的思路。
他加入了behavior3 、 pl 、FSM,DDZ等等有空在分析。
-----session
有幾個維度能夠分析。app
啓動,從service中的main啓動。而後逐個啓動service,
service啓動的時候,加載了faci下的全部文件,這個是一個服務的各項基本功能,
包括,如何從skynet接受消息,並如何分發到幾個不一樣實體上,dispatch forward 或者event 和 watch等。less
從一次登陸消息來講,首先來到gateway,而後從faci中的dispatch,根據cmd,轉發到forward的具體函數。處理完再發回。函數
1 首先進入service下的main,好比開啓了 login
傳入了2個參數ui
local p = skynet.newservice("login", "login", i)
2 繼續將參數帶入到下個faci.servicelua
local name, id = ... local s = require "faci.service" s.init(name, id)
3 這個是一個公共模塊。spa
而後進入faci.service,
把參數保存下來debug
function service.init(name, id) env.name = name or "nameless server" env.id = tonumber(id) or 0 end
11 而後咱們看看,這個文件加載的東西,包括dispatch event module 等等。
這些模塊都是每一個服務的一部分(基礎部分)。一個一個看。code
首先是dispatch,終於,咱們看到了他跟skynet接洽的部分,也就是,將"client"消息讓
函數:client_dispatch 處理。這個函數是系統跟 skynet的入口處。
還有一個就是,"lua"消息,由lua_dispatch處理。
skynet.dispatch("lua", lua_dispatch) skynet.register_protocol{ name = "client", id = skynet.PTYPE_CLIENT, unpack = skynet.unpack, dispatch = client_dispatch, }
12 delicious,美味啊,來看看做者是怎麼寫客戶端傳入的消息的。
hmm。。彷佛多了一個check?先無論他。
get_queue_id(cmd) 獲得 CMD的對應編號;
爲每一個fd,創建了一個等待隊列
該隊列下,對應的命令也初始化下。env.waiting_queue[fd][queue_id] = {}
function client_dispatch(session, source, fd, cmd, check, msg) local queue_id = get_queue_id(cmd) -- get_queue_id(cmd) 獲得 CMD的對應編號; if not queue_id then -- 若是指令不須要排隊則直接執行 client_dispatch_help(cmd, check, msg, fd, source) return end if not env.waiting_queue[fd] then env.waiting_queue[fd] = {} --爲每一個fd,創建了一個等待隊列 end if not env.waiting_queue[fd][queue_id] then env.waiting_queue[fd][queue_id] = {} --該隊列下,對應的命令也初始化下 end local queues = env.waiting_queue[fd][queue_id] if #queues > 0 then -- 若是有未完成的任務則插入後直接返回。 table.insert(queues, {cmd, check, msg, fd, source}) return end -- 若是沒有則直接進入後執行 table.insert(queues, {cmd, check, msg, fd, source}) for i = 1, 100 do local queue = table.remove(queues) if not queue then return end client_dispatch_help(table.unpack(queue)) end if #queues > 0 then -- 若是執行到這,也就是隊列超過100個則拋棄該FD的該指令隊列 log.error("%s queue is full, queue_id: %d", fd, queue_id) end env.waiting_queue[fd][queue_id] = nil end
13 繼續看指令的拆分和執行,拆分CMD中的
local function client_dispatch_help(cmd, check, msg, fd, source) msg._cmd = cmd msg._check = check --TODO check校驗 local cmdlist = string.split(cmd, ".") local isok, ret --派發到本服 isok, ret = local_dispatch(cmdlist[1], cmdlist[2], fd, msg, source) --派發到遠端 if not isok then isok, ret = romote_dispatch(cmdlist[1], cmdlist[2], fd, msg, source) end if ret then skynet.send(source, "lua", "send", fd, ret) end end
14 local_dispatch中cmd1應該是模塊名,好比login/game,cmd2,是具體的指令,好比是msgLogin這種。
注意,本地是調用forward的上的名爲cmd2方法。
遠程的話須要額外知道,player.romote[cmd1],也就是玩家的遠程服務的地址。
function local_dispatch(cmd1, cmd2, fd, msg, source) local module = env.module[cmd1] if type(module) ~= "table" then log.info("local_dispatch module is not table, cmd = %s.%s, msg = %s", cmd1, cmd2, tool.dump(msg)) return false end local forward = module.forward if type(forward) ~= "table" then log.info("local_dispatch forward is not table, cmd = %s.%s, msg = %s", cmd1, cmd2, tool.dump(msg)) return false end local cb = forward[cmd2] if type(cb) ~= "function" then log.info("local_dispatch cb is not function, cmd = %s.%s, str = %s", cmd1, cmd2, tool.dump(msg)) return false end --開始分發 local v = get_v(fd) local isok, ret = xpcall(cb, traceback, v, msg, source) if not isok then log.error("local_dispatch handle msg error, cmd = %s, msg = %s, err=%s", cmd1, tool.dump(msg), ret) return true --報錯的狀況也表示分發到位 end return true, ret end
15 在看內部的消息轉發 lua_dispatch,可是接受3種保留類型,
其餘的消息都經過dispatch來作,跟上面經過forward相似,各自處理一類消息。
(相似CMD和 request)
watch 和 sys 消息類型不太明白
local function lua_dispatch(session, addr, cmd, ...) local cmdlist = string.split(cmd, ".") local cmd1 = cmdlist[1] local cmd2 = cmdlist[2] --forward分發 if cmd1 == "client_forward" and not cmd2 then local isok, msg = local_dispatch(...) skynet.retpack(isok, msg) return true elseif cmd1 == "watch" and not cmd2 then local isok, msg, acm = watch(...) skynet.retpack(isok, msg, acm) return true elseif cmd1 == "sys" then local isok, msg = sys_dispatch(cmd2, ...) skynet.retpack(isok, msg) return true end --模塊 local module = env.module[cmd1] if type(module) ~= "table" then log.info("lua_dispatch module is not table, cmd = %s.%s", cmd1, cmd2) skynet.ret() return false end local dispatch = module.dispatch if type(dispatch) ~= "table" then log.info("lua_dispatch dispatch is not table, cmd = %s.%s", cmd1, cmd2) skynet.ret() return false end local cb = dispatch[cmd2] if type(cb) ~= "function" then log.info("lua_dispatch cb is not function, cmd = %s.%s", cmd1, cmd2) skynet.ret() return false end --分發 local function skyret(ok, ...) if not ok then skynet.ret() else skynet.retpack(...) end end local ret = {xpcall(cb, traceback, ...)} local isok = ret[1] if not isok then log.info("lua_dispatch cb call fail, cmd = %s.%s, err = %s", cmd1, cmd2, ret) skynet.ret() return false end skyret(table.unpack(ret)) end
16 一個 watch 的例子,供其餘模塊查看該模塊內部的狀況
function module.watch(acm) --統計在線人數 local logined = 0 --成功登錄 for i, v in pairs(env.players) do logined = logined + 1 end local ret = {logined = logined} --總統計 acm.logined = acm.logined and acm.logined + logined or logined return ret, acm end
17 sys的命令經過dispatch執行,熱更新reload和stop。
18 再看看事件機制。假設有2個節點A 和B
A要訂閱B,就調用module中的subscribe_event,
提供如下細信息:要訂閱的事件,要訂閱的節點,目標服務
function M.subscribe_event(event, nodename, service, key) local local_service = skynet.self() if nodename == local_nodename then return skynet.call(service, "sys.subscribe_event", event, local_nodename, local_service, key) end return cluster.call(nodename, service, "sys.subscribe_event", event, local_nodename, local_service, key) end
此時,B收到這個訂閱消息,這樣處理
local sys = { } function sys.subscribe_event(event, nodename, service, key) faci._subscribe_event(event, nodename, service, key) return true end --上在DISPATH文件,下在MODULE中 function M._subscribe_event(event, nodename, service, key) if not env.events[event] then env.events[event] = {} end if not env.events[event][nodename] then env.events[event][nodename] = {} end if not env.events[event][nodename][service] then env.events[event][nodename][service] = {} end if not env.events[event][nodename][service][key] then env.events[event][nodename][service][key] = {} end env.events[event][nodename][service][key] = true end
事件的觸發
local event_cache = {} function M.fire_event(name, ...) --獲取列表 local cache = event_cache[name] if not cache then event_cache[name] = {} for i, v in pairs(env.module) do if type(v.event[name]) == "function" then table.insert(event_cache[name], v.event[name]) end end end cache = event_cache[name] --執行 for _, fun in ipairs(cache) do log.info("fire event %s", name) xpcall(fun, function(err) log.error(tostring(err)) log.error(debug.traceback()) end, ...) end --遠程事件 local events = env.events[name] if not events then return end for nodename, nodes in pairs(events) do for service, keys in pairs(nodes) do if nodename == localname then skynet.send(service, name, keys, ...) else cluster.send(nodename, service, name, keys, ...) end end end end
4 servive中,而後進入start
skynet.start(function() init() if env.init then env.init() end end)
5 看看init,將傳遞的name命名到該服務,設置他的全局變量。
初始化模塊的相關東西。
啓動了事件,喚醒和開始?
local function init() --名字和編號 local name = env.name local id = env.id if not name then return end --命名 local idstr = env.id > 0 and tostring(env.id) or "" local name = string.format("%s%s", name, idstr) skynet.name(name, skynet.self()) --設置 log.set_name(name) --全局變量 _G["env"] = env _G["log"] = log --模塊 module.init_modules() module.fire_event("awake") module.fire_event("start") log.debug("start ok "..name.."...") end
6 看看這個module.init_modules()
function M.init_modules() require_modules() end
7 繼續跟蹤,牛皮,加載了根目錄+mod+文件名(login)下的全部lua,直接require
在項目中,是2個文件login_forward.lua 和 login_mode_test.lua
local function require_modules() local path = skynet.getenv("app_root").."mod/"..env.name lfstool.attrdir(path, function(file) local file = string.match(file, ".*mod/(.*)%.lua") if file then log.info(string.format("%s%d require file:%s", env.name, env.id, file)) require(file) end end) end
8 回去繼續看 module.fire_event("awake")
從上下文理解,這個應該是啓動call event ,awake 不是解僱。
這裏沒太明白,大體意思應該是,凡是註冊過該節點的awake的都會調用,包括本地和遠程。
local event_cache = {} function M.fire_event(name, ...) --獲取列表 local cache = event_cache[name] if not cache then event_cache[name] = {} for i, v in pairs(env.module) do if type(v.event[name]) == "function" then table.insert(event_cache[name], v.event[name]) end end end cache = event_cache[name] --執行 for _, fun in ipairs(cache) do log.info("fire event %s", name) xpcall(fun, function(err) log.error(tostring(err)) log.error(debug.traceback()) end, ...) end --遠程事件 local events = env.events[name] if not events then return end for nodename, nodes in pairs(events) do for service, keys in pairs(nodes) do if nodename == localname then skynet.send(service, name, keys, ...) else cluster.send(nodename, service, name, keys, ...) end end end end
9 再回去看加載的那些,7.
這2個東西應該比較重要,一個是模塊,一個是全局變量。
先看看這個函數get_module
local module, static = faci.get_module("Login")
10 在文件module中,這裏給出了每一個模塊的4個基本動做
dispatch , forward , event, watch
dispatch是本文件處理的,這個是skynet必備的。其餘三個多是做者本身加的?
local module = {} function M.get_module(name) --模塊處理函數 env.module[name] = env.module[name] or { dispatch = {}, forward = {}, event = {}, watch = nil, } --模塊全局變量 env.static[name] = env.static[name] or { } return env.module[name], env.static[name] end