skynet源碼分析之網絡層—Lua層

本篇主要介紹在Lua服務裏調用skynet網絡層底層接口的流程,Lua層的api主要在lualib/skynet/socket.lua,可參考官方wiki https://github.com/cloudwu/sk...git

經過一個簡單的例子說明Lua服務是如何最終調用到網絡層底層接口的:
github

`local socket = require 「socket」
   local skynet = require "skynet"

   local function loop(fd)
       socket.start(fd)
       while true do
           local data = socket.readline('n')
           print(data, #data)
       end
   end

  skynet.start(function()
      local listen_fd = socket.listen(ip, hort)
      socket.start(listen_fd, function(fd, addr)
          print("connect fd[%d], addr[%s]", fd, addr)
          skynet.fork(loop, fd)
      end)
  end)`

api調用流程概述

在服務啓動時,調用socket.listen監聽。調用流程是:driver.listen(第7行)——>skynet_socket_listen(第17行)——>socket_server_listen(第29行)——>send_request(第47行),最後向發送管道寫數據。Lua接口執行流程是:socket.lua -> lua-socket.c ->skynet_socket.c -> socket_server.capi

注:第34行,do_listen依次調用了unix網絡系統接口socket,bind,listen。網絡

`// lualib/skynet/socket.lua
function socket.listen(host, port, backlog)session

if port == nil then
    host, port = string.match(host, "([^:]+):(.+)$")
    port = tonumber(port)
end
return driver.listen(host, port, backlog)

end數據結構

// lualib-src/lua-socket.c
static int
llisten(lua_State *L) {socket

const char * host = luaL_checkstring(L,1);
int port = luaL_checkinteger(L,2);
int backlog = luaL_optinteger(L,3,BACKLOG);
struct skynet_context * ctx = lua_touserdata(L, lua_upvalueindex(1));
int id = skynet_socket_listen(ctx, host,port,backlog);
if (id < 0) {
    return luaL_error(L, "Listen error");
}

lua_pushinteger(L,id);
return 1;

}tcp

// skynet-src/skynet_socket.c
skynet_socket_listen(struct skynet_context ctx, const char host, int port, int backlog) {函數

uint32_t source = skynet_context_handle(ctx);
return socket_server_listen(SOCKET_SERVER, source, host, port, backlog);

}oop

// skynet-src/socket_server.c
socket_server_listen(struct socket_server ss, uintptr_t opaque, const char addr, int port, int backlog) {

int fd = do_listen(addr, port, backlog);
if (fd < 0) {
    return -1;
}
struct request_package request;
int id = reserve_id(ss);
if (id < 0) {
    close(fd);
    return id;
}
request.u.listen.opaque = opaque;
request.u.listen.id = id;
request.u.listen.fd = fd;
send_request(ss, &request, 'L', sizeof(request.u.listen));
return id;

}`

socket鏈接過程

skynet裏的socket結構有幾種狀態:

#define SOCKET_TYPE_INVALID 0 //可以使用
#define SOCKET_TYPE_RESERVE 1 //已佔用
#define SOCKET_TYPE_PLISTEN 2 //等待監聽(監聽套接字擁有)
#define SOCKET_TYPE_LISTEN 3 //監聽,可接受客戶端的鏈接(監聽套接字才擁有)
#define SOCKET_TYPE_CONNECTING 4 //正在鏈接(connect失敗時狀態,tcp會嘗試從新connect)
#define SOCKET_TYPE_CONNECTED 5 //已鏈接,能夠收發數據
#define SOCKET_TYPE_HALFCLOSE 6
#define SOCKET_TYPE_PACCEPT 7 //等待鏈接(鏈接套接字才擁有)
#define SOCKET_TYPE_BIND 8

當工做線程執行socket.listen後,socket線程從接收管道讀取數據,執行ctrl_cmd,調用listen_socket(第6行),此時該socket狀態是SOCKET_TYPE_PLISTEN(第18行)

`// skynet-src/socket_server.c
static int
ctrl_cmd(struct socket_server *ss, struct socket_message *result) {
    ...
    case 'L':
        return listen_socket(ss,(struct request_listen *)buffer, result);
    ...
}

static int
listen_socket(struct socket_server *ss, struct request_listen * request, struct socket_message *result) {
    int id = request->id;
    int listen_fd = request->fd;
    struct socket *s = new_fd(ss, id, listen_fd, PROTOCOL_TCP, request->opaque, false);
    if (s == NULL) {
        goto _failed;
    }
    s->type = SOCKET_TYPE_PLISTEN;
    return -1;
    ...
}`

接着,Lua服務調用socket.start,最終socket線程執行start_socket,此時socket狀態是SOCKET_TYPE_LISTEN,等待客戶端的鏈接請求。

`// skynet-src/socket_server.c
  static int
  start_socket(struct socket_server *ss, struct request_start *request, struct socket_message *result) {
      ...
      if (s->type == SOCKET_TYPE_PACCEPT || s->type == SOCKET_TYPE_PLISTEN) {
          if (sp_add(ss->event_fd, s->fd, s)) {
              force_close(ss, s, &l, result);
              result->data = strerror(errno);
              return SOCKET_ERR;
          }
         s->type = (s->type == SOCKET_TYPE_PACCEPT) ? SOCKET_TYPE_CONNECTED : SOCKET_TYPE_LISTEN;
         s->opaque = request->opaque;
         result->data = "start";
         return SOCKET_OPEN;
     }
     ...
 }`

當客戶端發起鏈接請求後,epoll事件返回,調用report_accept(第5行)

第14行,調用unix網絡系統接口accept,接受客戶端的請求。因爲客戶端已發起鏈接,因此不會阻塞。

第16行,從socket池中獲取可用的socket id

17-22行,初始化該socket,此時socket狀態是SOCKET_TYPE_PACCEPT

`int
socket_server_poll(struct socket_server *ss, struct socket_message * result, int * more) {
    ...
    case SOCKET_TYPE_LISTEN: {
        int ok = report_accept(ss, s, result);
    ...
}

// return 0 when failed, or -1 when file limit
static int
report_accept(struct socket_server *ss, struct socket *s, struct socket_message *result) {
    union sockaddr_all u;
    socklen_t len = sizeof(u);
    int client_fd = accept(s->fd, &u.s, &len);
    ...
    int id = reserve_id(ss);
    struct socket *ns = new_fd(ss, id, client_fd, PROTOCOL_TCP, s->opaque, false);
    ns->type = SOCKET_TYPE_PACCEPT;
    result->opaque = s->opaque;
    result->id = s->id;
    result->ud = id;
    result->data = NULL;

    ...
    return 1;
}`

接着,Lua服務再次調用socket.start(id),此時id是鏈接的socket,而不是監聽的socket。此時,socket狀態是SOCKET_TYPE_CONNECTED,鏈接已經創建,能夠收發數據。這就是整個socket鏈接過程。

至於怎麼通知到 Lua服務稍後分析。

`// skynet-src/socket_server.c
 static int
 start_socket(struct socket_server *ss, struct request_start *request, struct socket_message *result) {
     ...
     s->type = (s->type == SOCKET_TYPE_PACCEPT) ? SOCKET_TYPE_CONNECTED : SOCKET_TYPE_LISTEN;
     ...
 }`

關閉socket,socket.close

發送數據有兩個api,正常發送socket.write, 低優先級發送socket.lwrite。

網絡層如何通知給Lua服務

socket線程在運行過程(socket_server_poll)中,當收到網絡數據會調用forward_message_tcp

第19行,調用unix系統接口讀取socket上的數據

21-24行,採用args-value形式構造result,opaque是Lua服務的地址,id是該socket在池中的索引,ud是實際讀取到的字節數,data是數據

第25行,返回SOCKET_DATA,表示接收到數據。

`// skynet-src/socket_server.c
int
socket_server_poll(struct socket_server *ss, struct socket_message * result, int * more) {
    ...
    default:
        if (e->read) {
            int type;
            if (s->protocol == PROTOCOL_TCP) {
                type = forward_message_tcp(ss, s, &l, result);
        ...
    return type
}

static int
forward_message_tcp(struct socket_server *ss, struct socket *s, struct socket_lock *l, struct socket_message * resu
lt) {
    int sz = s->p.size;
    char * buffer = MALLOC(sz);
    int n = (int)read(s->fd, buffer, sz);
    ...
    result->opaque = s->opaque;
    result->id = s->id;
    result->ud = n;
    result->data = buffer;
    return SOCKET_DATA;
}`

因爲socket_server_poll返回的是SOCKET_DATA,調用forward_message(第11行),

23-26行,構造即將要發送的消息數據,用到了上面返回的result

28-32行,構造skynet消息結構,由於是在網絡層發送的,不是具體的某個服務,因此source,session字段都設置成0便可

第34行,把消息發送給與socket對應的服務地址。

至此,網絡消息通知給具體的Lua服務。

`// skynet-src/skynet_socket.c
int
skynet_socket_poll() {
    struct socket_server *ss = SOCKET_SERVER;
    assert(ss);
    struct socket_message result;
    int more = 1;
    int type = socket_server_poll(ss, &result, &more);
    switch (type) {
    case SOCKET_DATA:
        forward_message(SKYNET_SOCKET_TYPE_DATA, false, &result);
        break;
        ...
    return 1;
}

// mainloop thread
static void
forward_message(int type, bool padding, struct socket_message * result) {
    struct skynet_socket_message *sm;
    size_t sz = sizeof(*sm);
    ...
    sm = (struct skynet_socket_message *)skynet_malloc(sz);
    sm->type = type;
    sm->id = result->id;
    sm->ud = result->ud;
    ...
    struct skynet_message message;
    message.source = 0;
    message.session = 0;
    message.data = sm;
    message.sz = sz | ((size_t)PTYPE_SOCKET << MESSAGE_TYPE_SHIFT);

    if (skynet_context_push((uint32_t)result->opaque, &message)) {
        // todo: report somewhere to close socket
        // don't call skynet_socket_close here (It will block mainloop)
        skynet_free(sm->buffer);
        skynet_free(sm);
    }
}`

Lua服務處理流程

當網絡數據到達Lua服務時,lualib/skynet/socket.lua中提供了相應的處理方案。調用消息分發函數socket_message,網絡數據類型包含正常數據傳輸(DATA),鏈接(CONNECT),關閉(CLOSE),錯誤(ERROR)等。

第15行,把客戶端發過來的數據push到該socket的緩衝池中。

`-- lualib/skynet/socket.lua
 skynet.register_protocol {
     name = "socket",
     id = skynet.PTYPE_SOCKET,       -- PTYPE_SOCKET = 6
     unpack = driver.unpack,
     dispatch = function (_, _, t, ...)
         socket_message[t](...)
     end
 }

 -- SKYNET_SOCKET_TYPE_DATA = 1
 socket_message[1] = function(id, size, data)
     local s = socket_pool[id]
     ...
     local sz = driver.push(s.buffer, buffer_pool, data, size)
     ...
 }`

socket.read(id, sz),從一個socket上讀sz指定的字節數,若是緩衝池裏有足夠多的數據,從緩衝池裏pop出直接返回(第5行),不然,暫停當前協程(第15行),當數據夠或者鏈接斷開時重啓協程。

`-- lualib/skynet/socket.lua
 function socket.read(id, sz)
     local s = socket_pool[id]
     assert(s)
     ...
     local ret = driver.pop(s.buffer, buffer_pool, sz)
     if ret then
         return ret
     end
     if not s.connected then
         return false, driver.readall(s.buffer, buffer_pool)
     end

     assert(not s.read_required)
     s.read_required = sz
     suspend(s)
     ret = driver.pop(s.buffer, buffer_pool, sz)
     if ret then
         return ret
     else
         return false, driver.readall(s.buffer, buffer_pool)
     end
 end`

socket.readline(id, sep),從一個socket上讀以sep分割的數據,默認是"n",即讀一行數據。注:該api能夠指定分隔符,不僅僅是一行數據。

socket.abandon(id),清除socket id在本服務內的數據結構,但不併關閉這個socket,用於把id轉給其餘服務控制。一般,會設計一個master服務接收外部鏈接,等鏈接上後再將socket分配給一個slave服務控制,減小master服務的壓力。

總結

socket庫的使用流程通常是:

-- master服務
local listen_fd = socket.listen(ip, port)  //監聽一個地址
socket.start(listen_fd, function(fd, addr)
     slave.post.start(fd)  //客戶端鏈接上,轉交給slave
     socket.abandon(fd)
end)

-- slave服務
function accept.start(fd)
      socket.start(fd) //接管socket
       ...
end

最後,小編推薦本身的Linux、C/C++技術交流羣:【960994558】整理了一些我的以爲比較好的學習書籍、視頻資料共享在裏面(包括C/C++,Linux,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒體,CDN,P2P,K8S,Docker,TCP/IP,協程,DPDK等等.),有須要的能夠自行添加哦!~

相關文章
相關標籤/搜索