這篇博客主要學習rpc框架yar的協議頭和傳輸的實現, 能力有限,有些語句沒有看懂,因此猜想了一部分。
_yar_header的定義主要在yar_protocol.h和yar_protocol.c裏面,下面介紹這兩個文件裏的源碼。
借用網上的一幅圖片,請求體包括 yar_header + packager_name + yar_request_t 這三個部分,返回相似。
下面主要介紹yar_header的部分。php
// 定義yar_header_t這個結構體的內容,一共82個字節。 typedef struct _yar_header { unsigned int id; // 4 unsigned short version; // 2 unsigned int magic_num; // 4 unsigned int reserved; // 4 unsigned char provider[32]; // 32 unsigned char token[32]; // 32 unsigned int body_len; // 4 } // 下面是聲明兩個函數,稍後在.c文件中說明具體方法 yar_header_t * php_yar_protocol_parse(char *payload); void php_yar_protocol_render(yar_header_t *header, uint id, char *provider, char *token, uint body_len, uint reserved);
// 這個函數的做用應該是給 header指向的結構體賦值 /* htonl函數介紹:將主機數轉換成無符號長整型的網絡字節順序。 * 本函數將一個32位數從主機字節順序轉換成網絡字節順序。 */ void php_yar_protocol_render(yar_header_t *header, uint id, char *provider, char *token, uint body_len, uint reserved) /* {{{ */ { header->magic_num = htonl(YAR_PROTOCOL_MAGIC_NUM); header->id = htonl(id); header->body_len = htonl(body_len); header->reserved = htonl(reserved); if (provider) { memcpy(header->provider, provider, strlen(provider)); } if (token) { memcpy(header->token, token, strlen(token)); } return; } /* }}} */ // 這個函數是將payload指向的字符串,轉成header結構體對應的內容 而後返回。 yar_header_t * php_yar_protocol_parse(char *payload) /* {{{ */ { yar_header_t *header = (yar_header_t *)payload; header->magic_num = ntohl(header->magic_num); if (header->magic_num != YAR_PROTOCOL_MAGIC_NUM) { header->magic_num = htonl(header->magic_num); return NULL; } header->id = ntohl(header->id); header->body_len = ntohl(header->body_len); header->reserved = ntohl(header->reserved); return header; } /* }}} */
if (header->magic_num != YAR_PROTOCOL_MAGIC_NUM) { header->magic_num = htonl(header->magic_num); return NULL; }
關於yar傳輸方面源碼分析,仍是先從下面的圖開始吧。感謝做圖的人,比我說的直觀。
傳輸層主要涉及到以下幾個文件:
transports/curl.c、transports/socket.c 、yar_transport.h、yar_transport.c 4個文件。網絡
yar_transport.h 主要定義兩個函數和一些結構體。框架
// 這個應該是傳輸方式的方法定義,後面會在curl.c socket.c中實現下面的內容 typedef struct _yar_transport_interface { void *data; int (*open)(struct _yar_transport_interface *self, zend_string *address, long options, char **msg); int (*send)(struct _yar_transport_interface *self, struct _yar_request *request, char **msg); struct _yar_response * (*exec)(struct _yar_transport_interface *self, struct _yar_request *request); int (*setopt)(struct _yar_transport_interface *self, long type, void *value, void *addition); int (*calldata)(struct _yar_transport_interface *self, yar_call_data_t *calldata); void (*close)(struct _yar_transport_interface *self); } yar_transport_interface_t; typedef struct _yar_transport { const char *name; struct _yar_transport_interface * (*init)(); void (*destroy)(yar_transport_interface_t *self); yar_transport_multi_t *multi; } yar_transport_t; // 下面是並行調用時的結構體定義 typedef struct _yar_transport_multi_interface { void *data; int (*add)(struct _yar_transport_multi_interface *self, yar_transport_interface_t *cp); int (*exec)(struct _yar_transport_multi_interface *self, yar_concurrent_client_callback *callback); void (*close)(struct _yar_transport_multi_interface *self); } yar_transport_multi_interface_t; typedef struct _yar_transport_multi { struct _yar_transport_multi_interface * (*init)(); } yar_transport_multi_t; // 根據名稱得到對應的傳輸方式,curl / sock PHP_YAR_API const yar_transport_t * php_yar_transport_get(char *name, int nlen); // 註冊傳輸方式到yar_transports_list PHP_YAR_API int php_yar_transport_register(const yar_transport_t *transport);
這個文件主要實現上面兩個函數,即註冊curl和socket兩種方式,而後根據name得到傳輸方式。curl
// 定義list struct _yar_transports_list { unsigned int size; unsigned int num; const yar_transport_t **transports; } yar_transports_list; // 註冊方法到list裏面 PHP_YAR_API int php_yar_transport_register(const yar_transport_t *transport) /* {{{ */ { if (!yar_transports_list.size) { yar_transports_list.size = 5; yar_transports_list.transports = (const yar_transport_t **)malloc(sizeof(yar_transport_t *) * yar_transports_list.size); } else if (yar_transports_list.num == yar_transports_list.size) { yar_transports_list.size += 5; yar_transports_list.transports = (const yar_transport_t **)realloc(yar_transports_list.transports, sizeof(yar_transport_t *) * yar_transports_list.size); } yar_transports_list.transports[yar_transports_list.num] = transport; return yar_transports_list.num++; } /* }}} */ // 根據name得到方式 PHP_YAR_API const yar_transport_t * php_yar_transport_get(char *name, int nlen) /* {{{ */ { int i = 0; for (;i<yar_transports_list.num;i++) { if (strncmp(yar_transports_list.transports[i]->name, name, nlen) == 0) { return yar_transports_list.transports[i]; } } return NULL; } /* }}} */ // 下面這兩句沒有看懂什麼意思 le_calldata = zend_register_list_destructors_ex(php_yar_calldata_dtor, NULL, "Yar Call Data", module_number); le_plink = zend_register_list_destructors_ex(NULL, php_yar_plink_dtor, "Yar Persistent Link", module_number);
主要是socket方式須要的方法的具體實現, 先看下面方法
用yar_transport_t 定義一個常量socket
const yar_transport_t yar_transport_socket = { "sock", php_yar_socket_init, php_yar_socket_destroy, NULL }; /* }}} */
php_yar_socket_init的定義以下ide
yar_transport_interface_t * php_yar_socket_init() /* {{{ */ { yar_socket_data_t *data; yar_transport_interface_t *self; self = emalloc(sizeof(yar_transport_interface_t)); self->data = data = ecalloc(1, sizeof(yar_socket_data_t)); self->open = php_yar_socket_open; self->send = php_yar_socket_send; self->exec = php_yar_socket_exec; self->setopt = php_yar_socket_setopt; self->calldata = NULL; self->close = php_yar_socket_close; return self; } /* }}} */
而後咱們再看yar_socket_data_t的定義, 其實主要使用php_stream這個流類型 底層的具體實現不追了。函數
typedef struct _yar_socket_data_t { char persistent; php_stream *stream; // 主要 } yar_socket_data_t;
下面是php_yar_socket_open函數的實現源碼分析
php_stream *stream = NULL; ... // 這個是核心代碼,應該是根據地址和配置建立一個流的對象。 stream = php_stream_xport_create(ZSTR_VAL(address), ZSTR_LEN(address), 0, STREAM_XPORT_CLIENT|STREAM_XPORT_CONNECT, persistent_key, &tv, NULL, &errstr, &err);
php_yar_socket_close, 這個函數就是關閉流,而後釋放空間學習
if (!data->persistent && data->stream) { php_stream_close(data->stream); }
php_yar_socket_send 這個函數主要是把須要發送的內容,按照必定格式pack,而後放到stream中。
裏面有幾個函數沒有看懂,之後在詳細查找:ui
// 先構造header的內容 php_yar_protocol_render(&header, request->id, "Yar PHP Client", NULL, ZSTR_LEN(payload), data->persistent? YAR_PROTOCOL_PERSISTENT : 0); // 複製到buf這個變量中 memcpy(buf, (char *)&header, sizeof(yar_header_t)); // 這個地方 我不太瞭解爲何要用goto wait_io: if (bytes_left) { retval = php_select(fd+1, NULL, &rfds, NULL, &tv); if (retval == -1) { zend_string_release(payload); spprintf(msg, 0, "select error '%s'", strerror(errno)); return 0; } else if (retval == 0) { zend_string_release(payload); spprintf(msg, 0, "select timeout %ldms reached", YAR_G(timeout)); return 0; } if (PHP_SAFE_FD_ISSET(fd, &rfds)) { if ((ret = php_stream_xport_sendto(data->stream, ZSTR_VAL(payload) + bytes_sent, bytes_left, 0, NULL, 0)) > 0) { bytes_left -= ret; bytes_sent += ret; } } goto wait_io; } }
curl.c跟socket相似,依賴curl/curl.h文件裏的內容,如
CURL *cp;