ngx_lua中的協程調度(三)

經過lua-nginx-module的ngx.socket能夠方便的創建與其餘服務器的鏈接和數據傳輸,這些也是lua-resty-redis,lua-resty-mysql等衆多請求第三方服務的模塊的基礎。這裏只介紹ngx.socket.tcp,udp的實現相似。mysql

經過lua_resume返回值

在Lua中經過下面的方式使用ngx.socket APInginx

local sock = ngx.socket.tcp()
local ok, err = sock:connect("google.com", 80)
if not ok then
    return
end

local ok, err = sock:send("GET / HTTP/1.0\r\nHost:google.com\r\n\r\n")
if not ok then
    return
end

local data, err = sock:receive()
if not data then
    return
end

sock:connect,sock:send,sock:receive這三個均可能不能當即完成,都是經過與ngx.sleep相似的方式,先調用lua_yield將協程掛起,等到事件就緒(或鏈接創建,或收到數據包等)再調用lua_resume繼續運行協程。這樣底層是異步結構,在Lua協程中看一個Lua腳本確實連續的在執行。redis

不一樣的是ngx.sleep沒有任何返回值,socket操做確是須要的,創建鏈接須要知道成功了仍是失敗了,發送接受數據須要知道是否成功,失敗時也須要知道緣由。這些方法的返回值是經過lua_resume實現的。sql

以connect爲例下面這行代碼能夠分紅兩部分,api

  1. sock:connect函數調用
  2. 將函數調用的結果賦值給ok,err兩個局部變量
local ok, err = sock:connect("GET")

第一步調用sock:connect函數的時候發生了協程的掛起(lua_yield),協程繼續運行時是從第二步開始的。那麼給ok,err這兩個局部變量賦於什麼值呢?值從哪裏來呢?很明顯只能經過lua_resume來操做。服務器

下面是lua_resume的聲明,第二個參數是返回值的個數。異步

int lua_resume (lua_State *L, int narg);

在執行lua_resume前將返回值壓到棧上,調用lua_resume的第二個參數控制返回值的個數。socket

connect操做成功時,經過下面的操做tcp

lua_pushboolean(L, 1);
lua_resume(L, 1);

將ok賦值爲true,err賦值爲nil,代表操做成功。函數

失敗時,用下面的方法,將ok賦值爲nil,err賦值爲錯誤信息,這裏使用"connect refused"只是爲了示例。

lua_pushnil(L);
lua_pushliteral(L, "connect refuled");
lua_resume(L, 2);

 

註冊API

lua-nginx-module經過ngx_http_lua_inject_socket_tcp_api將tcpx相關的API註冊到ngx中。這裏其實只是註冊了ngx.socket.tcp(ngx.socket.stream)和ngx.socket.connect。ngx.socket.connect只是一個語法糖,即便去掉也沒有什麼影響。

void
ngx_http_lua_inject_socket_tcp_api(ngx_log_t *log, lua_State *L)
{
    ngx_int_t         rc;

    lua_createtable(L, 0, 4 /* nrec */);    /* ngx.socket */

    lua_pushcfunction(L, ngx_http_lua_socket_tcp);
    lua_pushvalue(L, -1);
    lua_setfield(L, -3, "tcp");
    lua_setfield(L, -2, "stream");

    {
        const char  buf[] = "local sock = ngx.socket.tcp()"
                            " local ok, err = sock:connect(...)"
                            " if ok then return sock else return nil, err end";

        rc = luaL_loadbuffer(L, buf, sizeof(buf) - 1, "=ngx.socket.connect");
    }

    if (rc != NGX_OK) {
        ngx_log_error(NGX_LOG_CRIT, log, 0,
                      "failed to load Lua code for ngx.socket.connect(): %i",
                      rc);

    } else {
        lua_setfield(L, -2, "connect");
    }

    lua_setfield(L, -2, "socket");

爲何這裏沒有connect, send, receive等方法呢?這個涉及到ngx.socket.tcp方法的實現。

 

ngx.socket.tcp

調用ngx.socket.tcp時,實際執行的是ngx_http_lua_socket_tcp這個函數。這裏只是建立了一個Lua中的table並將其返回,並無真正建立一個socket。

注意這裏的lua_setmetatable(L, -2)。 在建立table的同時給這個table設置了一個元表,將socket須要執行的connect,send,receive等方法放在了元表中。

static int
ngx_http_lua_socket_tcp(lua_State *L)
{
    ngx_http_request_t      *r;
    ngx_http_lua_ctx_t      *ctx;

    if (lua_gettop(L) != 0) {
        return luaL_error(L, "expecting zero arguments, but got %d",
                          lua_gettop(L));
    }

    r = ngx_http_lua_get_req(L);
    if (r == NULL) {
        return luaL_error(L, "no request found");
    }

    ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module);
    if (ctx == NULL) {
        return luaL_error(L, "no ctx found");
    }

    ngx_http_lua_check_context(L, ctx, NGX_HTTP_LUA_CONTEXT_REWRITE
                               | NGX_HTTP_LUA_CONTEXT_ACCESS
                               | NGX_HTTP_LUA_CONTEXT_CONTENT
                               | NGX_HTTP_LUA_CONTEXT_TIMER
                               | NGX_HTTP_LUA_CONTEXT_SSL_CERT
                               | NGX_HTTP_LUA_CONTEXT_SSL_SESS_FETCH);

    lua_createtable(L, 3 /* narr */, 1 /* nrec */);
    lua_pushlightuserdata(L, &ngx_http_lua_tcp_socket_metatable_key);
    lua_rawget(L, LUA_REGISTRYINDEX);
    lua_setmetatable(L, -2);

    dd("top: %d", lua_gettop(L));

    return 1;
}

相似這樣的使用, sock只是一個空的table,調用connect方法時利用了Lua中元表的特性來實現。

local sock = ngx.socket.tcp()
local ok, err = sock:connect("1.1.1.1", 80)

 

非阻塞socket操做的處理

其他connect, send, receive等的實現與以前介紹的sleep等相似。這裏用的是非阻塞的socket,若是操做沒有當即結束,會將當前的協程掛起,等到操做完成後再恢復協程運行。

socket的connect, send,receive等操做都可能致使協程的掛起。以發送數據爲例,函數ngx_http_lua_socket_tcp_send中,若是當前socket緩衝區已滿,此時rc == NGX_AGAIN, 先設置r->write_event_handler回調函數,最後經過lua_yield掛起協程。

/* rc == NGX_AGAIN */

    coctx = ctx->cur_co_ctx;

    ngx_http_lua_cleanup_pending_operation(coctx);
    coctx->cleanup = ngx_http_lua_coctx_cleanup;
    coctx->data = u;

    if (u->raw_downstream) {
        ctx->writing_raw_req_socket = 1;
    }

    if (ctx->entered_content_phase) {
        r->write_event_handler = ngx_http_lua_content_wev_handler;

    } else {
        r->write_event_handler = ngx_http_core_run_phases;
    }

    u->write_co_ctx = coctx;
    u->write_waiting = 1;
    u->write_prepare_retvals = ngx_http_lua_socket_tcp_send_retval_handler;

    dd("setting data to %p", u);

    return lua_yield(L, 0);

等到socket的緩衝區空閒,此時socket可寫,會調用r->write_event_handler繼續運行,最終經過ngx_http_lua_socket_tcp_resume_helper繼續協程的運行。

static ngx_int_t
ngx_http_lua_socket_tcp_resume_helper(ngx_http_request_t *r, int socket_op)
{
    int                          nret;
    lua_State                   *vm;
    ngx_int_t                    rc;
    ngx_connection_t            *c;
    ngx_http_lua_ctx_t          *ctx;
    ngx_http_lua_co_ctx_t       *coctx;

    ngx_http_lua_socket_tcp_retval_handler  prepare_retvals;

    ngx_http_lua_socket_tcp_upstream_t      *u;

    ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module);
    if (ctx == NULL) {
        return NGX_ERROR;
    }

    ctx->resume_handler = ngx_http_lua_wev_handler;

    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                   "lua tcp operation done, resuming lua thread");

    coctx = ctx->cur_co_ctx;

    dd("coctx: %p", coctx);

    u = coctx->data;

    switch (socket_op) {
    case SOCKET_OP_CONNECT:
    case SOCKET_OP_WRITE:
        prepare_retvals = u->write_prepare_retvals;
        break;

    case SOCKET_OP_READ:
        prepare_retvals = u->read_prepare_retvals;
        break;

    default:
        /* impossible to reach here */
        return NGX_ERROR;
    }

    ngx_log_debug2(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                   "lua tcp socket calling prepare retvals handler %p, "
                   "u:%p", prepare_retvals, u);

    nret = prepare_retvals(r, u, ctx->cur_co_ctx->co);
    if (nret == NGX_AGAIN) {
        return NGX_DONE;
    }

    c = r->connection;
    vm = ngx_http_lua_get_lua_vm(r, ctx);

    rc = ngx_http_lua_run_thread(vm, r, ctx, nret);

    ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                   "lua run thread returned %d", rc);

    if (rc == NGX_AGAIN) {
        return ngx_http_lua_run_posted_threads(c, vm, r, ctx);
    }

    if (rc == NGX_DONE) {
        ngx_http_lua_finalize_request(r, NGX_DONE);
        return ngx_http_lua_run_posted_threads(c, vm, r, ctx);
    }

    if (ctx->entered_content_phase) {
        ngx_http_lua_finalize_request(r, rc);
        return NGX_DONE;
    }

    return rc;
}

仍是一樣的套路,ngx_http_lua_get_lua_vm, 而後經過ngx_http_lua_run_thread運行協程,根據返回值作相應的處理。

相關文章
相關標籤/搜索