skynet源碼分析之skynet_server

skynet是以服務爲主體進行運做的,服務稱做爲skynet_context(簡稱ctx),是一個c結構,是skynet裏最重要的結構,整個skynet的運做都是圍繞ctx進行的。skynet_server提供的api主要分兩大類:html

1.對ctx的一系列操做,好比建立,刪除ctx等api

2.如何發送消息和處理自身的消息session

1.對ctx的一系列操做

ctx的結構以下,建立一個服務時,會構建一個skynet上下文skynet_context,而後把該ctx存放到handle_storage(skynet_handle.c)裏。ctx有一個引用計數(ctx->ref)控制其生命週期,當引用計數爲0,刪除ctx,釋放內存。數據結構

struct skynet_context { //一個skynet服務ctx的結構
        void * instance; //每一個ctx本身的數據塊,不一樣類型ctx有不一樣數據結構,相同類型ctx數據結構相同,但具體的數據不同,由指定module的create函數返回
        struct skynet_module * mod; //保存module的指針,方便以後調用create,init,signal,release
        void * cb_ud; //給callback函數調用第二個參數,能夠是NULL
        skynet_cb cb; //消息回調函數指針,一般在module的init裏設置
        struct message_queue *queue; //ctx本身的消息隊列指針
        FILE * logfile; //日誌句柄
        uint64_t cpu_cost;      // in microsec
        uint64_t cpu_start;     // in microsec
        char result[32]; //保存skynet_command操做後的結果
        uint32_t handle; //標識惟一的ctx id
        int session_id; //本方發出請求會設置一個對應的session,當收到對方消息返回時,經過session匹配是哪個請求的返回
        int ref; //引用計數,當爲0,能夠刪除ctx
        int message_count; //累計收到的消息數量
        bool init; //標記是否完成初始化
        bool endless; //標記消息是否堵住
        bool profile; //標記是否須要開啓性能監測
 CHECKCALLING_DECL // 自旋鎖
};

爲了統一對ctx操做的接口,採用指令的格式,定義了一系列指令(cmd_xxx),cmd_launch建立一個新服務,cmd_exit服務自身退出,cmd_kill殺掉一個服務等,上層統一調用skynet_command接口便可執行這些操做。對ctx操做,一般會先調用skynet_context_grab將引用計數+1,操做完調用skynet_context_release將引用計數-1,以保證操做ctx過程當中,不會被其餘線程釋放掉。下面介紹幾個常見的操做:less

struct command_func { //skynet指令結構
        const char *name; const char * (*func)(struct skynet_context * context, const char * param); }; static struct command_func cmd_funcs[] = { //skynet可接收的一系列指令
        { "TIMEOUT", cmd_timeout }, { "REG", cmd_reg }, { "QUERY", cmd_query }, { "NAME", cmd_name }, { "EXIT", cmd_exit }, { "KILL", cmd_kill }, { "LAUNCH", cmd_launch }, { "GETENV", cmd_getenv }, { "SETENV", cmd_setenv }, { "STARTTIME", cmd_starttime }, { "ABORT", cmd_abort }, { "MONITOR", cmd_monitor }, { "STAT", cmd_stat }, { "LOGON", cmd_logon }, { "LOGOFF", cmd_logoff }, { "SIGNAL", cmd_signal }, { NULL, NULL }, };

(1). cmd_launch,啓動一個新服務,最終會經過skynet_context_new建立一個ctx,初始化ctx中各個數據。函數

struct skynet_context * skynet_context_new(const char * name, const char *param) { //啓動一個新服務ctx
        struct skynet_module * mod = skynet_module_query(name); //從skynet_module獲取對應的模板
 ... void *inst = skynet_module_instance_create(mod); //ctx獨有的數據塊,最終會調用c服務裏的xxx_create
 ... ctx->mod = mod; ctx->instance = inst; ctx->ref = 2; //初始化完成會調用skynet_context_release將引用計數-1,ref變成1而不會被釋放掉
 ... // Should set to 0 first to avoid skynet_handle_retireall get an uninitialized handle
        ctx->handle = 0; ctx->handle = skynet_handle_register(ctx); //從skynet_handle得到惟一的標識id
        struct message_queue * queue = ctx->queue = skynet_mq_create(ctx->handle); //初始化次級消息隊列
 ... CHECKCALLING_BEGIN(ctx) int r = skynet_module_instance_init(mod, inst, ctx, param);//初始化ctx獨有的數據塊
 CHECKCALLING_END(ctx) } 

經過指定名稱查找對應的module,並獲取不一樣c服務獨有的數據塊(struct snlua, struct logger,  struct gate等),賦值給ctx->mod,ctx->instance。ctx->ref=2是爲了保證建立ctx期間不會被其餘線程釋放掉,建立完成後會調用skynet_context_release將引用計數-1。將ctx保存在handle_storage,並得到一個惟一的標識id(ctx->handle = skynet_handle_register(ctx)),每一個ctx都有一個惟一的id對應起來。建立ctx的次級消息隊列(skynet_mq_create)。最後再調用skynet_module_instance_init初始化獨有的數據塊,放到skynet_mq_create以後是由於init過程當中可能會發送消息。至此就是一個ctx的整個建立流程。性能

(2). cmd_exit,服務主動退出;cmd_kill殺掉某個服務(被動),都會調用到handle_exit,而後調用到skynet_handle_retire,回收ctx->handle,供以後建立新的ctx使用,並將引用計數-1,若是ctx沒有在其餘地方引用,ctx->ref此時是0,因此能夠刪掉,delete_context主要是作一些清理回收工做。若是其餘地方有引用,在下一次skynet_context_release時刪掉。注:這時還不能釋放ctx->queue,只能作個標記(skyne_module_instance_release),緣由參考 http://www.cnblogs.com/RainRill/p/8252480.htmlui

struct skynet_context * skynet_context_release(struct skynet_context *ctx) { //ctx引用計數-1
        if (ATOM_DEC(&ctx->ref) == 0) { delete_context(ctx); return NULL; } return ctx; } static void delete_context(struct skynet_context *ctx) {//刪除ctx,釋放內存
        if (ctx->logfile) { fclose(ctx->logfile); } skynet_module_instance_release(ctx->mod, ctx->instance); skynet_mq_mark_release(ctx->queue); CHECKCALLING_DESTROY(ctx) skynet_free(ctx); context_dec(); }

(3). cmd_reg,給自身起一個名字(支持多個),cmd_name給指定ctx起一個名字,即將ctx->handle綁定一個名稱(skynet_handle_namehandle)lua

(4). cmd_query,經過名字查找對應的handle,發送消息前先要找到對應的ctx,才能給ctx發送消息spa

(5). cmd_stat,查看ctx的內部狀態信息,好比查看當前的消息隊列長度,查看累計消耗CPU時間,查看消息是否阻塞等

(6). cmd_setenv,cmd_getenv,設置/獲取skynet環境變量,是key-value結構,全部ctx共享的

(7). cmd_signal,在skynet控制檯,能夠給指定的ctx發信號以完成相應的命令

2. 如理髮送消息和處理消息

ctx之間經過消息進行通訊,調用skynet_send向對方發送消息(skynet_sendname最終也會調用skynet_send)。

int skynet_send(struct skynet_context * context, uint32_t source, uint32_t destination , int type, int session, void * data, size_t sz) { if ((sz & MESSAGE_TYPE_MASK) != sz) { skynet_error(context, "The message to %x is too large", destination); if (type & PTYPE_TAG_DONTCOPY) { skynet_free(data); } return -1; } _filter_args(context, type, &session, (void **)&data, &sz); //預處理消息數據塊

        if (source == 0) { source = context->handle; } if (destination == 0) { return session; } if (skynet_harbor_message_isremote(destination)) { //不是同一skynet節點裏服務,交給harbor
                struct remote_message * rmsg = skynet_malloc(sizeof(*rmsg)); rmsg->destination.handle = destination; rmsg->message = data; rmsg->sz = sz; skynet_harbor_send(rmsg, source, session); } else { struct skynet_message smsg; smsg.source = source; smsg.session = session; smsg.data = data; smsg.sz = sz; if (skynet_context_push(destination, &smsg)) { //push給目的ctx
 skynet_free(data); return -1; } } return session; }

接口參數:

struct skynet_context *context: 源服務的ctx,能夠爲NULL,drop_message時這個參數爲NULL

uint32_t  source:源服務的地址,一般設置爲0便可,api裏會設置成ctx->handle,當context爲NULL時,需指定source

uint32_t destination:目的服務地址

int type:消息類型, skynet定義了多種消息,PTYPE_TEXT,PTYPE_CLIENT,PTYPE_RESPONSE等(詳情見skynet.h)

int session:若是在type裏設上allocsession的tag(PTYPE_TAG_ALLOCSESSION),api會忽略掉傳入的session參數,從新生成一個新的惟一的

void *data, size_t sz:消息包和長度

返回: int session:源服務保存這個session,同時約定,目的服務處理完這個消息後,把這個session原樣發送回來(skynet_message結構裏帶有一個session字段),源服務就知道是哪一個請求的返回,從而正確調用對應的回調函數。

首先檢查sz是否合法,而後在filter_args裏進行預處理,若是type設上PTYPE_TAG_ALLOCSESSION,則須要生成一個惟一的session(skynet_context_newsession)。若是type沒有設置PTYPE_TAG_DONTCOPY,則須要拷貝一份消息包。把type打包在sz的高8位*sz |= (size_t)type<<MESSAGE_TYPE_SHIFT,由於 消息包的長度限制在24位(16M),是個合理的限制。判斷目的地址和源地址是否在一個skynet節點內,若在,則構建一個skynet_message結構,push給目的服務(skynet_context_push);若不在,則交給harbor,讓harbor去轉發。

 

skynet能夠啓動多個工做線程,調用skynet_context_message_dispatch這個api不斷的分發消息,有三個參數:skynet_monitor監測消息是否堵住,message_queue(mq)須要分發的次級消息隊列,weight權重(以後會詳細說明),api返回下一個要分發的mq。

struct message_queue * skynet_context_message_dispatch(struct skynet_monitor *sm, struct message_queue *q, int weight) {//工做線程分發消息,處理完返回全局隊列的下一條次級消息隊列,供下一幀調用
        if (q == NULL) { q = skynet_globalmq_pop(); //從全局消息隊列裏pop一條次級消息隊列
                if (q==NULL) return NULL; } uint32_t handle = skynet_mq_handle(q); struct skynet_context * ctx = skynet_handle_grab(handle); //獲取次級消息隊列的ctx
        if (ctx == NULL) { //ctx不存在了
                struct drop_t d = { handle }; skynet_mq_release(q, drop_message, &d); //釋放次級消息隊列
                return skynet_globalmq_pop(); } int i,n=1; struct skynet_message msg; for (i=0;i<n;i++) { if (skynet_mq_pop(q,&msg)) { //若是ctx的次級消息隊列爲空,返回
 skynet_context_release(ctx); return skynet_globalmq_pop(); } else if (i==0 && weight >= 0) { //weight:-1只處理一條消息,0處理完q中全部消息,>0處理長度右移weight位(1/(2*weight))條消息
                        n = skynet_mq_length(q); n >>= weight; } int overload = skynet_mq_overload(q); if (overload) { skynet_error(ctx, "May overload, message queue length = %d", overload); } skynet_monitor_trigger(sm, msg.source , handle); if (ctx->cb == NULL) { skynet_free(msg.data); } else { dispatch_message(ctx, &msg); //處理消息
 } skynet_monitor_trigger(sm, 0,0); } assert(q == ctx->queue); struct message_queue *nq = skynet_globalmq_pop(); if (nq) { // If global mq is not empty , push q back, and return next queue (nq) // Else (global mq is empty or block, don't push q back, and return q again (for next dispatch)
 skynet_globalmq_push(q); q = nq; } skynet_context_release(ctx); return q; } 

若是傳入的mq爲空,則從全局消息隊列裏pop一個(skynet_globalmq_pop)。查找mq對應的ctx,若ctx不存在,須要刪除該mq(skynet_mq_release)。接下來就是分發mq中的消息包,一幀分發多少條消息取決於weight參數(稍後討論)。調用dispatch_message分發消息包,最終調用ctx->cb消息回調函數,在消息回調函數處理消息包,返回0表示回調成功,至此完成一個消息包的處理流程。最後把mq push回全局消息隊列,供下次工做線程調度(前提是mq不爲空,且global mq也不爲空)。

static void dispatch_message(struct skynet_context *ctx, struct skynet_message *msg) { //處理一條ctx消息,即調用ctx的消息回調函數
 ... reserve_msg = ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz);//消息回調函數
 ... }

關於weight參數,在啓動工做線程時,會爲每一個線程設置一個weight,即線程0-3是-1,4-7是0,8-15是1,16-23是2,24-31是3。-1表示每幀只處理mq中一個消息包,0表示每幀處理mq中全部消息包,1表示每幀處理mq長度的1/2條(>>1右移一位)消息,2表示每幀處理mq長度的1/4(右移2位),3表示每幀處理mq長度的1/8(右移3位)。

//skynet_start.c
static
int weight[] = { -1, -1, -1, -1, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, };
相關文章
相關標籤/搜索