注:爲方便理解,本文貼出的代碼部分通過了縮減或展開,與實際skynet代碼可能會有所出入。
做爲一個skynet actor,在啓動腳本被加載的過程當中,老是要調用skynet.start和skynet.dispatch的,前者在skynet-os中作一些初始化工做,設置消息的Lua回調,後者則註冊針對某協議的解析回調。舉個例子:緩存
1 local skynet = require "skynet" 2 3 local function hello() 4 skynet.ret(skynet.pack("Hello, World!")) 5 print("hello OK") 6 end 7 8 skynet.start(function() 9 skynet.dispatch("lua", function(session, address, cmd, ...) 10 assert(cmd == "hello") 11 hello() 12 end) 13 end)
先是調用skynet.start註冊初始化回調,在其中調用skynet.dispatch註冊針對"lua"協議的解析回調。skynet的基本使用這裏咱們就很少說了,具體見官方文檔。下面咱們就從skynet.start(見skynet.lua)開始,逐一分析流程。session
1 function skynet.start(start_func) 2 c.callback(skynet.dispatch_message) 3 skynet.timeout(0, function() 4 skynet.init_service(start_func) 5 end) 6 end
這裏的c來自於require "skynet.core",它是在lua-skynet.c中註冊的,以下: 函數
1 int luaopen_skynet_core(lua_State *L) { 2 luaL_checkversion(L); 3 4 luaL_Reg l[] = { 5 ... 6 { "callback", _callback }, 7 { NULL, NULL }, 8 }; 9 10 luaL_newlibtable(L, l); 11 12 lua_getfield(L, LUA_REGISTRYINDEX, "skynet_context"); 13 struct skynet_context *ctx = lua_touserdata(L,-1); 14 if (ctx == NULL) { 15 return luaL_error(L, "Init skynet context first"); 16 } 17 18 luaL_setfuncs(L,l,1); 19 20 return 1; 21 }
能夠看到,它註冊了幾個函數,並將skynet_context實例做爲各函數的upvalue,方便調用時獲取。skynet.start中調用c.callback,對應的就是lua-skynet.c中的_callback函數,skynet.dispatch_message回調就是它的參數:ui
1 static int _callback(lua_State *L) { 2 struct skynet_context * context = lua_touserdata(L, lua_upvalueindex(1)); 3 int forward = lua_toboolean(L, 2); 4 luaL_checktype(L,1,LUA_TFUNCTION); 5 lua_settop(L,1); 6 lua_rawsetp(L, LUA_REGISTRYINDEX, _cb); 7 8 lua_rawgeti(L, LUA_REGISTRYINDEX, LUA_RIDX_MAINTHREAD); 9 lua_State *gL = lua_tothread(L,-1); 10 11 if (forward) { 12 skynet_callback(context, gL, forward_cb); 13 } else { 14 skynet_callback(context, gL, _cb); 15 } 16 17 return 0; 18 }
能夠看到,其以函數_cb爲key,LUA回調(skynet.dispatch_message)做爲value被註冊到全局註冊表中。skynet_callback(在skynet_server.c中)則設置函數指針_cb爲C層面的消息處理函數:lua
1 void skynet_callback(struct skynet_context * context, void *ud, skynet_cb cb) { 2 context->cb = cb; 3 context->cb_ud = ud; 4 }
先不關注skynet-os內部的線程調度細節,只須要知道,skynet-context接收到消息後會轉發給context->cb處理,也就是_cb函數。在_cb中,從全局表中取到關聯的LUA回調,將type, msg, sz, session, source壓棧調用:spa
1 static int _cb(struct skynet_context * context, void * ud, int type, int session, uint32_t source, const void * msg, size_t sz) { 2 lua_State *L = ud; 3 int trace = 1; 4 int r; 5 int top = lua_gettop(L); 6 if (top == 0) { 7 lua_pushcfunction(L, traceback); 8 lua_rawgetp(L, LUA_REGISTRYINDEX, _cb); 9 } else { 10 assert(top == 2); 11 } 12 lua_pushvalue(L,2); 13 14 lua_pushinteger(L, type); 15 lua_pushlightuserdata(L, (void *)msg); 16 lua_pushinteger(L,sz); 17 lua_pushinteger(L, session); 18 lua_pushinteger(L, source); 19 20 r = lua_pcall(L, 5, 0 , trace); 21 22 if (r == LUA_OK) { 23 return 0; 24 } 25 }
此時調用流程正式轉到skynet.lua中的skynet.dispatch_message: 操作系統
1 function skynet.dispatch_message(...) 2 local succ, err = pcall(raw_dispatch_message,...) 3 while true do 4 local key,co = next(fork_queue) 5 fork_queue[key] = nil 6 pcall(suspend,co,coroutine_resume(co)) 7 end 8 end
首先是將msg交由raw_dispatch_message做分發,而後開始處理fork_queue中緩存的fork協程:prototype
pcall(suspend, co, coroutine_resume(co))線程
這行代碼是咱們今天關注的重點。在繼續以前,我假設你對lua的協程有必定的瞭解,瞭解coroutine.resume,coroutine.yield的基本用法。coroutine就是lua裏的線程,它擁有本身的函數棧,但與咱們日常接觸的大多數操做系統裏的線程不一樣,是非搶佔式的。skynet對lua的coroutine做了封裝(詳見lua-profile.c),主要是增長了starttime和totaltime的監測,最終仍是交由lua的coroutine庫來處理的。既然這裏分析到了fork_queue,那咱們就先以skynet.fork爲例,看看它做了什麼:debug
1 function skynet.fork(func,...) 2 local args = table.pack(...) 3 local co = co_create(function() 4 func(table.unpack(args,1,args.n)) 5 end) 6 table.insert(fork_queue, co) 7 return co 8 end
skynet.fork作的事情很簡單,經過co_create建立一個coroutine並將其入隊fork_queue。看看co_create是如何建立協程的:
1 local function co_create(f) 2 local co = table.remove(coroutine_pool) 3 if co == nil then 4 co = coroutine.create(function(...) 5 f(...) 6 while true do 7 f = nil 8 coroutine_pool[#coroutine_pool+1] = co 9 f = coroutine_yield "EXIT" 10 f(coroutine_yield()) 11 end 12 end) 13 else 14 coroutine_resume(co, f) 15 end 16 return co 17 end
調用co_create時,若是coroutine_pool爲空,它會建立一個新的co。co在第一次被resume時,會執行f,接着便進入一個使用和回收的無限循環。在這個循環中,先是收回co到coroutine_pool中,接着便yield "EXIT"到上一次的resume點A。當下一次被resume在點B喚醒時,會先將函數f傳遞過來,接着再次yield到點B,等待下一次在點D被resume喚醒時,傳遞須要的參數過來加以執行,完畢後回收,如此反覆。這樣看來,co的執行彷佛至關簡單。可是實際上要複雜一些,由於在執行f的過程當中,能夠再反覆地yield和resume。下面咱們舉個簡單的例子:
1 skynet.fork(function() 2 print ("skynet fork: <1>") 3 skynet.sleep(10) 4 print ("skynet fork: <2>") 5 end)
咱們把skynet.sleep展開:
1 skynet.fork(function() 2 print ("skynet fork: <1>") 3 coroutine_yield("SLEEP", COMMAND_TIMEOUT(10)) 4 print ("skynet fork: <2>") 5 end)
下面開始分析調用流程。fork-co入隊,主co在skynet.dispatch_message中分發消息後取出fork-co,調用resume開始進入fork-co的函數f執行,以下圖所示,若是fork-co是第一次執行,是走圈1,若是是複用,則走圈1*(若是是複用的話,調用co_create時,會先coroutine_resume(co,f)一次進入fork-co,將用戶函數傳遞給while循環中的coroutine_yield "EXIT"點以後,接着fork-co再次yield讓出,等待實際傳參的調用)。接着進入用戶函數,COMMAND_TIMEOUT會先向skynet-kernal發送TIMEOUT命令,如圈2所示。而後yield "SLEEP"到主co的resume點1以後繼續執行,如圈3所示,按圈4的指向,調用suspend進入"SLEEP"分支,記錄下TIMEOUT-session與fork-co的映射關係。此時主co回到skynet.dispatch_message中繼續下一個fork-co的處理。當TIMEOUT消息回來時,會由主co再次進入skynet.dispatch_message並調用raw_dispatch_message分發,這時經過session拿到以前映射的fork-co,再次resume,按照圈5的指向,會跳轉到fork-co的yield "SLEEP"點以後繼續向下處理。用戶函數處理完畢後,回到上層調用,即圈6所指,回收fork-co,接着yield "EXIT"到主co所在raw_dispatch_message中的resume點以後,如圈7所示。進入suspend後,無額外命令,raw_dispatch_message處理結束,繼續主co的消息處理流程。
由以上分析能夠看到,實際的協程跳轉過程是比較複雜的,也更顯得小小的LUA在skynet中的精巧運用。爲方便理解,順便貼出suspend的代碼(只列出了咱們關注的幾個命令,並作了刪減):
1 function suspend(co, result, command, param, size) 2 if command == "CALL" then 3 session_id_coroutine[param] = co 4 elseif command == "SLEEP" then 5 session_id_coroutine[param] = co 6 sleep_session[co] = param 7 elseif command == "RETURN" then 8 local co_session = session_coroutine_id[co] 9 local co_address = session_coroutine_address[co] 10 session_response[co] = true 11 c.send(co_address, skynet.PTYPE_RESPONSE, co_session, param, size) 12 return suspend(co, coroutine_resume(co, ret)) 13 elseif command == "EXIT" then 14 -- coroutine exit 15 local address = session_coroutine_address[co] 16 session_coroutine_id[co] = nil 17 session_coroutine_address[co] = nil 18 session_response[co] = nil 19 elseif command == nil then 20 -- debug trace 21 return 22 end 23 end
看完skynet.fork,咱們再回過頭來,看一看skynet.dispatch_message中消息分發raw_dispatch_message的具體細節:
1 local function raw_dispatch_message(prototype, msg, sz, session, source) 2 -- skynet.PTYPE_RESPONSE = 1, read skynet.h 3 if prototype == 1 then 4 local co = session_id_coroutine[session] 5 session_id_coroutine[session] = nil 6 suspend(co, coroutine_resume(co, true, msg, sz)) 7 else 8 local p = proto[prototype] 9 local f = p.dispatch 10 local co = co_create(f) 11 session_coroutine_id[co] = session 12 session_coroutine_address[co] = source 13 suspend(co, coroutine_resume(co, session,source, p.unpack(msg,sz))) 14 end 15 end
RESPONSE的處理先就不說了,在敘述skynet.sleep時已經有所討論。消息會根據它的prototype查找proto,接着調用co_create取得協程user-co,將message解包後resume到user-co進入用戶函數f。然後續的流程則與上文咱們討論的fork是同樣的了。好比,在f中調用skynet.call時,會向目標發送消息,接着會yield到主co,回到這裏的resume點,接着進入suspend的"CALL"分支,記錄session與user-co的映射關係。下一次response消息回來時,會查找到user-co並resume喚醒,在skynet.call後繼續執行,用戶函數f結束後進入上層調用,回收user-co,等待新的調用。
由於本篇咱們關注的是協程調度模型,而非具體的處理細節,所以有空再對skynet.call,skynet.ret等做詳細的細節分析。