本篇主要介紹在Lua服務裏調用skynet網絡層底層接口的流程,Lua層的api主要在lualib/skynet/socket.lua,可參考官方wiki https://github.com/cloudwu/sk...。git
經過一個簡單的例子說明Lua服務是如何最終調用到網絡層底層接口的:
:github
`local socket = require 「socket」 local skynet = require "skynet" local function loop(fd) socket.start(fd) while true do local data = socket.readline('n') print(data, #data) end end skynet.start(function() local listen_fd = socket.listen(ip, hort) socket.start(listen_fd, function(fd, addr) print("connect fd[%d], addr[%s]", fd, addr) skynet.fork(loop, fd) end) end)`
在服務啓動時,調用socket.listen監聽。調用流程是:driver.listen(第7行)——>skynet_socket_listen(第17行)——>socket_server_listen(第29行)——>send_request(第47行),最後向發送管道寫數據。Lua接口執行流程是:socket.lua -> lua-socket.c ->skynet_socket.c -> socket_server.capi
注:第34行,do_listen依次調用了unix網絡系統接口socket,bind,listen。網絡
`// lualib/skynet/socket.lua
function socket.listen(host, port, backlog)session
if port == nil then host, port = string.match(host, "([^:]+):(.+)$") port = tonumber(port) end return driver.listen(host, port, backlog)
end數據結構
// lualib-src/lua-socket.c
static int
llisten(lua_State *L) {socket
const char * host = luaL_checkstring(L,1); int port = luaL_checkinteger(L,2); int backlog = luaL_optinteger(L,3,BACKLOG); struct skynet_context * ctx = lua_touserdata(L, lua_upvalueindex(1)); int id = skynet_socket_listen(ctx, host,port,backlog); if (id < 0) { return luaL_error(L, "Listen error"); } lua_pushinteger(L,id); return 1;
}tcp
// skynet-src/skynet_socket.c
skynet_socket_listen(struct skynet_context ctx, const char host, int port, int backlog) {函數
uint32_t source = skynet_context_handle(ctx); return socket_server_listen(SOCKET_SERVER, source, host, port, backlog);
}oop
// skynet-src/socket_server.c
socket_server_listen(struct socket_server ss, uintptr_t opaque, const char addr, int port, int backlog) {
int fd = do_listen(addr, port, backlog); if (fd < 0) { return -1; } struct request_package request; int id = reserve_id(ss); if (id < 0) { close(fd); return id; } request.u.listen.opaque = opaque; request.u.listen.id = id; request.u.listen.fd = fd; send_request(ss, &request, 'L', sizeof(request.u.listen)); return id;
}`
skynet裏的socket結構有幾種狀態:
#define SOCKET_TYPE_INVALID 0 //可以使用 #define SOCKET_TYPE_RESERVE 1 //已佔用 #define SOCKET_TYPE_PLISTEN 2 //等待監聽(監聽套接字擁有) #define SOCKET_TYPE_LISTEN 3 //監聽,可接受客戶端的鏈接(監聽套接字才擁有) #define SOCKET_TYPE_CONNECTING 4 //正在鏈接(connect失敗時狀態,tcp會嘗試從新connect) #define SOCKET_TYPE_CONNECTED 5 //已鏈接,能夠收發數據 #define SOCKET_TYPE_HALFCLOSE 6 #define SOCKET_TYPE_PACCEPT 7 //等待鏈接(鏈接套接字才擁有) #define SOCKET_TYPE_BIND 8
當工做線程執行socket.listen後,socket線程從接收管道讀取數據,執行ctrl_cmd,調用listen_socket(第6行),此時該socket狀態是SOCKET_TYPE_PLISTEN(第18行)
`// skynet-src/socket_server.c static int ctrl_cmd(struct socket_server *ss, struct socket_message *result) { ... case 'L': return listen_socket(ss,(struct request_listen *)buffer, result); ... } static int listen_socket(struct socket_server *ss, struct request_listen * request, struct socket_message *result) { int id = request->id; int listen_fd = request->fd; struct socket *s = new_fd(ss, id, listen_fd, PROTOCOL_TCP, request->opaque, false); if (s == NULL) { goto _failed; } s->type = SOCKET_TYPE_PLISTEN; return -1; ... }`
接着,Lua服務調用socket.start,最終socket線程執行start_socket,此時socket狀態是SOCKET_TYPE_LISTEN,等待客戶端的鏈接請求。
`// skynet-src/socket_server.c static int start_socket(struct socket_server *ss, struct request_start *request, struct socket_message *result) { ... if (s->type == SOCKET_TYPE_PACCEPT || s->type == SOCKET_TYPE_PLISTEN) { if (sp_add(ss->event_fd, s->fd, s)) { force_close(ss, s, &l, result); result->data = strerror(errno); return SOCKET_ERR; } s->type = (s->type == SOCKET_TYPE_PACCEPT) ? SOCKET_TYPE_CONNECTED : SOCKET_TYPE_LISTEN; s->opaque = request->opaque; result->data = "start"; return SOCKET_OPEN; } ... }`
當客戶端發起鏈接請求後,epoll事件返回,調用report_accept(第5行)
第14行,調用unix網絡系統接口accept,接受客戶端的請求。因爲客戶端已發起鏈接,因此不會阻塞。
第16行,從socket池中獲取可用的socket id
17-22行,初始化該socket,此時socket狀態是SOCKET_TYPE_PACCEPT
`int socket_server_poll(struct socket_server *ss, struct socket_message * result, int * more) { ... case SOCKET_TYPE_LISTEN: { int ok = report_accept(ss, s, result); ... } // return 0 when failed, or -1 when file limit static int report_accept(struct socket_server *ss, struct socket *s, struct socket_message *result) { union sockaddr_all u; socklen_t len = sizeof(u); int client_fd = accept(s->fd, &u.s, &len); ... int id = reserve_id(ss); struct socket *ns = new_fd(ss, id, client_fd, PROTOCOL_TCP, s->opaque, false); ns->type = SOCKET_TYPE_PACCEPT; result->opaque = s->opaque; result->id = s->id; result->ud = id; result->data = NULL; ... return 1; }`
接着,Lua服務再次調用socket.start(id),此時id是鏈接的socket,而不是監聽的socket。此時,socket狀態是SOCKET_TYPE_CONNECTED,鏈接已經創建,能夠收發數據。這就是整個socket鏈接過程。
至於怎麼通知到 Lua服務稍後分析。
`// skynet-src/socket_server.c static int start_socket(struct socket_server *ss, struct request_start *request, struct socket_message *result) { ... s->type = (s->type == SOCKET_TYPE_PACCEPT) ? SOCKET_TYPE_CONNECTED : SOCKET_TYPE_LISTEN; ... }`
關閉socket,socket.close
發送數據有兩個api,正常發送socket.write, 低優先級發送socket.lwrite。
socket線程在運行過程(socket_server_poll)中,當收到網絡數據會調用forward_message_tcp
第19行,調用unix系統接口讀取socket上的數據
21-24行,採用args-value形式構造result,opaque是Lua服務的地址,id是該socket在池中的索引,ud是實際讀取到的字節數,data是數據
第25行,返回SOCKET_DATA,表示接收到數據。
`// skynet-src/socket_server.c int socket_server_poll(struct socket_server *ss, struct socket_message * result, int * more) { ... default: if (e->read) { int type; if (s->protocol == PROTOCOL_TCP) { type = forward_message_tcp(ss, s, &l, result); ... return type } static int forward_message_tcp(struct socket_server *ss, struct socket *s, struct socket_lock *l, struct socket_message * resu lt) { int sz = s->p.size; char * buffer = MALLOC(sz); int n = (int)read(s->fd, buffer, sz); ... result->opaque = s->opaque; result->id = s->id; result->ud = n; result->data = buffer; return SOCKET_DATA; }`
因爲socket_server_poll返回的是SOCKET_DATA,調用forward_message(第11行),
23-26行,構造即將要發送的消息數據,用到了上面返回的result
28-32行,構造skynet消息結構,由於是在網絡層發送的,不是具體的某個服務,因此source,session字段都設置成0便可
第34行,把消息發送給與socket對應的服務地址。
至此,網絡消息通知給具體的Lua服務。
`// skynet-src/skynet_socket.c int skynet_socket_poll() { struct socket_server *ss = SOCKET_SERVER; assert(ss); struct socket_message result; int more = 1; int type = socket_server_poll(ss, &result, &more); switch (type) { case SOCKET_DATA: forward_message(SKYNET_SOCKET_TYPE_DATA, false, &result); break; ... return 1; } // mainloop thread static void forward_message(int type, bool padding, struct socket_message * result) { struct skynet_socket_message *sm; size_t sz = sizeof(*sm); ... sm = (struct skynet_socket_message *)skynet_malloc(sz); sm->type = type; sm->id = result->id; sm->ud = result->ud; ... struct skynet_message message; message.source = 0; message.session = 0; message.data = sm; message.sz = sz | ((size_t)PTYPE_SOCKET << MESSAGE_TYPE_SHIFT); if (skynet_context_push((uint32_t)result->opaque, &message)) { // todo: report somewhere to close socket // don't call skynet_socket_close here (It will block mainloop) skynet_free(sm->buffer); skynet_free(sm); } }`
當網絡數據到達Lua服務時,lualib/skynet/socket.lua中提供了相應的處理方案。調用消息分發函數socket_message,網絡數據類型包含正常數據傳輸(DATA),鏈接(CONNECT),關閉(CLOSE),錯誤(ERROR)等。
第15行,把客戶端發過來的數據push到該socket的緩衝池中。
`-- lualib/skynet/socket.lua skynet.register_protocol { name = "socket", id = skynet.PTYPE_SOCKET, -- PTYPE_SOCKET = 6 unpack = driver.unpack, dispatch = function (_, _, t, ...) socket_message[t](...) end } -- SKYNET_SOCKET_TYPE_DATA = 1 socket_message[1] = function(id, size, data) local s = socket_pool[id] ... local sz = driver.push(s.buffer, buffer_pool, data, size) ... }`
socket.read(id, sz),從一個socket上讀sz指定的字節數,若是緩衝池裏有足夠多的數據,從緩衝池裏pop出直接返回(第5行),不然,暫停當前協程(第15行),當數據夠或者鏈接斷開時重啓協程。
`-- lualib/skynet/socket.lua function socket.read(id, sz) local s = socket_pool[id] assert(s) ... local ret = driver.pop(s.buffer, buffer_pool, sz) if ret then return ret end if not s.connected then return false, driver.readall(s.buffer, buffer_pool) end assert(not s.read_required) s.read_required = sz suspend(s) ret = driver.pop(s.buffer, buffer_pool, sz) if ret then return ret else return false, driver.readall(s.buffer, buffer_pool) end end`
socket.readline(id, sep),從一個socket上讀以sep分割的數據,默認是"n",即讀一行數據。注:該api能夠指定分隔符,不僅僅是一行數據。
socket.abandon(id),清除socket id在本服務內的數據結構,但不併關閉這個socket,用於把id轉給其餘服務控制。一般,會設計一個master服務接收外部鏈接,等鏈接上後再將socket分配給一個slave服務控制,減小master服務的壓力。
socket庫的使用流程通常是:
-- master服務 local listen_fd = socket.listen(ip, port) //監聽一個地址 socket.start(listen_fd, function(fd, addr) slave.post.start(fd) //客戶端鏈接上,轉交給slave socket.abandon(fd) end) -- slave服務 function accept.start(fd) socket.start(fd) //接管socket ... end
最後,小編推薦本身的Linux、C/C++技術交流羣:【960994558】整理了一些我的以爲比較好的學習書籍、視頻資料共享在裏面(包括C/C++,Linux,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒體,CDN,P2P,K8S,Docker,TCP/IP,協程,DPDK等等.),有須要的能夠自行添加哦!~