skynet源碼閱讀<5>--協程調度模型

注:爲方便理解,本文貼出的代碼部分通過了縮減或展開,與實際skynet代碼可能會有所出入。
    做爲一個skynet actor,在啓動腳本被加載的過程當中,老是要調用skynet.start和skynet.dispatch的,前者在skynet-os中作一些初始化工做,設置消息的Lua回調,後者則註冊針對某協議的解析回調。舉個例子:緩存

 1 local skynet = require "skynet"
 2 
 3 local function hello()
 4     skynet.ret(skynet.pack("Hello, World!"))
 5     print("hello OK")
 6 end
 7 
 8 skynet.start(function()
 9     skynet.dispatch("lua", function(session, address, cmd, ...)
10         assert(cmd == "hello")
11         hello()
12     end)
13 end)

    先是調用skynet.start註冊初始化回調,在其中調用skynet.dispatch註冊針對"lua"協議的解析回調。skynet的基本使用這裏咱們就很少說了,具體見官方文檔。下面咱們就從skynet.start(見skynet.lua)開始,逐一分析流程。session

1 function skynet.start(start_func)
2     c.callback(skynet.dispatch_message)
3     skynet.timeout(0, function()
4         skynet.init_service(start_func)
5     end)
6 end

這裏的c來自於require "skynet.core",它是在lua-skynet.c中註冊的,以下: 函數

 1 int luaopen_skynet_core(lua_State *L) {
 2     luaL_checkversion(L);
 3 
 4     luaL_Reg l[] = {
 5         ...
 6         { "callback", _callback },
 7         { NULL, NULL },
 8     };
 9 
10     luaL_newlibtable(L, l);
11 
12     lua_getfield(L, LUA_REGISTRYINDEX, "skynet_context");
13     struct skynet_context *ctx = lua_touserdata(L,-1);
14     if (ctx == NULL) {
15         return luaL_error(L, "Init skynet context first");
16     }
17 
18     luaL_setfuncs(L,l,1);
19 
20     return 1;
21 }

     能夠看到,它註冊了幾個函數,並將skynet_context實例做爲各函數的upvalue,方便調用時獲取。skynet.start中調用c.callback,對應的就是lua-skynet.c中的_callback函數,skynet.dispatch_message回調就是它的參數:ui

 1 static int _callback(lua_State *L) {
 2     struct skynet_context * context = lua_touserdata(L, lua_upvalueindex(1));
 3     int forward = lua_toboolean(L, 2);
 4     luaL_checktype(L,1,LUA_TFUNCTION);
 5     lua_settop(L,1);
 6     lua_rawsetp(L, LUA_REGISTRYINDEX, _cb);
 7 
 8     lua_rawgeti(L, LUA_REGISTRYINDEX, LUA_RIDX_MAINTHREAD);
 9     lua_State *gL = lua_tothread(L,-1);
10 
11     if (forward) {
12         skynet_callback(context, gL, forward_cb);
13     } else {
14         skynet_callback(context, gL, _cb);
15     }
16 
17     return 0;
18 }

    能夠看到,其以函數_cb爲key,LUA回調(skynet.dispatch_message)做爲value被註冊到全局註冊表中。skynet_callback(在skynet_server.c中)則設置函數指針_cb爲C層面的消息處理函數:lua

1 void skynet_callback(struct skynet_context * context, void *ud, skynet_cb cb) {
2     context->cb = cb;
3     context->cb_ud = ud;
4 }

    先不關注skynet-os內部的線程調度細節,只須要知道,skynet-context接收到消息後會轉發給context->cb處理,也就是_cb函數。在_cb中,從全局表中取到關聯的LUA回調,將type, msg, sz, session, source壓棧調用:spa

 1 static int _cb(struct skynet_context * context, void * ud, int type, int session, uint32_t source, const void * msg, size_t sz) {
 2     lua_State *L = ud;
 3     int trace = 1;
 4     int r;
 5     int top = lua_gettop(L);
 6     if (top == 0) {
 7         lua_pushcfunction(L, traceback);
 8         lua_rawgetp(L, LUA_REGISTRYINDEX, _cb);
 9     } else {
10         assert(top == 2);
11     }
12     lua_pushvalue(L,2);
13 
14     lua_pushinteger(L, type);
15     lua_pushlightuserdata(L, (void *)msg);
16     lua_pushinteger(L,sz);
17     lua_pushinteger(L, session);
18     lua_pushinteger(L, source);
19 
20     r = lua_pcall(L, 5, 0 , trace);
21 
22     if (r == LUA_OK) {
23         return 0;
24     }
25 }

    此時調用流程正式轉到skynet.lua中的skynet.dispatch_message: 操作系統

1 function skynet.dispatch_message(...)
2     local succ, err = pcall(raw_dispatch_message,...)
3     while true do
4         local key,co = next(fork_queue)
5         fork_queue[key] = nil
6         pcall(suspend,co,coroutine_resume(co))
7     end
8 end

    首先是將msg交由raw_dispatch_message做分發,而後開始處理fork_queue中緩存的fork協程:prototype

        pcall(suspend, co, coroutine_resume(co))線程

    這行代碼是咱們今天關注的重點。在繼續以前,我假設你對lua的協程有必定的瞭解,瞭解coroutine.resume,coroutine.yield的基本用法。coroutine就是lua裏的線程,它擁有本身的函數棧,但與咱們日常接觸的大多數操做系統裏的線程不一樣,是非搶佔式的。skynet對lua的coroutine做了封裝(詳見lua-profile.c),主要是增長了starttime和totaltime的監測,最終仍是交由lua的coroutine庫來處理的。既然這裏分析到了fork_queue,那咱們就先以skynet.fork爲例,看看它做了什麼:debug

1 function skynet.fork(func,...)
2     local args = table.pack(...)
3     local co = co_create(function()
4         func(table.unpack(args,1,args.n))
5     end)
6     table.insert(fork_queue, co)
7     return co
8 end

    skynet.fork作的事情很簡單,經過co_create建立一個coroutine並將其入隊fork_queue。看看co_create是如何建立協程的:

 1 local function co_create(f)
 2     local co = table.remove(coroutine_pool)
 3     if co == nil then
 4         co = coroutine.create(function(...)
 5             f(...)
 6             while true do
 7                 f = nil
 8                 coroutine_pool[#coroutine_pool+1] = co
 9                 f = coroutine_yield "EXIT"
10                 f(coroutine_yield())
11             end
12         end)
13     else
14         coroutine_resume(co, f)
15     end
16     return co
17 end

    調用co_create時,若是coroutine_pool爲空,它會建立一個新的co。co在第一次被resume時,會執行f,接着便進入一個使用和回收的無限循環。在這個循環中,先是收回co到coroutine_pool中,接着便yield "EXIT"到上一次的resume點A。當下一次被resume在點B喚醒時,會先將函數f傳遞過來,接着再次yield到點B,等待下一次在點D被resume喚醒時,傳遞須要的參數過來加以執行,完畢後回收,如此反覆。這樣看來,co的執行彷佛至關簡單。可是實際上要複雜一些,由於在執行f的過程當中,能夠再反覆地yield和resume。下面咱們舉個簡單的例子: 

1     skynet.fork(function()
2         print ("skynet fork: <1>")
3         skynet.sleep(10)
4         print ("skynet fork: <2>")
5     end)

     咱們把skynet.sleep展開: 

1     skynet.fork(function()
2         print ("skynet fork: <1>")
3         coroutine_yield("SLEEP", COMMAND_TIMEOUT(10))
4         print ("skynet fork: <2>")
5     end)

     下面開始分析調用流程。fork-co入隊,主co在skynet.dispatch_message中分發消息後取出fork-co,調用resume開始進入fork-co的函數f執行,以下圖所示,若是fork-co是第一次執行,是走圈1,若是是複用,則走圈1*(若是是複用的話,調用co_create時,會先coroutine_resume(co,f)一次進入fork-co,將用戶函數傳遞給while循環中的coroutine_yield "EXIT"點以後,接着fork-co再次yield讓出,等待實際傳參的調用)。接着進入用戶函數,COMMAND_TIMEOUT會先向skynet-kernal發送TIMEOUT命令,如圈2所示。而後yield "SLEEP"到主co的resume點1以後繼續執行,如圈3所示,按圈4的指向,調用suspend進入"SLEEP"分支,記錄下TIMEOUT-session與fork-co的映射關係。此時主co回到skynet.dispatch_message中繼續下一個fork-co的處理。當TIMEOUT消息回來時,會由主co再次進入skynet.dispatch_message並調用raw_dispatch_message分發,這時經過session拿到以前映射的fork-co,再次resume,按照圈5的指向,會跳轉到fork-co的yield "SLEEP"點以後繼續向下處理。用戶函數處理完畢後,回到上層調用,即圈6所指,回收fork-co,接着yield "EXIT"到主co所在raw_dispatch_message中的resume點以後,如圈7所示。進入suspend後,無額外命令,raw_dispatch_message處理結束,繼續主co的消息處理流程。

    由以上分析能夠看到,實際的協程跳轉過程是比較複雜的,也更顯得小小的LUA在skynet中的精巧運用。爲方便理解,順便貼出suspend的代碼(只列出了咱們關注的幾個命令,並作了刪減): 

 1 function suspend(co, result, command, param, size)
 2     if command == "CALL" then
 3         session_id_coroutine[param] = co
 4     elseif command == "SLEEP" then
 5         session_id_coroutine[param] = co
 6         sleep_session[co] = param
 7     elseif command == "RETURN" then
 8         local co_session = session_coroutine_id[co]
 9         local co_address = session_coroutine_address[co]
10         session_response[co] = true
11         c.send(co_address, skynet.PTYPE_RESPONSE, co_session, param, size)
12         return suspend(co, coroutine_resume(co, ret))
13     elseif command == "EXIT" then
14         -- coroutine exit
15         local address = session_coroutine_address[co]
16         session_coroutine_id[co] = nil
17         session_coroutine_address[co] = nil
18         session_response[co] = nil
19     elseif command == nil then
20         -- debug trace
21         return
22     end
23 end

    看完skynet.fork,咱們再回過頭來,看一看skynet.dispatch_message中消息分發raw_dispatch_message的具體細節:

 1 local function raw_dispatch_message(prototype, msg, sz, session, source)
 2     -- skynet.PTYPE_RESPONSE = 1, read skynet.h
 3     if prototype == 1 then
 4         local co = session_id_coroutine[session]
 5         session_id_coroutine[session] = nil
 6         suspend(co, coroutine_resume(co, true, msg, sz))
 7     else
 8         local p = proto[prototype]
 9         local f = p.dispatch
10         local co = co_create(f)
11         session_coroutine_id[co] = session
12         session_coroutine_address[co] = source
13         suspend(co, coroutine_resume(co, session,source, p.unpack(msg,sz)))
14     end
15 end

    RESPONSE的處理先就不說了,在敘述skynet.sleep時已經有所討論。消息會根據它的prototype查找proto,接着調用co_create取得協程user-co,將message解包後resume到user-co進入用戶函數f。然後續的流程則與上文咱們討論的fork是同樣的了。好比,在f中調用skynet.call時,會向目標發送消息,接着會yield到主co,回到這裏的resume點,接着進入suspend的"CALL"分支,記錄session與user-co的映射關係。下一次response消息回來時,會查找到user-co並resume喚醒,在skynet.call後繼續執行,用戶函數f結束後進入上層調用,回收user-co,等待新的調用。

    由於本篇咱們關注的是協程調度模型,而非具體的處理細節,所以有空再對skynet.call,skynet.ret等做詳細的細節分析。

相關文章
相關標籤/搜索