glusterfs通訊之rpc

在glusterfs中,gluster與glusterd通訊請求對卷的操做、集羣的操做、狀態的查看等;glusterd與glusterfsd通訊完成對卷的操做,集羣的操做,狀態的查看;glusterfs與glusterfsd通訊完成文件的存儲。全部這些通訊都是經過內部的RPC模塊來完成的。 緩存

有關RPC的相關概念、協議等這裏不展開描述,有興趣的能夠看看這兩篇文章(1, 2)。 服務器

========================================= 數據結構

從代碼的組織來看,RPC的服務端邏輯上可分爲四層,server-app、rpc-server、rpc-transport、protocol,每一層都提供相應的接口供上一層調用,同時,上一層會提供回調函數供下一層來調用;一樣,RPC的客戶端邏輯上也可分爲四層,cli-app、rpc-cli、rpc-transport、protocol。目前,protocol提供了tcp(socket)和rdma兩種方式,且都以動態庫的方式提供,rpc-transport會根據配置加載不一樣的動態庫。咱們以gluster與glusterd的通訊並選用tcp的方式爲例來看看RPC相關流程。 app

1. 服務端的初始化 socket

關鍵流程如圖所示: tcp

須要注意的是:rpc_transport_load時會根據協議的類型加載(使用dlopen)不一樣的動態庫,調用socket_listen時將fd與回調函數添加事件驅動器中。當有讀寫事件時,事件驅動器回調socket_server_event_handler函數(用於服務端的accept)或者socket_event_handler函數(用於通常的請求),而後依次回調rpc_transport_notify、rpcsvc_notify處理RPC請求。 函數

2. 客戶端的初始化 學習

關鍵流程: this

socket_connect函數會將fd以及回調處理函數註冊到事件驅動器中。 spa

3. 一次完整的RPC流程

(1) 客戶端發送RPC請求

關鍵的流程與數據結構:

客戶端經過調用rpc_clnt_submit函數發送RPC請求,該函數又會一層層調用,最終在socket_submit_request中經過writev將請求發送出去。在調用rpc_clnt_submit時會準備好RPC所須要的相關數據,例如程序號,程序版本號,過程號,參數信息等等,而後逐層按照接口組織好相關的數據。

例如: 執行 gluster volume info命令,其內部關鍵代碼:

int32_t gf_clie_1_get_volume(call_frame_t * frame, xlator_t * this)
{
    ...
    ret = cli_cmd_submit(&req,
                         frame,
                         cli_rpc_prog, //包含程序名,程序號,程序版本號等信息
                         GLUSTER_CLI_GET_VOLUME, //過程號
                         NULL,
                         this,
                         gf_cli3_1_get_volume_cbk, //結果處理回調函數
                         (xdrproc_t)xdr_gf_cli_req);
    ..
}

int cli_cmd_submit(void *req, call_frame_t * frame,
                   rpc_clnt_prog_t * prog, int procnum,
                   struct iobref * iobref,
                   xlator_t * this, fop_cbk_fn_t cbkfn,
                   xdrproc_t xdrproc)
{
    ...
    ret = cli_submit_request(req, frame, prog, procnum, NULL, this,
                             cbkfn, xdrproc);
    ...
}

int cli_submit_request(void * req, call_frame_t * frame,
                       rpc_clnt_prog_t * prog, int procnum,
                       struct iobref * iobref,
                       xlator_t * this, fop_cbk_fn_t cbkfn,
                       xdrproc_t xdrproc)
{
    ...
    ret = rpc_clnt_submit(global_rpc, prog, procnum, cbkfn,
                          &iov, count, NULL, 0, iobref, frame,
                          NULL, 0, NULL, 0, NULL)
}

int rpc_clnt_submit(struct rpc_clnt * rpc,
                    rpc_clnt_prog_t * prog, int procnum,
                    fop_cbk_fn_t cbkfn,
                    struct iovec * proghdr, int proghdrcount,
                    struct iovec * progpayload, int progpayloadcount,
                    struct iobref * iobref, void * frame,
                    struct iovec * rsphdr, int rsphdr_count,
                    struct iovec * rsp_payload,int rsp_payload_count,
                    struct iobref * rsp_iobref)
{
    struct iobuf * request_iob = NULL;
    rpc_transport_req_t req;
    ...
    request_iob = rpc_clnt_record(rpc, frame, prog, procnum, proglen,
                                  &rpchdr, callid);
    req.msg.rpchdr = &rpchdr;
    req.msg.rpchdrcount = 1;
    req.msg.proghdr = proghdr;
    req.msg.proghdrcount = proghdrcount;
    req.msg.progpayload = progpayload;
    req.msg.progpayloadcount = progpayloadcount;
    req.msg.iobref = iobref;
    ...
    ret = rpc_transport_submit_request(rpc->conn.trans, &req);
    ...
}

int32_t rpc_transport_submit_request(rpc_transport_t * this,
                                     rpc_transport_req_t * req)
{
    ret = this->ops->submit_request(this, req);
}


int32_t socket_submit_request(rpc_transport_t * this,
                              rpc_transport_req_t * req)
{
    struct ioq * entry = NULL;
    entry = __socket_ioq_new(this, &req->msg);
    
    ret = __socket_ioq_churn_entry(this, entry);
    ...
}

int __socket_ioq_churn_entry(rpc_transport_t *this,
                             struct ioq * entry)
{
    ret = __socket_writev(this, entry->pending_vector,
                          entry->pending_count,
                          &entry->pending_vector,
                          &entry->pending_count);
    ...
}

int __socket_writev(rpc_transport_t * this,
                    struct iovec * vector, int count,
                    struct iovec **pending_vector, int *pendint_count)
{
    ret = __socket_rwv(this, vector, count, pending_vector,
                       pending_count, NULL, 1);
    ...
}

int __socket_rwv(rpc_transport_t *this, struct iovec *vector,
                 int count, struct iovec **pending_vector,
                 int * pending_count, size_t * bytes, int write)
{
    int opcount = 0;
    struct iovec * opvector = NULL;

    opvector = vector;
    opcount = count;
    while(opcount)
    {
        if(write)
        {
            ret = wrtiev(sock, opvector, opcount);
        }
        ...
    }
    ...
}

(2) 服務端處理RPC請求

服務端收到請求後,從socket_event_handler回調到rpc_transport_notify,再回調到rpcsvc_notify,最終調用rpcsvc_handle_rpc_call函數。在這個函數中,解析客戶端RPC請求中包含的程序號,過程號以及相關參數等,而後根據這些程序號,過程號找到對應的處理函數。而這些處理函數就是先前經過rpcsvc_program_register函數註冊的。對應的處理函數處理完成後調用相關函數答覆客戶端。

注: actor並非一個真正的函數,僅標識不一樣RPC請求的處理函數.

(3) 客戶端處理RPC的回覆

客戶端在發送請求時,會將請求的相關信息緩存下來,當收到服務器的迴應後,再根據程序號、過程號找到對應的請求信息,而後調用相應的回調函數對請求結果進行處理。

========================================

總結:文本僅僅講述了RPC的大概運行流程,其中還有不少細節以及相關點都未涉及到,例如狀態機、鎖機制、frame概念、xdr等等,後續須要進一步展開學習研究。

相關文章
相關標籤/搜索