Lua層消息處理機制在lualib/skynet.lua,提供大部分Lua層的api(最終會調用到c層的api),包括啓動一個snlua服務時Lua層的處理,建立新服務,註冊服務協議,如何發送消息,如何處理對方發過來的消息等。本篇主要介紹消息處理機制,從而理解skynet如何實現高併發。node
爲了簡化,代碼裏用到的coroutine_resume,coroutine_yield當作coroutine.resume,coroutine.yield便可。api
local coroutine_resume = profile.resume local coroutine_yield = profile.yield
coroutine.create,建立一個co,惟一的參數是co要執行的閉包f,此時是不會執行閉包f的緩存
coroutine.resume,執行一個co,第一個參數是co的句柄,若是是第一次執行,其餘參數是傳遞給閉包f的。co啓動後,一直執行直到它終止或讓出。正常終止,返回true和閉包f的返回值;發生錯誤異常終止,則返回false和錯誤信息session
coroutine.yield,使co暫停,讓出執行權。對應最近的resume會馬上返回,返回true和yield的參數。下一次resume同一個co時,會從讓出點繼續執行,此時,yield的調用會馬上返回,返回值爲resume除第一個參數以外的其餘參數閉包
引用Lua文檔介紹協程coroutine(簡稱co)的經典例子,能夠看出,co能夠被不斷的暫停和重啓。skynet普遍使用co,當發送一個rpc請求時會暫停當前co,等對方返回時又重啓co。併發
先闡述下skynet建立協程(co)的方式,經過co_create(f)這個api建立一個協程,這段代碼很是有意思。爲了性能,skynet會把建立的co放到緩存裏(第9行),當協程執行完流程(閉包f)後不會終止,而是暫停(第10行)。當調用者調用co_create這個api時,若是緩存裏沒有,經過coroutine.create建立一個co,此時是不會執行閉包f,而後在某個時刻(一般是收到消息調用消息分發skynet.dispatch_message)會重啓(附帶須要的參數)這個co,co接着執行閉包f(第6行),最後暫停以等待下一次使用,對應最近的resume返回true和「EXIT」(第10行);若是是一個複用的co,重啓co(第15行,參數是將要執行的閉包f),yield會馬上返回把閉包賦值給f(第10行),在11行又暫停,一樣在某個時刻會重啓(附帶須要的參數)這個co,co接着執行閉包f(第11行),最後又在第10行暫停等待下一次使用。函數
1 -- lualib/skynet.lua 2 local function co_create(f) 3 local co = table.remove(coroutine_pool) 4 if co == nil then 5 co = coroutine.create(function(...) 6 f(...) 7 while true do 8 f = nil 9 coroutine_pool[#coroutine_pool+1] = co 10 f = coroutine_yield "EXIT" 11 f(coroutine_yield()) 12 end 13 end) 14 else 15 coroutine_resume(co, f) 16 end 17 return co 18 end
瞭解了co_create的原理後,接下來以服務A向服務B發一條消息爲例說明skynet是如何處理Lua層消息:高併發
-- A.lua local skynet = require "skynet" skynet.start(function() print(skynet.call("B", "lua", "aaa")) end)
-- B.lua local skynet = require "skynet" require "skynet.manager" skynet.start(function() skynet.dispatch("lua", function(session, source, ...) skynet.ret(skynet.pack("OK")) end) skynet.register "B" end)
在服務啓動最後會調用skynet.start,skynet.start調用skynet.timeout,在timeout裏會建立一個co(12行),稱之爲服務的主協程co1,此時co1不會執行性能
1 -- lualib/skynet.lua 2 function skynet.start(start_func) 3 c.callback(skynet.dispatch_message) 4 skynet.timeout(0, function() 5 skynet.init_service(start_func) 6 end) 7 end 8 9 function skynet.timeout(ti, func) 10 local session = c.intcommand("TIMEOUT",ti) 11 assert(session) 12 local co = co_create(func) 13 assert(session_id_coroutine[session] == nil) 14 session_id_coroutine[session] = co 15 end
定時器被觸發(由於定時器設置是0,因此下一幀就觸發)會向服務發送一條「RESPONSE」類型(PTYPE_RESPONSE=1)的消息ui
// skynet-src/skynet_timer.c
static inline void dispatch_list(struct timer_node *current) { ... message.sz = (size_t)PTYPE_RESPONSE << MESSAGE_TYPE_SHIFT; ... }
服務收到消息後,調用消息分發api,因爲消息類型是RESPONSE,最終會執行到第7行。重啓主協程co1,執行co1的閉包f(這裏是skynet.init_service(start_func)),若是閉包f裏沒有暫停的操做,待閉包f成功運行完,co1暫停,resume會返回true和"EXIT",接下來,第7行就變成,suspend(co, true, "EXIT")
1 -- luablib/skynet.lua 2 local function raw_dispatch_message(prototype, msg, sz, session, source) 3 -- skynet.PTYPE_RESPONSE = 1, read skynet.h 4 if prototype == 1 then 5 local co = session_id_coroutine[session] 6 ... 7 suspend(co, coroutine_resume(co, true, msg, sz)) 8 ... 9 end
而後,調用suspend,因爲類型是"EXIT",作一些清理工做便可。
-- lualib/skynet.lua function suspend(co, result, command, param, size) ... elseif command == "EXIT" then -- coroutine exit local address = session_coroutine_address[co] if address then release_watching(address) session_coroutine_id[co] = nil session_coroutine_address[co] = nil session_response[co] = nil end ... end
當閉包f裏有暫停操做,好比A服務向B服務發送消息skynet.call("B", "lua", "aaa"),這裏分別講解A服務和B服務是如何處理的:
對於A服務:
首先在c層把消息發送出去(第14行,把消息push到目的服務的次級消息隊列),而後暫停co1,resume返回true,"CALL"和session值
1 -- lualib/skynet.lua 2 local function yield_call(service, session) 3 watching_session[session] = service 4 local succ, msg, sz = coroutine_yield("CALL", session) 5 watching_session[session] = nil 6 if not succ then 7 error "call failed" 8 end 9 return msg,sz 10 end 11 12 function skynet.call(addr, typename, ...) 13 local p = proto[typename] 14 local session = c.send(addr, p.id , nil , p.pack(...)) 15 if session == nil then 16 error("call to invalid address " .. skynet.address(addr)) 17 end 18 return p.unpack(yield_call(addr, session)) 19 end
而後調用suspend(co, true, "CALL", session),類型是"CALL",以session爲key,co爲value保存在session_id_coroutine裏,以便當B服務對A的請求返回後,根據session找到對應的co,從而能夠重啓co
1 -- lualib/skynet.lua 2 function suspend(co, result, command, param, size) 3 ... 4 if command == "CALL" then 5 session_id_coroutine[param] = co 6 ... 7 end
當A收到B的返回消息時,調用消息分發api,根據session找到對應的co(即主協程co1),從上一次暫停點重啓它,下面這一行代碼yield會馬上返回,打印出B返回的結果print(...)(A.lua),此時執行完co1整個流程,返回true和「EXIT」給suspend,對co1作一些清理工做。
local succ, msg, sz = coroutine_yield("CALL", session)
稍微改一下A.lua,co1執行閉包f流程中經過fork建立一個協程(稱爲co2),因爲co1沒有暫停,會一直執行完整個流程。此時co2並無執行。
1 -- A.lua 2 local skynet = require "skynet" 3 4 skynet.start(function() 5 skynet.fork(function() 6 print(skynet.call("B", "lua", "aaa")) 7 end) 8 end)
1 -- lualib/skynet.lua 2 function skynet.fork(func,...) 3 local args = table.pack(...) 4 local co = co_create(function() 5 func(table.unpack(args,1,args.n)) 6 end) 7 table.insert(fork_queue, co) 8 return co 9 end
消息分發api作的第二件事是處理fork_queue裏的co。因此收到定時器發送回來的消息後作的第二件事是重啓co2,向B服務發送消息後暫停co2,直到B返回時再重啓co2。
1 -- lualib/skynet.lua 2 function skynet.dispatch_message(...) 3 ... 4 local fork_succ, fork_err = pcall(suspend,co,coroutine_resume(co)) 5 ... 6 end
對於B服務:
收到A服務的消息後調用消息分發api,建立一個co(第12行),co要執行的閉包f是已註冊的消息回調函數p.dispatch(第4行),而後經過resume重啓它(第15行)
1 -- lualib/skynet.lua 2 local function raw_dispatch_message(prototype, msg, sz, session, source) 3 ... 4 local f = p.dispatch 5 if f then 6 local ref = watching_service[source] 7 if ref then 8 watching_service[source] = ref + 1 9 else 10 watching_service[source] = 1 11 end 12 local co = co_create(f) 13 session_coroutine_id[co] = session 14 session_coroutine_address[co] = source 15 suspend(co, coroutine_resume(co, session,source, p.unpack(msg,sz))) 16 ... 17 end
執行skynet.ret(skynet.pack("OK")),調用yield暫停它(第4行),最近的resume返回,上面第15行變成suspend(co, true, "RETURN", msg, sz)
1 -- lualib/skynet.lua 2 function skynet.ret(msg, sz) 3 msg = msg or "" 4 return coroutine_yield("RETURN", msg, sz) 5 end
當command=="RETURN"時,作兩件事:1. 向源地址(即A服務)發送返回消息(第5行);2. 重啓co(第7行),co從skynet.ret返回,而後B服務的消息回調函數(p.dispatch)執行完,co的閉包f所有執行完放入緩存中,返回true和「EXIT「給suspend
1 -- lualib/skynet.lua 2 function suspend(co, result, command, param, size) 3 ... 4 elseif command == "RETURN" then 5 ret = c.send(co_address, skynet.PTYPE_RESPONSE, co_session, param, size) ~= nil 6 ... 7 return suspend(co, coroutine_resume(co, ret)) 8 ... 9 end
至此,就是Lua層消息處理的整個流程。
在一些狀況下須要作異常處理,好比沒有註冊對應消息類型的協議,沒有提供消息回調函數,執行co過程當中發生錯誤等。當一個服務處理一條消息的過程發生異常,必需要作兩件事:1. 異常終止當前co;2. 通知消息發送方,而不是讓對方一直忙等待。
當執行co過程當中發生錯誤時,resume第一個返回值是false,調用suspend,向對方發送一條PTYPE_ERROR類型消息(第9行),而後拋出異常,終止當前co(第14行)。
1 -- lualib/skynet.lua 2 function suspend(co, result, command, param, size) 3 if not result then 4 local session = session_coroutine_id[co] 5 if session then -- coroutine may fork by others (session is nil) 6 local addr = session_coroutine_address[co] 7 if session ~= 0 then 8 -- only call response error 9 c.send(addr, skynet.PTYPE_ERROR, session, "") 10 end 11 session_coroutine_id[co] = nil 12 session_coroutine_address[co] = nil 13 end 14 error(debug.traceback(co,tostring(command))) 15 end 16 ... 17 end
大部分異常狀況下,都會向對方發送一條PTYPE_ERROR類型消息通知對方,當收到PYTPE_ERROR類型消息,會調用_error_dispatch,把error_source記錄在dead_service裏,把error_session記錄在error_queue裏
1 -- lualib/skynet.lua 2 local function _error_dispatch(error_session, error_source) 3 if error_session == 0 then 4 -- service is down 5 -- Don't remove from watching_service , because user may call dead service 6 if watching_service[error_source] then 7 dead_service[error_source] = true 8 end 9 for session, srv in pairs(watching_session) do 10 if srv == error_source then 11 table.insert(error_queue, session) 12 end 13 end 14 else 15 -- capture an error for error_session 16 if watching_session[error_session] then 17 table.insert(error_queue, error_session) 18 end 19 end 20 end
在suspend最後會調用dispatch_error_queue處理error_queue,經過session查找到正在等待的co,而後強制終止它,保證co不會一直忙等待。
1 -- lualib/skynet.lua 2 local function dispatch_error_queue() 3 local session = table.remove(error_queue,1) 4 if session then 5 local co = session_id_coroutine[session] 6 session_id_coroutine[session] = nil 7 return suspend(co, coroutine_resume(co, false)) 8 end 9 end
一次同步的rpc請求的流程以下圖。當一個服務當前co暫停時,能夠去執行服務裏其餘co的流程,N個co之間能夠交叉執行,一個co暫停並不會影響其餘co的執行,最大化提供計算能力,實現高併發。