skynet1.0閱讀筆記2_skynet的消息投遞skynet.call


爲了瞭解 skynet.call 的調用過程,須要先看看 skynet的隊列是如何把包分到不一樣工做線程的。看下圖session

查看 global_queue 的skynet_globalmq_push和skynet_globamq_pop,很容易能夠找到兩個關鍵的函數:函數

skyent_context_push

ui

skynet_context_message_dispatch

先來看出口,skynet_context_message_dispatch。在skynet的啓動函數中,咱們已經知道skynet_start裏面的start(config->thread)啓動了 worker等線程:lua

thread_worker(void *p) {
    //初始化
    ...
    struct message_queue * q = NULL;
    while (!m->quit) {
        //循環調用 skynet_context_message_dispatch
        q = skynet_context_message_dispatch(sm, q, weight);
        if (q == NULL) {
            //沒包了就掛其線程
            ...
        }
    }
    return NULL;
}

很清晰的代碼,worker線程不斷調用 skynet_context_message_dispatch 來讀取q裏面的skynet_message 隊列,並進行分發。咱們來看它是怎麼分發的。spa

 

struct message_queue * 
skynet_context_message_dispatch(struct skynet_monitor *sm, struct message_queue *q, int weight) {
    //q爲空時,從新從global_queue中取下一個 message_queue
    if (q == NULL) {
        q = skynet_globalmq_pop();
        if (q==NULL)
            return NULL;
    }

    //當前 message_queue 所屬服務的 context 的handle id
    uint32_t handle = skynet_mq_handle(q);

    struct skynet_context * ctx = skynet_handle_grab(handle);
    if (ctx == NULL) {
        struct drop_t d = { handle };
        skynet_mq_release(q, drop_message, &d);
        return skynet_globalmq_pop();
    }

    int i,n=1;
    struct skynet_message msg;

    for (i=0;i<n;i++) {
        //從message_queue 中 pop一個msg出來
        if (skynet_mq_pop(q,&msg)) {
            //若message_queue爲空,返回1表示失敗,釋放ctx的引用次數
            skynet_context_release(ctx);
            //把返回global_queue裏面的下一個message_queue,以供skynet_context_message_dispatch調用
            return skynet_globalmq_pop();
        } else if (i==0 && weight >= 0) {
            n = skynet_mq_length(q);
            n >>= weight;
        }
        int overload = skynet_mq_overload(q);
        if (overload) {
            skynet_error(ctx, "May overload, message queue length = %d", overload);
        }

        skynet_monitor_trigger(sm, msg.source , handle);

        //若 ctx->cb不爲空,使用dispatch_message調用 ctx->cb
        if (ctx->cb == NULL) {
            skynet_free(msg.data);
        } else {
            dispatch_message(ctx, &msg);
        }

        skynet_monitor_trigger(sm, 0,0);
    }

    assert(q == ctx->queue);
    //若global_queue中還有下一個message_queue,返回下一個message_queue供分發,若爲空則繼續執行當前message_queue的請求
    struct message_queue *nq = skynet_globalmq_pop();
    if (nq) {
        // If global mq is not empty , push q back, and return next queue (nq)
        // Else (global mq is empty or block, don't push q back, and return q again (for next dispatch)
        skynet_globalmq_push(q);
        q = nq;
    } 
    skynet_context_release(ctx);

    return q;
}

那麼,從全局隊列最終拿到的 skynet_message包,最後交由了 dispatch_message和ctx-cb來處理了。dispatch_message把msg裏面的東西取出來後,調用ctx->cb來進行處理prototype

    static void dispatch_message(struct skynet_context *ctx, struct skynet_message *msg) {
        ...
        if (!ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz)) {
            skynet_free(msg->data);
        } 
        CHECKCALLING_END(ctx)
    }

在skynet的啓動筆記中,已經知道了,首先是:
snlua_init 用 skynet_callback(ctx,l,_launch) 把 ctx->cb註冊爲 _launch
而後立馬投遞第一個消息
消息從新進到 dispatch_message,調用 _launch ,把 ctx->cb註冊爲 skynet.dispatch_message線程

啓動完成後,之後的全部消息其實都進到了 skynet.dispatch_message,而後調用了 raw_dispatch_messagecode

    function skynet.dispatch_message(...)
        local succ, err = pcall(raw_dispatch_message,...)
        ...
    end

那接着來看 raw_dispatch_message協程

local function raw_dispatch_message(prototype, msg, sz, session, source, ...)
        if prototype == 1 then
            ...
        else
            local p = proto[prototype]
            ...
            local f = p.dispatch

            if f then
                ...
                local co = co_create(f)
                ...
                suspend(co, coroutine.resume(co, session,source, p.unpack(msg,sz, ...)))
            end
        end
    end

接下來要找的是 proto[prototype].disproto全部位置,找出那裏定義 dispatch
的。看到 這個函數:blog

  function skynet.register_protocol(class)
        local name = class.name
        local id = class.id
        ...
        proto[name] = class
        proto[id] = class
    end

以及

    function skynet.dispatch(typename, func)
        local p = proto[typename]
        if func then
            local ret = p.dispatch
            p.dispatch = func
            return ret
        else
            return p and p.dispatch
        end
    end

 

沒錯了,就是class.dispatch 。全部的消息,最終進到的就是 class.dispatch。

///////////////////////////////////////////////////////////////////////
skynet.regiser_protocol 和 skynet.dispatch 你會在lua服務中常常看見。以launcher.lua爲例子

launcher.lua在啓動時,註冊一個 name爲"text"的table,它的dispatch也定義在下面
因此你應該能看到 skynet.call(".launcher","text",...)這種調用

    skynet.register_protocol {
        name = "text",
        id = skynet.PTYPE_TEXT,
        unpack = skynet.tostring,
        dispatch = function(session, address , cmd)
            if cmd == "" then
                command.LAUNCHOK(address)
            elseif cmd == "ERROR" then
                command.ERROR(address)
            else
                error ("Invalid text command " .. cmd)
            end
        end,
    }

    //定義 launcher服務的 proto["lua"] 的dispatch 
    skynet.dispatch("lua", function(session, address, cmd , ...)
        cmd = string.upper(cmd)
        local f = command[cmd]
        if f then
            local ret = f(address, ...)
            if ret ~= NORET then
                skynet.ret(skynet.pack(ret))
            end
        else
            skynet.ret(skynet.pack {"Unknown command"} )
        end
    end)

 

但這裏還有一個問題,上面的proto["lua"] 是誰註冊的呢? 查找skynet.register_protocol,咱們能找到這個位置:

--skynet.lua
----- register protocol
do
    local REG = skynet.register_protocol

    REG {
        name = "lua",
        id = skynet.PTYPE_LUA,
        pack = skynet.pack,
        unpack = skynet.unpack,
    }

    REG {
        name = "response",
        id = skynet.PTYPE_RESPONSE,
    }

    REG {
        name = "error",
        id = skynet.PTYPE_ERROR,
        unpack = function(...) return ... end,
        dispatch = _error_dispatch,
    }
end

在你第一次require "skynet"
的時候,它已經默認幫你註冊了"lua","response","error"3種消息,而後你建立新的lua服務時,調用skynet.dispatch 爲 proto["lua"] 指定dispatch,以後經過 skynet.call("服務名","lua",...) 調用的消息就能最終投遞到你定義的處理函數裏面了。


到了這裏,從隊列取出數據,並分發到指定處理函數dispath的完整流程咱們以及看到了。接下來,咱們來看 消息是若是放入global_queue的。

來看 skynet.call 函數(skynet.send其實也同樣的,只是它無論返回)

    function skynet.call(addr, typename, ...)
        //如proto["lua"] ,消息類型id放入msg中
        local p = proto[typename]
        local session = c.send(addr, p.id , nil , p.pack(...))
        ...
        //等待返回
        return p.unpack(yield_call(addr, session))
    end

這裏的 c.send 的調用,咱們看一下 c 的定義:
local c = require "skynet.core"
這裏的 skynet.core ,實際上調用的是 skynet.so ,而從 skynet 的make log咱們能夠看到這樣一行:

cc -g -O2 -Wall -I3rd/lua -fPIC --shared lualib-src/lua-skynet.c lualib-src/lua-seri.c -o luaclib/skynet.so -Iskynet-src -Iservice-src -Ilualib-src

在 lualib-src/lua-skynet.c 中,咱們看到這段代碼:

    luaL_Reg l[] = {
        { "send" , _send },
        { "genid", _genid },
        { "redirect", _redirect },
        { "command" , _command },
        { "intcommand", _intcommand },
        { "error", _error },
        { "tostring", _tostring },
        { "harbor", _harbor },
        { "pack", _luaseri_pack },
        { "unpack", _luaseri_unpack },
        { "packstring", lpackstring },
        { "trash" , ltrash },
        { "callback", _callback },
        { NULL, NULL },
    };

這裏的 luaL_Reg 把c函數註冊到lua中,從而讓lua調用這些函數。
因此 c.send 調用的,就是這裏的 _send
_send 調用了 skynet_send ,若是目標在當前進程,將調用 skynet_context_push
而後 skyent_context_push 調用
skynet_mq_push(ctx->queue, message);
把消息放如了全局隊列,最後來看看 skynet_mq_push :

void 
skynet_mq_push(struct message_queue *q, struct skynet_message *message) {
    assert(message);
    SPIN_LOCK(q)

    //把msg放到隊列尾,而後 ++ q->taiskynet_globalmq_pushl
    q->queue[q->tail] = *message;
    if (++ q->tail >= q->cap) {
        q->tail = 0;
    }

    if (q->head == q->tail) {
        expand_queue(q);
    }

    //若ctx->queue未放入global_queue,放進去
    if (q->in_global == 0) {
        q->in_global = MQ_IN_GLOBAL;
        skynet_globalmq_push(q);
    }
    
    SPIN_UNLOCK(q)
}

 

值得一提的是,取消息從 ctx->queue 的head開始取,push消息則是從 tail push。因此先投遞的消息會先執行,但因爲協程的緣由,仍是不能保證先投遞的消息先執行完。

相關文章
相關標籤/搜索