經過lua-nginx-module的ngx.socket能夠方便的創建與其餘服務器的鏈接和數據傳輸,這些也是lua-resty-redis,lua-resty-mysql等衆多請求第三方服務的模塊的基礎。這裏只介紹ngx.socket.tcp,udp的實現相似。mysql
在Lua中經過下面的方式使用ngx.socket APInginx
local sock = ngx.socket.tcp() local ok, err = sock:connect("google.com", 80) if not ok then return end local ok, err = sock:send("GET / HTTP/1.0\r\nHost:google.com\r\n\r\n") if not ok then return end local data, err = sock:receive() if not data then return end
sock:connect,sock:send,sock:receive這三個均可能不能當即完成,都是經過與ngx.sleep相似的方式,先調用lua_yield將協程掛起,等到事件就緒(或鏈接創建,或收到數據包等)再調用lua_resume繼續運行協程。這樣底層是異步結構,在Lua協程中看一個Lua腳本確實連續的在執行。redis
不一樣的是ngx.sleep沒有任何返回值,socket操做確是須要的,創建鏈接須要知道成功了仍是失敗了,發送接受數據須要知道是否成功,失敗時也須要知道緣由。這些方法的返回值是經過lua_resume實現的。sql
以connect爲例下面這行代碼能夠分紅兩部分,api
local ok, err = sock:connect("GET")
第一步調用sock:connect函數的時候發生了協程的掛起(lua_yield),協程繼續運行時是從第二步開始的。那麼給ok,err這兩個局部變量賦於什麼值呢?值從哪裏來呢?很明顯只能經過lua_resume來操做。服務器
下面是lua_resume的聲明,第二個參數是返回值的個數。異步
int lua_resume (lua_State *L, int narg);
在執行lua_resume前將返回值壓到棧上,調用lua_resume的第二個參數控制返回值的個數。socket
connect操做成功時,經過下面的操做tcp
lua_pushboolean(L, 1); lua_resume(L, 1);
將ok賦值爲true,err賦值爲nil,代表操做成功。函數
失敗時,用下面的方法,將ok賦值爲nil,err賦值爲錯誤信息,這裏使用"connect refused"只是爲了示例。
lua_pushnil(L); lua_pushliteral(L, "connect refuled"); lua_resume(L, 2);
lua-nginx-module經過ngx_http_lua_inject_socket_tcp_api將tcpx相關的API註冊到ngx中。這裏其實只是註冊了ngx.socket.tcp(ngx.socket.stream)和ngx.socket.connect。ngx.socket.connect只是一個語法糖,即便去掉也沒有什麼影響。
void ngx_http_lua_inject_socket_tcp_api(ngx_log_t *log, lua_State *L) { ngx_int_t rc; lua_createtable(L, 0, 4 /* nrec */); /* ngx.socket */ lua_pushcfunction(L, ngx_http_lua_socket_tcp); lua_pushvalue(L, -1); lua_setfield(L, -3, "tcp"); lua_setfield(L, -2, "stream"); { const char buf[] = "local sock = ngx.socket.tcp()" " local ok, err = sock:connect(...)" " if ok then return sock else return nil, err end"; rc = luaL_loadbuffer(L, buf, sizeof(buf) - 1, "=ngx.socket.connect"); } if (rc != NGX_OK) { ngx_log_error(NGX_LOG_CRIT, log, 0, "failed to load Lua code for ngx.socket.connect(): %i", rc); } else { lua_setfield(L, -2, "connect"); } lua_setfield(L, -2, "socket");
爲何這裏沒有connect, send, receive等方法呢?這個涉及到ngx.socket.tcp方法的實現。
調用ngx.socket.tcp時,實際執行的是ngx_http_lua_socket_tcp這個函數。這裏只是建立了一個Lua中的table並將其返回,並無真正建立一個socket。
注意這裏的lua_setmetatable(L, -2)。 在建立table的同時給這個table設置了一個元表,將socket須要執行的connect,send,receive等方法放在了元表中。
static int ngx_http_lua_socket_tcp(lua_State *L) { ngx_http_request_t *r; ngx_http_lua_ctx_t *ctx; if (lua_gettop(L) != 0) { return luaL_error(L, "expecting zero arguments, but got %d", lua_gettop(L)); } r = ngx_http_lua_get_req(L); if (r == NULL) { return luaL_error(L, "no request found"); } ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module); if (ctx == NULL) { return luaL_error(L, "no ctx found"); } ngx_http_lua_check_context(L, ctx, NGX_HTTP_LUA_CONTEXT_REWRITE | NGX_HTTP_LUA_CONTEXT_ACCESS | NGX_HTTP_LUA_CONTEXT_CONTENT | NGX_HTTP_LUA_CONTEXT_TIMER | NGX_HTTP_LUA_CONTEXT_SSL_CERT | NGX_HTTP_LUA_CONTEXT_SSL_SESS_FETCH); lua_createtable(L, 3 /* narr */, 1 /* nrec */); lua_pushlightuserdata(L, &ngx_http_lua_tcp_socket_metatable_key); lua_rawget(L, LUA_REGISTRYINDEX); lua_setmetatable(L, -2); dd("top: %d", lua_gettop(L)); return 1; }
相似這樣的使用, sock只是一個空的table,調用connect方法時利用了Lua中元表的特性來實現。
local sock = ngx.socket.tcp() local ok, err = sock:connect("1.1.1.1", 80)
其他connect, send, receive等的實現與以前介紹的sleep等相似。這裏用的是非阻塞的socket,若是操做沒有當即結束,會將當前的協程掛起,等到操做完成後再恢復協程運行。
socket的connect, send,receive等操做都可能致使協程的掛起。以發送數據爲例,函數ngx_http_lua_socket_tcp_send中,若是當前socket緩衝區已滿,此時rc == NGX_AGAIN, 先設置r->write_event_handler回調函數,最後經過lua_yield掛起協程。
/* rc == NGX_AGAIN */ coctx = ctx->cur_co_ctx; ngx_http_lua_cleanup_pending_operation(coctx); coctx->cleanup = ngx_http_lua_coctx_cleanup; coctx->data = u; if (u->raw_downstream) { ctx->writing_raw_req_socket = 1; } if (ctx->entered_content_phase) { r->write_event_handler = ngx_http_lua_content_wev_handler; } else { r->write_event_handler = ngx_http_core_run_phases; } u->write_co_ctx = coctx; u->write_waiting = 1; u->write_prepare_retvals = ngx_http_lua_socket_tcp_send_retval_handler; dd("setting data to %p", u); return lua_yield(L, 0);
等到socket的緩衝區空閒,此時socket可寫,會調用r->write_event_handler繼續運行,最終經過ngx_http_lua_socket_tcp_resume_helper繼續協程的運行。
static ngx_int_t ngx_http_lua_socket_tcp_resume_helper(ngx_http_request_t *r, int socket_op) { int nret; lua_State *vm; ngx_int_t rc; ngx_connection_t *c; ngx_http_lua_ctx_t *ctx; ngx_http_lua_co_ctx_t *coctx; ngx_http_lua_socket_tcp_retval_handler prepare_retvals; ngx_http_lua_socket_tcp_upstream_t *u; ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module); if (ctx == NULL) { return NGX_ERROR; } ctx->resume_handler = ngx_http_lua_wev_handler; ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, "lua tcp operation done, resuming lua thread"); coctx = ctx->cur_co_ctx; dd("coctx: %p", coctx); u = coctx->data; switch (socket_op) { case SOCKET_OP_CONNECT: case SOCKET_OP_WRITE: prepare_retvals = u->write_prepare_retvals; break; case SOCKET_OP_READ: prepare_retvals = u->read_prepare_retvals; break; default: /* impossible to reach here */ return NGX_ERROR; } ngx_log_debug2(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, "lua tcp socket calling prepare retvals handler %p, " "u:%p", prepare_retvals, u); nret = prepare_retvals(r, u, ctx->cur_co_ctx->co); if (nret == NGX_AGAIN) { return NGX_DONE; } c = r->connection; vm = ngx_http_lua_get_lua_vm(r, ctx); rc = ngx_http_lua_run_thread(vm, r, ctx, nret); ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0, "lua run thread returned %d", rc); if (rc == NGX_AGAIN) { return ngx_http_lua_run_posted_threads(c, vm, r, ctx); } if (rc == NGX_DONE) { ngx_http_lua_finalize_request(r, NGX_DONE); return ngx_http_lua_run_posted_threads(c, vm, r, ctx); } if (ctx->entered_content_phase) { ngx_http_lua_finalize_request(r, rc); return NGX_DONE; } return rc; }
仍是一樣的套路,ngx_http_lua_get_lua_vm, 而後經過ngx_http_lua_run_thread運行協程,根據返回值作相應的處理。