上一章咱們說了客戶端的鏈接 connect
,對於同步客戶端來講,鏈接已經創建成功;可是對於異步客戶端來講,此時可能還在進行 DNS
的解析,onConnect
回調函數還未執行。php
本節中,咱們將繼續說明客戶端發送數據的流程,同時咱們能夠看到 TCP
異步客戶端執行 onConnect
回調函數的過程。react
send
入口本入口函數邏輯很是簡單,從 PHP
函數中獲取數據 data
,而後調用 connect
函數。緩存
static PHP_METHOD(swoole_client, send) { char *data; zend_size_t data_len; zend_long flags = 0; #ifdef FAST_ZPP ZEND_PARSE_PARAMETERS_START(1, 2) Z_PARAM_STRING(data, data_len) Z_PARAM_OPTIONAL Z_PARAM_LONG(flags) ZEND_PARSE_PARAMETERS_END(); #else if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s|l", &data, &data_len, &flags) == FAILURE) { return; } #endif swClient *cli = client_get_ptr(getThis() TSRMLS_CC); //clear errno SwooleG.error = 0; int ret = cli->send(cli, data, data_len, flags); if (ret < 0) { swoole_php_sys_error(E_WARNING, "failed to send(%d) %zd bytes.", cli->socket->fd, data_len); zend_update_property_long(swoole_client_class_entry_ptr, getThis(), SW_STRL("errCode")-1, SwooleG.error TSRMLS_CC); RETVAL_FALSE; } else { RETURN_LONG(ret); } }
swClient_tcp_send_sync
同步 TCP
客戶端對於同步客戶端來講,發送數據是經過 swConnection_send
函數來進行阻塞式調用 send
,當返回的錯誤是 EAGAIN
的時候調用 swSocket_wait
等待 1s。swoole
static int swClient_tcp_send_sync(swClient *cli, char *data, int length, int flags) { int written = 0; int n; assert(length > 0); assert(data != NULL); while (written < length) { n = swConnection_send(cli->socket, data, length - written, flags); if (n < 0) { if (errno == EINTR) { continue; } else if (errno == EAGAIN) { swSocket_wait(cli->socket->fd, 1000, SW_EVENT_WRITE); continue; } else { SwooleG.error = errno; return SW_ERR; } } written += n; data += n; } return written; }
swClient_tcp_send_async
異步 TCP
客戶端因爲異步客戶端已經設置爲非阻塞,而且加入了 reactor
的監控,所以發送數據只須要 reactor->write
函數便可。當此時的套接字不可寫的時候,會自動放入 out_buff
緩衝區中。異步
當 out_buffer
大於高水線時,會自動調用 onBufferFull
回調函數。socket
static int swClient_tcp_send_async(swClient *cli, char *data, int length, int flags) { int n = length; if (cli->reactor->write(cli->reactor, cli->socket->fd, data, length) < 0) { if (SwooleG.error == SW_ERROR_OUTPUT_BUFFER_OVERFLOW) { n = -1; cli->socket->high_watermark = 1; } else { return SW_ERR; } } if (cli->onBufferFull && cli->socket->out_buffer && cli->socket->high_watermark == 0 && cli->socket->out_buffer->length >= cli->buffer_high_watermark) { cli->socket->high_watermark = 1; cli->onBufferFull(cli); } return n; }
swClient_udp_send
UDP
客戶端對於 UDP
客戶端來講,若是 Socket
緩存區塞滿,並不會像 TCP
進行等待 reactor
可寫狀態,而是直接返回告終果。對於異步客戶端來講,僅僅是非阻塞調用 sendto
函數。async
static int swClient_udp_send(swClient *cli, char *data, int len, int flags) { int n; n = sendto(cli->socket->fd, data, len, 0, (struct sockaddr *) &cli->server_addr.addr, cli->server_addr.len); if (n < 0 || n < len) { return SW_ERR; } else { return n; } }
sendto
UDP
客戶端相似於 send
函數,sendto
函數專門針對 UDP
客戶端,與 send
函數不一樣的是,sendto
函數在底層套接字緩衝區塞滿的時候,會調用 swSocket_wait
進行阻塞等待。tcp
static PHP_METHOD(swoole_client, sendto) { char* ip; zend_size_t ip_len; long port; char *data; zend_size_t len; if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "sls", &ip, &ip_len, &port, &data, &len) == FAILURE) { return; } swClient *cli = (swClient *) swoole_get_object(getThis()); int ret; if (cli->type == SW_SOCK_UDP) { ret = swSocket_udp_sendto(cli->socket->fd, ip, port, data, len); } else if (cli->type == SW_SOCK_UDP6) { ret = swSocket_udp_sendto6(cli->socket->fd, ip, port, data, len); } else { swoole_php_fatal_error(E_WARNING, "only supports SWOOLE_SOCK_UDP or SWOOLE_SOCK_UDP6."); RETURN_FALSE; } SW_CHECK_RETURN(ret); } int swSocket_udp_sendto(int server_sock, char *dst_ip, int dst_port, char *data, uint32_t len) { struct sockaddr_in addr; if (inet_aton(dst_ip, &addr.sin_addr) == 0) { swWarn("ip[%s] is invalid.", dst_ip); return SW_ERR; } addr.sin_family = AF_INET; addr.sin_port = htons(dst_port); return swSocket_sendto_blocking(server_sock, data, len, 0, (struct sockaddr *) &addr, sizeof(addr)); } int swSocket_udp_sendto6(int server_sock, char *dst_ip, int dst_port, char *data, uint32_t len) { struct sockaddr_in6 addr; bzero(&addr, sizeof(addr)); if (inet_pton(AF_INET6, dst_ip, &addr.sin6_addr) < 0) { swWarn("ip[%s] is invalid.", dst_ip); return SW_ERR; } addr.sin6_port = (uint16_t) htons(dst_port); addr.sin6_family = AF_INET6; return swSocket_sendto_blocking(server_sock, data, len, 0, (struct sockaddr *) &addr, sizeof(addr)); } int swSocket_sendto_blocking(int fd, void *__buf, size_t __n, int flag, struct sockaddr *__addr, socklen_t __addr_len) { int n = 0; while (1) { n = sendto(fd, __buf, __n, flag, __addr, __addr_len); if (n >= 0) { break; } else { if (errno == EINTR) { continue; } else if (swConnection_error(errno) == SW_WAIT) { swSocket_wait(fd, 1000, SW_EVENT_WRITE); continue; } else { break; } } } return n; }
swClient_onWrite
寫就緒狀態當 reactor
監控到套接字進入了寫就緒狀態時,就會調用 swClient_onWrite
函數。函數
從上一章咱們知道,異步客戶端創建鏈接過程當中 swoole
調用了 connect
函數,該返回 0,或者返回錯誤碼 EINPROGRESS
都會對寫就緒進行監控。不管哪一種狀況,swClient_onWrite
被調用就說明此時鏈接已經創建成功,三次握手已經完成,可是 cli->socket->active
仍是 0。ui
若是 cli->socket->active
爲 0,說明此時異步客戶端雖然創建了鏈接,可是尚未調用 onConnect
回調函數,所以這時要調用 execute_onConnect
函數。若是使用了 SSL
隧道加密,還要進行 SSL
握手,而且設置 _socket->ssl_state = SW_SSL_STATE_WAIT_STREAM
。
當 active
爲 1 的時候,就能夠調用 swReactor_onWrite
來發送數據。
static int swClient_onWrite(swReactor *reactor, swEvent *event) { swClient *cli = event->socket->object; swConnection *_socket = cli->socket; if (cli->socket->active) { #ifdef SW_USE_OPENSSL if (cli->open_ssl && _socket->ssl_state == SW_SSL_STATE_WAIT_STREAM) { if (swClient_ssl_handshake(cli) < 0) { goto connect_fail; } else if (_socket->ssl_state == SW_SSL_STATE_READY) { goto connect_success; } else { if (_socket->ssl_want_read) { cli->reactor->set(cli->reactor, event->fd, SW_FD_STREAM_CLIENT | SW_EVENT_READ); } return SW_OK; } } #endif if (swReactor_onWrite(cli->reactor, event) < 0) { return SW_ERR; } if (cli->onBufferEmpty && _socket->high_watermark && _socket->out_buffer->length <= cli->buffer_low_watermark) { _socket->high_watermark = 0; cli->onBufferEmpty(cli); } return SW_OK; } socklen_t len = sizeof(SwooleG.error); if (getsockopt(event->fd, SOL_SOCKET, SO_ERROR, &SwooleG.error, &len) < 0) { swWarn("getsockopt(%d) failed. Error: %s[%d]", event->fd, strerror(errno), errno); return SW_ERR; } //success if (SwooleG.error == 0) { //listen read event cli->reactor->set(cli->reactor, event->fd, SW_FD_STREAM_CLIENT | SW_EVENT_READ); //connected _socket->active = 1; #ifdef SW_USE_OPENSSL if (cli->open_ssl) { if (swClient_enable_ssl_encrypt(cli) < 0) { goto connect_fail; } if (swClient_ssl_handshake(cli) < 0) { goto connect_fail; } else { _socket->ssl_state = SW_SSL_STATE_WAIT_STREAM; } return SW_OK; } connect_success: #endif if (cli->onConnect) { execute_onConnect(cli); } } else { #ifdef SW_USE_OPENSSL connect_fail: #endif _socket->active = 0; cli->close(cli); if (cli->onError) { cli->onError(cli); } } return SW_OK; } static sw_inline void execute_onConnect(swClient *cli) { if (cli->timer) { swTimer_del(&SwooleG.timer, cli->timer); cli->timer = NULL; } cli->onConnect(cli); }
client_onConnect
static void client_onConnect(swClient *cli) { zval *zobject = (zval *) cli->object; #ifdef SW_USE_OPENSSL if (cli->ssl_wait_handshake) { client_execute_callback(zobject, SW_CLIENT_CB_onSSLReady); } else #endif if (!cli->redirect) { client_execute_callback(zobject, SW_CLIENT_CB_onConnect); } else { client_callback *cb = (client_callback *) swoole_get_property(zobject, 0); if (!cb || !cb->onReceive) { swoole_php_fatal_error(E_ERROR, "has no 'onReceive' callback function."); } } }