swoole_client
提供了 tcp/udp socket
的客戶端的封裝代碼,使用時僅需 new swoole_client
便可。 swoole
的 socket client
對比 PHP
提供的 stream
族函數有哪些好處:php
stream
函數存在超時設置的陷阱和 Bug
,一旦沒處理好會致使 Server
端長時間阻塞fread
有 8192
長度限制,沒法支持 UDP
的大包swoole_client
支持 waitall
,在有肯定包長度時可一次取完,沒必要循環讀取swoole_client
支持 UDP connect
,解決了 UDP
串包問題swoole_client
是純 C
的代碼,專門處理 socket
,stream
函數很是複雜。swoole_client
性能更好除了普通的同步阻塞+select
的使用方法外,swoole_client
還支持異步非阻塞回調。react
swoole_client::__construct
構造函數構造函數的邏輯很簡單,就是更新 swoole_client_class_entry_ptr
指針的 type
屬性與 key
屬性。apache
當 type
是異步客戶端的時候,不能在 CLI
模式下使用 SWOOLE_KEEP
。數組
#define php_swoole_socktype(type) (type & (~SW_FLAG_SYNC) & (~SW_FLAG_ASYNC) & (~SW_FLAG_KEEP) & (~SW_SOCK_SSL)) static PHP_METHOD(swoole_client, __construct) { long async = 0; long type = 0; char *id = NULL; zend_size_t len = 0; if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "l|ls", &type, &async, &id, &len) == FAILURE) { swoole_php_fatal_error(E_ERROR, "socket type param is required."); RETURN_FALSE; } if (async == 1) { type |= SW_FLAG_ASYNC; } if ((type & SW_FLAG_ASYNC)) { if ((type & SW_FLAG_KEEP) && SWOOLE_G(cli)) { swoole_php_fatal_error(E_ERROR, "The 'SWOOLE_KEEP' flag can only be used in the php-fpm or apache environment."); } php_swoole_check_reactor(); } int client_type = php_swoole_socktype(type); if (client_type < SW_SOCK_TCP || client_type > SW_SOCK_UNIX_STREAM) { swoole_php_fatal_error(E_ERROR, "Unknown client type '%d'.", client_type); } zend_update_property_long(swoole_client_class_entry_ptr, getThis(), ZEND_STRL("type"), type TSRMLS_CC); if (id) { zend_update_property_stringl(swoole_client_class_entry_ptr, getThis(), ZEND_STRL("id"), id, len TSRMLS_CC); } else { zend_update_property_null(swoole_client_class_entry_ptr, getThis(), ZEND_STRL("id") TSRMLS_CC); } //init swoole_set_object(getThis(), NULL); swoole_set_property(getThis(), client_property_callback, NULL); #ifdef SWOOLE_SOCKETS_SUPPORT swoole_set_property(getThis(), client_property_socket, NULL); #endif RETURN_TRUE; }
swoole_client->set
屬性設置設置客戶端參數,必須在 connect
前執行。本函數用於從 zend
中讀出 client
的屬性,而且和用戶的配置參數合併。安全
static PHP_METHOD(swoole_client, set) { zval *zset; if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "z", &zset) == FAILURE) { return; } if (Z_TYPE_P(zset) != IS_ARRAY) { RETURN_FALSE; } zval *zsetting = php_swoole_read_init_property(swoole_client_class_entry_ptr, getThis(), ZEND_STRL("setting") TSRMLS_CC); sw_php_array_merge(Z_ARRVAL_P(zsetting), Z_ARRVAL_P(zset)); RETURN_TRUE; } static sw_inline zval* php_swoole_read_init_property(zend_class_entry *scope, zval *object, const char *p, size_t pl TSRMLS_DC) { zval *property = sw_zend_read_property(scope, object, p, pl, 1 TSRMLS_CC); if (property == NULL || ZVAL_IS_NULL(property)) { SW_MAKE_STD_ZVAL(property); array_init(property); zend_update_property(scope, object, p, pl, property TSRMLS_CC); sw_zval_ptr_dtor(&property); return sw_zend_read_property(scope, object, p, pl, 1 TSRMLS_CC); } else { return property; } }
swoole_client->on
註冊異步事件回調函數註冊異步事件回調函數,函數主要更新 swoole_client_class_entry_ptr
中的各個屬性,並將其屬性回調函數賦值給 client_property_callback
當中。服務器
static PHP_METHOD(swoole_client, on) { char *cb_name; zend_size_t cb_name_len; zval *zcallback; if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "sz", &cb_name, &cb_name_len, &zcallback) == FAILURE) { return; } zval *ztype = sw_zend_read_property(swoole_client_class_entry_ptr, getThis(), SW_STRL("type")-1, 0 TSRMLS_CC); client_callback *cb = (client_callback *) swoole_get_property(getThis(), client_property_callback); if (!cb) { cb = (client_callback *) emalloc(sizeof(client_callback)); bzero(cb, sizeof(client_callback)); swoole_set_property(getThis(), client_property_callback, cb); } if (strncasecmp("connect", cb_name, cb_name_len) == 0) { zend_update_property(swoole_client_class_entry_ptr, getThis(), ZEND_STRL("onConnect"), zcallback TSRMLS_CC); cb->onConnect = sw_zend_read_property(swoole_client_class_entry_ptr, getThis(), ZEND_STRL("onConnect"), 0 TSRMLS_CC); sw_copy_to_stack(cb->onConnect, cb->_onConnect); #ifdef PHP_SWOOLE_ENABLE_FASTCALL cb->cache_onConnect = func_cache; #endif } else if (strncasecmp("receive", cb_name, cb_name_len) == 0) { zend_update_property(swoole_client_class_entry_ptr, getThis(), ZEND_STRL("onReceive"), zcallback TSRMLS_CC); cb->onReceive = sw_zend_read_property(swoole_client_class_entry_ptr, getThis(), ZEND_STRL("onReceive"), 0 TSRMLS_CC); sw_copy_to_stack(cb->onReceive, cb->_onReceive); #ifdef PHP_SWOOLE_ENABLE_FASTCALL cb->cache_onReceive = func_cache; #endif } else if (strncasecmp("close", cb_name, cb_name_len) == 0) { zend_update_property(swoole_client_class_entry_ptr, getThis(), ZEND_STRL("onClose"), zcallback TSRMLS_CC); cb->onClose = sw_zend_read_property(swoole_client_class_entry_ptr, getThis(), ZEND_STRL("onClose"), 0 TSRMLS_CC); sw_copy_to_stack(cb->onClose, cb->_onClose); #ifdef PHP_SWOOLE_ENABLE_FASTCALL cb->cache_onClose = func_cache; #endif } else if (strncasecmp("error", cb_name, cb_name_len) == 0) { zend_update_property(swoole_client_class_entry_ptr, getThis(), ZEND_STRL("onError"), zcallback TSRMLS_CC); cb->onError = sw_zend_read_property(swoole_client_class_entry_ptr, getThis(), ZEND_STRL("onError"), 0 TSRMLS_CC); sw_copy_to_stack(cb->onError, cb->_onError); #ifdef PHP_SWOOLE_ENABLE_FASTCALL cb->cache_onError = func_cache; #endif } else if (strncasecmp("bufferFull", cb_name, cb_name_len) == 0) { zend_update_property(swoole_client_class_entry_ptr, getThis(), ZEND_STRL("onBufferFull"), zcallback TSRMLS_CC); cb->onBufferFull = sw_zend_read_property(swoole_client_class_entry_ptr, getThis(), ZEND_STRL("onBufferFull"), 0 TSRMLS_CC); sw_copy_to_stack(cb->onBufferFull, cb->_onBufferFull); #ifdef PHP_SWOOLE_ENABLE_FASTCALL cb->cache_onBufferFull = func_cache; #endif } else if (strncasecmp("bufferEmpty", cb_name, cb_name_len) == 0) { zend_update_property(swoole_client_class_entry_ptr, getThis(), ZEND_STRL("onBufferEmpty"), zcallback TSRMLS_CC); cb->onBufferEmpty = sw_zend_read_property(swoole_client_class_entry_ptr, getThis(), ZEND_STRL("onBufferEmpty"), 0 TSRMLS_CC); sw_copy_to_stack(cb->onBufferEmpty, cb->_onBufferEmpty); #ifdef PHP_SWOOLE_ENABLE_FASTCALL cb->cache_onBufferEmpty = func_cache; #endif } else { swoole_php_fatal_error(E_WARNING, "Unknown event callback type name '%s'.", cb_name); RETURN_FALSE; } RETURN_TRUE; }
swoole_client->connect
connect
connect
是客戶端模塊核心的函數。swoole
PHP
內部函數使用 zend_parse_parameters() API
接受參數,將輸入參數轉換成 c
變量。不幸的是,每次調用這個函數時都要對這個這個字符串進行解析,這會加劇性能開銷。在PHP7中新提供的方式。是爲了提升參數解析的性能。對應常用的方法,建議使用 FAST ZPP
方式。php_swoole_client_new
函數建立一個 swClient
客戶端對象TCP
客戶端,設置 sock_flag
爲 1,爲後面的異步 connect
的參數php_swoole_client_check_setting
設置 swClient
對象的屬性若是客戶端是異步的:多線程
TCP
客戶端,須要驗證是否設置了 onConnect
、onError
、onClose
三個回調函數,若是沒有返回錯誤。onBufferFull
、onBufferEmpty
是可選回調函數。UDP
客戶端,須要驗證是否設置了 onReceive
等函數,不然返回錯誤。onConnect
、onClose
兩個函數是可選回調函數。swClient
中的 onConnect
等函數並無直接使用用戶的回調函數,而是使用 client_onConnect
等函數,將用戶的回調函數放在了 cli->object
的屬性中。connect
函數與服務端進行鏈接。static PHP_METHOD(swoole_client, connect) { zend_long port = 0, sock_flag = 0; char *host = NULL; zend_size_t host_len; double timeout = SW_CLIENT_DEFAULT_TIMEOUT; #ifdef FAST_ZPP ZEND_PARSE_PARAMETERS_START(1, 4) Z_PARAM_STRING(host, host_len) Z_PARAM_OPTIONAL Z_PARAM_LONG(port) Z_PARAM_DOUBLE(timeout) Z_PARAM_LONG(sock_flag) ZEND_PARSE_PARAMETERS_END(); #else if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s|ldl", &host, &host_len, &port, &timeout, &sock_flag) == FAILURE) { return; } #endif swClient *cli = (swClient *) swoole_get_object(getThis()); cli = php_swoole_client_new(getThis(), host, host_len, port); swoole_set_object(getThis(), cli); if (cli->type == SW_SOCK_TCP || cli->type == SW_SOCK_TCP6) { if (cli->async == 1) { //for tcp: nonblock //for udp: have udp connect sock_flag = 1; } } if (cli->keep == 1 && cli->socket->active == 1) { zend_update_property_bool(swoole_client_class_entry_ptr, getThis(), SW_STRL("reuse")-1, 1 TSRMLS_CC); RETURN_TRUE; } else if (cli->socket->active == 1) { swoole_php_fatal_error(E_WARNING, "connection to the server has already been established."); RETURN_FALSE; } zval *zset = sw_zend_read_property(swoole_client_class_entry_ptr, getThis(), ZEND_STRL("setting"), 1 TSRMLS_CC); if (zset && !ZVAL_IS_NULL(zset)) { php_swoole_client_check_setting(cli, zset TSRMLS_CC); } //nonblock async if (cli->async) { client_callback *cb = (client_callback *) swoole_get_property(getThis(), 0); if (!cb) { swoole_php_fatal_error(E_ERROR, "no event callback function."); RETURN_FALSE; } if (swSocket_is_stream(cli->type)) { if (!cb->onConnect) { swoole_php_fatal_error(E_ERROR, "no 'onConnect' callback function."); RETURN_FALSE; } if (!cb->onError) { swoole_php_fatal_error(E_ERROR, "no 'onError' callback function."); RETURN_FALSE; } if (!cb->onClose) { swoole_php_fatal_error(E_ERROR, "no 'onClose' callback function."); RETURN_FALSE; } cli->onConnect = client_onConnect; cli->onClose = client_onClose; cli->onError = client_onError; cli->onReceive = client_onReceive; cli->reactor_fdtype = PHP_SWOOLE_FD_STREAM_CLIENT; if (cb->onBufferFull) { cli->onBufferFull = client_onBufferFull; } if (cb->onBufferEmpty) { cli->onBufferEmpty = client_onBufferEmpty; } } else { if (!cb || !cb->onReceive) { swoole_php_fatal_error(E_ERROR, "no 'onReceive' callback function."); RETURN_FALSE; } if (cb->onConnect) { cli->onConnect = client_onConnect; } if (cb->onClose) { cli->onClose = client_onClose; } cli->onReceive = client_onReceive; cli->reactor_fdtype = PHP_SWOOLE_FD_DGRAM_CLIENT; } zval *zobject = getThis(); cli->object = zobject; sw_copy_to_stack(cli->object, cb->_object); sw_zval_add_ref(&zobject); } //nonblock async if (cli->connect(cli, host, port, timeout, sock_flag) < 0) { if (errno == 0 ) { if (SwooleG.error == SW_ERROR_DNSLOOKUP_RESOLVE_FAILED) { swoole_php_error(E_WARNING, "connect to server[%s:%d] failed. Error: %s[%d]", host, (int )port, hstrerror(h_errno), h_errno); } zend_update_property_long(swoole_client_class_entry_ptr, getThis(), SW_STRL("errCode")-1, SwooleG.error TSRMLS_CC); } else { swoole_php_sys_error(E_WARNING, "connect to server[%s:%d] failed.", host, (int )port); zend_update_property_long(swoole_client_class_entry_ptr, getThis(), SW_STRL("errCode")-1, errno TSRMLS_CC); } RETURN_FALSE; } RETURN_TRUE; }
php_swoole_client_new
php_swoole_client_new
會新建一個 swClient
客戶端對象,具體流程以下:dom
type
屬性,觀察是不是異步客戶端 SW_FLAG_ASYNC
id
屬性,觀察是否存在着 connection_id
,若是沒有傳入 connection_id
就要根據主域與端口號來建立 connection_id
若是當前客戶端要保持長鏈接,要試圖從 php_sw_long_connections
中根據 connection_id
取出客戶端對象,複用鏈接異步
php_sw_long_connections
哈希表中並無 connection_id
,那麼新申請一個 swClient
放入哈希表中,並調到 create_socket
建立客戶端對象。create_socket
建立一個新的客戶端對象;若是還可使用鏈接,就遞增 reuse_count
,swClient_create
建立一個新的客戶端swClient* php_swoole_client_new(zval *object, char *host, int host_len, int port) { zval *ztype; int async = 0; char conn_key[SW_LONG_CONNECTION_KEY_LEN]; int conn_key_len = 0; uint64_t tmp_buf; int ret; ztype = sw_zend_read_property(Z_OBJCE_P(object), object, SW_STRL("type")-1, 0 TSRMLS_CC); long type = Z_LVAL_P(ztype); //new flag, swoole-1.6.12+ if (type & SW_FLAG_ASYNC) { async = 1; } swClient *cli; bzero(conn_key, SW_LONG_CONNECTION_KEY_LEN); zval *connection_id = sw_zend_read_property(Z_OBJCE_P(object), object, ZEND_STRL("id"), 1 TSRMLS_CC); if (connection_id == NULL || ZVAL_IS_NULL(connection_id)) { conn_key_len = snprintf(conn_key, SW_LONG_CONNECTION_KEY_LEN, "%s:%d", host, port) + 1; } else { conn_key_len = snprintf(conn_key, SW_LONG_CONNECTION_KEY_LEN, "%s", Z_STRVAL_P(connection_id)) + 1; } //keep the tcp connection if (type & SW_FLAG_KEEP) { swClient *find = swHashMap_find(php_sw_long_connections, conn_key, conn_key_len); if (find == NULL) { cli = (swClient*) pemalloc(sizeof(swClient), 1); if (swHashMap_add(php_sw_long_connections, conn_key, conn_key_len, cli) == FAILURE) { swoole_php_fatal_error(E_WARNING, "failed to add swoole_client_create_socket to hashtable."); } goto create_socket; } else { cli = find; //try recv, check connection status ret = recv(cli->socket->fd, &tmp_buf, sizeof(tmp_buf), MSG_DONTWAIT | MSG_PEEK); if (ret == 0 || (ret < 0 && swConnection_error(errno) == SW_CLOSE)) { cli->close(cli); goto create_socket; } cli->reuse_count ++; zend_update_property_long(Z_OBJCE_P(object), object, ZEND_STRL("reuseCount"), cli->reuse_count TSRMLS_CC); } } else { cli = (swClient*) emalloc(sizeof(swClient)); create_socket: if (swClient_create(cli, php_swoole_socktype(type), async) < 0) { swoole_php_fatal_error(E_WARNING, "swClient_create() failed. Error: %s [%d]", strerror(errno), errno); zend_update_property_long(Z_OBJCE_P(object), object, ZEND_STRL("errCode"), errno TSRMLS_CC); return NULL; } //don't forget free it cli->server_str = sw_strndup(conn_key, conn_key_len); cli->server_strlen = conn_key_len; } zend_update_property_long(Z_OBJCE_P(object), object, ZEND_STRL("sock"), cli->socket->fd TSRMLS_CC); if (type & SW_FLAG_KEEP) { cli->keep = 1; } #ifdef SW_USE_OPENSSL if (type & SW_SOCK_SSL) { cli->open_ssl = 1; } #endif return cli; }
swClient_create
type
類型來建立不一樣的 socket
套接字reactor
對象(能夠分爲 reactor
線程和其餘進程 reactor
)cli->socket
屬性cli->reactor->setHandle
設定 reactor
的回調函數swClient
的各類函數int swClient_create(swClient *cli, int type, int async) { int _domain; int _type; bzero(cli, sizeof(swClient)); switch (type) { case SW_SOCK_TCP: _domain = AF_INET; _type = SOCK_STREAM; break; case SW_SOCK_TCP6: _domain = AF_INET6; _type = SOCK_STREAM; break; case SW_SOCK_UNIX_STREAM: _domain = AF_UNIX; _type = SOCK_STREAM; break; case SW_SOCK_UDP: _domain = AF_INET; _type = SOCK_DGRAM; break; case SW_SOCK_UDP6: _domain = AF_INET6; _type = SOCK_DGRAM; break; case SW_SOCK_UNIX_DGRAM: _domain = AF_UNIX; _type = SOCK_DGRAM; break; default: return SW_ERR; } #ifdef SOCK_CLOEXEC int sockfd = socket(_domain, _type | SOCK_CLOEXEC, 0); #else int sockfd = socket(_domain, _type, 0); #endif if (sockfd < 0) { swWarn("socket() failed. Error: %s[%d]", strerror(errno), errno); return SW_ERR; } if (async) { if (swIsMaster() && SwooleTG.type == SW_THREAD_REACTOR) { cli->reactor = SwooleTG.reactor; } else { cli->reactor = SwooleG.main_reactor; } cli->socket = swReactor_get(cli->reactor, sockfd); } else { cli->socket = sw_malloc(sizeof(swConnection)); } cli->buffer_input_size = SW_CLIENT_BUFFER_SIZE; bzero(cli->socket, sizeof(swConnection)); cli->socket->fd = sockfd; cli->socket->object = cli; if (async) { swSetNonBlock(cli->socket->fd); if (!swReactor_handle_isset(cli->reactor, SW_FD_STREAM_CLIENT)) { cli->reactor->setHandle(cli->reactor, SW_FD_STREAM_CLIENT | SW_EVENT_READ, swClient_onStreamRead); cli->reactor->setHandle(cli->reactor, SW_FD_DGRAM_CLIENT | SW_EVENT_READ, swClient_onDgramRead); cli->reactor->setHandle(cli->reactor, SW_FD_STREAM_CLIENT | SW_EVENT_WRITE, swClient_onWrite); cli->reactor->setHandle(cli->reactor, SW_FD_STREAM_CLIENT | SW_EVENT_ERROR, swClient_onError); } } if (swSocket_is_stream(type)) { cli->recv = swClient_tcp_recv_no_buffer; if (async) { cli->connect = swClient_tcp_connect_async; cli->send = swClient_tcp_send_async; cli->sendfile = swClient_tcp_sendfile_async; cli->pipe = swClient_tcp_pipe; cli->socket->dontwait = 1; } else { cli->connect = swClient_tcp_connect_sync; cli->send = swClient_tcp_send_sync; cli->sendfile = swClient_tcp_sendfile_sync; } cli->reactor_fdtype = SW_FD_STREAM_CLIENT; } else { cli->connect = swClient_udp_connect; cli->recv = swClient_udp_recv; cli->send = swClient_udp_send; cli->reactor_fdtype = SW_FD_DGRAM_CLIENT; } cli->_sock_domain = _domain; cli->_sock_type = _type; cli->close = swClient_close; cli->type = type; cli->async = async; cli->protocol.package_length_type = 'N'; cli->protocol.package_length_size = 4; cli->protocol.package_body_offset = 0; cli->protocol.package_max_length = SW_BUFFER_INPUT_SIZE; cli->protocol.onPackage = swClient_onPackage; return SW_OK; }
swClient_tcp_connect_async
異步客戶端數據流鏈接onConnect
、onError
、onClose
回調函數不能少。swClient_inet_addr
用於爲 cli->server_addr.addr
賦值,主要是要利用 htons
、inet_pton
轉化數值。cli->wait_dns
是 1,須要 AIO
模塊來異步加載 DNS
,進行 swAio_dispatch
以後本函數就會馬上返回 true
AIO
模塊解析了 DNS
以後,cli->wait_dns
會被重置爲 0,再次調用本函數swClient_tcp_connect_async
connect
函數進行創建鏈接,遇到 EINTR
信號中斷要進行重試EINPROGRESS
的時候,將套接字放入 reactor
中,並設置超時時間。當咱們以非阻塞的方式來進行鏈接的時候,返回的結果若是是 -1, 這並不表明此次鏈接發生了錯誤,若是它的返回結果是 EINPROGRESS
,那麼就表明鏈接還在進行中。 後面能夠將套接字放入 reactor
中,若是能夠寫,說明鏈接完成了。static int swClient_tcp_connect_async(swClient *cli, char *host, int port, double timeout, int nonblock) { int ret; cli->timeout = timeout; if (!cli->buffer) { //alloc input memory buffer cli->buffer = swString_new(cli->buffer_input_size); if (!cli->buffer) { return SW_ERR; } } if (!(cli->onConnect && cli->onError && cli->onClose)) { swWarn("onConnect/onError/onClose callback have not set."); return SW_ERR; } if (cli->onBufferFull && cli->buffer_high_watermark == 0) { cli->buffer_high_watermark = cli->socket->buffer_size * 0.8; } if (swClient_inet_addr(cli, host, port) < 0) { return SW_ERR; } if (cli->wait_dns) { if (SwooleAIO.init == 0) { swAio_init(); } swAio_event ev; bzero(&ev, sizeof(swAio_event)); int len = strlen(cli->server_host); if (strlen(cli->server_host) < SW_IP_MAX_LENGTH) { ev.nbytes = SW_IP_MAX_LENGTH; } else { ev.nbytes = len + 1; } ev.buf = sw_malloc(ev.nbytes); if (!ev.buf) { swWarn("malloc failed."); return SW_ERR; } memcpy(ev.buf, cli->server_host, len); ((char *) ev.buf)[len] = 0; ev.flags = cli->_sock_domain; ev.type = SW_AIO_GETHOSTBYNAME; ev.object = cli; ev.fd = cli->socket->fd; ev.callback = swClient_onResolveCompleted; if (swAio_dispatch(&ev) < 0) { sw_free(ev.buf); return SW_ERR; } else { return SW_OK; } } while (1) { ret = connect(cli->socket->fd, (struct sockaddr *) &cli->server_addr.addr, cli->server_addr.len); if (ret < 0) { if (errno == EINTR) { continue; } SwooleG.error = errno; } break; } if ((ret < 0 && errno == EINPROGRESS) || ret == 0) { if (cli->reactor->add(cli->reactor, cli->socket->fd, cli->reactor_fdtype | SW_EVENT_WRITE) < 0) { return SW_ERR; } if (timeout > 0) { if (SwooleG.timer.fd == 0) { swTimer_init((int) (timeout * 1000)); } cli->timer = SwooleG.timer.add(&SwooleG.timer, (int) (timeout * 1000), 0, cli, swClient_onTimeout); } return SW_OK; } return ret; }
swClient_inet_addr
轉化地址爲了解決大端小端問題,就不能直接在 connect
函數參數中傳數組,而是應該利用 htons
、inet_pton
等函數進行轉化。
static int swClient_inet_addr(swClient *cli, char *host, int port) { ... cli->server_host = host; cli->server_port = port; void *addr = NULL; if (cli->type == SW_SOCK_TCP || cli->type == SW_SOCK_UDP) { cli->server_addr.addr.inet_v4.sin_family = AF_INET; cli->server_addr.addr.inet_v4.sin_port = htons(port); cli->server_addr.len = sizeof(cli->server_addr.addr.inet_v4); addr = &cli->server_addr.addr.inet_v4.sin_addr.s_addr; if (inet_pton(AF_INET, host, addr)) { return SW_OK; } } else if (cli->type == SW_SOCK_TCP6 || cli->type == SW_SOCK_UDP6) { cli->server_addr.addr.inet_v6.sin6_family = AF_INET6; cli->server_addr.addr.inet_v6.sin6_port = htons(port); cli->server_addr.len = sizeof(cli->server_addr.addr.inet_v6); addr = cli->server_addr.addr.inet_v6.sin6_addr.s6_addr; if (inet_pton(AF_INET6, host, addr)) { return SW_OK; } } else if (cli->type == SW_SOCK_UNIX_STREAM || cli->type == SW_SOCK_UNIX_DGRAM) { cli->server_addr.addr.un.sun_family = AF_UNIX; strncpy(cli->server_addr.addr.un.sun_path, host, sizeof(cli->server_addr.addr.un.sun_path) - 1); cli->server_addr.addr.un.sun_path[sizeof(cli->server_addr.addr.un.sun_path) - 1] = 0; cli->server_addr.len = sizeof(cli->server_addr.addr.un.sun_path); return SW_OK; } else { return SW_ERR; } if (!cli->async) { ... } else { cli->wait_dns = 1; } return SW_OK; }
AIO
異步 DNS
解析所謂的 AIO
異步並非操做系統中真正的異步系統調用,而是 swoole
利用線程池 + reactor
實現的異步任務系統,當線程完成任務後,就會執行相應的 callback
函數,這裏 DNS
異步解析事件的回調函數就是 swClient_onResolveCompleted
:
static void swClient_onResolveCompleted(swAio_event *event) { swConnection *socket = swReactor_get(SwooleG.main_reactor, event->fd); if (socket->removed) { sw_free(event->buf); return; } swClient *cli = event->object; cli->wait_dns = 0; if (event->error == 0) { swClient_tcp_connect_async(cli, event->buf, cli->server_port, cli->timeout, 1); } else { SwooleG.error = SW_ERROR_DNSLOOKUP_RESOLVE_FAILED; cli->socket->removed = 1; cli->close(cli); if (cli->onError) { cli->onError(cli); } } sw_free(event->buf); }
DNS
解析DNS
具體的解析函數是 swoole_gethostbyname
:
DNS
的方法有兩種:gethostbyname2_r
與 gethostbyname2
,gethostbyname2_r
是線程安全函數,能夠用於多線程。gethostbyname2_r
中的第四個參數 buf
用於存放臨時數據ip
地址可能會有多個,函數只把第一個 ip
地址複製到 addr
中。struct hostent { char *h_name; //主機的規範名 char **h_aliases; //主機的別名 int h_addrtype;//主機ip地址的類型,究竟是(AF_INET),仍是(AF_INET6) int h_length;//主機ip地址的長度 char **h_addr_list;//主機的ip地址 }; #ifdef HAVE_GETHOSTBYNAME2_R int swoole_gethostbyname(int flags, char *name, char *addr) { int __af = flags & (~SW_DNS_LOOKUP_RANDOM); int index = 0; int rc, err; int buf_len = 256; struct hostent hbuf; struct hostent *result; char *buf = (char*) sw_malloc(buf_len); memset(buf, 0, buf_len); while ((rc = gethostbyname2_r(name, __af, &hbuf, buf, buf_len, &result, &err)) == ERANGE) { buf_len *= 2; void *tmp = sw_realloc(buf, buf_len); if (NULL == tmp) { sw_free(buf); return SW_ERR; } else { buf = tmp; } } if (0 != rc || NULL == result) { sw_free(buf); return SW_ERR; } union { char v4[INET_ADDRSTRLEN]; char v6[INET6_ADDRSTRLEN]; } addr_list[SW_DNS_HOST_BUFFER_SIZE]; int i = 0; for (i = 0; i < SW_DNS_HOST_BUFFER_SIZE; i++) { if (hbuf.h_addr_list[i] == NULL) { break; } if (__af == AF_INET) { memcpy(addr_list[i].v4, hbuf.h_addr_list[i], hbuf.h_length); } else { memcpy(addr_list[i].v6, hbuf.h_addr_list[i], hbuf.h_length); } } if (__af == AF_INET) { memcpy(addr, addr_list[index].v4, hbuf.h_length); } else { memcpy(addr, addr_list[index].v6, hbuf.h_length); } sw_free(buf); return SW_OK; } #else int swoole_gethostbyname(int flags, char *name, char *addr) { int __af = flags & (~SW_DNS_LOOKUP_RANDOM); int index = 0; struct hostent *host_entry; if (!(host_entry = gethostbyname2(name, __af))) { return SW_ERR; } union { char v4[INET_ADDRSTRLEN]; char v6[INET6_ADDRSTRLEN]; } addr_list[SW_DNS_HOST_BUFFER_SIZE]; int i = 0; for (i = 0; i < SW_DNS_HOST_BUFFER_SIZE; i++) { if (host_entry->h_addr_list[i] == NULL) { break; } if (__af == AF_INET) { memcpy(addr_list[i].v4, host_entry->h_addr_list[i], host_entry->h_length); } else { memcpy(addr_list[i].v6, host_entry->h_addr_list[i], host_entry->h_length); } } if (__af == AF_INET) { memcpy(addr, addr_list[index].v4, host_entry->h_length); } else { memcpy(addr, addr_list[index].v6, host_entry->h_length); } return SW_OK; } #endif
swClient_tcp_connect_sync
同步數據流鏈接TCP
鏈接相似,函數首先要調用 swClient_inet_addr
來轉化主域與端口號,若是 inet_pton
函數返回成功,說明並不須要進行 DNS
解析,直接返回;不然就要同步調用 swoole_gethostbyname
函數進行 DNS
的解析。swSocket_set_timeout
函數爲套接字設置超時時間。openssl
要進行 SSL
握手。static int swClient_inet_addr(swClient *cli, char *host, int port) { ... if (!cli->async) { if (swoole_gethostbyname(cli->_sock_domain, host, addr) < 0) { SwooleG.error = SW_ERROR_DNSLOOKUP_RESOLVE_FAILED; return SW_ERR; } } ... } int swSocket_set_timeout(int sock, double timeout) { int ret; struct timeval timeo; timeo.tv_sec = (int) timeout; timeo.tv_usec = (int) ((timeout - timeo.tv_sec) * 1000 * 1000); ret = setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, (void *) &timeo, sizeof(timeo)); if (ret < 0) { swWarn("setsockopt(SO_SNDTIMEO) failed. Error: %s[%d]", strerror(errno), errno); return SW_ERR; } ret = setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (void *) &timeo, sizeof(timeo)); if (ret < 0) { swWarn("setsockopt(SO_RCVTIMEO) failed. Error: %s[%d]", strerror(errno), errno); return SW_ERR; } return SW_OK; } static int swClient_tcp_connect_sync(swClient *cli, char *host, int port, double timeout, int nonblock) { int ret, n; char buf[1024]; cli->timeout = timeout; if (swClient_inet_addr(cli, host, port) < 0) { return SW_ERR; } if (nonblock == 1) { swSetNonBlock(cli->socket->fd); } else { if (cli->timeout > 0) { swSocket_set_timeout(cli->socket->fd, timeout); } } while (1) { ret = connect(cli->socket->fd, (struct sockaddr *) &cli->server_addr.addr, cli->server_addr.len); #endif if (ret < 0) { if (errno == EINTR) { continue; } } break; } if (ret >= 0) { cli->socket->active = 1; #ifdef SW_USE_OPENSSL if (cli->open_ssl) { if (swClient_enable_ssl_encrypt(cli) < 0) { return SW_ERR; } if (swClient_ssl_handshake(cli) < 0) { return SW_ERR; } } #endif } return ret; }
SSL
客戶端加密SSL
隧道加密流程類似,首先須要建立上下文 swSSL_get_context
;若是要驗證對端證書,須要調用 swSSL_set_capath
設置證書地址;若是使用 http2
協商,要使用 SSL_CTX_set_alpn_protos
函數設置 h2
建立上下文以後,就能夠利用 swSSL_create
建立 SSL
對象綁定套接字,並利用 SSL_set_tlsext_host_name
設置 SSL
主域,SSL_set_tlsext_host_name(s,name)
函數來設置ClientHello
中的 Server Name
SNI
是Server Name Indication
的縮寫,是爲了解決一個服務器使用多個域名和證書的SSL/TLS
擴展。它容許客戶端在發起SSL
握手請求時(客戶端發出ClientHello
消息中)提交請求的HostName
信息,使得服務器可以切換到正確的域並返回相應的證書。在
SNI
出現以前,HostName
信息只存在於HTTP
請求中,但SSL/TLS
層沒法獲知這一信息。經過將HostName
的信息加入到SNI
擴展中,SSL/TLS
容許服務器使用一個IP
爲不一樣的域名提供不一樣的證書,從而可以與使用同一個IP
的多個「虛擬主機」更方便地創建安全鏈接。
swSSL_connect
函數進行 SSL
握手鍊接swClient_ssl_verify
函數。int swClient_enable_ssl_encrypt(swClient *cli) { cli->ssl_context = swSSL_get_context(&cli->ssl_option); if (cli->ssl_context == NULL) { return SW_ERR; } if (cli->ssl_option.verify_peer) { if (swSSL_set_capath(&cli->ssl_option, cli->ssl_context) < 0) { return SW_ERR; } } cli->socket->ssl_send = 1; #if defined(SW_USE_HTTP2) && defined(SW_USE_OPENSSL) && OPENSSL_VERSION_NUMBER >= 0x10002000L if (cli->http2) { if (SSL_CTX_set_alpn_protos(cli->ssl_context, (const unsigned char *) "\x02h2", 3) < 0) { return SW_ERR; } } #endif return SW_OK; } int swClient_ssl_handshake(swClient *cli) { if (!cli->socket->ssl) { if (swSSL_create(cli->socket, cli->ssl_context, SW_SSL_CLIENT) < 0) { return SW_ERR; } #ifdef SSL_CTRL_SET_TLSEXT_HOSTNAME if (cli->ssl_option.tls_host_name) { SSL_set_tlsext_host_name(cli->socket->ssl, cli->ssl_option.tls_host_name); } #endif } if (swSSL_connect(cli->socket) < 0) { return SW_ERR; } if (cli->socket->ssl_state == SW_SSL_STATE_READY && cli->ssl_option.verify_peer) { if (swClient_ssl_verify(cli, cli->ssl_option.allow_self_signed) < 0) { return SW_ERR; } } return SW_OK; }
swSSL_set_capath
提供可信任證書庫:
// 在建立上下文結構以後,必須加載一個可信任證書庫。這是成功驗證每一個證書所必需的。 // 若是不能確認證書是可信任的,那麼 OpenSSL 會將證書標記爲無效(但鏈接仍能夠繼續)。 // OpenSSL 附帶了一組可信任證書。它們位於源文件目錄樹的 certs/demo 目錄中。 // 不過,每一個證書都是一個獨立的文件,也就是說,須要單獨加載每個證書。 // 在 certs 目錄下,還有一個 expired 存放過時證書的子目錄,試圖加載這些證書將會出錯。 // 在數字證書進行信任驗證以前, // 必須爲在爲安全鏈接設置時建立的 OpenSSL SSL_CTX 對象提供一個默認的信任證書, // 這可使用幾種方法來提供, // 可是最簡單的方法是將這個證書保存爲一個 PEM 文件, // 並使用 // SSL_CTX_load_verify_locations( ctx, file, path ); // 將其加載到 OpenSSL 中。 // 該函數有三個參數: // ctx - 上下文指針( SSL_CTX_new 函數返回 ); // file - 包含一個或多個 PEM 格式的證書的文件的路徑(必需); // path - 到一個或多個 PEM 格式文件的路徑,不過文件名必須使用特定的格式(可爲 NULL); // 若是指定成功,則返回 1 ,若是遇到問題,則返回 0 。 // 儘管當信任證書在一個目錄中有多個單獨的文件時更容易添加或更新, // 可是您不太可能會如此頻繁地更新信任證書,所以沒必要擔憂這個問題。 int swSSL_set_capath(swSSL_option *cfg, SSL_CTX *ctx) { if (cfg->cafile || cfg->capath) { if (!SSL_CTX_load_verify_locations(ctx, cfg->cafile, cfg->capath)) { return SW_ERR; } } else { if (!SSL_CTX_set_default_verify_paths(ctx)) { swWarn("Unable to set default verify locations and no CA settings specified."); return SW_ERR; } } if (cfg->verify_depth > 0) { SSL_CTX_set_verify_depth(ctx, cfg->verify_depth); } return SW_OK; }
swSSL_connect
函數就是簡單調用 SSL_connect
來鏈接服務端:
int swSSL_connect(swConnection *conn) { int n = SSL_connect(conn->ssl); if (n == 1) { conn->ssl_state = SW_SSL_STATE_READY; conn->ssl_want_read = 0; conn->ssl_want_write = 0; #ifdef SW_LOG_TRACE_OPEN const char *ssl_version = SSL_get_version(conn->ssl); const char *ssl_cipher = SSL_get_cipher_name(conn->ssl); swTraceLog(SW_TRACE_SSL, "connected (%s %s)", ssl_version, ssl_cipher); #endif return SW_OK; } long err = SSL_get_error(conn->ssl, n); if (err == SSL_ERROR_WANT_READ) { conn->ssl_want_read = 1; conn->ssl_want_write = 0; conn->ssl_state = SW_SSL_STATE_WAIT_STREAM; return SW_OK; } else if (err == SSL_ERROR_WANT_WRITE) { conn->ssl_want_read = 0; conn->ssl_want_write = 1; conn->ssl_state = SW_SSL_STATE_WAIT_STREAM; return SW_OK; } else if (err == SSL_ERROR_ZERO_RETURN) { swDebug("SSL_connect(fd=%d) closed.", conn->fd); return SW_ERR; } else if (err == SSL_ERROR_SYSCALL) { if (n) { SwooleG.error = errno; return SW_ERR; } } swWarn("SSL_connect(fd=%d) failed. Error: %s[%ld|%d].", conn->fd, ERR_reason_error_string(err), err, errno); return SW_ERR; }
swClient_ssl_verify
驗證鏈接的有效性:
// 鏈接創建後,必須檢查證書,以肯定它是否有效。 // 實際上,OpenSSL 爲咱們完成了這項任務。 // 若是證書有致命的問題(例如,哈希值無效),那麼將沒法創建鏈接。 // 可是,若是證書的問題並非致命的(當它已通過期或者尚不合法時),那麼仍能夠繼續使用鏈接。 // 能夠將 SSL 結構做爲唯一參數, // 調用 SSL_get_verify_result 來查明證書是否經過了 OpenSSL 的檢驗。 // 若是證書經過了包括信任檢查在內的 OpenSSL 的內部檢查,則返回 X509_V_OK。 // 若是有地方出了問題,則返回一個錯誤代碼,在 OpenSSL 文檔的 verify 部分中都進行了介紹。 // 注:該錯誤代碼被記錄在命令行工具的 verify 選項下。 // 應該注意的是,驗證失敗並不意味着鏈接不能使用。 // 是否應該使用鏈接取決於驗證結果和安全方面的考慮。 // 例如,失敗的信任驗證可能只是意味着沒有可信任的證書。 // 鏈接仍然可用,只是須要從思想上提升安全意識。 // OpenSSL 在對證書進行驗證時,有一些安全性檢查並無執行, // 包括證書的失效檢查和對證書中通用名的有效性驗證。 int swClient_ssl_verify(swClient *cli, int allow_self_signed) { if (swSSL_verify(cli->socket, allow_self_signed) < 0) { return SW_ERR; } if (cli->ssl_option.tls_host_name && swSSL_check_host(cli->socket, cli->ssl_option.tls_host_name) < 0) { return SW_ERR; } return SW_OK; } int swSSL_verify(swConnection *conn, int allow_self_signed) { int err = SSL_get_verify_result(conn->ssl); switch (err) { case X509_V_OK: return SW_OK; case X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT: if (allow_self_signed) { return SW_OK; } else { return SW_ERR; } default: swoole_error_log(SW_LOG_NOTICE, SW_ERROR_SSL_VEFIRY_FAILED, "Could not verify peer: code:%d %s", err, X509_verify_cert_error_string(err)); return SW_ERR; } return SW_ERR; }
驗證了證書的有效性以後,還要驗證域名的統一。高版本能夠直接使用 X509_check_host
函數驗證。
int swSSL_check_host(swConnection *conn, char *tls_host_name) { X509 *cert = SSL_get_peer_certificate(conn->ssl); if (cert == NULL) { return SW_ERR; } #ifdef X509_CHECK_FLAG_ALWAYS_CHECK_SUBJECT /* X509_check_host() is only available in OpenSSL 1.0.2+ */ if (X509_check_host(cert, tls_host_name, strlen(tls_host_name), 0, NULL) != 1) { swWarn("X509_check_host(): no match"); goto failed; } goto found; #else ... failed: X509_free(cert); return SW_ERR; found: X509_free(cert); return SW_OK; }
swClient_udp_connect
數據報鏈接UDP
來講,swoole
並不會 connect
服務端進行三次握手,只是會調用 bind
來綁定本機的一個隨機端口,這個時候客戶端能夠接受任何服務端發來的消息。若是調用 connect
函數的時候指定了第 5 個參數爲 true
,那麼就會調用 connect
函數,這時候雖然客戶端仍然不會進行三次握手,可是卻只能接受特定服務端發來的消息
調用了
connect
以後的UDP
特色:
- 不須要給輸出操做指定目的IP和目的端口,寫到UDP的緩衝區裏的數據,將自動發送到你調用connect指定的IP和端口。
- 在一個已鏈接的UDP套接字上,內核由輸入操做返回的數據報只有那些來自connect所指定的協議地址的數據報。目的地爲這個已鏈接的UDP套接字的本地協議地址(IP和端口),遠端地址不是該套接字早先connect到的協議地址的數據報,不會投遞到該套接字。這樣就限制了已鏈接的UDP套接字能且只能與一個對端交換數據報。
- 由已鏈接的套接字引起的異步錯誤發回給他們所在的進程,而未鏈接的UDP套接字不接受任何異步錯誤。
- 讀寫的操做接口方法增多了,除了可使用sendto和recvfrom的接口外,還可使用tcp的那套操做接口--read/readv/readmsg和write/writev等
UDP
客戶端可選設置 onConnect
,socket
建立成功會當即回調 onConnect
static int swClient_udp_connect(swClient *cli, char *host, int port, double timeout, int udp_connect) { if (swClient_inet_addr(cli, host, port) < 0) { return SW_ERR; } cli->socket->active = 1; cli->timeout = timeout; int bufsize = SwooleG.socket_buffer_size; if (timeout > 0) { swSocket_set_timeout(cli->socket->fd, timeout); } if (cli->type == SW_SOCK_UNIX_DGRAM) { struct sockaddr_un* client_addr = &cli->socket->info.addr.un; sprintf(client_addr->sun_path, "/tmp/swoole-client.%d.%d.sock", getpid(), cli->socket->fd); client_addr->sun_family = AF_UNIX; unlink(client_addr->sun_path); if (bind(cli->socket->fd, (struct sockaddr *) client_addr, sizeof(cli->socket->info.addr.un)) < 0) { swSysError("bind(%s) failed.", client_addr->sun_path); return SW_ERR; } } if (udp_connect != 1) { goto connect_ok; } if (connect(cli->socket->fd, (struct sockaddr *) (&cli->server_addr), cli->server_addr.len) == 0) { swSocket_clean(cli->socket->fd); connect_ok: setsockopt(cli->socket->fd, SOL_SOCKET, SO_SNDBUF, &bufsize, sizeof(bufsize)); setsockopt(cli->socket->fd, SOL_SOCKET, SO_RCVBUF, &bufsize, sizeof(bufsize)); if (cli->async && cli->onConnect) { if (cli->reactor->add(cli->reactor, cli->socket->fd, cli->reactor_fdtype | SW_EVENT_READ) < 0) { return SW_ERR; } execute_onConnect(cli); } return SW_OK; } else { swSysError("connect() failed."); cli->socket->active = 0; cli->socket->removed = 1; return SW_ERR; } }
swoole_client->isConnected
異步鏈接斷定static PHP_METHOD(swoole_client, isConnected) { swClient *cli = (swClient *) swoole_get_object(getThis()); if (!cli) { RETURN_FALSE; } if (!cli->socket) { RETURN_FALSE; } RETURN_BOOL(cli->socket->active); }